Skip to content
Snippets Groups Projects
Commit c5f274cc authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

Merge remote-tracking branch 'origin/restructure' into restructure

parents 579143ab a0764447
No related branches found
No related tags found
3 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast
package network
import (
"errors"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/comms/mixmessages"
ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"testing"
"time"
)
type mockMonitor struct{}
func (m *mockMonitor) AddHealthCallback(f func(bool)) uint64 {
return 0
}
func (m *mockMonitor) RemoveHealthCallback(uint64) {
return
}
func (m *mockMonitor) IsHealthy() bool {
return true
}
func (m *mockMonitor) WasHealthy() bool {
return true
}
func (m *mockMonitor) StartProcesses() (stoppable.Stoppable, error) {
return stoppable.NewSingle("t"), nil
}
type mockRegistrar struct {
statusReturn bool
}
func (mr *mockRegistrar) AddRoundEventChan(rid id.Round, eventChan chan ds.EventReturn,
timeout time.Duration, validStates ...states.Round) *ds.EventCallback {
eventChan <- ds.EventReturn{
RoundInfo: &mixmessages.RoundInfo{
ID: 2,
UpdateID: 0,
State: 0,
BatchSize: 0,
Topology: nil,
Timestamps: nil,
Errors: nil,
ClientErrors: nil,
ResourceQueueTimeoutMillis: 0,
Signature: nil,
AddressSpaceSize: 0,
EccSignature: nil,
},
TimedOut: mr.statusReturn,
}
return &ds.EventCallback{}
}
func mockCriticalSender(msg format.Message, recipient *id.ID,
params CMIXParams) (id.Round, ephemeral.Id, error) {
return id.Round(1), ephemeral.Id{}, nil
}
func mockFailCriticalSender(msg format.Message, recipient *id.ID,
params CMIXParams) (id.Round, ephemeral.Id, error) {
return id.Round(1), ephemeral.Id{}, errors.New("Test error")
}
// TestCritical tests the basic functions of the critical messaging system
func TestCritical(t *testing.T) {
// Init mock structures & start thread
kv := versioned.NewKV(ekv.Memstore{})
mr := &mockRegistrar{
mr := &mockRoundEventRegistrar{
statusReturn: true,
}
c := newCritical(kv, &mockMonitor{}, mr, mockCriticalSender)
......
package network
//func TestManager_Follow(t *testing.T) {
// m, err := newTestManager(t)
// if err != nil {
// t.Fatalf("Failed to create test manager: %+v", err)
// }
//
// clientErrorReport := func(source, message, trace string) {
//
// }
// s, err := m.Follow(clientErrorReport)
// if err != nil {
// t.Errorf("Failed to follow network: %+v", err)
// }
//
// err = s.Close()
// if err != nil {
// t.Errorf("Failed to close follower: %+v", err)
// }
//}
......@@ -5,6 +5,7 @@ import (
"gitlab.com/elixxir/client/network/historical"
"gitlab.com/elixxir/client/network/identity"
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/network/nodes"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/primitives/format"
......@@ -287,3 +288,9 @@ type Manager interface {
}
type ClientErrorReport func(source, message, trace string)
type clientCommsInterface interface {
followNetworkComms
SendCmixCommsInterface
nodes.RegisterNodeCommsInterface
}
......@@ -53,7 +53,7 @@ type manager struct {
// Generic RNG for client
rng *fastRNG.StreamGenerator
// Comms pointer to send/receive messages
comms *client.Comms
comms clientCommsInterface
// Contains the network instance
instance *commNetwork.Instance
......
package network
import (
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id"
"testing"
)
/*
type dummyEvent struct{}
......@@ -101,3 +109,28 @@ func Test_attemptSendCmix(t *testing.T) {
return
}
}*/
func TestManager_SendCMIX(t *testing.T) {
m, err := newTestManager(t)
if err != nil {
t.Fatalf("Failed to create test manager: %+v", err)
}
recipientID := id.NewIdFromString("zezima", id.User, t)
contents := []byte("message")
fp := format.NewFingerprint(contents)
service := message.GetDefaultService(recipientID)
mac := make([]byte, 32)
_, err = csprng.NewSystemRNG().Read(mac)
if err != nil {
t.Errorf("Failed to read random mac bytes: %+v", err)
}
mac[0] = 0
params := GetDefaultCMIXParams()
rid, eid, err := m.SendCMIX(recipientID, fp, service, contents, mac, params)
if err != nil {
t.Errorf("Failed to sendcmix: %+v", err)
t.FailNow()
}
t.Logf("Test of SendCMIX returned:\n\trid: %v\teid: %+v", rid, eid)
}
package network
import (
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id"
"testing"
)
/*
import (
"github.com/pkg/errors"
......@@ -139,3 +147,44 @@ func Test_attemptSendManyCmix(t *testing.T) {
t.Errorf("Failed to sendcmix: %+v", err)
}
}*/
func TestManager_SendManyCMIX(t *testing.T) {
m, err := newTestManager(t)
if err != nil {
t.Fatalf("Failed to create test manager: %+v", err)
}
recipientID := id.NewIdFromString("zezima", id.User, t)
contents := []byte("message")
fp := format.NewFingerprint(contents)
service := message.GetDefaultService(recipientID)
mac := make([]byte, 32)
_, err = csprng.NewSystemRNG().Read(mac)
if err != nil {
t.Errorf("Failed to read random mac bytes: %+v", err)
}
mac[0] = 0
messages := []TargetedCmixMessage{
{
Recipient: recipientID,
Payload: contents,
Fingerprint: fp,
Service: service,
Mac: mac,
},
{
Recipient: recipientID,
Payload: contents,
Fingerprint: fp,
Service: service,
Mac: mac,
},
}
rid, eid, err := m.SendManyCMIX(messages, GetDefaultCMIXParams())
if err != nil {
t.Errorf("Failed to run SendManyCMIX: %+v", err)
}
t.Logf("Test of SendManyCMIX returned:\n\trid: %v\teid: %+v", rid, eid)
}
......@@ -7,61 +7,270 @@
package network
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/network/nodes"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/comms/mixmessages"
commsNetwork "gitlab.com/elixxir/comms/network"
ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/comms/signature"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/crypto/large"
"gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf"
"testing"
"time"
)
func newTestManager(t *testing.T) (*manager, error) {
// mockManagerComms
type mockManagerComms struct {
mockFollowNetworkComms
mockSendCmixComms
mockRegisterNodeComms
}
// mockFollowNetworkComms
type mockFollowNetworkComms struct{}
func (mfnc *mockFollowNetworkComms) GetHost(hostId *id.ID) (*connect.Host, bool) {
return nil, true
}
func (mfnc *mockFollowNetworkComms) SendPoll(host *connect.Host, message *mixmessages.GatewayPoll) (
*mixmessages.GatewayPollResponse, error) {
return &mixmessages.GatewayPollResponse{}, nil
}
// mockSendCmixComms
type mockSendCmixComms struct{}
func (mscc *mockSendCmixComms) SendPutMessage(host *connect.Host, message *mixmessages.GatewaySlot,
timeout time.Duration) (*mixmessages.GatewaySlotResponse, error) {
return &mixmessages.GatewaySlotResponse{
Accepted: true,
RoundID: 5,
}, nil
}
func (mscc *mockSendCmixComms) SendPutManyMessages(host *connect.Host, messages *mixmessages.GatewaySlots,
timeout time.Duration) (*mixmessages.GatewaySlotResponse, error) {
return &mixmessages.GatewaySlotResponse{
Accepted: true,
RoundID: 5,
}, nil
}
// mockRegisterNodeComms
type mockRegisterNodeComms struct{}
func (mrnc *mockRegisterNodeComms) SendRequestClientKeyMessage(host *connect.Host,
message *mixmessages.SignedClientKeyRequest) (*mixmessages.SignedKeyResponse, error) {
return &mixmessages.SignedKeyResponse{}, nil
}
// mockMixCypher
type mockMixCypher struct{}
func (mmc *mockMixCypher) Encrypt(msg format.Message, salt []byte, roundID id.Round) (
format.Message, [][]byte) {
return format.Message{}, nil
}
func (mmc *mockMixCypher) MakeClientGatewayAuthMAC(salt, digest []byte) []byte {
return nil
}
// mockEventManager
type mockEventManager struct{}
func (mem *mockEventManager) Report(priority int, category, evtType, details string) {}
// mockNodesRegistrar
type mockNodesRegistrar struct{}
func (mnr *mockNodesRegistrar) StartProcesses(numParallel uint) stoppable.Stoppable {
return stoppable.NewSingle("mockNodesRegistrar")
}
func (mnr *mockNodesRegistrar) HasNode(nid *id.ID) bool {
return true
}
func (mnr *mockNodesRegistrar) RemoveNode(nid *id.ID) {
return
}
func (mnr *mockNodesRegistrar) GetNodeKeys(topology *connect.Circuit) (nodes.MixCypher, error) {
return &mockMixCypher{}, nil
}
func (mnr *mockNodesRegistrar) NumRegisteredNodes() int {
return 1
}
func (mnr *mockNodesRegistrar) GetInputChannel() chan<- commsNetwork.NodeGateway {
return nil
}
func (mnr *mockNodesRegistrar) TriggerNodeRegistration(nid *id.ID) {
return
}
// mockGatewaySender
type mockGatewaySender struct{}
func (mgw *mockGatewaySender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error),
stop *stoppable.Single) (interface{}, error) {
return nil, nil
}
func (mgw *mockGatewaySender) SendToPreferred(targets []*id.ID, sendFunc gateway.SendToPreferredFunc,
stop *stoppable.Single, timeout time.Duration) (interface{}, error) {
hp := connect.GetDefaultHostParams()
hp.MaxSendRetries = 5
hp.MaxRetries = 5
h, err := connect.NewHost(targets[0], "0.0.0.0", []byte(pub), hp)
if err != nil {
return nil, errors.WithMessage(err, "[mockGatewaySender] Failed to create host during sendtopreferred")
}
return sendFunc(h, targets[0], time.Second)
//ret := &mixmessages.GatewaySlotResponse{
// Accepted: true,
// RoundID: 5,
//}
//return ret, nil
}
func (mgw *mockGatewaySender) UpdateNdf(ndf *ndf.NetworkDefinition) {
return
}
func (mgw *mockGatewaySender) SetGatewayFilter(f gateway.Filter) {}
func (mgw *mockGatewaySender) GetHostParams() connect.HostParams {
return connect.GetDefaultHostParams()
}
// mockMonitor
type mockMonitor struct{}
func (mm *mockMonitor) AddHealthCallback(f func(bool)) uint64 {
return 0
}
func (mm *mockMonitor) RemoveHealthCallback(uint64) {
return
}
func (mm *mockMonitor) IsHealthy() bool {
return true
}
func (mm *mockMonitor) WasHealthy() bool {
return true
}
func (mm *mockMonitor) StartProcesses() (stoppable.Stoppable, error) {
return stoppable.NewSingle("t"), nil
}
commsManager := connect.NewManagerTesting(t)
instanceComms := &connect.ProtoComms{
Manager: commsManager,
// mockRoundEventRegistrar
type mockRoundEventRegistrar struct {
statusReturn bool
}
func (mrr *mockRoundEventRegistrar) AddRoundEventChan(rid id.Round, eventChan chan ds.EventReturn,
timeout time.Duration, validStates ...states.Round) *ds.EventCallback {
eventChan <- ds.EventReturn{
RoundInfo: &mixmessages.RoundInfo{
ID: 2,
UpdateID: 0,
State: 0,
BatchSize: 0,
Topology: nil,
Timestamps: nil,
Errors: nil,
ClientErrors: nil,
ResourceQueueTimeoutMillis: 0,
Signature: nil,
AddressSpaceSize: 0,
EccSignature: nil,
},
TimedOut: mrr.statusReturn,
}
return &ds.EventCallback{}
}
thisInstance, err := commsNetwork.NewInstanceTesting(instanceComms, getNDF(), getNDF(), nil, nil, t)
// mockCriticalSender
func mockCriticalSender(msg format.Message, recipient *id.ID,
params CMIXParams) (id.Round, ephemeral.Id, error) {
return id.Round(1), ephemeral.Id{}, nil
}
// mockFailCriticalSender
func mockFailCriticalSender(msg format.Message, recipient *id.ID,
params CMIXParams) (id.Round, ephemeral.Id, error) {
return id.Round(1), ephemeral.Id{}, errors.New("Test error")
}
func newTestManager(t *testing.T) (*manager, error) {
kv := versioned.NewKV(ekv.Memstore{})
myID := id.NewIdFromString("zezima", id.User, t)
comms, err := client.NewClientComms(myID, nil, nil, nil)
if err != nil {
return nil, err
}
comms, err := client.NewClientComms(id.NewIdFromUInt(100, id.User, t), nil, nil, nil)
inst, err := commsNetwork.NewInstanceTesting(comms.ProtoComms, getNDF(), getNDF(), getGroup(), getGroup(), t)
if err != nil {
t.Fatal(err)
return nil, err
}
pk, err := rsa.GenerateKey(csprng.NewSystemRNG(), 2048)
if err != nil {
return nil, err
}
pubKey := pk.GetPublic()
m := &manager{
session: storage.InitTestingSession(t),
rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG),
comms: comms,
instance: thisInstance,
param: GetDefaultParams(),
// todo: for other tests, these may need to be initialized with test values.
// That explodes this setup function massively, may need a different setup
// function for each embedded interface?
Sender: nil,
Handler: nil,
Registrar: nil,
Retriever: nil,
Pickup: nil,
Space: nil,
Tracker: nil,
Monitor: nil,
crit: nil,
earliestRound: nil,
tracker: nil,
latencySum: 0,
numLatencies: 0,
verboseRounds: nil,
events: nil,
maxMsgLen: 0,
now := time.Now()
timestamps := []uint64{
uint64(now.Add(-30 * time.Second).UnixNano()), //PENDING
uint64(now.Add(-25 * time.Second).UnixNano()), //PRECOMPUTING
uint64(now.Add(-5 * time.Second).UnixNano()), //STANDBY
uint64(now.Add(5 * time.Second).UnixNano()), //QUEUED
0} //REALTIME
nid1 := id.NewIdFromString("nid1", id.Node, t)
nid2 := id.NewIdFromString("nid2", id.Node, t)
nid3 := id.NewIdFromString("nid3", id.Node, t)
ri := &mixmessages.RoundInfo{
ID: 3,
UpdateID: 0,
State: uint32(states.QUEUED),
BatchSize: 0,
Topology: [][]byte{nid1.Marshal(), nid2.Marshal(), nid3.Marshal()},
Timestamps: timestamps,
Errors: nil,
ClientErrors: nil,
ResourceQueueTimeoutMillis: 0,
Signature: nil,
AddressSpaceSize: 4,
}
err = signature.SignRsa(ri, pk)
if err != nil {
return nil, err
}
rnd := ds.NewRound(ri, pubKey, nil)
inst.GetWaitingRounds().Insert([]*ds.Round{rnd}, nil)
m := &manager{
session: storage.InitTestingSession(t),
rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG),
instance: inst,
comms: &mockManagerComms{},
param: GetDefaultParams(),
Sender: &mockGatewaySender{},
Registrar: &mockNodesRegistrar{},
Monitor: &mockMonitor{},
crit: newCritical(kv, &mockMonitor{}, &mockRoundEventRegistrar{}, mockCriticalSender),
events: &mockEventManager{},
}
return m, nil
}
......@@ -108,6 +317,29 @@ func getNDF() *ndf.NetworkDefinition {
}
}
func getGroup() *cyclic.Group {
e2eGrp := cyclic.NewGroup(
large.NewIntFromString("E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B"+
"7A8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3DD2AE"+
"DF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E7861575E745D31F"+
"8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC6ADC718DD2A3E041"+
"023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C4A530E8FFB1BC51DADDF45"+
"3B0B2717C2BC6669ED76B4BDD5C9FF558E88F26E5785302BEDBCA23EAC5ACE9209"+
"6EE8A60642FB61E8F3D24990B8CB12EE448EEF78E184C7242DD161C7738F32BF29"+
"A841698978825B4111B4BC3E1E198455095958333D776D8B2BEEED3A1A1A221A6E"+
"37E664A64B83981C46FFDDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F2"+
"78DE8014A47323631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696"+
"015CB79C3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E"+
"6319BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC35873"+
"847AEF49F66E43873", 16),
large.NewIntFromString("2", 16))
return e2eGrp
}
var pub = "-----BEGIN CERTIFICATE-----\nMIIGHTCCBAWgAwIBAgIUOcAn9cpH+hyRH8/UfqtbFDoSxYswDQYJKoZIhvcNAQEL\nBQAwgZIxCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTESMBAGA1UEBwwJQ2xhcmVt\nb250MRAwDgYDVQQKDAdFbGl4eGlyMRQwEgYDVQQLDAtEZXZlbG9wbWVudDEZMBcG\nA1UEAwwQZ2F0ZXdheS5jbWl4LnJpcDEfMB0GCSqGSIb3DQEJARYQYWRtaW5AZWxp\neHhpci5pbzAeFw0xOTA4MTYwMDQ4MTNaFw0yMDA4MTUwMDQ4MTNaMIGSMQswCQYD\nVQQGEwJVUzELMAkGA1UECAwCQ0ExEjAQBgNVBAcMCUNsYXJlbW9udDEQMA4GA1UE\nCgwHRWxpeHhpcjEUMBIGA1UECwwLRGV2ZWxvcG1lbnQxGTAXBgNVBAMMEGdhdGV3\nYXkuY21peC5yaXAxHzAdBgkqhkiG9w0BCQEWEGFkbWluQGVsaXh4aXIuaW8wggIi\nMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQC7Dkb6VXFn4cdpU0xh6ji0nTDQ\nUyT9DSNW9I3jVwBrWfqMc4ymJuonMZbuqK+cY2l+suS2eugevWZrtzujFPBRFp9O\n14Jl3fFLfvtjZvkrKbUMHDHFehascwzrp3tXNryiRMmCNQV55TfITVCv8CLE0t1i\nbiyOGM9ZWYB2OjXt59j76lPARYww5qwC46vS6+3Cn2Yt9zkcrGeskWEFa2VttHqF\n910TP+DZk2R5C7koAh6wZYK6NQ4S83YQurdHAT51LKGrbGehFKXq6/OAXCU1JLi3\nkW2PovTb6MZuvxEiRmVAONsOcXKu7zWCmFjuZZwfRt2RhnpcSgzfrarmsGM0LZh6\nJY3MGJ9YdPcVGSz+Vs2E4zWbNW+ZQoqlcGeMKgsIiQ670g0xSjYICqldpt79gaET\n9PZsoXKEmKUaj6pq1d4qXDk7s63HRQazwVLGBdJQK8qX41eCdR8VMKbrCaOkzD5z\ngnEu0jBBAwdMtcigkMIk1GRv91j7HmqwryOBHryLi6NWBY3tjb4So9AppDQB41SH\n3SwNenAbNO1CXeUqN0hHX6I1bE7OlbjqI7tXdrTllHAJTyVVjenPel2ApMXp+LVR\ndDbKtwBiuM6+n+z0I7YYerxN1gfvpYgcXm4uye8dfwotZj6H2J/uSALsU2v9UHBz\nprdrLSZk2YpozJb+CQIDAQABo2kwZzAdBgNVHQ4EFgQUDaTvG7SwgRQ3wcYx4l+W\nMcZjX7owHwYDVR0jBBgwFoAUDaTvG7SwgRQ3wcYx4l+WMcZjX7owDwYDVR0TAQH/\nBAUwAwEB/zAUBgNVHREEDTALgglmb28uY28udWswDQYJKoZIhvcNAQELBQADggIB\nADKz0ST0uS57oC4rT9zWhFqVZkEGh1x1XJ28bYtNUhozS8GmnttV9SnJpq0EBCm/\nr6Ub6+Wmf60b85vCN5WDYdoZqGJEBjGGsFzl4jkYEE1eeMfF17xlNUSdt1qLCE8h\nU0glr32uX4a6nsEkvw1vo1Liuyt+y0cOU/w4lgWwCqyweu3VuwjZqDoD+3DShVzX\n8f1p7nfnXKitrVJt9/uE+AtAk2kDnjBFbRxCfO49EX4Cc5rADUVXMXm0itquGBYp\nMbzSgFmsMp40jREfLYRRzijSZj8tw14c2U9z0svvK9vrLCrx9+CZQt7cONGHpr/C\n/GIrP/qvlg0DoLAtjea73WxjSCbdL3Nc0uNX/ymXVHdQ5husMCZbczc9LYdoT2VP\nD+GhkAuZV9g09COtRX4VP09zRdXiiBvweiq3K78ML7fISsY7kmc8KgVH22vcXvMX\nCgGwbrxi6QbQ80rWjGOzW5OxNFvjhvJ3vlbOT6r9cKZGIPY8IdN/zIyQxHiim0Jz\noavr9CPDdQefu9onizsmjsXFridjG/ctsJxcUEqK7R12zvaTxu/CVYZbYEUFjsCe\nq6ZAACiEJGvGeKbb/mSPvGs2P1kS70/cGp+P5kBCKqrm586FB7BcafHmGFrWhT3E\nLOUYkOV/gADT2hVDCrkPosg7Wb6ND9/mhCVVhf4hLGRh\n-----END CERTIFICATE-----\n"
// Round IDs to return on mock historicalRounds comm
const failedHistoricalRoundID = 7
const completedHistoricalRoundID = 8
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment