diff --git a/bindings/fileTransfer.go b/bindings/fileTransfer.go index 88cda1ebc444dc719860b6adb1b0ddee64f02dde..dd3f01e92fb0004d0a73a0c7139e9a95a130e97d 100644 --- a/bindings/fileTransfer.go +++ b/bindings/fileTransfer.go @@ -9,6 +9,7 @@ package bindings import ( "encoding/json" + jww "github.com/spf13/jwalterweatherman" "time" "gitlab.com/elixxir/client/fileTransfer" @@ -127,7 +128,7 @@ type FileTransferReceiveProgressCallback interface { // - paramsJSON - JSON marshalled fileTransfer.Params func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback, e2eFileTransferParamsJson, fileTransferParamsJson []byte) (*FileTransfer, error) { - + jww.INFO.Printf("Calling InitFileTransfer()") // Get user from singleton user, err := e2eTrackerSingleton.get(e2eID) if err != nil { diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 549a9a799dc9d6f75c415a71757954a36cc0d0f7..3963a3ca9e7e0c5a2a61c529c2c855d6f148b192 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -215,21 +215,21 @@ func NewManager(params Params, user FtE2e) (FileTransfer, error) { // StartProcesses starts the sending threads. Adheres to the xxdk.Service type. func (m *manager) StartProcesses() (stoppable.Stoppable, error) { // Construct stoppables + multiStoppable := stoppable.NewMulti(fileTransferStoppable) senderPoolStop := stoppable.NewMulti(workerPoolStoppable) batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable) // Start sending threads + go m.batchBuilderThread(batchBuilderStop) + // Note that the startSendingWorkerPool already creates thread for every // worker. As a result, there is no need to run it asynchronously. In fact, // running this asynchronously could result in a race condition where // some worker threads are not added to senderPoolStop before that stoppable // is added to the multiStoppable. 
m.startSendingWorkerPool(senderPoolStop) - go m.batchBuilderThread(batchBuilderStop) // Create a multi stoppable - multiStoppable := stoppable.NewMulti(fileTransferStoppable) - multiStoppable.Add(senderPoolStop) multiStoppable.Add(batchBuilderStop) return multiStoppable, nil diff --git a/fileTransfer/send.go b/fileTransfer/send.go index c266a35c8f5d8f9882b8b07a7b308566c4c6c69d..fd736280c8f55e64b927002c5aeec42120784cd5 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -74,16 +74,44 @@ func (m *manager) sendingThread(stop *stoppable.Single) { healthChanID := m.cmix.AddHealthCallback(func(b bool) { healthChan <- b }) for { select { + // A quit signal has been sent by the user. Typically, this is a result + // of a user-level shutdown of the client. case <-stop.Quit(): jww.DEBUG.Printf("[FT] Stopping file part sending thread (%s): "+ "stoppable triggered.", stop.Name()) m.cmix.RemoveHealthCallback(healthChanID) stop.ToStopped() return + + // If the network becomes unhealthy, we will cease sending files until + // it is resolved. case healthy := <-healthChan: + // There exists an edge case where an unhealthy signal is received + // due to a user-level shutdown, meaning the health tracker has + // ceased operation. If the health tracker is shut down, a healthy + // signal will never be received, and this for loop will run + // infinitely. If we are caught in this loop, the stop's Quit() + // signal will never be received in the case statement above, and this + // sender thread will run indefinitely. To avoid lingering threads + // in the case of a shutdown, we must actively listen for either the + // Quit() signal or a network health update here. for !healthy { - healthy = <-healthChan + select { + case <-stop.Quit(): + // Listen for a quit signal if the network becomes unhealthy + // before a user-level shutdown. 
+ jww.DEBUG.Printf("[FT] Stopping file part sending "+ + "thread (%s): stoppable triggered.", stop.Name()) + m.cmix.RemoveHealthCallback(healthChanID) + stop.ToStopped() + return + + // Wait for a healthy signal before continuing to send files. + case healthy = <-healthChan: + } } + // A file part has been sent through the queue and must be sent by + // this thread. case packet := <-m.sendQueue: m.sendCmix(packet) } diff --git a/stoppable/multi.go b/stoppable/multi.go index 3ac77c7507fb058af1deac2b098121b16d3569b0..456e376a7b71851f254f786ae8a13d2e086ee60f 100644 --- a/stoppable/multi.go +++ b/stoppable/multi.go @@ -121,15 +121,15 @@ func (m *Multi) Close() error { m.once.Do(func() { var wg sync.WaitGroup - jww.TRACE.Printf("Sending on quit channel to multi stoppable %q.", - m.Name()) + jww.TRACE.Printf("Sending on quit channel to multi stoppable %q with subprocesseses %v.", + m.Name(), m.GetRunningProcesses()) m.mux.Lock() // Attempt to stop each stoppable in its own goroutine for _, stoppable := range m.stoppables { wg.Add(1) - go func(stoppable Stoppable) { - if stoppable.Close() != nil { + go func(s Stoppable) { + if s.Close() != nil { atomic.AddUint32(&numErrors, 1) } wg.Done() diff --git a/stoppable/single.go b/stoppable/single.go index ea11ab42b586c3c1543509b5f575cda75f0cfca5..5af656b67f92e8e1e6dfb8404a37b73a698e5d3b 100644 --- a/stoppable/single.go +++ b/stoppable/single.go @@ -110,6 +110,10 @@ func (s *Single) Close() error { // Send on quit channel s.quit <- struct{}{} + + jww.TRACE.Printf("Sent to quit channel for single stoppable %q.", + s.Name()) + }) if err != nil {