diff --git a/api/client.go b/api/client.go index d68601829729a929b917923eeb1dec5841ce293e..2a07d1743783e8a22dff5bc4d7c384b5e092a772 100644 --- a/api/client.go +++ b/api/client.go @@ -40,6 +40,8 @@ type Client struct { switchboard *switchboard.Switchboard //object used for communications comms *client.Comms + // Network parameters + parameters params.Network // note that the manager has a pointer to the context in many cases, but // this interface allows it to be mocked for easy testing without the @@ -146,7 +148,7 @@ func NewPrecannedClient(precannedID uint, defJSON, storageDir string, password [ } // OpenClient session, but don't connect to the network or log in -func OpenClient(storageDir string, password []byte) (*Client, error) { +func OpenClient(storageDir string, password []byte, parameters params.Network) (*Client, error) { jww.INFO.Printf("OpenClient()") // Use fastRNG for RNG ops (AES fortuna based RNG using system RNG) rngStreamGen := fastRNG.NewStreamGenerator(12, 3, @@ -168,16 +170,17 @@ func OpenClient(storageDir string, password []byte) (*Client, error) { network: nil, runner: stoppable.NewMulti("client"), status: newStatusTracker(), + parameters: parameters, } return c, nil } // Login initalizes a client object from existing storage. -func Login(storageDir string, password []byte) (*Client, error) { +func Login(storageDir string, password []byte, parameters params.Network) (*Client, error) { jww.INFO.Printf("Login()") - c, err := OpenClient(storageDir, password) + c, err := OpenClient(storageDir, password, parameters) if err != nil { return nil, err @@ -187,8 +190,8 @@ func Login(storageDir string, password []byte) (*Client, error) { c.services = newServiceProcessiesList(c.runner) //get the user from session - user := c.storage.User() - cryptoUser := user.GetCryptographicIdentity() + u := c.storage.User() + cryptoUser := u.GetCryptographicIdentity() //start comms c.comms, err = client.NewClientComms(cryptoUser.GetTransmissionID(), @@ -228,7 +231,7 @@ func Login(storageDir string, password []byte) (*Client, error) { // Initialize network and link it to context c.network, err = network.NewManager(c.storage, c.switchboard, c.rng, c.comms, - params.GetDefaultNetwork(), def) + parameters, def) if err != nil { return nil, err } @@ -293,7 +296,7 @@ func (c *Client) StartNetworkFollower() error { } c.runner.Add(stopFollow) // Key exchange - c.runner.Add(keyExchange.Start(c.switchboard, c.storage, c.network, params.GetDefaultRekey())) + c.runner.Add(keyExchange.Start(c.switchboard, c.storage, c.network, c.parameters.Rekey)) err = c.status.toRunning() if err != nil { @@ -353,6 +356,7 @@ func (c *Client) GetSwitchboard() interfaces.Switchboard { // events. func (c *Client) GetRoundEvents() interfaces.RoundEvents { jww.INFO.Printf("GetRoundEvents()") + jww.WARN.Printf("GetRoundEvents does not handle Client Errors edge case!") return c.network.GetInstance().GetRoundEvents() } diff --git a/bindings/client.go b/bindings/client.go index 2789061f4119519698a8263daa5db0252ef7300f..7b026bd913960409189070363fb87d7bad92ed34 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -15,6 +15,7 @@ import ( "gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/interfaces/contact" "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/utility" "gitlab.com/elixxir/comms/mixmessages" ds "gitlab.com/elixxir/comms/network/dataStructures" @@ -75,8 +76,13 @@ func NewPrecannedClient(precannedID int, network, storageDir string, password [] // memory and stored as securely as possible using the memguard library. // Login does not block on network connection, and instead loads and // starts subprocesses to perform network operations. -func Login(storageDir string, password []byte) (*Client, error) { - client, err := api.Login(storageDir, password) +func Login(storageDir string, password []byte, parameters string) (*Client, error) { + p, err := params.GetNetworkParameters(parameters) + if err != nil { + return nil, errors.New(fmt.Sprintf("Failed to login: %+v", err)) + } + + client, err := api.Login(storageDir, password, p) if err != nil { return nil, errors.New(fmt.Sprintf("Failed to login: %+v", err)) } @@ -291,15 +297,15 @@ func (c *Client) RegisterRoundEventsHandler(rid int, cb RoundEventCallback, } // RegisterMessageDeliveryCB allows the caller to get notified if the rounds a -// message was sent in sucesfully completed. Under the hood, this uses the same -// interface as RegisterRoundEventsHandler, but provides a convienet way to use +// message was sent in successfully completed. Under the hood, this uses the same +// interface as RegisterRoundEventsHandler, but provides a convent way to use // the interface in its most common form, looking up the result of message -// retreval +// retrieval // // The callbacks will return at timeoutMS if no state update occurs // // This function takes the marshaled send report to ensure a memory leak does -// not occur as a result of both sides of the bindings holding a refrence to +// not occur as a result of both sides of the bindings holding a reference to // the same pointer. func (c *Client) RegisterMessageDeliveryCB(marshaledSendReport []byte, mdc MessageDeliveryCallback, timeoutMS int) (*Unregister, error) { diff --git a/bindings/params.go b/bindings/params.go new file mode 100644 index 0000000000000000000000000000000000000000..35afbb8901cfd5e8309f8be7a30a14c907496c23 --- /dev/null +++ b/bindings/params.go @@ -0,0 +1,34 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2021 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +// Contains params-related bindings + +package bindings + +import ( + "gitlab.com/elixxir/client/interfaces/params" +) + +func (c *Client) GetCMIXParams() (string, error) { + p, err := params.GetDefaultCMIX().Marshal() + return string(p), err +} + +func (c *Client) GetE2EParams() (string, error) { + p, err := params.GetDefaultE2E().Marshal() + return string(p), err +} + +func (c *Client) GetNetworkParams() (string, error) { + p, err := params.GetDefaultNetwork().Marshal() + return string(p), err +} + +func (c *Client) GetUnsafeParams() (string, error) { + p, err := params.GetDefaultUnsafe().Marshal() + return string(p), err +} diff --git a/bindings/send.go b/bindings/send.go index f9c116d5bcf71246f57c8eda450c0d2bda49aeaf..605fddd26d2ceace3a7081e73a0a17b0d007d87a 100644 --- a/bindings/send.go +++ b/bindings/send.go @@ -30,7 +30,13 @@ import ( // This will return the round the message was sent on if it is successfully sent // This can be used to register a round event to learn about message delivery. // on failure a round id of -1 is returned -func (c *Client) SendCmix(recipient, contents []byte) (int, error) { +func (c *Client) SendCmix(recipient, contents []byte, parameters string) (int, error) { + p, err := params.GetCMIXParameters(parameters) + if err != nil { + return -1, errors.New(fmt.Sprintf("Failed to sendCmix: %+v", + err)) + } + u, err := id.Unmarshal(recipient) if err != nil { return -1, errors.New(fmt.Sprintf("Failed to sendCmix: %+v", @@ -43,7 +49,7 @@ func (c *Client) SendCmix(recipient, contents []byte) (int, error) { err)) } - rid, err := c.api.SendCMIX(msg, params.GetDefaultCMIX()) + rid, err := c.api.SendCMIX(msg, p) if err != nil { return -1, errors.New(fmt.Sprintf("Failed to sendCmix: %+v", err)) @@ -60,7 +66,12 @@ func (c *Client) SendCmix(recipient, contents []byte) (int, error) { // Message Types can be found in client/interfaces/message/type.go // Make sure to not conflict with ANY default message types with custom types func (c *Client) SendUnsafe(recipient, payload []byte, - messageType int) (*RoundList, error) { + messageType int, parameters string) (*RoundList, error) { + p, err := params.GetUnsafeParameters(parameters) + if err != nil { + return nil, errors.New(fmt.Sprintf("Failed to sendUnsafe: %+v", + err)) + } u, err := id.Unmarshal(recipient) if err != nil { return nil, errors.New(fmt.Sprintf("Failed to sendUnsafe: %+v", @@ -73,7 +84,7 @@ func (c *Client) SendUnsafe(recipient, payload []byte, MessageType: message.Type(messageType), } - rids, err := c.api.SendUnsafe(m, params.GetDefaultUnsafe()) + rids, err := c.api.SendUnsafe(m, p) if err != nil { return nil, errors.New(fmt.Sprintf("Failed to sendUnsafe: %+v", err)) @@ -88,7 +99,12 @@ func (c *Client) SendUnsafe(recipient, payload []byte, // // Message Types can be found in client/interfaces/message/type.go // Make sure to not conflict with ANY default message types -func (c *Client) SendE2E(recipient, payload []byte, messageType int) (*SendReport, error) { +func (c *Client) SendE2E(recipient, payload []byte, messageType int, parameters string) (*SendReport, error) { + p, err := params.GetE2EParameters(parameters) + if err != nil { + return nil, errors.New(fmt.Sprintf("Failed SendE2E: %+v", err)) + } + u, err := id.Unmarshal(recipient) if err != nil { return nil, errors.New(fmt.Sprintf("Failed SendE2E: %+v", err)) @@ -100,7 +116,7 @@ func (c *Client) SendE2E(recipient, payload []byte, messageType int) (*SendRepor MessageType: message.Type(messageType), } - rids, mid, err := c.api.SendE2E(m, params.GetDefaultE2E()) + rids, mid, err := c.api.SendE2E(m, p) if err != nil { return nil, errors.New(fmt.Sprintf("Failed SendE2E: %+v", err)) } diff --git a/cmd/root.go b/cmd/root.go index 36539fba6738c70f00b862802d0642ffbf594017..656ac527f385cae9defde1e4526c398b14d218a5 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -212,7 +212,7 @@ func createClient() *api.Client { } } - client, err := api.OpenClient(storeDir, []byte(pass)) + client, err := api.OpenClient(storeDir, []byte(pass), params.GetDefaultNetwork()) if err != nil { jww.FATAL.Panicf("%+v", err) } @@ -226,7 +226,7 @@ func initClient() *api.Client { storeDir := viper.GetString("session") //load the client - client, err := api.Login(storeDir, []byte(pass)) + client, err := api.Login(storeDir, []byte(pass), params.GetDefaultNetwork()) if err != nil { jww.FATAL.Panicf("%+v", err) } diff --git a/interfaces/params/CMIX.go b/interfaces/params/CMIX.go index 8c55059999f117da1239baa0e9b33ef6c0623eb2..e4142eb5e509dd8ffce8cfd6b3e7a8d33d4fdc46 100644 --- a/interfaces/params/CMIX.go +++ b/interfaces/params/CMIX.go @@ -7,7 +7,10 @@ package params -import "time" +import ( + "encoding/json" + "time" +) type CMIX struct { //maximum number of rounds to try and send on @@ -23,3 +26,19 @@ func GetDefaultCMIX() CMIX { RetryDelay: 1 * time.Second, } } + +func (c CMIX) Marshal() ([]byte, error) { + return json.Marshal(c) +} + +// Obtain default CMIX parameters, or override with given parameters if set +func GetCMIXParameters(params string) (CMIX, error) { + p := GetDefaultCMIX() + if len(params) > 0 { + err := json.Unmarshal([]byte(params), &p) + if err != nil { + return CMIX{}, err + } + } + return p, nil +} diff --git a/interfaces/params/CMIX_test.go b/interfaces/params/CMIX_test.go index 01af75d68875ba2b1b0572d8a9debdbb8f0a7e2a..06d1968bbbd46aeef40e0a5c343c9e863493b5ae 100644 --- a/interfaces/params/CMIX_test.go +++ b/interfaces/params/CMIX_test.go @@ -18,3 +18,38 @@ func TestGetDefaultCMIX(t *testing.T) { t.Errorf("GetDefaultCMIX did not return expected values") } } + +// New params path +func TestGetCMIXParameters(t *testing.T) { + p := GetDefaultCMIX() + + expected := p.RoundTries + 1 + p.RoundTries = expected + jsonString, err := p.Marshal() + if err != nil { + t.Errorf("%+v", err) + } + + q, err := GetCMIXParameters(string(jsonString)) + if err != nil { + t.Errorf("%+v", err) + } + + if q.RoundTries != expected { + t.Errorf("Parameters failed to change! Got %d, Expected %d", q.RoundTries, expected) + } +} + +// No new params path +func TestGetCMIXParameters_Default(t *testing.T) { + p := GetDefaultCMIX() + + q, err := GetCMIXParameters("") + if err != nil { + t.Errorf("%+v", err) + } + + if q.RoundTries != p.RoundTries { + t.Errorf("Parameters failed to change! Got %d, Expected %d", q.RoundTries, p.RoundTries) + } +} diff --git a/interfaces/params/E2E.go b/interfaces/params/E2E.go index a62b5ad58e0e14113d295d0de09905890a65dcbe..7a2e38428cfcf7c60604ad4445ec80e9ba680145 100644 --- a/interfaces/params/E2E.go +++ b/interfaces/params/E2E.go @@ -7,7 +7,10 @@ package params -import "fmt" +import ( + "encoding/json" + "fmt" +) type E2E struct { Type SendType @@ -19,6 +22,21 @@ func GetDefaultE2E() E2E { CMIX: GetDefaultCMIX(), } } +func (e E2E) Marshal() ([]byte, error) { + return json.Marshal(e) +} + +// Obtain default E2E parameters, or override with given parameters if set +func GetE2EParameters(params string) (E2E, error) { + p := GetDefaultE2E() + if len(params) > 0 { + err := json.Unmarshal([]byte(params), &p) + if err != nil { + return E2E{}, err + } + } + return p, nil +} type SendType uint8 diff --git a/interfaces/params/E2E_test.go b/interfaces/params/E2E_test.go index 09230e135d3576761d38211a9a53f913661892bf..f7d90c4eece73233d37e3941879ccff7bf640f57 100644 --- a/interfaces/params/E2E_test.go +++ b/interfaces/params/E2E_test.go @@ -31,3 +31,38 @@ func TestSendType_String(t *testing.T) { t.Errorf("Running String on unknown E2E type got %s", e.Type.String()) } } + +// New params path +func TestGetE2EParameters(t *testing.T) { + p := GetDefaultE2E() + + expected := p.RoundTries + 1 + p.RoundTries = expected + jsonString, err := p.Marshal() + if err != nil { + t.Errorf("%+v", err) + } + + q, err := GetE2EParameters(string(jsonString)) + if err != nil { + t.Errorf("%+v", err) + } + + if q.RoundTries != expected { + t.Errorf("Parameters failed to change! Got %d, Expected %d", q.RoundTries, expected) + } +} + +// No new params path +func TestGetE2EParameters_Default(t *testing.T) { + p := GetDefaultE2E() + + q, err := GetE2EParameters("") + if err != nil { + t.Errorf("%+v", err) + } + + if q.RoundTries != p.RoundTries { + t.Errorf("Parameters failed to change! Got %d, Expected %d", q.RoundTries, p.RoundTries) + } +} diff --git a/interfaces/params/keyExchange.go b/interfaces/params/keyExchange.go index 203703d22f12fc3943786c9f770b4d7626208206..c30b49c6168f52fb7ad97098f123bb226486ee11 100644 --- a/interfaces/params/keyExchange.go +++ b/interfaces/params/keyExchange.go @@ -7,7 +7,9 @@ package params -import "time" +import ( + "time" +) type Rekey struct { RoundTimeout time.Duration diff --git a/interfaces/params/message.go b/interfaces/params/message.go index cc452a85f680b09629818cd1973abb6d5821ffaf..fbf9779829b939145cf7bc1277fa79b5617b826a 100644 --- a/interfaces/params/message.go +++ b/interfaces/params/message.go @@ -7,7 +7,9 @@ package params -import "time" +import ( + "time" +) type Messages struct { MessageReceptionBuffLen uint diff --git a/interfaces/params/network.go b/interfaces/params/network.go index cf7a2b1a1b4cda2623e23ced93adf70213e02445..75e7aa4e4dd04d8a8a9d33f253d95eac8ae44e67 100644 --- a/interfaces/params/network.go +++ b/interfaces/params/network.go @@ -8,6 +8,7 @@ package params import ( + "encoding/json" "time" ) @@ -23,6 +24,7 @@ type Network struct { Rounds Messages + Rekey } func GetDefaultNetwork() Network { @@ -36,3 +38,19 @@ func GetDefaultNetwork() Network { n.Messages = GetDefaultMessage() return n } + +func (n Network) Marshal() ([]byte, error) { + return json.Marshal(n) +} + +// Obtain default Network parameters, or override with given parameters if set +func GetNetworkParameters(params string) (Network, error) { + p := GetDefaultNetwork() + if len(params) > 0 { + err := json.Unmarshal([]byte(params), &p) + if err != nil { + return Network{}, err + } + } + return p, nil +} diff --git a/interfaces/params/network_test.go b/interfaces/params/network_test.go new file mode 100644 index 0000000000000000000000000000000000000000..3b8b609678f279048a3dcd6ec9b9dc634607e6f7 --- /dev/null +++ b/interfaces/params/network_test.go @@ -0,0 +1,44 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2021 Privategrity Corporation / +// / +// All rights reserved. / +//////////////////////////////////////////////////////////////////////////////// + +package params + +import "testing" + +// New params path +func TestGetNetworkParameters(t *testing.T) { + p := GetDefaultNetwork() + + expected := p.MaxCheckedRounds + 1 + p.MaxCheckedRounds = expected + jsonString, err := p.Marshal() + if err != nil { + t.Errorf("%+v", err) + } + + q, err := GetNetworkParameters(string(jsonString)) + if err != nil { + t.Errorf("%+v", err) + } + + if q.MaxCheckedRounds != expected { + t.Errorf("Parameters failed to change! Got %d, Expected %d", q.MaxCheckedRounds, expected) + } +} + +// No new params path +func TestGetNetworkParameters_Default(t *testing.T) { + p := GetDefaultNetwork() + + q, err := GetNetworkParameters("") + if err != nil { + t.Errorf("%+v", err) + } + + if q.MaxCheckedRounds != p.MaxCheckedRounds { + t.Errorf("Parameters failed to change! Got %d, Expected %d", q.MaxCheckedRounds, p.MaxCheckedRounds) + } +} diff --git a/interfaces/params/node.go b/interfaces/params/node.go deleted file mode 100644 index 150cc71f3d9773ef93892310ad4046ba4fb841cd..0000000000000000000000000000000000000000 --- a/interfaces/params/node.go +++ /dev/null @@ -1,22 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -/////////////////////////////////////////////////////////////////////////////// - -package params - -//import ( -// "time" -//) - -type NodeKeys struct { - WorkerPoolSize uint -} - -func GetDefaultNodeKeys() NodeKeys { - return NodeKeys{ - WorkerPoolSize: 10, - } -} diff --git a/interfaces/params/rounds.go b/interfaces/params/rounds.go index 73e5ac613afcf8e3bc343be6252a72ccd1c75628..483e6872101ceee68e329173a09c92b0790886c9 100644 --- a/interfaces/params/rounds.go +++ b/interfaces/params/rounds.go @@ -7,7 +7,9 @@ package params -import "time" +import ( + "time" +) type Rounds struct { // maximum number of times to attempt to retrieve a round from a gateway diff --git a/interfaces/params/Unsafe.go b/interfaces/params/unsafe.go similarity index 60% rename from interfaces/params/Unsafe.go rename to interfaces/params/unsafe.go index 19654d88e9ae6fb84e9b2ab9a99ee4de08089d96..556559a88a253107ab803521a550c403e5040113 100644 --- a/interfaces/params/Unsafe.go +++ b/interfaces/params/unsafe.go @@ -7,6 +7,8 @@ package params +import "encoding/json" + type Unsafe struct { CMIX } @@ -14,3 +16,19 @@ type Unsafe struct { func GetDefaultUnsafe() Unsafe { return Unsafe{CMIX: GetDefaultCMIX()} } + +func (u Unsafe) Marshal() ([]byte, error) { + return json.Marshal(u) +} + +// Obtain default Unsafe parameters, or override with given parameters if set +func GetUnsafeParameters(params string) (Unsafe, error) { + p := GetDefaultUnsafe() + if len(params) > 0 { + err := json.Unmarshal([]byte(params), &p) + if err != nil { + return Unsafe{}, err + } + } + return p, nil +} diff --git a/interfaces/params/unsafe_test.go b/interfaces/params/unsafe_test.go new file mode 100644 index 0000000000000000000000000000000000000000..49763fdf11e978201d18f29dacdc5f68bd0c2ec7 --- /dev/null +++ b/interfaces/params/unsafe_test.go @@ -0,0 +1,44 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2021 Privategrity Corporation / +// / +// All rights reserved. / +//////////////////////////////////////////////////////////////////////////////// + +package params + +import "testing" + +// New params path +func TestGetUnsafeParameters(t *testing.T) { + p := GetDefaultUnsafe() + + expected := p.RoundTries + 1 + p.RoundTries = expected + jsonString, err := p.Marshal() + if err != nil { + t.Errorf("%+v", err) + } + + q, err := GetUnsafeParameters(string(jsonString)) + if err != nil { + t.Errorf("%+v", err) + } + + if q.RoundTries != expected { + t.Errorf("Parameters failed to change! Got %d, Expected %d", q.RoundTries, expected) + } +} + +// No new params path +func TestGetUnsafeParameters_Default(t *testing.T) { + p := GetDefaultUnsafe() + + q, err := GetUnsafeParameters("") + if err != nil { + t.Errorf("%+v", err) + } + + if q.RoundTries != p.RoundTries { + t.Errorf("Parameters failed to change! Got %d, Expected %d", q.RoundTries, p.RoundTries) + } +} diff --git a/network/follow.go b/network/follow.go index d711a7381bad44aea2a1e1bc9d34948ac99c2380..fdde482420c16384e9e061636522f46c6a28ebc0 100644 --- a/network/follow.go +++ b/network/follow.go @@ -23,12 +23,13 @@ package network // instance import ( - "gitlab.com/elixxir/client/network/gateway" - //"gitlab.com/elixxir/client/storage" + "bytes" jww "github.com/spf13/jwalterweatherman" bloom "gitlab.com/elixxir/bloomfilter" + "gitlab.com/elixxir/client/network/gateway" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/primitives/knownRounds" + "gitlab.com/elixxir/primitives/states" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/primitives/id" @@ -62,7 +63,7 @@ func (m *manager) followNetwork(quitCh <-chan struct{}) { } } -var followCnt int = 0 +var followCnt = 0 // executes each iteration of the follower func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { @@ -86,10 +87,10 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { LastUpdate: uint64(m.Instance.GetLastUpdateID()), ClientID: m.Uid.Bytes(), } - jww.TRACE.Printf("polling %s for NDF", gwHost) + jww.TRACE.Printf("Polling %s for NDF...", gwHost) pollResp, err := comms.SendPoll(gwHost, &pollReq) if err != nil { - jww.ERROR.Printf("%+v", err) + jww.ERROR.Printf("Unable to poll %s for NDF: %+v", gwHost, err) return } @@ -98,7 +99,7 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { gwRoundsState := &knownRounds.KnownRounds{} err = gwRoundsState.Unmarshal(pollResp.KnownRounds) if err != nil { - jww.ERROR.Printf("Failed to unmartial: %+v", err) + jww.ERROR.Printf("Failed to unmarshal: %+v", err) return } var filterList []*bloom.Ring @@ -127,13 +128,13 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { if pollResp.PartialNDF != nil { err = m.Instance.UpdatePartialNdf(pollResp.PartialNDF) if err != nil { - jww.ERROR.Printf("%+v", err) + jww.ERROR.Printf("Unable to update partial NDF: %+v", err) return } err = m.Instance.UpdateGatewayConnections() if err != nil { - jww.ERROR.Printf("%+v", err) + jww.ERROR.Printf("Unable to update gateway connections: %+v", err) return } } @@ -142,11 +143,48 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { // network if pollResp.Updates != nil { err = m.Instance.RoundUpdates(pollResp.Updates) - //jww.TRACE.Printf("%+v", pollResp.Updates) if err != nil { jww.ERROR.Printf("%+v", err) return } + + // Iterate over ClientErrors for each RoundUpdate + for _, update := range pollResp.Updates { + + // Ignore irrelevant updates + if update.State != uint32(states.COMPLETED) && update.State != uint32(states.FAILED) { + continue + } + + for _, clientErr := range update.ClientErrors { + + // If this Client appears in the ClientError + if bytes.Equal(clientErr.ClientId, m.Session.GetUser().ID.Marshal()) { + + // Obtain relevant NodeGateway information + nGw, err := m.Instance.GetNodeAndGateway(gwHost.GetId()) + if err != nil { + jww.ERROR.Printf("Unable to get NodeGateway: %+v", err) + return + } + nid, err := nGw.Node.GetNodeId() + if err != nil { + jww.ERROR.Printf("Unable to get NodeID: %+v", err) + return + } + + // FIXME: Should be able to trigger proper type of round event + // FIXME: without mutating the RoundInfo. Signature also needs verified + // FIXME: before keys are deleted + update.State = uint32(states.FAILED) + m.Instance.GetRoundEvents().TriggerRoundEvent(update) + + // Delete all existing keys and trigger a re-registration with the relevant Node + m.Session.Cmix().Remove(nid) + m.Instance.GetAddGatewayChan() <- nGw + } + } + } } // ---- Round Processing ----- diff --git a/network/internal/internal.go b/network/internal/internal.go index ae0580da5d72bbe83cd8e84e3d61485a80bdc933..51b25214cb89e94cc10234488b6ea6010e8ed765 100644 --- a/network/internal/internal.go +++ b/network/internal/internal.go @@ -35,6 +35,4 @@ type Internal struct { //channels NodeRegistration chan network.NodeGateway - //local pointer to user ID because it is used often - } diff --git a/network/message/critical.go b/network/message/critical.go index c6dbfbd9f8efc24b3d9e644136b7325340cdd9ce..e43cee7aa56684fcada1218136fb3549abd7e8ec 100644 --- a/network/message/critical.go +++ b/network/message/critical.go @@ -44,7 +44,7 @@ func (m *Manager) processCriticalMessages(quitCh <-chan struct{}) { func (m *Manager) criticalMessages() { critMsgs := m.Session.GetCriticalMessages() // try to send every message in the critical messages and the raw critical - // messages buffer in paralell + // messages buffer in parallel //critical messages for msg, param, has := critMsgs.Next(); has; msg, param, has = critMsgs.Next() { @@ -60,7 +60,7 @@ func (m *Manager) criticalMessages() { return } jww.INFO.Printf("critical RoundIDs: %v", rounds) - //wait on the results to make sure the rounds were sucesfull + //wait on the results to make sure the rounds were successful sendResults := make(chan ds.EventReturn, len(rounds)) roundEvents := m.Instance.GetRoundEvents() for _, r := range rounds { diff --git a/network/message/garbled.go b/network/message/garbled.go index b5ddc1b0f4c51a55a833d2961a19949977002b20..6ae13f50ee2409ac7ca43e81bc3645985e0b2c07 100644 --- a/network/message/garbled.go +++ b/network/message/garbled.go @@ -9,6 +9,7 @@ package message import ( "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/primitives/format" "time" ) @@ -45,6 +46,7 @@ func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) { func (m *Manager) handleGarbledMessages() { garbledMsgs := m.Session.GetGarbledMessages() e2eKv := m.Session.E2e() + var failedMsgs []format.Message //try to decrypt every garbled message, excising those who's counts are too high for grbldMsg, count, timestamp, has := garbledMsgs.Next(); has; grbldMsg, count, timestamp, has = garbledMsgs.Next() { fingerprint := grbldMsg.GetKeyFP() @@ -52,9 +54,9 @@ func (m *Manager) handleGarbledMessages() { if key, isE2E := e2eKv.PopKey(fingerprint); isE2E { // Decrypt encrypted message msg, err := key.Decrypt(grbldMsg) - // get the sender - sender := key.GetSession().GetPartner() if err == nil { + // get the sender + sender := key.GetSession().GetPartner() //remove from the buffer if decryption is successful garbledMsgs.Remove(grbldMsg) //handle the successfully decrypted message @@ -74,7 +76,10 @@ func (m *Manager) handleGarbledMessages() { time.Since(timestamp) > m.param.GarbledMessageWait { garbledMsgs.Remove(grbldMsg) } else { - garbledMsgs.Failed(grbldMsg) + failedMsgs = append(failedMsgs, grbldMsg) } } + for _, grbldMsg := range failedMsgs { + garbledMsgs.Failed(grbldMsg) + } } diff --git a/network/message/garbled_test.go b/network/message/garbled_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b59fded2594341bb41003f7e6ad84b899320464b --- /dev/null +++ b/network/message/garbled_test.go @@ -0,0 +1,123 @@ +package message + +import ( + "encoding/binary" + "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/network/internal" + "gitlab.com/elixxir/client/network/message/parse" + "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/client/storage/e2e" + "gitlab.com/elixxir/client/switchboard" + "gitlab.com/elixxir/comms/client" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/crypto/csprng" + "math/rand" + "testing" + "time" +) + +type TestListener struct { + ch chan bool +} + +// the Hear function is called to exercise the listener, passing in the +// data as an item +func (l TestListener) Hear(item message.Receive) { + l.ch <- true +} + +// Returns a name, used for debugging +func (l TestListener) Name() string { + return "TEST LISTENER FOR GARBLED MESSAGES" +} + +func TestManager_CheckGarbledMessages(t *testing.T) { + sess1 := storage.InitTestingSession(t) + + sess2 := storage.InitTestingSession(t) + + sw := switchboard.New() + l := TestListener{ + ch: make(chan bool), + } + sw.RegisterListener(sess2.GetUser().ID, message.Raw, l) + comms, err := client.NewClientComms(sess1.GetUser().ID, nil, nil, nil) + if err != nil { + t.Errorf("Failed to start client comms: %+v", err) + } + i := internal.Internal{ + Session: sess1, + Switchboard: sw, + Rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + Comms: comms, + Health: nil, + Uid: sess1.GetUser().ID, + Instance: nil, + NodeRegistration: nil, + } + m := NewManager(i, params.Messages{ + MessageReceptionBuffLen: 20, + MessageReceptionWorkerPoolSize: 20, + MaxChecksGarbledMessage: 20, + GarbledMessageWait: time.Hour, + }, nil) + + e2ekv := i.Session.E2e() + err = e2ekv.AddPartner(sess2.GetUser().ID, sess2.E2e().GetDHPublicKey(), e2ekv.GetDHPrivateKey(), e2e.GetDefaultSessionParams(), e2e.GetDefaultSessionParams()) + if err != nil { + t.Errorf("Failed to add e2e partner: %+v", err) + t.FailNow() + } + + err = sess2.E2e().AddPartner(sess1.GetUser().ID, sess1.E2e().GetDHPublicKey(), sess2.E2e().GetDHPrivateKey(), e2e.GetDefaultSessionParams(), e2e.GetDefaultSessionParams()) + if err != nil { + t.Errorf("Failed to add e2e partner: %+v", err) + t.FailNow() + } + partner1, err := sess2.E2e().GetPartner(sess1.GetUser().ID) + if err != nil { + t.Errorf("Failed to get partner: %+v", err) + t.FailNow() + } + + msg := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen()) + + key, err := partner1.GetKeyForSending(params.Standard) + if err != nil { + t.Errorf("failed to get key: %+v", err) + t.FailNow() + } + + contents := make([]byte, msg.ContentsSize()) + prng := rand.New(rand.NewSource(42)) + prng.Read(contents) + fmp := parse.FirstMessagePartFromBytes(contents) + binary.BigEndian.PutUint32(fmp.Type, uint32(message.Raw)) + fmp.NumParts[0] = uint8(1) + binary.BigEndian.PutUint16(fmp.Len, 256) + fmp.Part[0] = 0 + ts, err := time.Now().MarshalBinary() + if err != nil { + t.Errorf("failed to martial ts: %+v", err) + } + copy(fmp.Timestamp, ts) + msg.SetContents(fmp.Bytes()) + encryptedMsg := key.Encrypt(msg) + i.Session.GetGarbledMessages().Add(encryptedMsg) + + quitch := make(chan struct{}) + go m.processGarbledMessages(quitch) + + m.CheckGarbledMessages() + + ticker := time.NewTicker(time.Second) + select { + case <-ticker.C: + t.Error("Didn't hear anything") + case <-l.ch: + t.Log("Heard something") + } + +} diff --git a/storage/session.go b/storage/session.go index 258506d4bad0657a5e4d0686203b60f00b1511e0..44278489c94d868f1b76556c2375c8fe8981d298 100644 --- a/storage/session.go +++ b/storage/session.go @@ -336,5 +336,13 @@ func InitTestingSession(i interface{}) *Session { globals.Log.FATAL.Panicf("InitTestingSession failed to create dummy critical messages: %+v", err) } + s.garbledMessages, err = utility.NewMeteredCmixMessageBuffer(s.kv, garbledMessagesKey) + if err != nil { + globals.Log.FATAL.Panicf("Failed to create garbledMessages buffer: %+v", err) + } + + s.conversations = conversation.NewStore(s.kv) + s.partition = partition.New(s.kv) + return s } diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index 5464881caf40e48df6a5f44bd6c60259b956b6bf..bbfc1727d65f5e9155d1f5a030670c61df394168 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -257,6 +257,7 @@ func (mb *MessageBuffer) Next() (interface{}, bool) { if err != nil { jww.FATAL.Panicf("Could not load message: %v", err) } + return m, true }