diff --git a/go.mod b/go.mod index 6939ab034e484cc7bbad707487fe4a730477155f..b581b5ca57cb0187298436d0353aecb64c864a8e 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( gitlab.com/elixxir/comms v0.0.0-20200903181126-c92d7a304999 gitlab.com/elixxir/crypto v0.0.0-20200907171019-008a9d4aa264 gitlab.com/elixxir/ekv v0.1.1 - gitlab.com/elixxir/primitives v0.0.0-20200903200059-7dcf9b844a40 + gitlab.com/elixxir/primitives v0.0.0-20200907165319-16ed0124890b gitlab.com/xx_network/comms v0.0.0-20200825213037-f58fa7c0a641 gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686 gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2 diff --git a/go.sum b/go.sum index c3ef772af2aabfc9b9cc377913c6685e49f47f1e..131b1bef29401ee0c6c81038eb4b057f28431732 100644 --- a/go.sum +++ b/go.sum @@ -221,6 +221,8 @@ gitlab.com/elixxir/primitives v0.0.0-20200827170420-5d50351f99b4 h1:Vb8N4164sSk3 gitlab.com/elixxir/primitives v0.0.0-20200827170420-5d50351f99b4/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE= gitlab.com/elixxir/primitives v0.0.0-20200903200059-7dcf9b844a40 h1:ZFwNuC0s8JeKzGUPlffvp5iR0uV4nKlPq1+9uvxDklg= gitlab.com/elixxir/primitives v0.0.0-20200903200059-7dcf9b844a40/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE= +gitlab.com/elixxir/primitives v0.0.0-20200907165319-16ed0124890b h1:d1ttSIOWxWytnTpjO9hyr4KFtv/dmDBYuK59EP0sjAQ= +gitlab.com/elixxir/primitives v0.0.0-20200907165319-16ed0124890b/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE= gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTRl7gsoM8c3RQ5KAm5GstxrJp5tn+6JfQ4z5Hw= gitlab.com/xx_network/comms v0.0.0-20200806235452-3a82720833ba h1:7nozLSNBX0CfP53DDiDNLJx9obhYGfGf5na0/c9rMso= gitlab.com/xx_network/comms v0.0.0-20200806235452-3a82720833ba/go.mod h1:idLzPGYig57XE7xuU93OlIF9s6NgSJj7OArQvsd5DjY= diff --git a/storage/garbledMessages.go b/storage/garbledMessages.go deleted file mode 100644 index cb1168da4420a87e8589f958d532731425e5c766..0000000000000000000000000000000000000000 --- a/storage/garbledMessages.go +++ /dev/null @@ -1,4 +0,0 @@ -package storage - -type GarbledMessages struct { -} diff --git a/storage/messages.go b/storage/messages.go index 2e0b96b3f1b77819488ce4f4140142df552880c5..8f148a93a31763a49efe3c490c3dd595dcf0bfcb 100644 --- a/storage/messages.go +++ b/storage/messages.go @@ -1,3 +1,6 @@ package storage -const criticalMessagesKey = "CriticalMessages" +const ( + criticalMessagesKey = "CriticalMessages" + garbledMessagesKey = "GarbledMessages" +) diff --git a/storage/session.go b/storage/session.go index 2dfcef8ab516c0526956bfb5df13afe83e061522..53ff91e39354b24633dd0e1f41d22dc3880616ec 100644 --- a/storage/session.go +++ b/storage/session.go @@ -39,7 +39,8 @@ type Session struct { user *user.User conversations *conversation.Store partition *partition.Store - criticalMessages *utility.MessageBuffer + criticalMessages *utility.E2eMessageBuffer + garbledMessages *utility.CmixMessageBuffer } // Initialize a new Session object @@ -52,7 +53,7 @@ func initStore(baseDir, password string) (*Session, error) { } s = &Session{ - kv: versioned.NewKV(fs), + kv: versioned.NewKV(fs), } return s, nil @@ -89,7 +90,12 @@ func New(baseDir, password string, uid *id.ID, salt []byte, rsaKey *rsa.PrivateK return nil, errors.WithMessage(err, "Failed to create session") } - s.criticalMessages, err = utility.NewMessageBuffer(s.kv, criticalMessagesKey) + s.criticalMessages, err = utility.NewE2eMessageBuffer(s.kv, criticalMessagesKey) + if err != nil { + return nil, errors.WithMessage(err, "Failed to create session") + } + + s.garbledMessages, err = utility.NewCmixMessageBuffer(s.kv, garbledMessagesKey) if err != nil { return nil, errors.WithMessage(err, "Failed to create session") } @@ -127,7 +133,12 @@ func Load(baseDir, password string) (*Session, error) { return nil, errors.WithMessage(err, "Failed to load Session") } - s.criticalMessages, err = utility.LoadMessageBuffer(s.kv, criticalMessagesKey) + s.criticalMessages, err = utility.LoadE2eMessageBuffer(s.kv, criticalMessagesKey) + if err != nil { + return nil, errors.WithMessage(err, "Failed to load session") + } + + s.garbledMessages, err = utility.LoadCmixMessageBuffer(s.kv, garbledMessagesKey) if err != nil { return nil, errors.WithMessage(err, "Failed to load session") } @@ -156,12 +167,18 @@ func (s *Session) E2e() *e2e.Store { return s.e2e } -func (s *Session) GetCriticalMessages() *utility.MessageBuffer { +func (s *Session) GetCriticalMessages() *utility.E2eMessageBuffer { s.mux.RLock() defer s.mux.RUnlock() return s.criticalMessages } +func (s *Session) GetGarbledMessages() *utility.CmixMessageBuffer { + s.mux.RLock() + defer s.mux.RUnlock() + return s.garbledMessages +} + func (s *Session) Conversations() *conversation.Store { s.mux.RLock() defer s.mux.RUnlock() diff --git a/storage/utility/cmixMessageBuffer.go b/storage/utility/cmixMessageBuffer.go index d98b399fbfa914bdd869f4ef321fa23223062d70..1a1756787ba9c080e341053d611a4bbde846843d 100644 --- a/storage/utility/cmixMessageBuffer.go +++ b/storage/utility/cmixMessageBuffer.go @@ -11,7 +11,8 @@ const currentCmixMessageVersion = 0 type cmixMessageHandler struct{} -// saveMessage saves the message as a versioned object. +// SaveMessage saves the message as a versioned object at the specified key +// in the key value store. func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { msg := m.(format.Message) @@ -26,7 +27,9 @@ func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key return kv.Set(key, &obj) } -// loadMessage loads the message with the specified key. +// LoadMessage returns the message with the specified key from the key value +// store. An empty message and error are returned if the message could not be +// retrieved. func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { // Load the versioned object vo, err := kv.Get(key) @@ -35,23 +38,24 @@ func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interf } // Create message from data - return format.Unmarshal(vo.Data), err + return format.Unmarshal(vo.Data), nil } -// DeleteMessage deletes the message with the specified key. +// DeleteMessage deletes the message with the specified key from the key value +// store. func (cmh *cmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error { return kv.Delete(key) } -// hashMessage generates a hash of the message. +// HashMessage generates a hash of the message. func (cmh *cmixMessageHandler) HashMessage(m interface{}) MessageHash { msg := m.(format.Message) - // Create message from data + return md5.Sum(msg.Marshal()) } // CmixMessageBuffer wraps the message buffer to store and load raw cmix -// messages +// messages. type CmixMessageBuffer struct { mb *MessageBuffer } diff --git a/storage/utility/cmixMessageBuffer_test.go b/storage/utility/cmixMessageBuffer_test.go index c4fc4ec073cf9d3b5cd8bc5ac06685ac8d810686..fff9ed7c35aba0eb5bedf7184fc19d9edddf50c4 100644 --- a/storage/utility/cmixMessageBuffer_test.go +++ b/storage/utility/cmixMessageBuffer_test.go @@ -6,45 +6,142 @@ import ( "gitlab.com/elixxir/ekv" "gitlab.com/elixxir/primitives/format" "math/rand" + "reflect" "testing" "time" ) -func Test_saveMessage(t *testing.T) { +// Test happy path of cmixMessageHandler.SaveMessage(). +func TestCmixMessageHandler_SaveMessage(t *testing.T) { // Set up test values cmh := &cmixMessageHandler{} + kv := versioned.NewKV(make(ekv.Memstore)) + testMsgs, _ := makeTestCmixMessages(10) + + for _, msg := range testMsgs { + key := makeStoredMessageKey("testKey", cmh.HashMessage(msg)) + + // Save message + err := cmh.SaveMessage(kv, msg, key) + if err != nil { + t.Errorf("SaveMessage() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + // Try to get message + obj, err := kv.Get(key) + if err != nil { + t.Errorf("Get() returned an error: %v", err) + } + + // Test if message retrieved matches expected + if !bytes.Equal(msg.Marshal(), obj.Data) { + t.Errorf("SaveMessage() returned versioned object with incorrect data."+ + "\n\texpected: %v\n\treceived: %v", + msg, obj.Data) + } + } +} + +// Test happy path of cmixMessageHandler.LoadMessage(). +func TestCmixMessageHandler_LoadMessage(t *testing.T) { + // Set up test values + cmh := &cmixMessageHandler{} kv := versioned.NewKV(make(ekv.Memstore)) - subKey := "testKey" - testMsgs, _ := makeTestCmixMessages(1) - mh := cmh.HashMessage(testMsgs[0]) - key := makeStoredMessageKey(subKey, mh) + testMsgs, _ := makeTestCmixMessages(10) - // Save message - err := cmh.SaveMessage(kv, testMsgs[0], key) - if err != nil { - t.Errorf("saveMessage() returned an error."+ - "\n\texpected: %v\n\trecieved: %v", nil, err) + for _, msg := range testMsgs { + key := makeStoredMessageKey("testKey", cmh.HashMessage(msg)) + + // Save message + if err := cmh.SaveMessage(kv, msg, key); err != nil { + t.Errorf("SaveMessage() returned an error: %v", err) + } + + // Try to load message + testMsg, err := cmh.LoadMessage(kv, key) + if err != nil { + t.Errorf("LoadMessage() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + // Test if message loaded matches expected + if !reflect.DeepEqual(msg, testMsg) { + t.Errorf("LoadMessage() returned an unexpected object."+ + "\n\texpected: %v\n\treceived: %v", + msg, testMsg) + } } +} - // Try to get message - obj, err := kv.Get(key) +// Smoke test of cmixMessageHandler. +func TestCmixMessageBuffer_Smoke(t *testing.T) { + // Set up test messages + testMsgs, _ := makeTestCmixMessages(2) + + // Create new buffer + cmb, err := NewCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") if err != nil { - t.Errorf("Get() returned an error."+ + t.Errorf("NewCmixMessageBuffer() returned an error."+ "\n\texpected: %v\n\trecieved: %v", nil, err) } - if !bytes.Equal(testMsgs[0].Marshal(), obj.Data) { - t.Errorf("saveMessage() returned versioned object with incorrect data."+ - "\n\texpected: %v\n\treceived: %v", - testMsgs[0], obj.Data) + // Add two messages + cmb.Add(testMsgs[0]) + cmb.Add(testMsgs[1]) + + if len(cmb.mb.messages) != 2 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 2, len(cmb.mb.messages)) } + + msg, exists := cmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + cmb.Succeeded(msg) + + if len(cmb.mb.messages) != 1 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 1, len(cmb.mb.messages)) + } + + msg, exists = cmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + if len(cmb.mb.messages) != 0 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 0, len(cmb.mb.messages)) + } + cmb.Failed(msg) + + if len(cmb.mb.messages) != 1 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 1, len(cmb.mb.messages)) + } + + msg, exists = cmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + cmb.Succeeded(msg) + + msg, exists = cmb.Next() + if exists { + t.Error("Next() found a message in the buffer when it should be empty.") + } + cmb.Succeeded(msg) + + if len(cmb.mb.messages) != 0 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 0, len(cmb.mb.messages)) + } + } -// makeTestCmixMessages creates a list of messages with random data and the expected -// map after they are added to the buffer. -// makeTestMessages creates a list of messages with random data and the expected -// map after they are added to the buffer. +// makeTestCmixMessages creates a list of messages with random data and the +// expected map after they are added to the buffer. func makeTestCmixMessages(n int) ([]format.Message, map[MessageHash]struct{}) { cmh := &cmixMessageHandler{} prng := rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/storage/utility/e2eMessageBuffer.go b/storage/utility/e2eMessageBuffer.go index da2c911b5ac83e2fa3e8813b980eaa3d65e5b085..81ec03ba3abd385f8395218d76b9e5829c98e570 100644 --- a/storage/utility/e2eMessageBuffer.go +++ b/storage/utility/e2eMessageBuffer.go @@ -5,7 +5,9 @@ import ( "encoding/binary" "encoding/json" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/context/message" "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/id" "time" ) @@ -19,14 +21,14 @@ type e2eMessage struct { MessageType uint32 } -// saveMessage saves the message as a versioned object. +// SaveMessage saves the e2eMessage as a versioned object at the specified key +// in the key value store. func (emh *e2eMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { msg := m.(e2eMessage) b, err := json.Marshal(&msg) if err != nil { - jww.FATAL.Panicf("Failed to marshal e2e message for "+ - "storage: %s", err) + jww.FATAL.Panicf("Failed to marshal e2e message for storage: %s", err) } // Create versioned object @@ -40,7 +42,9 @@ func (emh *e2eMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key s return kv.Set(key, &obj) } -// loadMessage loads the message with the specified key. +// LoadMessage returns the e2eMessage with the specified key from the key value +// store. An empty message and error are returned if the message could not be +// retrieved. func (emh *e2eMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { // Load the versioned object vo, err := kv.Get(key) @@ -48,22 +52,22 @@ func (emh *e2eMessageHandler) LoadMessage(kv *versioned.KV, key string) (interfa return nil, err } + // Unmarshal data into e2eMessage msg := e2eMessage{} - if err := json.Unmarshal(vo.Data, &msg); err != nil { - jww.FATAL.Panicf("Failed to unmarshal e2e message for "+ - "storage: %s", err) + jww.FATAL.Panicf("Failed to unmarshal e2e message for storage: %s", err) } - // Create message from data + return msg, err } -// DeleteMessage deletes the message with the specified key. +// DeleteMessage deletes the message with the specified key from the key value +// store. func (emh *e2eMessageHandler) DeleteMessage(kv *versioned.KV, key string) error { return kv.Delete(key) } -// hashMessage generates a hash of the message. +// HashMessage generates a hash of the e2eMessage. func (emh *e2eMessageHandler) HashMessage(m interface{}) MessageHash { msg := m.(e2eMessage) @@ -75,6 +79,60 @@ func (emh *e2eMessageHandler) HashMessage(m interface{}) MessageHash { binary.BigEndian.PutUint32(mtBytes, msg.MessageType) digest = append(digest, mtBytes...) - // Create message from data return md5.Sum(digest) } + +// E2eMessageBuffer wraps the message buffer to store and load raw e2eMessages. +type E2eMessageBuffer struct { + mb *MessageBuffer +} + +func NewE2eMessageBuffer(kv *versioned.KV, key string) (*E2eMessageBuffer, error) { + mb, err := NewMessageBuffer(kv, &e2eMessageHandler{}, key) + if err != nil { + return nil, err + } + + return &E2eMessageBuffer{mb: mb}, nil +} + +func LoadE2eMessageBuffer(kv *versioned.KV, key string) (*E2eMessageBuffer, error) { + mb, err := LoadMessageBuffer(kv, &e2eMessageHandler{}, key) + if err != nil { + return nil, err + } + + return &E2eMessageBuffer{mb: mb}, nil +} + +func (emb *E2eMessageBuffer) Add(m message.Send) { + e2eMsg := e2eMessage{ + Recipient: m.Recipient.Marshal(), + Payload: m.Payload, + MessageType: uint32(m.MessageType), + } + + emb.mb.Add(e2eMsg) +} + +func (emb *E2eMessageBuffer) Next() (message.Send, bool) { + m, ok := emb.mb.Next() + if !ok { + return message.Send{}, false + } + + msg := m.(e2eMessage) + recipient, err := id.Unmarshal(msg.Recipient) + if err != nil { + jww.FATAL.Panicf("Error unmarshaling recipient: %v", err) + } + return message.Send{recipient, msg.Payload, message.Type(msg.MessageType)}, true +} + +func (emb *E2eMessageBuffer) Succeeded(m message.Send) { + emb.mb.Succeeded(e2eMessage{m.Recipient.Marshal(), m.Payload, uint32(m.MessageType)}) +} + +func (emb *E2eMessageBuffer) Failed(m message.Send) { + emb.mb.Failed(e2eMessage{m.Recipient.Marshal(), m.Payload, uint32(m.MessageType)}) +} diff --git a/storage/utility/e2eMessageBuffer_test.go b/storage/utility/e2eMessageBuffer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8a2494903f86d4d8cdfefefc42b7048ce64ef2c6 --- /dev/null +++ b/storage/utility/e2eMessageBuffer_test.go @@ -0,0 +1,168 @@ +package utility + +import ( + "encoding/json" + "gitlab.com/elixxir/client/context/message" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/ekv" + "gitlab.com/xx_network/primitives/id" + "math/rand" + "reflect" + "testing" + "time" +) + +// Test happy path of e2eMessageHandler.SaveMessage(). +func TestE2EMessageHandler_SaveMessage(t *testing.T) { + // Set up test values + emg := &e2eMessageHandler{} + kv := versioned.NewKV(make(ekv.Memstore)) + testMsgs, _ := makeTestE2EMessages(10, t) + + for _, msg := range testMsgs { + key := makeStoredMessageKey("testKey", emg.HashMessage(msg)) + + // Save message + err := emg.SaveMessage(kv, msg, key) + if err != nil { + t.Errorf("SaveMessage() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + // Try to get message + obj, err := kv.Get(key) + if err != nil { + t.Errorf("Get() returned an error: %v", err) + } + + // Test if message retrieved matches expected + testMsg := &e2eMessage{} + if err := json.Unmarshal(obj.Data, testMsg); err != nil { + t.Errorf("Failed to unmarshal message: %v", err) + } + if !reflect.DeepEqual(msg, *testMsg) { + t.Errorf("SaveMessage() returned versioned object with incorrect data."+ + "\n\texpected: %v\n\treceived: %v", + msg, *testMsg) + } + } +} + +// Test happy path of e2eMessageHandler.LoadMessage(). +func TestE2EMessageHandler_LoadMessage(t *testing.T) { + // Set up test values + cmh := &e2eMessageHandler{} + kv := versioned.NewKV(make(ekv.Memstore)) + testMsgs, _ := makeTestE2EMessages(10, t) + + for _, msg := range testMsgs { + key := makeStoredMessageKey("testKey", cmh.HashMessage(msg)) + + // Save message + if err := cmh.SaveMessage(kv, msg, key); err != nil { + t.Errorf("SaveMessage() returned an error: %v", err) + } + + // Try to load message + testMsg, err := cmh.LoadMessage(kv, key) + if err != nil { + t.Errorf("LoadMessage() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + // Test if message loaded matches expected + if !reflect.DeepEqual(msg, testMsg) { + t.Errorf("LoadMessage() returned an unexpected object."+ + "\n\texpected: %v\n\treceived: %v", + msg, testMsg) + } + } +} + +// Smoke test of e2eMessageHandler. +func TestE2EMessageHandler_Smoke(t *testing.T) { + // Set up test messages + _, testMsgs := makeTestE2EMessages(2, t) + + // Create new buffer + cmb, err := NewE2eMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") + if err != nil { + t.Errorf("NewE2eMessageBuffer() returned an error."+ + "\n\texpected: %v\n\trecieved: %v", nil, err) + } + + // Add two messages + cmb.Add(testMsgs[0]) + cmb.Add(testMsgs[1]) + + if len(cmb.mb.messages) != 2 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 2, len(cmb.mb.messages)) + } + + msg, exists := cmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + cmb.Succeeded(msg) + + if len(cmb.mb.messages) != 1 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 1, len(cmb.mb.messages)) + } + + msg, exists = cmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + if len(cmb.mb.messages) != 0 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 0, len(cmb.mb.messages)) + } + cmb.Failed(msg) + + if len(cmb.mb.messages) != 1 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 1, len(cmb.mb.messages)) + } + + msg, exists = cmb.Next() + if !exists { + t.Error("Next() did not find any messages in buffer.") + } + cmb.Succeeded(msg) + + msg, exists = cmb.Next() + if exists { + t.Error("Next() found a message in the buffer when it should be empty.") + } + + if len(cmb.mb.messages) != 0 { + t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", + 0, len(cmb.mb.messages)) + } + +} + +// makeTestE2EMessages creates a list of messages with random data and the +// expected map after they are added to the buffer. +func makeTestE2EMessages(n int, t *testing.T) ([]e2eMessage, []message.Send) { + prng := rand.New(rand.NewSource(time.Now().UnixNano())) + msgs := make([]e2eMessage, n) + send := make([]message.Send, n) + for i := range msgs { + rngBytes := make([]byte, 128) + prng.Read(rngBytes) + msgs[i].Recipient = rngBytes + prng.Read(rngBytes) + msgs[i].Payload = rngBytes + prng.Read(rngBytes) + msgs[i].MessageType = uint32(rngBytes[0]) + + send[i].Recipient = id.NewIdFromString(string(msgs[i].Recipient), id.User, t) + send[i].Payload = msgs[i].Payload + send[i].MessageType = message.Type(msgs[i].MessageType) + } + + return msgs, send +} diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index 28bf6c6b8616efc44eaab8b92c7a9a4ebd971af4..d40feafbf6e71746297280c517814e5515f467fd 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -9,7 +9,8 @@ import ( "time" ) -// MessageHash stores the key for each message stored in the buffer. +// MessageHash stores the hash of a message, which is used as the key for each +// message stored in the buffer. type MessageHash [16]byte // Sub key used in building keys for saving the message to the key value store @@ -18,12 +19,22 @@ const messageSubKey = "bufferedMessage" // Version of the file saved to the key value store const currentMessageBufferVersion = 0 -// Message interface used to handle the passed in message type so this can be -// used at diffrent layers of the stack +// MessageHandler interface used to handle the passed in message type so the +// buffer can be used at different layers of the stack. type MessageHandler interface { + // SaveMessage saves the message as a versioned object at the specified key + // in the key value store. SaveMessage(kv *versioned.KV, m interface{}, key string) error + + // LoadMessage returns the message with the specified key from the key value + // store. LoadMessage(kv *versioned.KV, key string) (interface{}, error) + + // DeleteMessage deletes the message with the specified key from the key + // value store. DeleteMessage(kv *versioned.KV, key string) error + + // HashMessage generates a hash of the message. HashMessage(m interface{}) MessageHash } @@ -37,18 +48,16 @@ type MessageBuffer struct { messages map[MessageHash]struct{} processingMessages map[MessageHash]struct{} kv *versioned.KV - - handler MessageHandler - + handler MessageHandler key string mux sync.RWMutex - } // NewMessageBuffer creates a new empty buffer and saves it to the passed in key // value store at the specified key. An error is returned on an unsuccessful // save. -func NewMessageBuffer(kv *versioned.KV, handler MessageHandler, key string) (*MessageBuffer, error) { +func NewMessageBuffer(kv *versioned.KV, handler MessageHandler, + key string) (*MessageBuffer, error) { // Create new empty buffer mb := &MessageBuffer{ messages: make(map[MessageHash]struct{}), @@ -67,7 +76,8 @@ func NewMessageBuffer(kv *versioned.KV, handler MessageHandler, key string) (*Me // LoadMessageBuffer loads an existing message buffer from the key value store // into memory at the given key. Returns an error if buffer cannot be loaded. -func LoadMessageBuffer(kv *versioned.KV, handler MessageHandler, key string) (*MessageBuffer, error) { +func LoadMessageBuffer(kv *versioned.KV, handler MessageHandler, + key string) (*MessageBuffer, error) { // Create new empty buffer mb := &MessageBuffer{ messages: make(map[MessageHash]struct{}), @@ -228,13 +238,18 @@ func (mb *MessageBuffer) Succeeded(m interface{}) { mb.mux.Lock() defer mb.mux.Unlock() + // Remove message from buffer delete(mb.processingMessages, h) - if err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h)); err != nil { + // Remove message from key value store + err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h)) + if err != nil { jww.FATAL.Fatalf("Failed to save: %v", err) } - if err := mb.save(); err != nil { + // Save modified buffer to key value store + err = mb.save() + if err != nil { jww.FATAL.Fatalf("Failed to save: %v", err) } } @@ -255,9 +270,6 @@ func (mb *MessageBuffer) Failed(m interface{}) { } /* -// saveMessage saves the message as a versioned object. - - // loadMessage loads the message with the specified key. func loadMessage(kv *versioned.KV, key string) (format.Message, error) { // Load the versioned object @@ -277,6 +289,7 @@ func hashMessage(m format.Message) MessageHash { return md5.Sum(m.Marshal()) } */ + // makeStoredMessageKey generates a new key for the message based on its has. func makeStoredMessageKey(key string, h MessageHash) string { return key + messageSubKey + string(h[:]) diff --git a/storage/utility/messageBuffer_test.go b/storage/utility/messageBuffer_test.go index d97a188d80a5517e7f81f7d931e5a175b51478ab..f2ebcd0923bc0438d518547d077b8588ab94583b 100644 --- a/storage/utility/messageBuffer_test.go +++ b/storage/utility/messageBuffer_test.go @@ -340,4 +340,4 @@ func makeTestMessages(n int) ([][]byte, map[MessageHash]struct{}) { } return msgs, mh -} \ No newline at end of file +}