From e276e1fa2143394af3acf73ea35d659dc9eb5487 Mon Sep 17 00:00:00 2001 From: joshemb <josh@elixxir.io> Date: Thu, 29 Sep 2022 10:16:39 -0700 Subject: [PATCH] Clean up MR --- bindings/fileTransfer.go | 1 - fileTransfer/manager.go | 8 ++------ fileTransfer/send.go | 22 +++++++++++----------- stoppable/multi.go | 5 ++--- stoppable/single.go | 9 ++++----- xxdk/services.go | 3 --- 6 files changed, 19 insertions(+), 29 deletions(-) diff --git a/bindings/fileTransfer.go b/bindings/fileTransfer.go index c9ae897f3..dd3f01e92 100644 --- a/bindings/fileTransfer.go +++ b/bindings/fileTransfer.go @@ -166,7 +166,6 @@ func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback, } // Add file transfer processes to API services tracking - jww.INFO.Printf("FT PROCESS IS STARTING HERE AT InitFileTransfer") err = user.api.AddService(m.StartProcesses) if err != nil { return nil, err diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 6c6aa0575..3963a3ca9 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -216,6 +216,7 @@ func NewManager(params Params, user FtE2e) (FileTransfer, error) { func (m *manager) StartProcesses() (stoppable.Stoppable, error) { // Construct stoppables multiStoppable := stoppable.NewMulti(fileTransferStoppable) + senderPoolStop := stoppable.NewMulti(workerPoolStoppable) batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable) // Start sending threads @@ -226,16 +227,11 @@ func (m *manager) StartProcesses() (stoppable.Stoppable, error) { // running this asynchronously could result in a race condition where // some worker threads are not added to senderPoolStop before that stoppable // is added to the multiStoppable. - m.startSendingWorkerPool(multiStoppable) - jww.INFO.Printf("STOPPING FT THREAD DEBUG: \nmultiStoppable running proc: %v\nmultistop: %v\n", - multiStoppable, multiStoppable.GetRunningProcesses()) + m.startSendingWorkerPool(senderPoolStop) // Create a multi stoppable multiStoppable.Add(batchBuilderStop) - jww.INFO.Printf("STOPPING FT THREAD DEBUG: \nmultiStoppable running proc: %v\nmultistop: %v\n", - multiStoppable.GetRunningProcesses(), multiStoppable) - return multiStoppable, nil } diff --git a/fileTransfer/send.go b/fileTransfer/send.go index af0ef7133..88a61173b 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -62,10 +62,7 @@ func (m *manager) startSendingWorkerPool(multiStop *stoppable.Multi) { for i := 0; i < workerPoolThreads; i++ { stop := stoppable.NewSingle(sendThreadStoppableName + strconv.Itoa(i)) - go func(single *stoppable.Single) { - m.sendingThread(single) - }(stop) - jww.INFO.Printf("Adding stoppable %s", stop.Name()) + m.sendingThread(stop) multiStop.Add(stop) } @@ -85,22 +82,25 @@ func (m *manager) sendingThread(stop *stoppable.Single) { return case healthy := <-healthChan: for !healthy { - jww.INFO.Printf("not healthy, waiting for health update") select { - // Wait for health update or a quit signal case <-stop.Quit(): - jww.DEBUG.Printf("[FT] Stopping file part sending thread (%s): "+ - "stoppable triggered.", stop.Name()) + // It's possible during shutdown that the health tracker gets + // shutdown before the quit signal is received by the case + // statement listening for stop.Quit() above. As a result, we + // must listen for the quit signal here, to avoid waiting + // for a healthy signal that will never come. + jww.DEBUG.Printf("[FT] Stopping file part sending "+ + "thread (%s): stoppable triggered.", stop.Name()) m.cmix.RemoveHealthCallback(healthChanID) stop.ToStopped() return - case healthy = <-healthChan: - jww.INFO.Printf("received health update, it is now set to %s", healthy) + // If a quit signal is not received, we must wait until the + // network is healthy before we can continue sending files. + case healthy = <-healthChan: } } case packet := <-m.sendQueue: - jww.INFO.Printf("sending pack") m.sendCmix(packet) } } diff --git a/stoppable/multi.go b/stoppable/multi.go index 270f28a17..95c7ecdb4 100644 --- a/stoppable/multi.go +++ b/stoppable/multi.go @@ -66,7 +66,7 @@ func (m *Multi) GetStatus() Status { status := s.GetStatus() if status < lowestStatus { lowestStatus = status - jww.INFO.Printf("Stoppable %s has status %s", + jww.TRACE.Printf("Stoppable %s has status %s", s.Name(), status.String()) } } @@ -121,7 +121,7 @@ func (m *Multi) Close() error { m.once.Do(func() { var wg sync.WaitGroup - jww.INFO.Printf("Sending on quit channel to multi stoppable %q with processes: %v.", + jww.TRACE.Printf("Sending on quit channel to multi stoppable %q with subprocesses %v.", m.Name(), m.GetRunningProcesses()) m.mux.Lock() @@ -129,7 +129,6 @@ func (m *Multi) Close() error { for _, stoppable := range m.stoppables { wg.Add(1) go func(s Stoppable) { - jww.INFO.Printf("FT DEBUG: stopping %s", s.Name()) if s.Close() != nil { atomic.AddUint32(&numErrors, 1) } diff --git a/stoppable/single.go b/stoppable/single.go index b2313a3fc..5af656b67 100644 --- a/stoppable/single.go +++ b/stoppable/single.go @@ -68,7 +68,7 @@ func (s *Single) toStopping() error { return errors.Errorf(toStoppingErr, s.Name(), s.GetStatus(), Running) } - jww.INFO.Printf("Switched status of single stoppable %q from %s to %s.", + jww.TRACE.Printf("Switched status of single stoppable %q from %s to %s.", s.Name(), Running, Stopping) return nil @@ -83,14 +83,13 @@ func (s *Single) ToStopped() { s.Name(), s.GetStatus(), Stopping) } - jww.INFO.Printf("Switched status of single stoppable %q from %s to %s.", + jww.TRACE.Printf("Switched status of single stoppable %q from %s to %s.", s.Name(), Stopping, Stopped) } // Quit returns a receive-only channel that will be triggered when the Stoppable // quits. func (s *Single) Quit() <-chan struct{} { - jww.INFO.Printf("Quit for %s", s.name) return s.quit } @@ -106,13 +105,13 @@ func (s *Single) Close() error { return } - jww.INFO.Printf("Sending on quit channel to single stoppable %q.", + jww.TRACE.Printf("Sending on quit channel to single stoppable %q.", s.Name()) // Send on quit channel s.quit <- struct{}{} - jww.INFO.Printf("Sent to quit channel for single stoppable %q.", + jww.TRACE.Printf("Sent to quit channel for single stoppable %q.", s.Name()) }) diff --git a/xxdk/services.go b/xxdk/services.go index 3642fdd8e..840215dc1 100644 --- a/xxdk/services.go +++ b/xxdk/services.go @@ -9,7 +9,6 @@ package xxdk import ( "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/stoppable" "sync" "time" @@ -52,8 +51,6 @@ func (s *services) add(sp Service) error { return errors.WithMessage(err, "Failed to start added service") } s.stoppable.Add(stop) - jww.INFO.Printf("STOPPING FT THREAD DEBUG: adding service %v", stop.Name()) - } return nil } -- GitLab