Skip to content
Snippets Groups Projects
Commit 2ae8ef36 authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

Merge branch 'hotfix/DummyMessageBindings' into 'release'

Hotfix/dummy message bindings

See merge request !329
parents c026d27b 0b664bff
No related branches found
No related tags found
2 merge requests!510Release,!329Hotfix/dummy message bindings
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package bindings
import (
"gitlab.com/elixxir/client/dummy"
"time"
)
// DummyTraffic is the bindings-layer dummy (or "cover") traffic manager. T
// The manager can be used to set and get the status of the thread responsible for
// sending dummy messages.
type DummyTraffic struct {
m *dummy.Manager
}
// NewDummyTrafficManager creates a DummyTraffic manager and initialises the
// dummy traffic sending thread. Note that the manager does not start sending dummy
// traffic until `True` is passed into DummyTraffic.SetStatus. The time duration
// between each sending operation and the amount of messages sent each interval
// are randomly generated values with bounds defined by the
// given parameters below.
//
// Params:
// - cmixId - a Cmix object ID in the tracker.
// - maxNumMessages - the upper bound of the random number of messages sent
// each sending cycle.
// - avgSendDeltaMS - the average duration, in milliseconds, to wait
// between sends.
// - randomRangeMS - the upper bound of the interval between sending cycles,
// in milliseconds. Sends occur every avgSendDeltaMS +/- a random duration
// with an upper bound of randomRangeMS.
func NewDummyTrafficManager(cmixId, maxNumMessages, avgSendDeltaMS,
randomRangeMS int) (*DummyTraffic, error) {
// Get user from singleton
net, err := cmixTrackerSingleton.get(cmixId)
if err != nil {
return nil, err
}
avgSendDelta := time.Duration(avgSendDeltaMS) * time.Millisecond
randomRange := time.Duration(randomRangeMS) * time.Millisecond
m := dummy.NewManager(
maxNumMessages, avgSendDelta, randomRange, net.api)
return &DummyTraffic{m}, net.api.AddService(m.StartDummyTraffic)
}
// SetStatus sets the state of the dummy traffic send thread by passing in
// a boolean parameter. There may be a small delay in between this call
// and the status of the sending thread to change accordingly. For example,
// passing False into this call while the sending thread is currently sending messages
// will not cancel nor halt the sending operation, but will pause the thread once that
// operation has completed.
//
// Params:
// - boolean - True: Sending thread is sending dummy messages.
// False: Sending thread is paused/stopped and is not sending dummy messages.
// Returns:
// - error - if the DummyTraffic.SetStatus is called too frequently, causing the
// internal status channel to fill.
func (dt *DummyTraffic) SetStatus(status bool) error {
return dt.m.SetStatus(status)
}
// GetStatus returns the current state of the dummy traffic sending thread.
// Note that this function does not return the status set by the most recent call to
// SetStatus directly. Instead, this call returns the current status of the sending thread.
// This is due to the small delay that may occur between calling SetStatus and the
// sending thread taking into effect that status change.
//
// Returns:
// - boolean - True: Sending thread is sending dummy messages.
// - False: Sending thread is paused/stopped and is not sending dummy messages.
func (dt *DummyTraffic) GetStatus() bool {
return dt.m.GetStatus()
}
......@@ -12,7 +12,7 @@ package dummy
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/xxdk"
......@@ -57,26 +57,38 @@ type Manager struct {
statusChan chan bool
// Cmix interfaces
net *xxdk.Cmix
store *storage.Session
networkManager interfaces.NetworkManager
rng *fastRNG.StreamGenerator
net cmix.Client
store storage.Session
rng *fastRNG.StreamGenerator
}
// NewManager creates a new dummy Manager with the specified average send delta
// and the range used for generating random durations.
func NewManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
net *xxdk.Cmix, manager interfaces.NetworkManager) *Manager {
clientStorage := net.GetStorage()
return newManager(maxNumMessages, avgSendDelta, randomRange, net,
&clientStorage, manager, net.GetRng())
// NewManager creates a Manager object and initialises the
// dummy traffic sending thread. Note that the Manager does not start sending dummy
// traffic until True is passed into Manager.SetStatus. The time duration
// between each sending operation and the amount of messages sent each interval
// are randomly generated values with bounds defined by the
// given parameters below.
//
// Params:
// - maxNumMessages - the upper bound of the random number of messages sent
// each sending cycle.
// - avgSendDeltaMS - the average duration, in milliseconds, to wait
// between sends.
// - randomRangeMS - the upper bound of the interval between sending cycles.
// Sends occur every avgSendDeltaMS +/- a random duration with an
// upper bound of randomRangeMS
func NewManager(maxNumMessages int,
avgSendDelta, randomRange time.Duration,
net *xxdk.Cmix) *Manager {
return newManager(maxNumMessages, avgSendDelta, randomRange, net.GetCmix(),
net.GetStorage(), net.GetRng())
}
// newManager builds a new dummy Manager from fields explicitly passed in. This
// function is a helper function for NewManager to make it easier to test.
func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
net *xxdk.Cmix, store *storage.Session, networkManager interfaces.NetworkManager,
rng *fastRNG.StreamGenerator) *Manager {
net cmix.Client, store storage.Session, rng *fastRNG.StreamGenerator) *Manager {
return &Manager{
maxNumMessages: maxNumMessages,
avgSendDelta: avgSendDelta,
......@@ -85,7 +97,6 @@ func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
statusChan: make(chan bool, statusChanLen),
net: net,
store: store,
networkManager: networkManager,
rng: rng,
}
}
......@@ -99,13 +110,19 @@ func (m *Manager) StartDummyTraffic() (stoppable.Stoppable, error) {
return stop, nil
}
// SetStatus sets the state of the dummy traffic send thread, which determines
// if the thread is running or paused. The possible statuses are:
// true = send thread is sending dummy messages
// false = send thread is paused/stopped and not sending dummy messages
// Returns an error if the channel is full.
// Note that this function cannot change the status of the send thread if it has
// yet to be started via StartDummyTraffic or if it has been stopped.
// SetStatus sets the state of the dummy traffic send thread by passing in
// a boolean parameter. There may be a small delay in between this call
// and the status of the sending thread to change accordingly. For example,
// passing False into this call while the sending thread is currently sending messages
// will not cancel nor halt the sending operation, but will pause the thread once that
// operation has completed.
//
// Params:
// - boolean - True: Sending thread is sending dummy messages.
// False: Sending thread is paused/stopped and is not sending dummy messages
// Returns:
// - error - if the DummyTraffic.SetStatus is called too frequently, causing the
// internal status channel to fill.
func (m *Manager) SetStatus(status bool) error {
select {
case m.statusChan <- status:
......@@ -115,13 +132,15 @@ func (m *Manager) SetStatus(status bool) error {
}
}
// GetStatus returns the current state of the dummy traffic send thread. It has
// the following return values:
// true = send thread is sending dummy messages
// false = send thread is paused/stopped and not sending dummy messages
// Note that this function does not return the status set by SetStatus directly;
// it returns the current status of the send thread, which means any call to
// SetStatus will have a small delay before it is returned by GetStatus.
// GetStatus returns the current state of the dummy traffic sending thread.
// Note that this function does not return the status set by the most recent call to
// SetStatus directly. Instead, this call returns the current status of the sending thread.
// This is due to the small delay that may occur between calling SetStatus and the
// sending thread taking into effect that status change.
//
// Returns:
// - boolean - True: Sending thread is sending dummy messages.
// - False: Sending thread is paused/stopped and is not sending dummy messages.
func (m *Manager) GetStatus() bool {
switch atomic.LoadUint32(&m.status) {
case running:
......
......@@ -27,7 +27,7 @@ func Test_newManager(t *testing.T) {
}
received := newManager(expected.maxNumMessages, expected.avgSendDelta,
expected.randomRange, nil, nil, nil, nil)
expected.randomRange, nil, nil, nil)
if statusChanLen != cap(received.statusChan) {
t.Errorf("Capacity of status channel unexpected."+
......@@ -59,7 +59,7 @@ func TestManager_StartDummyTraffic(t *testing.T) {
msgChan := make(chan bool)
go func() {
for m.networkManager.(*testNetworkManager).GetMsgListLen() == 0 {
for m.net.(*mockCmix).GetMsgListLen() == 0 {
time.Sleep(5 * time.Millisecond)
}
msgChan <- true
......@@ -71,7 +71,7 @@ func TestManager_StartDummyTraffic(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
numReceived += m.networkManager.(*testNetworkManager).GetMsgListLen()
numReceived += m.net.(*mockCmix).GetMsgListLen()
}
err = stop.Close()
......@@ -86,7 +86,7 @@ func TestManager_StartDummyTraffic(t *testing.T) {
msgChan = make(chan bool)
go func() {
for m.networkManager.(*testNetworkManager).GetMsgListLen() == numReceived {
for m.net.(*mockCmix).GetMsgListLen() == numReceived {
time.Sleep(5 * time.Millisecond)
}
msgChan <- true
......@@ -118,10 +118,10 @@ func TestManager_SetStatus(t *testing.T) {
go func() {
var numReceived int
for i := 0; i < 2; i++ {
for m.networkManager.(*testNetworkManager).GetMsgListLen() == numReceived {
for m.net.(*mockCmix).GetMsgListLen() == numReceived {
time.Sleep(5 * time.Millisecond)
}
numReceived = m.networkManager.(*testNetworkManager).GetMsgListLen()
numReceived = m.net.(*mockCmix).GetMsgListLen()
msgChan <- true
}
}()
......@@ -161,7 +161,7 @@ func TestManager_SetStatus(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
numReceived += m.networkManager.(*testNetworkManager).GetMsgListLen()
numReceived += m.net.(*mockCmix).GetMsgListLen()
}
// Setting status to true multiple times does not interrupt sending
......@@ -177,10 +177,10 @@ func TestManager_SetStatus(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
if m.networkManager.(*testNetworkManager).GetMsgListLen() <= numReceived {
if m.net.(*mockCmix).GetMsgListLen() <= numReceived {
t.Errorf("Failed to receive second send."+
"\nmessages on last receive: %d\nmessages on this receive: %d",
numReceived, m.networkManager.(*testNetworkManager).GetMsgListLen())
numReceived, m.net.(*mockCmix).GetMsgListLen())
}
}
......@@ -254,10 +254,10 @@ func TestManager_GetStatus(t *testing.T) {
go func() {
var numReceived int
for i := 0; i < 2; i++ {
for m.networkManager.(*testNetworkManager).GetMsgListLen() == numReceived {
for m.net.(*mockCmix).GetMsgListLen() == numReceived {
time.Sleep(5 * time.Millisecond)
}
numReceived = m.networkManager.(*testNetworkManager).GetMsgListLen()
numReceived = m.net.(*mockCmix).GetMsgListLen()
msgChan <- true
}
}()
......@@ -292,7 +292,7 @@ func TestManager_GetStatus(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
numReceived += m.networkManager.(*testNetworkManager).GetMsgListLen()
numReceived += m.net.(*mockCmix).GetMsgListLen()
}
// Setting status to true multiple times does not interrupt sending
......@@ -311,10 +311,10 @@ func TestManager_GetStatus(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
if m.networkManager.(*testNetworkManager).GetMsgListLen() <= numReceived {
if m.net.(*mockCmix).GetMsgListLen() <= numReceived {
t.Errorf("Failed to receive second send."+
"\nmessages on last receive: %d\nmessages on this receive: %d",
numReceived, m.networkManager.(*testNetworkManager).GetMsgListLen())
numReceived, m.net.(*mockCmix).GetMsgListLen())
}
}
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package dummy
import (
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/gateway"
"gitlab.com/elixxir/client/cmix/identity"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"sync"
"time"
)
// mockCmix is a testing structure that adheres to cmix.Client.
type mockCmix struct {
messages map[id.ID][]byte
sync.RWMutex
}
func newMockCmix() cmix.Client {
return &mockCmix{
messages: make(map[id.ID][]byte),
}
}
func (m *mockCmix) Send(recipient *id.ID, fingerprint format.Fingerprint, service message.Service, payload, mac []byte, cmixParams cmix.CMIXParams) (id.Round, ephemeral.Id, error) {
m.Lock()
defer m.Unlock()
m.messages[*recipient] = fingerprint.Bytes()
return 0, ephemeral.Id{}, nil
}
func (m *mockCmix) GetMsgListLen() int {
m.RLock()
defer m.RUnlock()
return len(m.messages)
}
func (m *mockCmix) GetMsgList() map[id.ID][]byte {
m.RLock()
defer m.RUnlock()
return m.messages
}
func (m mockCmix) Follow(report cmix.ClientErrorReport) (stoppable.Stoppable, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetMaxMessageLength() int {
//TODO implement me
panic("implement me")
}
func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage, p cmix.CMIXParams) (id.Round, []ephemeral.Id, error) {
//TODO implement me
panic("implement me")
}
func (m *mockCmix) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) {
//TODO implement me
panic("implement me")
}
func (m *mockCmix) RemoveIdentity(id *id.ID) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetIdentity(get *id.ID) (identity.TrackedID, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) AddFingerprint(identity *id.ID, fingerprint format.Fingerprint, mp message.Processor) error {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteFingerprint(identity *id.ID, fingerprint format.Fingerprint) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteClientFingerprints(identity *id.ID) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) AddService(clientID *id.ID, newService message.Service, response message.Processor) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteService(clientID *id.ID, toDelete message.Service, processor message.Processor) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteClientService(clientID *id.ID) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) TrackServices(tracker message.ServicesTracker) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) CheckInProgressMessages() {
//TODO implement me
panic("implement me")
}
func (m mockCmix) IsHealthy() bool {
//TODO implement me
panic("implement me")
}
func (m mockCmix) WasHealthy() bool {
//TODO implement me
panic("implement me")
}
func (m mockCmix) AddHealthCallback(f func(bool)) uint64 {
//TODO implement me
panic("implement me")
}
func (m mockCmix) RemoveHealthCallback(u uint64) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) HasNode(nid *id.ID) bool {
//TODO implement me
panic("implement me")
}
func (m mockCmix) NumRegisteredNodes() int {
//TODO implement me
panic("implement me")
}
func (m mockCmix) TriggerNodeRegistration(nid *id.ID) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) error {
//TODO implement me
panic("implement me")
}
func (m mockCmix) LookupHistoricalRound(rid id.Round, callback rounds.RoundResultCallback) error {
//TODO implement me
panic("implement me")
}
func (m mockCmix) SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) SendToPreferred(targets []*id.ID, sendFunc gateway.SendToPreferredFunc, stop *stoppable.Single, timeout time.Duration) (interface{}, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) SetGatewayFilter(f gateway.Filter) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetHostParams() connect.HostParams {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetAddressSpace() uint8 {
//TODO implement me
panic("implement me")
}
func (m mockCmix) RegisterAddressSpaceNotification(tag string) (chan uint8, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) UnregisterAddressSpaceNotification(tag string) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetInstance() *network.Instance {
//TODO implement me
panic("implement me")
}
func (m mockCmix) GetVerboseRounds() string {
//TODO implement me
panic("implement me")
}
......@@ -8,6 +8,7 @@
package dummy
import (
"gitlab.com/elixxir/client/cmix/message"
"sync"
"sync/atomic"
"time"
......@@ -32,7 +33,7 @@ const (
// sendThread is a thread that sends the dummy messages at random intervals.
func (m *Manager) sendThread(stop *stoppable.Single) {
jww.DEBUG.Print("Starting dummy traffic sending thread.")
jww.INFO.Print("Starting dummy traffic sending thread.")
nextSendChan := make(<-chan time.Time)
nextSendChanPtr := &(nextSendChan)
......@@ -57,15 +58,16 @@ func (m *Manager) sendThread(stop *stoppable.Single) {
go func() {
// get list of random messages and recipients
rng := m.rng.GetStream()
defer rng.Close()
msgs, err := m.newRandomMessages(rng)
if err != nil {
jww.FATAL.Panicf("Failed to generate dummy messages: %+v", err)
jww.ERROR.Printf("Failed to generate dummy messages: %+v", err)
return
}
rng.Close()
err = m.sendMessages(msgs)
err = m.sendMessages(msgs, rng)
if err != nil {
jww.FATAL.Panicf("Failed to send dummy messages: %+v", err)
jww.ERROR.Printf("Failed to send dummy messages: %+v", err)
}
}()
......@@ -84,7 +86,7 @@ func (m *Manager) stopSendThread(stop *stoppable.Single) {
}
// sendMessages generates and sends random messages.
func (m *Manager) sendMessages(msgs map[id.ID]format.Message) error {
func (m *Manager) sendMessages(msgs map[id.ID]format.Message, rng csprng.Source) error {
var sent, i int64
var wg sync.WaitGroup
......@@ -95,18 +97,9 @@ func (m *Manager) sendMessages(msgs map[id.ID]format.Message) error {
defer wg.Done()
// Fill the preimage with random data to ensure it is not repeatable
p := cmix.GetDefaultParams()
// FIXME: these fields no longer available
// through these params objects
// p.IdentityPreimage = make([]byte, 32)
// rng := m.rng.GetStream()
// if _, err := rng.Read(p.IdentityPreimage); err != nil {
// jww.FATAL.Panicf("Failed to generate data for random identity "+
// "preimage in e2e send: %+v", err)
// }
// rng.Close()
// p.DebugTag = "dummy"
_, _, err := m.networkManager.SendCMIX(msg, &recipient, p)
p := cmix.GetDefaultCMIXParams()
_, _, err := m.net.Send(&recipient, msg.GetKeyFP(),
message.GetRandomService(rng), msg.GetContents(), msg.GetMac(), p)
if err != nil {
jww.WARN.Printf("Failed to send dummy message %d/%d via "+
"Send: %+v", i, len(msgs), err)
......@@ -156,8 +149,7 @@ func (m *Manager) newRandomMessages(rng csprng.Source) (
// generated payload, fingerprint, and MAC.
func (m *Manager) newRandomCmixMessage(rng csprng.Source) (format.Message, error) {
// Create new empty cMix message
clientStorage := *m.store
cMixMsg := format.NewMessage(clientStorage.GetCmixGroup().GetP().ByteLen())
cMixMsg := format.NewMessage(m.store.GetCmixGroup().GetP().ByteLen())
// Generate random message
randomMsg, err := newRandomPayload(cMixMsg.ContentsSize(), rng)
......
......@@ -40,10 +40,10 @@ func TestManager_sendThread(t *testing.T) {
go func() {
var numReceived int
for i := 0; i < 2; i++ {
for m.networkManager.(*testNetworkManager).GetMsgListLen() == numReceived {
for m.net.(*mockCmix).GetMsgListLen() == numReceived {
time.Sleep(5 * time.Millisecond)
}
numReceived = m.networkManager.(*testNetworkManager).GetMsgListLen()
numReceived = m.net.(*mockCmix).GetMsgListLen()
msgChan <- true
}
}()
......@@ -54,7 +54,7 @@ func TestManager_sendThread(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
numReceived += m.networkManager.(*testNetworkManager).GetMsgListLen()
numReceived += m.net.(*mockCmix).GetMsgListLen()
}
select {
......@@ -62,10 +62,10 @@ func TestManager_sendThread(t *testing.T) {
t.Errorf("Timed out after %s waiting for messages to be sent.",
3*m.avgSendDelta)
case <-msgChan:
if m.networkManager.(*testNetworkManager).GetMsgListLen() <= numReceived {
if m.net.(*mockCmix).GetMsgListLen() <= numReceived {
t.Errorf("Failed to receive second send."+
"\nmessages on last receive: %d\nmessages on this receive: %d",
numReceived, m.networkManager.(*testNetworkManager).GetMsgListLen())
numReceived, m.net.(*mockCmix).GetMsgListLen())
}
}
......@@ -109,13 +109,13 @@ func TestManager_sendMessages(t *testing.T) {
}
// Send the messages
err := m.sendMessages(msgs)
err := m.sendMessages(msgs, prng)
if err != nil {
t.Errorf("sendMessages returned an error: %+v", err)
}
// get sent messages
receivedMsgs := m.networkManager.(*testNetworkManager).GetMsgList()
receivedMsgs := m.net.(*mockCmix).GetMsgList()
// Test that all messages were received
if len(receivedMsgs) != len(msgs) {
......@@ -128,9 +128,10 @@ func TestManager_sendMessages(t *testing.T) {
receivedMsg, exists := receivedMsgs[recipient]
if !exists {
t.Errorf("Failed to receive message from %s: %+v", &recipient, msg)
} else if !reflect.DeepEqual(msg, receivedMsg) {
} else if !reflect.DeepEqual(msg.GetKeyFP().Bytes(), receivedMsg) {
// In mockCmix.Send, we map recipientId to the passed fingerprint.
t.Errorf("Received unexpected message for recipient %s."+
"\nexpected: %+v\nreceived: %+v", &recipient, msg, receivedMsg)
"\nexpected: %+v\nreceived: %+v", &recipient, msg.GetKeyFP(), receivedMsg)
}
}
}
......
......@@ -8,27 +8,12 @@
package dummy
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/gateway"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/event"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf"
"io"
"math/rand"
"sync"
"testing"
"time"
)
......@@ -58,156 +43,14 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
avgSendDelta: avgSendDelta,
randomRange: randomRange,
statusChan: make(chan bool, statusChanLen),
store: &store,
networkManager: newTestNetworkManager(sendErr, t),
store: store,
net: newMockCmix(),
rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG),
}
return m
}
// ////////////////////////////////////////////////////////////////////////////////
// // Test Network State //
// ////////////////////////////////////////////////////////////////////////////////
// // testNetworkManager is a test implementation of NetworkManager interface.
type testNetworkManager struct {
instance *network.Instance
messages map[id.ID]format.Message
sendErr bool
sync.RWMutex
}
func newTestNetworkManager(sendErr bool, t *testing.T) interfaces.NetworkManager {
instanceComms := &connect.ProtoComms{
Manager: connect.NewManagerTesting(t),
}
thisInstance, err := network.NewInstanceTesting(instanceComms, getNDF(),
getNDF(), nil, nil, t)
if err != nil {
t.Fatalf("Failed to create new test instance: %v", err)
}
return &testNetworkManager{
instance: thisInstance,
messages: make(map[id.ID]format.Message),
sendErr: sendErr,
}
}
func (tnm *testNetworkManager) GetMsgListLen() int {
tnm.RLock()
defer tnm.RUnlock()
return len(tnm.messages)
}
func (tnm *testNetworkManager) GetMsgList() map[id.ID]format.Message {
tnm.RLock()
defer tnm.RUnlock()
return tnm.messages
}
func (tnm *testNetworkManager) GetMsg(recipient id.ID) format.Message {
tnm.RLock()
defer tnm.RUnlock()
return tnm.messages[recipient]
}
// TEST
func (tnm *testNetworkManager) SendE2E() (
[]id.Round, e2e.MessageID, time.Time, error) {
return nil, e2e.MessageID{}, time.Time{}, nil
}
// TEST
func (tnm *testNetworkManager) SendUnsafe() ([]id.Round, error) {
return []id.Round{}, nil
}
func (tnm *testNetworkManager) SendCMIX(message format.Message,
recipient *id.ID, _ cmix.Params) (id.Round, ephemeral.Id, error) {
tnm.Lock()
defer tnm.Unlock()
if tnm.sendErr {
return 0, ephemeral.Id{}, errors.New("Send error")
}
tnm.messages[*recipient] = message
return 0, ephemeral.Id{}, nil
}
func (tnm *testNetworkManager) SendManyCMIX([]cmix.TargetedCmixMessage, cmix.Params) (
id.Round, []ephemeral.Id, error) {
return 0, nil, nil
}
type dummyEventMgr struct{}
func (d *dummyEventMgr) Report(int, string, string, string) {}
func (tnm *testNetworkManager) GetEventManager() event.Reporter {
return &dummyEventMgr{}
}
func (tnm *testNetworkManager) GetInstance() *network.Instance { return tnm.instance }
func (tnm *testNetworkManager) GetAddressSpace() uint8 { return 0 }
func (tnm *testNetworkManager) GetHostParams() connect.HostParams { return connect.HostParams{} }
func (tnm *testNetworkManager) GetHealthTracker() interfaces.HealthTracker { return nil }
func (tnm *testNetworkManager) Follow(interfaces.ClientErrorReport) (stoppable.Stoppable, error) {
return nil, nil
}
func (tnm *testNetworkManager) CheckGarbledMessages() {}
func (tnm *testNetworkManager) CheckInProgressMessages() {}
func (tnm *testNetworkManager) InProgressRegistrations() int { return 0 }
func (tnm *testNetworkManager) GetSender() *gateway.Sender { return nil }
func (tnm *testNetworkManager) GetAddressSize() uint8 { return 0 }
func (tnm *testNetworkManager) RegisterAddressSizeNotification(string) (chan uint8, error) {
return nil, nil
}
func (tnm *testNetworkManager) UnregisterAddressSizeNotification(string) {}
func (tnm *testNetworkManager) SetPoolFilter(gateway.Filter) {}
func (tnm *testNetworkManager) GetVerboseRounds() string { return "" }
func (tnm *testNetworkManager) HasNode(*id.ID) bool { return false }
func (tnm *testNetworkManager) LookupHistoricalRound(id.Round, func(*mixmessages.RoundInfo, bool)) error {
return nil
}
func (tnm *testNetworkManager) NumRegisteredNodes() int { return 0 }
func (tnm *testNetworkManager) RegisterAddressSpaceNotification(string) (chan uint8, error) {
return nil, nil
}
func (tnm *testNetworkManager) SendToAny(func(*connect.Host) (interface{}, error), *stoppable.Single) (interface{}, error) {
return nil, nil
}
func (tnm *testNetworkManager) SendToPreferred([]*id.ID, func(*connect.Host, *id.ID, time.Duration) (interface{}, error), *stoppable.Single, time.Duration) (interface{}, error) {
return nil, nil
}
func (tnm *testNetworkManager) SetGatewayFilter(func(map[id.ID]int, *ndf.NetworkDefinition) map[id.ID]int) {
}
func (tnm *testNetworkManager) TrackServices(message.ServicesTracker) {}
func (tnm *testNetworkManager) TriggerNodeRegistration(*id.ID) {}
func (tnm *testNetworkManager) UnregisterAddressSpaceNotification(string) {}
func (tnm *testNetworkManager) AddFingerprint(*id.ID, format.Fingerprint, message.Processor) error {
return nil
}
func (tnm *testNetworkManager) DeleteFingerprint(*id.ID, format.Fingerprint) {}
func (tnm *testNetworkManager) DeleteClientFingerprints(*id.ID) {}
func (tnm *testNetworkManager) AddIdentity(*id.ID, time.Time, bool) error { return nil }
func (tnm *testNetworkManager) RemoveIdentity(*id.ID) {}
func (tnm *testNetworkManager) AddTrigger(*id.ID, message.Service, message.Processor) {}
func (tnm *testNetworkManager) DeleteTrigger(*id.ID, interfaces.Preimage, message.Processor) error {
return nil
}
func (tnm *testNetworkManager) DeleteClientTriggers(*id.ID) {}
// ////////////////////////////////////////////////////////////////////////////////
// // NDF Primes //
// ////////////////////////////////////////////////////////////////////////////////
func getNDF() *ndf.NetworkDefinition {
return &ndf.NetworkDefinition{
E2E: ndf.Group{
......
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package interfaces
import (
"time"
"gitlab.com/elixxir/comms/network"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/ndf"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
)
type NetworkManager interface {
// Follow starts the tracking of the network in a new thread.
// Errors that occur are reported on the ClientErrorReport function if
// passed. The returned stopable can be used to stop the follower.
// Only one follower may run at a time.
Follow(report ClientErrorReport) (stoppable.Stoppable, error)
/*===Sending==========================================================*/
// SendCMIX sends a "raw" CMIX message payload to the provided
// recipient. Returns the round ID of the round the payload
// was sent or an error if it fails.
SendCMIX(message format.Message, recipient *id.ID, p cmix.Params) (
id.Round, ephemeral.Id, error)
// SendManyCMIX sends many "raw" cMix message payloads to each
// of the provided recipients. Used to send messages in group
// chats. Metadata is NOT as well protected with this call and
// can leak data about yourself. Should be replaced with
// multiple uses of SendCmix in most cases. Returns the round
// ID of the round the payload was sent or an error if it
// fails.
// WARNING: Potentially Unsafe
SendManyCMIX(messages []cmix.TargetedCmixMessage, p cmix.Params) (
id.Round, []ephemeral.Id, error)
/*===Message Reception================================================*/
/* Identities are all network identities which the client is currently
trying to pick up message on. An identity must be added
to receive messages, fake ones will be used to poll the network
if none are present. On creation of the network handler, the identity in
session storage will be automatically added*/
// AddIdentity adds an identity to be tracked
// If persistent is false, the identity will not be stored to disk and
// will be dropped on reload.
AddIdentity(id *id.ID, validUntil time.Time, persistent bool) error
// RemoveIdentity removes a currently tracked identity.
RemoveIdentity(id *id.ID)
/* Fingerprints are the primary mechanism of identifying a
picked up message over cMix. They are a unique one time use
255 bit vector generally associated with a specific encryption
key, but can be used for an alternative protocol.When
registering a fingerprint, a MessageProcessor is registered to
handle the message.*/
// AddFingerprint - Adds a fingerprint which will be handled by a
// specific processor for messages received by the given identity
AddFingerprint(identity *id.ID, fingerprint format.Fingerprint,
mp message.Processor) error
// DeleteFingerprint deletes a single fingerprint associated
// with the given identity if it exists
DeleteFingerprint(identity *id.ID, fingerprint format.Fingerprint)
// DeleteClientFingerprints deletes al fingerprint associated
// with the given identity if it exists
DeleteClientFingerprints(identity *id.ID)
/* trigger - predefined hash based tags appended to all cMix messages
which, though trial hashing, are used to determine if a message applies
to this client
Triggers are used for 2 purposes - They can be processed by the
notifications system, or can be used to implement custom non fingerprint
processing of payloads. I.E. key negotiation, broadcast negotiation
A tag is appended to the message of the format tag =
H(H(messageContents), preimage) and trial hashing is used to
determine if a message adheres to a tag.
WARNING: If a preimage is known by an adversary, they can
determine which messages are for the client on reception
(which is normally hidden due to collision between ephemeral
IDs.
Due to the extra overhead of trial hashing, triggers are
processed after fingerprints. If a fingerprint match occurs
on the message, triggers will not be handled.
Triggers are address to the session. When starting a new
client, all triggers must be re-added before
StartNetworkFollower is called.
*/
// AddTrigger - Adds a trigger which can call a message
// handing function or be used for notifications. Multiple
// triggers can be registered for the same preimage.
// preimage - the preimage which is triggered on
// type - a descriptive string of the trigger. Generally
// used in notifications
// source - a byte buffer of related data. Generally used in
// notifications.
// Example: Sender ID
AddTrigger(identity *id.ID, newTrigger message.Service,
response message.Processor)
// DeleteTrigger - If only a single response is associated with the
// preimage, the entire preimage is removed. If there is more than one
// response, only the given response is removed if nil is passed in for
// response, all triggers for the preimage will be removed
DeleteTrigger(identity *id.ID, preimage Preimage,
response message.Processor) error
// DeleteClientTriggers - deletes all triggers assoseated with
// the given identity
DeleteClientTriggers(identity *id.ID)
// TrackServices - Registers a callback which will get called
// every time triggers change.
// It will receive the triggers list every time it is modified.
// Will only get callbacks while the Network Follower is running.
// Multiple trackTriggers can be registered
TrackServices(message.ServicesTracker)
/* In inProcess */
// it is possible to receive a message over cMix before the
// fingerprints or triggers are registered. As a result, when
// handling fails, messages are put in the inProcess que for a
// set number of retries.
// CheckInProgressMessages - retry processing all messages in check in
// progress messages. Call this after adding fingerprints or triggers
//while the follower is running.
CheckInProgressMessages()
/*===Nodes============================================================*/
/* Keys must be registed with nodes in order to send messages
throug them. this process is in general automatically handled
by the Network Manager*/
// HasNode can be used to determine if a keying relationship
// exists with a node.
HasNode(nid *id.ID) bool
// NumRegisteredNodes Returns the total number of nodes we have a keying
// relationship with
NumRegisteredNodes() int
// TriggerNodeRegistration triggers the generation of a keying
// relationship with a given node
TriggerNodeRegistration(nid *id.ID)
/*===Historical Rounds================================================*/
/* A complete set of round info is not kept on the client, and sometimes
the network will need to be queried to get round info. Historical rounds
is the system internal to the Network Manager to do this.
It can be used externally as well.*/
// LookupHistoricalRound - looks up the passed historical round on the
// network
LookupHistoricalRound(rid id.Round,
callback func(info *mixmessages.RoundInfo,
success bool)) error
/*===Sender===========================================================*/
/* The sender handles sending comms to the network. It tracks
connections to gateways and handles proxying to gateways for
targeted comms. It can be used externally to contact gateway
directly, bypassing the majority of the network package*/
// SendToAny can be used to send the comm to any gateway in the network.
SendToAny(sendFunc func(host *connect.Host) (interface{}, error),
stop *stoppable.Single) (interface{}, error)
// SendToPreferred sends to a specific gateway, doing so through another
// gateway as a proxy if not directly connected.
SendToPreferred(targets []*id.ID, sendFunc func(host *connect.Host,
target *id.ID, timeout time.Duration) (interface{}, error),
stop *stoppable.Single, timeout time.Duration) (interface{},
error)
// SetGatewayFilter sets a function which will be used to
// filter gateways before connecting.
SetGatewayFilter(f func(map[id.ID]int,
*ndf.NetworkDefinition) map[id.ID]int)
// GetHostParams - returns the host params used when
// connectign to gateways
GetHostParams() connect.HostParams
/*===Address Space====================================================*/
// The network compasses identities into a smaller address
// space to cause collisions and hide the actual recipient of
// messages. These functions allow for the tracking of this
// addresses space. In general, address space issues are
// completely handled by the network package
// GetAddressSpace GetAddressSize returns the current address
// size of IDs. Blocks until an address size is known.
GetAddressSpace() uint8
// RegisterAddressSpaceNotification returns a channel that
// will trigger for every address space size update. The
// provided tag is the unique ID for the channel. Returns an
// error if the tag is already used.
RegisterAddressSpaceNotification(tag string) (chan uint8, error)
// UnregisterAddressSpaceNotification stops broadcasting
// address space size updates on the channel with the
// specified tag.
UnregisterAddressSpaceNotification(tag string)
/*===Accessors========================================================*/
// GetInstance returns the network instance object, which tracks the
// state of the network
GetInstance() *network.Instance
// GetHealthTracker returns the health tracker, which using a polling or
// event api lets you determine if network following is functioning
GetHealthTracker() HealthTracker
// GetVerboseRounds returns stringification of verbose round info
GetVerboseRounds() string
}
type Preimage [32]byte
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment