diff --git a/e2e/e2eMessageBuffer.go b/e2e/e2eMessageBuffer.go index ae96dac028a05b475bdde1dcd6da7e6bcf693206..4727ff19afb75b48dc75095c8e5ce906eab7336c 100644 --- a/e2e/e2eMessageBuffer.go +++ b/e2e/e2eMessageBuffer.go @@ -102,14 +102,13 @@ type E2eMessageBuffer struct { func NewOrLoadE2eMessageBuffer(kv *versioned.KV, key string) (*E2eMessageBuffer, error) { mb, err := LoadE2eMessageBuffer(kv, key) - if err != nil { + if err == nil { return mb, nil } mbInt, err := utility.NewMessageBuffer(kv, &e2eMessageHandler{}, key) if err != nil { return nil, err } - return &E2eMessageBuffer{mb: mbInt}, nil } diff --git a/e2e/e2eMessageBuffer_test.go b/e2e/e2eMessageBuffer_test.go index ea039dd3627e1092b73deb588897ca6e17fa853e..fb3e1d2f71b3c7b56202df09cc2b786609373d68 100644 --- a/e2e/e2eMessageBuffer_test.go +++ b/e2e/e2eMessageBuffer_test.go @@ -9,15 +9,12 @@ package e2e import ( "encoding/json" - "fmt" - "gitlab.com/elixxir/client/interfaces/message" - "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/catalog" + "gitlab.com/elixxir/client/network" "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/primitives/id" - "gitlab.com/xx_network/primitives/netTime" - "math/rand" "reflect" "testing" ) @@ -27,7 +24,7 @@ func TestE2EMessageHandler_SaveMessage(t *testing.T) { // Set up test values emg := &e2eMessageHandler{} kv := versioned.NewKV(make(ekv.Memstore)) - testMsgs, _ := makeTestE2EMessages(10, t) + testMsgs := makeTestE2EMessages(10, t) for _, msg := range testMsgs { key := utility.MakeStoredMessageKey("testKey", emg.HashMessage(msg)) @@ -46,14 +43,15 @@ func TestE2EMessageHandler_SaveMessage(t *testing.T) { } // Test if message retrieved matches expected - testMsg := &e2eMessage{} - if err := json.Unmarshal(obj.Data, testMsg); err != nil { + testMsg := e2eMessage{} + if err := json.Unmarshal(obj.Data, &testMsg); err != nil { t.Errorf("Failed to unmarshal message: %v", err) } - if !reflect.DeepEqual(msg, *testMsg) { + + if !e2eMessagesEqual(msg, testMsg, t) { t.Errorf("SaveMessage() returned versioned object with incorrect data."+ "\n\texpected: %v\n\treceived: %v", - msg, *testMsg) + msg, testMsg) } } } @@ -63,7 +61,7 @@ func TestE2EMessageHandler_LoadMessage(t *testing.T) { // Set up test values cmh := &e2eMessageHandler{} kv := versioned.NewKV(make(ekv.Memstore)) - testMsgs, _ := makeTestE2EMessages(10, t) + testMsgs := makeTestE2EMessages(10, t) for _, msg := range testMsgs { key := utility.MakeStoredMessageKey("testKey", cmh.HashMessage(msg)) @@ -74,14 +72,19 @@ func TestE2EMessageHandler_LoadMessage(t *testing.T) { } // Try to load message - testMsg, err := cmh.LoadMessage(kv, key) + face, err := cmh.LoadMessage(kv, key) if err != nil { t.Errorf("LoadMessage() returned an error."+ "\n\texpected: %v\n\trecieved: %v", nil, err) } + testMsg, ok := face.(e2eMessage) + if !ok { + t.Fatalf("Unexpected message type from LoadMessage") + } + // Test if message loaded matches expected - if !reflect.DeepEqual(msg, testMsg) { + if !e2eMessagesEqual(msg, testMsg, t) { t.Errorf("LoadMessage() returned an unexpected object."+ "\n\texpected: %v\n\treceived: %v", msg, testMsg) @@ -92,108 +95,99 @@ func TestE2EMessageHandler_LoadMessage(t *testing.T) { // Smoke test of e2eMessageHandler. func TestE2EMessageHandler_Smoke(t *testing.T) { // Set up test messages - _, testMsgs := makeTestE2EMessages(2, t) - + testMsgs := makeTestE2EMessages(2, t) + kv := versioned.NewKV(make(ekv.Memstore)) // Create new buffer - cmb, err := NewE2eMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") + cmb, err := NewOrLoadE2eMessageBuffer(kv, "testKey") if err != nil { t.Errorf("NewE2eMessageBuffer() returned an error."+ "\n\texpected: %v\n\trecieved: %v", nil, err) } + // Parse message 0 + msg0 := testMsgs[0] + recipient0, err := id.Unmarshal(msg0.Recipient) + if err != nil { + t.Fatalf("bad data in test message: %v", err) + } + + // Parse message 1 + msg1 := testMsgs[1] + recipient1, err := id.Unmarshal(msg1.Recipient) + if err != nil { + t.Fatalf("bad data in test message: %v", err) + } // Add two messages - cmb.Add(testMsgs[0], params.E2E{}) - cmb.Add(testMsgs[1], params.E2E{}) + cmb.Add(catalog.MessageType(msg0.MessageType), recipient0, + msg0.Payload, msg0.Params) + cmb.Add(catalog.MessageType(msg1.MessageType), recipient1, + msg1.Payload, msg1.Params) - if len(cmb.mb.messages) != 2 { + if len(cmb.mb.GetMessages()) != 2 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 2, len(cmb.mb.messages)) + 2, len(cmb.mb.GetMessages())) } - msg, _, exists := cmb.Next() + msgType, recipient, payload, _, exists := cmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } - cmb.Succeeded(msg, params.E2E{}) + cmb.Succeeded(msgType, recipient, payload) - if len(cmb.mb.messages) != 1 { + if len(cmb.mb.GetMessages()) != 1 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 1, len(cmb.mb.messages)) + 1, cmb.mb.GetMessages()) } - msg, _, exists = cmb.Next() + msgType, recipient, payload, _, exists = cmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } - if len(cmb.mb.messages) != 0 { + if len(cmb.mb.GetMessages()) != 0 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 0, len(cmb.mb.messages)) + 0, len(cmb.mb.GetMessages())) } - cmb.Failed(msg, params.E2E{}) + cmb.Failed(msgType, recipient, payload) - if len(cmb.mb.messages) != 1 { + if len(cmb.mb.GetMessages()) != 1 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 1, len(cmb.mb.messages)) + 1, len(cmb.mb.GetMessages())) } - msg, _, exists = cmb.Next() + msgType, recipient, payload, _, exists = cmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } - cmb.Succeeded(msg, params.E2E{}) + cmb.Succeeded(msgType, recipient, payload) - msg, _, exists = cmb.Next() + msgType, recipient, payload, _, exists = cmb.Next() if exists { t.Error("Next() found a message in the buffer when it should be empty.") } - if len(cmb.mb.messages) != 0 { + if len(cmb.mb.GetMessages()) != 0 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 0, len(cmb.mb.messages)) + 0, len(cmb.mb.GetMessages())) } } -// makeTestE2EMessages creates a list of messages with random data and the -// expected map after they are added to the buffer. -func makeTestE2EMessages(n int, t *testing.T) ([]e2eMessage, []message.Send) { - prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) - msgs := make([]e2eMessage, n) - send := make([]message.Send, n) - for i := range msgs { - rngBytes := make([]byte, 128) - prng.Read(rngBytes) - msgs[i].Recipient = rngBytes - prng.Read(rngBytes) - msgs[i].Payload = rngBytes - prng.Read(rngBytes) - msgs[i].MessageType = uint32(rngBytes[0]) - - send[i].Recipient = id.NewIdFromString(string(msgs[i].Recipient), id.User, t) - send[i].Payload = msgs[i].Payload - send[i].MessageType = message.Type(msgs[i].MessageType) - } - - return msgs, send -} - func TestE2EParamMarshalUnmarshal(t *testing.T) { msg := &e2eMessage{ Recipient: id.DummyUser[:], Payload: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, MessageType: 42, - Params: params.E2E{ - Type: 1, - RetryCount: 7, - CMIX: params.CMIX{ - RoundTries: 6, - Timeout: 99, - RetryDelay: -4, + Params: Params{ + CMIX: network.CMIXParams{ + RoundTries: 6, + Timeout: 99, + RetryDelay: -4, + BlacklistedNodes: map[id.ID]bool{}, }, }, } - fmt.Printf("msg1: %#v\n", msg) + t.Logf("msg1: %#v\n", msg) b, err := json.Marshal(&msg) @@ -201,7 +195,7 @@ func TestE2EParamMarshalUnmarshal(t *testing.T) { t.Errorf("Failed to Marshal E2eMessage") } - fmt.Printf("json: %s\n", string(b)) + t.Logf("json: %s\n", string(b)) msg2 := &e2eMessage{} @@ -211,7 +205,7 @@ func TestE2EParamMarshalUnmarshal(t *testing.T) { t.Errorf("Failed to Unmarshal E2eMessage") } - fmt.Printf("msg2: %#v\n", msg2) + t.Logf("msg2: %#v\n", msg2) if !reflect.DeepEqual(msg, msg2) { t.Errorf("Unmarshaled message is not the same") diff --git a/e2e/interface.go b/e2e/interface.go index 84e10dcfd510227cf67adbf9a56de4930012596d..c2d00e3e24b9da5eb253f02198b1c5ff21ca7414 100644 --- a/e2e/interface.go +++ b/e2e/interface.go @@ -98,7 +98,7 @@ type Handler interface { // then pass them in, otherwise, leave myID and myPrivateKey nil // If temporary is true, an alternate ram kv will be used for storage and // the relationship will not survive a reset - AddPartner(myID *id.ID, myPrivateKey *cyclic.Int, partnerID *id.ID, + AddPartner(myID *id.ID, partnerID *id.ID, partnerPubKey, myPrivKey *cyclic.Int, partnerSIDHPubKey *sidh.PublicKey, mySIDHPrivKey *sidh.PrivateKey, sendParams, receiveParams session.Params, temporary bool) (*partner.Manager, error) diff --git a/e2e/ratchet/ratchet.go b/e2e/ratchet/ratchet.go index 5bdf827264e7ab86fa55a399f3203d144b9bc0fe..0c1bb6591fe5e2c69f7b2c90f46152052f103a96 100644 --- a/e2e/ratchet/ratchet.go +++ b/e2e/ratchet/ratchet.go @@ -106,8 +106,8 @@ func (r *Ratchet) AddPartner(myID *id.ID, partnerID *id.ID, myID = r.defaultID } - jww.INFO.Printf("Adding Partner %r:\n\tMy Private Key: %r"+ - "\n\tPartner Public Key: %r to %s", + jww.INFO.Printf("Adding Partner %s:\n\tMy Private Key: %s"+ + "\n\tPartner Public Key: %s to %s", partnerID, myPrivKey.TextVerbose(16, 0), partnerPubKey.TextVerbose(16, 0), myID) @@ -130,7 +130,7 @@ func (r *Ratchet) AddPartner(myID *id.ID, partnerID *id.ID, r.managers[mid] = m if err := r.save(); err != nil { - jww.FATAL.Printf("Failed to add Partner %r: Save of store failed: %r", + jww.FATAL.Printf("Failed to add Partner %s: Save of store failed: %s", partnerID, err) } @@ -171,7 +171,7 @@ func (r *Ratchet) DeletePartner(partnerId *id.ID, myID *id.ID) error { } if err := partner.ClearManager(m, r.kv); err != nil { - return errors.WithMessagef(err, "Could not remove partner %r from store", partnerId) + return errors.WithMessagef(err, "Could not remove partner %s from store", partnerId) } //delete services diff --git a/e2e/ratchet/ratchet_test.go b/e2e/ratchet/ratchet_test.go index 0cea9c65508d8a24380320bdc3955b2a20081dc2..1b87c88acda50c3725660bfbb0270aa9a5d0f827 100644 --- a/e2e/ratchet/ratchet_test.go +++ b/e2e/ratchet/ratchet_test.go @@ -99,8 +99,10 @@ func TestStore_AddPartner(t *testing.T) { r.defaultDHPrivateKey, partnerPubKey, myPrivSIDHKey, pubSIDHKey, p, p, r.cyHandler, r.grp, r.rng) - receivedManager, err := r.AddPartner(r.defaultID, partnerID, r.defaultDHPrivateKey, - partnerPubKey, pubSIDHKey, myPrivSIDHKey, p, p, true) + receivedManager, err := r.AddPartner( + r.defaultID, partnerID, + partnerPubKey, r.defaultDHPrivateKey, + pubSIDHKey, myPrivSIDHKey, p, p, true) if err != nil { t.Fatalf("AddPartner returned an error: %v", err) } diff --git a/e2e/receive/switchboard_test.go b/e2e/receive/switchboard_test.go index 22fa9830e33027c6d2c3457b823f6894035a7480..aaee91e2d9d5090175334b503a0ca3a54f929cfe 100644 --- a/e2e/receive/switchboard_test.go +++ b/e2e/receive/switchboard_test.go @@ -8,7 +8,7 @@ package receive import ( - "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/catalog" "gitlab.com/xx_network/primitives/id" "strings" "testing" @@ -66,7 +66,7 @@ func TestSwitchboard_RegisterListener(t *testing.T) { uid := id.NewIdFromUInt(42, id.User, t) - mt := message.Type(69) + mt := catalog.MessageType(2) lid := sw.RegisterListener(uid, mt, l) @@ -107,7 +107,7 @@ func TestSwitchboard_RegisterFunc_Error_NilUserID(t *testing.T) { }() sw := New() - sw.RegisterFunc("test", nil, 0, func(receive message.Receive) {}) + sw.RegisterFunc("test", nil, 0, func(item Message) {}) t.Errorf("A nil user ID should have caused an error") } @@ -133,11 +133,11 @@ func TestSwitchboard_RegisterFunc(t *testing.T) { heard := false - l := func(receive message.Receive) { heard = true } + l := func(item Message) { heard = true } uid := id.NewIdFromUInt(42, id.User, t) - mt := message.Type(69) + mt := catalog.MessageType(1) lid := sw.RegisterFunc("test", uid, mt, l) @@ -162,7 +162,7 @@ func TestSwitchboard_RegisterFunc(t *testing.T) { t.Errorf("Listener is not registered by Message Tag") } - lid.listener.Hear(message.Receive{}) + lid.listener.Hear(Message{}) if !heard { t.Errorf("Func listener not registered correctly") } @@ -178,7 +178,7 @@ func TestSwitchboard_RegisterChan_Error_NilUser(t *testing.T) { }() sw := New() sw.RegisterChannel("test", nil, 0, - make(chan message.Receive)) + make(chan Message)) t.Errorf("A nil userID should have caused an error") } @@ -201,11 +201,11 @@ func TestSwitchboard_RegisterChan_Error_NilChan(t *testing.T) { func TestSwitchboard_RegisterChan(t *testing.T) { sw := New() - ch := make(chan message.Receive, 1) + ch := make(chan Message, 1) uid := id.NewIdFromUInt(42, id.User, t) - mt := message.Type(69) + mt := catalog.MessageType(1) lid := sw.RegisterChannel("test", uid, mt, ch) @@ -231,7 +231,7 @@ func TestSwitchboard_RegisterChan(t *testing.T) { t.Errorf("Listener is not registered by Message Tag") } - lid.listener.Hear(message.Receive{}) + lid.listener.Hear(Message{}) select { case <-ch: case <-time.After(5 * time.Millisecond): @@ -243,15 +243,15 @@ func TestSwitchboard_RegisterChan(t *testing.T) { func TestSwitchboard_Speak(t *testing.T) { uids := []*id.ID{{}, AnyUser(), id.NewIdFromUInt(42, id.User, t), id.NewIdFromUInt(69, id.User, t)} - mts := []message.Type{AnyType, 42, 69} + mts := []catalog.MessageType{AnyType, catalog.NoType, catalog.XxMessage} for _, uidReg := range uids { for _, mtReg := range mts { //create the registrations sw := New() - ch1 := make(chan message.Receive, 1) - ch2 := make(chan message.Receive, 1) + ch1 := make(chan Message, 1) + ch2 := make(chan Message, 1) sw.RegisterChannel("test", uidReg, mtReg, ch1) sw.RegisterChannel("test", uidReg, mtReg, ch2) @@ -263,7 +263,7 @@ func TestSwitchboard_Speak(t *testing.T) { continue } - m := message.Receive{ + m := Message{ Payload: []byte{0, 1, 2, 3}, Sender: uid, MessageType: mt, @@ -315,9 +315,9 @@ func TestSwitchboard_Unregister(t *testing.T) { sw := New() uid := id.NewIdFromUInt(42, id.User, t) - mt := message.Type(69) + mt := catalog.MessageType(1) - l := func(receive message.Receive) {} + l := func(receive Message) {} lid1 := sw.RegisterFunc("a", uid, mt, l) diff --git a/e2e/rekey/exchange_test.go b/e2e/rekey/exchange_test.go index a59e5a0ae3835341f7fa4469a7a296feefe72283..c278f5c47817b3262154e2e8ad3f2e9f7a39a62d 100644 --- a/e2e/rekey/exchange_test.go +++ b/e2e/rekey/exchange_test.go @@ -152,13 +152,9 @@ func TestFullExchange(t *testing.T) { // Check that the Alice's session for Bob is in the proper status newSession := receivedManager.GetReceiveSession(newSessionID) - fmt.Printf("newSession: %v\n", newSession) if newSession == nil || newSession.NegotiationStatus() != session.Confirmed { t.Errorf("Session not in confirmed status!"+ "\n\tExpected: Confirmed"+ "\n\tReceived: %s", confirmedSession.NegotiationStatus()) } - - fmt.Printf("after status: %v\n", confirmedSession.NegotiationStatus()) - } diff --git a/e2e/sendE2E.go b/e2e/sendE2E.go index c1b1229ce84f1eef5fb7df7aa865602290047d2e..144dd3c8c86b01a641596af034e91f1ac8e5b2b8 100644 --- a/e2e/sendE2E.go +++ b/e2e/sendE2E.go @@ -64,7 +64,7 @@ func (m *manager) sendE2E(mt catalog.MessageType, recipient *id.ID, errCh := make(chan error, len(partitions)) // get the key manager for the partner - partner, err := m.Ratchet.GetPartner(recipient) + partner, err := m.Ratchet.GetPartner(recipient, m.myDefaultID) if err != nil { return nil, e2e.MessageID{}, time.Time{}, errors.WithMessagef(err, "Could not send End to End encrypted "+ @@ -112,9 +112,9 @@ func (m *manager) sendE2E(mt catalog.MessageType, recipient *id.ID, //end to end encrypt the cmix message contentsEnc, mac := key.Encrypt(p) - jww.INFO.Printf("E2E sending %d/%d to %s with key fp: %s, msgID: %s", - i+i, len(partitions), recipient, format.DigestContents(p), - key.Fingerprint(), msgID) + jww.INFO.Printf("E2E sending %d/%d to %s with key fp: %s, msgID: %s (msgDigest %s)", + i+i, len(partitions), recipient, + key.Fingerprint(), msgID, format.DigestContents(p)) //set up the service tags var s message.Service diff --git a/e2e/utils_test.go b/e2e/utils_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2fd912bdec242d943a1d9c28fa8d8d31f0921780 --- /dev/null +++ b/e2e/utils_test.go @@ -0,0 +1,48 @@ +package e2e + +import ( + "bytes" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" + "math/rand" + "testing" +) + +func e2eMessagesEqual(received, expected e2eMessage, t *testing.T) bool { + equals := true + if !bytes.Equal(received.Recipient, expected.Recipient) { + t.Errorf("Receipient values for messages are not equivalent") + equals = false + } + + if !bytes.Equal(received.Payload, expected.Payload) { + equals = false + t.Errorf("Payload values for messages are not equivalent") + } + + if received.MessageType != expected.MessageType { + equals = false + t.Errorf("MessageType values for messages are not equivalent") + } + + return equals + +} + +// makeTestE2EMessages creates a list of messages with random data and the +// expected map after they are added to the buffer. +func makeTestE2EMessages(n int, t *testing.T) []e2eMessage { + prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) + msgs := make([]e2eMessage, n) + for i := range msgs { + rngBytes := make([]byte, 128) + prng.Read(rngBytes) + msgs[i].Recipient = id.NewIdFromBytes(rngBytes, t).Bytes() + prng.Read(rngBytes) + msgs[i].Payload = rngBytes + prng.Read(rngBytes) + msgs[i].MessageType = uint32(rngBytes[0]) + } + + return msgs +} diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index 6b8fcd10b8ee3ac24f7813c7eadd207cc1642f51..82c8b898e14c9e9b225a5d74141ec6e61925c8bd 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -106,6 +106,22 @@ func LoadMessageBuffer(kv *versioned.KV, handler MessageHandler, return mb, err } +// GetMessages is a getter function which retrieves the +// MessageBuffer.messages map. +func (mb *MessageBuffer) GetMessages() map[MessageHash]struct{} { + mb.mux.RLock() + defer mb.mux.RUnlock() + return mb.messages +} + +// GetProcessingMessages is a getter function which retrieves the +// MessageBuffer.processingMessages map. +func (mb *MessageBuffer) GetProcessingMessages() map[MessageHash]struct{} { + mb.mux.RLock() + defer mb.mux.RUnlock() + return mb.processingMessages +} + // save saves the buffer as a versioned object. All messages, regardless if they // are in the "not processed" or "processing" state are stored together and // considered "not processed".