Skip to content
Snippets Groups Projects
Commit 7aac8e09 authored by Josh Brooks's avatar Josh Brooks
Browse files

Merge branch 'XX-4177/StoppingNetwork' into 'release'

Add a debug GetRunningProcesses to bindings

See merge request !378
parents 12eb14f1 a60bd10c
No related branches found
No related tags found
3 merge requests!510Release,!381Initial CTIDH,!378Add a debug GetRunningProcesses to bindings
......@@ -136,6 +136,22 @@ func (c *Cmix) IsHealthy() bool {
return c.api.GetCmix().IsHealthy()
}
// GetRunningProcesses returns the names of all running processes at the time
// of this call. Note that this list may change and is subject to race
// conditions if multiple threads are in the process of starting or stopping.
//
// Returns:
// - []byte - A JSON marshalled list of all running processes.
//
// JSON Example:
// {
// "FileTransfer{BatchBuilderThread, FilePartSendingThread#0, FilePartSendingThread#1, FilePartSendingThread#2, FilePartSendingThread#3}",
// "MessageReception Worker 0"
// }
func (c *Cmix) GetRunningProcesses() ([]byte, error) {
return json.Marshal(c.api.GetRunningProcesses())
}
// NetworkHealthCallback contains a callback that is used to receive
// notification if network health changes.
type NetworkHealthCallback interface {
......
......@@ -9,8 +9,6 @@ package bindings
import (
"encoding/json"
"testing"
"gitlab.com/elixxir/crypto/cmix"
"gitlab.com/elixxir/crypto/cyclic"
dh "gitlab.com/elixxir/crypto/diffieHellman"
......@@ -18,6 +16,7 @@ import (
"gitlab.com/xx_network/crypto/large"
"gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id"
"testing"
)
func TestIdentity_JSON(t *testing.T) {
......
......@@ -215,16 +215,21 @@ func NewManager(params Params, user FtE2e) (FileTransfer, error) {
// StartProcesses starts the sending threads. Adheres to the xxdk.Service type.
func (m *manager) StartProcesses() (stoppable.Stoppable, error) {
// Construct stoppables
multiStop := stoppable.NewMulti(workerPoolStoppable)
senderPoolStop := stoppable.NewMulti(workerPoolStoppable)
batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable)
// Start sending threads
go m.startSendingWorkerPool(multiStop)
// Note that the startSendingWorkerPool already creates thread for every
// worker. As a result, there is no need to run it asynchronously. In fact,
// running this asynchronously could result in a race condition where
// some worker threads are not added to senderPoolStop before that stoppable
// is added to the multiStoppable.
m.startSendingWorkerPool(senderPoolStop)
go m.batchBuilderThread(batchBuilderStop)
// Create a multi stoppable
multiStoppable := stoppable.NewMulti(fileTransferStoppable)
multiStoppable.Add(multiStop)
multiStoppable.Add(senderPoolStop)
multiStoppable.Add(batchBuilderStop)
return multiStoppable, nil
......
......@@ -62,9 +62,10 @@ func (m *manager) startSendingWorkerPool(multiStop *stoppable.Multi) {
for i := 0; i < workerPoolThreads; i++ {
stop := stoppable.NewSingle(sendThreadStoppableName + strconv.Itoa(i))
multiStop.Add(stop)
go m.sendingThread(stop)
multiStop.Add(stop)
}
}
// sendingThread sends part packets that become available oin the send queue.
......@@ -74,8 +75,8 @@ func (m *manager) sendingThread(stop *stoppable.Single) {
for {
select {
case <-stop.Quit():
jww.DEBUG.Printf("[FT] Stopping file part sending thread: " +
"stoppable triggered.")
jww.DEBUG.Printf("[FT] Stopping file part sending thread (%s): "+
"stoppable triggered.", stop.Name())
m.cmix.RemoveHealthCallback(healthChanID)
stop.ToStopped()
return
......
......@@ -61,10 +61,13 @@ func (m *Multi) GetStatus() Status {
lowestStatus := Stopped
m.mux.RLock()
for _, s := range m.stoppables {
for i := range m.stoppables {
s := m.stoppables[i]
status := s.GetStatus()
if status < lowestStatus {
lowestStatus = status
jww.TRACE.Printf("Stoppable %s has status %s",
s.Name(), status.String())
}
}
......@@ -73,6 +76,26 @@ func (m *Multi) GetStatus() Status {
return lowestStatus
}
// GetRunningProcesses returns the names of all running processes at the time
// of this call. Note that this list may change and is subject to race
// conditions if multiple threads are in the process of starting or stopping.
func (m *Multi) GetRunningProcesses() []string {
m.mux.RLock()
runningProcesses := make([]string, 0)
for i := range m.stoppables {
s := m.stoppables[i]
status := s.GetStatus()
if status < Stopped {
runningProcesses = append(runningProcesses, s.Name())
}
}
m.mux.RUnlock()
return runningProcesses
}
// IsRunning returns true if Stoppable is marked as running.
func (m *Multi) IsRunning() bool {
return m.GetStatus() == Running
......@@ -90,7 +113,7 @@ func (m *Multi) IsStopped() bool {
// Close issues a close signal to all child stoppables and marks the status of
// the Multi Stoppable as stopping. Returns an error if one or more child
// stoppables failed to close but it does not return their specific errors and
// stoppables failed to close, but it does not return their specific errors and
// assumes they print them to the log.
func (m *Multi) Close() error {
var numErrors uint32
......
......@@ -397,6 +397,13 @@ func (c *Cmix) HasRunningProcessies() bool {
return !c.followerServices.stoppable.IsStopped()
}
// GetRunningProcesses returns the names of all running processes at the time
// of this call. Note that this list may change and is subject to race
// conditions if multiple threads are in the process of starting or stopping.
func (c *Cmix) GetRunningProcesses() []string {
return c.followerServices.stoppable.GetRunningProcesses()
}
// GetRoundEvents registers a callback for round events.
func (c *Cmix) GetRoundEvents() interfaces.RoundEvents {
jww.INFO.Printf("GetRoundEvents()")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment