Skip to content
Snippets Groups Projects
Commit 42a463bc authored by Niamh Nikali's avatar Niamh Nikali
Browse files

Fix multipart message storage keying

parent b6d65ddb
Branches
Tags
No related merge requests found
...@@ -181,5 +181,5 @@ func (c *Conversation) marshal() ([]byte, error) { ...@@ -181,5 +181,5 @@ func (c *Conversation) marshal() ([]byte, error) {
} }
func makeConversationKey(partner *id.ID) string { func makeConversationKey(partner *id.ID) string {
return partner.String() return versioned.MakePartnerPrefix(partner)
} }
...@@ -16,7 +16,7 @@ type Store struct { ...@@ -16,7 +16,7 @@ type Store struct {
//Returns a new conversation store made off of the KV //Returns a new conversation store made off of the KV
func NewStore(kv *versioned.KV) *Store { func NewStore(kv *versioned.KV) *Store {
kv = kv.Prefix(conversationKeyPrefix).Prefix("Partner") kv = kv.Prefix(conversationKeyPrefix)
return &Store{ return &Store{
loadedConversations: make(map[id.ID]*Conversation), loadedConversations: make(map[id.ID]*Conversation),
kv: kv, kv: kv,
......
...@@ -2,19 +2,19 @@ package partition ...@@ -2,19 +2,19 @@ package partition
import ( import (
"encoding/json" "encoding/json"
"fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/ekv" "gitlab.com/elixxir/ekv"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"strconv"
"sync" "sync"
"time" "time"
) )
const currentMultiPartMessageVersion = 0 const currentMultiPartMessageVersion = 0
const keyMultiPartMessagePrefix = "MultiPartMessage" const messageKey = "MultiPart"
type multiPartMessage struct { type multiPartMessage struct {
Sender *id.ID Sender *id.ID
...@@ -33,10 +33,9 @@ type multiPartMessage struct { ...@@ -33,10 +33,9 @@ type multiPartMessage struct {
// creates a new one and saves it if one does not exist. // creates a new one and saves it if one does not exist.
func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64,
kv *versioned.KV) *multiPartMessage { kv *versioned.KV) *multiPartMessage {
kv = kv.Prefix(versioned.MakePartnerPrefix(sender)) kv = kv.Prefix(versioned.MakePartnerPrefix(sender)).Prefix(fmt.Sprintf("MessageID:%d", messageID))
key := makeMultiPartMessageKey(messageID)
obj, err := kv.Get(key) obj, err := kv.Get(messageKey)
if err != nil { if err != nil {
if !ekv.Exists(err) { if !ekv.Exists(err) {
mpm := &multiPartMessage{ mpm := &multiPartMessage{
...@@ -71,8 +70,6 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, ...@@ -71,8 +70,6 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64,
} }
func (mpm *multiPartMessage) save() error { func (mpm *multiPartMessage) save() error {
key := makeMultiPartMessageKey(mpm.MessageID)
data, err := json.Marshal(mpm) data, err := json.Marshal(mpm)
if err != nil { if err != nil {
return errors.Wrap(err, "Failed to unmarshal multi-part message") return errors.Wrap(err, "Failed to unmarshal multi-part message")
...@@ -84,7 +81,7 @@ func (mpm *multiPartMessage) save() error { ...@@ -84,7 +81,7 @@ func (mpm *multiPartMessage) save() error {
Data: data, Data: data,
} }
return mpm.kv.Set(key, &obj) return mpm.kv.Set(messageKey, &obj)
} }
func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) {
...@@ -99,7 +96,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { ...@@ -99,7 +96,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) {
mpm.parts[partNumber] = part mpm.parts[partNumber] = part
mpm.PresentParts++ mpm.PresentParts++
if err := savePart(mpm.kv, mpm.MessageID, partNumber, part); err != nil { if err := savePart(mpm.kv, partNumber, part); err != nil {
jww.FATAL.Panicf("Failed to save multi part "+ jww.FATAL.Panicf("Failed to save multi part "+
"message part %v from %s messageID %v: %s", partNumber, mpm.Sender, "message part %v from %s messageID %v: %s", partNumber, mpm.Sender,
mpm.MessageID, err) mpm.MessageID, err)
...@@ -128,7 +125,7 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8, ...@@ -128,7 +125,7 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8,
mpm.parts[partNumber] = part mpm.parts[partNumber] = part
mpm.PresentParts++ mpm.PresentParts++
if err := savePart(mpm.kv, mpm.MessageID, partNumber, part); err != nil { if err := savePart(mpm.kv, partNumber, part); err != nil {
jww.FATAL.Panicf("Failed to save multi part "+ jww.FATAL.Panicf("Failed to save multi part "+
"message part %v from %s messageID %v: %s", partNumber, mpm.Sender, "message part %v from %s messageID %v: %s", partNumber, mpm.Sender,
mpm.MessageID, err) mpm.MessageID, err)
...@@ -158,12 +155,12 @@ func (mpm *multiPartMessage) IsComplete() (message.Receive, bool) { ...@@ -158,12 +155,12 @@ func (mpm *multiPartMessage) IsComplete() (message.Receive, bool) {
// Load all parts from disk, deleting files from disk as we go along // Load all parts from disk, deleting files from disk as we go along
for i := uint8(0); i < mpm.NumParts; i++ { for i := uint8(0); i < mpm.NumParts; i++ {
if mpm.parts[i] == nil { if mpm.parts[i] == nil {
if mpm.parts[i], err = loadPart(mpm.kv, mpm.MessageID, i); err != nil { if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil {
jww.FATAL.Panicf("Failed to load multi part "+ jww.FATAL.Panicf("Failed to load multi part "+
"message part %v from %s messageID %v: %s", i, mpm.Sender, "message part %v from %s messageID %v: %s", i, mpm.Sender,
mpm.MessageID, err) mpm.MessageID, err)
} }
if err = deletePart(mpm.kv, mpm.MessageID, i); err != nil { if err = deletePart(mpm.kv, i); err != nil {
jww.FATAL.Panicf("Failed to delete multi part "+ jww.FATAL.Panicf("Failed to delete multi part "+
"message part %v from %s messageID %v: %s", i, mpm.Sender, "message part %v from %s messageID %v: %s", i, mpm.Sender,
mpm.MessageID, err) mpm.MessageID, err)
...@@ -198,14 +195,10 @@ func (mpm *multiPartMessage) IsComplete() (message.Receive, bool) { ...@@ -198,14 +195,10 @@ func (mpm *multiPartMessage) IsComplete() (message.Receive, bool) {
} }
func (mpm *multiPartMessage) delete() { func (mpm *multiPartMessage) delete() {
key := makeMultiPartMessageKey(mpm.MessageID) //key := makeMultiPartMessageKey(mpm.MessageID)
if err := mpm.kv.Delete(key); err != nil { if err := mpm.kv.Delete(messageKey); err != nil {
jww.FATAL.Panicf("Failed to delete multi part "+ jww.FATAL.Panicf("Failed to delete multi part "+
"message from %s messageID %v: %s", mpm.Sender, "message from %s messageID %v: %s", mpm.Sender,
mpm.MessageID, err) mpm.MessageID, err)
} }
} }
func makeMultiPartMessageKey(messageID uint64) string {
return strconv.FormatUint(messageID, 32)
}
...@@ -39,7 +39,7 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) { ...@@ -39,7 +39,7 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) {
CheckMultiPartMessages(expectedMpm, mpm, t) CheckMultiPartMessages(expectedMpm, mpm, t)
obj, err := mpm.kv.Get(makeMultiPartMessageKey(expectedMpm.MessageID)) obj, err := mpm.kv.Get(messageKey)
if err != nil { if err != nil {
t.Errorf("Get() failed to get multiPartMessage from key value store: %v", err) t.Errorf("Get() failed to get multiPartMessage from key value store: %v", err)
} }
...@@ -135,7 +135,7 @@ func TestMultiPartMessage_Add(t *testing.T) { ...@@ -135,7 +135,7 @@ func TestMultiPartMessage_Add(t *testing.T) {
t.Fatalf("Failed to marshal expected multiPartMessage: %v", err) t.Fatalf("Failed to marshal expected multiPartMessage: %v", err)
} }
obj, err := mpm.kv.Get(makeMultiPartMessageKey(mpm.MessageID)) obj, err := mpm.kv.Get(messageKey)
if err != nil { if err != nil {
t.Errorf("Get() failed to get multiPartMessage from key value store: %v", err) t.Errorf("Get() failed to get multiPartMessage from key value store: %v", err)
} }
...@@ -170,7 +170,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) { ...@@ -170,7 +170,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) {
CheckMultiPartMessages(expectedMpm, npm, t) CheckMultiPartMessages(expectedMpm, npm, t)
data, err := loadPart(npm.kv, npm.MessageID, 2) data, err := loadPart(npm.kv, 2)
if err != nil { if err != nil {
t.Errorf("loadPart() produced an error: %v", err) t.Errorf("loadPart() produced an error: %v", err)
} }
...@@ -233,13 +233,12 @@ func TestMultiPartMessage_delete(t *testing.T) { ...@@ -233,13 +233,12 @@ func TestMultiPartMessage_delete(t *testing.T) {
kv := versioned.NewKV(make(ekv.Memstore)) kv := versioned.NewKV(make(ekv.Memstore))
mpm := loadOrCreateMultiPartMessage(id.NewIdFromUInt(prng.Uint64(), id.User, t), mpm := loadOrCreateMultiPartMessage(id.NewIdFromUInt(prng.Uint64(), id.User, t),
prng.Uint64(), kv) prng.Uint64(), kv)
key := makeMultiPartMessageKey(mpm.MessageID)
mpm.delete() mpm.delete()
obj, err := kv.Get(key) obj, err := kv.Get(messageKey)
if ekv.Exists(err) { if ekv.Exists(err) {
t.Errorf("delete() did not properly delete key %s."+ t.Errorf("delete() did not properly delete key %s."+
"\n\tobject received: %+v", key, obj) "\n\tobject received: %+v", messageKey, obj)
} }
} }
......
package partition package partition
import ( import (
"fmt"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
"strconv"
"time" "time"
) )
const currentMultiPartMessagePartVersion = 0 const currentMultiPartMessagePartVersion = 0
const keyMultiPartMessagePartPrefix = "parts"
func loadPart(kv *versioned.KV, messageID uint64, partNum uint8) ([]byte, error) { func loadPart(kv *versioned.KV, partNum uint8) ([]byte, error) {
kv = multiPartMessagePartPrefix(kv, messageID)
key := makeMultiPartMessagePartKey(partNum) key := makeMultiPartMessagePartKey(partNum)
obj, err := kv.Get(key) obj, err := kv.Get(key)
...@@ -21,8 +19,7 @@ func loadPart(kv *versioned.KV, messageID uint64, partNum uint8) ([]byte, error) ...@@ -21,8 +19,7 @@ func loadPart(kv *versioned.KV, messageID uint64, partNum uint8) ([]byte, error)
return obj.Data, nil return obj.Data, nil
} }
func savePart(kv *versioned.KV, messageID uint64, partNum uint8, part []byte) error { func savePart(kv *versioned.KV, partNum uint8, part []byte) error {
kv = multiPartMessagePartPrefix(kv, messageID)
key := makeMultiPartMessagePartKey(partNum) key := makeMultiPartMessagePartKey(partNum)
obj := versioned.Object{ obj := versioned.Object{
...@@ -34,17 +31,17 @@ func savePart(kv *versioned.KV, messageID uint64, partNum uint8, part []byte) er ...@@ -34,17 +31,17 @@ func savePart(kv *versioned.KV, messageID uint64, partNum uint8, part []byte) er
return kv.Set(key, &obj) return kv.Set(key, &obj)
} }
func deletePart(kv *versioned.KV, messageID uint64, partNum uint8) error { func deletePart(kv *versioned.KV, partNum uint8) error {
kv = multiPartMessagePartPrefix(kv, messageID)
key := makeMultiPartMessagePartKey(partNum) key := makeMultiPartMessagePartKey(partNum)
return kv.Delete(key) return kv.Delete(key)
} }
func makeMultiPartMessagePartKey(partNum uint8) string { // Make the key for a part
return strconv.FormatUint(uint64(partNum), 32) func makeMultiPartMessagePartKey(part uint8) string {
return fmt.Sprintf("part:%v", part)
} }
func multiPartMessagePartPrefix(kv *versioned.KV, id uint64) *versioned.KV { //func multiPartMessagePartPrefix(kv *versioned.KV, id uint64) *versioned.KV {
return kv.Prefix(keyMultiPartMessagePartPrefix). // return kv.Prefix(keyMultiPartMessagePartPrefix).
Prefix(strconv.FormatUint(id, 32)) // Prefix(strconv.FormatUint(id, 32))
} //}
...@@ -15,20 +15,19 @@ func Test_savePart(t *testing.T) { ...@@ -15,20 +15,19 @@ func Test_savePart(t *testing.T) {
// Set up test values // Set up test values
prng := rand.New(rand.NewSource(time.Now().UnixNano())) prng := rand.New(rand.NewSource(time.Now().UnixNano()))
kv := versioned.NewKV(make(ekv.Memstore)) kv := versioned.NewKV(make(ekv.Memstore))
messageID := prng.Uint64()
partNum := uint8(prng.Uint32()) partNum := uint8(prng.Uint32())
part := make([]byte, prng.Int31n(500)) part := make([]byte, prng.Int31n(500))
prng.Read(part) prng.Read(part)
key := makeMultiPartMessagePartKey(partNum) key := makeMultiPartMessagePartKey(partNum)
// Save part // Save part
err := savePart(kv, messageID, partNum, part) err := savePart(kv, partNum, part)
if err != nil { if err != nil {
t.Errorf("savePart() produced an error: %v", err) t.Errorf("savePart() produced an error: %v", err)
} }
// Attempt to get from key value store // Attempt to get from key value store
obj, err := multiPartMessagePartPrefix(kv, messageID).Get(key) obj, err := kv.Get(key)
if err != nil { if err != nil {
t.Errorf("Get() produced an error: %v", err) t.Errorf("Get() produced an error: %v", err)
} }
...@@ -45,22 +44,20 @@ func Test_loadPart(t *testing.T) { ...@@ -45,22 +44,20 @@ func Test_loadPart(t *testing.T) {
jww.SetStdoutThreshold(jww.LevelTrace) jww.SetStdoutThreshold(jww.LevelTrace)
// Set up test values // Set up test values
prng := rand.New(rand.NewSource(time.Now().UnixNano())) prng := rand.New(rand.NewSource(time.Now().UnixNano()))
messageID := prng.Uint64()
rootKv := versioned.NewKV(make(ekv.Memstore)) rootKv := versioned.NewKV(make(ekv.Memstore))
kv := multiPartMessagePartPrefix(rootKv, messageID)
partNum := uint8(prng.Uint32()) partNum := uint8(prng.Uint32())
part := make([]byte, prng.Int31n(500)) part := make([]byte, prng.Int31n(500))
prng.Read(part) prng.Read(part)
key := makeMultiPartMessagePartKey(partNum) key := makeMultiPartMessagePartKey(partNum)
// Save part to key value store // Save part to key value store
err := kv.Set(key, &versioned.Object{Timestamp: time.Now(), Data: part}) err := rootKv.Set(key, &versioned.Object{Timestamp: time.Now(), Data: part})
if err != nil { if err != nil {
t.Fatalf("Failed to set object: %v", err) t.Fatalf("Failed to set object: %v", err)
} }
// Load part from key value store // Load part from key value store
data, err := loadPart(rootKv, messageID, partNum) data, err := loadPart(rootKv, partNum)
if err != nil { if err != nil {
t.Errorf("loadPart() produced an error: %v", err) t.Errorf("loadPart() produced an error: %v", err)
} }
...@@ -78,13 +75,12 @@ func Test_loadPart_NotFoundError(t *testing.T) { ...@@ -78,13 +75,12 @@ func Test_loadPart_NotFoundError(t *testing.T) {
// Set up test values // Set up test values
prng := rand.New(rand.NewSource(time.Now().UnixNano())) prng := rand.New(rand.NewSource(time.Now().UnixNano()))
kv := versioned.NewKV(make(ekv.Memstore)) kv := versioned.NewKV(make(ekv.Memstore))
messageID := prng.Uint64()
partNum := uint8(prng.Uint32()) partNum := uint8(prng.Uint32())
part := make([]byte, prng.Int31n(500)) part := make([]byte, prng.Int31n(500))
prng.Read(part) prng.Read(part)
// Load part from key value store // Load part from key value store
data, err := loadPart(kv, messageID, partNum) data, err := loadPart(kv, partNum)
if ekv.Exists(err) { if ekv.Exists(err) {
t.Errorf("loadPart() found an item for the key: %v", err) t.Errorf("loadPart() found an item for the key: %v", err)
} }
...@@ -101,25 +97,24 @@ func TestDeletePart(t *testing.T) { ...@@ -101,25 +97,24 @@ func TestDeletePart(t *testing.T) {
// Set up test values // Set up test values
prng := rand.New(rand.NewSource(time.Now().UnixNano())) prng := rand.New(rand.NewSource(time.Now().UnixNano()))
kv := versioned.NewKV(make(ekv.Memstore)) kv := versioned.NewKV(make(ekv.Memstore))
messageID := prng.Uint64()
partNum := uint8(prng.Uint32()) partNum := uint8(prng.Uint32())
part := make([]byte, prng.Int31n(500)) part := make([]byte, prng.Int31n(500))
prng.Read(part) prng.Read(part)
// Save part // Save part
err := savePart(kv, messageID, partNum, part) err := savePart(kv, partNum, part)
if err != nil { if err != nil {
t.Fatalf("savePart() produced an error: %v", err) t.Fatalf("savePart() produced an error: %v", err)
} }
// Attempt to delete part // Attempt to delete part
err = deletePart(kv, messageID, partNum) err = deletePart(kv, partNum)
if err != nil { if err != nil {
t.Errorf("deletePart() produced an error: %v", err) t.Errorf("deletePart() produced an error: %v", err)
} }
// Check if part was deleted // Check if part was deleted
_, err = loadPart(kv, messageID, partNum) _, err = loadPart(kv, partNum)
if ekv.Exists(err) { if ekv.Exists(err) {
t.Errorf("part was found in key value store: %v", err) t.Errorf("part was found in key value store: %v", err)
} }
......
...@@ -12,6 +12,8 @@ import ( ...@@ -12,6 +12,8 @@ import (
type multiPartID [16]byte type multiPartID [16]byte
const packagePrefix = "Partition"
type Store struct { type Store struct {
multiParts map[multiPartID]*multiPartMessage multiParts map[multiPartID]*multiPartMessage
kv *versioned.KV kv *versioned.KV
...@@ -21,7 +23,7 @@ type Store struct { ...@@ -21,7 +23,7 @@ type Store struct {
func New(kv *versioned.KV) *Store { func New(kv *versioned.KV) *Store {
return &Store{ return &Store{
multiParts: make(map[multiPartID]*multiPartMessage), multiParts: make(map[multiPartID]*multiPartMessage),
kv: kv.Prefix(keyMultiPartMessagePrefix), kv: kv.Prefix(packagePrefix),
} }
} }
......
...@@ -2,6 +2,7 @@ package partition ...@@ -2,6 +2,7 @@ package partition
import ( import (
"bytes" "bytes"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/ekv" "gitlab.com/elixxir/ekv"
...@@ -16,7 +17,7 @@ func TestNew(t *testing.T) { ...@@ -16,7 +17,7 @@ func TestNew(t *testing.T) {
rootKv := versioned.NewKV(make(ekv.Memstore)) rootKv := versioned.NewKV(make(ekv.Memstore))
expectedStore := &Store{ expectedStore := &Store{
multiParts: make(map[multiPartID]*multiPartMessage), multiParts: make(map[multiPartID]*multiPartMessage),
kv: rootKv.Prefix(keyMultiPartMessagePrefix), kv: rootKv.Prefix(packagePrefix),
} }
store := New(rootKv) store := New(rootKv)
...@@ -29,6 +30,7 @@ func TestNew(t *testing.T) { ...@@ -29,6 +30,7 @@ func TestNew(t *testing.T) {
// Tests happy path of Store.AddFirst(). // Tests happy path of Store.AddFirst().
func TestStore_AddFirst(t *testing.T) { func TestStore_AddFirst(t *testing.T) {
jww.SetStdoutThreshold(jww.LevelTrace)
part := []byte("Test message.") part := []byte("Test message.")
s := New(versioned.NewKV(ekv.Memstore{})) s := New(versioned.NewKV(ekv.Memstore{}))
......
...@@ -25,7 +25,7 @@ func MakeKeyWithPrefix(dataType string, uniqueID string) string { ...@@ -25,7 +25,7 @@ func MakeKeyWithPrefix(dataType string, uniqueID string) string {
// MakePartnerPrefix creates a string prefix // MakePartnerPrefix creates a string prefix
// to denote who a conversation or relationship is with // to denote who a conversation or relationship is with
func MakePartnerPrefix(id *id.ID) string { func MakePartnerPrefix(id *id.ID) string {
return fmt.Sprintf("%v:%v", "Partner:", id.String()) return fmt.Sprintf("Partner:%v", id.String())
} }
// Upgrade functions must be of this type // Upgrade functions must be of this type
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment