diff --git a/context/message/sendMessage.go b/context/message/sendMessage.go index 8688ab60d053f77a80787cd21f244d5e0d67d57e..1e639669f607ba0b9dbc26918bca3e6b0617d5b9 100644 --- a/context/message/sendMessage.go +++ b/context/message/sendMessage.go @@ -4,7 +4,6 @@ import "gitlab.com/xx_network/primitives/id" type Send struct { Recipient *id.ID - Sender *id.ID Payload []byte MessageType Type } diff --git a/context/networkManager.go b/context/networkManager.go index b585b946e668e780b9b5ea4832d62f9c967ab439..8d6e119df6cc961c05833a1c16181921931b79b6 100644 --- a/context/networkManager.go +++ b/context/networkManager.go @@ -16,3 +16,4 @@ type NetworkManager interface { GetInstance() *network.Instance Stoppable() stoppable.Stoppable } + diff --git a/io/keyExchange/trigger.go b/io/keyExchange/trigger.go index 06dd4a682d240aef49e00f899a0f7098710e3219..5e3925868a9162d23b098fc0d56b1d864edff8be 100644 --- a/io/keyExchange/trigger.go +++ b/io/keyExchange/trigger.go @@ -7,9 +7,12 @@ import ( "gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context/message" "gitlab.com/elixxir/client/context/params" + "gitlab.com/elixxir/client/context/utility" "gitlab.com/elixxir/client/storage/e2e" + ds "gitlab.com/elixxir/comms/network/dataStructures" "gitlab.com/elixxir/crypto/cyclic" - "gitlab.com/xx_network/primitives/id" + "gitlab.com/elixxir/primitives/states" + "time" ) func handleTrigger(ctx *context.Context, request message.Receive) { @@ -84,6 +87,35 @@ func handleTrigger(ctx *context.Context, request message.Receive) { rounds, err := ctx.Manager.SendE2E(m, e2eParams, cmixParams) + //Register the event for all rounds + sendResults := make(chan ds.EventReturn, len(rounds)) + roundEvents := ctx.Manager.GetInstance().GetRoundEvents() + for _, r := range rounds { + roundEvents.AddRoundEventChan(r, sendResults, 1*time.Minute, + states.COMPLETED, states.FAILED) + } + + //Wait until the result tracking responds + success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds)) + + // If a single partition of the Key Negotiation request does not + // transmit, the partner will not be able to read the confirmation. If + // such a failure occurs + if !success { + session.SetNegotiationStatus(e2e.Unconfirmed) + return errors.Errorf("Key Negotiation for %s failed to "+ + "transmit %v/%v paritions: %v round failures, %v timeouts", + session, numRoundFail+numTimeOut, len(rounds), numRoundFail, + numTimeOut) + } + + // otherwise, the transmission is a success and this should be denoted + // in the session and the log + jww.INFO.Printf("Key Negotiation transmission for %s sucesfull", + session) + session.SetNegotiationStatus(e2e.Sent) + + } func unmarshalKeyExchangeTrigger(grp *cyclic.Group, payload []byte) (e2e.SessionID, diff --git a/io/parse/partition.go b/io/parse/partition.go index 5a9cca45fd873112e36185c7337f32aa4671e170..8818e3628c3abb020bdaa48ee05e4eef9fa2630b 100644 --- a/io/parse/partition.go +++ b/io/parse/partition.go @@ -2,11 +2,11 @@ package parse import ( "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context/message" "gitlab.com/xx_network/primitives/id" "time" - jww "github.com/spf13/jwalterweatherman" ) const MaxMessageParts = 255 diff --git a/storage/garbledMessages.go b/storage/garbledMessages.go new file mode 100644 index 0000000000000000000000000000000000000000..cb1168da4420a87e8589f958d532731425e5c766 --- /dev/null +++ b/storage/garbledMessages.go @@ -0,0 +1,4 @@ +package storage + +type GarbledMessages struct { +} diff --git a/storage/messages.go b/storage/messages.go new file mode 100644 index 0000000000000000000000000000000000000000..2e0b96b3f1b77819488ce4f4140142df552880c5 --- /dev/null +++ b/storage/messages.go @@ -0,0 +1,3 @@ +package storage + +const criticalMessagesKey = "CriticalMessages" diff --git a/storage/session.go b/storage/session.go index 662bb5d11514f72a92c83d95299689e7fe80ff6a..2dfcef8ab516c0526956bfb5df13afe83e061522 100644 --- a/storage/session.go +++ b/storage/session.go @@ -16,6 +16,7 @@ import ( "gitlab.com/elixxir/client/storage/e2e" "gitlab.com/elixxir/client/storage/partition" "gitlab.com/elixxir/client/storage/user" + "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/cyclic" "gitlab.com/elixxir/ekv" @@ -33,11 +34,12 @@ type Session struct { regStatus RegistrationStatus //sub-stores - e2e *e2e.Store - cmix *cmix.Store - user *user.User - conversations *conversation.Store - partition *partition.Store + e2e *e2e.Store + cmix *cmix.Store + user *user.User + conversations *conversation.Store + partition *partition.Store + criticalMessages *utility.MessageBuffer } // Initialize a new Session object @@ -87,6 +89,11 @@ func New(baseDir, password string, uid *id.ID, salt []byte, rsaKey *rsa.PrivateK return nil, errors.WithMessage(err, "Failed to create session") } + s.criticalMessages, err = utility.NewMessageBuffer(s.kv, criticalMessagesKey) + if err != nil { + return nil, errors.WithMessage(err, "Failed to create session") + } + s.conversations = conversation.NewStore(s.kv) s.partition = partition.New(s.kv) @@ -120,6 +127,11 @@ func Load(baseDir, password string) (*Session, error) { return nil, errors.WithMessage(err, "Failed to load Session") } + s.criticalMessages, err = utility.LoadMessageBuffer(s.kv, criticalMessagesKey) + if err != nil { + return nil, errors.WithMessage(err, "Failed to load session") + } + s.conversations = conversation.NewStore(s.kv) s.partition = partition.New(s.kv) @@ -144,6 +156,12 @@ func (s *Session) E2e() *e2e.Store { return s.e2e } +func (s *Session) GetCriticalMessages() *utility.MessageBuffer { + s.mux.RLock() + defer s.mux.RUnlock() + return s.criticalMessages +} + func (s *Session) Conversations() *conversation.Store { s.mux.RLock() defer s.mux.RUnlock() diff --git a/storage/utility/cmixMessageBuffer.go b/storage/utility/cmixMessageBuffer.go new file mode 100644 index 0000000000000000000000000000000000000000..d98b399fbfa914bdd869f4ef321fa23223062d70 --- /dev/null +++ b/storage/utility/cmixMessageBuffer.go @@ -0,0 +1,97 @@ +package utility + +import ( + "crypto/md5" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/primitives/format" + "time" +) + +const currentCmixMessageVersion = 0 + +type cmixMessageHandler struct{} + +// saveMessage saves the message as a versioned object. +func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { + msg := m.(format.Message) + + // Create versioned object + obj := versioned.Object{ + Version: currentCmixMessageVersion, + Timestamp: time.Now(), + Data: msg.Marshal(), + } + + // Save versioned object + return kv.Set(key, &obj) +} + +// loadMessage loads the message with the specified key. +func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { + // Load the versioned object + vo, err := kv.Get(key) + if err != nil { + return format.Message{}, err + } + + // Create message from data + return format.Unmarshal(vo.Data), err +} + +// DeleteMessage deletes the message with the specified key. +func (cmh *cmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error { + return kv.Delete(key) +} + +// hashMessage generates a hash of the message. +func (cmh *cmixMessageHandler) HashMessage(m interface{}) MessageHash { + msg := m.(format.Message) + // Create message from data + return md5.Sum(msg.Marshal()) +} + +// CmixMessageBuffer wraps the message buffer to store and load raw cmix +// messages +type CmixMessageBuffer struct { + mb *MessageBuffer +} + +func NewCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) { + mb, err := NewMessageBuffer(kv, &cmixMessageHandler{}, key) + if err != nil { + return nil, err + } + + return &CmixMessageBuffer{mb: mb}, nil +} + +func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) { + mb, err := LoadMessageBuffer(kv, &cmixMessageHandler{}, key) + if err != nil { + return nil, err + } + + return &CmixMessageBuffer{mb: mb}, nil +} + +func (cmb *CmixMessageBuffer) Add(m format.Message) { + cmb.mb.Add(m) +} + +func (cmb *CmixMessageBuffer) Next() (format.Message, bool) { + m, ok := cmb.mb.Next() + if !ok { + return format.Message{}, false + } + + msg := m.(format.Message) + return msg, true +} + +func (cmb *CmixMessageBuffer) Succeeded(m format.Message) { + cmb.mb.Succeeded(m) +} + +func (cmb *CmixMessageBuffer) Failed(m format.Message) { + cmb.mb.Failed(m) +} diff --git a/storage/utility/cmixMessageBuffer_test.go b/storage/utility/cmixMessageBuffer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c4fc4ec073cf9d3b5cd8bc5ac06685ac8d810686 --- /dev/null +++ b/storage/utility/cmixMessageBuffer_test.go @@ -0,0 +1,64 @@ +package utility + +import ( + "bytes" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/ekv" + "gitlab.com/elixxir/primitives/format" + "math/rand" + "testing" + "time" +) + +func Test_saveMessage(t *testing.T) { + // Set up test values + cmh := &cmixMessageHandler{} + + kv := versioned.NewKV(make(ekv.Memstore)) + subKey := "testKey" + testMsgs, _ := makeTestCmixMessages(1) + mh := cmh.HashMessage(testMsgs[0]) + key := makeStoredMessageKey(subKey, mh) + + // Save message + err := cmh.SaveMessage(kv, testMsgs[0], key) + if err != nil { + t.Errorf("saveMessage() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + // Try to get message + obj, err := kv.Get(key) + if err != nil { + t.Errorf("Get() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + if !bytes.Equal(testMsgs[0].Marshal(), obj.Data) { + t.Errorf("saveMessage() returned versioned object with incorrect data."+ + "\n\texpected: %v\n\treceived: %v", + testMsgs[0], obj.Data) + } +} + +// makeTestCmixMessages creates a list of messages with random data and the expected +// map after they are added to the buffer. +// makeTestMessages creates a list of messages with random data and the expected +// map after they are added to the buffer. +func makeTestCmixMessages(n int) ([]format.Message, map[MessageHash]struct{}) { + cmh := &cmixMessageHandler{} + prng := rand.New(rand.NewSource(time.Now().UnixNano())) + mh := map[MessageHash]struct{}{} + msgs := make([]format.Message, n) + for i := range msgs { + msgs[i] = format.NewMessage(128) + payload := make([]byte, 128) + prng.Read(payload) + msgs[i].SetPayloadA(payload) + prng.Read(payload) + msgs[i].SetPayloadB(payload) + mh[cmh.HashMessage(msgs[i])] = struct{}{} + } + + return msgs, mh +} diff --git a/storage/utility/e2eMessageBuffer.go b/storage/utility/e2eMessageBuffer.go new file mode 100644 index 0000000000000000000000000000000000000000..da2c911b5ac83e2fa3e8813b980eaa3d65e5b085 --- /dev/null +++ b/storage/utility/e2eMessageBuffer.go @@ -0,0 +1,80 @@ +package utility + +import ( + "crypto/md5" + "encoding/binary" + "encoding/json" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/storage/versioned" + "time" +) + +const currentE2EMessageVersion = 0 + +type e2eMessageHandler struct{} + +type e2eMessage struct { + Recipient []byte + Payload []byte + MessageType uint32 +} + +// saveMessage saves the message as a versioned object. +func (emh *e2eMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { + msg := m.(e2eMessage) + + b, err := json.Marshal(&msg) + if err != nil { + jww.FATAL.Panicf("Failed to marshal e2e message for "+ + "storage: %s", err) + } + + // Create versioned object + obj := versioned.Object{ + Version: currentE2EMessageVersion, + Timestamp: time.Now(), + Data: b, + } + + // Save versioned object + return kv.Set(key, &obj) +} + +// loadMessage loads the message with the specified key. +func (emh *e2eMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { + // Load the versioned object + vo, err := kv.Get(key) + if err != nil { + return nil, err + } + + msg := e2eMessage{} + + if err := json.Unmarshal(vo.Data, &msg); err != nil { + jww.FATAL.Panicf("Failed to unmarshal e2e message for "+ + "storage: %s", err) + } + // Create message from data + return msg, err +} + +// DeleteMessage deletes the message with the specified key. +func (emh *e2eMessageHandler) DeleteMessage(kv *versioned.KV, key string) error { + return kv.Delete(key) +} + +// hashMessage generates a hash of the message. +func (emh *e2eMessageHandler) HashMessage(m interface{}) MessageHash { + msg := m.(e2eMessage) + + var digest []byte + digest = append(digest, msg.Recipient...) + digest = append(digest, msg.Payload...) + + mtBytes := make([]byte, 4) + binary.BigEndian.PutUint32(mtBytes, msg.MessageType) + digest = append(digest, mtBytes...) + + // Create message from data + return md5.Sum(digest) +} diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index 1a0e2c9b5d97da053eb699b80e422f05103110b1..28bf6c6b8616efc44eaab8b92c7a9a4ebd971af4 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -1,7 +1,6 @@ package utility import ( - "crypto/md5" "encoding/json" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage/versioned" @@ -10,8 +9,8 @@ import ( "time" ) -// messageHash stores the key for each message stored in the buffer. -type messageHash [16]byte +// MessageHash stores the key for each message stored in the buffer. +type MessageHash [16]byte // Sub key used in building keys for saving the message to the key value store const messageSubKey = "bufferedMessage" @@ -19,6 +18,15 @@ const messageSubKey = "bufferedMessage" // Version of the file saved to the key value store const currentMessageBufferVersion = 0 +// Message interface used to handle the passed in message type so this can be +// used at diffrent layers of the stack +type MessageHandler interface { + SaveMessage(kv *versioned.KV, m interface{}, key string) error + LoadMessage(kv *versioned.KV, key string) (interface{}, error) + DeleteMessage(kv *versioned.KV, key string) error + HashMessage(m interface{}) MessageHash +} + // MessageBuffer holds a list of messages in the "not processed" or "processing" // state both in memory. Messages in the "not processed" state are held in the // messages map and messages in the "processing" state are moved into the @@ -26,21 +34,26 @@ const currentMessageBufferVersion = 0 // removed from the buffer. The actual messages are saved in the key value store // along with a copy of the buffer that is held in memory. type MessageBuffer struct { - messages map[messageHash]struct{} - processingMessages map[messageHash]struct{} + messages map[MessageHash]struct{} + processingMessages map[MessageHash]struct{} kv *versioned.KV + + handler MessageHandler + key string mux sync.RWMutex + } // NewMessageBuffer creates a new empty buffer and saves it to the passed in key // value store at the specified key. An error is returned on an unsuccessful // save. -func NewMessageBuffer(kv *versioned.KV, key string) (*MessageBuffer, error) { +func NewMessageBuffer(kv *versioned.KV, handler MessageHandler, key string) (*MessageBuffer, error) { // Create new empty buffer mb := &MessageBuffer{ - messages: make(map[messageHash]struct{}), - processingMessages: make(map[messageHash]struct{}), + messages: make(map[MessageHash]struct{}), + processingMessages: make(map[MessageHash]struct{}), + handler: handler, kv: kv, key: key, } @@ -54,11 +67,12 @@ func NewMessageBuffer(kv *versioned.KV, key string) (*MessageBuffer, error) { // LoadMessageBuffer loads an existing message buffer from the key value store // into memory at the given key. Returns an error if buffer cannot be loaded. -func LoadMessageBuffer(kv *versioned.KV, key string) (*MessageBuffer, error) { +func LoadMessageBuffer(kv *versioned.KV, handler MessageHandler, key string) (*MessageBuffer, error) { // Create new empty buffer mb := &MessageBuffer{ - messages: make(map[messageHash]struct{}), - processingMessages: make(map[messageHash]struct{}), + messages: make(map[MessageHash]struct{}), + processingMessages: make(map[MessageHash]struct{}), + handler: handler, kv: kv, key: key, } @@ -98,9 +112,9 @@ func (mb *MessageBuffer) save() error { // getMessageList returns a list of all message hashes stored in messages and // processingMessages in a random order. -func (mb *MessageBuffer) getMessageList() []messageHash { +func (mb *MessageBuffer) getMessageList() []MessageHash { // Create new slice with a length to fit all messages in either list - msgs := make([]messageHash, len(mb.messages)+len(mb.processingMessages)) + msgs := make([]MessageHash, len(mb.messages)+len(mb.processingMessages)) i := 0 // Add messages from the "not processed" list @@ -129,7 +143,7 @@ func (mb *MessageBuffer) load() error { } // Create slice of message hashes from data - var msgs []messageHash + var msgs []MessageHash err = json.Unmarshal(vo.Data, &msgs) if err != nil { return err @@ -144,8 +158,8 @@ func (mb *MessageBuffer) load() error { } // Add adds a message to the buffer in "not processing" state. -func (mb *MessageBuffer) Add(m format.Message) { - h := hashMessage(m) +func (mb *MessageBuffer) Add(m interface{}) { + h := mb.handler.HashMessage(m) mb.mux.Lock() defer mb.mux.Unlock() @@ -158,7 +172,7 @@ func (mb *MessageBuffer) Add(m format.Message) { } // Save message as versioned object - err := saveMessage(mb.kv, m, makeStoredMessageKey(mb.key, h)) + err := mb.handler.SaveMessage(mb.kv, m, makeStoredMessageKey(mb.key, h)) if err != nil { jww.FATAL.Panicf("Error saving message: %v", err) } @@ -176,7 +190,7 @@ func (mb *MessageBuffer) Add(m format.Message) { // Next gets the next message from the buffer whose state is "not processing". // The returned messages are moved to the processing state. If there are no // messages remaining, then false is returned. -func (mb *MessageBuffer) Next() (format.Message, bool) { +func (mb *MessageBuffer) Next() (interface{}, bool) { mb.mux.Lock() defer mb.mux.Unlock() @@ -184,7 +198,7 @@ func (mb *MessageBuffer) Next() (format.Message, bool) { return format.Message{}, false } - // Pop the next messageHash from the "not processing" list + // Pop the next MessageHash from the "not processing" list h := next(mb.messages) delete(mb.messages, h) @@ -192,39 +206,43 @@ func (mb *MessageBuffer) Next() (format.Message, bool) { mb.processingMessages[h] = struct{}{} // Retrieve the message for storage - m, err := loadMessage(mb.kv, makeStoredMessageKey(mb.key, h)) + m, err := mb.handler.LoadMessage(mb.kv, makeStoredMessageKey(mb.key, h)) if err != nil { jww.FATAL.Panicf("Could not load message: %v", err) } return m, true } -// next returns the first messageHash in the map returned by range. -func next(msgMap map[messageHash]struct{}) messageHash { +// next returns the first MessageHash in the map returned by range. +func next(msgMap map[MessageHash]struct{}) MessageHash { for h := range msgMap { return h } - return messageHash{} + return MessageHash{} } // Succeeded sets a messaged as processed and removed it from the buffer. -func (mb *MessageBuffer) Succeeded(m format.Message) { - h := hashMessage(m) +func (mb *MessageBuffer) Succeeded(m interface{}) { + h := mb.handler.HashMessage(m) mb.mux.Lock() defer mb.mux.Unlock() delete(mb.processingMessages, h) - err := mb.save() - if err != nil { + + if err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h)); err != nil { + jww.FATAL.Fatalf("Failed to save: %v", err) + } + + if err := mb.save(); err != nil { jww.FATAL.Fatalf("Failed to save: %v", err) } } // Failed sets a message as failed to process. It changes the message back to // the "not processed" state. -func (mb *MessageBuffer) Failed(m format.Message) { - h := hashMessage(m) +func (mb *MessageBuffer) Failed(m interface{}) { + h := mb.handler.HashMessage(m) mb.mux.Lock() defer mb.mux.Unlock() @@ -236,20 +254,9 @@ func (mb *MessageBuffer) Failed(m format.Message) { mb.messages[h] = struct{}{} } +/* // saveMessage saves the message as a versioned object. -func saveMessage(kv *versioned.KV, m format.Message, key string) error { - now := time.Now() - - // Create versioned object - obj := versioned.Object{ - Version: currentMessageBufferVersion, - Timestamp: now, - Data: m.Marshal(), - } - // Save versioned object - return kv.Set(key, &obj) -} // loadMessage loads the message with the specified key. func loadMessage(kv *versioned.KV, key string) (format.Message, error) { @@ -264,13 +271,13 @@ func loadMessage(kv *versioned.KV, key string) (format.Message, error) { } // hashMessage generates a hash of the message. -func hashMessage(m format.Message) messageHash { - // Sum returns a array that is the exact same size as the messageHash and Go +func hashMessage(m format.Message) MessageHash { + // Sum returns a array that is the exact same size as the MessageHash and Go // apparently automatically casts it return md5.Sum(m.Marshal()) } - +*/ // makeStoredMessageKey generates a new key for the message based on its has. -func makeStoredMessageKey(key string, h messageHash) string { +func makeStoredMessageKey(key string, h MessageHash) string { return key + messageSubKey + string(h[:]) } diff --git a/storage/utility/messageBuffer_test.go b/storage/utility/messageBuffer_test.go index 05c5230dd4428f26dda084b06dfd3dfbf5bd6fe2..d97a188d80a5517e7f81f7d931e5a175b51478ab 100644 --- a/storage/utility/messageBuffer_test.go +++ b/storage/utility/messageBuffer_test.go @@ -2,27 +2,68 @@ package utility import ( "bytes" + "crypto/md5" "encoding/json" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" - "gitlab.com/elixxir/primitives/format" "math/rand" + "os" "reflect" "testing" "time" ) +type testHandler struct { + messages map[string][]byte +} + +func (th *testHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { + mBytes := m.([]byte) + th.messages[key] = mBytes + return nil +} + +func (th *testHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { + m, ok := th.messages[key] + if !ok { + return nil, os.ErrNotExist + } + return m, nil +} + +func (th *testHandler) DeleteMessage(kv *versioned.KV, key string) error { + _, ok := th.messages[key] + if !ok { + return os.ErrNotExist + } + delete(th.messages, key) + return nil +} + +func (th *testHandler) HashMessage(m interface{}) MessageHash { + mBytes := m.([]byte) + // Sum returns a array that is the exact same size as the MessageHash and Go + // apparently automatically casts it + return md5.Sum(mBytes) +} + +func newTestHandler() *testHandler { + return &testHandler{messages: make(map[string][]byte)} +} + // Tests happy path of NewMessageBuffer. func TestNewMessageBuffer(t *testing.T) { // Set up expected value + th := newTestHandler() expectedMB := &MessageBuffer{ - messages: make(map[messageHash]struct{}), - processingMessages: make(map[messageHash]struct{}), + messages: make(map[MessageHash]struct{}), + processingMessages: make(map[MessageHash]struct{}), + handler: th, kv: versioned.NewKV(make(ekv.Memstore)), key: "testKey", } - testMB, err := NewMessageBuffer(expectedMB.kv, expectedMB.key) + testMB, err := NewMessageBuffer(expectedMB.kv, th, expectedMB.key) if err != nil { t.Errorf("NewMessageBuffer() returned an error."+ "\n\texpected: %v\n\treceived: %v", nil, err) @@ -36,10 +77,12 @@ func TestNewMessageBuffer(t *testing.T) { // Tests happy path of TestLoadMessageBuffer. func TestLoadMessageBuffer(t *testing.T) { + th := newTestHandler() // Set up expected value expectedMB := &MessageBuffer{ - messages: make(map[messageHash]struct{}), - processingMessages: make(map[messageHash]struct{}), + messages: make(map[MessageHash]struct{}), + processingMessages: make(map[MessageHash]struct{}), + handler: th, kv: versioned.NewKV(make(ekv.Memstore)), key: "testKey", } @@ -49,13 +92,13 @@ func TestLoadMessageBuffer(t *testing.T) { t.Fatalf("Error saving MessageBuffer: %v", err) } - testMB, err := LoadMessageBuffer(expectedMB.kv, expectedMB.key) + testMB, err := LoadMessageBuffer(expectedMB.kv, th, expectedMB.key) // Move all the messages into one map to match the output for mh := range expectedMB.processingMessages { expectedMB.messages[mh] = struct{}{} } - expectedMB.processingMessages = make(map[messageHash]struct{}) + expectedMB.processingMessages = make(map[MessageHash]struct{}) if err != nil { t.Errorf("LoadMessageBuffer() returned an error."+ @@ -72,7 +115,8 @@ func TestLoadMessageBuffer(t *testing.T) { func TestMessageBuffer_save_NewMB(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) key := "testKey" - mb, err := NewMessageBuffer(kv, key) + + mb, err := NewMessageBuffer(kv, newTestHandler(), key) if err != nil { t.Fatalf("Failed to create new MessageBuffer: %v", err) } @@ -88,12 +132,12 @@ func TestMessageBuffer_save_NewMB(t *testing.T) { "\n\terror: %v", key, err) } - var messageArr []messageHash + var messageArr []MessageHash err = json.Unmarshal(obj.Data, &messageArr) - if !reflect.DeepEqual([]messageHash{}, messageArr) { + if !reflect.DeepEqual([]MessageHash{}, messageArr) { t.Errorf("save() returned versioned object with incorrect data."+ "\n\texpected: %#v\n\treceived: %#v", - []messageHash{}, messageArr) + []MessageHash{}, messageArr) } } @@ -101,7 +145,7 @@ func TestMessageBuffer_save_NewMB(t *testing.T) { func TestMessageBuffer_save(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) key := "testKey" - mb, err := NewMessageBuffer(kv, key) + mb, err := NewMessageBuffer(kv, newTestHandler(), key) if err != nil { t.Fatalf("Failed to create new MessageBuffer: %v", err) } @@ -119,7 +163,7 @@ func TestMessageBuffer_save(t *testing.T) { "\n\terror: %v", key, err) } - var messageArr []messageHash + var messageArr []MessageHash err = json.Unmarshal(obj.Data, &messageArr) if !cmpMessageHash(expectedMH, messageArr) { t.Errorf("save() returned versioned object with incorrect data."+ @@ -131,7 +175,7 @@ func TestMessageBuffer_save(t *testing.T) { // Tests happy path of MessageBuffer.Add(). func TestMessageBuffer_Add(t *testing.T) { // Create new MessageBuffer and fill with messages - testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") + testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), newTestHandler(), "testKey") if err != nil { t.Fatalf("Failed to create new MessageBuffer: %v", err) } @@ -161,7 +205,7 @@ func TestMessageBuffer_Add(t *testing.T) { // Tests happy path of MessageBuffer.Next(). func TestMessageBuffer_Next(t *testing.T) { // Create new MessageBuffer and fill with messages - testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") + testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), newTestHandler(), "testKey") if err != nil { t.Fatalf("Failed to create new MessageBuffer: %v", err) } @@ -173,10 +217,11 @@ func TestMessageBuffer_Next(t *testing.T) { for m, exists := testMB.Next(); exists; m, exists = testMB.Next() { foundMsg := false for i := range testMsgs { - if bytes.Equal(testMsgs[i].Marshal(), m.Marshal()) { + mBytes := m.([]byte) + if bytes.Equal(testMsgs[i], mBytes) { foundMsg = true testMsgs[i] = testMsgs[len(testMsgs)-1] - testMsgs[len(testMsgs)-1] = format.Message{} + testMsgs[len(testMsgs)-1] = []byte{} testMsgs = testMsgs[:len(testMsgs)-1] break } @@ -188,39 +233,11 @@ func TestMessageBuffer_Next(t *testing.T) { } } -func Test_saveMessage(t *testing.T) { - // Set up test values - kv := versioned.NewKV(make(ekv.Memstore)) - subKey := "testKey" - testMsgs, _ := makeTestMessages(1) - mh := hashMessage(testMsgs[0]) - key := makeStoredMessageKey(subKey, mh) - - // Save message - err := saveMessage(kv, testMsgs[0], key) - if err != nil { - t.Errorf("saveMessage() returned an error."+ - "\n\texpected: %v\n\trecieved: %v", nil, err) - } - - // Try to get message - obj, err := kv.Get(key) - if err != nil { - t.Errorf("Get() returned an error."+ - "\n\texpected: %v\n\trecieved: %v", nil, err) - } - - if !bytes.Equal(testMsgs[0].Marshal(), obj.Data) { - t.Errorf("saveMessage() returned versioned object with incorrect data."+ - "\n\texpected: %v\n\treceived: %v", - testMsgs[0], obj.Data) - } -} - // Tests happy path of MessageBuffer.Succeeded(). func TestMessageBuffer_Succeeded(t *testing.T) { + th := newTestHandler() // Create new MessageBuffer and fill with message - testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") + testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), th, "testKey") if err != nil { t.Fatalf("Failed to create new MessageBuffer: %v", err) } @@ -234,8 +251,8 @@ func TestMessageBuffer_Succeeded(t *testing.T) { testMB.Succeeded(m) - _, exists1 := testMB.messages[hashMessage(m)] - _, exists2 := testMB.processingMessages[hashMessage(m)] + _, exists1 := testMB.messages[th.HashMessage(m)] + _, exists2 := testMB.processingMessages[th.HashMessage(m)] if exists1 || exists2 { t.Errorf("Succeeded() did not remove the message from the buffer."+ "\n\tbuffer: %+v", testMB) @@ -244,8 +261,9 @@ func TestMessageBuffer_Succeeded(t *testing.T) { // Tests happy path of MessageBuffer.Failed(). func TestMessageBuffer_Failed(t *testing.T) { + th := newTestHandler() // Create new MessageBuffer and fill with message - testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") + testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), th, "testKey") if err != nil { t.Fatalf("Failed to create new MessageBuffer: %v", err) } @@ -259,8 +277,8 @@ func TestMessageBuffer_Failed(t *testing.T) { testMB.Failed(m) - _, exists1 := testMB.messages[hashMessage(m)] - _, exists2 := testMB.processingMessages[hashMessage(m)] + _, exists1 := testMB.messages[th.HashMessage(m)] + _, exists2 := testMB.processingMessages[th.HashMessage(m)] if !exists1 || exists2 { t.Errorf("Failed() did not move the message back into the \"not "+ "processed\" state.\n\tbuffer: %+v", testMB) @@ -268,13 +286,13 @@ func TestMessageBuffer_Failed(t *testing.T) { } // addTestMessages adds random messages to the buffer. -func addTestMessages(mb *MessageBuffer, n int) []messageHash { +func addTestMessages(mb *MessageBuffer, n int) []MessageHash { prng := rand.New(rand.NewSource(time.Now().UnixNano())) - msgs := make([]messageHash, n) + msgs := make([]MessageHash, n) for i := 0; i < n; i++ { keyData := make([]byte, 16) prng.Read(keyData) - mh := messageHash{} + mh := MessageHash{} copy(mh[:], keyData) if i%10 == 0 { @@ -288,9 +306,9 @@ func addTestMessages(mb *MessageBuffer, n int) []messageHash { return msgs } -// cmpMessageHash compares two slices of messageHash to see if they have the +// cmpMessageHash compares two slices of MessageHash to see if they have the // exact same elements in any order. -func cmpMessageHash(arrA, arrB []messageHash) bool { +func cmpMessageHash(arrA, arrB []MessageHash) bool { if len(arrA) != len(arrB) { return false } @@ -311,19 +329,15 @@ func cmpMessageHash(arrA, arrB []messageHash) bool { // makeTestMessages creates a list of messages with random data and the expected // map after they are added to the buffer. -func makeTestMessages(n int) ([]format.Message, map[messageHash]struct{}) { +func makeTestMessages(n int) ([][]byte, map[MessageHash]struct{}) { prng := rand.New(rand.NewSource(time.Now().UnixNano())) - mh := map[messageHash]struct{}{} - msgs := make([]format.Message, n) + mh := map[MessageHash]struct{}{} + msgs := make([][]byte, n) for i := range msgs { - msgs[i] = format.NewMessage(128) - payload := make([]byte, 128) - prng.Read(payload) - msgs[i].SetPayloadA(payload) - prng.Read(payload) - msgs[i].SetPayloadB(payload) - mh[hashMessage(msgs[i])] = struct{}{} + msgs[i] = make([]byte, 256) + prng.Read(msgs[i]) + mh[md5.Sum(msgs[i])] = struct{}{} } return msgs, mh -} +} \ No newline at end of file