Skip to content
Snippets Groups Projects
Commit 6faba65a authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

Merge branch 'XX-3650/dummyMessageSendStatus' into 'release'

Add pausing/resuming to dummy message thread and getting of status

See merge request !96
parents 9189e7cd 4ebf5d3d
No related branches found
No related tags found
2 merge requests!117Release,!96Add pausing/resuming to dummy message thread and getting of status
...@@ -12,18 +12,49 @@ import ( ...@@ -12,18 +12,49 @@ import (
"time" "time"
) )
// StartDummyTraffic starts sending dummy traffic. The maxNumMessages is the // DummyTraffic contains the file dummy traffic manager. The manager can be used
// upper bound of the random number of messages sent each send. avgSendDeltaMS // to set and get the status of the send thread.
// is the average duration, in milliseconds, to wait between sends. Sends occur type DummyTraffic struct {
// every avgSendDeltaMS +/- a random duration with an upper bound of m *dummy.Manager
// randomRangeMS. }
func StartDummyTraffic(client *Client, maxNumMessages, avgSendDeltaMS,
randomRangeMS int) error { // NewDummyTrafficManager creates a DummyTraffic manager and initialises the
// dummy traffic send thread. Note that the manager does not start sending dummy
// traffic until its status is set to true using DummyTraffic.SetStatus.
// The maxNumMessages is the upper bound of the random number of messages sent
// each send. avgSendDeltaMS is the average duration, in milliseconds, to wait
// between sends. Sends occur every avgSendDeltaMS +/- a random duration with an
// upper bound of randomRangeMS.
func NewDummyTrafficManager(client *Client, maxNumMessages, avgSendDeltaMS,
randomRangeMS int) (*DummyTraffic, error) {
avgSendDelta := time.Duration(avgSendDeltaMS) * time.Millisecond avgSendDelta := time.Duration(avgSendDeltaMS) * time.Millisecond
randomRange := time.Duration(randomRangeMS) * time.Millisecond randomRange := time.Duration(randomRangeMS) * time.Millisecond
m := dummy.NewManager( m := dummy.NewManager(
maxNumMessages, avgSendDelta, randomRange, &client.api) maxNumMessages, avgSendDelta, randomRange, &client.api)
return client.api.AddService(m.StartDummyTraffic) return &DummyTraffic{m}, client.api.AddService(m.StartDummyTraffic)
}
// SetStatus sets the state of the dummy traffic send thread, which determines
// if the thread is running or paused. The possible statuses are:
// true = send thread is sending dummy messages
// false = send thread is paused/stopped and not sending dummy messages
// Returns an error if the channel is full.
// Note that this function cannot change the status of the send thread if it has
// yet to be started or stopped.
func (dt *DummyTraffic) SetStatus(status bool) error {
return dt.m.SetStatus(status)
}
// GetStatus returns the current state of the dummy traffic send thread. It has
// the following return values:
// true = send thread is sending dummy messages
// false = send thread is paused/stopped and not sending dummy messages
// Note that this function does not return the status set by SetStatus directly;
// it returns the current status of the send thread, which means any call to
// SetStatus will have a small delay before it is returned by GetStatus.
func (dt *DummyTraffic) GetStatus() bool {
return dt.m.GetStatus()
} }
...@@ -11,16 +11,32 @@ ...@@ -11,16 +11,32 @@
package dummy package dummy
import ( import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"sync/atomic"
"time" "time"
) )
const ( const (
dummyTrafficStoppableName = "DummyTraffic" dummyTrafficStoppableName = "DummyTraffic"
statusChanLen = 100
)
// Thread status.
const (
notStarted uint32 = iota
running
paused
stopped
)
// Error messages.
const (
setStatusErr = "Failed to change status of dummy traffic send thread to %t: channel full"
) )
// Manager manages the sending of dummy messages. // Manager manages the sending of dummy messages.
...@@ -34,6 +50,12 @@ type Manager struct { ...@@ -34,6 +50,12 @@ type Manager struct {
// Upper limit for random duration that modified avgSendDelta // Upper limit for random duration that modified avgSendDelta
randomRange time.Duration randomRange time.Duration
// Indicates the current status of the thread (0 = paused, 1 = running)
status uint32
// Pauses/Resumes the dummy send thread when triggered
statusChan chan bool
// Client interfaces // Client interfaces
client *api.Client client *api.Client
store *storage.Session store *storage.Session
...@@ -58,6 +80,8 @@ func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, ...@@ -58,6 +80,8 @@ func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
maxNumMessages: maxNumMessages, maxNumMessages: maxNumMessages,
avgSendDelta: avgSendDelta, avgSendDelta: avgSendDelta,
randomRange: randomRange, randomRange: randomRange,
status: notStarted,
statusChan: make(chan bool, statusChanLen),
client: client, client: client,
store: store, store: store,
net: net, net: net,
...@@ -73,3 +97,35 @@ func (m *Manager) StartDummyTraffic() (stoppable.Stoppable, error) { ...@@ -73,3 +97,35 @@ func (m *Manager) StartDummyTraffic() (stoppable.Stoppable, error) {
return stop, nil return stop, nil
} }
// SetStatus sets the state of the dummy traffic send thread, which determines
// if the thread is running or paused. The possible statuses are:
// true = send thread is sending dummy messages
// false = send thread is paused/stopped and not sending dummy messages
// Returns an error if the channel is full.
// Note that this function cannot change the status of the send thread if it has
// yet to be started via StartDummyTraffic or if it has been stopped.
func (m *Manager) SetStatus(status bool) error {
select {
case m.statusChan <- status:
return nil
default:
return errors.Errorf(setStatusErr, status)
}
}
// GetStatus returns the current state of the dummy traffic send thread. It has
// the following return values:
// true = send thread is sending dummy messages
// false = send thread is paused/stopped and not sending dummy messages
// Note that this function does not return the status set by SetStatus directly;
// it returns the current status of the send thread, which means any call to
// SetStatus will have a small delay before it is returned by GetStatus.
func (m *Manager) GetStatus() bool {
switch atomic.LoadUint32(&m.status) {
case running:
return true
default:
return false
}
}
...@@ -8,7 +8,10 @@ ...@@ -8,7 +8,10 @@
package dummy package dummy
import ( import (
"fmt"
"gitlab.com/elixxir/client/stoppable"
"reflect" "reflect"
"sync/atomic"
"testing" "testing"
"time" "time"
) )
...@@ -19,11 +22,20 @@ func Test_newManager(t *testing.T) { ...@@ -19,11 +22,20 @@ func Test_newManager(t *testing.T) {
maxNumMessages: 10, maxNumMessages: 10,
avgSendDelta: time.Minute, avgSendDelta: time.Minute,
randomRange: time.Second, randomRange: time.Second,
status: notStarted,
statusChan: make(chan bool, statusChanLen),
} }
received := newManager(expected.maxNumMessages, expected.avgSendDelta, received := newManager(expected.maxNumMessages, expected.avgSendDelta,
expected.randomRange, nil, nil, nil, nil) expected.randomRange, nil, nil, nil, nil)
if statusChanLen != cap(received.statusChan) {
t.Errorf("Capacity of status channel unexpected."+
"\nexpected: %d\nreceived: %d",
statusChanLen, cap(received.statusChan))
}
received.statusChan = expected.statusChan
if !reflect.DeepEqual(expected, received) { if !reflect.DeepEqual(expected, received) {
t.Errorf("New manager does not match expected."+ t.Errorf("New manager does not match expected."+
"\nexpected: %+v\nreceived: %+v", expected, received) "\nexpected: %+v\nreceived: %+v", expected, received)
...@@ -35,6 +47,11 @@ func Test_newManager(t *testing.T) { ...@@ -35,6 +47,11 @@ func Test_newManager(t *testing.T) {
func TestManager_StartDummyTraffic(t *testing.T) { func TestManager_StartDummyTraffic(t *testing.T) {
m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t) m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t)
err := m.SetStatus(true)
if err != nil {
t.Errorf("Failed to set status to true.")
}
stop, err := m.StartDummyTraffic() stop, err := m.StartDummyTraffic()
if err != nil { if err != nil {
t.Errorf("StartDummyTraffic returned an error: %+v", err) t.Errorf("StartDummyTraffic returned an error: %+v", err)
...@@ -82,3 +99,245 @@ func TestManager_StartDummyTraffic(t *testing.T) { ...@@ -82,3 +99,245 @@ func TestManager_StartDummyTraffic(t *testing.T) {
t.Error("Received new messages after stoppable was stopped.") t.Error("Received new messages after stoppable was stopped.")
} }
} }
// Tests that Manager.SetStatus prevents messages from being sent and that it
// can be called multiple times with the same status without it affecting
// anything. Also tests that the thread quits even when paused.
func TestManager_SetStatus(t *testing.T) {
m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t)
err := m.SetStatus(false)
if err != nil {
t.Errorf("setStatus returned an error: %+v", err)
}
stop := stoppable.NewSingle("sendThreadTest")
go m.sendThread(stop)
msgChan := make(chan bool, 10)
go func() {
var numReceived int
for i := 0; i < 2; i++ {
for m.net.(*testNetworkManager).GetMsgListLen() == numReceived {
time.Sleep(5 * time.Millisecond)
}
numReceived = m.net.(*testNetworkManager).GetMsgListLen()
msgChan <- true
}
}()
time.Sleep(3 * time.Millisecond)
if stat := atomic.LoadUint32(&m.status); stat != paused {
t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d",
paused, stat)
}
// Setting status to false should cause the messages to not send
err = m.SetStatus(false)
if err != nil {
t.Errorf("setStatus returned an error: %+v", err)
}
var numReceived int
select {
case <-time.NewTimer(3 * m.avgSendDelta).C:
case <-msgChan:
t.Errorf("Should not have received messages when thread was pasued.")
}
err = m.SetStatus(true)
if err != nil {
t.Errorf("setStatus returned an error: %+v", err)
}
time.Sleep(3 * time.Millisecond)
if stat := atomic.LoadUint32(&m.status); stat != running {
t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d",
running, stat)
}
select {
case <-time.NewTimer(3 * m.avgSendDelta).C:
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
numReceived += m.net.(*testNetworkManager).GetMsgListLen()
}
// Setting status to true multiple times does not interrupt sending
for i := 0; i < 3; i++ {
err = m.SetStatus(true)
if err != nil {
t.Errorf("setStatus returned an error (%d): %+v", i, err)
}
}
select {
case <-time.NewTimer(3 * m.avgSendDelta).C:
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
if m.net.(*testNetworkManager).GetMsgListLen() <= numReceived {
t.Errorf("Failed to receive second send."+
"\nmessages on last receive: %d\nmessages on this receive: %d",
numReceived, m.net.(*testNetworkManager).GetMsgListLen())
}
}
// Shows that the stoppable still stops when the thread is paused
err = m.SetStatus(false)
if err != nil {
t.Errorf("setStatus returned an error: %+v", err)
}
time.Sleep(3 * time.Millisecond)
if stat := atomic.LoadUint32(&m.status); stat != paused {
t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d",
paused, stat)
}
err = stop.Close()
if err != nil {
t.Errorf("Failed to close stoppable: %+v", err)
}
time.Sleep(10 * time.Millisecond)
if !stop.IsStopped() {
t.Error("Stoppable never stopped.")
}
if stat := atomic.LoadUint32(&m.status); stat != stopped {
t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d",
stopped, stat)
}
}
// Error path: tests that Manager.SetStatus returns an error if the status
// cannot be set.
func TestManager_SetStatus_ChannelError(t *testing.T) {
m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t)
// Send the max number of status changes on the channel
for i := 0; i < statusChanLen; i++ {
err := m.SetStatus(false)
if err != nil {
t.Errorf("setStatus returned an error (%d): %+v", i, err)
}
}
// Calling one more time causes an error
expectedErr := fmt.Sprintf(setStatusErr, true)
err := m.SetStatus(true)
if err == nil || err.Error() != expectedErr {
t.Errorf("setStatus returned unexpected error when channel is full."+
"\nexpected: %s\nreceived: %+v", expectedErr, err)
}
}
// Tests that Manager.GetStatus gets the correct status before the send thread
// starts, while sending, while paused, and after it is stopped.
func TestManager_GetStatus(t *testing.T) {
m := newTestManager(10, 50*time.Millisecond, 10*time.Millisecond, false, t)
err := m.SetStatus(false)
if err != nil {
t.Errorf("setStatus returned an error: %+v", err)
}
stop := stoppable.NewSingle("sendThreadTest")
go m.sendThread(stop)
if m.GetStatus() {
t.Errorf("GetStatus reported thread as running.")
}
msgChan := make(chan bool, 10)
go func() {
var numReceived int
for i := 0; i < 2; i++ {
for m.net.(*testNetworkManager).GetMsgListLen() == numReceived {
time.Sleep(5 * time.Millisecond)
}
numReceived = m.net.(*testNetworkManager).GetMsgListLen()
msgChan <- true
}
}()
// Setting status to false should cause the messages to not send
err = m.SetStatus(false)
if err != nil {
t.Errorf("setStatus returned an error: %+v", err)
}
if m.GetStatus() {
t.Errorf("GetStatus reported thread as running.")
}
var numReceived int
select {
case <-time.NewTimer(3 * m.avgSendDelta).C:
case <-msgChan:
t.Errorf("Should not have received messages when thread was pasued.")
}
err = m.SetStatus(true)
if err != nil {
t.Errorf("setStatus returned an error: %+v", err)
}
time.Sleep(3 * time.Millisecond)
if !m.GetStatus() {
t.Errorf("GetStatus reported thread as paused.")
}
select {
case <-time.NewTimer(3 * m.avgSendDelta).C:
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
numReceived += m.net.(*testNetworkManager).GetMsgListLen()
}
// Setting status to true multiple times does not interrupt sending
for i := 0; i < 3; i++ {
err = m.SetStatus(true)
if err != nil {
t.Errorf("setStatus returned an error (%d): %+v", i, err)
}
}
if !m.GetStatus() {
t.Errorf("GetStatus reported thread as paused.")
}
select {
case <-time.NewTimer(3 * m.avgSendDelta).C:
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
if m.net.(*testNetworkManager).GetMsgListLen() <= numReceived {
t.Errorf("Failed to receive second send."+
"\nmessages on last receive: %d\nmessages on this receive: %d",
numReceived, m.net.(*testNetworkManager).GetMsgListLen())
}
}
// Shows that the stoppable still stops when the thread is paused
err = m.SetStatus(false)
if err != nil {
t.Errorf("setStatus returned an error: %+v", err)
}
time.Sleep(3 * time.Millisecond)
if m.GetStatus() {
t.Errorf("GetStatus reported thread as running.")
}
err = stop.Close()
if err != nil {
t.Errorf("Failed to close stoppable: %+v", err)
}
time.Sleep(10 * time.Millisecond)
if !stop.IsStopped() {
t.Error("Stoppable never stopped.")
}
if m.GetStatus() {
t.Errorf("GetStatus reported thread as running.")
}
}
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"time" "time"
) // Error messages. ) // Error messages.
const ( const (
payloadSizeRngErr = "failed to generate random payload size: %+v" payloadSizeRngErr = "failed to generate random payload size: %+v"
) )
......
...@@ -33,18 +33,27 @@ const ( ...@@ -33,18 +33,27 @@ const (
func (m *Manager) sendThread(stop *stoppable.Single) { func (m *Manager) sendThread(stop *stoppable.Single) {
jww.DEBUG.Print("Starting dummy traffic sending thread.") jww.DEBUG.Print("Starting dummy traffic sending thread.")
timer := m.randomTimer() nextSendChan := make(<-chan time.Time)
nextSendChanPtr := &(nextSendChan)
for { for {
select { select {
case <-stop.Quit(): case <-stop.Quit():
jww.DEBUG.Print("Stopping dummy traffic sending thread: stoppable " + m.stopSendThread(stop)
"triggered")
stop.ToStopped()
return return
case <-timer.C: case status := <-m.statusChan:
timer = m.randomTimer() if status {
atomic.StoreUint32(&m.status, running)
nextSendChanPtr = &(m.randomTimer().C)
} else {
atomic.StoreUint32(&m.status, paused)
nextSendChan = make(<-chan time.Time)
nextSendChanPtr = &nextSendChan
}
case <-*nextSendChanPtr:
nextSendChanPtr = &(m.randomTimer().C)
go func() {
// Get list of random messages and recipients // Get list of random messages and recipients
rng := m.rng.GetStream() rng := m.rng.GetStream()
msgs, err := m.newRandomMessages(rng) msgs, err := m.newRandomMessages(rng)
...@@ -57,8 +66,20 @@ func (m *Manager) sendThread(stop *stoppable.Single) { ...@@ -57,8 +66,20 @@ func (m *Manager) sendThread(stop *stoppable.Single) {
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to send dummy messages: %+v", err) jww.FATAL.Panicf("Failed to send dummy messages: %+v", err)
} }
}()
}
} }
} }
// stopSendThread is triggered when the stoppable is triggered. It prints a
// debug message, sets the thread status to stopped, and sets the status of the
// stoppable to stopped.
func (m *Manager) stopSendThread(stop *stoppable.Single) {
jww.DEBUG.Print(
"Stopping dummy traffic sending thread: stoppable triggered")
atomic.StoreUint32(&m.status, stopped)
stop.ToStopped()
} }
// sendMessages generates and sends random messages. // sendMessages generates and sends random messages.
...@@ -70,24 +91,25 @@ func (m *Manager) sendMessages(msgs map[id.ID]format.Message) error { ...@@ -70,24 +91,25 @@ func (m *Manager) sendMessages(msgs map[id.ID]format.Message) error {
wg.Add(1) wg.Add(1)
go func(i int64, recipient id.ID, msg format.Message) { go func(i int64, recipient id.ID, msg format.Message) {
//fill the preiamge with random data to ensure it isnt repeatable defer wg.Done()
// Fill the preimage with random data to ensure it is not repeatable
p := params.GetDefaultCMIX() p := params.GetDefaultCMIX()
p.IdentityPreimage = make([]byte, 32) p.IdentityPreimage = make([]byte, 32)
rng := m.rng.GetStream() rng := m.rng.GetStream()
if _, err := rng.Read(p.IdentityPreimage); err != nil { if _, err := rng.Read(p.IdentityPreimage); err != nil {
jww.FATAL.Panicf("Failed to generate data for random "+ jww.FATAL.Panicf("Failed to generate data for random identity "+
"identity preimage in e2e send: %+v", err) "preimage in e2e send: %+v", err)
} }
rng.Close() rng.Close()
_, _, err := m.net.SendCMIX(msg, &recipient, p) _, _, err := m.net.SendCMIX(msg, &recipient, p)
if err != nil { if err != nil {
jww.WARN.Printf("failed to send dummy message %d/%d: %+v", jww.WARN.Printf("Failed to send dummy message %d/%d via "+
i, len(msgs), err) "SendCMIX: %+v", i, len(msgs), err)
} else { } else {
atomic.AddInt64(&sent, 1) atomic.AddInt64(&sent, 1)
} }
wg.Done()
}(i, recipient, msg) }(i, recipient, msg)
i++ i++
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"reflect" "reflect"
"sync/atomic"
"testing" "testing"
"time" "time"
) )
...@@ -25,6 +26,16 @@ func TestManager_sendThread(t *testing.T) { ...@@ -25,6 +26,16 @@ func TestManager_sendThread(t *testing.T) {
stop := stoppable.NewSingle("sendThreadTest") stop := stoppable.NewSingle("sendThreadTest")
go m.sendThread(stop) go m.sendThread(stop)
if stat := atomic.LoadUint32(&m.status); stat != notStarted {
t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d",
notStarted, stat)
}
err := m.SetStatus(true)
if err != nil {
t.Errorf("Failed to set status to true.")
}
msgChan := make(chan bool, 10) msgChan := make(chan bool, 10)
go func() { go func() {
var numReceived int var numReceived int
...@@ -58,7 +69,7 @@ func TestManager_sendThread(t *testing.T) { ...@@ -58,7 +69,7 @@ func TestManager_sendThread(t *testing.T) {
} }
} }
err := stop.Close() err = stop.Close()
if err != nil { if err != nil {
t.Errorf("Failed to close stoppable: %+v", err) t.Errorf("Failed to close stoppable: %+v", err)
} }
...@@ -67,6 +78,12 @@ func TestManager_sendThread(t *testing.T) { ...@@ -67,6 +78,12 @@ func TestManager_sendThread(t *testing.T) {
if !stop.IsStopped() { if !stop.IsStopped() {
t.Error("Stoppable never stopped.") t.Error("Stoppable never stopped.")
} }
if stat := atomic.LoadUint32(&m.status); stat != stopped {
t.Errorf("Unexpected thread status.\nexpected: %d\nreceived: %d",
stopped, stat)
}
} }
// Tests that Manager.sendMessages sends all the messages with the correct // Tests that Manager.sendMessages sends all the messages with the correct
......
...@@ -54,6 +54,7 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, ...@@ -54,6 +54,7 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
maxNumMessages: maxNumMessages, maxNumMessages: maxNumMessages,
avgSendDelta: avgSendDelta, avgSendDelta: avgSendDelta,
randomRange: randomRange, randomRange: randomRange,
statusChan: make(chan bool, statusChanLen),
store: storage.InitTestingSession(t), store: storage.InitTestingSession(t),
net: newTestNetworkManager(sendErr, t), net: newTestNetworkManager(sendErr, t),
rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment