diff --git a/bindings/follow.go b/bindings/follow.go index 2303274e3c51e897c2331faed61dd56f4d939f71..362ace98f2c8c66f53717b405153c261f75b2597 100644 --- a/bindings/follow.go +++ b/bindings/follow.go @@ -136,6 +136,22 @@ func (c *Cmix) IsHealthy() bool { return c.api.GetCmix().IsHealthy() } +// GetRunningProcesses returns the names of all running processes at the time +// of this call. Note that this list may change and is subject to race +// conditions if multiple threads are in the process of starting or stopping. +// +// Returns: +// - []byte - A JSON marshalled list of all running processes. +// +// JSON Example: +// { +// "FileTransfer{BatchBuilderThread, FilePartSendingThread#0, FilePartSendingThread#1, FilePartSendingThread#2, FilePartSendingThread#3}", +// "MessageReception Worker 0" +// } +func (c *Cmix) GetRunningProcesses() ([]byte, error) { + return json.Marshal(c.api.GetRunningProcesses()) +} + // NetworkHealthCallback contains a callback that is used to receive // notification if network health changes. type NetworkHealthCallback interface { diff --git a/bindings/identity_test.go b/bindings/identity_test.go index d30efb1078de7c8b204b3911bc97470a41c58e4c..eeff0b1a83edb91a79526f4b7e4fb225fe9174a0 100644 --- a/bindings/identity_test.go +++ b/bindings/identity_test.go @@ -9,8 +9,6 @@ package bindings import ( "encoding/json" - "testing" - "gitlab.com/elixxir/crypto/cmix" "gitlab.com/elixxir/crypto/cyclic" dh "gitlab.com/elixxir/crypto/diffieHellman" @@ -18,6 +16,7 @@ import ( "gitlab.com/xx_network/crypto/large" "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" + "testing" ) func TestIdentity_JSON(t *testing.T) { diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 7e167a3826cbd80ad8a848e74a500722e37be41a..549a9a799dc9d6f75c415a71757954a36cc0d0f7 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -215,16 +215,21 @@ func NewManager(params Params, user FtE2e) (FileTransfer, error) { // StartProcesses starts the sending threads. Adheres to the xxdk.Service type. func (m *manager) StartProcesses() (stoppable.Stoppable, error) { // Construct stoppables - multiStop := stoppable.NewMulti(workerPoolStoppable) + senderPoolStop := stoppable.NewMulti(workerPoolStoppable) batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable) // Start sending threads - go m.startSendingWorkerPool(multiStop) + // Note that the startSendingWorkerPool already creates thread for every + // worker. As a result, there is no need to run it asynchronously. In fact, + // running this asynchronously could result in a race condition where + // some worker threads are not added to senderPoolStop before that stoppable + // is added to the multiStoppable. + m.startSendingWorkerPool(senderPoolStop) go m.batchBuilderThread(batchBuilderStop) // Create a multi stoppable multiStoppable := stoppable.NewMulti(fileTransferStoppable) - multiStoppable.Add(multiStop) + multiStoppable.Add(senderPoolStop) multiStoppable.Add(batchBuilderStop) return multiStoppable, nil diff --git a/fileTransfer/send.go b/fileTransfer/send.go index adb3df6e19396302f0482b9348cfdd46ab8dab83..c266a35c8f5d8f9882b8b07a7b308566c4c6c69d 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -62,9 +62,10 @@ func (m *manager) startSendingWorkerPool(multiStop *stoppable.Multi) { for i := 0; i < workerPoolThreads; i++ { stop := stoppable.NewSingle(sendThreadStoppableName + strconv.Itoa(i)) - multiStop.Add(stop) go m.sendingThread(stop) + multiStop.Add(stop) } + } // sendingThread sends part packets that become available oin the send queue. @@ -74,8 +75,8 @@ func (m *manager) sendingThread(stop *stoppable.Single) { for { select { case <-stop.Quit(): - jww.DEBUG.Printf("[FT] Stopping file part sending thread: " + - "stoppable triggered.") + jww.DEBUG.Printf("[FT] Stopping file part sending thread (%s): "+ + "stoppable triggered.", stop.Name()) m.cmix.RemoveHealthCallback(healthChanID) stop.ToStopped() return diff --git a/stoppable/multi.go b/stoppable/multi.go index 81bd270257b19fd3a6d75c78591b390335230d2b..3ac77c7507fb058af1deac2b098121b16d3569b0 100644 --- a/stoppable/multi.go +++ b/stoppable/multi.go @@ -61,10 +61,13 @@ func (m *Multi) GetStatus() Status { lowestStatus := Stopped m.mux.RLock() - for _, s := range m.stoppables { + for i := range m.stoppables { + s := m.stoppables[i] status := s.GetStatus() if status < lowestStatus { lowestStatus = status + jww.TRACE.Printf("Stoppable %s has status %s", + s.Name(), status.String()) } } @@ -73,6 +76,26 @@ func (m *Multi) GetStatus() Status { return lowestStatus } +// GetRunningProcesses returns the names of all running processes at the time +// of this call. Note that this list may change and is subject to race +// conditions if multiple threads are in the process of starting or stopping. +func (m *Multi) GetRunningProcesses() []string { + m.mux.RLock() + + runningProcesses := make([]string, 0) + for i := range m.stoppables { + s := m.stoppables[i] + status := s.GetStatus() + if status < Stopped { + runningProcesses = append(runningProcesses, s.Name()) + } + } + + m.mux.RUnlock() + + return runningProcesses +} + // IsRunning returns true if Stoppable is marked as running. func (m *Multi) IsRunning() bool { return m.GetStatus() == Running @@ -90,7 +113,7 @@ func (m *Multi) IsStopped() bool { // Close issues a close signal to all child stoppables and marks the status of // the Multi Stoppable as stopping. Returns an error if one or more child -// stoppables failed to close but it does not return their specific errors and +// stoppables failed to close, but it does not return their specific errors and // assumes they print them to the log. func (m *Multi) Close() error { var numErrors uint32 diff --git a/xxdk/cmix.go b/xxdk/cmix.go index 33658c752d3242de2f7ae1606e14d6babfb52e99..a2b8b8b9ad3c6e7cc77c6eb325fabba17084cff0 100644 --- a/xxdk/cmix.go +++ b/xxdk/cmix.go @@ -397,6 +397,13 @@ func (c *Cmix) HasRunningProcessies() bool { return !c.followerServices.stoppable.IsStopped() } +// GetRunningProcesses returns the names of all running processes at the time +// of this call. Note that this list may change and is subject to race +// conditions if multiple threads are in the process of starting or stopping. +func (c *Cmix) GetRunningProcesses() []string { + return c.followerServices.stoppable.GetRunningProcesses() +} + // GetRoundEvents registers a callback for round events. func (c *Cmix) GetRoundEvents() interfaces.RoundEvents { jww.INFO.Printf("GetRoundEvents()")