diff --git a/api/client.go b/api/client.go index b775354dcbd951d04bd289a05ef91bd099b58f96..fc89f70d3464128d1bf4276fbd623f23f8d05dc3 100644 --- a/api/client.go +++ b/api/client.go @@ -593,7 +593,7 @@ func (c *Client) GetErrorsChannel() <-chan interfaces.ClientError { // Requests all messages in a given round from the gateway of the last nodes // - Message Handling Worker Group (/network/message/handle.go) // Decrypts and partitions messages when signals via the Switchboard -// - Health Tracker (/network/health) +// - health Tracker (/network/health) // Via the network instance tracks the state of the network // - Garbled Messages (/network/message/garbled.go) // Can be signaled to check all recent messages which could be be decoded diff --git a/api/utils_test.go b/api/utils_test.go index d53aa46eaa616c321881457ee99f84a4d30232bf..7bfa0836856e31ef230a8660199715d7021153b1 100644 --- a/api/utils_test.go +++ b/api/utils_test.go @@ -53,7 +53,7 @@ func newTestingClient(face interface{}) (*Client, error) { cert, err := utils.ReadFile(testkeys.GetNodeCertPath()) if err != nil { - jww.FATAL.Panicf("Failed to create new test Instance: %v", err) + jww.FATAL.Panicf("Failed to create new test instance: %v", err) } commsManager.AddHost(&id.Permissioning, "", cert, connect.GetDefaultHostParams()) diff --git a/bindings/client.go b/bindings/client.go index 6e0f519a4d69b6df52b2631afb2023ab0efe0c86..f0f74433f46bcbb2870935d16174ef89661fe8f4 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -230,7 +230,7 @@ func UnmarshalSendReport(b []byte) (*SendReport, error) { // Requests all messages in a given round from the gateway of the last nodes // - Message Handling Worker Group (/network/message/handle.go) // Decrypts and partitions messages when signals via the Switchboard -// - Health Tracker (/network/health) +// - health Tracker (/network/health) // Via the network instance tracks the state of the network // - Garbled Messages (/network/message/garbled.go) // Can be signaled to check all recent messages which could be be decoded diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index a982d3ba226565db9e4886132f164998560826a6..5e4271c13796e22d93aabd2f1f13836066f29df8 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -538,7 +538,7 @@ type testHealthTracker struct { } //////////////////////////////////////////////////////////////////////////////// -// Test Health Tracker // +// Test health Tracker // //////////////////////////////////////////////////////////////////////////////// func newTestHealthTracker() testHealthTracker { diff --git a/interfaces/params/CMIX.go b/interfaces/params/CMIX.go index b7351ce00a4fe641159f8225f71a4e19f02650b5..5d4fadbdf190019a0cf33af67d28426422728560 100644 --- a/interfaces/params/CMIX.go +++ b/interfaces/params/CMIX.go @@ -9,7 +9,9 @@ package params import ( "encoding/json" + "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/primitives/excludedRounds" + "gitlab.com/xx_network/primitives/id" "time" ) @@ -31,6 +33,13 @@ type CMIX struct { // Tag which prints with sending logs to help localize the source // All internal sends are tagged, so the default tag is "External" DebugTag string + + //Threading interface, can be used to stop the send early + Stop *stoppable.Single + + //List of nodes to not send to, will skip a round with these + //nodes in it + BlacklistedNodes map[id.ID]interface{} } func GetDefaultCMIX() CMIX { diff --git a/interfaces/params/network.go b/interfaces/params/network.go index 16fa06bec47950aa3fab8d525528f3c38aa72a9b..370cee8e16d180ce6f13a2dafc80ec1f0068c145 100644 --- a/interfaces/params/network.go +++ b/interfaces/params/network.go @@ -18,7 +18,7 @@ type Network struct { MaxCheckedRounds uint // Size of the buffer of nodes to register RegNodesBufferLen uint - // Longest delay between network events for Health tracker to denote that + // Longest delay between network events for health tracker to denote that // the network is in a bad state NetworkHealthTimeout time.Duration //Number of parallel nodes registration the client is capable of diff --git a/network/ephemeral/testutil.go b/network/ephemeral/testutil.go index 32ce9574c6081a2cb3b201ee1228794e2f06e754..a1d0602bae786e983af00d9dc942a00b18abb6e0 100644 --- a/network/ephemeral/testutil.go +++ b/network/ephemeral/testutil.go @@ -118,7 +118,7 @@ func NewTestNetworkManager(i interface{}) interfaces.NetworkManager { cert, err := utils.ReadFile(testkeys.GetNodeCertPath()) if err != nil { - jww.FATAL.Panicf("Failed to create new test Instance: %+v", err) + jww.FATAL.Panicf("Failed to create new test instance: %+v", err) } _, err = commsManager.AddHost( @@ -133,7 +133,7 @@ func NewTestNetworkManager(i interface{}) interfaces.NetworkManager { thisInstance, err := network.NewInstanceTesting( instanceComms, getNDF(), getNDF(), nil, nil, i) if err != nil { - jww.FATAL.Panicf("Failed to create new test Instance: %+v", err) + jww.FATAL.Panicf("Failed to create new test instance: %+v", err) } thisManager := &testNetworkManager{instance: thisInstance} diff --git a/network/follow.go b/network/follow.go index d222ae813772c8020b71e27d2afdbef48461fd9b..fa5386c6d353d73c64ff283397596ed44574b87a 100644 --- a/network/follow.go +++ b/network/follow.go @@ -62,7 +62,7 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, stop *stoppable.Single) { ticker := time.NewTicker(m.param.TrackNetworkPeriod) TrackTicker := time.NewTicker(debugTrackPeriod) - rng := m.Rng.GetStream() + rng := m.rng.GetStream() abandon := func(round id.Round) { return } if m.verboseRounds != nil { @@ -112,7 +112,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, comms followNetworkComms, stop *stoppable.Single, abandon func(round id.Round)) { //get the identity we will poll for - identity, err := m.Session.Reception().GetIdentity(rng, m.addrSpace.GetWithoutWait()) + identity, err := m.session.Reception().GetIdentity(rng, m.addrSpace.GetWithoutWait()) if err != nil { jww.FATAL.Panicf("Failed to get an identity, this should be "+ "impossible: %+v", err) @@ -131,7 +131,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, atomic.AddUint64(m.tracker, 1) // get client version for poll - version := m.Session.GetClientVersion() + version := m.session.GetClientVersion() // Poll network updates pollReq := pb.GatewayPoll{ @@ -198,7 +198,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, // update gateway connections m.GetSender().UpdateNdf(m.GetInstance().GetPartialNdf().Get()) - m.Session.SetNDF(m.GetInstance().GetPartialNdf().Get()) + m.session.SetNDF(m.GetInstance().GetPartialNdf().Get()) } // Pull rate limiting parameter values from NDF @@ -241,7 +241,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, for _, clientErr := range update.ClientErrors { // If this Client appears in the ClientError - if bytes.Equal(clientErr.ClientId, m.Session.GetUser().TransmissionID.Marshal()) { + if bytes.Equal(clientErr.ClientId, m.session.GetUser().TransmissionID.Marshal()) { // Obtain relevant NodeGateway information nid, err := id.Unmarshal(clientErr.Source) @@ -262,7 +262,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, update.State = uint32(states.FAILED) // delete all existing keys and trigger a re-registration with the relevant Node - m.Session.Cmix().Remove(nid) + m.session.Cmix().Remove(nid) m.Instance.GetAddGatewayChan() <- nGw } } diff --git a/network/health/tracker.go b/network/health/tracker.go index 8be8c05ff98b7a37f9fad79e3b6572c846bf9f61..bfef7a53678fcd677cacd59ba7460aa9afcde6a5 100644 --- a/network/health/tracker.go +++ b/network/health/tracker.go @@ -145,7 +145,7 @@ func (t *Tracker) Start() (stoppable.Stoppable, error) { t.mux.Lock() if t.running { t.mux.Unlock() - return nil, errors.New("cannot start Health tracker threads, " + + return nil, errors.New("cannot start health tracker threads, " + "they are already running") } t.running = true @@ -153,7 +153,7 @@ func (t *Tracker) Start() (stoppable.Stoppable, error) { t.isHealthy = false t.mux.Unlock() - stop := stoppable.NewSingle("Health Tracker") + stop := stoppable.NewSingle("health Tracker") go t.start(stop) @@ -197,7 +197,7 @@ func (t *Tracker) transmit(health bool) { select { case c <- health: default: - jww.DEBUG.Printf("Unable to send Health event") + jww.DEBUG.Printf("Unable to send health event") } } diff --git a/network/internal/internal.go b/network/internal/internal.go index 8fe96d073d123e971f14f3a3985bb05c30c0ce10..636330aa2110999ff7bf91b1ac0fd5033dd8c3d3 100644 --- a/network/internal/internal.go +++ b/network/internal/internal.go @@ -14,7 +14,6 @@ import ( "gitlab.com/elixxir/client/switchboard" "gitlab.com/elixxir/comms/client" "gitlab.com/elixxir/comms/network" - "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/xx_network/primitives/id" ) @@ -22,7 +21,6 @@ type Internal struct { Session *storage.Session Switchboard *switchboard.Switchboard //generic RNG for client - Rng *fastRNG.StreamGenerator // Comms pointer to send/recv messages Comms *client.Comms diff --git a/network/manager.go b/network/manager.go index 8831f7f19522acfccee54c91f78e68b22bbf5783..3c8540f34bb2da4fa60f3f3d489aa36fdb53d94a 100644 --- a/network/manager.go +++ b/network/manager.go @@ -21,13 +21,11 @@ import ( "gitlab.com/elixxir/client/network/ephemeral" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/health" - "gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/network/rounds" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" - "gitlab.com/elixxir/client/switchboard" "gitlab.com/elixxir/comms/client" commNetwork "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" @@ -49,17 +47,27 @@ const fakeIdentityRange = 800 // controls access to network resources and implements all the communications // functions used by the client. type manager struct { + //User Identity Storage + session *storage.Session + //generic RNG for client + rng *fastRNG.StreamGenerator + // comms pointer to send/recv messages + comms *client.Comms + //contains the health tracker which keeps track of if from the client's + //perspective, the network is in good condition + health *health.Tracker + //contains the network instance + instance *commNetwork.Instance + // parameters of the network param params.Network // handles message sending sender *gateway.Sender - //Shared data with all sub managers - internal.Internal - //sub-managers - round *rounds.Manager - message *message.manager + message.Pickup + nodes.Registrar + round *rounds.Manager // Earliest tracked round earliestRound *uint64 @@ -78,7 +86,7 @@ type manager struct { } // NewManager builds a new reception manager object using inputted key fields -func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, +func NewManager(session *storage.Session, rng *fastRNG.StreamGenerator, events interfaces.EventManager, comms *client.Comms, params params.Network, ndf *ndf.NetworkDefinition) (interfaces.NetworkManager, error) { @@ -90,11 +98,6 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, " client network manager") } - // Note: These are not loaded/stored in E2E Store, but the - // E2E Session Params are a part of the network parameters, so we - // set them here when they are needed on startup - session.E2e().SetE2ESessionParams(params.E2EParams) - tracker := uint64(0) earliest := uint64(0) // create manager object @@ -104,6 +107,11 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, addrSpace: ephemeral.NewAddressSpace(), events: events, earliestRound: &earliest, + session: session, + rng: rng, + comms: comms, + health: health.Init(instance, params.NetworkHealthTimeout), + instance: instance, } m.addrSpace.Update(18) @@ -111,21 +119,9 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, m.verboseRounds = NewRoundTracker() } - m.Internal = internal.Internal{ - Session: session, - Switchboard: switchboard, - Rng: rng, - Comms: comms, - Health: health.Init(instance, params.NetworkHealthTimeout), - NodeRegistration: make(chan commNetwork.NodeGateway, params.RegNodesBufferLen), - Instance: instance, - TransmissionID: session.User().GetCryptographicIdentity().GetTransmissionID(), - ReceptionID: session.User().GetCryptographicIdentity().GetReceptionID(), - Events: events, - } + /* set up modules */ - // Set up nodes registration chan for network instance - m.Instance.SetAddGatewayChan(m.NodeRegistration) + nodechan := make(chan commNetwork.NodeGateway, nodes.InputChanLen) // Set up gateway.Sender poolParams := gateway.DefaultPoolParams() @@ -135,21 +131,33 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, poolParams.MaxPings = 50 poolParams.ForceConnection = true m.sender, err = gateway.NewSender(poolParams, rng, - ndf, comms, session, m.NodeRegistration) + ndf, comms, session, nodechan) + if err != nil { + return nil, err + } + + //setup the node registrar + m.Registrar, err = nodes.LoadRegistrar(session, m.sender, m.comms, m.rng, nodechan) if err != nil { return nil, err } + //set up round handler + m.round = rounds.NewManager(m.Internal, m.param.Rounds, m.Pickup.GetMessageReceptionChannel(), m.sender) + + //Set up Message Pickup + m.Pickup = message.NewPickup(params, nodechan, m.sender, m.session, m.rng, + m.events, m.comms, m.Registrar, m.instance) + + // Set upthe ability to register with new nodes when they appear + m.instance.SetAddGatewayChan(nodechan) + // Report health events - m.Internal.Health.AddFunc(func(isHealthy bool) { - m.Internal.Events.Report(5, "Health", "IsHealthy", + m.health.AddFunc(func(isHealthy bool) { + m.events.Report(5, "health", "IsHealthy", fmt.Sprintf("%v", isHealthy)) }) - //create sub managers - m.message = message.NewManager(m.Internal, m.param, m.NodeRegistration, m.sender) - m.round = rounds.NewManager(m.Internal, m.param.Rounds, m.message.GetMessageReceptionChannel(), m.sender) - return &m, nil } @@ -159,7 +167,7 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, // - Historical Round Retrieval (/network/rounds/historical.go) // - Message Retrieval Worker Group (/network/rounds/retrieve.go) // - Message Handling Worker Group (/network/message/handle.go) -// - Health Tracker (/network/health) +// - health Tracker (/network/health) // - Garbled Messages (/network/message/inProgress.go) // - Critical Messages (/network/message/critical.go) // - Ephemeral ID tracking (network/ephemeral/tracker.go) @@ -167,17 +175,15 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab multi := stoppable.NewMulti("networkManager") // health tracker - healthStop, err := m.Health.Start() + healthStop, err := m.health.Start() if err != nil { return nil, errors.Errorf("failed to follow") } multi.Add(healthStop) // Node Updates - multi.Add(nodes.StartRegistration(m.GetSender(), m.Session, m.Rng, - m.Comms, m.NodeRegistration, m.param.ParallelNodeRegistrations)) // Adding/MixCypher - //TODO-remover - //m.runners.AddFingerprint(StartNodeRemover(m.Context)) // Removing + multi.Add(m.Registrar.StartProcesses(m.param.ParallelNodeRegistrations)) // Adding/MixCypher + //TODO-node remover // Start the Network Tracker trackNetworkStopper := stoppable.NewSingle("TrackNetwork") @@ -185,12 +191,12 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab multi.Add(trackNetworkStopper) // Message reception - multi.Add(m.message.StartProcessies()) + multi.Add(m.Pickup.StartProcessies()) // Round processing multi.Add(m.round.StartProcessors()) - multi.Add(ephemeral.Track(m.Session, m.addrSpace, m.ReceptionID)) + multi.Add(ephemeral.Track(m.session, m.addrSpace, m.ReceptionID)) return multi, nil } @@ -202,12 +208,12 @@ func (m *manager) GetEventManager() interfaces.EventManager { // GetHealthTracker returns the health tracker func (m *manager) GetHealthTracker() interfaces.HealthTracker { - return m.Health + return m.health } // GetInstance returns the network instance object (ndf state) func (m *manager) GetInstance() *commNetwork.Instance { - return m.Instance + return m.instance } // GetSender returns the gateway.Sender object @@ -215,19 +221,6 @@ func (m *manager) GetSender() *gateway.Sender { return m.sender } -// CheckGarbledMessages triggers a check on garbled messages to see if they can be decrypted -// this should be done when a new e2e client is added in case messages were -// received early or arrived out of order -func (m *manager) CheckGarbledMessages() { - m.message.CheckGarbledMessages() -} - -// InProgressRegistrations returns an approximation of the number of in progress -// nodes registrations. -func (m *manager) InProgressRegistrations() int { - return len(m.Internal.NodeRegistration) -} - // GetAddressSize returns the current address space size. It blocks until an // address space size is set. func (m *manager) GetAddressSize() uint8 { diff --git a/network/message/inProgress_test.go b/network/message/inProgress_test.go index f5c76b05717169b148fbfe9b52ea625838805c94..c8646569468969f130144187364e29deafea3646 100644 --- a/network/message/inProgress_test.go +++ b/network/message/inProgress_test.go @@ -77,7 +77,7 @@ func TestManager_CheckGarbledMessages(t *testing.T) { if err != nil { t.Errorf(err.Error()) } - m := NewManager(i, params.Network{Messages: params.Messages{ + m := NewPickup(i, params.Network{Messages: params.Messages{ MessageReceptionBuffLen: 20, MessageReceptionWorkerPoolSize: 20, MaxChecksInProcessMessage: 20, diff --git a/network/message/manager.go b/network/message/pickup.go similarity index 99% rename from network/message/manager.go rename to network/message/pickup.go index 8afaf5c67ca9fb771c8484829602ba3c7d1f36f6..8cea698e3d902f91a601377b0d3005cf339788e7 100644 --- a/network/message/manager.go +++ b/network/message/pickup.go @@ -74,7 +74,7 @@ type pickup struct { rateLimitParams utility.BucketParamStore } -func NewManager(param params.Network, +func NewPickup(param params.Network, nodeRegistration chan network.NodeGateway, sender *gateway.Sender, session *storage.Session, rng *fastRNG.StreamGenerator, events interfaces.EventManager, comms network2.SendCmixCommsInterface, diff --git a/network/nodes/registrar.go b/network/nodes/registrar.go index 3f23def58b7a3ef3bdfb4624cca9e00c5936272e..1fa12a26013cc6f4fb1c3b4c42f16faa83a108ce 100644 --- a/network/nodes/registrar.go +++ b/network/nodes/registrar.go @@ -59,10 +59,10 @@ type registrar struct { // LoadRegistrar loads a registrar from disk, and creates a new one if it does // not exist. -func LoadRegistrar(kv *versioned.KV, session *storage.Session, +func LoadRegistrar(session *storage.Session, sender *gateway.Sender, comms RegisterNodeCommsInterface, - rngGen *fastRNG.StreamGenerator) (Registrar, error) { - kv = kv.Prefix(prefix) + rngGen *fastRNG.StreamGenerator, c chan network.NodeGateway) (Registrar, error) { + kv := session.GetKV().Prefix(prefix) r := ®istrar{ nodes: make(map[id.ID]*key), kv: kv, @@ -88,7 +88,7 @@ func LoadRegistrar(kv *versioned.KV, session *storage.Session, r.comms = comms r.rng = rngGen - r.c = make(chan network.NodeGateway, InputChanLen) + r.c = c return r, nil } diff --git a/network/send.go b/network/send.go deleted file mode 100644 index 4752b9f6a4d41a666fb09785192469330b2e8fa6..0000000000000000000000000000000000000000 --- a/network/send.go +++ /dev/null @@ -1,75 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -/////////////////////////////////////////////////////////////////////////////// - -package network - -import ( - "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces/message" - "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/client/stoppable" - "gitlab.com/elixxir/crypto/e2e" - "gitlab.com/elixxir/primitives/format" - "gitlab.com/xx_network/primitives/id" - "gitlab.com/xx_network/primitives/id/ephemeral" - "time" -) - -// SendCMIX sends a "raw" CMIX message payload to the provided -// recipient. Note that both SendE2E and SendUnsafe call SendCMIX. -// Returns the round ID of the round the payload was sent or an error -// if it fails. -func (m *manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) { - if !m.Health.IsHealthy() { - return 0, ephemeral.Id{}, errors.New("Cannot send cmix message when the " + - "network is not healthy") - } - - return m.message.SendCMIX(m.GetSender(), msg, recipient, param, nil) -} - -// SendManyCMIX sends many "raw" CMIX message payloads to each of the -// provided recipients. Used for group chat functionality. Returns the -// round ID of the round the payload was sent or an error if it fails. -func (m *manager) SendManyCMIX(msgs []message.TargetedCmixMessage, - p params.CMIX) (id.Round, []ephemeral.Id, error) { - - return m.message.SendManyCMIX(m.sender, msgs, p, nil) -} - -// SendUnsafe sends an unencrypted payload to the provided recipient -// with the provided msgType. Returns the list of rounds in which parts -// of the message were sent or an error if it fails. -// NOTE: Do not use this function unless you know what you are doing. -// This function always produces an error message in client logging. -func (m *manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { - if !m.Health.IsHealthy() { - return nil, errors.New("cannot send unsafe message when the " + - "network is not healthy") - } - - jww.WARN.Println("Sending unsafe message. Unsafe payloads have no end" + - " to end encryption, they have limited security and privacy " + - "preserving properties") - - return m.message.SendUnsafe(msg, param) -} - -// SendE2E sends an end-to-end payload to the provided recipient with -// the provided msgType. Returns the list of rounds in which parts of -// the message were sent or an error if it fails. -func (m *manager) SendE2E(msg message.Send, e2eP params.E2E, stop *stoppable.Single) ( - []id.Round, e2e.MessageID, time.Time, error) { - - if !m.Health.IsHealthy() { - return nil, e2e.MessageID{}, time.Time{}, errors.New("Cannot send e2e " + - "message when the network is not healthy") - } - - return m.message.SendE2E(msg, e2eP, stop) -} diff --git a/network/sendCmix.go b/network/sendCmix.go index c8e01cc22dc745aaa9933d37e6d591fe0f4f5bb7..96f0c6479d1068fa5d04714c6aebfd6b23cba946 100644 --- a/network/sendCmix.go +++ b/network/sendCmix.go @@ -23,7 +23,6 @@ import ( "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/primitives/excludedRounds" "gitlab.com/elixxir/primitives/format" - "gitlab.com/elixxir/primitives/states" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" @@ -33,33 +32,21 @@ import ( "time" ) -// WARNING: Potentially Unsafe -// Public manager function to send a message over CMIX -func (m *message.manager) SendCMIX(sender *gateway.Sender, msg format.Message, - recipient *id.ID, cmixParams params.CMIX, - stop *stoppable.Single) (id.Round, ephemeral.Id, error) { +// SendCMIX sends a "raw" CMIX message payload to the provided +// recipient. Note that both SendE2E and SendUnsafe call SendCMIX. +// Returns the round ID of the round the payload was sent or an error +// if it fails. +func (m *manager) SendCMIX(msg format.Message, + recipient *id.ID, cmixParams params.CMIX) (id.Round, ephemeral.Id, error) { + if !m.health.IsHealthy() { + return 0, ephemeral.Id{}, errors.New("Cannot send cmix message when the " + + "network is not healthy") + } msgCopy := msg.Copy() - return sendCmixHelper(sender, msgCopy, recipient, cmixParams, m.blacklistedNodes, m.instance, - m.session, m.nodeRegistration, m.rng, m.Internal.Events, - m.TransmissionID, m.Comms, stop) -} - -func calculateSendTimeout(best *pb.RoundInfo, max time.Duration) time.Duration { - RoundStartTime := time.Unix(0, - int64(best.Timestamps[states.QUEUED])) - // 250ms AFTER the round starts to hear the response. - timeout := RoundStartTime.Sub( - netTime.Now().Add(250 * time.Millisecond)) - if timeout > max { - timeout = max - } - // time.Duration is a signed int, so check for negative - if timeout < 0 { - // TODO: should this produce a warning? - timeout = 100 * time.Millisecond - } - return timeout + return sendCmixHelper(m.sender, msgCopy, recipient, cmixParams, m.instance, + m.session, m.Registrar, m.rng, m.events, + m.session.User().GetCryptographicIdentity().GetTransmissionID(), m.comms) } // Helper function for sendCmix @@ -72,11 +59,10 @@ func calculateSendTimeout(best *pb.RoundInfo, max time.Duration) time.Duration { // which can be registered with the network instance to get a callback on // its status func sendCmixHelper(sender *gateway.Sender, msg format.Message, - recipient *id.ID, cmixParams params.CMIX, blacklistedNodes map[string]interface{}, instance *network.Instance, + recipient *id.ID, cmixParams params.CMIX, instance *network.Instance, session *storage.Session, nodes nodes.Registrar, rng *fastRNG.StreamGenerator, events interfaces.EventManager, - senderId *id.ID, comms SendCmixCommsInterface, - stop *stoppable.Single) (id.Round, ephemeral.Id, error) { + senderId *id.ID, comms SendCmixCommsInterface) (id.Round, ephemeral.Id, error) { timeStart := netTime.Now() maxTimeout := sender.GetHostParams().SendTimeout @@ -135,12 +121,17 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, // Determine whether the selected round contains any Nodes // that are blacklisted by the params.Network object containsBlacklisted := false - for _, nodeId := range bestRound.Topology { - if _, isBlacklisted := blacklistedNodes[string(nodeId)]; isBlacklisted { - containsBlacklisted = true - break + if cmixParams.BlacklistedNodes != nil { + for _, nodeId := range bestRound.Topology { + nid := &id.ID{} + copy(nid[:], nodeId) + if _, isBlacklisted := cmixParams.BlacklistedNodes[*nid]; isBlacklisted { + containsBlacklisted = true + break + } } } + if containsBlacklisted { jww.WARN.Printf("[SendCMIX-%s]Round %d contains blacklisted nodes, "+ "skipping...", cmixParams.DebugTag, bestRound.ID) @@ -203,7 +194,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, jww.TRACE.Printf("[SendCMIX-%s] sendToPreferred %s", cmixParams.DebugTag, firstGateway) result, err := sender.SendToPreferred( - []*id.ID{firstGateway}, sendFunc, stop, cmixParams.SendTimeout) + []*id.ID{firstGateway}, sendFunc, cmixParams.Stop, cmixParams.SendTimeout) jww.DEBUG.Printf("[SendCMIX-%s] sendToPreferred %s returned", cmixParams.DebugTag, firstGateway) diff --git a/network/sendCmixUtils.go b/network/sendCmixUtils.go index 958ee44004fcb418eced68f56b73aa1b98cc3115..07a0c8884dd616e0182213298b8c6c0b675e9fb4 100644 --- a/network/sendCmixUtils.go +++ b/network/sendCmixUtils.go @@ -23,6 +23,7 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/netTime" "strconv" "strings" "time" @@ -246,3 +247,20 @@ func ephemeralIdListToString(idList []ephemeral.Id) string { return strings.Join(idStrings, ",") } + +func calculateSendTimeout(best *pb.RoundInfo, max time.Duration) time.Duration { + RoundStartTime := time.Unix(0, + int64(best.Timestamps[states.QUEUED])) + // 250ms AFTER the round starts to hear the response. + timeout := RoundStartTime.Sub( + netTime.Now().Add(250 * time.Millisecond)) + if timeout > max { + timeout = max + } + // time.Duration is a signed int, so check for negative + if timeout < 0 { + // TODO: should this produce a warning? + timeout = 100 * time.Millisecond + } + return timeout +} diff --git a/network/sendCmix_test.go b/network/sendCmix_test.go index c45f8b38cdc28e3315b0f210097349e9a88948e3..faaed0fac7799296f15bbc0bd34dffa2c20231fe 100644 --- a/network/sendCmix_test.go +++ b/network/sendCmix_test.go @@ -109,7 +109,7 @@ func Test_attemptSendCmix(t *testing.T) { t.Errorf("%+v", errors.New(err.Error())) return } - m := message2.NewManager(i, params.Network{Messages: params.Messages{ + m := message2.NewPickup(i, params.Network{Messages: params.Messages{ MessageReceptionBuffLen: 20, MessageReceptionWorkerPoolSize: 20, MaxChecksRetryMessage: 20, diff --git a/network/sendManyCmix.go b/network/sendManyCmix.go index 872bacb7eac85b536330ca417b9b742b7fd650be..9e03583b4bb0ecaadf7a2f6c7e73096af8ef7a0e 100644 --- a/network/sendManyCmix.go +++ b/network/sendManyCmix.go @@ -15,6 +15,7 @@ import ( "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/network/gateway" + "gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" pb "gitlab.com/elixxir/comms/mixmessages" @@ -35,13 +36,13 @@ import ( // with this call and can leak data about yourself. Returns the round ID of the // round the payload was sent or an error if it fails. // WARNING: Potentially Unsafe -func (m *message2.manager) SendManyCMIX(sender *gateway.Sender, +func (m *manager) SendManyCMIX(sender *gateway.Sender, messages []message.TargetedCmixMessage, p params.CMIX, stop *stoppable.Single) (id.Round, []ephemeral.Id, error) { - return sendManyCmixHelper(sender, messages, p, m.blacklistedNodes, - m.Instance, m.Session, m.nodeRegistration, m.Rng, m.Internal.Events, - m.TransmissionID, m.Comms, stop) + return sendManyCmixHelper(sender, messages, p, + m.instance, m.session, m.Registrar, m.rng, m.events, + m.session.GetUser().TransmissionID, m.comms, stop) } // sendManyCmixHelper is a helper function for manager.SendManyCMIX. @@ -56,9 +57,8 @@ func (m *message2.manager) SendManyCMIX(sender *gateway.Sender, // which can be registered with the network instance to get a callback on its // status. func sendManyCmixHelper(sender *gateway.Sender, - msgs []message.TargetedCmixMessage, param params.CMIX, - blacklistedNodes map[string]interface{}, instance *network.Instance, - session *storage.Session, nodeRegistration chan network.NodeGateway, + msgs []message.TargetedCmixMessage, param params.CMIX, instance *network.Instance, + session *storage.Session, registrar nodes.Registrar, rng *fastRNG.StreamGenerator, events interfaces.EventManager, senderId *id.ID, comms SendCmixCommsInterface, stop *stoppable.Single) ( id.Round, []ephemeral.Id, error) { @@ -109,10 +109,14 @@ func sendManyCmixHelper(sender *gateway.Sender, // Determine whether the selected round contains any nodes that are // blacklisted by the params.Network object containsBlacklisted := false - for _, nodeId := range bestRound.Topology { - if _, isBlacklisted := blacklistedNodes[string(nodeId)]; isBlacklisted { - containsBlacklisted = true - break + if param.BlacklistedNodes != nil { + for _, nodeId := range bestRound.Topology { + nid := &id.ID{} + copy(nid[:], nodeId) + if _, isBlacklisted := param.BlacklistedNodes[*nid]; isBlacklisted { + containsBlacklisted = true + break + } } } if containsBlacklisted { @@ -122,8 +126,8 @@ func sendManyCmixHelper(sender *gateway.Sender, } // Retrieve host and key information from round - firstGateway, roundKeys, err := processRound(instance, session, - nodeRegistration, bestRound, recipientString, msgDigests) + firstGateway, roundKeys, err := processRound( + registrar, bestRound, recipientString, msgDigests) if err != nil { jww.INFO.Printf("[SendManyCMIX-%s]error processing round: %v", param.DebugTag, err) jww.WARN.Printf("[SendManyCMIX-%s]SendManyCMIX failed to process round %d "+ @@ -178,8 +182,8 @@ func sendManyCmixHelper(sender *gateway.Sender, result, err := comms.SendPutManyMessages( host, wrappedMessage, timeout) if err != nil { - err := handlePutMessageError(firstGateway, instance, - session, nodeRegistration, recipientString, bestRound, err) + err := handlePutMessageError(firstGateway, registrar, + recipientString, bestRound, err) return result, errors.WithMessagef(err, "SendManyCMIX %s (via %s): %s", target, host, unrecoverableError) @@ -215,7 +219,6 @@ func sendManyCmixHelper(sender *gateway.Sender, "in round %d", param.DebugTag, ephemeralIDsString, recipientString, bestRound.ID) jww.INFO.Print(m) events.Report(1, "MessageSendMany", "Metric", m) - trackNetworkRateLimit(uint32(len(msgs)), session) return id.Round(bestRound.ID), ephemeralIDs, nil } else { jww.FATAL.Panicf("Gateway %s returned no error, but failed to "+ diff --git a/network/sendManyCmix_test.go b/network/sendManyCmix_test.go index 8d5cb338787fb5be661625409fe9384a149ecf2f..045dff60fdb0ba76906faa09d63acab26ab77d10 100644 --- a/network/sendManyCmix_test.go +++ b/network/sendManyCmix_test.go @@ -109,7 +109,7 @@ func Test_attemptSendManyCmix(t *testing.T) { t.Errorf("%+v", errors.New(err.Error())) return } - m := message2.NewManager(i, params.Network{Messages: params.Messages{ + m := message2.NewPickup(i, params.Network{Messages: params.Messages{ MessageReceptionBuffLen: 20, MessageReceptionWorkerPoolSize: 20, MaxChecksRetryMessage: 20, diff --git a/single/manager_test.go b/single/manager_test.go index acffc2f62eca15feaf1c1305cbf084cdbf0e4c64..ca93b42b27c1bf58cd38309878b137a24433bc21 100644 --- a/single/manager_test.go +++ b/single/manager_test.go @@ -51,7 +51,7 @@ func Test_newManager(t *testing.T) { if e.client != m.client || e.store != m.store || e.net != m.net || e.rng != m.rng || !reflect.DeepEqual(e.p, m.p) { - t.Errorf("NewManager() did not return the expected new Manager."+ + t.Errorf("NewPickup() did not return the expected new Manager."+ "\nexpected: %+v\nreceived: %+v", e, m) } }