Skip to content
Snippets Groups Projects
Commit e276e1fa authored by Josh Brooks's avatar Josh Brooks
Browse files

Clean up MR

parent 18fc2bd4
No related branches found
No related tags found
2 merge requests!510Release,!398Add debug log for file transfer not quitting
...@@ -166,7 +166,6 @@ func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback, ...@@ -166,7 +166,6 @@ func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback,
} }
// Add file transfer processes to API services tracking // Add file transfer processes to API services tracking
jww.INFO.Printf("FT PROCESS IS STARTING HERE AT InitFileTransfer")
err = user.api.AddService(m.StartProcesses) err = user.api.AddService(m.StartProcesses)
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -216,6 +216,7 @@ func NewManager(params Params, user FtE2e) (FileTransfer, error) { ...@@ -216,6 +216,7 @@ func NewManager(params Params, user FtE2e) (FileTransfer, error) {
func (m *manager) StartProcesses() (stoppable.Stoppable, error) { func (m *manager) StartProcesses() (stoppable.Stoppable, error) {
// Construct stoppables // Construct stoppables
multiStoppable := stoppable.NewMulti(fileTransferStoppable) multiStoppable := stoppable.NewMulti(fileTransferStoppable)
senderPoolStop := stoppable.NewMulti(workerPoolStoppable)
batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable) batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable)
// Start sending threads // Start sending threads
...@@ -226,16 +227,11 @@ func (m *manager) StartProcesses() (stoppable.Stoppable, error) { ...@@ -226,16 +227,11 @@ func (m *manager) StartProcesses() (stoppable.Stoppable, error) {
// running this asynchronously could result in a race condition where // running this asynchronously could result in a race condition where
// some worker threads are not added to senderPoolStop before that stoppable // some worker threads are not added to senderPoolStop before that stoppable
// is added to the multiStoppable. // is added to the multiStoppable.
m.startSendingWorkerPool(multiStoppable) m.startSendingWorkerPool(senderPoolStop)
jww.INFO.Printf("STOPPING FT THREAD DEBUG: \nmultiStoppable running proc: %v\nmultistop: %v\n",
multiStoppable, multiStoppable.GetRunningProcesses())
// Create a multi stoppable // Create a multi stoppable
multiStoppable.Add(batchBuilderStop) multiStoppable.Add(batchBuilderStop)
jww.INFO.Printf("STOPPING FT THREAD DEBUG: \nmultiStoppable running proc: %v\nmultistop: %v\n",
multiStoppable.GetRunningProcesses(), multiStoppable)
return multiStoppable, nil return multiStoppable, nil
} }
......
...@@ -62,10 +62,7 @@ func (m *manager) startSendingWorkerPool(multiStop *stoppable.Multi) { ...@@ -62,10 +62,7 @@ func (m *manager) startSendingWorkerPool(multiStop *stoppable.Multi) {
for i := 0; i < workerPoolThreads; i++ { for i := 0; i < workerPoolThreads; i++ {
stop := stoppable.NewSingle(sendThreadStoppableName + strconv.Itoa(i)) stop := stoppable.NewSingle(sendThreadStoppableName + strconv.Itoa(i))
go func(single *stoppable.Single) { m.sendingThread(stop)
m.sendingThread(single)
}(stop)
jww.INFO.Printf("Adding stoppable %s", stop.Name())
multiStop.Add(stop) multiStop.Add(stop)
} }
...@@ -85,22 +82,25 @@ func (m *manager) sendingThread(stop *stoppable.Single) { ...@@ -85,22 +82,25 @@ func (m *manager) sendingThread(stop *stoppable.Single) {
return return
case healthy := <-healthChan: case healthy := <-healthChan:
for !healthy { for !healthy {
jww.INFO.Printf("not healthy, waiting for health update")
select { select {
// Wait for health update or a quit signal
case <-stop.Quit(): case <-stop.Quit():
jww.DEBUG.Printf("[FT] Stopping file part sending thread (%s): "+ // It's possible during shutdown that the health tracker gets
"stoppable triggered.", stop.Name()) // shutdown before the quit signal is received by the case
// statement listening for stop.Quit() above. As a result, we
// must listen for the quit signal here, to avoid waiting
// for a healthy signal that will never come.
jww.DEBUG.Printf("[FT] Stopping file part sending "+
"thread (%s): stoppable triggered.", stop.Name())
m.cmix.RemoveHealthCallback(healthChanID) m.cmix.RemoveHealthCallback(healthChanID)
stop.ToStopped() stop.ToStopped()
return return
case healthy = <-healthChan:
jww.INFO.Printf("received health update, it is now set to %s", healthy)
// If a quit signal is not received, we must wait until the
// network is healthy before we can continue sending files.
case healthy = <-healthChan:
} }
} }
case packet := <-m.sendQueue: case packet := <-m.sendQueue:
jww.INFO.Printf("sending pack")
m.sendCmix(packet) m.sendCmix(packet)
} }
} }
......
...@@ -66,7 +66,7 @@ func (m *Multi) GetStatus() Status { ...@@ -66,7 +66,7 @@ func (m *Multi) GetStatus() Status {
status := s.GetStatus() status := s.GetStatus()
if status < lowestStatus { if status < lowestStatus {
lowestStatus = status lowestStatus = status
jww.INFO.Printf("Stoppable %s has status %s", jww.TRACE.Printf("Stoppable %s has status %s",
s.Name(), status.String()) s.Name(), status.String())
} }
} }
...@@ -121,7 +121,7 @@ func (m *Multi) Close() error { ...@@ -121,7 +121,7 @@ func (m *Multi) Close() error {
m.once.Do(func() { m.once.Do(func() {
var wg sync.WaitGroup var wg sync.WaitGroup
jww.INFO.Printf("Sending on quit channel to multi stoppable %q with processes: %v.", jww.TRACE.Printf("Sending on quit channel to multi stoppable %q with subprocesses %v.",
m.Name(), m.GetRunningProcesses()) m.Name(), m.GetRunningProcesses())
m.mux.Lock() m.mux.Lock()
...@@ -129,7 +129,6 @@ func (m *Multi) Close() error { ...@@ -129,7 +129,6 @@ func (m *Multi) Close() error {
for _, stoppable := range m.stoppables { for _, stoppable := range m.stoppables {
wg.Add(1) wg.Add(1)
go func(s Stoppable) { go func(s Stoppable) {
jww.INFO.Printf("FT DEBUG: stopping %s", s.Name())
if s.Close() != nil { if s.Close() != nil {
atomic.AddUint32(&numErrors, 1) atomic.AddUint32(&numErrors, 1)
} }
......
...@@ -68,7 +68,7 @@ func (s *Single) toStopping() error { ...@@ -68,7 +68,7 @@ func (s *Single) toStopping() error {
return errors.Errorf(toStoppingErr, s.Name(), s.GetStatus(), Running) return errors.Errorf(toStoppingErr, s.Name(), s.GetStatus(), Running)
} }
jww.INFO.Printf("Switched status of single stoppable %q from %s to %s.", jww.TRACE.Printf("Switched status of single stoppable %q from %s to %s.",
s.Name(), Running, Stopping) s.Name(), Running, Stopping)
return nil return nil
...@@ -83,14 +83,13 @@ func (s *Single) ToStopped() { ...@@ -83,14 +83,13 @@ func (s *Single) ToStopped() {
s.Name(), s.GetStatus(), Stopping) s.Name(), s.GetStatus(), Stopping)
} }
jww.INFO.Printf("Switched status of single stoppable %q from %s to %s.", jww.TRACE.Printf("Switched status of single stoppable %q from %s to %s.",
s.Name(), Stopping, Stopped) s.Name(), Stopping, Stopped)
} }
// Quit returns a receive-only channel that will be triggered when the Stoppable // Quit returns a receive-only channel that will be triggered when the Stoppable
// quits. // quits.
func (s *Single) Quit() <-chan struct{} { func (s *Single) Quit() <-chan struct{} {
jww.INFO.Printf("Quit for %s", s.name)
return s.quit return s.quit
} }
...@@ -106,13 +105,13 @@ func (s *Single) Close() error { ...@@ -106,13 +105,13 @@ func (s *Single) Close() error {
return return
} }
jww.INFO.Printf("Sending on quit channel to single stoppable %q.", jww.TRACE.Printf("Sending on quit channel to single stoppable %q.",
s.Name()) s.Name())
// Send on quit channel // Send on quit channel
s.quit <- struct{}{} s.quit <- struct{}{}
jww.INFO.Printf("Sent to quit channel for single stoppable %q.", jww.TRACE.Printf("Sent to quit channel for single stoppable %q.",
s.Name()) s.Name())
}) })
......
...@@ -9,7 +9,6 @@ package xxdk ...@@ -9,7 +9,6 @@ package xxdk
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"sync" "sync"
"time" "time"
...@@ -52,8 +51,6 @@ func (s *services) add(sp Service) error { ...@@ -52,8 +51,6 @@ func (s *services) add(sp Service) error {
return errors.WithMessage(err, "Failed to start added service") return errors.WithMessage(err, "Failed to start added service")
} }
s.stoppable.Add(stop) s.stoppable.Add(stop)
jww.INFO.Printf("STOPPING FT THREAD DEBUG: adding service %v", stop.Name())
} }
return nil return nil
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment