Skip to content
Snippets Groups Projects
Commit abc125e2 authored by Jono Wenger's avatar Jono Wenger
Browse files

Fix mosts tests in network/message

parent 153bb2ea
No related branches found
No related tags found
4 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast,!187Xx 3829/triggers
...@@ -40,8 +40,8 @@ func (f *FingerprintsManager) pop(clientID *id.ID, ...@@ -40,8 +40,8 @@ func (f *FingerprintsManager) pop(clientID *id.ID,
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
cid := *clientID cid := *clientID
if idFpmap, exists := f.fpMap[cid]; exists { if idFpMap, exists := f.fpMap[cid]; exists {
if proc, exists := idFpmap[fingerprint]; exists { if proc, exists := idFpMap[fingerprint]; exists {
delete(f.fpMap[cid], fingerprint) delete(f.fpMap[cid], fingerprint)
if len(f.fpMap[cid]) == 0 { if len(f.fpMap[cid]) == 0 {
delete(f.fpMap, cid) delete(f.fpMap, cid)
...@@ -58,8 +58,7 @@ func (f *FingerprintsManager) pop(clientID *id.ID, ...@@ -58,8 +58,7 @@ func (f *FingerprintsManager) pop(clientID *id.ID,
// value. If there is already an entry for this fingerprint, the // value. If there is already an entry for this fingerprint, the
// method returns with no write operation. // method returns with no write operation.
func (f *FingerprintsManager) AddFingerprint(clientID *id.ID, func (f *FingerprintsManager) AddFingerprint(clientID *id.ID,
fingerprint format.Fingerprint, fingerprint format.Fingerprint, mp interfaces.MessageProcessor) error {
mp interfaces.MessageProcessor) error {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
...@@ -98,8 +97,9 @@ func (f *FingerprintsManager) DeleteFingerprint(clientID *id.ID, ...@@ -98,8 +97,9 @@ func (f *FingerprintsManager) DeleteFingerprint(clientID *id.ID,
} }
} }
// DeleteClientFingerprints is a thread-safe deletion operation on the Fingerprints map. // DeleteClientFingerprints is a thread-safe deletion operation on the
// It will remove all entres for the given clientID from the map. // fingerprints map. It will remove all entries for the given clientID from the
// map.
func (f *FingerprintsManager) DeleteClientFingerprints(clientID *id.ID) { func (f *FingerprintsManager) DeleteClientFingerprints(clientID *id.ID) {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
......
...@@ -10,211 +10,146 @@ package message ...@@ -10,211 +10,146 @@ package message
import ( import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"reflect" "reflect"
"strconv" "strconv"
"sync"
"testing" "testing"
) )
// Unit test. // Unit test.
func TestNewFingerprints(t *testing.T) { func Test_newFingerprints(t *testing.T) {
expected := &Fingerprints{ expected := &FingerprintsManager{
fingerprints: make(map[format.Fingerprint]*Processor), fpMap: make(map[id.ID]map[format.Fingerprint]interfaces.MessageProcessor),
RWMutex: sync.RWMutex{},
} }
received := newFingerprints() received := newFingerprints()
if !reflect.DeepEqual(expected, received) { if !reflect.DeepEqual(expected, received) {
t.Fatalf("NewFingerprint error: Did not construct expected object."+ t.Fatalf("New FingerprintsManager did not match expected."+
"\nExpected: %v"+ "\nexpected: %+v\nreceived: %+v", expected, received)
"\nReceived: %v", expected, received)
} }
} }
// Unit test. // Unit test.
func TestFingerprints_Get(t *testing.T) { func TestFingerprintsManager_pop(t *testing.T) {
// Construct fingerprint map // Construct fingerprint map
fpTracker := newFingerprints() fpTracker := newFingerprints()
// Construct fingerprint and processor values // Construct fingerprint and processor values
cid := id.NewIdFromString("clientID", id.User, t)
fp := format.NewFingerprint([]byte("test")) fp := format.NewFingerprint([]byte("test"))
mp := NewMockMsgProcessor(t) mp := NewMockMsgProcessor(t)
// Add the values to the tracker // Add the values to the tracker
fpTracker.AddFingerprint(fp, mp) err := fpTracker.AddFingerprint(cid, fp, mp)
if err != nil {
t.Errorf("Failed to add fingerprint: %+v", err)
}
// Attempt to retrieve value from map // Attempt to retrieve value from map
received, exists := fpTracker.Get(fp) received, exists := fpTracker.pop(cid, fp)
if !exists { if !exists {
t.Fatalf("get error: Did not retrieve fingerprint (%s) that "+ t.Fatalf("get error: Did not retrieve fingerprint (%s) that "+
"should have been in map.", fp) "should have been in map.", fp)
} }
// Check that received value contains the expected data // Check that received value contains the expected data
expected := newProcessor(mp) expected := NewMockMsgProcessor(t)
if !reflect.DeepEqual(received, expected) { if !reflect.DeepEqual(received, expected) {
t.Fatalf("get error: Map does not contain expected data."+ t.Fatalf("get error: Map does not contain expected data."+
"\nExpected: %v"+ "\nexpected: %#v\nreceived: %#v", expected, received)
"\nReceived: %v", expected, received)
} }
} }
// Unit test. // Unit test.
func TestFingerprints_AddFingerprint(t *testing.T) { func TestFingerprintsManager_AddFingerprint(t *testing.T) {
// Construct fingerprint map // Construct fingerprint map
fpTracker := newFingerprints() fpTracker := newFingerprints()
// Construct fingerprint and processor values // Construct fingerprint and processor values
cid := id.NewIdFromString("clientID", id.User, t)
fp := format.NewFingerprint([]byte("test")) fp := format.NewFingerprint([]byte("test"))
mp := NewMockMsgProcessor(t) mp := NewMockMsgProcessor(t)
// Add the values to the tracker // Add the values to the tracker
fpTracker.AddFingerprint(fp, mp) err := fpTracker.AddFingerprint(cid, fp, mp)
if err != nil {
t.Errorf("Failed to add fingerprint: %+v", err)
}
// Check that the fingerprint key has a map entry // Check that the fingerprint key has a map entry
received, exists := fpTracker.fingerprints[fp] received, exists := fpTracker.fpMap[*cid]
if !exists { if !exists {
t.Fatalf("Add did not write to map as expected. "+ t.Fatalf("Add did not write to map as expected. "+
"Fingerprint %s not found in map", fp) "Fingerprint %s not found in map", fp)
} }
// Check that received value contains the expected data // Check that received value contains the expected data
expected := newProcessor(mp) expected := map[format.Fingerprint]interfaces.MessageProcessor{fp: mp}
if !reflect.DeepEqual(received, expected) { if !reflect.DeepEqual(received, expected) {
t.Fatalf("Add error: Map does not contain expected data."+ t.Fatalf("Add error: Map does not contain expected data."+
"\nExpected: %v"+ "\nexpected: %v\nreceived: %v", expected, received)
"\nReceived: %v", expected, received)
}
}
// Unit test.
func TestFingerprints_AddFingerprints(t *testing.T) {
// Construct fingerprints map
fpTracker := newFingerprints()
// Construct slices of fingerprints and processors
numTests := 100
fingerprints := make([]format.Fingerprint, 0, numTests)
processors := make([]interfaces.MessageProcessorFP, 0, numTests)
for i := 0; i < numTests; i++ {
fp := format.NewFingerprint([]byte(strconv.Itoa(i)))
mp := NewMockMsgProcessor(t)
fingerprints = append(fingerprints, fp)
processors = append(processors, mp)
}
// Add slices to map
err := fpTracker.AddFingerprints(fingerprints, processors)
if err != nil {
t.Fatalf("Adds unexpected error: %v", err)
}
// Make sure every fingerprint is mapped to it's expected processor
for i, expected := range fingerprints {
received, exists := fpTracker.fingerprints[expected]
if !exists {
t.Errorf("Adds did not write to map as expected. "+
"Fingerprint number %d (value: %s) not found in map", i, expected)
} }
if !reflect.DeepEqual(received, expected) {
t.Fatalf("Adds error: Map does not contain expected data for "+
"fingerprint number %d."+
"\nExpected: %v"+
"\nReceived: %v", i, expected, received)
}
}
}
// Error case: Call Fingerprints.AddFingerprints with fingerprint and processor
// slices of different lengths.
func TestFingerprints_AddFingerprints_Error(t *testing.T) {
// Construct fingerprint map
fpTracker := newFingerprints()
// Construct 2 slices of different lengths
fingerprints := []format.Fingerprint{
format.NewFingerprint([]byte("1")),
format.NewFingerprint([]byte("2")),
format.NewFingerprint([]byte("3")),
}
processors := []interfaces.MessageProcessorFP{
NewMockMsgProcessor(t),
}
// Attempt to add fingerprints
err := fpTracker.AddFingerprints(fingerprints, processors)
if err == nil {
t.Fatalf("Add should have received an error with mismatched " +
"slices length")
}
} }
func TestFingerprints_RemoveFingerprint(t *testing.T) { func TestFingerprintsManager_DeleteFingerprint(t *testing.T) {
// Construct fingerprint map // Construct fingerprint map
fpTracker := newFingerprints() fpTracker := newFingerprints()
// Construct fingerprint and processor values // Construct fingerprint and processor values
cid := id.NewIdFromString("clientID", id.User, t)
fp := format.NewFingerprint([]byte("test")) fp := format.NewFingerprint([]byte("test"))
mp := NewMockMsgProcessor(t) mp := NewMockMsgProcessor(t)
// Add the values to the tracker // Add the values to the tracker
fpTracker.AddFingerprint(fp, mp) err := fpTracker.AddFingerprint(cid, fp, mp)
if err != nil {
t.Errorf("Failed to add fingerprint: %+v", err)
}
// Remove value from tracker // Remove value from tracker
fpTracker.RemoveFingerprint(fp) fpTracker.DeleteFingerprint(cid, fp)
// Check that value no longer exists within the map // Check that value no longer exists within the map
if _, exists := fpTracker.fingerprints[fp]; exists { if _, exists := fpTracker.fpMap[*cid][fp]; exists {
t.Fatalf("RemoveFingerprint error: "+ t.Fatalf("RemoveFingerprint error: "+
"Fingerprint %s exists in map after a RemoveFingerprint call", fp) "Fingerprint %s exists in map after a RemoveFingerprint call", fp)
} }
} }
// Unit test. // Unit test.
func TestFingerprints_RemoveFingerprints(t *testing.T) { func TestFingerprintsManager_DeleteClientFingerprints(t *testing.T) {
// Construct fingerprints map // Construct fingerprints map
fpTracker := newFingerprints() fpTracker := newFingerprints()
// Construct slices of fingerprints and processors // Construct slices of fingerprints and processors
numTests := 100 numTests := 100
cid := id.NewIdFromString("clientID", id.User, t)
fingerprints := make([]format.Fingerprint, 0, numTests) fingerprints := make([]format.Fingerprint, 0, numTests)
processors := make([]interfaces.MessageProcessorFP, 0, numTests) processors := make([]interfaces.MessageProcessor, 0, numTests)
for i := 0; i < numTests; i++ { for i := 0; i < numTests; i++ {
fp := format.NewFingerprint([]byte(strconv.Itoa(i))) fp := format.NewFingerprint([]byte(strconv.Itoa(i)))
mp := NewMockMsgProcessor(t) mp := NewMockMsgProcessor(t)
fingerprints = append(fingerprints, fp) // Add the values to the tracker
processors = append(processors, mp) err := fpTracker.AddFingerprint(cid, fp, mp)
if err != nil {
t.Errorf("Failed to add fingerprint: %+v", err)
} }
// Add slices to map fingerprints = append(fingerprints, fp)
err := fpTracker.AddFingerprints(fingerprints, processors) processors = append(processors, mp)
if err != nil {
t.Fatalf("Add unexpected error: %v", err)
} }
fpTracker.RemoveFingerprints(fingerprints) fpTracker.DeleteClientFingerprints(cid)
// Make sure every fingerprint is mapped to it's expected processor // Make sure every fingerprint is mapped to it's expected processor
for i, expected := range fingerprints { if _, exists := fpTracker.fpMap[*cid]; exists {
t.Fatalf("RemoveFingerprints error: failed to delete client.")
if received, exists := fpTracker.fingerprints[expected]; !exists {
t.Fatalf("RemoveFingerprints error: Map does not contain "+
"expected data for fingerprint number %d."+
"\nExpected: %v"+
"\nReceived: %v", i, expected, received)
} }
}
} }
// todo: consider moving this to a test utils somewhere else.. maybe in the interfaces package? // todo: consider moving this to a test utils somewhere else.. maybe in the interfaces package?
...@@ -231,10 +166,11 @@ func NewMockMsgProcessor(face interface{}) *MockMsgProcessor { ...@@ -231,10 +166,11 @@ func NewMockMsgProcessor(face interface{}) *MockMsgProcessor {
return &MockMsgProcessor{} return &MockMsgProcessor{}
} }
func (mock *MockMsgProcessor) MarkFingerprintUsed(fingerprint format.Fingerprint) { func (mock *MockMsgProcessor) MarkFingerprintUsed(_ format.Fingerprint) {
return return
} }
func (mock *MockMsgProcessor) Process(message format.Message, fingerprint format.Fingerprint) { func (mock *MockMsgProcessor) Process(format.Message, interfaces.Identity,
*mixmessages.RoundInfo) {
return return
} }
...@@ -18,21 +18,21 @@ import ( ...@@ -18,21 +18,21 @@ import (
// to decrypt a message, it is added to the garbled message buffer (which is // to decrypt a message, it is added to the garbled message buffer (which is
// stored on disk) and the message decryption is retried here whenever triggered. // stored on disk) and the message decryption is retried here whenever triggered.
// This can be triggered through the CheckInProgressMessages on the network pickup // This can be triggered through the CheckInProgressMessages on the network
// and is used in the /keyExchange package on successful rekey triggering // pickup and is used in the /keyExchange package on successful rekey triggering.
// CheckInProgressMessages triggers rechecking all in progress messages // CheckInProgressMessages triggers rechecking all in progress messages if the
// if the queue is not full Exposed on the network pickup // queue is not full Exposed on the network pickup.
func (p *pickup) CheckInProgressMessages() { func (p *pickup) CheckInProgressMessages() {
select { select {
case p.checkInProgress <- struct{}{}: case p.checkInProgress <- struct{}{}:
default: default:
jww.WARN.Println("Failed to check garbled messages " + jww.WARN.Print("Failed to check garbled messages due to full channel.")
"due to full channel")
} }
} }
//long running thread which processes messages that need to be checked // recheckInProgressRunner is a long-running thread which processes messages
// that need to be checked.
func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) { func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) {
for { for {
select { select {
...@@ -46,9 +46,10 @@ func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) { ...@@ -46,9 +46,10 @@ func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) {
} }
} }
//handler for a single run of recheck messages // recheckInProgress is the handler for a single run of recheck messages.
func (p *pickup) recheckInProgress() { func (p *pickup) recheckInProgress() {
//try to decrypt every garbled message, excising those who's counts are too high // Try to decrypt every garbled message, excising those whose counts are too
// high
for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() { for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() {
bundle := Bundle{ bundle := Bundle{
Round: id.Round(ri.ID), Round: id.Round(ri.ID),
...@@ -57,12 +58,11 @@ func (p *pickup) recheckInProgress() { ...@@ -57,12 +58,11 @@ func (p *pickup) recheckInProgress() {
Finish: func() {}, Finish: func() {},
Identity: identity, Identity: identity,
} }
select { select {
case p.messageReception <- bundle: case p.messageReception <- bundle:
default: default:
jww.WARN.Printf("failed to send bundle, channel full") jww.WARN.Printf("Failed to send bundle, channel full.")
} }
} }
} }
...@@ -36,13 +36,12 @@ type TestListener struct { ...@@ -36,13 +36,12 @@ type TestListener struct {
ch chan bool ch chan bool
} }
// the Hear function is called to exercise the listener, passing in the // Hear is called to exercise the listener, passing in the data as an item.
// data as an item
func (l TestListener) Hear(item message.Receive) { func (l TestListener) Hear(item message.Receive) {
l.ch <- true l.ch <- true
} }
// Returns a name, used for debugging // Name returns a name; used for debugging.
func (l TestListener) Name() string { func (l TestListener) Name() string {
return "TEST LISTENER FOR GARBLED MESSAGES" return "TEST LISTENER FOR GARBLED MESSAGES"
} }
...@@ -64,7 +63,6 @@ func TestManager_CheckGarbledMessages(t *testing.T) { ...@@ -64,7 +63,6 @@ func TestManager_CheckGarbledMessages(t *testing.T) {
i := internal.Internal{ i := internal.Internal{
Session: sess1, Session: sess1,
Switchboard: sw, Switchboard: sw,
Rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
Comms: comms, Comms: comms,
Health: nil, Health: nil,
TransmissionID: sess1.GetUser().TransmissionID, TransmissionID: sess1.GetUser().TransmissionID,
......
...@@ -37,7 +37,8 @@ type meteredCmixMessage struct { ...@@ -37,7 +37,8 @@ type meteredCmixMessage struct {
// SaveMessage saves the message as a versioned object at the specified key in // SaveMessage saves the message as a versioned object at the specified key in
// the key value store. // the key value store.
func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{},
key string) error {
msg := m.(meteredCmixMessage) msg := m.(meteredCmixMessage)
marshaled, err := json.Marshal(&msg) marshaled, err := json.Marshal(&msg)
...@@ -59,7 +60,8 @@ func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, k ...@@ -59,7 +60,8 @@ func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, k
// LoadMessage returns the message with the specified key from the key value // LoadMessage returns the message with the specified key from the key value
// store. An empty message and error are returned if the message could not be // store. An empty message and error are returned if the message could not be
// retrieved. // retrieved.
func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (
interface{}, error) {
// Load the versioned object // Load the versioned object
vo, err := kv.Get(key, currentMeteredCmixMessageVersion) vo, err := kv.Get(key, currentMeteredCmixMessageVersion)
if err != nil { if err != nil {
...@@ -69,7 +71,8 @@ func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (int ...@@ -69,7 +71,8 @@ func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (int
msg := meteredCmixMessage{} msg := meteredCmixMessage{}
err = json.Unmarshal(vo.Data, &msg) err = json.Unmarshal(vo.Data, &msg)
if err != nil { if err != nil {
return nil, errors.WithMessage(err, "Failed to unmarshal metered cmix message") return nil,
errors.WithMessage(err, "Failed to unmarshal metered cmix message")
} }
// Create message from data // Create message from data
...@@ -96,7 +99,7 @@ func (*meteredCmixMessageHandler) HashMessage(m interface{}) utility.MessageHash ...@@ -96,7 +99,7 @@ func (*meteredCmixMessageHandler) HashMessage(m interface{}) utility.MessageHash
return messageHash return messageHash
} }
// CmixMessageBuffer wraps the message buffer to store and load raw cmix // MeteredCmixMessageBuffer wraps the message buffer to store and load raw cMix
// messages. // messages.
type MeteredCmixMessageBuffer struct { type MeteredCmixMessageBuffer struct {
mb *utility.MessageBuffer mb *utility.MessageBuffer
...@@ -104,7 +107,8 @@ type MeteredCmixMessageBuffer struct { ...@@ -104,7 +107,8 @@ type MeteredCmixMessageBuffer struct {
key string key string
} }
func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (
*MeteredCmixMessageBuffer, error) {
mb, err := utility.NewMessageBuffer(kv, &meteredCmixMessageHandler{}, key) mb, err := utility.NewMessageBuffer(kv, &meteredCmixMessageHandler{}, key)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -113,7 +117,8 @@ func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMess ...@@ -113,7 +117,8 @@ func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMess
return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil
} }
func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (
*MeteredCmixMessageBuffer, error) {
mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -122,7 +127,8 @@ func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMes ...@@ -122,7 +127,8 @@ func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMes
return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil
} }
func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (
*MeteredCmixMessageBuffer, error) {
mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key)
if err != nil { if err != nil {
jww.WARN.Printf("Failed to find MeteredCmixMessageBuffer %s, making a new one", key) jww.WARN.Printf("Failed to find MeteredCmixMessageBuffer %s, making a new one", key)
...@@ -132,7 +138,8 @@ func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCm ...@@ -132,7 +138,8 @@ func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCm
return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil
} }
func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) (uint, time.Time) { func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo,
identity interfaces.Identity) (uint, time.Time) {
if m.GetPrimeByteLen() == 0 { if m.GetPrimeByteLen() == 0 {
jww.FATAL.Panicf("Cannot handle a metered " + jww.FATAL.Panicf("Cannot handle a metered " +
"cmix message with a length of 0") "cmix message with a length of 0")
...@@ -145,7 +152,8 @@ func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, id ...@@ -145,7 +152,8 @@ func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, id
return addedMessage.Count, addedMessage.Timestamp return addedMessage.Count, addedMessage.Timestamp
} }
func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) (uint, time.Time) { func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message,
ri *pb.RoundInfo, identity interfaces.Identity) (uint, time.Time) {
if m.GetPrimeByteLen() == 0 { if m.GetPrimeByteLen() == 0 {
jww.FATAL.Panicf("Cannot handle a metered " + jww.FATAL.Panicf("Cannot handle a metered " +
"cmix message with a length of 0") "cmix message with a length of 0")
...@@ -158,7 +166,8 @@ func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, ri *pb.Rou ...@@ -158,7 +166,8 @@ func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, ri *pb.Rou
return addedMessage.Count, addedMessage.Timestamp return addedMessage.Count, addedMessage.Timestamp
} }
func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, interfaces.Identity, bool) { func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo,
interfaces.Identity, bool) {
m, ok := mcmb.mb.Next() m, ok := mcmb.mb.Next()
if !ok { if !ok {
return format.Message{}, nil, interfaces.Identity{}, false return format.Message{}, nil, interfaces.Identity{}, false
...@@ -169,7 +178,8 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, int ...@@ -169,7 +178,8 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, int
// increment the count and save // increment the count and save
msg.Count++ msg.Count++
mcmh := &meteredCmixMessageHandler{} mcmh := &meteredCmixMessageHandler{}
err := mcmh.SaveMessage(mcmb.kv, msg, utility.MakeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg))) err := mcmh.SaveMessage(mcmb.kv, msg,
utility.MakeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg)))
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to save metered message after count "+ jww.FATAL.Panicf("Failed to save metered message after count "+
"update: %s", err) "update: %s", err)
...@@ -196,11 +206,13 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, int ...@@ -196,11 +206,13 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, int
return msfFormat, ri, identity, true return msfFormat, ri, identity, true
} }
func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) { func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message, ri *pb.RoundInfo,
identity interfaces.Identity) {
mcmb.mb.Succeeded(buildMsg(m, ri, identity)) mcmb.mb.Succeeded(buildMsg(m, ri, identity))
} }
func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message, ri *pb.RoundInfo, identity interfaces.Identity) { func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message, ri *pb.RoundInfo,
identity interfaces.Identity) {
mcmb.mb.Failed(buildMsg(m, ri, identity)) mcmb.mb.Failed(buildMsg(m, ri, identity))
} }
......
...@@ -10,10 +10,13 @@ package message ...@@ -10,10 +10,13 @@ package message
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/utility"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/ekv" "gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
"math/rand" "math/rand"
"testing" "testing"
...@@ -128,62 +131,37 @@ func Test_meteredCmixMessageHandler_Smoke(t *testing.T) { ...@@ -128,62 +131,37 @@ func Test_meteredCmixMessageHandler_Smoke(t *testing.T) {
mcmb, err := NewMeteredCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") mcmb, err := NewMeteredCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey")
if err != nil { if err != nil {
t.Errorf("NewMeteredCmixMessageBuffer() returned an error."+ t.Errorf("NewMeteredCmixMessageBuffer() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err) "\nexpected: %v\nrecieved: %v", nil, err)
} }
// AddFingerprint two messages // AddFingerprint two messages
mcmb.Add(testMsgs[0], &pb.RoundInfo{ID: 1, Timestamps: []uint64{0, 1, 2, 3}}, interfaces.Identity{
Source: id.NewIdFromString("user1", id.User, t)})
mcmb.Add(testMsgs[1], &pb.RoundInfo{ID: 2, Timestamps: []uint64{0, 1, 2, 3}}, interfaces.Identity{
Source: id.NewIdFromString("user2", id.User, t)})
mcmb.Add(testMsgs[0]) msg, ri, identity, exists := mcmb.Next()
mcmb.Add(testMsgs[1])
if len(mcmb.mb.messages) != 2 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
2, len(mcmb.mb.messages))
}
msg, _, _, exists := mcmb.Next()
if !exists { if !exists {
t.Error("Next() did not find any messages in buffer.") t.Error("Next() did not find any messages in buffer.")
} }
mcmb.Remove(msg) mcmb.Remove(msg, ri, identity)
if len(mcmb.mb.messages) != 1 { msg, ri, identity, exists = mcmb.Next()
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
1, len(mcmb.mb.messages))
}
msg, _, _, exists = mcmb.Next()
if !exists { if !exists {
t.Error("Next() did not find any messages in buffer.") t.Error("Next() did not find any messages in buffer.")
} }
if len(mcmb.mb.messages) != 0 { mcmb.Failed(msg, ri, identity)
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
0, len(mcmb.mb.messages))
}
mcmb.Failed(msg)
if len(mcmb.mb.messages) != 1 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
1, len(mcmb.mb.messages))
}
msg, _, _, exists = mcmb.Next() msg, ri, identity, exists = mcmb.Next()
if !exists { if !exists {
t.Error("Next() did not find any messages in buffer.") t.Error("Next() did not find any messages in buffer.")
} }
mcmb.Remove(msg) mcmb.Remove(msg, ri, identity)
msg, _, _, exists = mcmb.Next() msg, _, _, exists = mcmb.Next()
if exists { if exists {
t.Error("Next() found a message in the buffer when it should be empty.") t.Error("Next() found a message in the buffer when it should be empty.")
} }
mcmb.Remove(msg)
if len(mcmb.mb.messages) != 0 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
0, len(mcmb.mb.messages))
}
} }
// makeTestMeteredCmixMessage creates a list of messages with random data and the // makeTestMeteredCmixMessage creates a list of messages with random data and the
......
...@@ -9,7 +9,6 @@ package message ...@@ -9,7 +9,6 @@ package message
import ( import (
"encoding/base64" "encoding/base64"
"fmt"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/network/nodes"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
...@@ -18,6 +17,7 @@ import ( ...@@ -18,6 +17,7 @@ import (
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/rateLimiting" "gitlab.com/xx_network/primitives/rateLimiting"
"strconv"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
...@@ -32,7 +32,7 @@ const ( ...@@ -32,7 +32,7 @@ const (
type Pickup interface { type Pickup interface {
GetMessageReceptionChannel() chan<- Bundle GetMessageReceptionChannel() chan<- Bundle
StartProcessies() stoppable.Stoppable StartProcesses() stoppable.Stoppable
CheckInProgressMessages() CheckInProgressMessages()
// Fingerprints // Fingerprints
...@@ -80,7 +80,7 @@ func NewPickup(param params.Network, ...@@ -80,7 +80,7 @@ func NewPickup(param params.Network,
garbled, err := NewOrLoadMeteredCmixMessageBuffer(session.GetKV(), inProcessKey) garbled, err := NewOrLoadMeteredCmixMessageBuffer(session.GetKV(), inProcessKey)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to load or new the Garbled Messages system") jww.FATAL.Panicf("Failed to load or new the Garbled Messages system: %v", err)
} }
m := pickup{ m := pickup{
...@@ -111,18 +111,19 @@ func NewPickup(param params.Network, ...@@ -111,18 +111,19 @@ func NewPickup(param params.Network,
return &m return &m
} }
//Gets the channel to send received messages on // GetMessageReceptionChannel gets the channel to send received messages on.
func (p *pickup) GetMessageReceptionChannel() chan<- Bundle { func (p *pickup) GetMessageReceptionChannel() chan<- Bundle {
return p.messageReception return p.messageReception
} }
//Starts all worker pool // StartProcesses starts all worker pool.
func (p *pickup) StartProcessies() stoppable.Stoppable { func (p *pickup) StartProcesses() stoppable.Stoppable {
multi := stoppable.NewMulti("MessageReception") multi := stoppable.NewMulti("MessageReception")
// create the message handler workers // create the message handler workers
for i := uint(0); i < p.param.MessageReceptionWorkerPoolSize; i++ { for i := uint(0); i < p.param.MessageReceptionWorkerPoolSize; i++ {
stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i)) stop := stoppable.NewSingle(
"MessageReception Worker " + strconv.Itoa(int(i)))
go p.handleMessages(stop) go p.handleMessages(stop)
multi.Add(stop) multi.Add(stop)
} }
......
...@@ -16,24 +16,24 @@ import ( ...@@ -16,24 +16,24 @@ import (
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
/* Trigger - predefined hash based tags appended to all cmix messages /* Trigger - predefined hash based tags appended to all cMix messages which,
which, though trial hashing, are used to determine if a message applies though trial hashing, are used to determine if a message applies to this client.
to this client
Triggers are used for 2 purposes - can be processed by the notifications system, Triggers are used for 2 purposes - can be processed by the notification system,
or can be used to implement custom non fingerprint processing of payloads. or can be used to implement custom non fingerprint processing of payloads (i.e.
I.E. key negotiation, broadcast negotiation key negotiation and broadcast negotiation).
A tag is appended to the message of the format tag = H(H(messageContents),preimage) A tag is appended to the message of the format tag = H(H(messageContents),preimage)
and trial hashing is used to determine if a message adheres to a tag. and trial hashing is used to determine if a message adheres to a tag.
WARNING: If a preiamge is known by an adversary, they can determine which messages WARNING: If a preimage is known by an adversary, they can determine which
are for the client. messages are for the client.
Due to the extra overhead of trial hashing, triggers are processed after fingerprints. Due to the extra overhead of trial hashing, triggers are processed after
If a fingerprint match occurs on the message, triggers will not be handled. fingerprints. If a fingerprint match occurs on the message, triggers will not be
handled.
Triggers are ephemeral to the session. When starting a new client, all triggers must be Triggers are ephemeral to the session. When starting a new client, all triggers
re-added before StartNetworkFollower is called. must be re-added before StartNetworkFollower is called.
*/ */
type TriggersManager struct { type TriggersManager struct {
...@@ -52,19 +52,19 @@ func NewTriggers() *TriggersManager { ...@@ -52,19 +52,19 @@ func NewTriggers() *TriggersManager {
} }
// Lookup will see if a trigger exists for the given preimage and message // Lookup will see if a trigger exists for the given preimage and message
// contents. It will do this by trial hashing the preimages in the map with // contents. It will do this by trial hashing the preimages in the map with the
// the received message contents, until either a match to the // received message contents, until either a match to the received identity
// received identity fingerprint is received or it has exhausted the map. // fingerprint is received or it has exhausted the map.
// If a match is found, this means the message received is for the client, // If a match is found, this means the message received is for the client, and
// and that one or multiple triggers exist to process this message. // that one or multiple triggers exist to process this message.
// These triggers are returned to the caller along with the a true boolean. // These triggers are returned to the caller along with the a true boolean.
// If the map has been exhausted with no matches found, it returns nil and false. // If the map has been exhausted with no matches found, it returns nil and false.
// todo: reorganize this interface. Lookup needs to be called // TODO: reorganize this interface. Lookup needs to be called by handleMessage,
// by handleMessage, which should not have access to the other // which should not have access to the other state modifying methods below.
// state modifying methods below. Possible options include: // Possible options include:
// - privatizing the state-changing methods // - privatizing the state-changing methods
// - leaking lookup on this layer and migrating the state modifiation methods // - leaking lookup on this layer and migrating the state modification methods
// a layer down in a sepearate package // a layer down in a separate package
func (t *TriggersManager) get(clientID *id.ID, receivedIdentityFp, func (t *TriggersManager) get(clientID *id.ID, receivedIdentityFp,
ecrMsgContents []byte) ([]trigger, ecrMsgContents []byte) ([]trigger,
bool) { bool) {
...@@ -87,9 +87,9 @@ func (t *TriggersManager) get(clientID *id.ID, receivedIdentityFp, ...@@ -87,9 +87,9 @@ func (t *TriggersManager) get(clientID *id.ID, receivedIdentityFp,
return nil, false return nil, false
} }
// AddTrigger - Adds a trigger which can call a message // AddTrigger adds a trigger which can call a message handing function or be
// handing function or be used for notifications. // used for notifications. Multiple triggers can be registered for the same
// Multiple triggers can be registered for the same preimage. // preimage.
// preimage - the preimage which is triggered on // preimage - the preimage which is triggered on
// type - a descriptive string of the trigger. Generally used in notifications // type - a descriptive string of the trigger. Generally used in notifications
// source - a byte buffer of related data. Generally used in notifications. // source - a byte buffer of related data. Generally used in notifications.
...@@ -119,9 +119,9 @@ func (t *TriggersManager) AddTrigger(clientID *id.ID, newTrigger interfaces.Trig ...@@ -119,9 +119,9 @@ func (t *TriggersManager) AddTrigger(clientID *id.ID, newTrigger interfaces.Trig
} }
// DeleteTriggers - If only a single response is associated with the preimage, // DeleteTriggers - If only a single response is associated with the preimage,
// the entire preimage is removed. If there is more than one response, only // the entire preimage is removed. If there is more than one response, only the
// the given response is removed. If nil is passed in for response, // given response is removed. If nil is passed in for response, all triggers for
// all triggers for the preimage will be removed. // the preimage will be removed.
func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Preimage, func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Preimage,
response interfaces.MessageProcessor) error { response interfaces.MessageProcessor) error {
t.Lock() t.Lock()
...@@ -162,7 +162,7 @@ func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Pr ...@@ -162,7 +162,7 @@ func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Pr
return nil return nil
} }
// DeleteClientTriggers - delete the mapping associated with an ID // DeleteClientTriggers deletes the mapping associated with an ID.
func (t *TriggersManager) DeleteClientTriggers(clientID *id.ID) { func (t *TriggersManager) DeleteClientTriggers(clientID *id.ID) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
......
...@@ -24,7 +24,8 @@ func (mc *MockSendCMIXComms) GetHost(*id.ID) (*connect.Host, bool) { ...@@ -24,7 +24,8 @@ func (mc *MockSendCMIXComms) GetHost(*id.ID) (*connect.Host, bool) {
return h, true return h, true
} }
func (mc *MockSendCMIXComms) AddHost(*id.ID, string, []byte, connect.HostParams) (host *connect.Host, err error) { func (mc *MockSendCMIXComms) AddHost(
*id.ID, string, []byte, connect.HostParams) (host *connect.Host, err error) {
host, _ = mc.GetHost(nil) host, _ = mc.GetHost(nil)
return host, nil return host, nil
} }
...@@ -33,14 +34,16 @@ func (mc *MockSendCMIXComms) RemoveHost(*id.ID) { ...@@ -33,14 +34,16 @@ func (mc *MockSendCMIXComms) RemoveHost(*id.ID) {
} }
func (mc *MockSendCMIXComms) SendPutMessage(*connect.Host, *mixmessages.GatewaySlot, time.Duration) (*mixmessages.GatewaySlotResponse, error) { func (mc *MockSendCMIXComms) SendPutMessage(*connect.Host,
*mixmessages.GatewaySlot, time.Duration) (*mixmessages.GatewaySlotResponse, error) {
return &mixmessages.GatewaySlotResponse{ return &mixmessages.GatewaySlotResponse{
Accepted: true, Accepted: true,
RoundID: 3, RoundID: 3,
}, nil }, nil
} }
func (mc *MockSendCMIXComms) SendPutManyMessages(*connect.Host, *mixmessages.GatewaySlots, time.Duration) (*mixmessages.GatewaySlotResponse, error) { func (mc *MockSendCMIXComms) SendPutManyMessages(*connect.Host,
*mixmessages.GatewaySlots, time.Duration) (*mixmessages.GatewaySlotResponse, error) {
return &mixmessages.GatewaySlotResponse{ return &mixmessages.GatewaySlotResponse{
Accepted: true, Accepted: true,
RoundID: 3, RoundID: 3,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment