Skip to content
Snippets Groups Projects
Commit 305d6f6e authored by Jono Wenger's avatar Jono Wenger Committed by Benjamin Wenger
Browse files

message buffer tests

parent 0a4778c1
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,7 @@ require (
gitlab.com/elixxir/comms v0.0.0-20200903181126-c92d7a304999
gitlab.com/elixxir/crypto v0.0.0-20200907171019-008a9d4aa264
gitlab.com/elixxir/ekv v0.1.1
gitlab.com/elixxir/primitives v0.0.0-20200903200059-7dcf9b844a40
gitlab.com/elixxir/primitives v0.0.0-20200907165319-16ed0124890b
gitlab.com/xx_network/comms v0.0.0-20200825213037-f58fa7c0a641
gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686
gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2
......
......@@ -221,6 +221,8 @@ gitlab.com/elixxir/primitives v0.0.0-20200827170420-5d50351f99b4 h1:Vb8N4164sSk3
gitlab.com/elixxir/primitives v0.0.0-20200827170420-5d50351f99b4/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE=
gitlab.com/elixxir/primitives v0.0.0-20200903200059-7dcf9b844a40 h1:ZFwNuC0s8JeKzGUPlffvp5iR0uV4nKlPq1+9uvxDklg=
gitlab.com/elixxir/primitives v0.0.0-20200903200059-7dcf9b844a40/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE=
gitlab.com/elixxir/primitives v0.0.0-20200907165319-16ed0124890b h1:d1ttSIOWxWytnTpjO9hyr4KFtv/dmDBYuK59EP0sjAQ=
gitlab.com/elixxir/primitives v0.0.0-20200907165319-16ed0124890b/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE=
gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTRl7gsoM8c3RQ5KAm5GstxrJp5tn+6JfQ4z5Hw=
gitlab.com/xx_network/comms v0.0.0-20200806235452-3a82720833ba h1:7nozLSNBX0CfP53DDiDNLJx9obhYGfGf5na0/c9rMso=
gitlab.com/xx_network/comms v0.0.0-20200806235452-3a82720833ba/go.mod h1:idLzPGYig57XE7xuU93OlIF9s6NgSJj7OArQvsd5DjY=
......
package storage
type GarbledMessages struct {
}
package storage
const criticalMessagesKey = "CriticalMessages"
const (
criticalMessagesKey = "CriticalMessages"
garbledMessagesKey = "GarbledMessages"
)
......@@ -39,7 +39,8 @@ type Session struct {
user *user.User
conversations *conversation.Store
partition *partition.Store
criticalMessages *utility.MessageBuffer
criticalMessages *utility.E2eMessageBuffer
garbledMessages *utility.CmixMessageBuffer
}
// Initialize a new Session object
......@@ -89,7 +90,12 @@ func New(baseDir, password string, uid *id.ID, salt []byte, rsaKey *rsa.PrivateK
return nil, errors.WithMessage(err, "Failed to create session")
}
s.criticalMessages, err = utility.NewMessageBuffer(s.kv, criticalMessagesKey)
s.criticalMessages, err = utility.NewE2eMessageBuffer(s.kv, criticalMessagesKey)
if err != nil {
return nil, errors.WithMessage(err, "Failed to create session")
}
s.garbledMessages, err = utility.NewCmixMessageBuffer(s.kv, garbledMessagesKey)
if err != nil {
return nil, errors.WithMessage(err, "Failed to create session")
}
......@@ -127,7 +133,12 @@ func Load(baseDir, password string) (*Session, error) {
return nil, errors.WithMessage(err, "Failed to load Session")
}
s.criticalMessages, err = utility.LoadMessageBuffer(s.kv, criticalMessagesKey)
s.criticalMessages, err = utility.LoadE2eMessageBuffer(s.kv, criticalMessagesKey)
if err != nil {
return nil, errors.WithMessage(err, "Failed to load session")
}
s.garbledMessages, err = utility.LoadCmixMessageBuffer(s.kv, garbledMessagesKey)
if err != nil {
return nil, errors.WithMessage(err, "Failed to load session")
}
......@@ -156,12 +167,18 @@ func (s *Session) E2e() *e2e.Store {
return s.e2e
}
func (s *Session) GetCriticalMessages() *utility.MessageBuffer {
func (s *Session) GetCriticalMessages() *utility.E2eMessageBuffer {
s.mux.RLock()
defer s.mux.RUnlock()
return s.criticalMessages
}
func (s *Session) GetGarbledMessages() *utility.CmixMessageBuffer {
s.mux.RLock()
defer s.mux.RUnlock()
return s.garbledMessages
}
func (s *Session) Conversations() *conversation.Store {
s.mux.RLock()
defer s.mux.RUnlock()
......
......@@ -11,7 +11,8 @@ const currentCmixMessageVersion = 0
type cmixMessageHandler struct{}
// saveMessage saves the message as a versioned object.
// SaveMessage saves the message as a versioned object at the specified key
// in the key value store.
func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error {
msg := m.(format.Message)
......@@ -26,7 +27,9 @@ func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key
return kv.Set(key, &obj)
}
// loadMessage loads the message with the specified key.
// LoadMessage returns the message with the specified key from the key value
// store. An empty message and error are returned if the message could not be
// retrieved.
func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) {
// Load the versioned object
vo, err := kv.Get(key)
......@@ -35,23 +38,24 @@ func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interf
}
// Create message from data
return format.Unmarshal(vo.Data), err
return format.Unmarshal(vo.Data), nil
}
// DeleteMessage deletes the message with the specified key.
// DeleteMessage deletes the message with the specified key from the key value
// store.
func (cmh *cmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error {
return kv.Delete(key)
}
// hashMessage generates a hash of the message.
// HashMessage generates a hash of the message.
func (cmh *cmixMessageHandler) HashMessage(m interface{}) MessageHash {
msg := m.(format.Message)
// Create message from data
return md5.Sum(msg.Marshal())
}
// CmixMessageBuffer wraps the message buffer to store and load raw cmix
// messages
// messages.
type CmixMessageBuffer struct {
mb *MessageBuffer
}
......
......@@ -6,45 +6,142 @@ import (
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format"
"math/rand"
"reflect"
"testing"
"time"
)
func Test_saveMessage(t *testing.T) {
// Test happy path of cmixMessageHandler.SaveMessage().
func TestCmixMessageHandler_SaveMessage(t *testing.T) {
// Set up test values
cmh := &cmixMessageHandler{}
kv := versioned.NewKV(make(ekv.Memstore))
subKey := "testKey"
testMsgs, _ := makeTestCmixMessages(1)
mh := cmh.HashMessage(testMsgs[0])
key := makeStoredMessageKey(subKey, mh)
testMsgs, _ := makeTestCmixMessages(10)
for _, msg := range testMsgs {
key := makeStoredMessageKey("testKey", cmh.HashMessage(msg))
// Save message
err := cmh.SaveMessage(kv, testMsgs[0], key)
err := cmh.SaveMessage(kv, msg, key)
if err != nil {
t.Errorf("saveMessage() returned an error."+
t.Errorf("SaveMessage() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err)
}
// Try to get message
obj, err := kv.Get(key)
if err != nil {
t.Errorf("Get() returned an error."+
t.Errorf("Get() returned an error: %v", err)
}
// Test if message retrieved matches expected
if !bytes.Equal(msg.Marshal(), obj.Data) {
t.Errorf("SaveMessage() returned versioned object with incorrect data."+
"\n\texpected: %v\n\treceived: %v",
msg, obj.Data)
}
}
}
// Test happy path of cmixMessageHandler.LoadMessage().
func TestCmixMessageHandler_LoadMessage(t *testing.T) {
// Set up test values
cmh := &cmixMessageHandler{}
kv := versioned.NewKV(make(ekv.Memstore))
testMsgs, _ := makeTestCmixMessages(10)
for _, msg := range testMsgs {
key := makeStoredMessageKey("testKey", cmh.HashMessage(msg))
// Save message
if err := cmh.SaveMessage(kv, msg, key); err != nil {
t.Errorf("SaveMessage() returned an error: %v", err)
}
// Try to load message
testMsg, err := cmh.LoadMessage(kv, key)
if err != nil {
t.Errorf("LoadMessage() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err)
}
if !bytes.Equal(testMsgs[0].Marshal(), obj.Data) {
t.Errorf("saveMessage() returned versioned object with incorrect data."+
// Test if message loaded matches expected
if !reflect.DeepEqual(msg, testMsg) {
t.Errorf("LoadMessage() returned an unexpected object."+
"\n\texpected: %v\n\treceived: %v",
testMsgs[0], obj.Data)
msg, testMsg)
}
}
}
// Smoke test of cmixMessageHandler.
func TestCmixMessageBuffer_Smoke(t *testing.T) {
// Set up test messages
testMsgs, _ := makeTestCmixMessages(2)
// Create new buffer
cmb, err := NewCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey")
if err != nil {
t.Errorf("NewCmixMessageBuffer() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err)
}
// Add two messages
cmb.Add(testMsgs[0])
cmb.Add(testMsgs[1])
if len(cmb.mb.messages) != 2 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
2, len(cmb.mb.messages))
}
msg, exists := cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
cmb.Succeeded(msg)
if len(cmb.mb.messages) != 1 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
1, len(cmb.mb.messages))
}
msg, exists = cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
if len(cmb.mb.messages) != 0 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
0, len(cmb.mb.messages))
}
cmb.Failed(msg)
if len(cmb.mb.messages) != 1 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
1, len(cmb.mb.messages))
}
msg, exists = cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
cmb.Succeeded(msg)
msg, exists = cmb.Next()
if exists {
t.Error("Next() found a message in the buffer when it should be empty.")
}
cmb.Succeeded(msg)
if len(cmb.mb.messages) != 0 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
0, len(cmb.mb.messages))
}
}
// makeTestCmixMessages creates a list of messages with random data and the expected
// map after they are added to the buffer.
// makeTestMessages creates a list of messages with random data and the expected
// map after they are added to the buffer.
// makeTestCmixMessages creates a list of messages with random data and the
// expected map after they are added to the buffer.
func makeTestCmixMessages(n int) ([]format.Message, map[MessageHash]struct{}) {
cmh := &cmixMessageHandler{}
prng := rand.New(rand.NewSource(time.Now().UnixNano()))
......
......@@ -5,7 +5,9 @@ import (
"encoding/binary"
"encoding/json"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/context/message"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/xx_network/primitives/id"
"time"
)
......@@ -19,14 +21,14 @@ type e2eMessage struct {
MessageType uint32
}
// saveMessage saves the message as a versioned object.
// SaveMessage saves the e2eMessage as a versioned object at the specified key
// in the key value store.
func (emh *e2eMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error {
msg := m.(e2eMessage)
b, err := json.Marshal(&msg)
if err != nil {
jww.FATAL.Panicf("Failed to marshal e2e message for "+
"storage: %s", err)
jww.FATAL.Panicf("Failed to marshal e2e message for storage: %s", err)
}
// Create versioned object
......@@ -40,7 +42,9 @@ func (emh *e2eMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key s
return kv.Set(key, &obj)
}
// loadMessage loads the message with the specified key.
// LoadMessage returns the e2eMessage with the specified key from the key value
// store. An empty message and error are returned if the message could not be
// retrieved.
func (emh *e2eMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) {
// Load the versioned object
vo, err := kv.Get(key)
......@@ -48,22 +52,22 @@ func (emh *e2eMessageHandler) LoadMessage(kv *versioned.KV, key string) (interfa
return nil, err
}
// Unmarshal data into e2eMessage
msg := e2eMessage{}
if err := json.Unmarshal(vo.Data, &msg); err != nil {
jww.FATAL.Panicf("Failed to unmarshal e2e message for "+
"storage: %s", err)
jww.FATAL.Panicf("Failed to unmarshal e2e message for storage: %s", err)
}
// Create message from data
return msg, err
}
// DeleteMessage deletes the message with the specified key.
// DeleteMessage deletes the message with the specified key from the key value
// store.
func (emh *e2eMessageHandler) DeleteMessage(kv *versioned.KV, key string) error {
return kv.Delete(key)
}
// hashMessage generates a hash of the message.
// HashMessage generates a hash of the e2eMessage.
func (emh *e2eMessageHandler) HashMessage(m interface{}) MessageHash {
msg := m.(e2eMessage)
......@@ -75,6 +79,60 @@ func (emh *e2eMessageHandler) HashMessage(m interface{}) MessageHash {
binary.BigEndian.PutUint32(mtBytes, msg.MessageType)
digest = append(digest, mtBytes...)
// Create message from data
return md5.Sum(digest)
}
// E2eMessageBuffer wraps the message buffer to store and load raw e2eMessages.
type E2eMessageBuffer struct {
mb *MessageBuffer
}
func NewE2eMessageBuffer(kv *versioned.KV, key string) (*E2eMessageBuffer, error) {
mb, err := NewMessageBuffer(kv, &e2eMessageHandler{}, key)
if err != nil {
return nil, err
}
return &E2eMessageBuffer{mb: mb}, nil
}
func LoadE2eMessageBuffer(kv *versioned.KV, key string) (*E2eMessageBuffer, error) {
mb, err := LoadMessageBuffer(kv, &e2eMessageHandler{}, key)
if err != nil {
return nil, err
}
return &E2eMessageBuffer{mb: mb}, nil
}
func (emb *E2eMessageBuffer) Add(m message.Send) {
e2eMsg := e2eMessage{
Recipient: m.Recipient.Marshal(),
Payload: m.Payload,
MessageType: uint32(m.MessageType),
}
emb.mb.Add(e2eMsg)
}
func (emb *E2eMessageBuffer) Next() (message.Send, bool) {
m, ok := emb.mb.Next()
if !ok {
return message.Send{}, false
}
msg := m.(e2eMessage)
recipient, err := id.Unmarshal(msg.Recipient)
if err != nil {
jww.FATAL.Panicf("Error unmarshaling recipient: %v", err)
}
return message.Send{recipient, msg.Payload, message.Type(msg.MessageType)}, true
}
func (emb *E2eMessageBuffer) Succeeded(m message.Send) {
emb.mb.Succeeded(e2eMessage{m.Recipient.Marshal(), m.Payload, uint32(m.MessageType)})
}
func (emb *E2eMessageBuffer) Failed(m message.Send) {
emb.mb.Failed(e2eMessage{m.Recipient.Marshal(), m.Payload, uint32(m.MessageType)})
}
package utility
import (
"encoding/json"
"gitlab.com/elixxir/client/context/message"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/ekv"
"gitlab.com/xx_network/primitives/id"
"math/rand"
"reflect"
"testing"
"time"
)
// Test happy path of e2eMessageHandler.SaveMessage().
func TestE2EMessageHandler_SaveMessage(t *testing.T) {
// Set up test values
emg := &e2eMessageHandler{}
kv := versioned.NewKV(make(ekv.Memstore))
testMsgs, _ := makeTestE2EMessages(10, t)
for _, msg := range testMsgs {
key := makeStoredMessageKey("testKey", emg.HashMessage(msg))
// Save message
err := emg.SaveMessage(kv, msg, key)
if err != nil {
t.Errorf("SaveMessage() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err)
}
// Try to get message
obj, err := kv.Get(key)
if err != nil {
t.Errorf("Get() returned an error: %v", err)
}
// Test if message retrieved matches expected
testMsg := &e2eMessage{}
if err := json.Unmarshal(obj.Data, testMsg); err != nil {
t.Errorf("Failed to unmarshal message: %v", err)
}
if !reflect.DeepEqual(msg, *testMsg) {
t.Errorf("SaveMessage() returned versioned object with incorrect data."+
"\n\texpected: %v\n\treceived: %v",
msg, *testMsg)
}
}
}
// Test happy path of e2eMessageHandler.LoadMessage().
func TestE2EMessageHandler_LoadMessage(t *testing.T) {
// Set up test values
cmh := &e2eMessageHandler{}
kv := versioned.NewKV(make(ekv.Memstore))
testMsgs, _ := makeTestE2EMessages(10, t)
for _, msg := range testMsgs {
key := makeStoredMessageKey("testKey", cmh.HashMessage(msg))
// Save message
if err := cmh.SaveMessage(kv, msg, key); err != nil {
t.Errorf("SaveMessage() returned an error: %v", err)
}
// Try to load message
testMsg, err := cmh.LoadMessage(kv, key)
if err != nil {
t.Errorf("LoadMessage() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err)
}
// Test if message loaded matches expected
if !reflect.DeepEqual(msg, testMsg) {
t.Errorf("LoadMessage() returned an unexpected object."+
"\n\texpected: %v\n\treceived: %v",
msg, testMsg)
}
}
}
// Smoke test of e2eMessageHandler.
func TestE2EMessageHandler_Smoke(t *testing.T) {
// Set up test messages
_, testMsgs := makeTestE2EMessages(2, t)
// Create new buffer
cmb, err := NewE2eMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey")
if err != nil {
t.Errorf("NewE2eMessageBuffer() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err)
}
// Add two messages
cmb.Add(testMsgs[0])
cmb.Add(testMsgs[1])
if len(cmb.mb.messages) != 2 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
2, len(cmb.mb.messages))
}
msg, exists := cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
cmb.Succeeded(msg)
if len(cmb.mb.messages) != 1 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
1, len(cmb.mb.messages))
}
msg, exists = cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
if len(cmb.mb.messages) != 0 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
0, len(cmb.mb.messages))
}
cmb.Failed(msg)
if len(cmb.mb.messages) != 1 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
1, len(cmb.mb.messages))
}
msg, exists = cmb.Next()
if !exists {
t.Error("Next() did not find any messages in buffer.")
}
cmb.Succeeded(msg)
msg, exists = cmb.Next()
if exists {
t.Error("Next() found a message in the buffer when it should be empty.")
}
if len(cmb.mb.messages) != 0 {
t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d",
0, len(cmb.mb.messages))
}
}
// makeTestE2EMessages creates a list of messages with random data and the
// expected map after they are added to the buffer.
func makeTestE2EMessages(n int, t *testing.T) ([]e2eMessage, []message.Send) {
prng := rand.New(rand.NewSource(time.Now().UnixNano()))
msgs := make([]e2eMessage, n)
send := make([]message.Send, n)
for i := range msgs {
rngBytes := make([]byte, 128)
prng.Read(rngBytes)
msgs[i].Recipient = rngBytes
prng.Read(rngBytes)
msgs[i].Payload = rngBytes
prng.Read(rngBytes)
msgs[i].MessageType = uint32(rngBytes[0])
send[i].Recipient = id.NewIdFromString(string(msgs[i].Recipient), id.User, t)
send[i].Payload = msgs[i].Payload
send[i].MessageType = message.Type(msgs[i].MessageType)
}
return msgs, send
}
......@@ -9,7 +9,8 @@ import (
"time"
)
// MessageHash stores the key for each message stored in the buffer.
// MessageHash stores the hash of a message, which is used as the key for each
// message stored in the buffer.
type MessageHash [16]byte
// Sub key used in building keys for saving the message to the key value store
......@@ -18,12 +19,22 @@ const messageSubKey = "bufferedMessage"
// Version of the file saved to the key value store
const currentMessageBufferVersion = 0
// Message interface used to handle the passed in message type so this can be
// used at diffrent layers of the stack
// MessageHandler interface used to handle the passed in message type so the
// buffer can be used at different layers of the stack.
type MessageHandler interface {
// SaveMessage saves the message as a versioned object at the specified key
// in the key value store.
SaveMessage(kv *versioned.KV, m interface{}, key string) error
// LoadMessage returns the message with the specified key from the key value
// store.
LoadMessage(kv *versioned.KV, key string) (interface{}, error)
// DeleteMessage deletes the message with the specified key from the key
// value store.
DeleteMessage(kv *versioned.KV, key string) error
// HashMessage generates a hash of the message.
HashMessage(m interface{}) MessageHash
}
......@@ -37,18 +48,16 @@ type MessageBuffer struct {
messages map[MessageHash]struct{}
processingMessages map[MessageHash]struct{}
kv *versioned.KV
handler MessageHandler
key string
mux sync.RWMutex
}
// NewMessageBuffer creates a new empty buffer and saves it to the passed in key
// value store at the specified key. An error is returned on an unsuccessful
// save.
func NewMessageBuffer(kv *versioned.KV, handler MessageHandler, key string) (*MessageBuffer, error) {
func NewMessageBuffer(kv *versioned.KV, handler MessageHandler,
key string) (*MessageBuffer, error) {
// Create new empty buffer
mb := &MessageBuffer{
messages: make(map[MessageHash]struct{}),
......@@ -67,7 +76,8 @@ func NewMessageBuffer(kv *versioned.KV, handler MessageHandler, key string) (*Me
// LoadMessageBuffer loads an existing message buffer from the key value store
// into memory at the given key. Returns an error if buffer cannot be loaded.
func LoadMessageBuffer(kv *versioned.KV, handler MessageHandler, key string) (*MessageBuffer, error) {
func LoadMessageBuffer(kv *versioned.KV, handler MessageHandler,
key string) (*MessageBuffer, error) {
// Create new empty buffer
mb := &MessageBuffer{
messages: make(map[MessageHash]struct{}),
......@@ -228,13 +238,18 @@ func (mb *MessageBuffer) Succeeded(m interface{}) {
mb.mux.Lock()
defer mb.mux.Unlock()
// Remove message from buffer
delete(mb.processingMessages, h)
if err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h)); err != nil {
// Remove message from key value store
err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h))
if err != nil {
jww.FATAL.Fatalf("Failed to save: %v", err)
}
if err := mb.save(); err != nil {
// Save modified buffer to key value store
err = mb.save()
if err != nil {
jww.FATAL.Fatalf("Failed to save: %v", err)
}
}
......@@ -255,9 +270,6 @@ func (mb *MessageBuffer) Failed(m interface{}) {
}
/*
// saveMessage saves the message as a versioned object.
// loadMessage loads the message with the specified key.
func loadMessage(kv *versioned.KV, key string) (format.Message, error) {
// Load the versioned object
......@@ -277,6 +289,7 @@ func hashMessage(m format.Message) MessageHash {
return md5.Sum(m.Marshal())
}
*/
// makeStoredMessageKey generates a new key for the message based on its has.
func makeStoredMessageKey(key string, h MessageHash) string {
return key + messageSubKey + string(h[:])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment