diff --git a/storage/partition/multiPartMessage.go b/storage/partition/multiPartMessage.go index 754c524702498c2c6586d54a42adedf3351a7cc5..0bddd605fd9da711f95c2c60cdeaee2c1e91f5a7 100644 --- a/storage/partition/multiPartMessage.go +++ b/storage/partition/multiPartMessage.go @@ -30,7 +30,10 @@ type multiPartMessage struct { MessageID uint64 NumParts uint8 PresentParts uint8 - Timestamp time.Time + // Timestamp of message from sender + SenderTimestamp time.Time + // Timestamp in which message was stored in RAM + StorageTimestamp time.Time MessageType message.Type parts [][]byte @@ -52,7 +55,7 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, MessageID: messageID, NumParts: 0, PresentParts: 0, - Timestamp: time.Time{}, + SenderTimestamp: time.Time{}, MessageType: 0, kv: kv, } @@ -119,7 +122,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { } func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8, - numParts uint8, timestamp time.Time, part []byte) { + numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte) { mpm.mux.Lock() defer mpm.mux.Unlock() @@ -129,10 +132,11 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8, } mpm.NumParts = numParts - mpm.Timestamp = timestamp + mpm.SenderTimestamp = senderTimestamp mpm.MessageType = mt mpm.parts[partNumber] = part mpm.PresentParts++ + mpm.StorageTimestamp = storageTimestamp if err := savePart(mpm.kv, partNumber, part); err != nil { jww.FATAL.Panicf("Failed to save multi part "+ @@ -159,27 +163,8 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message mpm.parts = append(mpm.parts, make([][]byte, int(mpm.NumParts)-len(mpm.parts))...) } - var err error - lenMsg := 0 - // Load all parts from disk, deleting files from disk as we go along - for i := uint8(0); i < mpm.NumParts; i++ { - if mpm.parts[i] == nil { - if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil { - jww.FATAL.Panicf("Failed to load multi part "+ - "message part %v from %s messageID %v: %s", i, mpm.Sender, - mpm.MessageID, err) - } - if err = deletePart(mpm.kv, i); err != nil { - jww.FATAL.Panicf("Failed to delete multi part "+ - "message part %v from %s messageID %v: %s", i, mpm.Sender, - mpm.MessageID, err) - } - } - lenMsg += len(mpm.parts[i]) - } - // delete the multipart message - mpm.delete() + lenMsg := mpm.delete() mpm.mux.Unlock() // Reconstruct the message @@ -200,7 +185,7 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message Payload: reconstructed, MessageType: mpm.MessageType, Sender: mpm.Sender, - Timestamp: mpm.Timestamp, + Timestamp: mpm.SenderTimestamp, // Encryption will be set externally Encryption: 0, ID: mid, @@ -209,7 +194,27 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message return m, true } -func (mpm *multiPartMessage) delete() { +// deletes all parts from disk and RAM. Returns the message length for reconstruction +func (mpm *multiPartMessage) delete() int { + // Load all parts from disk, deleting files from disk as we go along + var err error + lenMsg := 0 + for i := uint8(0); i < mpm.NumParts; i++ { + if mpm.parts[i] == nil { + if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil { + jww.FATAL.Panicf("Failed to load multi part "+ + "message part %v from %s messageID %v: %s", i, mpm.Sender, + mpm.MessageID, err) + } + if err = deletePart(mpm.kv, i); err != nil { + jww.FATAL.Panicf("Failed to delete multi part "+ + "message part %v from %s messageID %v: %s", i, mpm.Sender, + mpm.MessageID, err) + } + } + lenMsg += len(mpm.parts[i]) + } + //key := makeMultiPartMessageKey(mpm.MessageID) if err := mpm.kv.Delete(messageKey, currentMultiPartMessageVersion); err != nil { @@ -217,4 +222,6 @@ func (mpm *multiPartMessage) delete() { "message from %s messageID %v: %s", mpm.Sender, mpm.MessageID, err) } + + return lenMsg } diff --git a/storage/partition/multiPartMessage_test.go b/storage/partition/multiPartMessage_test.go index dff3c0fabb41cffd20f714497b487ea8bcbd4644..cf8a02a55d7db9417ba681940b78255ae066c05b 100644 --- a/storage/partition/multiPartMessage_test.go +++ b/storage/partition/multiPartMessage_test.go @@ -31,7 +31,7 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) { MessageID: prng.Uint64(), NumParts: 0, PresentParts: 0, - Timestamp: time.Time{}, + SenderTimestamp: time.Time{}, MessageType: 0, kv: versioned.NewKV(make(ekv.Memstore)), } @@ -67,7 +67,7 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) { MessageID: prng.Uint64(), NumParts: 0, PresentParts: 0, - Timestamp: time.Time{}, + SenderTimestamp: time.Time{}, MessageType: 0, kv: versioned.NewKV(make(ekv.Memstore)), } @@ -85,8 +85,8 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) { func CheckMultiPartMessages(expectedMpm *multiPartMessage, mpm *multiPartMessage, t *testing.T) { // The kv differs because it has prefix called, so we compare fields individually - if expectedMpm.Timestamp != mpm.Timestamp { - t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.Timestamp, mpm.Timestamp) + if expectedMpm.SenderTimestamp != mpm.SenderTimestamp { + t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.SenderTimestamp, mpm.SenderTimestamp) } if expectedMpm.MessageType != mpm.MessageType { t.Errorf("messagetype mismatch: expected %v, got %v", expectedMpm.MessageID, mpm.MessageID) @@ -163,7 +163,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) { MessageID: prng.Uint64(), NumParts: uint8(prng.Uint32()), PresentParts: 1, - Timestamp: netTime.Now(), + SenderTimestamp: netTime.Now(), MessageType: message.NoType, parts: make([][]byte, 3), kv: versioned.NewKV(make(ekv.Memstore)), @@ -173,7 +173,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) { expectedMpm.MessageID, expectedMpm.kv) npm.AddFirst(expectedMpm.MessageType, 2, expectedMpm.NumParts, - expectedMpm.Timestamp, expectedMpm.parts[2]) + expectedMpm.SenderTimestamp, netTime.Now(), expectedMpm.parts[2]) CheckMultiPartMessages(expectedMpm, npm, t) @@ -203,7 +203,7 @@ func TestMultiPartMessage_IsComplete(t *testing.T) { t.Error("IsComplete() returned true when NumParts == 0.") } - mpm.AddFirst(message.Text, partNums[0], 75, netTime.Now(), parts[0]) + mpm.AddFirst(message.Text, partNums[0], 75, netTime.Now(), netTime.Now(), parts[0]) for i := range partNums { if i > 0 { mpm.Add(partNums[i], parts[i]) diff --git a/storage/partition/store.go b/storage/partition/store.go index e699aaaa70de7407044a349bb0cca3073e2478c2..b214b2ced01a1fd66fd3984354bf199b9bc60700 100644 --- a/storage/partition/store.go +++ b/storage/partition/store.go @@ -10,8 +10,10 @@ package partition import ( "encoding/binary" "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" "golang.org/x/crypto/blake2b" "sync" "time" @@ -20,6 +22,8 @@ import ( type multiPartID [16]byte const packagePrefix = "Partition" +const clearPartitionInterval = 12*time.Hour +const clearPartitionThreshold = 24*time.Hour type Store struct { multiParts map[multiPartID]*multiPartMessage @@ -35,12 +39,12 @@ func New(kv *versioned.KV) *Store { } func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64, - partNum, numParts uint8, timestamp time.Time, + partNum, numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte, relationshipFingerprint []byte) (message.Receive, bool) { mpm := s.load(partner, messageID) - mpm.AddFirst(mt, partNum, numParts, timestamp, part) + mpm.AddFirst(mt, partNum, numParts, senderTimestamp, storageTimestamp, part) return mpm.IsComplete(relationshipFingerprint) } @@ -55,14 +59,37 @@ func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8, return mpm.IsComplete(relationshipFingerprint) } -// todo: may need a way to clean up partitioned messages when deleting a contact -// todo: Possible options: -// todo: Store partition w/ a timestamp, periodically clear old timestamps -// todo: Don't clean, storage space is negligible -// todo: Misc: Store partitions in individual files under a folder? -//func (s *Store) Delete() error { -// -//} +// ClearMessages periodically clear old messages on it's stored timestamp +func (s *Store) ClearMessages() stoppable.Stoppable { + stop := stoppable.NewSingle("clearPartition") + t := time.NewTicker(clearPartitionInterval) + go func() { + for { + select { + case <-stop.Quit(): + t.Stop() + return + case <-t.C: + s.clearMessages() + } + } + }() + return stop +} + +// clearMessages is a helper function which clears +// old messages from storage +func (s *Store) clearMessages() { + s.mux.Lock() + now := netTime.Now() + for mpmId, mpm := range s.multiParts { + if now.Sub(mpm.StorageTimestamp) >= clearPartitionThreshold { + mpm.delete() + delete(s.multiParts, mpmId) + } + } + s.mux.Unlock() +} func (s *Store) load(partner *id.ID, messageID uint64) *multiPartMessage { mpID := getMultiPartID(partner, messageID) diff --git a/storage/partition/store_test.go b/storage/partition/store_test.go index 9de6327d989d032e6ed987db0aff891d6be0fa3b..9ca2783477c7b4b60097fd7229e7efdb8d4f677d 100644 --- a/storage/partition/store_test.go +++ b/storage/partition/store_test.go @@ -40,7 +40,7 @@ func TestStore_AddFirst(t *testing.T) { s := New(versioned.NewKV(ekv.Memstore{})) msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t), - message.Text, 5, 0, 1, netTime.Now(), part, + message.Text, 5, 0, 1, netTime.Now(), netTime.Now(), part, []byte{0}) if !complete { @@ -60,7 +60,7 @@ func TestStore_Add(t *testing.T) { s := New(versioned.NewKV(ekv.Memstore{})) msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t), - message.Text, 5, 0, 2, netTime.Now(), part1, + message.Text, 5, 0, 2, netTime.Now(), netTime.Now(), part1, []byte{0}) if complete { @@ -79,3 +79,47 @@ func TestStore_Add(t *testing.T) { "\n\texpected: %v\n\treceived: %v", part, msg.Payload) } } + +// Unit test of clearMessages +func TestStore_ClearMessages(t *testing.T) { + // Setup: Add 2 message to store: an old message past the threshold and a new message + part1 := []byte("Test message.") + part2 := []byte("Second Sentence.") + s := New(versioned.NewKV(ekv.Memstore{})) + + partner1 := id.NewIdFromString("User", id.User, t) + messageId1 := uint64(5) + oldTimestamp := netTime.Now().Add(-2*clearPartitionThreshold) + s.AddFirst(partner1, + message.Text, messageId1, 0, 2, netTime.Now(), + oldTimestamp, part1, + []byte{0}) + s.Add(partner1, messageId1, 1, part2, []byte{0}) + + partner2 := id.NewIdFromString("User1", id.User, t) + messageId2 := uint64(6) + newTimestamp := netTime.Now() + s.AddFirst(partner2, message.Text, messageId2, 0, 2, netTime.Now(), + newTimestamp, part1, + []byte{0}) + s.Add(partner2, messageId2, 1, part2, []byte{0}) + + + + // Call clear messages + s.clearMessages() + + // Check if old message cleared + mpmId := getMultiPartID(partner1, messageId1) + if _, ok := s.multiParts[mpmId]; ok { + t.Errorf("ClearMessages error: " + + "Expected old message to be cleared out of store") + } + + // Check if new message remains + mpmId2 := getMultiPartID(partner2, messageId2) + if _, ok := s.multiParts[mpmId2]; !ok { + t.Errorf("ClearMessages error: " + + "Expected new message to be remain in store") + } +}