Skip to content
Snippets Groups Projects
Commit 531f3dc6 authored by Josh Brooks's avatar Josh Brooks
Browse files

Merge branch 'release' of git.xx.network:elixxir/client into XX-4223/BackupMultiLookup

parents fd0ce8b5 66798228
No related branches found
No related tags found
1 merge request!399Add initial debug prints
......@@ -9,6 +9,7 @@ package bindings
import (
"encoding/json"
jww "github.com/spf13/jwalterweatherman"
"time"
"gitlab.com/elixxir/client/fileTransfer"
......@@ -127,7 +128,7 @@ type FileTransferReceiveProgressCallback interface {
// - paramsJSON - JSON marshalled fileTransfer.Params
func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback,
e2eFileTransferParamsJson, fileTransferParamsJson []byte) (*FileTransfer, error) {
jww.INFO.Printf("Calling InitFileTransfer()")
// Get user from singleton
user, err := e2eTrackerSingleton.get(e2eID)
if err != nil {
......
......@@ -215,21 +215,21 @@ func NewManager(params Params, user FtE2e) (FileTransfer, error) {
// StartProcesses starts the sending threads. Adheres to the xxdk.Service type.
func (m *manager) StartProcesses() (stoppable.Stoppable, error) {
// Construct stoppables
multiStoppable := stoppable.NewMulti(fileTransferStoppable)
senderPoolStop := stoppable.NewMulti(workerPoolStoppable)
batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable)
// Start sending threads
go m.batchBuilderThread(batchBuilderStop)
// Note that the startSendingWorkerPool already creates thread for every
// worker. As a result, there is no need to run it asynchronously. In fact,
// running this asynchronously could result in a race condition where
// some worker threads are not added to senderPoolStop before that stoppable
// is added to the multiStoppable.
m.startSendingWorkerPool(senderPoolStop)
go m.batchBuilderThread(batchBuilderStop)
// Create a multi stoppable
multiStoppable := stoppable.NewMulti(fileTransferStoppable)
multiStoppable.Add(senderPoolStop)
multiStoppable.Add(batchBuilderStop)
return multiStoppable, nil
......
......@@ -74,16 +74,44 @@ func (m *manager) sendingThread(stop *stoppable.Single) {
healthChanID := m.cmix.AddHealthCallback(func(b bool) { healthChan <- b })
for {
select {
// A quit signal has been sent by the user. Typically, this is a result
// of a user-level shutdown of the client.
case <-stop.Quit():
jww.DEBUG.Printf("[FT] Stopping file part sending thread (%s): "+
"stoppable triggered.", stop.Name())
m.cmix.RemoveHealthCallback(healthChanID)
stop.ToStopped()
return
// If the network becomes unhealthy, we will cease sending files until
// it is resolved.
case healthy := <-healthChan:
// There exists an edge case where an unhealthy signal is received
// due to a user-level shutdown, meaning the health tracker has
// ceased operation. If the health tracker is shutdown, a healthy
// signal will never be received, and this for loop will run
// infinitely. If we are caught in this loop, the stop's Quit()
// signal will never be received in case statement above, and this
// sender thread will run indefinitely. To avoid lingering threads
// in the case of a shutdown, we must actively listen for either the
// Quit() signal or a network health update here.
for !healthy {
healthy = <-healthChan
select {
case <-stop.Quit():
// Listen for a quit signal if the network becomes unhealthy
// before a user-level shutdown.
jww.DEBUG.Printf("[FT] Stopping file part sending "+
"thread (%s): stoppable triggered.", stop.Name())
m.cmix.RemoveHealthCallback(healthChanID)
stop.ToStopped()
return
// Wait for a healthy signal before continuing to send files.
case healthy = <-healthChan:
}
}
// A file part has been sent through the queue and must be sent by
// this thread.
case packet := <-m.sendQueue:
m.sendCmix(packet)
}
......
......@@ -121,15 +121,15 @@ func (m *Multi) Close() error {
m.once.Do(func() {
var wg sync.WaitGroup
jww.TRACE.Printf("Sending on quit channel to multi stoppable %q.",
m.Name())
jww.TRACE.Printf("Sending on quit channel to multi stoppable %q with subprocesseses %v.",
m.Name(), m.GetRunningProcesses())
m.mux.Lock()
// Attempt to stop each stoppable in its own goroutine
for _, stoppable := range m.stoppables {
wg.Add(1)
go func(stoppable Stoppable) {
if stoppable.Close() != nil {
go func(s Stoppable) {
if s.Close() != nil {
atomic.AddUint32(&numErrors, 1)
}
wg.Done()
......
......@@ -110,6 +110,10 @@ func (s *Single) Close() error {
// Send on quit channel
s.quit <- struct{}{}
jww.TRACE.Printf("Sent to quit channel for single stoppable %q.",
s.Name())
})
if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment