From b9d5198d2ab5069a36861ffd392b36b78c3a2d13 Mon Sep 17 00:00:00 2001 From: Benjamin Wenger <ben@elixxir.ioo> Date: Tue, 22 Mar 2022 13:32:45 -0700 Subject: [PATCH] continued refactor --- cmd/root.go | 12 +- interfaces/networkManager.go | 3 +- network/manager.go | 8 +- network/message/fingerprints.go | 20 ++-- network/message/fingerprints_test.go | 28 ++--- network/message/{garbled.go => inProgress.go} | 52 ++++----- network/message/manager.go | 66 ++++++++--- .../message}/meteredCmixMessageBuffer.go | 110 +++++++++++------- .../message}/meteredCmixMessageBuffer_test.go | 15 +-- network/message/triggers.go | 12 +- network/nodes/registrar.go | 13 +++ network/{message => }/sendCmix.go | 30 ++--- network/{message => }/sendCmixUtils.go | 35 +++--- network/{message => }/sendCmix_test.go | 17 +-- network/{message => }/sendManyCmix.go | 8 +- network/{message => }/sendManyCmix_test.go | 17 +-- storage/partition/multiPartMessage_test.go | 2 +- storage/utility/cmixMessageBuffer_test.go | 4 +- storage/utility/e2eMessageBuffer_test.go | 4 +- storage/utility/knownRounds.go | 4 +- storage/utility/messageBuffer.go | 58 ++++----- storage/utility/messageBuffer_test.go | 6 +- 22 files changed, 288 insertions(+), 236 deletions(-) rename network/message/{garbled.go => inProgress.go} (50%) rename {storage/utility => network/message}/meteredCmixMessageBuffer.go (64%) rename {storage/utility => network/message}/meteredCmixMessageBuffer_test.go (94%) rename network/{message => }/sendCmix.go (90%) rename network/{message => }/sendCmixUtils.go (88%) rename network/{message => }/sendCmix_test.go (88%) rename network/{message => }/sendManyCmix.go (97%) rename network/{message => }/sendManyCmix_test.go (88%) diff --git a/cmd/root.go b/cmd/root.go index 22615567e..c34ab06cc 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -311,7 +311,7 @@ var rootCmd = &cobra.Command{ " took %d seconds", scnt) } - // Delete this recipient + // DeleteFingerprint this recipient if viper.GetBool("delete-channel") { deleteChannel(client, recipientID) } @@ -1122,27 +1122,27 @@ func init() { rootCmd.Flags().Lookup("accept-channel")) rootCmd.PersistentFlags().Bool("delete-channel", false, - "Delete the channel information for the corresponding recipient ID") + "DeleteFingerprint the channel information for the corresponding recipient ID") viper.BindPFlag("delete-channel", rootCmd.PersistentFlags().Lookup("delete-channel")) rootCmd.PersistentFlags().Bool("delete-receive-requests", false, - "Delete the all received contact requests.") + "DeleteFingerprint the all received contact requests.") viper.BindPFlag("delete-receive-requests", rootCmd.PersistentFlags().Lookup("delete-receive-requests")) rootCmd.PersistentFlags().Bool("delete-sent-requests", false, - "Delete the all sent contact requests.") + "DeleteFingerprint the all sent contact requests.") viper.BindPFlag("delete-sent-requests", rootCmd.PersistentFlags().Lookup("delete-sent-requests")) rootCmd.PersistentFlags().Bool("delete-all-requests", false, - "Delete the all contact requests, both sent and received.") + "DeleteFingerprint the all contact requests, both sent and received.") viper.BindPFlag("delete-all-requests", rootCmd.PersistentFlags().Lookup("delete-all-requests")) rootCmd.PersistentFlags().Bool("delete-request", false, - "Delete the request for the specified ID given by the "+ + "DeleteFingerprint the request for the specified ID given by the "+ "destfile flag's contact file.") viper.BindPFlag("delete-request", rootCmd.PersistentFlags().Lookup("delete-request")) diff --git a/interfaces/networkManager.go b/interfaces/networkManager.go index f6feba860..c950e11e3 100644 --- a/interfaces/networkManager.go +++ b/interfaces/networkManager.go @@ -138,6 +138,7 @@ type Identity struct { EphId ephemeral.Id Source *id.ID } + type IdentityParams struct { AddressSize uint8 @@ -184,7 +185,7 @@ type MessageProcessor interface { // myPrivKey *cyclic.Int, partnerSIDHPubKey *sidh.PublicKey, // mySIDHPrivKey *sidh.PrivateKey, // sendParams, receiveParams params.E2ESessionParams) -// GetPartner(partnerID *id.ID) (*Manager, error) +// GetPartner(partnerID *id.ID) (*manager, error) // DeletePartner(partnerId *id.ID) // GetAllPartnerIDs() []*id.ID //} diff --git a/network/manager.go b/network/manager.go index 790e00833..8831f7f19 100644 --- a/network/manager.go +++ b/network/manager.go @@ -45,7 +45,7 @@ import ( // fake identity. const fakeIdentityRange = 800 -// Manager implements the NetworkManager interface inside context. It +// manager implements the NetworkManager interface inside context. It // controls access to network resources and implements all the communications // functions used by the client. type manager struct { @@ -59,7 +59,7 @@ type manager struct { //sub-managers round *rounds.Manager - message *message.Manager + message *message.manager // Earliest tracked round earliestRound *uint64 @@ -160,7 +160,7 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, // - Message Retrieval Worker Group (/network/rounds/retrieve.go) // - Message Handling Worker Group (/network/message/handle.go) // - Health Tracker (/network/health) -// - Garbled Messages (/network/message/garbled.go) +// - Garbled Messages (/network/message/inProgress.go) // - Critical Messages (/network/message/critical.go) // - Ephemeral ID tracking (network/ephemeral/tracker.go) func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppable, error) { @@ -177,7 +177,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab multi.Add(nodes.StartRegistration(m.GetSender(), m.Session, m.Rng, m.Comms, m.NodeRegistration, m.param.ParallelNodeRegistrations)) // Adding/MixCypher //TODO-remover - //m.runners.Add(StartNodeRemover(m.Context)) // Removing + //m.runners.AddFingerprint(StartNodeRemover(m.Context)) // Removing // Start the Network Tracker trackNetworkStopper := stoppable.NewSingle("TrackNetwork") diff --git a/network/message/fingerprints.go b/network/message/fingerprints.go index fa4091163..35ddd9546 100644 --- a/network/message/fingerprints.go +++ b/network/message/fingerprints.go @@ -23,8 +23,8 @@ type FingerprintsManager struct { sync.Mutex } -// NewFingerprints is a constructor function for the Fingerprints tracker. -func NewFingerprints() *FingerprintsManager { +// newFingerprints is a constructor function for the Fingerprints tracker. +func newFingerprints() *FingerprintsManager { return &FingerprintsManager{ fpMap: make(map[id.ID]map[format.Fingerprint]interfaces.MessageProcessor), } @@ -57,7 +57,7 @@ func (f *FingerprintsManager) pop(clientID *id.ID, // map. AddFingerprint maps the given fingerprint key to the processor // value. If there is already an entry for this fingerprint, the // method returns with no write operation. -func (f *FingerprintsManager) Add(clientID *id.ID, +func (f *FingerprintsManager) AddFingerprint(clientID *id.ID, fingerprint format.Fingerprint, mp interfaces.MessageProcessor) error { f.Lock() @@ -79,9 +79,9 @@ func (f *FingerprintsManager) Add(clientID *id.ID, return nil } -// Delete is a thread-safe deletion operation on the Fingerprints map. +// DeleteFingerprint is a thread-safe deletion operation on the Fingerprints map. // It will remove the entry for the given fingerprint from the map. -func (f *FingerprintsManager) Delete(clientID *id.ID, +func (f *FingerprintsManager) DeleteFingerprint(clientID *id.ID, fingerprint format.Fingerprint) { f.Lock() defer f.Unlock() @@ -98,14 +98,10 @@ func (f *FingerprintsManager) Delete(clientID *id.ID, } } -// DeleteClient is a thread-safe deletion operation on the Fingerprints map. +// DeleteClientFingerprints is a thread-safe deletion operation on the Fingerprints map. // It will remove all entres for the given clientID from the map. -func (f *FingerprintsManager) DeleteClient(clientID *id.ID, - fingerprint format.Fingerprint) { +func (f *FingerprintsManager) DeleteClientFingerprints(clientID *id.ID) { f.Lock() defer f.Unlock() - - cid := *clientID - - delete(f.fpMap, cid) + delete(f.fpMap, *clientID) } diff --git a/network/message/fingerprints_test.go b/network/message/fingerprints_test.go index 597a54f44..eca547874 100644 --- a/network/message/fingerprints_test.go +++ b/network/message/fingerprints_test.go @@ -24,7 +24,7 @@ func TestNewFingerprints(t *testing.T) { RWMutex: sync.RWMutex{}, } - received := NewFingerprints() + received := newFingerprints() if !reflect.DeepEqual(expected, received) { t.Fatalf("NewFingerprint error: Did not construct expected object."+ @@ -36,7 +36,7 @@ func TestNewFingerprints(t *testing.T) { // Unit test. func TestFingerprints_Get(t *testing.T) { // Construct fingerprint map - fpTracker := NewFingerprints() + fpTracker := newFingerprints() // Construct fingerprint and processor values fp := format.NewFingerprint([]byte("test")) @@ -65,7 +65,7 @@ func TestFingerprints_Get(t *testing.T) { // Unit test. func TestFingerprints_AddFingerprint(t *testing.T) { // Construct fingerprint map - fpTracker := NewFingerprints() + fpTracker := newFingerprints() // Construct fingerprint and processor values fp := format.NewFingerprint([]byte("test")) @@ -77,14 +77,14 @@ func TestFingerprints_AddFingerprint(t *testing.T) { // Check that the fingerprint key has a map entry received, exists := fpTracker.fingerprints[fp] if !exists { - t.Fatalf("AddFingerprint did not write to map as expected. "+ + t.Fatalf("Add did not write to map as expected. "+ "Fingerprint %s not found in map", fp) } // Check that received value contains the expected data expected := newProcessor(mp) if !reflect.DeepEqual(received, expected) { - t.Fatalf("AddFingerprint error: Map does not contain expected data."+ + t.Fatalf("Add error: Map does not contain expected data."+ "\nExpected: %v"+ "\nReceived: %v", expected, received) } @@ -93,7 +93,7 @@ func TestFingerprints_AddFingerprint(t *testing.T) { // Unit test. func TestFingerprints_AddFingerprints(t *testing.T) { // Construct fingerprints map - fpTracker := NewFingerprints() + fpTracker := newFingerprints() // Construct slices of fingerprints and processors numTests := 100 @@ -110,19 +110,19 @@ func TestFingerprints_AddFingerprints(t *testing.T) { // Add slices to map err := fpTracker.AddFingerprints(fingerprints, processors) if err != nil { - t.Fatalf("AddFingerprints unexpected error: %v", err) + t.Fatalf("Adds unexpected error: %v", err) } // Make sure every fingerprint is mapped to it's expected processor for i, expected := range fingerprints { received, exists := fpTracker.fingerprints[expected] if !exists { - t.Errorf("AddFingerprints did not write to map as expected. "+ + t.Errorf("Adds did not write to map as expected. "+ "Fingerprint number %d (value: %s) not found in map", i, expected) } if !reflect.DeepEqual(received, expected) { - t.Fatalf("AddFingerprints error: Map does not contain expected data for "+ + t.Fatalf("Adds error: Map does not contain expected data for "+ "fingerprint number %d."+ "\nExpected: %v"+ "\nReceived: %v", i, expected, received) @@ -135,7 +135,7 @@ func TestFingerprints_AddFingerprints(t *testing.T) { // slices of different lengths. func TestFingerprints_AddFingerprints_Error(t *testing.T) { // Construct fingerprint map - fpTracker := NewFingerprints() + fpTracker := newFingerprints() // Construct 2 slices of different lengths fingerprints := []format.Fingerprint{ @@ -150,7 +150,7 @@ func TestFingerprints_AddFingerprints_Error(t *testing.T) { // Attempt to add fingerprints err := fpTracker.AddFingerprints(fingerprints, processors) if err == nil { - t.Fatalf("AddFingerprints should have received an error with mismatched " + + t.Fatalf("Add should have received an error with mismatched " + "slices length") } @@ -159,7 +159,7 @@ func TestFingerprints_AddFingerprints_Error(t *testing.T) { func TestFingerprints_RemoveFingerprint(t *testing.T) { // Construct fingerprint map - fpTracker := NewFingerprints() + fpTracker := newFingerprints() // Construct fingerprint and processor values fp := format.NewFingerprint([]byte("test")) @@ -181,7 +181,7 @@ func TestFingerprints_RemoveFingerprint(t *testing.T) { // Unit test. func TestFingerprints_RemoveFingerprints(t *testing.T) { // Construct fingerprints map - fpTracker := NewFingerprints() + fpTracker := newFingerprints() // Construct slices of fingerprints and processors numTests := 100 @@ -198,7 +198,7 @@ func TestFingerprints_RemoveFingerprints(t *testing.T) { // Add slices to map err := fpTracker.AddFingerprints(fingerprints, processors) if err != nil { - t.Fatalf("AddFingerprints unexpected error: %v", err) + t.Fatalf("Add unexpected error: %v", err) } fpTracker.RemoveFingerprints(fingerprints) diff --git a/network/message/garbled.go b/network/message/inProgress.go similarity index 50% rename from network/message/garbled.go rename to network/message/inProgress.go index 7295eaf08..da4c8fea8 100644 --- a/network/message/garbled.go +++ b/network/message/inProgress.go @@ -8,65 +8,61 @@ package message import ( - "fmt" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" - "gitlab.com/xx_network/primitives/id/ephemeral" - "gitlab.com/xx_network/primitives/netTime" - "time" ) // Messages can arrive in the network out of order. When message handling fails // to decrypt a message, it is added to the garbled message buffer (which is // stored on disk) and the message decryption is retried here whenever triggered. -// This can be triggered through the CheckGarbledMessages on the network manager +// This can be triggered through the CheckInProgressMessages on the network pickup // and is used in the /keyExchange package on successful rekey triggering -// Triggers Garbled message checking if the queue is not full -// Exposed on the network manager -func (m *Manager) CheckGarbledMessages() { +// CheckInProgressMessages triggers rechecking all in progress messages +// if the queue is not full Exposed on the network pickup +func (m *pickup) CheckInProgressMessages() { select { - case m.triggerGarbled <- struct{}{}: + case m.checkInProgress <- struct{}{}: default: jww.WARN.Println("Failed to check garbled messages " + "due to full channel") } } -//long running thread which processes garbled messages -func (m *Manager) processGarbledMessages(stop *stoppable.Single) { +//long running thread which processes messages that need to be checked +func (m *pickup) recheckInProgressRunner(stop *stoppable.Single) { for { select { case <-stop.Quit(): stop.ToStopped() return - case <-m.triggerGarbled: + case <-m.checkInProgress: jww.INFO.Printf("[GARBLE] Checking Garbled messages") - m.handleGarbledMessages() + m.recheckInProgress() } } } -//handler for a single run of garbled messages -func (m *Manager) handleGarbledMessages() { +//handler for a single run of recheck messages +func (m *pickup) recheckInProgress() { //try to decrypt every garbled message, excising those who's counts are too high - for grbldMsg, count, timestamp, has := m.garbledStore.Next(); has; grbldMsg, count, timestamp, has = m.garbledStore.Next() { - //if it exists, check against all in the list - grbldContents := grbldMsg.GetContents() - identity := m.session.GetUser().ReceptionID + for grbldMsg, ri, identity, has := m.inProcess.Next(); has; grbldMsg, ri, identity, has = m.inProcess.Next() { + bundle := Bundle{ + Round: id.Round(ri.ID), + RoundInfo: ri, + Messages: []format.Message{grbldMsg}, + Finish: func() {}, + Identity: identity, + } + select { + case m.messageReception <- bundle: + default: + jww.WARN.Printf("failed to send bundle, channel full") - // fail the message if any part of the decryption fails, - // unless it is the last attempts and has been in the buffer long - // enough, in which case remove it - if count == m.param.MaxChecksGarbledMessage && - netTime.Since(timestamp) > m.param.GarbledMessageWait { - garbledMsgs.Remove(grbldMsg) - } else { - failedMsgs = append(failedMsgs, grbldMsg) } + } } diff --git a/network/message/manager.go b/network/message/manager.go index 75918d941..1da8c8bee 100644 --- a/network/message/manager.go +++ b/network/message/manager.go @@ -11,9 +11,14 @@ import ( "encoding/base64" "fmt" "gitlab.com/elixxir/client/interfaces" + network2 "gitlab.com/elixxir/client/network" + "gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/rateLimiting" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/params" @@ -23,10 +28,26 @@ import ( ) const ( - garbledMessagesKey = "GarbledMessages" + inProcessKey = "InProcessMessagesKey" ) -type Manager struct { +type Pickup interface { + GetMessageReceptionChannel() chan<- Bundle + StartProcessies() stoppable.Stoppable + CheckInProgressMessages() + + //Fingerprints + AddFingerprint(clientID *id.ID, fingerprint format.Fingerprint, mp interfaces.MessageProcessor) error + DeleteFingerprint(clientID *id.ID, fingerprint format.Fingerprint) + DeleteClientFingerprints(clientID *id.ID) + + //Triggers + AddTrigger(clientID *id.ID, newTrigger interfaces.Trigger, response interfaces.MessageProcessor) + DeleteTriggers(clientID *id.ID, preimage interfaces.Preimage, response interfaces.MessageProcessor) error + DeleteClientTriggers(clientID *id.ID) +} + +type pickup struct { param params.Network sender *gateway.Sender blacklistedNodes map[string]interface{} @@ -34,41 +55,50 @@ type Manager struct { messageReception chan Bundle nodeRegistration chan network.NodeGateway networkIsHealthy chan bool - triggerGarbled chan struct{} + checkInProgress chan struct{} - garbledStore *utility.MeteredCmixMessageBuffer + inProcess *MeteredCmixMessageBuffer - rng *fastRNG.StreamGenerator - events interfaces.EventManager - comms SendCmixCommsInterface - session *storage.Session + rng *fastRNG.StreamGenerator + events interfaces.EventManager + comms network2.SendCmixCommsInterface + session *storage.Session + nodes nodes.Registrar + instance *network.Instance FingerprintsManager TriggersManager + + //sending rate limit tracker + rateLimitBucket *rateLimiting.Bucket + rateLimitParams utility.BucketParamStore } func NewManager(param params.Network, nodeRegistration chan network.NodeGateway, sender *gateway.Sender, session *storage.Session, rng *fastRNG.StreamGenerator, - events interfaces.EventManager, comms SendCmixCommsInterface) *Manager { + events interfaces.EventManager, comms network2.SendCmixCommsInterface, + nodes nodes.Registrar, instance *network.Instance) Pickup { - garbled, err := utility.NewOrLoadMeteredCmixMessageBuffer(session.GetKV(), garbledMessagesKey) + garbled, err := NewOrLoadMeteredCmixMessageBuffer(session.GetKV(), inProcessKey) if err != nil { jww.FATAL.Panicf("Failed to load or new the Garbled Messages system") } - m := Manager{ + m := pickup{ param: param, messageReception: make(chan Bundle, param.MessageReceptionBuffLen), networkIsHealthy: make(chan bool, 1), - triggerGarbled: make(chan struct{}, 100), + checkInProgress: make(chan struct{}, 100), nodeRegistration: nodeRegistration, sender: sender, - garbledStore: garbled, + inProcess: garbled, rng: rng, events: events, comms: comms, session: session, + nodes: nodes, + instance: instance, } for _, nodeId := range param.BlacklistedNodes { decodedId, err := base64.StdEncoding.DecodeString(nodeId) @@ -79,18 +109,18 @@ func NewManager(param params.Network, m.blacklistedNodes[string(decodedId)] = nil } - m.FingerprintsManager = *NewFingerprints() + m.FingerprintsManager = *newFingerprints() m.TriggersManager = *NewTriggers() return &m } //Gets the channel to send received messages on -func (m *Manager) GetMessageReceptionChannel() chan<- Bundle { +func (m *pickup) GetMessageReceptionChannel() chan<- Bundle { return m.messageReception } //Starts all worker pool -func (m *Manager) StartProcessies() stoppable.Stoppable { +func (m *pickup) StartProcessies() stoppable.Stoppable { multi := stoppable.NewMulti("MessageReception") //create the message handler workers @@ -100,9 +130,9 @@ func (m *Manager) StartProcessies() stoppable.Stoppable { multi.Add(stop) } - //create the garbled messages thread + //create the in progress messages thread garbledStop := stoppable.NewSingle("GarbledMessages") - go m.processGarbledMessages(garbledStop) + go m.recheckInProgressRunner(garbledStop) multi.Add(garbledStop) return multi diff --git a/storage/utility/meteredCmixMessageBuffer.go b/network/message/meteredCmixMessageBuffer.go similarity index 64% rename from storage/utility/meteredCmixMessageBuffer.go rename to network/message/meteredCmixMessageBuffer.go index 1061e2a3c..de7008d58 100644 --- a/storage/utility/meteredCmixMessageBuffer.go +++ b/network/message/meteredCmixMessageBuffer.go @@ -5,15 +5,18 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package utility +package message import ( "encoding/json" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/interfaces" + "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/versioned" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/primitives/format" + "gitlab.com/elixxir/primitives/states" "gitlab.com/xx_network/primitives/netTime" "golang.org/x/crypto/blake2b" "google.golang.org/protobuf/proto" @@ -27,6 +30,7 @@ type meteredCmixMessageHandler struct{} type meteredCmixMessage struct { M []byte Ri []byte + Identity []byte Count uint Timestamp time.Time } @@ -49,7 +53,7 @@ func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, k } // Save versioned object - return kv.Set(key, currentMessageBufferVersion, &obj) + return kv.Set(key, utility.CurrentMessageBufferVersion, &obj) } // LoadMessage returns the message with the specified key from the key value @@ -79,13 +83,14 @@ func (*meteredCmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) er } // HashMessage generates a hash of the message. -func (*meteredCmixMessageHandler) HashMessage(m interface{}) MessageHash { +func (*meteredCmixMessageHandler) HashMessage(m interface{}) utility.MessageHash { h, _ := blake2b.New256(nil) h.Write(m.(meteredCmixMessage).M) h.Write(m.(meteredCmixMessage).Ri) + h.Write(m.(meteredCmixMessage).Identity) - var messageHash MessageHash + var messageHash utility.MessageHash copy(messageHash[:], h.Sum(nil)) return messageHash @@ -94,13 +99,13 @@ func (*meteredCmixMessageHandler) HashMessage(m interface{}) MessageHash { // CmixMessageBuffer wraps the message buffer to store and load raw cmix // messages. type MeteredCmixMessageBuffer struct { - mb *MessageBuffer + mb *utility.MessageBuffer kv *versioned.KV key string } func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { - mb, err := NewMessageBuffer(kv, &meteredCmixMessageHandler{}, key) + mb, err := utility.NewMessageBuffer(kv, &meteredCmixMessageHandler{}, key) if err != nil { return nil, err } @@ -109,7 +114,7 @@ func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMess } func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { - mb, err := LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) + mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) if err != nil { return nil, err } @@ -118,7 +123,7 @@ func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMes } func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { - mb, err := LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) + mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) if err != nil { jww.WARN.Printf("Failed to find MeteredCmixMessageBuffer %s, making a new one", key) return NewMeteredCmixMessageBuffer(kv, key) @@ -127,53 +132,44 @@ func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCm return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil } -func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo) { +func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) (uint, time.Time) { if m.GetPrimeByteLen() == 0 { jww.FATAL.Panicf("Cannot handle a metered " + "cmix message with a length of 0") } - riMarshal, err := proto.Marshal(ri) - if err != nil { - jww.FATAL.Panicf("Failed to marshal round info") - } - msg := meteredCmixMessage{ - M: m.Marshal(), - Ri: riMarshal, - Count: 0, - Timestamp: netTime.Now(), - } - mcmb.mb.Add(msg) + msg := buildMsg(m, ri, identity) + addedMsgFace := mcmb.mb.Add(msg) + addedMessage := addedMsgFace.(meteredCmixMessage) + + return addedMessage.Count, addedMessage.Timestamp } -func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, ri *pb.RoundInfo) { - riMarshal, err := proto.Marshal(ri) - if err != nil { - jww.FATAL.Panicf("Failed to marshal round info") +func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) (uint, time.Time) { + if m.GetPrimeByteLen() == 0 { + jww.FATAL.Panicf("Cannot handle a metered " + + "cmix message with a length of 0") } - msg := meteredCmixMessage{ - M: m.Marshal(), - Ri: riMarshal, - Count: 0, - Timestamp: netTime.Now(), - } - mcmb.mb.AddProcessing(msg) + msg := buildMsg(m, ri, identity) + addedMsgFace := mcmb.mb.AddProcessing(msg) + addedMessage := addedMsgFace.(meteredCmixMessage) + + return addedMessage.Count, addedMessage.Timestamp } -func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, uint, time.Time, bool) { +func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, interfaces.Identity, bool) { m, ok := mcmb.mb.Next() if !ok { - return format.Message{}, nil, 0, time.Time{}, false + return format.Message{}, nil, interfaces.Identity{}, false } msg := m.(meteredCmixMessage) - rtnCnt := msg.Count // increment the count and save msg.Count++ mcmh := &meteredCmixMessageHandler{} - err := mcmh.SaveMessage(mcmb.kv, msg, makeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg))) + err := mcmh.SaveMessage(mcmb.kv, msg, utility.MakeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg))) if err != nil { jww.FATAL.Panicf("Failed to save metered message after count "+ "update: %s", err) @@ -187,15 +183,47 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, uin ri := &pb.RoundInfo{} err = proto.Unmarshal(msg.Ri, ri) - jww.FATAL.Panicf("Failed to unmarshal round info from msg format") + if err != nil { + jww.FATAL.Panicf("Failed to unmarshal round info from msg format") + } + + identity := interfaces.Identity{} + err = json.Unmarshal(msg.Identity, &identity) + if err != nil { + jww.FATAL.Panicf("Failed to unmarshal identity from msg format") + } + + return msfFormat, ri, identity, true +} - return msfFormat, ri, rtnCnt, msg.Timestamp, true +func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) { + mcmb.mb.Succeeded(buildMsg(m, ri, identity)) } -func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message) { - mcmb.mb.Succeeded(meteredCmixMessage{M: m.Marshal()}) +func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) { + mcmb.mb.Failed(buildMsg(m, ri, identity)) } -func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message) { - mcmb.mb.Failed(meteredCmixMessage{M: m.Marshal()}) +func buildMsg(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) meteredCmixMessage { + if m.GetPrimeByteLen() == 0 { + jww.FATAL.Panicf("Cannot handle a metered " + + "cmix message with a length of 0") + } + riMarshal, err := proto.Marshal(ri) + if err != nil { + jww.FATAL.Panicf("Failed to marshal round info") + } + + identityMarshal, err := json.Marshal(&identity) + if err != nil { + jww.FATAL.Panicf("Failed to marshal identity") + } + + return meteredCmixMessage{ + M: m.Marshal(), + Ri: riMarshal, + Identity: identityMarshal, + Count: 0, + Timestamp: time.Unix(0, int64(ri.Timestamps[states.QUEUED])), + } } diff --git a/storage/utility/meteredCmixMessageBuffer_test.go b/network/message/meteredCmixMessageBuffer_test.go similarity index 94% rename from storage/utility/meteredCmixMessageBuffer_test.go rename to network/message/meteredCmixMessageBuffer_test.go index be5a937d3..f1c46b8f8 100644 --- a/storage/utility/meteredCmixMessageBuffer_test.go +++ b/network/message/meteredCmixMessageBuffer_test.go @@ -5,11 +5,12 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package utility +package message import ( "bytes" "encoding/json" + "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" "gitlab.com/elixxir/primitives/format" @@ -27,7 +28,7 @@ func Test_meteredCmixMessageHandler_SaveMessage(t *testing.T) { testMsgs, _ := makeTestMeteredCmixMessage(10) for _, msg := range testMsgs { - key := makeStoredMessageKey("testKey", mcmh.HashMessage(msg)) + key := utility.MakeStoredMessageKey("testKey", mcmh.HashMessage(msg)) // Save message err := mcmh.SaveMessage(kv, msg, key) @@ -64,7 +65,7 @@ func Test_meteredCmixMessageHandler_LoadMessage(t *testing.T) { testMsgs, _ := makeTestMeteredCmixMessage(10) for i, msg := range testMsgs { - key := makeStoredMessageKey("testKey", mcmh.HashMessage(msg)) + key := utility.MakeStoredMessageKey("testKey", mcmh.HashMessage(msg)) // Save message if err := mcmh.SaveMessage(kv, msg, key); err != nil { @@ -96,7 +97,7 @@ func Test_meteredCmixMessageHandler_DeleteMessage(t *testing.T) { testMsgs, _ := makeTestMeteredCmixMessage(10) for _, msg := range testMsgs { - key := makeStoredMessageKey("testKey", mcmh.HashMessage(msg)) + key := utility.MakeStoredMessageKey("testKey", mcmh.HashMessage(msg)) // Save message err := mcmh.SaveMessage(kv, msg, key) @@ -130,7 +131,7 @@ func Test_meteredCmixMessageHandler_Smoke(t *testing.T) { "\n\texpected: %v\n\trecieved: %v", nil, err) } - // Add two messages + // AddFingerprint two messages mcmb.Add(testMsgs[0]) mcmb.Add(testMsgs[1]) @@ -187,10 +188,10 @@ func Test_meteredCmixMessageHandler_Smoke(t *testing.T) { // makeTestMeteredCmixMessage creates a list of messages with random data and the // expected map after they are added to the buffer. -func makeTestMeteredCmixMessage(n int) ([]meteredCmixMessage, map[MessageHash]struct{}) { +func makeTestMeteredCmixMessage(n int) ([]meteredCmixMessage, map[utility.MessageHash]struct{}) { mcmh := &meteredCmixMessageHandler{} prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) - mh := map[MessageHash]struct{}{} + mh := map[utility.MessageHash]struct{}{} msgs := make([]meteredCmixMessage, n) for i := range msgs { payload := make([]byte, 128) diff --git a/network/message/triggers.go b/network/message/triggers.go index 453c45cc5..16461f2c4 100644 --- a/network/message/triggers.go +++ b/network/message/triggers.go @@ -87,14 +87,14 @@ func (t *TriggersManager) get(clientID *id.ID, receivedIdentityFp, return nil, false } -// Add - Adds a trigger which can call a message +// AddTrigger - Adds a trigger which can call a message // handing function or be used for notifications. // Multiple triggers can be registered for the same preimage. // preimage - the preimage which is triggered on // type - a descriptive string of the trigger. Generally used in notifications // source - a byte buffer of related data. Generally used in notifications. // Example: Sender ID -func (t *TriggersManager) Add(clientID *id.ID, newTrigger interfaces.Trigger, +func (t *TriggersManager) AddTrigger(clientID *id.ID, newTrigger interfaces.Trigger, response interfaces.MessageProcessor) { t.Lock() defer t.Unlock() @@ -118,11 +118,11 @@ func (t *TriggersManager) Add(clientID *id.ID, newTrigger interfaces.Trigger, } -// Delete - If only a single response is associated with the preimage, +// DeleteTriggers - If only a single response is associated with the preimage, // the entire preimage is removed. If there is more than one response, only // the given response is removed. If nil is passed in for response, // all triggers for the preimage will be removed. -func (t *TriggersManager) Delete(clientID *id.ID, preimage interfaces.Preimage, +func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Preimage, response interfaces.MessageProcessor) error { t.Lock() defer t.Unlock() @@ -162,8 +162,8 @@ func (t *TriggersManager) Delete(clientID *id.ID, preimage interfaces.Preimage, return nil } -// DeleteClient - delete the mapping associated with an ID -func (t *TriggersManager) DeleteClient(clientID *id.ID) { +// DeleteClientTriggers - delete the mapping associated with an ID +func (t *TriggersManager) DeleteClientTriggers(clientID *id.ID) { t.Lock() defer t.Unlock() diff --git a/network/nodes/registrar.go b/network/nodes/registrar.go index dec183a4c..3f23def58 100644 --- a/network/nodes/registrar.go +++ b/network/nodes/registrar.go @@ -32,9 +32,11 @@ var delayTable = [5]time.Duration{ type Registrar interface { StartProcesses(numParallel uint) stoppable.Stoppable Has(nid *id.ID) bool + Remove(nid *id.ID) GetKeys(topology *connect.Circuit) (MixCypher, error) NumRegistered() int GetInputChannel() chan<- network.NodeGateway + TriggerRegistration(nid *id.ID) } type RegisterNodeCommsInterface interface { @@ -112,6 +114,13 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { func (r *registrar) GetInputChannel() chan<- network.NodeGateway { return r.c } +func (r *registrar) TriggerRegistration(nid *id.ID) { + r.c <- network.NodeGateway{ + Node: ndf.Node{ID: nid.Marshal(), + //status must be active because it is in a round + Status: ndf.Active}, + } +} // GetKeys returns a MixCypher for the topology and a list of nodes it did // not have a key for. If there are missing keys, then returns nil MixCypher. @@ -156,6 +165,10 @@ func (r *registrar) Has(nid *id.ID) bool { return exists } +func (r *registrar) Remove(nid *id.ID) { + r.remove(nid) +} + // NumRegistered returns the number of registered nodes. func (r *registrar) NumRegistered() int { r.mux.RLock() diff --git a/network/message/sendCmix.go b/network/sendCmix.go similarity index 90% rename from network/message/sendCmix.go rename to network/sendCmix.go index 9faa51049..c8e01cc22 100644 --- a/network/message/sendCmix.go +++ b/network/sendCmix.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package message +package network import ( "fmt" @@ -14,6 +14,7 @@ import ( "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/network/gateway" + "gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" pb "gitlab.com/elixxir/comms/mixmessages" @@ -34,13 +35,13 @@ import ( // WARNING: Potentially Unsafe // Public manager function to send a message over CMIX -func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, +func (m *message.manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, cmixParams params.CMIX, stop *stoppable.Single) (id.Round, ephemeral.Id, error) { msgCopy := msg.Copy() - return sendCmixHelper(sender, msgCopy, recipient, cmixParams, m.blacklistedNodes, m.Instance, - m.Session, m.nodeRegistration, m.Rng, m.Internal.Events, + return sendCmixHelper(sender, msgCopy, recipient, cmixParams, m.blacklistedNodes, m.instance, + m.session, m.nodeRegistration, m.rng, m.Internal.Events, m.TransmissionID, m.Comms, stop) } @@ -72,7 +73,7 @@ func calculateSendTimeout(best *pb.RoundInfo, max time.Duration) time.Duration { // its status func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, cmixParams params.CMIX, blacklistedNodes map[string]interface{}, instance *network.Instance, - session *storage.Session, nodeRegistration chan network.NodeGateway, + session *storage.Session, nodes nodes.Registrar, rng *fastRNG.StreamGenerator, events interfaces.EventManager, senderId *id.ID, comms SendCmixCommsInterface, stop *stoppable.Single) (id.Round, ephemeral.Id, error) { @@ -92,7 +93,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, stream := rng.GetStream() defer stream.Close() - grp := session.Cmix().GetGroup() + grp := session.GetCmixGroup() // flip leading bits randomly to thwart a tagging attack. // See SetGroupBits for more info @@ -147,7 +148,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, } // Retrieve host and key information from round - firstGateway, roundKeys, err := processRound(instance, session, nodeRegistration, bestRound, recipient.String(), msg.Digest()) + firstGateway, roundKeys, err := processRound(nodes, bestRound, recipient.String(), msg.Digest()) if err != nil { jww.WARN.Printf("[SendCMIX-%s]SendCmix failed to process round"+ " (will retry): %v", cmixParams.DebugTag, err) @@ -190,9 +191,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, jww.TRACE.Printf("[SendCMIX-%s]sendFunc %s putmsg", cmixParams.DebugTag, host) if err != nil { - // fixme: should we provide as a slice the whole topology? - err := handlePutMessageError(firstGateway, - instance, session, nodeRegistration, + err := handlePutMessageError(firstGateway, nodes, recipient.String(), bestRound, err) jww.TRACE.Printf("[SendCMIX-%s] sendFunc %s err %+v", cmixParams.DebugTag, host, err) @@ -238,7 +237,6 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, elapsed, numRoundTries) jww.INFO.Print(m) events.Report(1, "MessageSend", "Metric", m) - onSend(1, session) return id.Round(bestRound.ID), ephID, nil } else { jww.FATAL.Panicf("[SendCMIX-%s] Gateway %s returned no error, but failed "+ @@ -250,13 +248,3 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, return 0, ephemeral.Id{}, errors.New("failed to send the message, " + "unknown error") } - -// OnSend performs a bucket addition on a call to Manager.SendCMIX or -// Manager.SendManyCMIX, updating the bucket for the amount of messages sent. -func onSend(messages uint32, session *storage.Session) { - rateLimitingParam := session.GetBucketParams().Get() - session.GetBucket().AddWithExternalParams(messages, - rateLimitingParam.Capacity, rateLimitingParam.LeakedTokens, - rateLimitingParam.LeakDuration) - -} diff --git a/network/message/sendCmixUtils.go b/network/sendCmixUtils.go similarity index 88% rename from network/message/sendCmixUtils.go rename to network/sendCmixUtils.go index feb86c585..958ee4400 100644 --- a/network/message/sendCmixUtils.go +++ b/network/sendCmixUtils.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package message +package network import ( "github.com/pkg/errors" @@ -14,7 +14,6 @@ import ( "gitlab.com/elixxir/client/interfaces/params" preimage2 "gitlab.com/elixxir/client/interfaces/preimage" "gitlab.com/elixxir/client/network/nodes" - "gitlab.com/elixxir/client/storage" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" @@ -51,8 +50,7 @@ const unrecoverableError = "failed with an unrecoverable error" // boolean will be returned false. If the error is among recoverable errors, // then the boolean will return true. // recoverable means we should try resending to the round -func handlePutMessageError(firstGateway *id.ID, instance *network.Instance, - session *storage.Session, nodeRegistration chan network.NodeGateway, +func handlePutMessageError(firstGateway *id.ID, nodes nodes.Registrar, recipientString string, bestRound *pb.RoundInfo, err error) (returnErr error) { @@ -69,11 +67,9 @@ func handlePutMessageError(firstGateway *id.ID, instance *network.Instance, nodeID := firstGateway.DeepCopy() nodeID.SetType(id.Node) - // Delete the keys - session.Cmix().Remove(nodeID) - - // Trigger - go handleMissingNodeKeys(instance, nodeRegistration, []*id.ID{nodeID}) + // DeleteFingerprint the keys and re-register + nodes.Remove(nodeID) + nodes.TriggerRegistration(nodeID) return errors.WithMessagef(err, "Failed to send to [%s] via %s "+ "due to failed authentication, retrying...", @@ -86,9 +82,8 @@ func handlePutMessageError(firstGateway *id.ID, instance *network.Instance, // processRound is a helper function that determines the gateway to send to for // a round and retrieves the round keys. -func processRound(instance *network.Instance, session *storage.Session, - nodeRegistration chan network.NodeGateway, bestRound *pb.RoundInfo, - recipientString, messageDigest string) (*id.ID, *nodes.MixCypher, error) { +func processRound(nodes nodes.Registrar, bestRound *pb.RoundInfo, + recipientString, messageDigest string) (*id.ID, nodes.MixCypher, error) { // Build the topology idList, err := id.NewIDListFromBytes(bestRound.Topology) @@ -101,13 +96,9 @@ func processRound(instance *network.Instance, session *storage.Session, // get the keys for the round, reject if any nodes do not have keying // relationships - roundKeys, missingKeys := session.Cmix().GetRoundKeys(topology) - if len(missingKeys) > 0 { - go handleMissingNodeKeys(instance, nodeRegistration, missingKeys) - - return nil, nil, errors.Errorf("Failed to send on round %d to [%s] "+ - "(msgDigest(s): %s) due to missing relationships with nodes: %s", - bestRound.ID, recipientString, messageDigest, missingKeys) + roundKeys, err := nodes.GetKeys(topology) + if err != nil { + return nil, nil, errors.WithMessagef(err, "Failed to get keys for round %d", bestRound.ID) } // get the gateway to transmit to @@ -122,7 +113,7 @@ func processRound(instance *network.Instance, session *storage.Session, // the recipient. func buildSlotMessage(msg format.Message, recipient *id.ID, target *id.ID, stream *fastRNG.Stream, senderId *id.ID, bestRound *pb.RoundInfo, - roundKeys *nodes.MixCypher, param params.CMIX) (*pb.GatewaySlot, + mixCrypt nodes.MixCypher, param params.CMIX) (*pb.GatewaySlot, format.Message, ephemeral.Id, error) { @@ -169,7 +160,7 @@ func buildSlotMessage(msg format.Message, recipient *id.ID, target *id.ID, "Failed to generate salt, this should never happen") } - encMsg, kmacs := roundKeys.Encrypt(msg, salt, id.Round(bestRound.ID)) + encMsg, kmacs := mixCrypt.Encrypt(msg, salt, id.Round(bestRound.ID)) // Build the message payload msgPacket := &pb.Slot{ @@ -188,7 +179,7 @@ func buildSlotMessage(msg format.Message, recipient *id.ID, target *id.ID, } // Add the mac proving ownership - slot.MAC = roundKeys.MakeClientGatewayKey(salt, + slot.MAC = mixCrypt.MakeClientGatewayKey(salt, network.GenerateSlotDigest(slot)) return slot, encMsg, ephID, nil diff --git a/network/message/sendCmix_test.go b/network/sendCmix_test.go similarity index 88% rename from network/message/sendCmix_test.go rename to network/sendCmix_test.go index 8ef85711c..c45f8b38c 100644 --- a/network/message/sendCmix_test.go +++ b/network/sendCmix_test.go @@ -1,4 +1,4 @@ -package message +package network import ( "github.com/pkg/errors" @@ -6,6 +6,7 @@ import ( "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/internal" + message2 "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/switchboard" "gitlab.com/elixxir/comms/client" @@ -39,7 +40,7 @@ func Test_attemptSendCmix(t *testing.T) { events := &dummyEvent{} sw := switchboard.New() - l := TestListener{ + l := message2.TestListener{ ch: make(chan bool), } sw.RegisterListener(sess2.GetUser().TransmissionID, message.Raw, l) @@ -47,7 +48,7 @@ func Test_attemptSendCmix(t *testing.T) { if err != nil { t.Errorf("Failed to start client comms: %+v", err) } - inst, err := network.NewInstanceTesting(comms.ProtoComms, getNDF(), nil, nil, nil, t) + inst, err := network.NewInstanceTesting(comms.ProtoComms, message2.getNDF(), nil, nil, nil, t) if err != nil { t.Errorf("Failed to start instance: %+v", err) } @@ -103,23 +104,23 @@ func Test_attemptSendCmix(t *testing.T) { } p := gateway.DefaultPoolParams() p.MaxPoolSize = 1 - sender, err := gateway.NewSender(p, i.Rng, getNDF(), &MockSendCMIXComms{t: t}, i.Session, nil) + sender, err := gateway.NewSender(p, i.Rng, message2.getNDF(), &message2.MockSendCMIXComms{t: t}, i.Session, nil) if err != nil { t.Errorf("%+v", errors.New(err.Error())) return } - m := NewManager(i, params.Network{Messages: params.Messages{ + m := message2.NewManager(i, params.Network{Messages: params.Messages{ MessageReceptionBuffLen: 20, MessageReceptionWorkerPoolSize: 20, - MaxChecksGarbledMessage: 20, - GarbledMessageWait: time.Hour, + MaxChecksRetryMessage: 20, + RetryMessageWait: time.Hour, }}, nil, sender) msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen()) msgCmix.SetContents([]byte("test")) e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID()) _, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultCMIX(), make(map[string]interface{}), m.Instance, m.Session, m.nodeRegistration, - m.Rng, events, m.TransmissionID, &MockSendCMIXComms{t: t}, nil) + m.Rng, events, m.TransmissionID, &message2.MockSendCMIXComms{t: t}, nil) if err != nil { t.Errorf("Failed to sendcmix: %+v", err) panic("t") diff --git a/network/message/sendManyCmix.go b/network/sendManyCmix.go similarity index 97% rename from network/message/sendManyCmix.go rename to network/sendManyCmix.go index a814b8493..872bacb7e 100644 --- a/network/message/sendManyCmix.go +++ b/network/sendManyCmix.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package message +package network import ( "fmt" @@ -35,7 +35,7 @@ import ( // with this call and can leak data about yourself. Returns the round ID of the // round the payload was sent or an error if it fails. // WARNING: Potentially Unsafe -func (m *Manager) SendManyCMIX(sender *gateway.Sender, +func (m *message2.manager) SendManyCMIX(sender *gateway.Sender, messages []message.TargetedCmixMessage, p params.CMIX, stop *stoppable.Single) (id.Round, []ephemeral.Id, error) { @@ -44,7 +44,7 @@ func (m *Manager) SendManyCMIX(sender *gateway.Sender, m.TransmissionID, m.Comms, stop) } -// sendManyCmixHelper is a helper function for Manager.SendManyCMIX. +// sendManyCmixHelper is a helper function for manager.SendManyCMIX. // // NOTE: Payloads sent are not end-to-end encrypted, metadata is NOT protected // with this call; see SendE2E for end-to-end encryption and full privacy @@ -215,7 +215,7 @@ func sendManyCmixHelper(sender *gateway.Sender, "in round %d", param.DebugTag, ephemeralIDsString, recipientString, bestRound.ID) jww.INFO.Print(m) events.Report(1, "MessageSendMany", "Metric", m) - onSend(uint32(len(msgs)), session) + trackNetworkRateLimit(uint32(len(msgs)), session) return id.Round(bestRound.ID), ephemeralIDs, nil } else { jww.FATAL.Panicf("Gateway %s returned no error, but failed to "+ diff --git a/network/message/sendManyCmix_test.go b/network/sendManyCmix_test.go similarity index 88% rename from network/message/sendManyCmix_test.go rename to network/sendManyCmix_test.go index 7ac3b99c3..8d5cb3387 100644 --- a/network/message/sendManyCmix_test.go +++ b/network/sendManyCmix_test.go @@ -1,4 +1,4 @@ -package message +package network import ( "github.com/pkg/errors" @@ -6,6 +6,7 @@ import ( "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/internal" + message2 "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/switchboard" "gitlab.com/elixxir/comms/client" @@ -34,7 +35,7 @@ func Test_attemptSendManyCmix(t *testing.T) { numRecipients := 3 recipients := make([]*id.ID, numRecipients) sw := switchboard.New() - l := TestListener{ + l := message2.TestListener{ ch: make(chan bool), } for i := 0; i < numRecipients; i++ { @@ -47,7 +48,7 @@ func Test_attemptSendManyCmix(t *testing.T) { if err != nil { t.Errorf("Failed to start client comms: %+v", err) } - inst, err := network.NewInstanceTesting(comms.ProtoComms, getNDF(), nil, nil, nil, t) + inst, err := network.NewInstanceTesting(comms.ProtoComms, message2.getNDF(), nil, nil, nil, t) if err != nil { t.Errorf("Failed to start instance: %+v", err) } @@ -103,16 +104,16 @@ func Test_attemptSendManyCmix(t *testing.T) { } p := gateway.DefaultPoolParams() p.MaxPoolSize = 1 - sender, err := gateway.NewSender(p, i.Rng, getNDF(), &MockSendCMIXComms{t: t}, i.Session, nil) + sender, err := gateway.NewSender(p, i.Rng, message2.getNDF(), &message2.MockSendCMIXComms{t: t}, i.Session, nil) if err != nil { t.Errorf("%+v", errors.New(err.Error())) return } - m := NewManager(i, params.Network{Messages: params.Messages{ + m := message2.NewManager(i, params.Network{Messages: params.Messages{ MessageReceptionBuffLen: 20, MessageReceptionWorkerPoolSize: 20, - MaxChecksGarbledMessage: 20, - GarbledMessageWait: time.Hour, + MaxChecksRetryMessage: 20, + RetryMessageWait: time.Hour, }}, nil, sender) msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen()) msgCmix.SetContents([]byte("test")) @@ -132,7 +133,7 @@ func Test_attemptSendManyCmix(t *testing.T) { _, _, err = sendManyCmixHelper(sender, msgList, params.GetDefaultCMIX(), make(map[string]interface{}), m.Instance, m.Session, m.nodeRegistration, - m.Rng, events, m.TransmissionID, &MockSendCMIXComms{t: t}, nil) + m.Rng, events, m.TransmissionID, &message2.MockSendCMIXComms{t: t}, nil) if err != nil { t.Errorf("Failed to sendcmix: %+v", err) } diff --git a/storage/partition/multiPartMessage_test.go b/storage/partition/multiPartMessage_test.go index 8d39165a7..040b3dfde 100644 --- a/storage/partition/multiPartMessage_test.go +++ b/storage/partition/multiPartMessage_test.go @@ -113,7 +113,7 @@ func CheckMultiPartMessages(expectedMpm *multiPartMessage, mpm *multiPartMessage } } -// Tests happy path of multiPartMessage.Add(). +// Tests happy path of multiPartMessage.AddFingerprint(). func TestMultiPartMessage_Add(t *testing.T) { // Generate test values prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) diff --git a/storage/utility/cmixMessageBuffer_test.go b/storage/utility/cmixMessageBuffer_test.go index 237d734eb..48be053ea 100644 --- a/storage/utility/cmixMessageBuffer_test.go +++ b/storage/utility/cmixMessageBuffer_test.go @@ -31,7 +31,7 @@ func TestCmixMessageHandler_SaveMessage(t *testing.T) { Msg: testMsgs[i].Marshal(), Recipient: ids[i].Marshal(), } - key := makeStoredMessageKey("testKey", cmh.HashMessage(msg)) + key := MakeStoredMessageKey("testKey", cmh.HashMessage(msg)) // Save message err := cmh.SaveMessage(kv, msg, key) @@ -67,7 +67,7 @@ func TestCmixMessageHandler_LoadMessage(t *testing.T) { Msg: testMsgs[i].Marshal(), Recipient: ids[i].Marshal(), } - key := makeStoredMessageKey("testKey", cmh.HashMessage(msg)) + key := MakeStoredMessageKey("testKey", cmh.HashMessage(msg)) // Save message if err := cmh.SaveMessage(kv, msg, key); err != nil { diff --git a/storage/utility/e2eMessageBuffer_test.go b/storage/utility/e2eMessageBuffer_test.go index 53bd84db4..d3fb4dc85 100644 --- a/storage/utility/e2eMessageBuffer_test.go +++ b/storage/utility/e2eMessageBuffer_test.go @@ -29,7 +29,7 @@ func TestE2EMessageHandler_SaveMessage(t *testing.T) { testMsgs, _ := makeTestE2EMessages(10, t) for _, msg := range testMsgs { - key := makeStoredMessageKey("testKey", emg.HashMessage(msg)) + key := MakeStoredMessageKey("testKey", emg.HashMessage(msg)) // Save message err := emg.SaveMessage(kv, msg, key) @@ -65,7 +65,7 @@ func TestE2EMessageHandler_LoadMessage(t *testing.T) { testMsgs, _ := makeTestE2EMessages(10, t) for _, msg := range testMsgs { - key := makeStoredMessageKey("testKey", cmh.HashMessage(msg)) + key := MakeStoredMessageKey("testKey", cmh.HashMessage(msg)) // Save message if err := cmh.SaveMessage(kv, msg, key); err != nil { diff --git a/storage/utility/knownRounds.go b/storage/utility/knownRounds.go index 2788a5f55..0f8057ce6 100644 --- a/storage/utility/knownRounds.go +++ b/storage/utility/knownRounds.go @@ -100,8 +100,8 @@ func (kr *KnownRounds) load() error { } // Deletes a known rounds object from disk and memory -func (kr *KnownRounds) Delete() error { - err := kr.kv.Delete(kr.key) +func (kr *KnownRounds) DeleteFingerprint() error { + err := kr.kv.DeleteFingerprint(kr.key) if err != nil { return err } diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index f9891edce..6b8fcd10b 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -29,7 +29,7 @@ func (m MessageHash) String() string { const messageSubKey = "bufferedMessage" // Version of the file saved to the key value store -const currentMessageBufferVersion = 0 +const CurrentMessageBufferVersion = 0 // MessageHandler interface used to handle the passed in message type so the // buffer can be used at different layers of the stack. @@ -123,13 +123,13 @@ func (mb *MessageBuffer) save() error { // Create versioned object with data obj := versioned.Object{ - Version: currentMessageBufferVersion, + Version: CurrentMessageBufferVersion, Timestamp: now, Data: data, } // Save versioned object - return mb.kv.Set(mb.key, currentMessageBufferVersion, &obj) + return mb.kv.Set(mb.key, CurrentMessageBufferVersion, &obj) } // getMessageList returns a list of all message hashes stored in messages and @@ -145,7 +145,7 @@ func (mb *MessageBuffer) getMessageList() []MessageHash { i++ } - // Add messages from the "processing" list + // AddFingerprint messages from the "processing" list for msg := range mb.processingMessages { msgs[i] = msg i++ @@ -159,7 +159,7 @@ func (mb *MessageBuffer) getMessageList() []MessageHash { func (mb *MessageBuffer) load() error { // Load the versioned object - vo, err := mb.kv.Get(mb.key, currentMessageBufferVersion) + vo, err := mb.kv.Get(mb.key, CurrentMessageBufferVersion) if err != nil { return err } @@ -180,26 +180,27 @@ func (mb *MessageBuffer) load() error { } // Add adds a message to the buffer in "not processing" state. -func (mb *MessageBuffer) Add(m interface{}) { +func (mb *MessageBuffer) Add(m interface{}) interface{} { h := mb.handler.HashMessage(m) mb.mux.Lock() defer mb.mux.Unlock() // Ensure message does not already exist in buffer - _, exists1 := mb.messages[h] - _, exists2 := mb.processingMessages[h] - if exists1 || exists2 { - return + if face1, exists1 := mb.messages[h]; exists1 { + return face1 + } + if face2, exists2 := mb.processingMessages[h]; exists2 { + return face2 } // Save message as versioned object - err := mb.handler.SaveMessage(mb.kv, m, makeStoredMessageKey(mb.key, h)) + err := mb.handler.SaveMessage(mb.kv, m, MakeStoredMessageKey(mb.key, h)) if err != nil { jww.FATAL.Panicf("Error saving message: %v", err) } - // Add message to the buffer + // AddFingerprint message to the buffer mb.messages[h] = struct{}{} // Save buffer @@ -207,10 +208,12 @@ func (mb *MessageBuffer) Add(m interface{}) { if err != nil { jww.FATAL.Panicf("Error whilse saving buffer: %v", err) } + + return m } // Add adds a message to the buffer in "processing" state. -func (mb *MessageBuffer) AddProcessing(m interface{}) { +func (mb *MessageBuffer) AddProcessing(m interface{}) interface{} { h := mb.handler.HashMessage(m) jww.TRACE.Printf("Critical Messages AddProcessing(%s)", base64.StdEncoding.EncodeToString(h[:])) @@ -219,19 +222,20 @@ func (mb *MessageBuffer) AddProcessing(m interface{}) { defer mb.mux.Unlock() // Ensure message does not already exist in buffer - _, exists1 := mb.messages[h] - _, exists2 := mb.processingMessages[h] - if exists1 || exists2 { - return + if face1, exists1 := mb.messages[h]; exists1 { + return face1 + } + if face2, exists2 := mb.processingMessages[h]; exists2 { + return face2 } // Save message as versioned object - err := mb.handler.SaveMessage(mb.kv, m, makeStoredMessageKey(mb.key, h)) + err := mb.handler.SaveMessage(mb.kv, m, MakeStoredMessageKey(mb.key, h)) if err != nil { jww.FATAL.Panicf("Error saving message: %v", err) } - // Add message to the buffer + // AddFingerprint message to the buffer mb.processingMessages[h] = struct{}{} // Save buffer @@ -239,6 +243,8 @@ func (mb *MessageBuffer) AddProcessing(m interface{}) { if err != nil { jww.FATAL.Panicf("Error whilse saving buffer: %v", err) } + + return m } // Next gets the next message from the buffer whose state is "not processing". @@ -264,11 +270,11 @@ func (mb *MessageBuffer) Next() (interface{}, bool) { delete(mb.messages, h) - // Add message to list of processing messages + // AddFingerprint message to list of processing messages mb.processingMessages[h] = struct{}{} // Retrieve the message for storage - m, err = mb.handler.LoadMessage(mb.kv, makeStoredMessageKey(mb.key, h)) + m, err = mb.handler.LoadMessage(mb.kv, MakeStoredMessageKey(mb.key, h)) if err != nil { m = nil jww.ERROR.Printf("Failed to load message %s from store, "+ @@ -302,7 +308,7 @@ func (mb *MessageBuffer) Succeeded(m interface{}) { delete(mb.messages, h) // Done message from key value store - err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h)) + err := mb.handler.DeleteMessage(mb.kv, MakeStoredMessageKey(mb.key, h)) if err != nil { jww.ERROR.Printf("Failed to delete message from store, "+ "this may happen on occasion due to replays to increase "+ @@ -330,12 +336,12 @@ func (mb *MessageBuffer) Failed(m interface{}) { delete(mb.processingMessages, h) // Save message as versioned object - err := mb.handler.SaveMessage(mb.kv, m, makeStoredMessageKey(mb.key, h)) + err := mb.handler.SaveMessage(mb.kv, m, MakeStoredMessageKey(mb.key, h)) if err != nil { jww.FATAL.Panicf("Error saving message: %v", err) } - // Add to "not processed" state + // AddFingerprint to "not processed" state mb.messages[h] = struct{}{} // Save buffer @@ -345,7 +351,7 @@ func (mb *MessageBuffer) Failed(m interface{}) { } } -// makeStoredMessageKey generates a new key for the message based on its has. -func makeStoredMessageKey(key string, h MessageHash) string { +// MakeStoredMessageKey generates a new key for the message based on its has. +func MakeStoredMessageKey(key string, h MessageHash) string { return key + messageSubKey + base64.StdEncoding.EncodeToString(h[:]) } diff --git a/storage/utility/messageBuffer_test.go b/storage/utility/messageBuffer_test.go index f9afe57c1..e9d1a841b 100644 --- a/storage/utility/messageBuffer_test.go +++ b/storage/utility/messageBuffer_test.go @@ -196,7 +196,7 @@ func TestMessageBuffer_Add(t *testing.T) { } if !reflect.DeepEqual(expectedMessages, testMB.messages) { - t.Errorf("Add() failed to add messages correctly into the buffer."+ + t.Errorf("AddFingerprint() failed to add messages correctly into the buffer."+ "\n\texpected: %v\n\trecieved: %v", expectedMessages, testMB.messages) } @@ -207,7 +207,7 @@ func TestMessageBuffer_Add(t *testing.T) { } if !reflect.DeepEqual(expectedMessages, testMB.messages) { - t.Errorf("Add() failed to add messages correctly into the buffer."+ + t.Errorf("AddFingerprint() failed to add messages correctly into the buffer."+ "\n\texpected: %v\n\trecieved: %v", expectedMessages, testMB.messages) } @@ -253,7 +253,7 @@ func TestMessageBuffer_InvalidNext(t *testing.T) { m := []byte("This is a message that should fail") h := testMB.handler.HashMessage(m) testMB.Add(m) - err = testMB.handler.DeleteMessage(testMB.kv, makeStoredMessageKey(testMB.key, h)) + err = testMB.handler.DeleteMessage(testMB.kv, MakeStoredMessageKey(testMB.key, h)) if err != nil { t.Fatalf("Failed to set up test (delete from kv failed): %+v", err) } -- GitLab