Skip to content
Snippets Groups Projects
Commit 32e83ff9 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

Merge branch 'XX-2571/messageBuffer' into 'peppa/newClient'

XX-2571 / MessageBuffer

See merge request !393
parents 7de35428 6a41f9c8
No related branches found
No related tags found
No related merge requests found
...@@ -2,22 +2,30 @@ module gitlab.com/elixxir/client ...@@ -2,22 +2,30 @@ module gitlab.com/elixxir/client
go 1.13 go 1.13
replace google.golang.org/grpc => github.com/grpc/grpc-go v1.27.1
require ( require (
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang/protobuf v1.4.2 github.com/golang/protobuf v1.4.2
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/pelletier/go-toml v1.6.0 // indirect
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/cobra v1.0.0 github.com/spf13/cobra v1.0.0
github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/jwalterweatherman v1.1.0
github.com/spf13/viper v1.7.1 github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.6.2
gitlab.com/elixxir/comms v0.0.0-20200903181126-c92d7a304999 gitlab.com/elixxir/comms v0.0.0-20200903181126-c92d7a304999
gitlab.com/elixxir/crypto v0.0.0-20200907171019-008a9d4aa264 gitlab.com/elixxir/crypto v0.0.0-20200907171019-008a9d4aa264
gitlab.com/elixxir/ekv v0.1.1 gitlab.com/elixxir/ekv v0.1.1
gitlab.com/elixxir/primitives v0.0.0-20200907165319-16ed0124890b gitlab.com/elixxir/primitives v0.0.0-20200903200059-7dcf9b844a40
gitlab.com/xx_network/comms v0.0.0-20200825213037-f58fa7c0a641 gitlab.com/xx_network/comms v0.0.0-20200825213037-f58fa7c0a641
gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686 gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686
gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2 gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect
google.golang.org/protobuf v1.25.0 google.golang.org/protobuf v1.25.0
gopkg.in/ini.v1 v1.52.0 // indirect
) )
replace google.golang.org/grpc => github.com/grpc/grpc-go v1.27.1
This diff is collapsed.
package utility
import (
"crypto/md5"
"encoding/json"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/primitives/format"
"sync"
"time"
)
// messageHash stores the key for each message stored in the buffer.
type messageHash [16]byte
// Sub key used in building keys for saving the message to the key value store
const messageSubKey = "bufferedMessage"
// Version of the file saved to the key value store
const currentMessageBufferVersion = 0
// MessageBuffer holds a list of messages in the "not processed" or "processing"
// state both in memory. Messages in the "not processed" state are held in the
// messages map and messages in the "processing" state are moved into the
// processingMessages map. When the message is done being processed, it is
// removed from the buffer. The actual messages are saved in the key value store
// along with a copy of the buffer that is held in memory.
type MessageBuffer struct {
messages map[messageHash]struct{}
processingMessages map[messageHash]struct{}
kv *versioned.KV
key string
mux sync.RWMutex
}
// NewMessageBuffer creates a new empty buffer and saves it to the passed in key
// value store at the specified key. An error is returned on an unsuccessful
// save.
func NewMessageBuffer(kv *versioned.KV, key string) (*MessageBuffer, error) {
// Create new empty buffer
mb := &MessageBuffer{
messages: make(map[messageHash]struct{}),
processingMessages: make(map[messageHash]struct{}),
kv: kv,
key: key,
}
// Save the buffer
err := mb.save()
// Return the new buffer or an error if saving failed
return mb, err
}
// LoadMessageBuffer loads an existing message buffer from the key value store
// into memory at the given key. Returns an error if buffer cannot be loaded.
func LoadMessageBuffer(kv *versioned.KV, key string) (*MessageBuffer, error) {
// Create new empty buffer
mb := &MessageBuffer{
messages: make(map[messageHash]struct{}),
processingMessages: make(map[messageHash]struct{}),
kv: kv,
key: key,
}
// Load rounds into buffer
err := mb.load()
// Return the filled buffer or an error if loading failed
return mb, err
}
// save saves the buffer as a versioned object. All messages, regardless if they
// are in the "not processed" or "processing" state are stored together and
// considered "not processed".
func (mb *MessageBuffer) save() error {
now := time.Now()
// Build a combined list of message hashes in messages + processingMessages
allMessages := mb.getMessageList()
// Marshal list of message hashes into byte slice
data, err := json.Marshal(allMessages)
if err != nil {
return err
}
// Create versioned object with data
obj := versioned.Object{
Version: currentMessageBufferVersion,
Timestamp: now,
Data: data,
}
// Save versioned object
return mb.kv.Set(mb.key, &obj)
}
// getMessageList returns a list of all message hashes stored in messages and
// processingMessages in a random order.
func (mb *MessageBuffer) getMessageList() []messageHash {
// Create new slice with a length to fit all messages in either list
msgs := make([]messageHash, len(mb.messages)+len(mb.processingMessages))
i := 0
// Add messages from the "not processed" list
for msg := range mb.messages {
msgs[i] = msg
i++
}
// Add messages from the "processing" list
for msg := range mb.processingMessages {
msgs[i] = msg
i++
}
return msgs
}
// load retrieves all the messages from the versioned object and stores them as
// unprocessed messages.
func (mb *MessageBuffer) load() error {
// Load the versioned object
vo, err := mb.kv.Get(mb.key)
if err != nil {
return err
}
// Create slice of message hashes from data
var msgs []messageHash
err = json.Unmarshal(vo.Data, &msgs)
if err != nil {
return err
}
// Convert slice to map and save all rounds as unprocessed
for _, m := range msgs {
mb.messages[m] = struct{}{}
}
return nil
}
// Add adds a message to the buffer in "not processing" state.
func (mb *MessageBuffer) Add(m format.Message) {
h := hashMessage(m)
mb.mux.Lock()
defer mb.mux.Unlock()
// Ensure message does not already exist in buffer
_, exists1 := mb.messages[h]
_, exists2 := mb.processingMessages[h]
if exists1 || exists2 {
return
}
// Save message as versioned object
err := saveMessage(mb.kv, m, makeStoredMessageKey(mb.key, h))
if err != nil {
jww.FATAL.Panicf("Error saving message: %v", err)
}
// Add message to the buffer
mb.messages[h] = struct{}{}
// Save buffer
err = mb.save()
if err != nil {
jww.FATAL.Panicf("Error whilse saving buffer: %v", err)
}
}
// Next gets the next message from the buffer whose state is "not processing".
// The returned messages are moved to the processing state. If there are no
// messages remaining, then false is returned.
func (mb *MessageBuffer) Next() (format.Message, bool) {
mb.mux.Lock()
defer mb.mux.Unlock()
if len(mb.messages) == 0 {
return format.Message{}, false
}
// Pop the next messageHash from the "not processing" list
h := next(mb.messages)
delete(mb.messages, h)
// Add message to list of processing messages
mb.processingMessages[h] = struct{}{}
// Retrieve the message for storage
m, err := loadMessage(mb.kv, makeStoredMessageKey(mb.key, h))
if err != nil {
jww.FATAL.Panicf("Could not load message: %v", err)
}
return m, true
}
// next returns the first messageHash in the map returned by range.
func next(msgMap map[messageHash]struct{}) messageHash {
for h := range msgMap {
return h
}
return messageHash{}
}
// Succeeded sets a messaged as processed and removed it from the buffer.
func (mb *MessageBuffer) Succeeded(m format.Message) {
h := hashMessage(m)
mb.mux.Lock()
defer mb.mux.Unlock()
delete(mb.processingMessages, h)
err := mb.save()
if err != nil {
jww.FATAL.Fatalf("Failed to save: %v", err)
}
}
// Failed sets a message as failed to process. It changes the message back to
// the "not processed" state.
func (mb *MessageBuffer) Failed(m format.Message) {
h := hashMessage(m)
mb.mux.Lock()
defer mb.mux.Unlock()
// Remove from "processing" state
delete(mb.processingMessages, h)
// Add to "not processed" state
mb.messages[h] = struct{}{}
}
// saveMessage saves the message as a versioned object.
func saveMessage(kv *versioned.KV, m format.Message, key string) error {
now := time.Now()
// Create versioned object
obj := versioned.Object{
Version: currentMessageBufferVersion,
Timestamp: now,
Data: m.Marshal(),
}
// Save versioned object
return kv.Set(key, &obj)
}
// loadMessage loads the message with the specified key.
func loadMessage(kv *versioned.KV, key string) (format.Message, error) {
// Load the versioned object
vo, err := kv.Get(key)
if err != nil {
return format.Message{}, err
}
// Create message from data
return format.Unmarshal(vo.Data), err
}
// hashMessage generates a hash of the message.
func hashMessage(m format.Message) messageHash {
// Sum returns a array that is the exact same size as the messageHash and Go
// apparently automatically casts it
return md5.Sum(m.Marshal())
}
// makeStoredMessageKey generates a new key for the message based on its has.
func makeStoredMessageKey(key string, h messageHash) string {
return key + messageSubKey + string(h[:])
}
package utility
import (
"bytes"
"encoding/json"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format"
"math/rand"
"reflect"
"testing"
"time"
)
// Tests happy path of NewMessageBuffer.
func TestNewMessageBuffer(t *testing.T) {
// Set up expected value
expectedMB := &MessageBuffer{
messages: make(map[messageHash]struct{}),
processingMessages: make(map[messageHash]struct{}),
kv: versioned.NewKV(make(ekv.Memstore)),
key: "testKey",
}
testMB, err := NewMessageBuffer(expectedMB.kv, expectedMB.key)
if err != nil {
t.Errorf("NewMessageBuffer() returned an error."+
"\n\texpected: %v\n\treceived: %v", nil, err)
}
if !reflect.DeepEqual(expectedMB, testMB) {
t.Errorf("NewMessageBuffer() returned an incorrect MessageBuffer."+
"\n\texpected: %v\n\treceived: %v", expectedMB, testMB)
}
}
// Tests happy path of TestLoadMessageBuffer.
func TestLoadMessageBuffer(t *testing.T) {
// Set up expected value
expectedMB := &MessageBuffer{
messages: make(map[messageHash]struct{}),
processingMessages: make(map[messageHash]struct{}),
kv: versioned.NewKV(make(ekv.Memstore)),
key: "testKey",
}
_ = addTestMessages(expectedMB, 20)
err := expectedMB.save()
if err != nil {
t.Fatalf("Error saving MessageBuffer: %v", err)
}
testMB, err := LoadMessageBuffer(expectedMB.kv, expectedMB.key)
// Move all the messages into one map to match the output
for mh := range expectedMB.processingMessages {
expectedMB.messages[mh] = struct{}{}
}
expectedMB.processingMessages = make(map[messageHash]struct{})
if err != nil {
t.Errorf("LoadMessageBuffer() returned an error."+
"\n\texpected: %v\n\treceived: %v", nil, err)
}
if !reflect.DeepEqual(expectedMB, testMB) {
t.Errorf("NewMessageBuffer() returned an incorrect MessageBuffer."+
"\n\texpected: %+v\n\treceived: %+v", expectedMB, testMB)
}
}
// Tests happy path of save() with a new empty MessageBuffer.
func TestMessageBuffer_save_NewMB(t *testing.T) {
kv := versioned.NewKV(make(ekv.Memstore))
key := "testKey"
mb, err := NewMessageBuffer(kv, key)
if err != nil {
t.Fatalf("Failed to create new MessageBuffer: %v", err)
}
err = mb.save()
if err != nil {
t.Errorf("save() returned an error."+
"\n\texpected: %v\n\treceived: %v", nil, err)
}
obj, err := kv.Get(key)
if err != nil {
t.Errorf("save() did not correctly save buffer with key %+v to storage."+
"\n\terror: %v", key, err)
}
var messageArr []messageHash
err = json.Unmarshal(obj.Data, &messageArr)
if !reflect.DeepEqual([]messageHash{}, messageArr) {
t.Errorf("save() returned versioned object with incorrect data."+
"\n\texpected: %#v\n\treceived: %#v",
[]messageHash{}, messageArr)
}
}
// Tests happy path of save().
func TestMessageBuffer_save(t *testing.T) {
kv := versioned.NewKV(make(ekv.Memstore))
key := "testKey"
mb, err := NewMessageBuffer(kv, key)
if err != nil {
t.Fatalf("Failed to create new MessageBuffer: %v", err)
}
expectedMH := addTestMessages(mb, 20)
err = mb.save()
if err != nil {
t.Errorf("save() returned an error."+
"\n\texpected: %v\n\treceived: %v", nil, err)
}
obj, err := kv.Get(key)
if err != nil {
t.Errorf("save() did not correctly save buffer with key %+v to storage."+
"\n\terror: %v", key, err)
}
var messageArr []messageHash
err = json.Unmarshal(obj.Data, &messageArr)
if !cmpMessageHash(expectedMH, messageArr) {
t.Errorf("save() returned versioned object with incorrect data."+
"\n\texpected: %v\n\treceived: %v",
expectedMH, messageArr)
}
}
// Tests happy path of MessageBuffer.Add().
func TestMessageBuffer_Add(t *testing.T) {
// Create new MessageBuffer and fill with messages
testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey")
if err != nil {
t.Fatalf("Failed to create new MessageBuffer: %v", err)
}
testMsgs, expectedMessages := makeTestMessages(20)
for _, m := range testMsgs {
testMB.Add(m)
}
if !reflect.DeepEqual(expectedMessages, testMB.messages) {
t.Errorf("Add() failed to add messages correctly into the buffer."+
"\n\texpected: %v\n\trecieved: %v",
expectedMessages, testMB.messages)
}
// Test adding duplicates
for _, m := range testMsgs {
testMB.Add(m)
}
if !reflect.DeepEqual(expectedMessages, testMB.messages) {
t.Errorf("Add() failed to add messages correctly into the buffer."+
"\n\texpected: %v\n\trecieved: %v",
expectedMessages, testMB.messages)
}
}
// Tests happy path of MessageBuffer.Next().
func TestMessageBuffer_Next(t *testing.T) {
// Create new MessageBuffer and fill with messages
testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey")
if err != nil {
t.Fatalf("Failed to create new MessageBuffer: %v", err)
}
testMsgs, _ := makeTestMessages(20)
for _, m := range testMsgs {
testMB.Add(m)
}
for m, exists := testMB.Next(); exists; m, exists = testMB.Next() {
foundMsg := false
for i := range testMsgs {
if bytes.Equal(testMsgs[i].Marshal(), m.Marshal()) {
foundMsg = true
testMsgs[i] = testMsgs[len(testMsgs)-1]
testMsgs[len(testMsgs)-1] = format.Message{}
testMsgs = testMsgs[:len(testMsgs)-1]
break
}
}
if !foundMsg {
t.Errorf("Next() returned the wrong message."+
"\n\trecieved: %+v", m)
}
}
}
func Test_saveMessage(t *testing.T) {
// Set up test values
kv := versioned.NewKV(make(ekv.Memstore))
subKey := "testKey"
testMsgs, _ := makeTestMessages(1)
mh := hashMessage(testMsgs[0])
key := makeStoredMessageKey(subKey, mh)
// Save message
err := saveMessage(kv, testMsgs[0], key)
if err != nil {
t.Errorf("saveMessage() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err)
}
// Try to get message
obj, err := kv.Get(key)
if err != nil {
t.Errorf("Get() returned an error."+
"\n\texpected: %v\n\trecieved: %v", nil, err)
}
if !bytes.Equal(testMsgs[0].Marshal(), obj.Data) {
t.Errorf("saveMessage() returned versioned object with incorrect data."+
"\n\texpected: %v\n\treceived: %v",
testMsgs[0], obj.Data)
}
}
// Tests happy path of MessageBuffer.Succeeded().
func TestMessageBuffer_Succeeded(t *testing.T) {
// Create new MessageBuffer and fill with message
testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey")
if err != nil {
t.Fatalf("Failed to create new MessageBuffer: %v", err)
}
testMsgs, _ := makeTestMessages(1)
for _, m := range testMsgs {
testMB.Add(m)
}
// Get message
m, _ := testMB.Next()
testMB.Succeeded(m)
_, exists1 := testMB.messages[hashMessage(m)]
_, exists2 := testMB.processingMessages[hashMessage(m)]
if exists1 || exists2 {
t.Errorf("Succeeded() did not remove the message from the buffer."+
"\n\tbuffer: %+v", testMB)
}
}
// Tests happy path of MessageBuffer.Failed().
func TestMessageBuffer_Failed(t *testing.T) {
// Create new MessageBuffer and fill with message
testMB, err := NewMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey")
if err != nil {
t.Fatalf("Failed to create new MessageBuffer: %v", err)
}
testMsgs, _ := makeTestMessages(1)
for _, m := range testMsgs {
testMB.Add(m)
}
// Get message
m, _ := testMB.Next()
testMB.Failed(m)
_, exists1 := testMB.messages[hashMessage(m)]
_, exists2 := testMB.processingMessages[hashMessage(m)]
if !exists1 || exists2 {
t.Errorf("Failed() did not move the message back into the \"not "+
"processed\" state.\n\tbuffer: %+v", testMB)
}
}
// addTestMessages adds random messages to the buffer.
func addTestMessages(mb *MessageBuffer, n int) []messageHash {
prng := rand.New(rand.NewSource(time.Now().UnixNano()))
msgs := make([]messageHash, n)
for i := 0; i < n; i++ {
keyData := make([]byte, 16)
prng.Read(keyData)
mh := messageHash{}
copy(mh[:], keyData)
if i%10 == 0 {
mb.processingMessages[mh] = struct{}{}
} else {
mb.messages[mh] = struct{}{}
}
msgs[i] = mh
}
return msgs
}
// cmpMessageHash compares two slices of messageHash to see if they have the
// exact same elements in any order.
func cmpMessageHash(arrA, arrB []messageHash) bool {
if len(arrA) != len(arrB) {
return false
}
for _, a := range arrA {
foundInB := false
for _, b := range arrB {
if a == b {
foundInB = true
break
}
}
if !foundInB {
return false
}
}
return true
}
// makeTestMessages creates a list of messages with random data and the expected
// map after they are added to the buffer.
func makeTestMessages(n int) ([]format.Message, map[messageHash]struct{}) {
prng := rand.New(rand.NewSource(time.Now().UnixNano()))
mh := map[messageHash]struct{}{}
msgs := make([]format.Message, n)
for i := range msgs {
msgs[i] = format.NewMessage(128)
payload := make([]byte, 128)
prng.Read(payload)
msgs[i].SetPayloadA(payload)
prng.Read(payload)
msgs[i].SetPayloadB(payload)
mh[hashMessage(msgs[i])] = struct{}{}
}
return msgs, mh
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment