From 8c7c8c3af41e4a32d9099cad46a2aeab8e526f3e Mon Sep 17 00:00:00 2001
From: josh <josh@elixxir.io>
Date: Fri, 4 Jun 2021 12:46:19 -0700
Subject: [PATCH] Update partition for clearing messages
---
storage/partition/multiPartMessage.go | 59 ++++++++++++----------
storage/partition/multiPartMessage_test.go | 14 ++---
storage/partition/store.go | 47 +++++++++++++----
storage/partition/store_test.go | 48 +++++++++++++++++-
4 files changed, 123 insertions(+), 45 deletions(-)
diff --git a/storage/partition/multiPartMessage.go b/storage/partition/multiPartMessage.go
index 754c52470..0bddd605f 100644
--- a/storage/partition/multiPartMessage.go
+++ b/storage/partition/multiPartMessage.go
@@ -30,7 +30,10 @@ type multiPartMessage struct {
MessageID uint64
NumParts uint8
PresentParts uint8
- Timestamp time.Time
+ // Timestamp of message from sender
+ SenderTimestamp time.Time
+ // Timestamp in which message was stored in RAM
+ StorageTimestamp time.Time
MessageType message.Type
parts [][]byte
@@ -52,7 +55,7 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64,
MessageID: messageID,
NumParts: 0,
PresentParts: 0,
- Timestamp: time.Time{},
+ SenderTimestamp: time.Time{},
MessageType: 0,
kv: kv,
}
@@ -119,7 +122,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) {
}
func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8,
- numParts uint8, timestamp time.Time, part []byte) {
+ numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte) {
mpm.mux.Lock()
defer mpm.mux.Unlock()
@@ -129,10 +132,11 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8,
}
mpm.NumParts = numParts
- mpm.Timestamp = timestamp
+ mpm.SenderTimestamp = senderTimestamp
mpm.MessageType = mt
mpm.parts[partNumber] = part
mpm.PresentParts++
+ mpm.StorageTimestamp = storageTimestamp
if err := savePart(mpm.kv, partNumber, part); err != nil {
jww.FATAL.Panicf("Failed to save multi part "+
@@ -159,27 +163,8 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message
mpm.parts = append(mpm.parts, make([][]byte, int(mpm.NumParts)-len(mpm.parts))...)
}
- var err error
- lenMsg := 0
- // Load all parts from disk, deleting files from disk as we go along
- for i := uint8(0); i < mpm.NumParts; i++ {
- if mpm.parts[i] == nil {
- if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil {
- jww.FATAL.Panicf("Failed to load multi part "+
- "message part %v from %s messageID %v: %s", i, mpm.Sender,
- mpm.MessageID, err)
- }
- if err = deletePart(mpm.kv, i); err != nil {
- jww.FATAL.Panicf("Failed to delete multi part "+
- "message part %v from %s messageID %v: %s", i, mpm.Sender,
- mpm.MessageID, err)
- }
- }
- lenMsg += len(mpm.parts[i])
- }
-
// delete the multipart message
- mpm.delete()
+ lenMsg := mpm.delete()
mpm.mux.Unlock()
// Reconstruct the message
@@ -200,7 +185,7 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message
Payload: reconstructed,
MessageType: mpm.MessageType,
Sender: mpm.Sender,
- Timestamp: mpm.Timestamp,
+ Timestamp: mpm.SenderTimestamp,
// Encryption will be set externally
Encryption: 0,
ID: mid,
@@ -209,7 +194,27 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message
return m, true
}
-func (mpm *multiPartMessage) delete() {
+// deletes all parts from disk and RAM. Returns the message length for reconstruction
+func (mpm *multiPartMessage) delete() int {
+ // Load all parts from disk, deleting files from disk as we go along
+ var err error
+ lenMsg := 0
+ for i := uint8(0); i < mpm.NumParts; i++ {
+ if mpm.parts[i] == nil {
+ if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil {
+ jww.FATAL.Panicf("Failed to load multi part "+
+ "message part %v from %s messageID %v: %s", i, mpm.Sender,
+ mpm.MessageID, err)
+ }
+ if err = deletePart(mpm.kv, i); err != nil {
+ jww.FATAL.Panicf("Failed to delete multi part "+
+ "message part %v from %s messageID %v: %s", i, mpm.Sender,
+ mpm.MessageID, err)
+ }
+ }
+ lenMsg += len(mpm.parts[i])
+ }
+
//key := makeMultiPartMessageKey(mpm.MessageID)
if err := mpm.kv.Delete(messageKey,
currentMultiPartMessageVersion); err != nil {
@@ -217,4 +222,6 @@ func (mpm *multiPartMessage) delete() {
"message from %s messageID %v: %s", mpm.Sender,
mpm.MessageID, err)
}
+
+ return lenMsg
}
diff --git a/storage/partition/multiPartMessage_test.go b/storage/partition/multiPartMessage_test.go
index dff3c0fab..cf8a02a55 100644
--- a/storage/partition/multiPartMessage_test.go
+++ b/storage/partition/multiPartMessage_test.go
@@ -31,7 +31,7 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) {
MessageID: prng.Uint64(),
NumParts: 0,
PresentParts: 0,
- Timestamp: time.Time{},
+ SenderTimestamp: time.Time{},
MessageType: 0,
kv: versioned.NewKV(make(ekv.Memstore)),
}
@@ -67,7 +67,7 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) {
MessageID: prng.Uint64(),
NumParts: 0,
PresentParts: 0,
- Timestamp: time.Time{},
+ SenderTimestamp: time.Time{},
MessageType: 0,
kv: versioned.NewKV(make(ekv.Memstore)),
}
@@ -85,8 +85,8 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) {
func CheckMultiPartMessages(expectedMpm *multiPartMessage, mpm *multiPartMessage, t *testing.T) {
// The kv differs because it has prefix called, so we compare fields individually
- if expectedMpm.Timestamp != mpm.Timestamp {
- t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.Timestamp, mpm.Timestamp)
+ if expectedMpm.SenderTimestamp != mpm.SenderTimestamp {
+ t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.SenderTimestamp, mpm.SenderTimestamp)
}
if expectedMpm.MessageType != mpm.MessageType {
t.Errorf("messagetype mismatch: expected %v, got %v", expectedMpm.MessageID, mpm.MessageID)
@@ -163,7 +163,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) {
MessageID: prng.Uint64(),
NumParts: uint8(prng.Uint32()),
PresentParts: 1,
- Timestamp: netTime.Now(),
+ SenderTimestamp: netTime.Now(),
MessageType: message.NoType,
parts: make([][]byte, 3),
kv: versioned.NewKV(make(ekv.Memstore)),
@@ -173,7 +173,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) {
expectedMpm.MessageID, expectedMpm.kv)
npm.AddFirst(expectedMpm.MessageType, 2, expectedMpm.NumParts,
- expectedMpm.Timestamp, expectedMpm.parts[2])
+ expectedMpm.SenderTimestamp, netTime.Now(), expectedMpm.parts[2])
CheckMultiPartMessages(expectedMpm, npm, t)
@@ -203,7 +203,7 @@ func TestMultiPartMessage_IsComplete(t *testing.T) {
t.Error("IsComplete() returned true when NumParts == 0.")
}
- mpm.AddFirst(message.Text, partNums[0], 75, netTime.Now(), parts[0])
+ mpm.AddFirst(message.Text, partNums[0], 75, netTime.Now(), netTime.Now(), parts[0])
for i := range partNums {
if i > 0 {
mpm.Add(partNums[i], parts[i])
diff --git a/storage/partition/store.go b/storage/partition/store.go
index e699aaaa7..b214b2ced 100644
--- a/storage/partition/store.go
+++ b/storage/partition/store.go
@@ -10,8 +10,10 @@ package partition
import (
"encoding/binary"
"gitlab.com/elixxir/client/interfaces/message"
+ "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/xx_network/primitives/id"
+ "gitlab.com/xx_network/primitives/netTime"
"golang.org/x/crypto/blake2b"
"sync"
"time"
@@ -20,6 +22,8 @@ import (
type multiPartID [16]byte
const packagePrefix = "Partition"
+const clearPartitionInterval = 12*time.Hour
+const clearPartitionThreshold = 24*time.Hour
type Store struct {
multiParts map[multiPartID]*multiPartMessage
@@ -35,12 +39,12 @@ func New(kv *versioned.KV) *Store {
}
func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64,
- partNum, numParts uint8, timestamp time.Time,
+ partNum, numParts uint8, senderTimestamp, storageTimestamp time.Time,
part []byte, relationshipFingerprint []byte) (message.Receive, bool) {
mpm := s.load(partner, messageID)
- mpm.AddFirst(mt, partNum, numParts, timestamp, part)
+ mpm.AddFirst(mt, partNum, numParts, senderTimestamp, storageTimestamp, part)
return mpm.IsComplete(relationshipFingerprint)
}
@@ -55,14 +59,37 @@ func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8,
return mpm.IsComplete(relationshipFingerprint)
}
-// todo: may need a way to clean up partitioned messages when deleting a contact
-// todo: Possible options:
-// todo: Store partition w/ a timestamp, periodically clear old timestamps
-// todo: Don't clean, storage space is negligible
-// todo: Misc: Store partitions in individual files under a folder?
-//func (s *Store) Delete() error {
-//
-//}
+// ClearMessages periodically clear old messages on it's stored timestamp
+func (s *Store) ClearMessages() stoppable.Stoppable {
+ stop := stoppable.NewSingle("clearPartition")
+ t := time.NewTicker(clearPartitionInterval)
+ go func() {
+ for {
+ select {
+ case <-stop.Quit():
+ t.Stop()
+ return
+ case <-t.C:
+ s.clearMessages()
+ }
+ }
+ }()
+ return stop
+}
+
+// clearMessages is a helper function which clears
+// old messages from storage
+func (s *Store) clearMessages() {
+ s.mux.Lock()
+ now := netTime.Now()
+ for mpmId, mpm := range s.multiParts {
+ if now.Sub(mpm.StorageTimestamp) >= clearPartitionThreshold {
+ mpm.delete()
+ delete(s.multiParts, mpmId)
+ }
+ }
+ s.mux.Unlock()
+}
func (s *Store) load(partner *id.ID, messageID uint64) *multiPartMessage {
mpID := getMultiPartID(partner, messageID)
diff --git a/storage/partition/store_test.go b/storage/partition/store_test.go
index 9de6327d9..9ca278347 100644
--- a/storage/partition/store_test.go
+++ b/storage/partition/store_test.go
@@ -40,7 +40,7 @@ func TestStore_AddFirst(t *testing.T) {
s := New(versioned.NewKV(ekv.Memstore{}))
msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t),
- message.Text, 5, 0, 1, netTime.Now(), part,
+ message.Text, 5, 0, 1, netTime.Now(), netTime.Now(), part,
[]byte{0})
if !complete {
@@ -60,7 +60,7 @@ func TestStore_Add(t *testing.T) {
s := New(versioned.NewKV(ekv.Memstore{}))
msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t),
- message.Text, 5, 0, 2, netTime.Now(), part1,
+ message.Text, 5, 0, 2, netTime.Now(), netTime.Now(), part1,
[]byte{0})
if complete {
@@ -79,3 +79,47 @@ func TestStore_Add(t *testing.T) {
"\n\texpected: %v\n\treceived: %v", part, msg.Payload)
}
}
+
+// Unit test of clearMessages
+func TestStore_ClearMessages(t *testing.T) {
+ // Setup: Add 2 message to store: an old message past the threshold and a new message
+ part1 := []byte("Test message.")
+ part2 := []byte("Second Sentence.")
+ s := New(versioned.NewKV(ekv.Memstore{}))
+
+ partner1 := id.NewIdFromString("User", id.User, t)
+ messageId1 := uint64(5)
+ oldTimestamp := netTime.Now().Add(-2*clearPartitionThreshold)
+ s.AddFirst(partner1,
+ message.Text, messageId1, 0, 2, netTime.Now(),
+ oldTimestamp, part1,
+ []byte{0})
+ s.Add(partner1, messageId1, 1, part2, []byte{0})
+
+ partner2 := id.NewIdFromString("User1", id.User, t)
+ messageId2 := uint64(6)
+ newTimestamp := netTime.Now()
+ s.AddFirst(partner2, message.Text, messageId2, 0, 2, netTime.Now(),
+ newTimestamp, part1,
+ []byte{0})
+ s.Add(partner2, messageId2, 1, part2, []byte{0})
+
+
+
+ // Call clear messages
+ s.clearMessages()
+
+ // Check if old message cleared
+ mpmId := getMultiPartID(partner1, messageId1)
+ if _, ok := s.multiParts[mpmId]; ok {
+ t.Errorf("ClearMessages error: " +
+ "Expected old message to be cleared out of store")
+ }
+
+ // Check if new message remains
+ mpmId2 := getMultiPartID(partner2, messageId2)
+ if _, ok := s.multiParts[mpmId2]; !ok {
+ t.Errorf("ClearMessages error: " +
+ "Expected new message to be remain in store")
+ }
+}
--
GitLab