Skip to content
Snippets Groups Projects
Commit 8c7c8c3a authored by Josh Brooks's avatar Josh Brooks
Browse files

Update partition for clearing messages

parent 18cabb0e
No related branches found
No related tags found
1 merge request!23Release
......@@ -30,7 +30,10 @@ type multiPartMessage struct {
MessageID uint64
NumParts uint8
PresentParts uint8
Timestamp time.Time
// Timestamp of message from sender
SenderTimestamp time.Time
// Timestamp in which message was stored in RAM
StorageTimestamp time.Time
MessageType message.Type
parts [][]byte
......@@ -52,7 +55,7 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64,
MessageID: messageID,
NumParts: 0,
PresentParts: 0,
Timestamp: time.Time{},
SenderTimestamp: time.Time{},
MessageType: 0,
kv: kv,
}
......@@ -119,7 +122,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) {
}
func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8,
numParts uint8, timestamp time.Time, part []byte) {
numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte) {
mpm.mux.Lock()
defer mpm.mux.Unlock()
......@@ -129,10 +132,11 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8,
}
mpm.NumParts = numParts
mpm.Timestamp = timestamp
mpm.SenderTimestamp = senderTimestamp
mpm.MessageType = mt
mpm.parts[partNumber] = part
mpm.PresentParts++
mpm.StorageTimestamp = storageTimestamp
if err := savePart(mpm.kv, partNumber, part); err != nil {
jww.FATAL.Panicf("Failed to save multi part "+
......@@ -159,27 +163,8 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message
mpm.parts = append(mpm.parts, make([][]byte, int(mpm.NumParts)-len(mpm.parts))...)
}
var err error
lenMsg := 0
// Load all parts from disk, deleting files from disk as we go along
for i := uint8(0); i < mpm.NumParts; i++ {
if mpm.parts[i] == nil {
if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil {
jww.FATAL.Panicf("Failed to load multi part "+
"message part %v from %s messageID %v: %s", i, mpm.Sender,
mpm.MessageID, err)
}
if err = deletePart(mpm.kv, i); err != nil {
jww.FATAL.Panicf("Failed to delete multi part "+
"message part %v from %s messageID %v: %s", i, mpm.Sender,
mpm.MessageID, err)
}
}
lenMsg += len(mpm.parts[i])
}
// delete the multipart message
mpm.delete()
lenMsg := mpm.delete()
mpm.mux.Unlock()
// Reconstruct the message
......@@ -200,7 +185,7 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message
Payload: reconstructed,
MessageType: mpm.MessageType,
Sender: mpm.Sender,
Timestamp: mpm.Timestamp,
Timestamp: mpm.SenderTimestamp,
// Encryption will be set externally
Encryption: 0,
ID: mid,
......@@ -209,7 +194,27 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message
return m, true
}
func (mpm *multiPartMessage) delete() {
// deletes all parts from disk and RAM. Returns the message length for reconstruction
func (mpm *multiPartMessage) delete() int {
// Load all parts from disk, deleting files from disk as we go along
var err error
lenMsg := 0
for i := uint8(0); i < mpm.NumParts; i++ {
if mpm.parts[i] == nil {
if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil {
jww.FATAL.Panicf("Failed to load multi part "+
"message part %v from %s messageID %v: %s", i, mpm.Sender,
mpm.MessageID, err)
}
if err = deletePart(mpm.kv, i); err != nil {
jww.FATAL.Panicf("Failed to delete multi part "+
"message part %v from %s messageID %v: %s", i, mpm.Sender,
mpm.MessageID, err)
}
}
lenMsg += len(mpm.parts[i])
}
//key := makeMultiPartMessageKey(mpm.MessageID)
if err := mpm.kv.Delete(messageKey,
currentMultiPartMessageVersion); err != nil {
......@@ -217,4 +222,6 @@ func (mpm *multiPartMessage) delete() {
"message from %s messageID %v: %s", mpm.Sender,
mpm.MessageID, err)
}
return lenMsg
}
......@@ -31,7 +31,7 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) {
MessageID: prng.Uint64(),
NumParts: 0,
PresentParts: 0,
Timestamp: time.Time{},
SenderTimestamp: time.Time{},
MessageType: 0,
kv: versioned.NewKV(make(ekv.Memstore)),
}
......@@ -67,7 +67,7 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) {
MessageID: prng.Uint64(),
NumParts: 0,
PresentParts: 0,
Timestamp: time.Time{},
SenderTimestamp: time.Time{},
MessageType: 0,
kv: versioned.NewKV(make(ekv.Memstore)),
}
......@@ -85,8 +85,8 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) {
func CheckMultiPartMessages(expectedMpm *multiPartMessage, mpm *multiPartMessage, t *testing.T) {
// The kv differs because it has prefix called, so we compare fields individually
if expectedMpm.Timestamp != mpm.Timestamp {
t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.Timestamp, mpm.Timestamp)
if expectedMpm.SenderTimestamp != mpm.SenderTimestamp {
t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.SenderTimestamp, mpm.SenderTimestamp)
}
if expectedMpm.MessageType != mpm.MessageType {
t.Errorf("messagetype mismatch: expected %v, got %v", expectedMpm.MessageID, mpm.MessageID)
......@@ -163,7 +163,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) {
MessageID: prng.Uint64(),
NumParts: uint8(prng.Uint32()),
PresentParts: 1,
Timestamp: netTime.Now(),
SenderTimestamp: netTime.Now(),
MessageType: message.NoType,
parts: make([][]byte, 3),
kv: versioned.NewKV(make(ekv.Memstore)),
......@@ -173,7 +173,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) {
expectedMpm.MessageID, expectedMpm.kv)
npm.AddFirst(expectedMpm.MessageType, 2, expectedMpm.NumParts,
expectedMpm.Timestamp, expectedMpm.parts[2])
expectedMpm.SenderTimestamp, netTime.Now(), expectedMpm.parts[2])
CheckMultiPartMessages(expectedMpm, npm, t)
......@@ -203,7 +203,7 @@ func TestMultiPartMessage_IsComplete(t *testing.T) {
t.Error("IsComplete() returned true when NumParts == 0.")
}
mpm.AddFirst(message.Text, partNums[0], 75, netTime.Now(), parts[0])
mpm.AddFirst(message.Text, partNums[0], 75, netTime.Now(), netTime.Now(), parts[0])
for i := range partNums {
if i > 0 {
mpm.Add(partNums[i], parts[i])
......
......@@ -10,8 +10,10 @@ package partition
import (
"encoding/binary"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime"
"golang.org/x/crypto/blake2b"
"sync"
"time"
......@@ -20,6 +22,8 @@ import (
type multiPartID [16]byte
const packagePrefix = "Partition"
const clearPartitionInterval = 12*time.Hour
const clearPartitionThreshold = 24*time.Hour
type Store struct {
multiParts map[multiPartID]*multiPartMessage
......@@ -35,12 +39,12 @@ func New(kv *versioned.KV) *Store {
}
func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64,
partNum, numParts uint8, timestamp time.Time,
partNum, numParts uint8, senderTimestamp, storageTimestamp time.Time,
part []byte, relationshipFingerprint []byte) (message.Receive, bool) {
mpm := s.load(partner, messageID)
mpm.AddFirst(mt, partNum, numParts, timestamp, part)
mpm.AddFirst(mt, partNum, numParts, senderTimestamp, storageTimestamp, part)
return mpm.IsComplete(relationshipFingerprint)
}
......@@ -55,14 +59,37 @@ func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8,
return mpm.IsComplete(relationshipFingerprint)
}
// todo: may need a way to clean up partitioned messages when deleting a contact
// todo: Possible options:
// todo: Store partition w/ a timestamp, periodically clear old timestamps
// todo: Don't clean, storage space is negligible
// todo: Misc: Store partitions in individual files under a folder?
//func (s *Store) Delete() error {
//
//}
// ClearMessages periodically clear old messages on it's stored timestamp
func (s *Store) ClearMessages() stoppable.Stoppable {
stop := stoppable.NewSingle("clearPartition")
t := time.NewTicker(clearPartitionInterval)
go func() {
for {
select {
case <-stop.Quit():
t.Stop()
return
case <-t.C:
s.clearMessages()
}
}
}()
return stop
}
// clearMessages is a helper function which clears
// old messages from storage
func (s *Store) clearMessages() {
s.mux.Lock()
now := netTime.Now()
for mpmId, mpm := range s.multiParts {
if now.Sub(mpm.StorageTimestamp) >= clearPartitionThreshold {
mpm.delete()
delete(s.multiParts, mpmId)
}
}
s.mux.Unlock()
}
func (s *Store) load(partner *id.ID, messageID uint64) *multiPartMessage {
mpID := getMultiPartID(partner, messageID)
......
......@@ -40,7 +40,7 @@ func TestStore_AddFirst(t *testing.T) {
s := New(versioned.NewKV(ekv.Memstore{}))
msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t),
message.Text, 5, 0, 1, netTime.Now(), part,
message.Text, 5, 0, 1, netTime.Now(), netTime.Now(), part,
[]byte{0})
if !complete {
......@@ -60,7 +60,7 @@ func TestStore_Add(t *testing.T) {
s := New(versioned.NewKV(ekv.Memstore{}))
msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t),
message.Text, 5, 0, 2, netTime.Now(), part1,
message.Text, 5, 0, 2, netTime.Now(), netTime.Now(), part1,
[]byte{0})
if complete {
......@@ -79,3 +79,47 @@ func TestStore_Add(t *testing.T) {
"\n\texpected: %v\n\treceived: %v", part, msg.Payload)
}
}
// Unit test of clearMessages
func TestStore_ClearMessages(t *testing.T) {
// Setup: Add 2 message to store: an old message past the threshold and a new message
part1 := []byte("Test message.")
part2 := []byte("Second Sentence.")
s := New(versioned.NewKV(ekv.Memstore{}))
partner1 := id.NewIdFromString("User", id.User, t)
messageId1 := uint64(5)
oldTimestamp := netTime.Now().Add(-2*clearPartitionThreshold)
s.AddFirst(partner1,
message.Text, messageId1, 0, 2, netTime.Now(),
oldTimestamp, part1,
[]byte{0})
s.Add(partner1, messageId1, 1, part2, []byte{0})
partner2 := id.NewIdFromString("User1", id.User, t)
messageId2 := uint64(6)
newTimestamp := netTime.Now()
s.AddFirst(partner2, message.Text, messageId2, 0, 2, netTime.Now(),
newTimestamp, part1,
[]byte{0})
s.Add(partner2, messageId2, 1, part2, []byte{0})
// Call clear messages
s.clearMessages()
// Check if old message cleared
mpmId := getMultiPartID(partner1, messageId1)
if _, ok := s.multiParts[mpmId]; ok {
t.Errorf("ClearMessages error: " +
"Expected old message to be cleared out of store")
}
// Check if new message remains
mpmId2 := getMultiPartID(partner2, messageId2)
if _, ok := s.multiParts[mpmId2]; !ok {
t.Errorf("ClearMessages error: " +
"Expected new message to be remain in store")
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment