diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index 881c7ddaafa392eed831b51ec2b0cd5fc7ccbc9a..197e669c4af6436544f3d90c25c0c903fafa809f 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -299,27 +299,6 @@ func (mb *MessageBuffer) Failed(m interface{}) { mb.messages[h] = struct{}{} } -/* -// loadMessage loads the message with the specified key. -func loadMessage(kv *versioned.KV, key string) (format.Message, error) { - // Load the versioned object - vo, err := kv.Get(key) - if err != nil { - return format.Message{}, err - } - - // Create message from data - return format.Unmarshal(vo.Data), err -} - -// hashMessage generates a hash of the message. -func hashMessage(m format.Message) MessageHash { - // Sum returns a array that is the exact same size as the MessageHash and Go - // apparently automatically casts it - return md5.Sum(m.Marshal()) -} -*/ - // makeStoredMessageKey generates a new key for the message based on its has. func makeStoredMessageKey(key string, h MessageHash) string { return key + messageSubKey + string(h[:]) diff --git a/storage/utility/meteredCmixMessageBuffer.go b/storage/utility/meteredCmixMessageBuffer.go index e5780321a933088699e45e9c22a0c8cdfe661d40..2ccf9c11115eeb031e1aa2fc82b663ab365e56bf 100644 --- a/storage/utility/meteredCmixMessageBuffer.go +++ b/storage/utility/meteredCmixMessageBuffer.go @@ -20,15 +20,14 @@ type meteredCmixMessage struct { Timestamp time.Time } -// SaveMessage saves the message as a versioned object at the specified key -// in the key value store. +// SaveMessage saves the message as a versioned object at the specified key in +// the key value store. func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { msg := m.(meteredCmixMessage) marshaled, err := json.Marshal(&msg) if err != nil { - return errors.WithMessage(err, "Failed to marshal metered "+ - "cmix message") + return errors.WithMessage(err, "Failed to marshal metered cmix message") } // Create versioned object @@ -52,15 +51,14 @@ func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (int return format.Message{}, err } - msg := &meteredCmixMessage{} - err = json.Unmarshal(vo.Data, msg) + msg := meteredCmixMessage{} + err = json.Unmarshal(vo.Data, &msg) if err != nil { - return nil, errors.WithMessage(err, "Failed to unmarshal "+ - "metered cmix message") + return nil, errors.WithMessage(err, "Failed to unmarshal metered cmix message") } // Create message from data - return format.Unmarshal(vo.Data), nil + return msg, nil } // DeleteMessage deletes the message with the specified key from the key value @@ -85,7 +83,7 @@ type MeteredCmixMessageBuffer struct { } func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { - mb, err := NewMessageBuffer(kv, &cmixMessageHandler{}, key) + mb, err := NewMessageBuffer(kv, &meteredCmixMessageHandler{}, key) if err != nil { return nil, err } @@ -94,7 +92,7 @@ func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMess } func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { - mb, err := LoadMessageBuffer(kv, &cmixMessageHandler{}, key) + mb, err := LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) if err != nil { return nil, err } @@ -129,7 +127,7 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, b msg := m.(meteredCmixMessage) rtnCnt := msg.Count - //increment the count and save + // increment the count and save msg.Count++ mcmh := &meteredCmixMessageHandler{} err := mcmh.SaveMessage(mcmb.kv, msg, makeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg))) @@ -143,9 +141,9 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, b } func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message) { - mcmb.mb.Succeeded(m) + mcmb.mb.Succeeded(meteredCmixMessage{M: m.Marshal()}) } func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message) { - mcmb.mb.Failed(m) + mcmb.mb.Failed(meteredCmixMessage{M: m.Marshal()}) } diff --git a/storage/utility/meteredCmixMessageBuffer_test.go b/storage/utility/meteredCmixMessageBuffer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9266b0cd713fbcb49475bbad87885ba5bc5a7e7d --- /dev/null +++ b/storage/utility/meteredCmixMessageBuffer_test.go @@ -0,0 +1,215 @@ +package utility + +import ( + "bytes" + "encoding/json" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/ekv" + "gitlab.com/elixxir/primitives/format" + "math/rand" + "testing" + "time" +) + +// Test happy path of meteredCmixMessage.SaveMessage(). +func Test_meteredCmixMessageHandler_SaveMessage(t *testing.T) { + // Set up test values + mcmh := &meteredCmixMessageHandler{} + kv := versioned.NewKV(make(ekv.Memstore)) + testMsgs, _ := makeTestMeteredCmixMessage(10) + + for _, msg := range testMsgs { + key := makeStoredMessageKey("testKey", mcmh.HashMessage(msg)) + + // Save message + err := mcmh.SaveMessage(kv, msg, key) + if err != nil { + t.Errorf("SaveMessage() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + // Try to get message + obj, err := kv.Get(key) + if err != nil { + t.Errorf("Get() returned an error: %v", err) + } + + msgData, err := json.Marshal(&msg) + if err != nil { + t.Fatalf("Could not marshal message: %v", err) + } + + // Test if message retrieved matches expected + if !bytes.Equal(msgData, obj.Data) { + t.Errorf("SaveMessage() returned versioned object with incorrect data."+ + "\n\texpected: %v\n\treceived: %v", + msg, obj.Data) + } + } +} + +// Test happy path of meteredCmixMessage.LoadMessage(). +func Test_meteredCmixMessageHandler_LoadMessage(t *testing.T) { + // Set up test values + mcmh := &meteredCmixMessageHandler{} + kv := versioned.NewKV(make(ekv.Memstore)) + testMsgs, _ := makeTestMeteredCmixMessage(10) + + for i, msg := range testMsgs { + key := makeStoredMessageKey("testKey", mcmh.HashMessage(msg)) + + // Save message + if err := mcmh.SaveMessage(kv, msg, key); err != nil { + t.Errorf("SaveMessage() returned an error: %v", err) + } + + // Try to load message + testMsg, err := mcmh.LoadMessage(kv, key) + if err != nil { + t.Errorf("LoadMessage() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + testMcm := testMsg.(meteredCmixMessage) + + // Test if message loaded matches expected + if !bytes.Equal(msg.M, testMcm.M) || msg.Count != testMcm.Count || !msg.Timestamp.Equal(testMcm.Timestamp) { + t.Errorf("LoadMessage() returned an unexpected object (round %d)."+ + "\n\texpected: %+v\n\treceived: %+v", i, msg, testMsg.(meteredCmixMessage)) + } + } +} + +// Test happy path of meteredCmixMessage.DeleteMessage(). +func Test_meteredCmixMessageHandler_DeleteMessage(t *testing.T) { + // Set up test values + mcmh := &meteredCmixMessageHandler{} + kv := versioned.NewKV(make(ekv.Memstore)) + testMsgs, _ := makeTestMeteredCmixMessage(10) + + for _, msg := range testMsgs { + key := makeStoredMessageKey("testKey", mcmh.HashMessage(msg)) + + // Save message + err := mcmh.SaveMessage(kv, msg, key) + if err != nil { + t.Errorf("SaveMessage() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + err = mcmh.DeleteMessage(kv, key) + if err != nil { + t.Errorf("DeleteMessage() produced an error: %v", err) + } + + // Try to get message + _, err = kv.Get(key) + if err == nil { + t.Error("Get() did not return an error.") + } + } +} + +// Smoke test of meteredCmixMessageHandler. +func Test_meteredCmixMessageHandler_Smoke(t *testing.T) { + // Set up test messages + testMsgs := makeTestFormatMessages(2) + + // Create new buffer + mcmb, err := NewMeteredCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") + if err != nil { + t.Errorf("NewMeteredCmixMessageBuffer() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + // Add two messages + + mcmb.Add(testMsgs[0]) + mcmb.Add(testMsgs[1]) + + if len(mcmb.mb.messages) != 2 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 2, len(mcmb.mb.messages)) + } + + msg, _, _, exists := mcmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + mcmb.Remove(msg) + + if len(mcmb.mb.messages) != 1 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 1, len(mcmb.mb.messages)) + } + + msg, _, _, exists = mcmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + if len(mcmb.mb.messages) != 0 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 0, len(mcmb.mb.messages)) + } + mcmb.Failed(msg) + + if len(mcmb.mb.messages) != 1 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 1, len(mcmb.mb.messages)) + } + + msg, _, _, exists = mcmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + mcmb.Remove(msg) + + msg, _, _, exists = mcmb.Next() + if exists { + t.Error("Next() found a message in the buffer when it should be empty.") + } + mcmb.Remove(msg) + + if len(mcmb.mb.messages) != 0 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 0, len(mcmb.mb.messages)) + } + +} + +// makeTestMeteredCmixMessage creates a list of messages with random data and the +// expected map after they are added to the buffer. +func makeTestMeteredCmixMessage(n int) ([]meteredCmixMessage, map[MessageHash]struct{}) { + mcmh := &meteredCmixMessageHandler{} + prng := rand.New(rand.NewSource(time.Now().UnixNano())) + mh := map[MessageHash]struct{}{} + msgs := make([]meteredCmixMessage, n) + for i := range msgs { + payload := make([]byte, 128) + prng.Read(payload) + msgs[i] = meteredCmixMessage{ + M: payload, + Count: uint(prng.Uint64()), + Timestamp: time.Unix(0, 0), + } + mh[mcmh.HashMessage(msgs[i])] = struct{}{} + } + + return msgs, mh +} + +// makeTestFormatMessages creates a list of messages with random data. +func makeTestFormatMessages(n int) []format.Message { + prng := rand.New(rand.NewSource(time.Now().UnixNano())) + msgs := make([]format.Message, n) + for i := range msgs { + msgs[i] = format.NewMessage(128) + payload := make([]byte, 128) + prng.Read(payload) + msgs[i].SetPayloadA(payload) + prng.Read(payload) + msgs[i].SetPayloadB(payload) + } + + return msgs +}