Skip to content
Snippets Groups Projects
Commit 234701e3 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

made critical messages store the recipient ID

parent 93fedd7c
No related branches found
No related tags found
No related merge requests found
......@@ -122,7 +122,7 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader,
}
//store the message as a critical message so it will always be sent
storage.GetCriticalRawMessages().AddProcessing(cmixMsg)
storage.GetCriticalRawMessages().AddProcessing(cmixMsg, partner.ID)
/*send message*/
round, _, err := net.SendCMIX(cmixMsg, partner.ID, params.GetDefaultCMIX())
......@@ -131,7 +131,7 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader,
// retried
jww.ERROR.Printf("auth confirm failed to transmit, will be "+
"handled on reconnect: %+v", err)
storage.GetCriticalRawMessages().Failed(cmixMsg)
storage.GetCriticalRawMessages().Failed(cmixMsg, partner.ID)
}
/*check message delivery*/
......@@ -145,9 +145,9 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader,
if !success {
jww.ERROR.Printf("auth confirm failed to transmit, will be " +
"handled on reconnect")
storage.GetCriticalRawMessages().Failed(cmixMsg)
storage.GetCriticalRawMessages().Failed(cmixMsg, partner.ID)
} else {
storage.GetCriticalRawMessages().Succeeded(cmixMsg)
storage.GetCriticalRawMessages().Succeeded(cmixMsg, partner.ID)
}
return nil
......
......@@ -144,7 +144,7 @@ func RequestAuth(partner, me contact.Contact, message string, rng io.Reader,
}
//store the message as a critical message so it will always be sent
storage.GetCriticalRawMessages().AddProcessing(cmixMsg)
storage.GetCriticalRawMessages().AddProcessing(cmixMsg, partner.ID)
//jww.INFO.Printf("CMIX MESSAGE 1: %s, %v, %v, %v", cmixMsg.GetRecipientID(),
// cmixMsg.GetKeyFP(), cmixMsg.GetMac(), cmixMsg.GetContents())
......@@ -158,7 +158,7 @@ func RequestAuth(partner, me contact.Contact, message string, rng io.Reader,
// retried
jww.ERROR.Printf("auth request failed to transmit, will be "+
"handled on reconnect: %+v", err)
storage.GetCriticalRawMessages().Failed(cmixMsg)
storage.GetCriticalRawMessages().Failed(cmixMsg, partner.ID)
}
/*check message delivery*/
......@@ -172,9 +172,9 @@ func RequestAuth(partner, me contact.Contact, message string, rng io.Reader,
if !success {
jww.ERROR.Printf("auth request failed to transmit, will be " +
"handled on reconnect")
storage.GetCriticalRawMessages().Failed(cmixMsg)
storage.GetCriticalRawMessages().Failed(cmixMsg, partner.ID)
} else {
storage.GetCriticalRawMessages().Succeeded(cmixMsg)
storage.GetCriticalRawMessages().Succeeded(cmixMsg, partner.ID)
}
return nil
......
......@@ -150,9 +150,6 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
continue
}
jww.INFO.Printf("Round %d timestamps: %v",
update.ID, update.Timestamps)
for _, clientErr := range update.ClientErrors {
// If this Client appears in the ClientError
......
......@@ -82,16 +82,16 @@ func (m *Manager) criticalMessages() {
critRawMsgs := m.Session.GetCriticalRawMessages()
param := params.GetDefaultCMIX()
//raw critical messages
for msg, has := critRawMsgs.Next(); has; msg, has = critRawMsgs.Next() {
for msg, rid, has := critRawMsgs.Next(); has; msg, rid, has = critRawMsgs.Next() {
go func(msg format.Message) {
//send the message
round, _, err := m.SendCMIX(msg, m.TransmissionID, param)
round, _, err := m.SendCMIX(msg, rid, param)
//if the message fail to send, notify the buffer so it can be handled
//in the future and exit
if err != nil {
jww.ERROR.Printf("Failed to send critical message on "+
"notification of healthy network: %+v", err)
critRawMsgs.Failed(msg)
critRawMsgs.Failed(msg, rid)
return
}
jww.INFO.Printf("critical healthy RoundIDs: %v", round)
......@@ -108,10 +108,10 @@ func (m *Manager) criticalMessages() {
jww.ERROR.Printf("critical message send failed to transmit "+
"transmit %v/%v paritions: %v round failures, %v timeouts",
numRoundFail+numTimeOut, 1, numRoundFail, numTimeOut)
critRawMsgs.Failed(msg)
critRawMsgs.Failed(msg, rid)
return
}
critRawMsgs.Succeeded(msg)
critRawMsgs.Succeeded(msg, rid)
}(msg)
}
......
......@@ -9,8 +9,12 @@ package utility
import (
"crypto/md5"
"encoding/json"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"time"
)
......@@ -18,16 +22,31 @@ const currentCmixMessageVersion = 0
type cmixMessageHandler struct{}
type storedMessage struct{
Msg []byte
Recipient []byte
}
func (sm storedMessage)Marshal()[]byte{
data, err := json.Marshal(&sm)
if err!=nil{
jww.FATAL.Panicf("Failed to marshal stored message: %s", err)
}
return data
}
// SaveMessage saves the message as a versioned object at the specified key
// in the key value store.
func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error {
msg := m.(format.Message)
sm := m.(storedMessage)
// Create versioned object
obj := versioned.Object{
Version: currentCmixMessageVersion,
Timestamp: time.Now(),
Data: msg.Marshal(),
Data: sm.Marshal(),
}
// Save versioned object
......@@ -44,8 +63,13 @@ func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interf
return format.Message{}, err
}
sm := storedMessage{}
if err = json.Unmarshal(vo.Data, &sm); err!=nil{
return nil, errors.Wrap(err, "Failed to unmarshal stored message")
}
// Create message from data
return format.Unmarshal(vo.Data), nil
return sm, nil
}
// DeleteMessage deletes the message with the specified key from the key value
......@@ -56,9 +80,8 @@ func (cmh *cmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error
// HashMessage generates a hash of the message.
func (cmh *cmixMessageHandler) HashMessage(m interface{}) MessageHash {
msg := m.(format.Message)
return md5.Sum(msg.Marshal())
sm := m.(storedMessage)
return md5.Sum(sm.Marshal())
}
// CmixMessageBuffer wraps the message buffer to store and load raw cmix
......@@ -85,28 +108,50 @@ func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, er
return &CmixMessageBuffer{mb: mb}, nil
}
func (cmb *CmixMessageBuffer) Add(m format.Message) {
cmb.mb.Add(m)
func (cmb *CmixMessageBuffer) Add(msg format.Message, recipent *id.ID) {
sm := storedMessage{
Msg: msg.Marshal(),
Recipient: recipent.Marshal(),
}
cmb.mb.Add(sm)
}
func (cmb *CmixMessageBuffer) AddProcessing(m format.Message) {
cmb.mb.AddProcessing(m)
func (cmb *CmixMessageBuffer) AddProcessing(msg format.Message, recipent *id.ID) {
sm := storedMessage{
Msg: msg.Marshal(),
Recipient: recipent.Marshal(),
}
cmb.mb.AddProcessing(sm)
}
func (cmb *CmixMessageBuffer) Next() (format.Message, bool) {
func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, bool) {
m, ok := cmb.mb.Next()
if !ok {
return format.Message{}, false
return format.Message{}, nil, false
}
msg := m.(format.Message)
return msg, true
sm := m.(storedMessage)
msg := format.Unmarshal(sm.Msg)
recpient, err := id.Unmarshal(sm.Recipient)
if err!=nil{
jww.FATAL.Panicf("Could nto get an id for stored cmix " +
"message buffer: %+v", err)
}
return msg, recpient, true
}
func (cmb *CmixMessageBuffer) Succeeded(m format.Message) {
cmb.mb.Succeeded(m)
func (cmb *CmixMessageBuffer) Succeeded(msg format.Message, recipent *id.ID) {
sm := storedMessage{
Msg: msg.Marshal(),
Recipient: recipent.Marshal(),
}
cmb.mb.Succeeded(sm)
}
func (cmb *CmixMessageBuffer) Failed(m format.Message) {
cmb.mb.Failed(m)
func (cmb *CmixMessageBuffer) Failed(msg format.Message, recipent *id.ID) {
sm := storedMessage{
Msg: msg.Marshal(),
Recipient: recipent.Marshal(),
}
cmb.mb.Failed(sm)
}
......@@ -12,6 +12,7 @@ import (
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"math/rand"
"reflect"
"testing"
......@@ -23,9 +24,13 @@ func TestCmixMessageHandler_SaveMessage(t *testing.T) {
// Set up test values
cmh := &cmixMessageHandler{}
kv := versioned.NewKV(make(ekv.Memstore))
testMsgs, _ := makeTestCmixMessages(10)
testMsgs, ids, _ := makeTestCmixMessages(10)
for _, msg := range testMsgs {
for i := range testMsgs {
msg := storedMessage{
Msg: testMsgs[i].Marshal(),
Recipient: ids[i].Marshal(),
}
key := makeStoredMessageKey("testKey", cmh.HashMessage(msg))
// Save message
......@@ -55,9 +60,13 @@ func TestCmixMessageHandler_LoadMessage(t *testing.T) {
// Set up test values
cmh := &cmixMessageHandler{}
kv := versioned.NewKV(make(ekv.Memstore))
testMsgs, _ := makeTestCmixMessages(10)
testMsgs, ids, _ := makeTestCmixMessages(10)
for _, msg := range testMsgs {
for i := range testMsgs {
msg := storedMessage{
Msg: testMsgs[i].Marshal(),
Recipient: ids[i].Marshal(),
}
key := makeStoredMessageKey("testKey", cmh.HashMessage(msg))
// Save message
......@@ -84,7 +93,7 @@ func TestCmixMessageHandler_LoadMessage(t *testing.T) {
// Smoke test of cmixMessageHandler.
func TestCmixMessageBuffer_Smoke(t *testing.T) {
// Set up test messages
testMsgs, _ := makeTestCmixMessages(2)
testMsgs, ids, _ := makeTestCmixMessages(2)
// Create new buffer
cmb, err := NewCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey")
......@@ -94,26 +103,26 @@ func TestCmixMessageBuffer_Smoke(t *testing.T) {
}
// Add two messages
cmb.Add(testMsgs[0])
cmb.Add(testMsgs[1])
cmb.Add(testMsgs[0], ids[0])
cmb.Add(testMsgs[1], ids[1])
if len(cmb.mb.messages) != 2 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
2, len(cmb.mb.messages))
}
msg, exists := cmb.Next()
msg, rid, exists := cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
cmb.Succeeded(msg)
cmb.Succeeded(msg, rid)
if len(cmb.mb.messages) != 1 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
1, len(cmb.mb.messages))
}
msg, exists = cmb.Next()
msg, rid, exists = cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
......@@ -121,24 +130,23 @@ func TestCmixMessageBuffer_Smoke(t *testing.T) {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
0, len(cmb.mb.messages))
}
cmb.Failed(msg)
cmb.Failed(msg, rid)
if len(cmb.mb.messages) != 1 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
1, len(cmb.mb.messages))
}
msg, exists = cmb.Next()
msg, rid, exists = cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
cmb.Succeeded(msg)
cmb.Succeeded(msg, rid)
msg, exists = cmb.Next()
msg, rid, exists = cmb.Next()
if exists {
t.Error("Next() found a message in the buffer when it should be empty.")
}
cmb.Succeeded(msg)
if len(cmb.mb.messages) != 0 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
......@@ -149,11 +157,12 @@ func TestCmixMessageBuffer_Smoke(t *testing.T) {
// makeTestCmixMessages creates a list of messages with random data and the
// expected map after they are added to the buffer.
func makeTestCmixMessages(n int) ([]format.Message, map[MessageHash]struct{}) {
func makeTestCmixMessages(n int) ([]format.Message, []*id.ID, map[MessageHash]struct{}) {
cmh := &cmixMessageHandler{}
prng := rand.New(rand.NewSource(time.Now().UnixNano()))
mh := map[MessageHash]struct{}{}
msgs := make([]format.Message, n)
ids := make([]*id.ID, n)
for i := range msgs {
msgs[i] = format.NewMessage(128)
payload := make([]byte, 128)
......@@ -161,8 +170,17 @@ func makeTestCmixMessages(n int) ([]format.Message, map[MessageHash]struct{}) {
msgs[i].SetPayloadA(payload)
prng.Read(payload)
msgs[i].SetPayloadB(payload)
mh[cmh.HashMessage(msgs[i])] = struct{}{}
rid := id.ID{}
prng.Read(rid[:32])
rid[32] = byte(id.User)
ids[i] = &rid
sm := storedMessage{
Msg: msgs[i].Marshal(),
Recipient: ids[i].Marshal(),
}
mh[cmh.HashMessage(sm)] = struct{}{}
}
return msgs, mh
return msgs, ids, mh
}
......@@ -148,7 +148,7 @@ func (emb *E2eMessageBuffer) Next() (message.Send, params.E2E, bool) {
msg := m.(e2eMessage)
recipient, err := id.Unmarshal(msg.Recipient)
if err != nil {
jww.FATAL.Panicf("Error unmarshaling recipient: %v", err)
jww.FATAL.Panicf("Error unmarshaling Recipient: %v", err)
}
return message.Send{recipient, msg.Payload,
message.Type(msg.MessageType)}, msg.Params, true
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment