From 4ebf5d3da7f4a6deebebfedc3189bbc8c34d07be Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Wed, 22 Dec 2021 23:10:02 +0000 Subject: [PATCH] Add pausing/resuming to dummy message thread and getting of status --- bindings/dummy.go | 47 ++++++-- dummy/manager.go | 56 +++++++++ dummy/manager_test.go | 259 ++++++++++++++++++++++++++++++++++++++++++ dummy/random.go | 1 + dummy/send.go | 70 ++++++++---- dummy/send_test.go | 19 +++- dummy/utils_test.go | 1 + 7 files changed, 420 insertions(+), 33 deletions(-) diff --git a/bindings/dummy.go b/bindings/dummy.go index 8cbaa84dc..d6facc68e 100644 --- a/bindings/dummy.go +++ b/bindings/dummy.go @@ -12,18 +12,49 @@ import ( "time" ) -// StartDummyTraffic starts sending dummy traffic. The maxNumMessages is the -// upper bound of the random number of messages sent each send. avgSendDeltaMS -// is the average duration, in milliseconds, to wait between sends. Sends occur -// every avgSendDeltaMS +/- a random duration with an upper bound of -// randomRangeMS. -func StartDummyTraffic(client *Client, maxNumMessages, avgSendDeltaMS, - randomRangeMS int) error { +// DummyTraffic contains the file dummy traffic manager. The manager can be used +// to set and get the status of the send thread. +type DummyTraffic struct { + m *dummy.Manager +} + +// NewDummyTrafficManager creates a DummyTraffic manager and initialises the +// dummy traffic send thread. Note that the manager does not start sending dummy +// traffic until its status is set to true using DummyTraffic.SetStatus. +// The maxNumMessages is the upper bound of the random number of messages sent +// each send. avgSendDeltaMS is the average duration, in milliseconds, to wait +// between sends. Sends occur every avgSendDeltaMS +/- a random duration with an +// upper bound of randomRangeMS. +func NewDummyTrafficManager(client *Client, maxNumMessages, avgSendDeltaMS, + randomRangeMS int) (*DummyTraffic, error) { + avgSendDelta := time.Duration(avgSendDeltaMS) * time.Millisecond randomRange := time.Duration(randomRangeMS) * time.Millisecond m := dummy.NewManager( maxNumMessages, avgSendDelta, randomRange, &client.api) - return client.api.AddService(m.StartDummyTraffic) + return &DummyTraffic{m}, client.api.AddService(m.StartDummyTraffic) +} + +// SetStatus sets the state of the dummy traffic send thread, which determines +// if the thread is running or paused. The possible statuses are: +// true = send thread is sending dummy messages +// false = send thread is paused/stopped and not sending dummy messages +// Returns an error if the channel is full. +// Note that this function cannot change the status of the send thread if it has +// yet to be started or stopped. +func (dt *DummyTraffic) SetStatus(status bool) error { + return dt.m.SetStatus(status) +} + +// GetStatus returns the current state of the dummy traffic send thread. It has +// the following return values: +// true = send thread is sending dummy messages +// false = send thread is paused/stopped and not sending dummy messages +// Note that this function does not return the status set by SetStatus directly; +// it returns the current status of the send thread, which means any call to +// SetStatus will have a small delay before it is returned by GetStatus. +func (dt *DummyTraffic) GetStatus() bool { + return dt.m.GetStatus() } diff --git a/dummy/manager.go b/dummy/manager.go index edf0b3ac3..6a3b71ccb 100644 --- a/dummy/manager.go +++ b/dummy/manager.go @@ -11,16 +11,32 @@ package dummy import ( + "github.com/pkg/errors" "gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/crypto/fastRNG" + "sync/atomic" "time" ) const ( dummyTrafficStoppableName = "DummyTraffic" + statusChanLen = 100 +) + +// Thread status. +const ( + notStarted uint32 = iota + running + paused + stopped +) + +// Error messages. +const ( + setStatusErr = "Failed to change status of dummy traffic send thread to %t: channel full" ) // Manager manages the sending of dummy messages. @@ -34,6 +50,12 @@ type Manager struct { // Upper limit for random duration that modified avgSendDelta randomRange time.Duration + // Indicates the current status of the thread (0 = paused, 1 = running) + status uint32 + + // Pauses/Resumes the dummy send thread when triggered + statusChan chan bool + // Client interfaces client *api.Client store *storage.Session @@ -58,6 +80,8 @@ func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, maxNumMessages: maxNumMessages, avgSendDelta: avgSendDelta, randomRange: randomRange, + status: notStarted, + statusChan: make(chan bool, statusChanLen), client: client, store: store, net: net, @@ -73,3 +97,35 @@ func (m *Manager) StartDummyTraffic() (stoppable.Stoppable, error) { return stop, nil } + +// SetStatus sets the state of the dummy traffic send thread, which determines +// if the thread is running or paused. The possible statuses are: +// true = send thread is sending dummy messages +// false = send thread is paused/stopped and not sending dummy messages +// Returns an error if the channel is full. +// Note that this function cannot change the status of the send thread if it has +// yet to be started via StartDummyTraffic or if it has been stopped. +func (m *Manager) SetStatus(status bool) error { + select { + case m.statusChan <- status: + return nil + default: + return errors.Errorf(setStatusErr, status) + } +} + +// GetStatus returns the current state of the dummy traffic send thread. It has +// the following return values: +// true = send thread is sending dummy messages +// false = send thread is paused/stopped and not sending dummy messages +// Note that this function does not return the status set by SetStatus directly; +// it returns the current status of the send thread, which means any call to +// SetStatus will have a small delay before it is returned by GetStatus. +func (m *Manager) GetStatus() bool { + switch atomic.LoadUint32(&m.status) { + case running: + return true + default: + return false + } +} diff --git a/dummy/manager_test.go b/dummy/manager_test.go index 7f8ec99d0..753b0fcb2 100644 --- a/dummy/manager_test.go +++ b/dummy/manager_test.go @@ -8,7 +8,10 @@ package dummy import ( + "fmt" + "gitlab.com/elixxir/client/stoppable" "reflect" + "sync/atomic" "testing" "time" ) @@ -19,11 +22,20 @@ func Test_newManager(t *testing.T) { maxNumMessages: 10, avgSendDelta: time.Minute, randomRange: time.Second, + status: notStarted, + statusChan: make(chan bool, statusChanLen), } received := newManager(expected.maxNumMessages, expected.avgSendDelta, expected.randomRange, nil, nil, nil, nil) + if statusChanLen != cap(received.statusChan) { + t.Errorf("Capacity of status channel unexpected."+ + "\nexpected: %d\nreceived: %d", + statusChanLen, cap(received.statusChan)) + } + received.statusChan = expected.statusChan + if !reflect.DeepEqual(expected, received) { t.Errorf("New manager does not match expected."+ "\nexpected: %+v\nreceived: %+v", expected, received) @@ -35,6 +47,11 @@ func Test_newManager(t *testing.T) { func TestManager_StartDummyTraffic(t *testing.T) { m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t) + err := m.SetStatus(true) + if err != nil { + t.Errorf("Failed to set status to true.") + } + stop, err := m.StartDummyTraffic() if err != nil { t.Errorf("StartDummyTraffic returned an error: %+v", err) @@ -82,3 +99,245 @@ func TestManager_StartDummyTraffic(t *testing.T) { t.Error("Received new messages after stoppable was stopped.") } } + +// Tests that Manager.SetStatus prevents messages from being sent and that it +// can be called multiple times with the same status without it affecting +// anything. Also tests that the thread quits even when paused. +func TestManager_SetStatus(t *testing.T) { + m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t) + + err := m.SetStatus(false) + if err != nil { + t.Errorf("setStatus returned an error: %+v", err) + } + + stop := stoppable.NewSingle("sendThreadTest") + go m.sendThread(stop) + + msgChan := make(chan bool, 10) + go func() { + var numReceived int + for i := 0; i < 2; i++ { + for m.net.(*testNetworkManager).GetMsgListLen() == numReceived { + time.Sleep(5 * time.Millisecond) + } + numReceived = m.net.(*testNetworkManager).GetMsgListLen() + msgChan <- true + } + }() + + time.Sleep(3 * time.Millisecond) + if stat := atomic.LoadUint32(&m.status); stat != paused { + t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d", + paused, stat) + } + + // Setting status to false should cause the messages to not send + err = m.SetStatus(false) + if err != nil { + t.Errorf("setStatus returned an error: %+v", err) + } + + var numReceived int + select { + case <-time.NewTimer(3 * m.avgSendDelta).C: + case <-msgChan: + t.Errorf("Should not have received messages when thread was pasued.") + } + + err = m.SetStatus(true) + if err != nil { + t.Errorf("setStatus returned an error: %+v", err) + } + + time.Sleep(3 * time.Millisecond) + if stat := atomic.LoadUint32(&m.status); stat != running { + t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d", + running, stat) + } + + select { + case <-time.NewTimer(3 * m.avgSendDelta).C: + t.Errorf("Timed out after %s waiting for messages to be sent.", + 3*m.avgSendDelta) + case <-msgChan: + numReceived += m.net.(*testNetworkManager).GetMsgListLen() + } + + // Setting status to true multiple times does not interrupt sending + for i := 0; i < 3; i++ { + err = m.SetStatus(true) + if err != nil { + t.Errorf("setStatus returned an error (%d): %+v", i, err) + } + } + + select { + case <-time.NewTimer(3 * m.avgSendDelta).C: + t.Errorf("Timed out after %s waiting for messages to be sent.", + 3*m.avgSendDelta) + case <-msgChan: + if m.net.(*testNetworkManager).GetMsgListLen() <= numReceived { + t.Errorf("Failed to receive second send."+ + "\nmessages on last receive: %d\nmessages on this receive: %d", + numReceived, m.net.(*testNetworkManager).GetMsgListLen()) + } + } + + // Shows that the stoppable still stops when the thread is paused + err = m.SetStatus(false) + if err != nil { + t.Errorf("setStatus returned an error: %+v", err) + } + time.Sleep(3 * time.Millisecond) + if stat := atomic.LoadUint32(&m.status); stat != paused { + t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d", + paused, stat) + } + + err = stop.Close() + if err != nil { + t.Errorf("Failed to close stoppable: %+v", err) + } + + time.Sleep(10 * time.Millisecond) + if !stop.IsStopped() { + t.Error("Stoppable never stopped.") + } + if stat := atomic.LoadUint32(&m.status); stat != stopped { + t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d", + stopped, stat) + } +} + +// Error path: tests that Manager.SetStatus returns an error if the status +// cannot be set. +func TestManager_SetStatus_ChannelError(t *testing.T) { + m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t) + + // Send the max number of status changes on the channel + for i := 0; i < statusChanLen; i++ { + err := m.SetStatus(false) + if err != nil { + t.Errorf("setStatus returned an error (%d): %+v", i, err) + } + } + + // Calling one more time causes an error + expectedErr := fmt.Sprintf(setStatusErr, true) + err := m.SetStatus(true) + if err == nil || err.Error() != expectedErr { + t.Errorf("setStatus returned unexpected error when channel is full."+ + "\nexpected: %s\nreceived: %+v", expectedErr, err) + } + +} + +// Tests that Manager.GetStatus gets the correct status before the send thread +// starts, while sending, while paused, and after it is stopped. +func TestManager_GetStatus(t *testing.T) { + m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t) + + err := m.SetStatus(false) + if err != nil { + t.Errorf("setStatus returned an error: %+v", err) + } + + stop := stoppable.NewSingle("sendThreadTest") + go m.sendThread(stop) + + if m.GetStatus() { + t.Errorf("GetStatus reported thread as running.") + } + + msgChan := make(chan bool, 10) + go func() { + var numReceived int + for i := 0; i < 2; i++ { + for m.net.(*testNetworkManager).GetMsgListLen() == numReceived { + time.Sleep(5 * time.Millisecond) + } + numReceived = m.net.(*testNetworkManager).GetMsgListLen() + msgChan <- true + } + }() + + // Setting status to false should cause the messages to not send + err = m.SetStatus(false) + if err != nil { + t.Errorf("setStatus returned an error: %+v", err) + } + if m.GetStatus() { + t.Errorf("GetStatus reported thread as running.") + } + + var numReceived int + select { + case <-time.NewTimer(3 * m.avgSendDelta).C: + case <-msgChan: + t.Errorf("Should not have received messages when thread was pasued.") + } + + err = m.SetStatus(true) + if err != nil { + t.Errorf("setStatus returned an error: %+v", err) + } + time.Sleep(3 * time.Millisecond) + if !m.GetStatus() { + t.Errorf("GetStatus reported thread as paused.") + } + + select { + case <-time.NewTimer(3 * m.avgSendDelta).C: + t.Errorf("Timed out after %s waiting for messages to be sent.", + 3*m.avgSendDelta) + case <-msgChan: + numReceived += m.net.(*testNetworkManager).GetMsgListLen() + } + + // Setting status to true multiple times does not interrupt sending + for i := 0; i < 3; i++ { + err = m.SetStatus(true) + if err != nil { + t.Errorf("setStatus returned an error (%d): %+v", i, err) + } + } + if !m.GetStatus() { + t.Errorf("GetStatus reported thread as paused.") + } + + select { + case <-time.NewTimer(3 * m.avgSendDelta).C: + t.Errorf("Timed out after %s waiting for messages to be sent.", + 3*m.avgSendDelta) + case <-msgChan: + if m.net.(*testNetworkManager).GetMsgListLen() <= numReceived { + t.Errorf("Failed to receive second send."+ + "\nmessages on last receive: %d\nmessages on this receive: %d", + numReceived, m.net.(*testNetworkManager).GetMsgListLen()) + } + } + + // Shows that the stoppable still stops when the thread is paused + err = m.SetStatus(false) + if err != nil { + t.Errorf("setStatus returned an error: %+v", err) + } + time.Sleep(3 * time.Millisecond) + if m.GetStatus() { + t.Errorf("GetStatus reported thread as running.") + } + + err = stop.Close() + if err != nil { + t.Errorf("Failed to close stoppable: %+v", err) + } + + time.Sleep(10 * time.Millisecond) + if !stop.IsStopped() { + t.Error("Stoppable never stopped.") + } + if m.GetStatus() { + t.Errorf("GetStatus reported thread as running.") + } +} diff --git a/dummy/random.go b/dummy/random.go index 2327ddf6c..8c8a87a6c 100644 --- a/dummy/random.go +++ b/dummy/random.go @@ -14,6 +14,7 @@ import ( "gitlab.com/xx_network/crypto/csprng" "time" ) // Error messages. + const ( payloadSizeRngErr = "failed to generate random payload size: %+v" ) diff --git a/dummy/send.go b/dummy/send.go index f37f79927..2ae9d9288 100644 --- a/dummy/send.go +++ b/dummy/send.go @@ -33,34 +33,55 @@ const ( func (m *Manager) sendThread(stop *stoppable.Single) { jww.DEBUG.Print("Starting dummy traffic sending thread.") - timer := m.randomTimer() + nextSendChan := make(<-chan time.Time) + nextSendChanPtr := &(nextSendChan) for { select { case <-stop.Quit(): - jww.DEBUG.Print("Stopping dummy traffic sending thread: stoppable " + - "triggered") - stop.ToStopped() + m.stopSendThread(stop) return - case <-timer.C: - timer = m.randomTimer() - - // Get list of random messages and recipients - rng := m.rng.GetStream() - msgs, err := m.newRandomMessages(rng) - if err != nil { - jww.FATAL.Panicf("Failed to generate dummy messages: %+v", err) + case status := <-m.statusChan: + if status { + atomic.StoreUint32(&m.status, running) + nextSendChanPtr = &(m.randomTimer().C) + } else { + atomic.StoreUint32(&m.status, paused) + nextSendChan = make(<-chan time.Time) + nextSendChanPtr = &nextSendChan } - rng.Close() + case <-*nextSendChanPtr: + nextSendChanPtr = &(m.randomTimer().C) + + go func() { + // Get list of random messages and recipients + rng := m.rng.GetStream() + msgs, err := m.newRandomMessages(rng) + if err != nil { + jww.FATAL.Panicf("Failed to generate dummy messages: %+v", err) + } + rng.Close() + + err = m.sendMessages(msgs) + if err != nil { + jww.FATAL.Panicf("Failed to send dummy messages: %+v", err) + } + }() - err = m.sendMessages(msgs) - if err != nil { - jww.FATAL.Panicf("Failed to send dummy messages: %+v", err) - } } } } +// stopSendThread is triggered when the stoppable is triggered. It prints a +// debug message, sets the thread status to stopped, and sets the status of the +// stoppable to stopped. +func (m *Manager) stopSendThread(stop *stoppable.Single) { + jww.DEBUG.Print( + "Stopping dummy traffic sending thread: stoppable triggered") + atomic.StoreUint32(&m.status, stopped) + stop.ToStopped() +} + // sendMessages generates and sends random messages. func (m *Manager) sendMessages(msgs map[id.ID]format.Message) error { var sent, i int64 @@ -70,24 +91,25 @@ func (m *Manager) sendMessages(msgs map[id.ID]format.Message) error { wg.Add(1) go func(i int64, recipient id.ID, msg format.Message) { - //fill the preiamge with random data to ensure it isnt repeatable + defer wg.Done() + + // Fill the preimage with random data to ensure it is not repeatable p := params.GetDefaultCMIX() p.IdentityPreimage = make([]byte, 32) rng := m.rng.GetStream() if _, err := rng.Read(p.IdentityPreimage); err != nil { - jww.FATAL.Panicf("Failed to generate data for random "+ - "identity preimage in e2e send: %+v", err) + jww.FATAL.Panicf("Failed to generate data for random identity "+ + "preimage in e2e send: %+v", err) } rng.Close() + _, _, err := m.net.SendCMIX(msg, &recipient, p) if err != nil { - jww.WARN.Printf("failed to send dummy message %d/%d: %+v", - i, len(msgs), err) + jww.WARN.Printf("Failed to send dummy message %d/%d via "+ + "SendCMIX: %+v", i, len(msgs), err) } else { atomic.AddInt64(&sent, 1) } - - wg.Done() }(i, recipient, msg) i++ diff --git a/dummy/send_test.go b/dummy/send_test.go index acf08cb55..11d40fbc6 100644 --- a/dummy/send_test.go +++ b/dummy/send_test.go @@ -14,6 +14,7 @@ import ( "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" "reflect" + "sync/atomic" "testing" "time" ) @@ -25,6 +26,16 @@ func TestManager_sendThread(t *testing.T) { stop := stoppable.NewSingle("sendThreadTest") go m.sendThread(stop) + if stat := atomic.LoadUint32(&m.status); stat != notStarted { + t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d", + notStarted, stat) + } + + err := m.SetStatus(true) + if err != nil { + t.Errorf("Failed to set status to true.") + } + msgChan := make(chan bool, 10) go func() { var numReceived int @@ -58,7 +69,7 @@ func TestManager_sendThread(t *testing.T) { } } - err := stop.Close() + err = stop.Close() if err != nil { t.Errorf("Failed to close stoppable: %+v", err) } @@ -67,6 +78,12 @@ func TestManager_sendThread(t *testing.T) { if !stop.IsStopped() { t.Error("Stoppable never stopped.") } + + if stat := atomic.LoadUint32(&m.status); stat != stopped { + t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d", + stopped, stat) + } + } // Tests that Manager.sendMessages sends all the messages with the correct diff --git a/dummy/utils_test.go b/dummy/utils_test.go index 5ad1a5b65..48bff1261 100644 --- a/dummy/utils_test.go +++ b/dummy/utils_test.go @@ -54,6 +54,7 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, maxNumMessages: maxNumMessages, avgSendDelta: avgSendDelta, randomRange: randomRange, + statusChan: make(chan bool, statusChanLen), store: storage.InitTestingSession(t), net: newTestNetworkManager(sendErr, t), rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), -- GitLab