diff --git a/bindings/fileTransfer.go b/bindings/fileTransfer.go index c9ae897f33e518e9de9c2c7cdd6046ae342bd516..dd3f01e92fb0004d0a73a0c7139e9a95a130e97d 100644 --- a/bindings/fileTransfer.go +++ b/bindings/fileTransfer.go @@ -166,7 +166,6 @@ func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback, } // Add file transfer processes to API services tracking - jww.INFO.Printf("FT PROCESS IS STARTING HERE AT InitFileTransfer") err = user.api.AddService(m.StartProcesses) if err != nil { return nil, err diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 6c6aa0575f8fb3bac4b832008a8bdda31465bfb7..3963a3ca9e7e0c5a2a61c529c2c855d6f148b192 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -216,6 +216,7 @@ func NewManager(params Params, user FtE2e) (FileTransfer, error) { func (m *manager) StartProcesses() (stoppable.Stoppable, error) { // Construct stoppables multiStoppable := stoppable.NewMulti(fileTransferStoppable) + senderPoolStop := stoppable.NewMulti(workerPoolStoppable) batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable) // Start sending threads @@ -226,16 +227,11 @@ func (m *manager) StartProcesses() (stoppable.Stoppable, error) { // running this asynchronously could result in a race condition where // some worker threads are not added to senderPoolStop before that stoppable // is added to the multiStoppable. - m.startSendingWorkerPool(multiStoppable) - jww.INFO.Printf("STOPPING FT THREAD DEBUG: \nmultiStoppable running proc: %v\nmultistop: %v\n", - multiStoppable, multiStoppable.GetRunningProcesses()) + m.startSendingWorkerPool(senderPoolStop) // Create a multi stoppable multiStoppable.Add(batchBuilderStop) - jww.INFO.Printf("STOPPING FT THREAD DEBUG: \nmultiStoppable running proc: %v\nmultistop: %v\n", - multiStoppable.GetRunningProcesses(), multiStoppable) - return multiStoppable, nil } diff --git a/fileTransfer/send.go b/fileTransfer/send.go index af0ef713312f22e7b7f2f2703207c245f0a4dda1..88a61173baaebe7c1857cb153fb5c9df305816b2 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -62,10 +62,7 @@ func (m *manager) startSendingWorkerPool(multiStop *stoppable.Multi) { for i := 0; i < workerPoolThreads; i++ { stop := stoppable.NewSingle(sendThreadStoppableName + strconv.Itoa(i)) - go func(single *stoppable.Single) { - m.sendingThread(single) - }(stop) - jww.INFO.Printf("Adding stoppable %s", stop.Name()) + m.sendingThread(stop) multiStop.Add(stop) } @@ -85,22 +82,25 @@ func (m *manager) sendingThread(stop *stoppable.Single) { return case healthy := <-healthChan: for !healthy { - jww.INFO.Printf("not healthy, waiting for health update") select { - // Wait for health update or a quit signal case <-stop.Quit(): - jww.DEBUG.Printf("[FT] Stopping file part sending thread (%s): "+ - "stoppable triggered.", stop.Name()) + // It's possible during shutdown that the health tracker gets + // shutdown before the quit signal is received by the case + // statement listening for stop.Quit() above. As a result, we + // must listen for the quit signal here, to avoid waiting + // for a healthy signal that will never come. + jww.DEBUG.Printf("[FT] Stopping file part sending "+ + "thread (%s): stoppable triggered.", stop.Name()) m.cmix.RemoveHealthCallback(healthChanID) stop.ToStopped() return - case healthy = <-healthChan: - jww.INFO.Printf("received health update, it is now set to %s", healthy) + // If a quit signal is not received, we must wait until the + // network is healthy before we can continue sending files. + case healthy = <-healthChan: } } case packet := <-m.sendQueue: - jww.INFO.Printf("sending pack") m.sendCmix(packet) } } diff --git a/stoppable/multi.go b/stoppable/multi.go index 270f28a17209700bebb0e7ba77460e03a7755997..95c7ecdb431fb158f4f77fe98673e431a5c388f8 100644 --- a/stoppable/multi.go +++ b/stoppable/multi.go @@ -66,7 +66,7 @@ func (m *Multi) GetStatus() Status { status := s.GetStatus() if status < lowestStatus { lowestStatus = status - jww.INFO.Printf("Stoppable %s has status %s", + jww.TRACE.Printf("Stoppable %s has status %s", s.Name(), status.String()) } } @@ -121,7 +121,7 @@ func (m *Multi) Close() error { m.once.Do(func() { var wg sync.WaitGroup - jww.INFO.Printf("Sending on quit channel to multi stoppable %q with processes: %v.", + jww.TRACE.Printf("Sending on quit channel to multi stoppable %q with subprocesses %v.", m.Name(), m.GetRunningProcesses()) m.mux.Lock() @@ -129,7 +129,6 @@ func (m *Multi) Close() error { for _, stoppable := range m.stoppables { wg.Add(1) go func(s Stoppable) { - jww.INFO.Printf("FT DEBUG: stopping %s", s.Name()) if s.Close() != nil { atomic.AddUint32(&numErrors, 1) } diff --git a/stoppable/single.go b/stoppable/single.go index b2313a3fc1f813aae9284b3a385011db1408d9cb..5af656b67f92e8e1e6dfb8404a37b73a698e5d3b 100644 --- a/stoppable/single.go +++ b/stoppable/single.go @@ -68,7 +68,7 @@ func (s *Single) toStopping() error { return errors.Errorf(toStoppingErr, s.Name(), s.GetStatus(), Running) } - jww.INFO.Printf("Switched status of single stoppable %q from %s to %s.", + jww.TRACE.Printf("Switched status of single stoppable %q from %s to %s.", s.Name(), Running, Stopping) return nil @@ -83,14 +83,13 @@ func (s *Single) ToStopped() { s.Name(), s.GetStatus(), Stopping) } - jww.INFO.Printf("Switched status of single stoppable %q from %s to %s.", + jww.TRACE.Printf("Switched status of single stoppable %q from %s to %s.", s.Name(), Stopping, Stopped) } // Quit returns a receive-only channel that will be triggered when the Stoppable // quits. func (s *Single) Quit() <-chan struct{} { - jww.INFO.Printf("Quit for %s", s.name) return s.quit } @@ -106,13 +105,13 @@ func (s *Single) Close() error { return } - jww.INFO.Printf("Sending on quit channel to single stoppable %q.", + jww.TRACE.Printf("Sending on quit channel to single stoppable %q.", s.Name()) // Send on quit channel s.quit <- struct{}{} - jww.INFO.Printf("Sent to quit channel for single stoppable %q.", + jww.TRACE.Printf("Sent to quit channel for single stoppable %q.", s.Name()) }) diff --git a/xxdk/services.go b/xxdk/services.go index 3642fdd8e25d2f6d44cadf3a17aeee994aea03d4..840215dc1d3998ebc301766b6232ebef10384d39 100644 --- a/xxdk/services.go +++ b/xxdk/services.go @@ -9,7 +9,6 @@ package xxdk import ( "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/stoppable" "sync" "time" @@ -52,8 +51,6 @@ func (s *services) add(sp Service) error { return errors.WithMessage(err, "Failed to start added service") } s.stoppable.Add(stop) - jww.INFO.Printf("STOPPING FT THREAD DEBUG: adding service %v", stop.Name()) - } return nil }