diff --git a/api/client.go b/api/client.go index 83ea0d3828bdae85a2eddbe5f957fe6c3b333aea..f1a9e030fbcebaa229dd2e33ba246f491e3141c4 100644 --- a/api/client.go +++ b/api/client.go @@ -408,9 +408,6 @@ func (c *Client) StartNetworkFollower(timeout time.Duration) (<-chan interfaces. c.runner = stoppable.NewMulti(followerStoppableName) } - jww.INFO.Printf("Adding partition cleaner") - c.runner.Add(c.storage.Partition().ClearMessages()) - err = c.status.toStarting() if err != nil { return nil, errors.WithMessage(err, "Failed to Start the Network Follower") diff --git a/storage/auth/store.go b/storage/auth/store.go index 3b74b7d1babbd39816e71523b235722348054a04..341bc74c01639163b9c22666263ab23b7812b1a8 100644 --- a/storage/auth/store.go +++ b/storage/auth/store.go @@ -105,7 +105,6 @@ func LoadStore(kv *versioned.KV, grp *cyclic.Group, privKeys []*cyclic.Int) (*St jww.FATAL.Panicf("Failed to load stored id: %+v", err) } - switch r.rt { case Sent: sr, err := loadSentRequest(kv, partner, grp) diff --git a/storage/partition/multiPartMessage.go b/storage/partition/multiPartMessage.go index 0bddd605fd9da711f95c2c60cdeaee2c1e91f5a7..3aa8cffb556c3bd23f8afb6f8730830919b3c07f 100644 --- a/storage/partition/multiPartMessage.go +++ b/storage/partition/multiPartMessage.go @@ -31,10 +31,10 @@ type multiPartMessage struct { NumParts uint8 PresentParts uint8 // Timestamp of message from sender - SenderTimestamp time.Time + SenderTimestamp time.Time // Timestamp in which message was stored in RAM StorageTimestamp time.Time - MessageType message.Type + MessageType message.Type parts [][]byte kv *versioned.KV @@ -51,13 +51,13 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, if err != nil { if !ekv.Exists(err) { mpm := &multiPartMessage{ - Sender: sender, - MessageID: messageID, - NumParts: 0, - PresentParts: 0, - SenderTimestamp: time.Time{}, - MessageType: 0, - kv: kv, + Sender: sender, + MessageID: messageID, + NumParts: 0, + PresentParts: 0, + SenderTimestamp: time.Time{}, + MessageType: 0, + kv: kv, } if err = mpm.save(); err != nil { jww.FATAL.Panicf("Failed to save new multi part "+ diff --git a/storage/partition/multiPartMessage_test.go b/storage/partition/multiPartMessage_test.go index cf8a02a55d7db9417ba681940b78255ae066c05b..752d53272bb121f566e2917d746087020f68d14b 100644 --- a/storage/partition/multiPartMessage_test.go +++ b/storage/partition/multiPartMessage_test.go @@ -27,13 +27,13 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) { // Set up expected test value prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) expectedMpm := &multiPartMessage{ - Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), - MessageID: prng.Uint64(), - NumParts: 0, - PresentParts: 0, - SenderTimestamp: time.Time{}, - MessageType: 0, - kv: versioned.NewKV(make(ekv.Memstore)), + Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), + MessageID: prng.Uint64(), + NumParts: 0, + PresentParts: 0, + SenderTimestamp: time.Time{}, + MessageType: 0, + kv: versioned.NewKV(make(ekv.Memstore)), } expectedData, err := json.Marshal(expectedMpm) if err != nil { @@ -63,13 +63,13 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) { // Set up expected test value prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) expectedMpm := &multiPartMessage{ - Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), - MessageID: prng.Uint64(), - NumParts: 0, - PresentParts: 0, - SenderTimestamp: time.Time{}, - MessageType: 0, - kv: versioned.NewKV(make(ekv.Memstore)), + Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), + MessageID: prng.Uint64(), + NumParts: 0, + PresentParts: 0, + SenderTimestamp: time.Time{}, + MessageType: 0, + kv: versioned.NewKV(make(ekv.Memstore)), } err := expectedMpm.save() if err != nil { @@ -159,14 +159,14 @@ func TestMultiPartMessage_AddFirst(t *testing.T) { // Generate test values prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) expectedMpm := &multiPartMessage{ - Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), - MessageID: prng.Uint64(), - NumParts: uint8(prng.Uint32()), - PresentParts: 1, - SenderTimestamp: netTime.Now(), - MessageType: message.NoType, - parts: make([][]byte, 3), - kv: versioned.NewKV(make(ekv.Memstore)), + Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), + MessageID: prng.Uint64(), + NumParts: uint8(prng.Uint32()), + PresentParts: 1, + SenderTimestamp: netTime.Now(), + MessageType: message.NoType, + parts: make([][]byte, 3), + kv: versioned.NewKV(make(ekv.Memstore)), } expectedMpm.parts[2] = []byte{5, 8, 78, 9} npm := loadOrCreateMultiPartMessage(expectedMpm.Sender, diff --git a/storage/partition/store.go b/storage/partition/store.go index 7dfaf405bf35b0fb64743bd9a6820360c6f3642c..d5f570555aad1d3df4e4aaf0cf0434b9807efcc0 100644 --- a/storage/partition/store.go +++ b/storage/partition/store.go @@ -9,9 +9,9 @@ package partition import ( "encoding/binary" + "encoding/json" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/message" - "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" @@ -23,22 +23,39 @@ import ( type multiPartID [16]byte const packagePrefix = "Partition" -const clearPartitionInterval = 5*time.Hour -const clearPartitionThreshold = 24*time.Hour +const clearPartitionThreshold = 24 * time.Hour +const activePartitions = "activePartitions" +const activePartitionVersion = 0 type Store struct { - multiParts map[multiPartID]*multiPartMessage - kv *versioned.KV - mux sync.Mutex + multiParts map[multiPartID]*multiPartMessage + activeParts []*multiPartMessage + kv *versioned.KV + mux sync.Mutex } func New(kv *versioned.KV) *Store { return &Store{ - multiParts: make(map[multiPartID]*multiPartMessage), - kv: kv.Prefix(packagePrefix), + multiParts: make(map[multiPartID]*multiPartMessage), + activeParts: make([]*multiPartMessage, 0), + kv: kv.Prefix(packagePrefix), } } +func Load(kv *versioned.KV) *Store { + partitionStore := &Store{ + multiParts: make(map[multiPartID]*multiPartMessage), + activeParts: make([]*multiPartMessage, 0), + kv: kv.Prefix(packagePrefix), + } + + partitionStore.loadActivePartitions() + + partitionStore.Prune() + + return partitionStore +} + func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64, partNum, numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte, relationshipFingerprint []byte) (message.Receive, bool) { @@ -46,8 +63,14 @@ func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64, mpm := s.load(partner, messageID) mpm.AddFirst(mt, partNum, numParts, senderTimestamp, storageTimestamp, part) + msg, ok := mpm.IsComplete(relationshipFingerprint) + + if !ok { + s.activeParts = append(s.activeParts, mpm) + s.saveActiveParts() + } - return mpm.IsComplete(relationshipFingerprint) + return msg, ok } func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8, @@ -57,41 +80,27 @@ func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8, mpm.Add(partNum, part) - return mpm.IsComplete(relationshipFingerprint) -} + msg, ok := mpm.IsComplete(relationshipFingerprint) + if !ok { + s.activeParts = append(s.activeParts, mpm) + s.saveActiveParts() + } -// ClearMessages periodically clear old messages on it's stored timestamp -func (s *Store) ClearMessages() stoppable.Stoppable { - stop := stoppable.NewSingle("clearPartition") - t := time.NewTicker(clearPartitionInterval) - go func() { - for { - select { - case <-stop.Quit(): - jww.INFO.Printf("Received stop signal in clear messages") - stop.ToStopped() - t.Stop() - return - case <-t.C: - s.clearMessages() - } - } - }() - return stop + return msg, ok } -// clearMessages is a helper function which clears -// old messages from storage -func (s *Store) clearMessages() { +// Prune clear old messages on it's stored timestamp +func (s *Store) Prune() { s.mux.Lock() + defer s.mux.Unlock() now := netTime.Now() - for mpmId, mpm := range s.multiParts { + for _, mpm := range s.activeParts { if now.Sub(mpm.StorageTimestamp) >= clearPartitionThreshold { mpm.delete() - delete(s.multiParts, mpmId) + mpID := getMultiPartID(mpm.Sender, mpm.MessageID) + delete(s.multiParts, mpID) } } - s.mux.Unlock() } func (s *Store) load(partner *id.ID, messageID uint64) *multiPartMessage { @@ -107,6 +116,40 @@ func (s *Store) load(partner *id.ID, messageID uint64) *multiPartMessage { return mpm } +func (s *Store) saveActiveParts() { + data, err := json.Marshal(s.activeParts) + if err != nil { + jww.FATAL.Panicf("Could not save active partitions: %v", err) + } + + obj := versioned.Object{ + Version: activePartitionVersion, + Timestamp: netTime.Now(), + Data: data, + } + + err = s.kv.Set(activePartitions, activePartitionVersion, &obj) + if err != nil { + jww.FATAL.Panicf("Could not save active partitions: %v", err) + } +} + +func (s *Store) loadActivePartitions() { + s.mux.Lock() + defer s.mux.Unlock() + obj, err := s.kv.Get(activePartitions, activePartitionVersion) + if err != nil { + jww.DEBUG.Printf("Could not load active partitions: %v", err) + return + } + + err = json.Unmarshal(obj.Data, &s.activeParts) + if err != nil { + jww.FATAL.Panicf("Could not load active partitions: %v", err) + } + +} + func getMultiPartID(partner *id.ID, messageID uint64) multiPartID { h, _ := blake2b.New256(nil) diff --git a/storage/partition/store_test.go b/storage/partition/store_test.go index 9ca2783477c7b4b60097fd7229e7efdb8d4f677d..c26a50401a65d10b35bdb7624fee4f618949d548 100644 --- a/storage/partition/store_test.go +++ b/storage/partition/store_test.go @@ -22,8 +22,9 @@ import ( func TestNew(t *testing.T) { rootKv := versioned.NewKV(make(ekv.Memstore)) expectedStore := &Store{ - multiParts: make(map[multiPartID]*multiPartMessage), - kv: rootKv.Prefix(packagePrefix), + multiParts: make(map[multiPartID]*multiPartMessage), + activeParts: make([]*multiPartMessage, 0), + kv: rootKv.Prefix(packagePrefix), } store := New(rootKv) @@ -80,7 +81,7 @@ func TestStore_Add(t *testing.T) { } } -// Unit test of clearMessages +// Unit test of prune func TestStore_ClearMessages(t *testing.T) { // Setup: Add 2 message to store: an old message past the threshold and a new message part1 := []byte("Test message.") @@ -89,7 +90,7 @@ func TestStore_ClearMessages(t *testing.T) { partner1 := id.NewIdFromString("User", id.User, t) messageId1 := uint64(5) - oldTimestamp := netTime.Now().Add(-2*clearPartitionThreshold) + oldTimestamp := netTime.Now().Add(-2 * clearPartitionThreshold) s.AddFirst(partner1, message.Text, messageId1, 0, 2, netTime.Now(), oldTimestamp, part1, @@ -104,22 +105,20 @@ func TestStore_ClearMessages(t *testing.T) { []byte{0}) s.Add(partner2, messageId2, 1, part2, []byte{0}) - - // Call clear messages - s.clearMessages() + s.Prune() // Check if old message cleared mpmId := getMultiPartID(partner1, messageId1) if _, ok := s.multiParts[mpmId]; ok { - t.Errorf("ClearMessages error: " + + t.Errorf("Prune error: " + "Expected old message to be cleared out of store") } // Check if new message remains mpmId2 := getMultiPartID(partner2, messageId2) if _, ok := s.multiParts[mpmId2]; !ok { - t.Errorf("ClearMessages error: " + + t.Errorf("Prune error: " + "Expected new message to be remain in store") } } diff --git a/storage/session.go b/storage/session.go index 7fdeea29e807fac8de73b3e92b20da789682962f..019e187d3176b9c4ea1b24f26eaf31697e13e458 100644 --- a/storage/session.go +++ b/storage/session.go @@ -215,7 +215,7 @@ func Load(baseDir, password string, currentVersion version.Version, } s.conversations = conversation.NewStore(s.kv) - s.partition = partition.New(s.kv) + s.partition = partition.Load(s.kv) s.reception = reception.LoadStore(s.kv) diff --git a/storage/user.go b/storage/user.go index a236f22d35e618d77c75277c9ece5999712832a3..975b574a2e9678a31eff7f0598b396ab6427a6af 100644 --- a/storage/user.go +++ b/storage/user.go @@ -14,18 +14,18 @@ func (s *Session) GetUser() user.User { defer s.mux.RUnlock() ci := s.user.GetCryptographicIdentity() return user.User{ - TransmissionID: ci.GetTransmissionID().DeepCopy(), - TransmissionSalt: copySlice(ci.GetTransmissionSalt()), - TransmissionRSA: ci.GetReceptionRSA(), - ReceptionID: ci.GetReceptionID().DeepCopy(), + TransmissionID: ci.GetTransmissionID().DeepCopy(), + TransmissionSalt: copySlice(ci.GetTransmissionSalt()), + TransmissionRSA: ci.GetReceptionRSA(), + ReceptionID: ci.GetReceptionID().DeepCopy(), RegistrationTimestamp: s.user.GetRegistrationTimestamp(), - ReceptionSalt: copySlice(ci.GetReceptionSalt()), - ReceptionRSA: ci.GetReceptionRSA(), - Precanned: ci.IsPrecanned(), - CmixDhPrivateKey: s.cmix.GetDHPrivateKey().DeepCopy(), - CmixDhPublicKey: s.cmix.GetDHPublicKey().DeepCopy(), - E2eDhPrivateKey: s.e2e.GetDHPrivateKey().DeepCopy(), - E2eDhPublicKey: s.e2e.GetDHPublicKey().DeepCopy(), + ReceptionSalt: copySlice(ci.GetReceptionSalt()), + ReceptionRSA: ci.GetReceptionRSA(), + Precanned: ci.IsPrecanned(), + CmixDhPrivateKey: s.cmix.GetDHPrivateKey().DeepCopy(), + CmixDhPublicKey: s.cmix.GetDHPublicKey().DeepCopy(), + E2eDhPrivateKey: s.e2e.GetDHPrivateKey().DeepCopy(), + E2eDhPublicKey: s.e2e.GetDHPublicKey().DeepCopy(), } }