Skip to content
Snippets Groups Projects
Commit e71ddf23 authored by Josh Brooks's avatar Josh Brooks
Browse files

WIP: Light touch fix dummy messages

parent a50730d2
No related branches found
No related tags found
2 merge requests!510Release,!329Hotfix/dummy message bindings
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package bindings
import (
"gitlab.com/elixxir/client/dummy"
"time"
)
// DummyTraffic contains the file dummy traffic manager. The manager can be used
// to set and get the status of the send thread.
type DummyTraffic struct {
m *dummy.Manager
}
// NewDummyTrafficManager creates a DummyTraffic manager and initialises the
// dummy traffic send thread. Note that the manager does not start sending dummy
// traffic until its status is set to true using DummyTraffic.SetStatus.
// The maxNumMessages is the upper bound of the random number of messages sent
// each send. avgSendDeltaMS is the average duration, in milliseconds, to wait
// between sends. Sends occur every avgSendDeltaMS +/- a random duration with an
// upper bound of randomRangeMS.
func NewDummyTrafficManager(e2eID, maxNumMessages, avgSendDeltaMS,
randomRangeMS int) (*DummyTraffic, error) {
// Get user from singleton
user, err := e2eTrackerSingleton.get(e2eID)
if err != nil {
return nil, err
}
avgSendDelta := time.Duration(avgSendDeltaMS) * time.Millisecond
randomRange := time.Duration(randomRangeMS) * time.Millisecond
m := dummy.NewManager(
maxNumMessages, avgSendDelta, randomRange, user.api.Cmix)
return &DummyTraffic{m}, user.api.AddService(m.StartDummyTraffic)
}
// SetStatus sets the state of the dummy traffic send thread, which determines
// if the thread is running or paused. The possible statuses are:
// true = send thread is sending dummy messages
// false = send thread is paused/stopped and not sending dummy messages
// Returns an error if the channel is full.
// Note that this function cannot change the status of the send thread if it has
// yet to be started or stopped.
func (dt *DummyTraffic) SetStatus(status bool) error {
return dt.m.SetStatus(status)
}
// GetStatus returns the current state of the dummy traffic send thread. It has
// the following return values:
// true = send thread is sending dummy messages
// false = send thread is paused/stopped and not sending dummy messages
// Note that this function does not return the status set by SetStatus directly;
// it returns the current status of the send thread, which means any call to
// SetStatus will have a small delay before it is returned by GetStatus.
func (dt *DummyTraffic) GetStatus() bool {
return dt.m.GetStatus()
}
...@@ -12,6 +12,7 @@ package dummy ...@@ -12,6 +12,7 @@ package dummy
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/xxdk" "gitlab.com/elixxir/client/xxdk"
...@@ -56,23 +57,25 @@ type Manager struct { ...@@ -56,23 +57,25 @@ type Manager struct {
statusChan chan bool statusChan chan bool
// Cmix interfaces // Cmix interfaces
net *xxdk.Cmix net cmix.Client
store *storage.Session store storage.Session
rng *fastRNG.StreamGenerator rng *fastRNG.StreamGenerator
} }
// NewManager creates a new dummy Manager with the specified average send delta // NewManager creates a new dummy Manager with the specified average send delta
// and the range used for generating random durations. // and the range used for generating random durations.
func NewManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, net *xxdk.Cmix) *Manager { func NewManager(maxNumMessages int,
clientStorage := net.GetStorage() avgSendDelta, randomRange time.Duration,
return newManager(maxNumMessages, avgSendDelta, randomRange, net, net *xxdk.Cmix) *Manager {
&clientStorage, net.GetRng())
return newManager(maxNumMessages, avgSendDelta, randomRange, net.GetCmix(),
net.GetStorage(), net.GetRng())
} }
// newManager builds a new dummy Manager from fields explicitly passed in. This // newManager builds a new dummy Manager from fields explicitly passed in. This
// function is a helper function for NewManager to make it easier to test. // function is a helper function for NewManager to make it easier to test.
func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
net *xxdk.Cmix, store *storage.Session, rng *fastRNG.StreamGenerator) *Manager { net cmix.Client, store storage.Session, rng *fastRNG.StreamGenerator) *Manager {
return &Manager{ return &Manager{
maxNumMessages: maxNumMessages, maxNumMessages: maxNumMessages,
avgSendDelta: avgSendDelta, avgSendDelta: avgSendDelta,
......
...@@ -59,7 +59,7 @@ func TestManager_StartDummyTraffic(t *testing.T) { ...@@ -59,7 +59,7 @@ func TestManager_StartDummyTraffic(t *testing.T) {
msgChan := make(chan bool) msgChan := make(chan bool)
go func() { go func() {
for m.networkManager.(*testNetworkManager).GetMsgListLen() == 0 { for m.net.(*mockCmix).GetMsgListLen() == 0 {
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
} }
msgChan <- true msgChan <- true
...@@ -71,7 +71,7 @@ func TestManager_StartDummyTraffic(t *testing.T) { ...@@ -71,7 +71,7 @@ func TestManager_StartDummyTraffic(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.", t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta) 3*m.avgSendDelta)
case <-msgChan: case <-msgChan:
numReceived += m.networkManager.(*testNetworkManager).GetMsgListLen() numReceived += m.net.(*mockCmix).GetMsgListLen()
} }
err = stop.Close() err = stop.Close()
...@@ -86,7 +86,7 @@ func TestManager_StartDummyTraffic(t *testing.T) { ...@@ -86,7 +86,7 @@ func TestManager_StartDummyTraffic(t *testing.T) {
msgChan = make(chan bool) msgChan = make(chan bool)
go func() { go func() {
for m.networkManager.(*testNetworkManager).GetMsgListLen() == numReceived { for m.net.(*mockCmix).GetMsgListLen() == numReceived {
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
} }
msgChan <- true msgChan <- true
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package dummy
import (
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/gateway"
"gitlab.com/elixxir/client/cmix/identity"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"sync"
"time"
)
// mockCmix is a testing structure that adheres to cmix.Client.
type mockCmix struct {
messages map[id.ID][]byte
sync.RWMutex
}
func newMockCmix() cmix.Client {
return &mockCmix{
messages: make(map[id.ID][]byte),
}
}
func (m *mockCmix) Send(recipient *id.ID, fingerprint format.Fingerprint, service message.Service, payload, mac []byte, cmixParams cmix.CMIXParams) (id.Round, ephemeral.Id, error) {
m.Lock()
defer m.Unlock()
m.messages[*recipient] = payload
return 0, ephemeral.Id{}, nil
}
func (m *mockCmix) GetMsgListLen() int {
m.RLock()
defer m.RUnlock()
return len(m.messages)
}
func (m *mockCmix) GetMsgList() map[id.ID][]byte {
m.RLock()
defer m.RUnlock()
return m.messages
}
func (m mockCmix) Follow(report cmix.ClientErrorReport) (stoppable.Stoppable, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetMaxMessageLength() int {
//TODO implement me
panic("implement me")
}
func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage, p cmix.CMIXParams) (id.Round, []ephemeral.Id, error) {
//TODO implement me
panic("implement me")
}
func (m *mockCmix) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) {
//TODO implement me
panic("implement me")
}
func (m *mockCmix) RemoveIdentity(id *id.ID) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetIdentity(get *id.ID) (identity.TrackedID, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) AddFingerprint(identity *id.ID, fingerprint format.Fingerprint, mp message.Processor) error {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteFingerprint(identity *id.ID, fingerprint format.Fingerprint) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteClientFingerprints(identity *id.ID) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) AddService(clientID *id.ID, newService message.Service, response message.Processor) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteService(clientID *id.ID, toDelete message.Service, processor message.Processor) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteClientService(clientID *id.ID) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) TrackServices(tracker message.ServicesTracker) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) CheckInProgressMessages() {
//TODO implement me
panic("implement me")
}
func (m mockCmix) IsHealthy() bool {
//TODO implement me
panic("implement me")
}
func (m mockCmix) WasHealthy() bool {
//TODO implement me
panic("implement me")
}
func (m mockCmix) AddHealthCallback(f func(bool)) uint64 {
//TODO implement me
panic("implement me")
}
func (m mockCmix) RemoveHealthCallback(u uint64) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) HasNode(nid *id.ID) bool {
//TODO implement me
panic("implement me")
}
func (m mockCmix) NumRegisteredNodes() int {
//TODO implement me
panic("implement me")
}
func (m mockCmix) TriggerNodeRegistration(nid *id.ID) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) error {
//TODO implement me
panic("implement me")
}
func (m mockCmix) LookupHistoricalRound(rid id.Round, callback rounds.RoundResultCallback) error {
//TODO implement me
panic("implement me")
}
func (m mockCmix) SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) SendToPreferred(targets []*id.ID, sendFunc gateway.SendToPreferredFunc, stop *stoppable.Single, timeout time.Duration) (interface{}, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) SetGatewayFilter(f gateway.Filter) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetHostParams() connect.HostParams {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetAddressSpace() uint8 {
//TODO implement me
panic("implement me")
}
func (m mockCmix) RegisterAddressSpaceNotification(tag string) (chan uint8, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) UnregisterAddressSpaceNotification(tag string) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetInstance() *network.Instance {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetVerboseRounds() string {
//TODO implement me
panic("implement me")
}
...@@ -101,7 +101,7 @@ func (m *Manager) sendMessages(msgs map[id.ID]format.Message, rng csprng.Source) ...@@ -101,7 +101,7 @@ func (m *Manager) sendMessages(msgs map[id.ID]format.Message, rng csprng.Source)
//Send(recipient *id.ID, fingerprint format.Fingerprint, //Send(recipient *id.ID, fingerprint format.Fingerprint,
// service message.Service, payload, mac []byte, cmixParams CMIXParams) ( // service message.Service, payload, mac []byte, cmixParams CMIXParams) (
// id.Round, ephemeral.Id, error) // id.Round, ephemeral.Id, error)
_, _, err := m.net.GetCmix().Send(&recipient, msg.GetKeyFP(), _, _, err := m.net.Send(&recipient, msg.GetKeyFP(),
message.GetRandomService(rng), msg.GetContents(), msg.GetMac(), p) message.GetRandomService(rng), msg.GetContents(), msg.GetMac(), p)
if err != nil { if err != nil {
jww.WARN.Printf("Failed to send dummy message %d/%d via "+ jww.WARN.Printf("Failed to send dummy message %d/%d via "+
...@@ -152,8 +152,7 @@ func (m *Manager) newRandomMessages(rng csprng.Source) ( ...@@ -152,8 +152,7 @@ func (m *Manager) newRandomMessages(rng csprng.Source) (
// generated payload, fingerprint, and MAC. // generated payload, fingerprint, and MAC.
func (m *Manager) newRandomCmixMessage(rng csprng.Source) (format.Message, error) { func (m *Manager) newRandomCmixMessage(rng csprng.Source) (format.Message, error) {
// Create new empty cMix message // Create new empty cMix message
clientStorage := *m.store cMixMsg := format.NewMessage(m.store.GetCmixGroup().GetP().ByteLen())
cMixMsg := format.NewMessage(clientStorage.GetCmixGroup().GetP().ByteLen())
// Generate random message // Generate random message
randomMsg, err := newRandomPayload(cMixMsg.ContentsSize(), rng) randomMsg, err := newRandomPayload(cMixMsg.ContentsSize(), rng)
......
...@@ -40,10 +40,10 @@ func TestManager_sendThread(t *testing.T) { ...@@ -40,10 +40,10 @@ func TestManager_sendThread(t *testing.T) {
go func() { go func() {
var numReceived int var numReceived int
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
for m.networkManager.(*testNetworkManager).GetMsgListLen() == numReceived { for m.net.(*mockCmix).GetMsgListLen() == numReceived {
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
} }
numReceived = m.networkManager.(*testNetworkManager).GetMsgListLen() numReceived = m.net.(*mockCmix).GetMsgListLen()
msgChan <- true msgChan <- true
} }
}() }()
...@@ -54,7 +54,7 @@ func TestManager_sendThread(t *testing.T) { ...@@ -54,7 +54,7 @@ func TestManager_sendThread(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.", t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta) 3*m.avgSendDelta)
case <-msgChan: case <-msgChan:
numReceived += m.networkManager.(*testNetworkManager).GetMsgListLen() numReceived += m.net.(*mockCmix).GetMsgListLen()
} }
select { select {
...@@ -62,10 +62,10 @@ func TestManager_sendThread(t *testing.T) { ...@@ -62,10 +62,10 @@ func TestManager_sendThread(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.", t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta) 3*m.avgSendDelta)
case <-msgChan: case <-msgChan:
if m.networkManager.(*testNetworkManager).GetMsgListLen() <= numReceived { if m.net.(*mockCmix).GetMsgListLen() <= numReceived {
t.Errorf("Failed to receive second send."+ t.Errorf("Failed to receive second send."+
"\nmessages on last receive: %d\nmessages on this receive: %d", "\nmessages on last receive: %d\nmessages on this receive: %d",
numReceived, m.networkManager.(*testNetworkManager).GetMsgListLen()) numReceived, m.net.(*mockCmix).GetMsgListLen())
} }
} }
...@@ -109,13 +109,13 @@ func TestManager_sendMessages(t *testing.T) { ...@@ -109,13 +109,13 @@ func TestManager_sendMessages(t *testing.T) {
} }
// Send the messages // Send the messages
err := m.sendMessages(msgs) err := m.sendMessages(msgs, prng)
if err != nil { if err != nil {
t.Errorf("sendMessages returned an error: %+v", err) t.Errorf("sendMessages returned an error: %+v", err)
} }
// get sent messages // get sent messages
receivedMsgs := m.networkManager.(*testNetworkManager).GetMsgList() receivedMsgs := m.net.(*mockCmix).GetMsgList()
// Test that all messages were received // Test that all messages were received
if len(receivedMsgs) != len(msgs) { if len(receivedMsgs) != len(msgs) {
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/ndf"
"io" "io"
"math/rand" "math/rand"
"testing" "testing"
...@@ -42,9 +43,52 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, ...@@ -42,9 +43,52 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
avgSendDelta: avgSendDelta, avgSendDelta: avgSendDelta,
randomRange: randomRange, randomRange: randomRange,
statusChan: make(chan bool, statusChanLen), statusChan: make(chan bool, statusChanLen),
store: &store, store: store,
net: newMockCmix(),
rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG),
} }
return m return m
} }
func getNDF() *ndf.NetworkDefinition {
return &ndf.NetworkDefinition{
E2E: ndf.Group{
Prime: "E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B7A" +
"8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3D" +
"D2AEDF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E78615" +
"75E745D31F8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC" +
"6ADC718DD2A3E041023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C" +
"4A530E8FFB1BC51DADDF453B0B2717C2BC6669ED76B4BDD5C9FF558E88F2" +
"6E5785302BEDBCA23EAC5ACE92096EE8A60642FB61E8F3D24990B8CB12EE" +
"448EEF78E184C7242DD161C7738F32BF29A841698978825B4111B4BC3E1E" +
"198455095958333D776D8B2BEEED3A1A1A221A6E37E664A64B83981C46FF" +
"DDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F278DE8014A47323" +
"631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696015CB79C" +
"3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E63" +
"19BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC3" +
"5873847AEF49F66E43873",
Generator: "2",
},
CMIX: ndf.Group{
Prime: "9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642" +
"F0B5C48C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757" +
"264E5A1A44FFE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F" +
"9716BFE6117C6B5B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091E" +
"B51743BF33050C38DE235567E1B34C3D6A5C0CEAA1A0F368213C3D19843D" +
"0B4B09DCB9FC72D39C8DE41F1BF14D4BB4563CA28371621CAD3324B6A2D3" +
"92145BEBFAC748805236F5CA2FE92B871CD8F9C36D3292B5509CA8CAA77A" +
"2ADFC7BFD77DDA6F71125A7456FEA153E433256A2261C6A06ED3693797E7" +
"995FAD5AABBCFBE3EDA2741E375404AE25B",
Generator: "5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E2480" +
"9670716C613D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D" +
"1AA58C4328A06C46A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A33" +
"8661D10461C0D135472085057F3494309FFA73C611F78B32ADBB5740C361" +
"C9F35BE90997DB2014E2EF5AA61782F52ABEB8BD6432C4DD097BC5423B28" +
"5DAFB60DC364E8161F4A2A35ACA3A10B1C4D203CC76A470A33AFDCBDD929" +
"59859ABD8B56E1725252D78EAC66E71BA9AE3F1DD2487199874393CD4D83" +
"2186800654760E1E34C09E4D155179F9EC0DC4473F996BDCE6EED1CABED8" +
"B6F116F7AD9CF505DF0F998E34AB27514B0FFE7",
},
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment