From 42a463bce5e5533be8bd2f94e51a5ee8b4ba2c0b Mon Sep 17 00:00:00 2001 From: Niamh Nikali <niamh@elixxir.io> Date: Fri, 25 Sep 2020 14:16:39 -0700 Subject: [PATCH] Fix multipart message storage keying --- storage/conversation/partner.go | 2 +- storage/conversation/store.go | 2 +- storage/partition/multiPartMessage.go | 29 ++++++++-------------- storage/partition/multiPartMessage_test.go | 11 ++++---- storage/partition/part.go | 25 ++++++++----------- storage/partition/part_test.go | 21 ++++++---------- storage/partition/store.go | 4 ++- storage/partition/store_test.go | 4 ++- storage/versioned/kv.go | 2 +- 9 files changed, 44 insertions(+), 56 deletions(-) diff --git a/storage/conversation/partner.go b/storage/conversation/partner.go index f13f133b2..e1bd682de 100644 --- a/storage/conversation/partner.go +++ b/storage/conversation/partner.go @@ -181,5 +181,5 @@ func (c *Conversation) marshal() ([]byte, error) { } func makeConversationKey(partner *id.ID) string { - return partner.String() + return versioned.MakePartnerPrefix(partner) } diff --git a/storage/conversation/store.go b/storage/conversation/store.go index 443efb2f5..9a8201c69 100644 --- a/storage/conversation/store.go +++ b/storage/conversation/store.go @@ -16,7 +16,7 @@ type Store struct { //Returns a new conversation store made off of the KV func NewStore(kv *versioned.KV) *Store { - kv = kv.Prefix(conversationKeyPrefix).Prefix("Partner") + kv = kv.Prefix(conversationKeyPrefix) return &Store{ loadedConversations: make(map[id.ID]*Conversation), kv: kv, diff --git a/storage/partition/multiPartMessage.go b/storage/partition/multiPartMessage.go index 459e004f1..289076640 100644 --- a/storage/partition/multiPartMessage.go +++ b/storage/partition/multiPartMessage.go @@ -2,19 +2,19 @@ package partition import ( "encoding/json" + "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/primitives/id" - "strconv" "sync" "time" ) const currentMultiPartMessageVersion = 0 -const keyMultiPartMessagePrefix = "MultiPartMessage" +const messageKey = "MultiPart" type multiPartMessage struct { Sender *id.ID @@ -33,10 +33,9 @@ type multiPartMessage struct { // creates a new one and saves it if one does not exist. func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, kv *versioned.KV) *multiPartMessage { - kv = kv.Prefix(versioned.MakePartnerPrefix(sender)) - key := makeMultiPartMessageKey(messageID) + kv = kv.Prefix(versioned.MakePartnerPrefix(sender)).Prefix(fmt.Sprintf("MessageID:%d", messageID)) - obj, err := kv.Get(key) + obj, err := kv.Get(messageKey) if err != nil { if !ekv.Exists(err) { mpm := &multiPartMessage{ @@ -71,8 +70,6 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, } func (mpm *multiPartMessage) save() error { - key := makeMultiPartMessageKey(mpm.MessageID) - data, err := json.Marshal(mpm) if err != nil { return errors.Wrap(err, "Failed to unmarshal multi-part message") @@ -84,7 +81,7 @@ func (mpm *multiPartMessage) save() error { Data: data, } - return mpm.kv.Set(key, &obj) + return mpm.kv.Set(messageKey, &obj) } func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { @@ -99,7 +96,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { mpm.parts[partNumber] = part mpm.PresentParts++ - if err := savePart(mpm.kv, mpm.MessageID, partNumber, part); err != nil { + if err := savePart(mpm.kv, partNumber, part); err != nil { jww.FATAL.Panicf("Failed to save multi part "+ "message part %v from %s messageID %v: %s", partNumber, mpm.Sender, mpm.MessageID, err) @@ -128,7 +125,7 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8, mpm.parts[partNumber] = part mpm.PresentParts++ - if err := savePart(mpm.kv, mpm.MessageID, partNumber, part); err != nil { + if err := savePart(mpm.kv, partNumber, part); err != nil { jww.FATAL.Panicf("Failed to save multi part "+ "message part %v from %s messageID %v: %s", partNumber, mpm.Sender, mpm.MessageID, err) @@ -158,12 +155,12 @@ func (mpm *multiPartMessage) IsComplete() (message.Receive, bool) { // Load all parts from disk, deleting files from disk as we go along for i := uint8(0); i < mpm.NumParts; i++ { if mpm.parts[i] == nil { - if mpm.parts[i], err = loadPart(mpm.kv, mpm.MessageID, i); err != nil { + if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil { jww.FATAL.Panicf("Failed to load multi part "+ "message part %v from %s messageID %v: %s", i, mpm.Sender, mpm.MessageID, err) } - if err = deletePart(mpm.kv, mpm.MessageID, i); err != nil { + if err = deletePart(mpm.kv, i); err != nil { jww.FATAL.Panicf("Failed to delete multi part "+ "message part %v from %s messageID %v: %s", i, mpm.Sender, mpm.MessageID, err) @@ -198,14 +195,10 @@ func (mpm *multiPartMessage) IsComplete() (message.Receive, bool) { } func (mpm *multiPartMessage) delete() { - key := makeMultiPartMessageKey(mpm.MessageID) - if err := mpm.kv.Delete(key); err != nil { + //key := makeMultiPartMessageKey(mpm.MessageID) + if err := mpm.kv.Delete(messageKey); err != nil { jww.FATAL.Panicf("Failed to delete multi part "+ "message from %s messageID %v: %s", mpm.Sender, mpm.MessageID, err) } } - -func makeMultiPartMessageKey(messageID uint64) string { - return strconv.FormatUint(messageID, 32) -} diff --git a/storage/partition/multiPartMessage_test.go b/storage/partition/multiPartMessage_test.go index f1bc42bb8..c5577e4ec 100644 --- a/storage/partition/multiPartMessage_test.go +++ b/storage/partition/multiPartMessage_test.go @@ -39,7 +39,7 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) { CheckMultiPartMessages(expectedMpm, mpm, t) - obj, err := mpm.kv.Get(makeMultiPartMessageKey(expectedMpm.MessageID)) + obj, err := mpm.kv.Get(messageKey) if err != nil { t.Errorf("Get() failed to get multiPartMessage from key value store: %v", err) } @@ -135,7 +135,7 @@ func TestMultiPartMessage_Add(t *testing.T) { t.Fatalf("Failed to marshal expected multiPartMessage: %v", err) } - obj, err := mpm.kv.Get(makeMultiPartMessageKey(mpm.MessageID)) + obj, err := mpm.kv.Get(messageKey) if err != nil { t.Errorf("Get() failed to get multiPartMessage from key value store: %v", err) } @@ -170,7 +170,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) { CheckMultiPartMessages(expectedMpm, npm, t) - data, err := loadPart(npm.kv, npm.MessageID, 2) + data, err := loadPart(npm.kv, 2) if err != nil { t.Errorf("loadPart() produced an error: %v", err) } @@ -233,13 +233,12 @@ func TestMultiPartMessage_delete(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) mpm := loadOrCreateMultiPartMessage(id.NewIdFromUInt(prng.Uint64(), id.User, t), prng.Uint64(), kv) - key := makeMultiPartMessageKey(mpm.MessageID) mpm.delete() - obj, err := kv.Get(key) + obj, err := kv.Get(messageKey) if ekv.Exists(err) { t.Errorf("delete() did not properly delete key %s."+ - "\n\tobject received: %+v", key, obj) + "\n\tobject received: %+v", messageKey, obj) } } diff --git a/storage/partition/part.go b/storage/partition/part.go index e72eddbdc..7a6052d96 100644 --- a/storage/partition/part.go +++ b/storage/partition/part.go @@ -1,16 +1,14 @@ package partition import ( + "fmt" "gitlab.com/elixxir/client/storage/versioned" - "strconv" "time" ) const currentMultiPartMessagePartVersion = 0 -const keyMultiPartMessagePartPrefix = "parts" -func loadPart(kv *versioned.KV, messageID uint64, partNum uint8) ([]byte, error) { - kv = multiPartMessagePartPrefix(kv, messageID) +func loadPart(kv *versioned.KV, partNum uint8) ([]byte, error) { key := makeMultiPartMessagePartKey(partNum) obj, err := kv.Get(key) @@ -21,8 +19,7 @@ func loadPart(kv *versioned.KV, messageID uint64, partNum uint8) ([]byte, error) return obj.Data, nil } -func savePart(kv *versioned.KV, messageID uint64, partNum uint8, part []byte) error { - kv = multiPartMessagePartPrefix(kv, messageID) +func savePart(kv *versioned.KV, partNum uint8, part []byte) error { key := makeMultiPartMessagePartKey(partNum) obj := versioned.Object{ @@ -34,17 +31,17 @@ func savePart(kv *versioned.KV, messageID uint64, partNum uint8, part []byte) er return kv.Set(key, &obj) } -func deletePart(kv *versioned.KV, messageID uint64, partNum uint8) error { - kv = multiPartMessagePartPrefix(kv, messageID) +func deletePart(kv *versioned.KV, partNum uint8) error { key := makeMultiPartMessagePartKey(partNum) return kv.Delete(key) } -func makeMultiPartMessagePartKey(partNum uint8) string { - return strconv.FormatUint(uint64(partNum), 32) +// Make the key for a part +func makeMultiPartMessagePartKey(part uint8) string { + return fmt.Sprintf("part:%v", part) } -func multiPartMessagePartPrefix(kv *versioned.KV, id uint64) *versioned.KV { - return kv.Prefix(keyMultiPartMessagePartPrefix). - Prefix(strconv.FormatUint(id, 32)) -} +//func multiPartMessagePartPrefix(kv *versioned.KV, id uint64) *versioned.KV { +// return kv.Prefix(keyMultiPartMessagePartPrefix). +// Prefix(strconv.FormatUint(id, 32)) +//} diff --git a/storage/partition/part_test.go b/storage/partition/part_test.go index 0da4de42b..d83d528fb 100644 --- a/storage/partition/part_test.go +++ b/storage/partition/part_test.go @@ -15,20 +15,19 @@ func Test_savePart(t *testing.T) { // Set up test values prng := rand.New(rand.NewSource(time.Now().UnixNano())) kv := versioned.NewKV(make(ekv.Memstore)) - messageID := prng.Uint64() partNum := uint8(prng.Uint32()) part := make([]byte, prng.Int31n(500)) prng.Read(part) key := makeMultiPartMessagePartKey(partNum) // Save part - err := savePart(kv, messageID, partNum, part) + err := savePart(kv, partNum, part) if err != nil { t.Errorf("savePart() produced an error: %v", err) } // Attempt to get from key value store - obj, err := multiPartMessagePartPrefix(kv, messageID).Get(key) + obj, err := kv.Get(key) if err != nil { t.Errorf("Get() produced an error: %v", err) } @@ -45,22 +44,20 @@ func Test_loadPart(t *testing.T) { jww.SetStdoutThreshold(jww.LevelTrace) // Set up test values prng := rand.New(rand.NewSource(time.Now().UnixNano())) - messageID := prng.Uint64() rootKv := versioned.NewKV(make(ekv.Memstore)) - kv := multiPartMessagePartPrefix(rootKv, messageID) partNum := uint8(prng.Uint32()) part := make([]byte, prng.Int31n(500)) prng.Read(part) key := makeMultiPartMessagePartKey(partNum) // Save part to key value store - err := kv.Set(key, &versioned.Object{Timestamp: time.Now(), Data: part}) + err := rootKv.Set(key, &versioned.Object{Timestamp: time.Now(), Data: part}) if err != nil { t.Fatalf("Failed to set object: %v", err) } // Load part from key value store - data, err := loadPart(rootKv, messageID, partNum) + data, err := loadPart(rootKv, partNum) if err != nil { t.Errorf("loadPart() produced an error: %v", err) } @@ -78,13 +75,12 @@ func Test_loadPart_NotFoundError(t *testing.T) { // Set up test values prng := rand.New(rand.NewSource(time.Now().UnixNano())) kv := versioned.NewKV(make(ekv.Memstore)) - messageID := prng.Uint64() partNum := uint8(prng.Uint32()) part := make([]byte, prng.Int31n(500)) prng.Read(part) // Load part from key value store - data, err := loadPart(kv, messageID, partNum) + data, err := loadPart(kv, partNum) if ekv.Exists(err) { t.Errorf("loadPart() found an item for the key: %v", err) } @@ -101,25 +97,24 @@ func TestDeletePart(t *testing.T) { // Set up test values prng := rand.New(rand.NewSource(time.Now().UnixNano())) kv := versioned.NewKV(make(ekv.Memstore)) - messageID := prng.Uint64() partNum := uint8(prng.Uint32()) part := make([]byte, prng.Int31n(500)) prng.Read(part) // Save part - err := savePart(kv, messageID, partNum, part) + err := savePart(kv, partNum, part) if err != nil { t.Fatalf("savePart() produced an error: %v", err) } // Attempt to delete part - err = deletePart(kv, messageID, partNum) + err = deletePart(kv, partNum) if err != nil { t.Errorf("deletePart() produced an error: %v", err) } // Check if part was deleted - _, err = loadPart(kv, messageID, partNum) + _, err = loadPart(kv, partNum) if ekv.Exists(err) { t.Errorf("part was found in key value store: %v", err) } diff --git a/storage/partition/store.go b/storage/partition/store.go index 4f5d6c61b..2b23d2834 100644 --- a/storage/partition/store.go +++ b/storage/partition/store.go @@ -12,6 +12,8 @@ import ( type multiPartID [16]byte +const packagePrefix = "Partition" + type Store struct { multiParts map[multiPartID]*multiPartMessage kv *versioned.KV @@ -21,7 +23,7 @@ type Store struct { func New(kv *versioned.KV) *Store { return &Store{ multiParts: make(map[multiPartID]*multiPartMessage), - kv: kv.Prefix(keyMultiPartMessagePrefix), + kv: kv.Prefix(packagePrefix), } } diff --git a/storage/partition/store_test.go b/storage/partition/store_test.go index eca4892fe..fab54bcd6 100644 --- a/storage/partition/store_test.go +++ b/storage/partition/store_test.go @@ -2,6 +2,7 @@ package partition import ( "bytes" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" @@ -16,7 +17,7 @@ func TestNew(t *testing.T) { rootKv := versioned.NewKV(make(ekv.Memstore)) expectedStore := &Store{ multiParts: make(map[multiPartID]*multiPartMessage), - kv: rootKv.Prefix(keyMultiPartMessagePrefix), + kv: rootKv.Prefix(packagePrefix), } store := New(rootKv) @@ -29,6 +30,7 @@ func TestNew(t *testing.T) { // Tests happy path of Store.AddFirst(). func TestStore_AddFirst(t *testing.T) { + jww.SetStdoutThreshold(jww.LevelTrace) part := []byte("Test message.") s := New(versioned.NewKV(ekv.Memstore{})) diff --git a/storage/versioned/kv.go b/storage/versioned/kv.go index f906022ed..cb1332478 100644 --- a/storage/versioned/kv.go +++ b/storage/versioned/kv.go @@ -25,7 +25,7 @@ func MakeKeyWithPrefix(dataType string, uniqueID string) string { // MakePartnerPrefix creates a string prefix // to denote who a conversation or relationship is with func MakePartnerPrefix(id *id.ID) string { - return fmt.Sprintf("%v:%v", "Partner:", id.String()) + return fmt.Sprintf("Partner:%v", id.String()) } // Upgrade functions must be of this type -- GitLab