diff --git a/network/message/fingerprints.go b/network/message/fingerprints.go index 35ddd9546931119bbe4f410aad9c42feb9e4df5c..48fc46e1406d8f8dc169bfaf2c0c17d3573910b9 100644 --- a/network/message/fingerprints.go +++ b/network/message/fingerprints.go @@ -40,8 +40,8 @@ func (f *FingerprintsManager) pop(clientID *id.ID, f.Lock() defer f.Unlock() cid := *clientID - if idFpmap, exists := f.fpMap[cid]; exists { - if proc, exists := idFpmap[fingerprint]; exists { + if idFpMap, exists := f.fpMap[cid]; exists { + if proc, exists := idFpMap[fingerprint]; exists { delete(f.fpMap[cid], fingerprint) if len(f.fpMap[cid]) == 0 { delete(f.fpMap, cid) @@ -58,8 +58,7 @@ func (f *FingerprintsManager) pop(clientID *id.ID, // value. If there is already an entry for this fingerprint, the // method returns with no write operation. func (f *FingerprintsManager) AddFingerprint(clientID *id.ID, - fingerprint format.Fingerprint, - mp interfaces.MessageProcessor) error { + fingerprint format.Fingerprint, mp interfaces.MessageProcessor) error { f.Lock() defer f.Unlock() @@ -98,8 +97,9 @@ func (f *FingerprintsManager) DeleteFingerprint(clientID *id.ID, } } -// DeleteClientFingerprints is a thread-safe deletion operation on the Fingerprints map. -// It will remove all entres for the given clientID from the map. +// DeleteClientFingerprints is a thread-safe deletion operation on the +// fingerprints map. It will remove all entries for the given clientID from the +// map. func (f *FingerprintsManager) DeleteClientFingerprints(clientID *id.ID) { f.Lock() defer f.Unlock() diff --git a/network/message/fingerprints_test.go b/network/message/fingerprints_test.go index eca547874e56a9878e6f8db82ad7d048644665c4..d7d2c40172775de0dfdc7939aa3aafab13cf2e08 100644 --- a/network/message/fingerprints_test.go +++ b/network/message/fingerprints_test.go @@ -10,211 +10,146 @@ package message import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces" + "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" "reflect" "strconv" - "sync" "testing" ) // Unit test. -func TestNewFingerprints(t *testing.T) { - expected := &Fingerprints{ - fingerprints: make(map[format.Fingerprint]*Processor), - RWMutex: sync.RWMutex{}, +func Test_newFingerprints(t *testing.T) { + expected := &FingerprintsManager{ + fpMap: make(map[id.ID]map[format.Fingerprint]interfaces.MessageProcessor), } received := newFingerprints() - if !reflect.DeepEqual(expected, received) { - t.Fatalf("NewFingerprint error: Did not construct expected object."+ - "\nExpected: %v"+ - "\nReceived: %v", expected, received) + t.Fatalf("New FingerprintsManager did not match expected."+ + "\nexpected: %+v\nreceived: %+v", expected, received) } } // Unit test. -func TestFingerprints_Get(t *testing.T) { +func TestFingerprintsManager_pop(t *testing.T) { // Construct fingerprint map fpTracker := newFingerprints() // Construct fingerprint and processor values + cid := id.NewIdFromString("clientID", id.User, t) fp := format.NewFingerprint([]byte("test")) mp := NewMockMsgProcessor(t) // Add the values to the tracker - fpTracker.AddFingerprint(fp, mp) + err := fpTracker.AddFingerprint(cid, fp, mp) + if err != nil { + t.Errorf("Failed to add fingerprint: %+v", err) + } // Attempt to retrieve value from map - received, exists := fpTracker.Get(fp) + received, exists := fpTracker.pop(cid, fp) if !exists { t.Fatalf("get error: Did not retrieve fingerprint (%s) that "+ "should have been in map.", fp) } // Check that received value contains the expected data - expected := newProcessor(mp) + expected := NewMockMsgProcessor(t) if !reflect.DeepEqual(received, expected) { t.Fatalf("get error: Map does not contain expected data."+ - "\nExpected: %v"+ - "\nReceived: %v", expected, received) + "\nexpected: %#v\nreceived: %#v", expected, received) } } // Unit test. -func TestFingerprints_AddFingerprint(t *testing.T) { +func TestFingerprintsManager_AddFingerprint(t *testing.T) { // Construct fingerprint map fpTracker := newFingerprints() // Construct fingerprint and processor values + cid := id.NewIdFromString("clientID", id.User, t) fp := format.NewFingerprint([]byte("test")) mp := NewMockMsgProcessor(t) // Add the values to the tracker - fpTracker.AddFingerprint(fp, mp) + err := fpTracker.AddFingerprint(cid, fp, mp) + if err != nil { + t.Errorf("Failed to add fingerprint: %+v", err) + } // Check that the fingerprint key has a map entry - received, exists := fpTracker.fingerprints[fp] + received, exists := fpTracker.fpMap[*cid] if !exists { t.Fatalf("Add did not write to map as expected. "+ "Fingerprint %s not found in map", fp) } // Check that received value contains the expected data - expected := newProcessor(mp) + expected := map[format.Fingerprint]interfaces.MessageProcessor{fp: mp} if !reflect.DeepEqual(received, expected) { t.Fatalf("Add error: Map does not contain expected data."+ - "\nExpected: %v"+ - "\nReceived: %v", expected, received) - } -} - -// Unit test. -func TestFingerprints_AddFingerprints(t *testing.T) { - // Construct fingerprints map - fpTracker := newFingerprints() - - // Construct slices of fingerprints and processors - numTests := 100 - fingerprints := make([]format.Fingerprint, 0, numTests) - processors := make([]interfaces.MessageProcessorFP, 0, numTests) - for i := 0; i < numTests; i++ { - fp := format.NewFingerprint([]byte(strconv.Itoa(i))) - mp := NewMockMsgProcessor(t) - - fingerprints = append(fingerprints, fp) - processors = append(processors, mp) - } - - // Add slices to map - err := fpTracker.AddFingerprints(fingerprints, processors) - if err != nil { - t.Fatalf("Adds unexpected error: %v", err) - } - - // Make sure every fingerprint is mapped to it's expected processor - for i, expected := range fingerprints { - received, exists := fpTracker.fingerprints[expected] - if !exists { - t.Errorf("Adds did not write to map as expected. "+ - "Fingerprint number %d (value: %s) not found in map", i, expected) - } - - if !reflect.DeepEqual(received, expected) { - t.Fatalf("Adds error: Map does not contain expected data for "+ - "fingerprint number %d."+ - "\nExpected: %v"+ - "\nReceived: %v", i, expected, received) - } - } - -} - -// Error case: Call Fingerprints.AddFingerprints with fingerprint and processor -// slices of different lengths. -func TestFingerprints_AddFingerprints_Error(t *testing.T) { - // Construct fingerprint map - fpTracker := newFingerprints() - - // Construct 2 slices of different lengths - fingerprints := []format.Fingerprint{ - format.NewFingerprint([]byte("1")), - format.NewFingerprint([]byte("2")), - format.NewFingerprint([]byte("3")), - } - processors := []interfaces.MessageProcessorFP{ - NewMockMsgProcessor(t), - } - - // Attempt to add fingerprints - err := fpTracker.AddFingerprints(fingerprints, processors) - if err == nil { - t.Fatalf("Add should have received an error with mismatched " + - "slices length") + "\nexpected: %v\nreceived: %v", expected, received) } - } -func TestFingerprints_RemoveFingerprint(t *testing.T) { +func TestFingerprintsManager_DeleteFingerprint(t *testing.T) { // Construct fingerprint map fpTracker := newFingerprints() // Construct fingerprint and processor values + cid := id.NewIdFromString("clientID", id.User, t) fp := format.NewFingerprint([]byte("test")) mp := NewMockMsgProcessor(t) // Add the values to the tracker - fpTracker.AddFingerprint(fp, mp) + err := fpTracker.AddFingerprint(cid, fp, mp) + if err != nil { + t.Errorf("Failed to add fingerprint: %+v", err) + } // Remove value from tracker - fpTracker.RemoveFingerprint(fp) + fpTracker.DeleteFingerprint(cid, fp) // Check that value no longer exists within the map - if _, exists := fpTracker.fingerprints[fp]; exists { + if _, exists := fpTracker.fpMap[*cid][fp]; exists { t.Fatalf("RemoveFingerprint error: "+ "Fingerprint %s exists in map after a RemoveFingerprint call", fp) } } // Unit test. -func TestFingerprints_RemoveFingerprints(t *testing.T) { +func TestFingerprintsManager_DeleteClientFingerprints(t *testing.T) { // Construct fingerprints map fpTracker := newFingerprints() // Construct slices of fingerprints and processors numTests := 100 + cid := id.NewIdFromString("clientID", id.User, t) fingerprints := make([]format.Fingerprint, 0, numTests) - processors := make([]interfaces.MessageProcessorFP, 0, numTests) + processors := make([]interfaces.MessageProcessor, 0, numTests) for i := 0; i < numTests; i++ { fp := format.NewFingerprint([]byte(strconv.Itoa(i))) mp := NewMockMsgProcessor(t) + // Add the values to the tracker + err := fpTracker.AddFingerprint(cid, fp, mp) + if err != nil { + t.Errorf("Failed to add fingerprint: %+v", err) + } + fingerprints = append(fingerprints, fp) processors = append(processors, mp) } - // Add slices to map - err := fpTracker.AddFingerprints(fingerprints, processors) - if err != nil { - t.Fatalf("Add unexpected error: %v", err) - } - - fpTracker.RemoveFingerprints(fingerprints) + fpTracker.DeleteClientFingerprints(cid) // Make sure every fingerprint is mapped to it's expected processor - for i, expected := range fingerprints { - - if received, exists := fpTracker.fingerprints[expected]; !exists { - t.Fatalf("RemoveFingerprints error: Map does not contain "+ - "expected data for fingerprint number %d."+ - "\nExpected: %v"+ - "\nReceived: %v", i, expected, received) - } - + if _, exists := fpTracker.fpMap[*cid]; exists { + t.Fatalf("RemoveFingerprints error: failed to delete client.") } - } // todo: consider moving this to a test utils somewhere else.. maybe in the interfaces package? @@ -231,10 +166,11 @@ func NewMockMsgProcessor(face interface{}) *MockMsgProcessor { return &MockMsgProcessor{} } -func (mock *MockMsgProcessor) MarkFingerprintUsed(fingerprint format.Fingerprint) { +func (mock *MockMsgProcessor) MarkFingerprintUsed(_ format.Fingerprint) { return } -func (mock *MockMsgProcessor) Process(message format.Message, fingerprint format.Fingerprint) { +func (mock *MockMsgProcessor) Process(format.Message, interfaces.Identity, + *mixmessages.RoundInfo) { return } diff --git a/network/message/inProgress.go b/network/message/inProgress.go index b96d85d369533a372b298c88ff853423e6a443a2..c4e730bb824b6bee0ea3ebcaa1d7af02e6ab065a 100644 --- a/network/message/inProgress.go +++ b/network/message/inProgress.go @@ -18,21 +18,21 @@ import ( // to decrypt a message, it is added to the garbled message buffer (which is // stored on disk) and the message decryption is retried here whenever triggered. -// This can be triggered through the CheckInProgressMessages on the network pickup -// and is used in the /keyExchange package on successful rekey triggering +// This can be triggered through the CheckInProgressMessages on the network +// pickup and is used in the /keyExchange package on successful rekey triggering. -// CheckInProgressMessages triggers rechecking all in progress messages -// if the queue is not full Exposed on the network pickup +// CheckInProgressMessages triggers rechecking all in progress messages if the +// queue is not full Exposed on the network pickup. func (p *pickup) CheckInProgressMessages() { select { case p.checkInProgress <- struct{}{}: default: - jww.WARN.Println("Failed to check garbled messages " + - "due to full channel") + jww.WARN.Print("Failed to check garbled messages due to full channel.") } } -//long running thread which processes messages that need to be checked +// recheckInProgressRunner is a long-running thread which processes messages +// that need to be checked. func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) { for { select { @@ -46,9 +46,10 @@ func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) { } } -//handler for a single run of recheck messages +// recheckInProgress is the handler for a single run of recheck messages. func (p *pickup) recheckInProgress() { - //try to decrypt every garbled message, excising those who's counts are too high + // Try to decrypt every garbled message, excising those whose counts are too + // high for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() { bundle := Bundle{ Round: id.Round(ri.ID), @@ -57,12 +58,11 @@ func (p *pickup) recheckInProgress() { Finish: func() {}, Identity: identity, } + select { case p.messageReception <- bundle: default: - jww.WARN.Printf("failed to send bundle, channel full") - + jww.WARN.Printf("Failed to send bundle, channel full.") } - } } diff --git a/network/message/inProgress_test.go b/network/message/inProgress_test.go index c8646569468969f130144187364e29deafea3646..f40c3f4dc2dded5ca5301e0fb5ec9d5c976c666d 100644 --- a/network/message/inProgress_test.go +++ b/network/message/inProgress_test.go @@ -36,13 +36,12 @@ type TestListener struct { ch chan bool } -// the Hear function is called to exercise the listener, passing in the -// data as an item +// Hear is called to exercise the listener, passing in the data as an item. func (l TestListener) Hear(item message.Receive) { l.ch <- true } -// Returns a name, used for debugging +// Name returns a name; used for debugging. func (l TestListener) Name() string { return "TEST LISTENER FOR GARBLED MESSAGES" } @@ -64,7 +63,6 @@ func TestManager_CheckGarbledMessages(t *testing.T) { i := internal.Internal{ Session: sess1, Switchboard: sw, - Rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), Comms: comms, Health: nil, TransmissionID: sess1.GetUser().TransmissionID, diff --git a/network/message/meteredCmixMessageBuffer.go b/network/message/meteredCmixMessageBuffer.go index de7008d5875ec54f5f714f46c2e25c22e78ae905..d8fdf9c6ab087faceb372213d2c5e4a435750470 100644 --- a/network/message/meteredCmixMessageBuffer.go +++ b/network/message/meteredCmixMessageBuffer.go @@ -37,7 +37,8 @@ type meteredCmixMessage struct { // SaveMessage saves the message as a versioned object at the specified key in // the key value store. -func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { +func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, + key string) error { msg := m.(meteredCmixMessage) marshaled, err := json.Marshal(&msg) @@ -59,7 +60,8 @@ func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, k // LoadMessage returns the message with the specified key from the key value // store. An empty message and error are returned if the message could not be // retrieved. -func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { +func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) ( + interface{}, error) { // Load the versioned object vo, err := kv.Get(key, currentMeteredCmixMessageVersion) if err != nil { @@ -69,7 +71,8 @@ func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (int msg := meteredCmixMessage{} err = json.Unmarshal(vo.Data, &msg) if err != nil { - return nil, errors.WithMessage(err, "Failed to unmarshal metered cmix message") + return nil, + errors.WithMessage(err, "Failed to unmarshal metered cmix message") } // Create message from data @@ -96,7 +99,7 @@ func (*meteredCmixMessageHandler) HashMessage(m interface{}) utility.MessageHash return messageHash } -// CmixMessageBuffer wraps the message buffer to store and load raw cmix +// MeteredCmixMessageBuffer wraps the message buffer to store and load raw cMix // messages. type MeteredCmixMessageBuffer struct { mb *utility.MessageBuffer @@ -104,7 +107,8 @@ type MeteredCmixMessageBuffer struct { key string } -func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { +func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) ( + *MeteredCmixMessageBuffer, error) { mb, err := utility.NewMessageBuffer(kv, &meteredCmixMessageHandler{}, key) if err != nil { return nil, err @@ -113,7 +117,8 @@ func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMess return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil } -func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { +func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) ( + *MeteredCmixMessageBuffer, error) { mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) if err != nil { return nil, err @@ -122,7 +127,8 @@ func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMes return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil } -func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { +func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) ( + *MeteredCmixMessageBuffer, error) { mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) if err != nil { jww.WARN.Printf("Failed to find MeteredCmixMessageBuffer %s, making a new one", key) @@ -132,7 +138,8 @@ func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCm return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil } -func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) (uint, time.Time) { +func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, + identity interfaces.Identity) (uint, time.Time) { if m.GetPrimeByteLen() == 0 { jww.FATAL.Panicf("Cannot handle a metered " + "cmix message with a length of 0") @@ -145,7 +152,8 @@ func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, id return addedMessage.Count, addedMessage.Timestamp } -func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) (uint, time.Time) { +func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, + ri *pb.RoundInfo, identity interfaces.Identity) (uint, time.Time) { if m.GetPrimeByteLen() == 0 { jww.FATAL.Panicf("Cannot handle a metered " + "cmix message with a length of 0") @@ -158,7 +166,8 @@ func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, ri *pb.Rou return addedMessage.Count, addedMessage.Timestamp } -func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, interfaces.Identity, bool) { +func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, + interfaces.Identity, bool) { m, ok := mcmb.mb.Next() if !ok { return format.Message{}, nil, interfaces.Identity{}, false @@ -169,7 +178,8 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, int // increment the count and save msg.Count++ mcmh := &meteredCmixMessageHandler{} - err := mcmh.SaveMessage(mcmb.kv, msg, utility.MakeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg))) + err := mcmh.SaveMessage(mcmb.kv, msg, + utility.MakeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg))) if err != nil { jww.FATAL.Panicf("Failed to save metered message after count "+ "update: %s", err) @@ -196,11 +206,13 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, int return msfFormat, ri, identity, true } -func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) { +func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message, ri *pb.RoundInfo, + identity interfaces.Identity) { mcmb.mb.Succeeded(buildMsg(m, ri, identity)) } -func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) { +func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message, ri *pb.RoundInfo, + identity interfaces.Identity) { mcmb.mb.Failed(buildMsg(m, ri, identity)) } diff --git a/network/message/meteredCmixMessageBuffer_test.go b/network/message/meteredCmixMessageBuffer_test.go index f1c46b8f81f031260e7112acaaa8fb4e1bb3b679..6cb697143d4a60f0989f0b92dada0eac71fce1fc 100644 --- a/network/message/meteredCmixMessageBuffer_test.go +++ b/network/message/meteredCmixMessageBuffer_test.go @@ -10,10 +10,13 @@ package message import ( "bytes" "encoding/json" + "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/versioned" + pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/ekv" "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" "math/rand" "testing" @@ -128,62 +131,37 @@ func Test_meteredCmixMessageHandler_Smoke(t *testing.T) { mcmb, err := NewMeteredCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") if err != nil { t.Errorf("NewMeteredCmixMessageBuffer() returned an error."+ - "\n\texpected: %v\n\trecieved: %v", nil, err) + "\nexpected: %v\nrecieved: %v", nil, err) } // AddFingerprint two messages + mcmb.Add(testMsgs[0], &pb.RoundInfo{ID: 1, Timestamps: []uint64{0, 1, 2, 3}}, interfaces.Identity{ + Source: id.NewIdFromString("user1", id.User, t)}) + mcmb.Add(testMsgs[1], &pb.RoundInfo{ID: 2, Timestamps: []uint64{0, 1, 2, 3}}, interfaces.Identity{ + Source: id.NewIdFromString("user2", id.User, t)}) - mcmb.Add(testMsgs[0]) - mcmb.Add(testMsgs[1]) - - if len(mcmb.mb.messages) != 2 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 2, len(mcmb.mb.messages)) - } - - msg, _, _, exists := mcmb.Next() + msg, ri, identity, exists := mcmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } - mcmb.Remove(msg) + mcmb.Remove(msg, ri, identity) - if len(mcmb.mb.messages) != 1 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 1, len(mcmb.mb.messages)) - } - - msg, _, _, exists = mcmb.Next() + msg, ri, identity, exists = mcmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } - if len(mcmb.mb.messages) != 0 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 0, len(mcmb.mb.messages)) - } - mcmb.Failed(msg) + mcmb.Failed(msg, ri, identity) - if len(mcmb.mb.messages) != 1 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 1, len(mcmb.mb.messages)) - } - - msg, _, _, exists = mcmb.Next() + msg, ri, identity, exists = mcmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } - mcmb.Remove(msg) + mcmb.Remove(msg, ri, identity) msg, _, _, exists = mcmb.Next() if exists { t.Error("Next() found a message in the buffer when it should be empty.") } - mcmb.Remove(msg) - - if len(mcmb.mb.messages) != 0 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 0, len(mcmb.mb.messages)) - } - } // makeTestMeteredCmixMessage creates a list of messages with random data and the diff --git a/network/message/pickup.go b/network/message/pickup.go index fbd38cbd8b5b7c3758e1609eeb564bcad6effa14..83094fd879273c83596b058d9bb7ebe790a542b4 100644 --- a/network/message/pickup.go +++ b/network/message/pickup.go @@ -9,7 +9,6 @@ package message import ( "encoding/base64" - "fmt" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/storage" @@ -18,6 +17,7 @@ import ( "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/rateLimiting" + "strconv" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/params" @@ -32,15 +32,15 @@ const ( type Pickup interface { GetMessageReceptionChannel() chan<- Bundle - StartProcessies() stoppable.Stoppable + StartProcesses() stoppable.Stoppable CheckInProgressMessages() - //Fingerprints + // Fingerprints AddFingerprint(clientID *id.ID, fingerprint format.Fingerprint, mp interfaces.MessageProcessor) error DeleteFingerprint(clientID *id.ID, fingerprint format.Fingerprint) DeleteClientFingerprints(clientID *id.ID) - //Triggers + // Triggers AddTrigger(clientID *id.ID, newTrigger interfaces.Trigger, response interfaces.MessageProcessor) DeleteTriggers(clientID *id.ID, preimage interfaces.Preimage, response interfaces.MessageProcessor) error DeleteClientTriggers(clientID *id.ID) @@ -67,7 +67,7 @@ type pickup struct { FingerprintsManager TriggersManager - //sending rate limit tracker + // sending rate limit tracker rateLimitBucket *rateLimiting.Bucket rateLimitParams utility.BucketParamStore } @@ -80,7 +80,7 @@ func NewPickup(param params.Network, garbled, err := NewOrLoadMeteredCmixMessageBuffer(session.GetKV(), inProcessKey) if err != nil { - jww.FATAL.Panicf("Failed to load or new the Garbled Messages system") + jww.FATAL.Panicf("Failed to load or new the Garbled Messages system: %v", err) } m := pickup{ @@ -111,23 +111,24 @@ func NewPickup(param params.Network, return &m } -//Gets the channel to send received messages on +// GetMessageReceptionChannel gets the channel to send received messages on. func (p *pickup) GetMessageReceptionChannel() chan<- Bundle { return p.messageReception } -//Starts all worker pool -func (p *pickup) StartProcessies() stoppable.Stoppable { +// StartProcesses starts all worker pool. +func (p *pickup) StartProcesses() stoppable.Stoppable { multi := stoppable.NewMulti("MessageReception") - //create the message handler workers + // create the message handler workers for i := uint(0); i < p.param.MessageReceptionWorkerPoolSize; i++ { - stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i)) + stop := stoppable.NewSingle( + "MessageReception Worker " + strconv.Itoa(int(i))) go p.handleMessages(stop) multi.Add(stop) } - //create the in progress messages thread + // create the in progress messages thread garbledStop := stoppable.NewSingle("GarbledMessages") go p.recheckInProgressRunner(garbledStop) multi.Add(garbledStop) diff --git a/network/message/triggers.go b/network/message/triggers.go index 16461f2c43a1cbc697a8cdf6d4d6e5d309bbd901..6242f49091e854dc3574e822eb9fec29ce073661 100644 --- a/network/message/triggers.go +++ b/network/message/triggers.go @@ -16,24 +16,24 @@ import ( "gitlab.com/xx_network/primitives/id" ) -/* Trigger - predefined hash based tags appended to all cmix messages -which, though trial hashing, are used to determine if a message applies -to this client +/* Trigger - predefined hash based tags appended to all cMix messages which, +though trial hashing, are used to determine if a message applies to this client. -Triggers are used for 2 purposes - can be processed by the notifications system, -or can be used to implement custom non fingerprint processing of payloads. -I.E. key negotiation, broadcast negotiation +Triggers are used for 2 purposes - can be processed by the notification system, +or can be used to implement custom non fingerprint processing of payloads (i.e. +key negotiation and broadcast negotiation). A tag is appended to the message of the format tag = H(H(messageContents),preimage) and trial hashing is used to determine if a message adheres to a tag. -WARNING: If a preiamge is known by an adversary, they can determine which messages -are for the client. +WARNING: If a preimage is known by an adversary, they can determine which +messages are for the client. -Due to the extra overhead of trial hashing, triggers are processed after fingerprints. -If a fingerprint match occurs on the message, triggers will not be handled. +Due to the extra overhead of trial hashing, triggers are processed after +fingerprints. If a fingerprint match occurs on the message, triggers will not be +handled. -Triggers are ephemeral to the session. When starting a new client, all triggers must be -re-added before StartNetworkFollower is called. +Triggers are ephemeral to the session. When starting a new client, all triggers +must be re-added before StartNetworkFollower is called. */ type TriggersManager struct { @@ -52,19 +52,19 @@ func NewTriggers() *TriggersManager { } // Lookup will see if a trigger exists for the given preimage and message -// contents. It will do this by trial hashing the preimages in the map with -// the received message contents, until either a match to the -// received identity fingerprint is received or it has exhausted the map. -// If a match is found, this means the message received is for the client, -// and that one or multiple triggers exist to process this message. +// contents. It will do this by trial hashing the preimages in the map with the +// received message contents, until either a match to the received identity +// fingerprint is received or it has exhausted the map. +// If a match is found, this means the message received is for the client, and +// that one or multiple triggers exist to process this message. // These triggers are returned to the caller along with the a true boolean. // If the map has been exhausted with no matches found, it returns nil and false. -// todo: reorganize this interface. Lookup needs to be called -// by handleMessage, which should not have access to the other -// state modifying methods below. Possible options include: +// TODO: reorganize this interface. Lookup needs to be called by handleMessage, +// which should not have access to the other state modifying methods below. +// Possible options include: // - privatizing the state-changing methods -// - leaking lookup on this layer and migrating the state modifiation methods -// a layer down in a sepearate package +// - leaking lookup on this layer and migrating the state modification methods +// a layer down in a separate package func (t *TriggersManager) get(clientID *id.ID, receivedIdentityFp, ecrMsgContents []byte) ([]trigger, bool) { @@ -87,9 +87,9 @@ func (t *TriggersManager) get(clientID *id.ID, receivedIdentityFp, return nil, false } -// AddTrigger - Adds a trigger which can call a message -// handing function or be used for notifications. -// Multiple triggers can be registered for the same preimage. +// AddTrigger adds a trigger which can call a message handing function or be +// used for notifications. Multiple triggers can be registered for the same +// preimage. // preimage - the preimage which is triggered on // type - a descriptive string of the trigger. Generally used in notifications // source - a byte buffer of related data. Generally used in notifications. @@ -119,9 +119,9 @@ func (t *TriggersManager) AddTrigger(clientID *id.ID, newTrigger interfaces.Trig } // DeleteTriggers - If only a single response is associated with the preimage, -// the entire preimage is removed. If there is more than one response, only -// the given response is removed. If nil is passed in for response, -// all triggers for the preimage will be removed. +// the entire preimage is removed. If there is more than one response, only the +// given response is removed. If nil is passed in for response, all triggers for +// the preimage will be removed. func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Preimage, response interfaces.MessageProcessor) error { t.Lock() @@ -162,7 +162,7 @@ func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Pr return nil } -// DeleteClientTriggers - delete the mapping associated with an ID +// DeleteClientTriggers deletes the mapping associated with an ID. func (t *TriggersManager) DeleteClientTriggers(clientID *id.ID) { t.Lock() defer t.Unlock() diff --git a/network/message/utils_test.go b/network/message/utils_test.go index e0bf2222d6f36a6934fd3469cfa1222fe257bae1..bd15af236399fd5bb355da541bbf4da9a5567922 100644 --- a/network/message/utils_test.go +++ b/network/message/utils_test.go @@ -24,7 +24,8 @@ func (mc *MockSendCMIXComms) GetHost(*id.ID) (*connect.Host, bool) { return h, true } -func (mc *MockSendCMIXComms) AddHost(*id.ID, string, []byte, connect.HostParams) (host *connect.Host, err error) { +func (mc *MockSendCMIXComms) AddHost( + *id.ID, string, []byte, connect.HostParams) (host *connect.Host, err error) { host, _ = mc.GetHost(nil) return host, nil } @@ -33,14 +34,16 @@ func (mc *MockSendCMIXComms) RemoveHost(*id.ID) { } -func (mc *MockSendCMIXComms) SendPutMessage(*connect.Host, *mixmessages.GatewaySlot, time.Duration) (*mixmessages.GatewaySlotResponse, error) { +func (mc *MockSendCMIXComms) SendPutMessage(*connect.Host, + *mixmessages.GatewaySlot, time.Duration) (*mixmessages.GatewaySlotResponse, error) { return &mixmessages.GatewaySlotResponse{ Accepted: true, RoundID: 3, }, nil } -func (mc *MockSendCMIXComms) SendPutManyMessages(*connect.Host, *mixmessages.GatewaySlots, time.Duration) (*mixmessages.GatewaySlotResponse, error) { +func (mc *MockSendCMIXComms) SendPutManyMessages(*connect.Host, + *mixmessages.GatewaySlots, time.Duration) (*mixmessages.GatewaySlotResponse, error) { return &mixmessages.GatewaySlotResponse{ Accepted: true, RoundID: 3,