diff --git a/auth/confirm.go b/auth/confirm.go index 73e834d9766cbfe4b91b0156412f3941ad50ac0c..c2d6c59081876d237459e19d14725bf8252d4bc6 100644 --- a/auth/confirm.go +++ b/auth/confirm.go @@ -122,7 +122,7 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader, } //store the message as a critical message so it will always be sent - storage.GetCriticalRawMessages().AddProcessing(cmixMsg) + storage.GetCriticalRawMessages().AddProcessing(cmixMsg, partner.ID) /*send message*/ round, _, err := net.SendCMIX(cmixMsg, partner.ID, params.GetDefaultCMIX()) @@ -131,7 +131,7 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader, // retried jww.ERROR.Printf("auth confirm failed to transmit, will be "+ "handled on reconnect: %+v", err) - storage.GetCriticalRawMessages().Failed(cmixMsg) + storage.GetCriticalRawMessages().Failed(cmixMsg, partner.ID) } /*check message delivery*/ @@ -145,9 +145,9 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader, if !success { jww.ERROR.Printf("auth confirm failed to transmit, will be " + "handled on reconnect") - storage.GetCriticalRawMessages().Failed(cmixMsg) + storage.GetCriticalRawMessages().Failed(cmixMsg, partner.ID) } else { - storage.GetCriticalRawMessages().Succeeded(cmixMsg) + storage.GetCriticalRawMessages().Succeeded(cmixMsg, partner.ID) } return nil diff --git a/auth/request.go b/auth/request.go index d0e20a44b2f09707a4d456c99c5ced2cacd65551..369fdf8c4ecb7a289b46378249307a1a04d24d54 100644 --- a/auth/request.go +++ b/auth/request.go @@ -144,7 +144,7 @@ func RequestAuth(partner, me contact.Contact, message string, rng io.Reader, } //store the message as a critical message so it will always be sent - storage.GetCriticalRawMessages().AddProcessing(cmixMsg) + storage.GetCriticalRawMessages().AddProcessing(cmixMsg, partner.ID) //jww.INFO.Printf("CMIX MESSAGE 1: %s, %v, %v, %v", cmixMsg.GetRecipientID(), // cmixMsg.GetKeyFP(), cmixMsg.GetMac(), cmixMsg.GetContents()) @@ -158,7 +158,7 @@ func RequestAuth(partner, me contact.Contact, message string, rng io.Reader, // retried jww.ERROR.Printf("auth request failed to transmit, will be "+ "handled on reconnect: %+v", err) - storage.GetCriticalRawMessages().Failed(cmixMsg) + storage.GetCriticalRawMessages().Failed(cmixMsg, partner.ID) } /*check message delivery*/ @@ -172,9 +172,9 @@ func RequestAuth(partner, me contact.Contact, message string, rng io.Reader, if !success { jww.ERROR.Printf("auth request failed to transmit, will be " + "handled on reconnect") - storage.GetCriticalRawMessages().Failed(cmixMsg) + storage.GetCriticalRawMessages().Failed(cmixMsg, partner.ID) } else { - storage.GetCriticalRawMessages().Succeeded(cmixMsg) + storage.GetCriticalRawMessages().Succeeded(cmixMsg, partner.ID) } return nil diff --git a/network/follow.go b/network/follow.go index 050501213144ea9dcfb2387b947d21ece62e12f1..9bdfbc85969cb8bff8d60ea74848875def5c21dd 100644 --- a/network/follow.go +++ b/network/follow.go @@ -150,9 +150,6 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { continue } - jww.INFO.Printf("Round %d timestamps: %v", - update.ID, update.Timestamps) - for _, clientErr := range update.ClientErrors { // If this Client appears in the ClientError diff --git a/network/message/critical.go b/network/message/critical.go index 5036bd5219e7aa8c4c166d83dbcc12c48ba14a68..99c83d9842b48f200d6e3c2d2e930b8441ae362e 100644 --- a/network/message/critical.go +++ b/network/message/critical.go @@ -82,16 +82,16 @@ func (m *Manager) criticalMessages() { critRawMsgs := m.Session.GetCriticalRawMessages() param := params.GetDefaultCMIX() //raw critical messages - for msg, has := critRawMsgs.Next(); has; msg, has = critRawMsgs.Next() { + for msg, rid, has := critRawMsgs.Next(); has; msg, rid, has = critRawMsgs.Next() { go func(msg format.Message) { //send the message - round, _, err := m.SendCMIX(msg, m.TransmissionID, param) + round, _, err := m.SendCMIX(msg, rid, param) //if the message fail to send, notify the buffer so it can be handled //in the future and exit if err != nil { jww.ERROR.Printf("Failed to send critical message on "+ "notification of healthy network: %+v", err) - critRawMsgs.Failed(msg) + critRawMsgs.Failed(msg, rid) return } jww.INFO.Printf("critical healthy RoundIDs: %v", round) @@ -108,10 +108,10 @@ func (m *Manager) criticalMessages() { jww.ERROR.Printf("critical message send failed to transmit "+ "transmit %v/%v paritions: %v round failures, %v timeouts", numRoundFail+numTimeOut, 1, numRoundFail, numTimeOut) - critRawMsgs.Failed(msg) + critRawMsgs.Failed(msg, rid) return } - critRawMsgs.Succeeded(msg) + critRawMsgs.Succeeded(msg, rid) }(msg) } diff --git a/storage/utility/cmixMessageBuffer.go b/storage/utility/cmixMessageBuffer.go index d3c5f5ce9f98473aa1428eafbda142b3af50ee62..7611e87c2d4f6d9872ca1e1b8e45daef6067e08a 100644 --- a/storage/utility/cmixMessageBuffer.go +++ b/storage/utility/cmixMessageBuffer.go @@ -9,8 +9,12 @@ package utility import ( "crypto/md5" + "encoding/json" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" "time" ) @@ -18,16 +22,31 @@ const currentCmixMessageVersion = 0 type cmixMessageHandler struct{} +type storedMessage struct{ + Msg []byte + Recipient []byte +} + +func (sm storedMessage)Marshal()[]byte{ + + data, err := json.Marshal(&sm) + if err!=nil{ + jww.FATAL.Panicf("Failed to marshal stored message: %s", err) + } + + return data +} + // SaveMessage saves the message as a versioned object at the specified key // in the key value store. func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { - msg := m.(format.Message) + sm := m.(storedMessage) // Create versioned object obj := versioned.Object{ Version: currentCmixMessageVersion, Timestamp: time.Now(), - Data: msg.Marshal(), + Data: sm.Marshal(), } // Save versioned object @@ -44,8 +63,13 @@ func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interf return format.Message{}, err } + sm := storedMessage{} + if err = json.Unmarshal(vo.Data, &sm); err!=nil{ + return nil, errors.Wrap(err, "Failed to unmarshal stored message") + } + // Create message from data - return format.Unmarshal(vo.Data), nil + return sm, nil } // DeleteMessage deletes the message with the specified key from the key value @@ -56,9 +80,8 @@ func (cmh *cmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error // HashMessage generates a hash of the message. func (cmh *cmixMessageHandler) HashMessage(m interface{}) MessageHash { - msg := m.(format.Message) - - return md5.Sum(msg.Marshal()) + sm := m.(storedMessage) + return md5.Sum(sm.Marshal()) } // CmixMessageBuffer wraps the message buffer to store and load raw cmix @@ -85,28 +108,50 @@ func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, er return &CmixMessageBuffer{mb: mb}, nil } -func (cmb *CmixMessageBuffer) Add(m format.Message) { - cmb.mb.Add(m) +func (cmb *CmixMessageBuffer) Add(msg format.Message, recipent *id.ID) { + sm := storedMessage{ + Msg: msg.Marshal(), + Recipient: recipent.Marshal(), + } + cmb.mb.Add(sm) } -func (cmb *CmixMessageBuffer) AddProcessing(m format.Message) { - cmb.mb.AddProcessing(m) +func (cmb *CmixMessageBuffer) AddProcessing(msg format.Message, recipent *id.ID) { + sm := storedMessage{ + Msg: msg.Marshal(), + Recipient: recipent.Marshal(), + } + cmb.mb.AddProcessing(sm) } -func (cmb *CmixMessageBuffer) Next() (format.Message, bool) { +func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, bool) { m, ok := cmb.mb.Next() if !ok { - return format.Message{}, false + return format.Message{}, nil, false } - msg := m.(format.Message) - return msg, true + sm := m.(storedMessage) + msg := format.Unmarshal(sm.Msg) + recpient, err := id.Unmarshal(sm.Recipient) + if err!=nil{ + jww.FATAL.Panicf("Could nto get an id for stored cmix " + + "message buffer: %+v", err) + } + return msg, recpient, true } -func (cmb *CmixMessageBuffer) Succeeded(m format.Message) { - cmb.mb.Succeeded(m) +func (cmb *CmixMessageBuffer) Succeeded(msg format.Message, recipent *id.ID) { + sm := storedMessage{ + Msg: msg.Marshal(), + Recipient: recipent.Marshal(), + } + cmb.mb.Succeeded(sm) } -func (cmb *CmixMessageBuffer) Failed(m format.Message) { - cmb.mb.Failed(m) +func (cmb *CmixMessageBuffer) Failed(msg format.Message, recipent *id.ID) { + sm := storedMessage{ + Msg: msg.Marshal(), + Recipient: recipent.Marshal(), + } + cmb.mb.Failed(sm) } diff --git a/storage/utility/cmixMessageBuffer_test.go b/storage/utility/cmixMessageBuffer_test.go index c5f544edf249d93824a89f4d36a39c1eec066eac..149da666323d27604102c965741dcd04dc8f3cec 100644 --- a/storage/utility/cmixMessageBuffer_test.go +++ b/storage/utility/cmixMessageBuffer_test.go @@ -12,6 +12,7 @@ import ( "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" "math/rand" "reflect" "testing" @@ -23,9 +24,13 @@ func TestCmixMessageHandler_SaveMessage(t *testing.T) { // Set up test values cmh := &cmixMessageHandler{} kv := versioned.NewKV(make(ekv.Memstore)) - testMsgs, _ := makeTestCmixMessages(10) + testMsgs, ids, _ := makeTestCmixMessages(10) - for _, msg := range testMsgs { + for i := range testMsgs { + msg := storedMessage{ + Msg: testMsgs[i].Marshal(), + Recipient: ids[i].Marshal(), + } key := makeStoredMessageKey("testKey", cmh.HashMessage(msg)) // Save message @@ -55,9 +60,13 @@ func TestCmixMessageHandler_LoadMessage(t *testing.T) { // Set up test values cmh := &cmixMessageHandler{} kv := versioned.NewKV(make(ekv.Memstore)) - testMsgs, _ := makeTestCmixMessages(10) + testMsgs, ids, _ := makeTestCmixMessages(10) - for _, msg := range testMsgs { + for i := range testMsgs { + msg := storedMessage{ + Msg: testMsgs[i].Marshal(), + Recipient: ids[i].Marshal(), + } key := makeStoredMessageKey("testKey", cmh.HashMessage(msg)) // Save message @@ -84,7 +93,7 @@ func TestCmixMessageHandler_LoadMessage(t *testing.T) { // Smoke test of cmixMessageHandler. func TestCmixMessageBuffer_Smoke(t *testing.T) { // Set up test messages - testMsgs, _ := makeTestCmixMessages(2) + testMsgs, ids, _ := makeTestCmixMessages(2) // Create new buffer cmb, err := NewCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") @@ -94,26 +103,26 @@ func TestCmixMessageBuffer_Smoke(t *testing.T) { } // Add two messages - cmb.Add(testMsgs[0]) - cmb.Add(testMsgs[1]) + cmb.Add(testMsgs[0], ids[0]) + cmb.Add(testMsgs[1], ids[1]) if len(cmb.mb.messages) != 2 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", 2, len(cmb.mb.messages)) } - msg, exists := cmb.Next() + msg, rid, exists := cmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } - cmb.Succeeded(msg) + cmb.Succeeded(msg, rid) if len(cmb.mb.messages) != 1 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", 1, len(cmb.mb.messages)) } - msg, exists = cmb.Next() + msg, rid, exists = cmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } @@ -121,24 +130,23 @@ func TestCmixMessageBuffer_Smoke(t *testing.T) { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", 0, len(cmb.mb.messages)) } - cmb.Failed(msg) + cmb.Failed(msg, rid) if len(cmb.mb.messages) != 1 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", 1, len(cmb.mb.messages)) } - msg, exists = cmb.Next() + msg, rid, exists = cmb.Next() if !exists { t.Error("Next() did not find any messages in buffer.") } - cmb.Succeeded(msg) + cmb.Succeeded(msg, rid) - msg, exists = cmb.Next() + msg, rid, exists = cmb.Next() if exists { t.Error("Next() found a message in the buffer when it should be empty.") } - cmb.Succeeded(msg) if len(cmb.mb.messages) != 0 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", @@ -149,11 +157,12 @@ func TestCmixMessageBuffer_Smoke(t *testing.T) { // makeTestCmixMessages creates a list of messages with random data and the // expected map after they are added to the buffer. -func makeTestCmixMessages(n int) ([]format.Message, map[MessageHash]struct{}) { +func makeTestCmixMessages(n int) ([]format.Message, []*id.ID, map[MessageHash]struct{}) { cmh := &cmixMessageHandler{} prng := rand.New(rand.NewSource(time.Now().UnixNano())) mh := map[MessageHash]struct{}{} msgs := make([]format.Message, n) + ids := make([]*id.ID, n) for i := range msgs { msgs[i] = format.NewMessage(128) payload := make([]byte, 128) @@ -161,8 +170,17 @@ func makeTestCmixMessages(n int) ([]format.Message, map[MessageHash]struct{}) { msgs[i].SetPayloadA(payload) prng.Read(payload) msgs[i].SetPayloadB(payload) - mh[cmh.HashMessage(msgs[i])] = struct{}{} + + rid := id.ID{} + prng.Read(rid[:32]) + rid[32] = byte(id.User) + ids[i] = &rid + sm := storedMessage{ + Msg: msgs[i].Marshal(), + Recipient: ids[i].Marshal(), + } + mh[cmh.HashMessage(sm)] = struct{}{} } - return msgs, mh + return msgs, ids, mh } diff --git a/storage/utility/e2eMessageBuffer.go b/storage/utility/e2eMessageBuffer.go index 045175c8fd752ef790f4ded0c98d26f1107fdf2b..975f2f21af3d15fb1936759971ea0926a68c8d77 100644 --- a/storage/utility/e2eMessageBuffer.go +++ b/storage/utility/e2eMessageBuffer.go @@ -148,7 +148,7 @@ func (emb *E2eMessageBuffer) Next() (message.Send, params.E2E, bool) { msg := m.(e2eMessage) recipient, err := id.Unmarshal(msg.Recipient) if err != nil { - jww.FATAL.Panicf("Error unmarshaling recipient: %v", err) + jww.FATAL.Panicf("Error unmarshaling Recipient: %v", err) } return message.Send{recipient, msg.Payload, message.Type(msg.MessageType)}, msg.Params, true