diff --git a/README.md b/README.md index 497f2c99231e3e0aa8908f4680aa1b76af8f802e..ebf490f59a5ab4ac6c8a154785443cff0ed28310 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,8 @@ Available Commands: Flags: --accept-channel Accept the channel request for the corresponding recipient ID + --delete-channel Delete the channel information for the + corresponding recipient ID --destfile string Read this contact file for the destination id -d, --destid string ID to send message to (if below 40, will be precanned. Use '0x' or 'b64:' for hex and diff --git a/api/client.go b/api/client.go index a81bb25352dd90926a2cce68c6601a5f847fcc8e..95a05f6bd1c0308d50bbf06071bd7338071cc732 100644 --- a/api/client.go +++ b/api/client.go @@ -572,6 +572,19 @@ func (c *Client) GetNodeRegistrationStatus() (int, int, error) { return numRegistered, len(nodes), nil } +// DeleteContact is a function which removes a partner from Client's storage +func (c *Client) DeleteContact(partnerId *id.ID) error { + jww.DEBUG.Printf("Deleting contact with ID %s", partnerId) + if err := c.storage.E2e().DeletePartner(partnerId); err != nil { + return err + } + if err := c.storage.Auth().Delete(partnerId); err != nil { + return err + } + c.storage.Conversations().Delete(partnerId) + return nil +} + // ----- Utility Functions ----- // parseNDF parses the initial ndf string for the client. do not check the // signature, it is deprecated. diff --git a/bindings/client.go b/bindings/client.go index 432ba95160cf5373b8664e044b5d61e3098a60d5..869ccfe1d03d7089bb38712c9d01d265da287a14 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -424,6 +424,15 @@ func (c *Client) GetNodeRegistrationStatus() (*NodeRegistrationsStatus, error) { return &NodeRegistrationsStatus{registered, total}, err } +// DeleteContact is a function which removes a contact from Client's storage +func (c *Client) DeleteContact(b []byte) error { + contactObj, err := UnmarshalContact(b) + if err != nil { + return err + } + return c.api.DeleteContact(contactObj.c.ID) +} + /* // SearchWithHandler is a non-blocking search that also registers // a callback interface for user disovery events. diff --git a/cmd/root.go b/cmd/root.go index 1c495bae029e9a6b3251b3c6a893158280f69d5d..c77caf8b79de8946d4bd1596bf627ded6596c6fa 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -174,6 +174,11 @@ var rootCmd = &cobra.Command{ " took %d seconds", scnt) } + // Delete this recipient + if viper.GetBool("delete-channel") { + deleteChannel(client, recipientID) + } + msg := message.Send{ Recipient: recipientID, Payload: []byte(msgBody), @@ -456,6 +461,13 @@ func acceptChannel(client *api.Client, recipientID *id.ID) { } } +func deleteChannel(client *api.Client, partnerId *id.ID) { + err := client.DeleteContact(partnerId) + if err != nil { + jww.FATAL.Panicf("%+v", err) + } +} + func printChanRequest(requestor contact.Contact, message string) { msg := fmt.Sprintf("Authentication channel request from: %s\n", requestor.ID) @@ -769,6 +781,11 @@ func init() { viper.BindPFlag("accept-channel", rootCmd.Flags().Lookup("accept-channel")) + rootCmd.Flags().Bool("delete-channel", false, + "Delete the channel information for the corresponding recipient ID") + viper.BindPFlag("delete-channel", + rootCmd.Flags().Lookup("delete-channel")) + rootCmd.Flags().BoolP("send-auth-request", "", false, "Send an auth request to the specified destination and wait"+ "for confirmation") diff --git a/keyExchange/confirm.go b/keyExchange/confirm.go index ce1bec2baf28d92502c494ec94e659ba1e7c48c1..45224193f80a302ae24e4e4f01ec0f7afe647638 100644 --- a/keyExchange/confirm.go +++ b/keyExchange/confirm.go @@ -83,11 +83,11 @@ func unmarshalConfirm(payload []byte) (e2e.SessionID, error) { "unmarshal payload: %s", err) } - confimedSessionID := e2e.SessionID{} - if err := confimedSessionID.Unmarshal(msg.SessionID); err != nil { + confirmedSessionID := e2e.SessionID{} + if err := confirmedSessionID.Unmarshal(msg.SessionID); err != nil { return e2e.SessionID{}, errors.Errorf("Failed to unmarshal"+ " sessionID: %s", err) } - return confimedSessionID, nil + return confirmedSessionID, nil } diff --git a/network/message/parse/partition.go b/network/message/parse/partition.go index e7fbbfd313379570d3ec522d23053fbd025456cf..ad66ba2f30e09434dc090f07e0917137973d9b00 100644 --- a/network/message/parse/partition.go +++ b/network/message/parse/partition.go @@ -12,6 +12,7 @@ import ( "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/storage" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" "time" ) @@ -80,9 +81,9 @@ func (p Partitioner) HandlePartition(sender *id.ID, _ message.EncryptionType, // Handle the message ID messageID := p.session.Conversations().Get(sender). ProcessReceivedMessageID(fm.GetID()) - + storeageTimestamp := netTime.Now() return p.session.Partition().AddFirst(sender, fm.GetType(), - messageID, fm.GetPart(), fm.GetNumParts(), fm.GetTimestamp(), + messageID, fm.GetPart(), fm.GetNumParts(), fm.GetTimestamp(), storeageTimestamp, fm.GetSizedContents(), relationshipFingerprint) } else { // If it is a subsequent message part, handle it as so diff --git a/storage/auth/store.go b/storage/auth/store.go index 9ce2e6154e3b5bce28f4df6c36cbffca43255fc8..341bc74c01639163b9c22666263ab23b7812b1a8 100644 --- a/storage/auth/store.go +++ b/storage/auth/store.go @@ -92,7 +92,7 @@ func LoadStore(kv *versioned.KV, grp *cyclic.Group, privKeys []*cyclic.Int) (*St return nil, errors.WithMessagef(err, "Failed to "+ "unmarshal SentRequestMap") } - + jww.TRACE.Printf("%d found when loading AuthStore", len(requestList)) for _, rDisk := range requestList { r := &request{ rt: RequestType(rDisk.T), @@ -117,7 +117,6 @@ func LoadStore(kv *versioned.KV, grp *cyclic.Group, privKeys []*cyclic.Int) (*St PrivKey: nil, Request: r, } - rid = sr.partner r.sent = sr @@ -143,7 +142,6 @@ func LoadStore(kv *versioned.KV, grp *cyclic.Group, privKeys []*cyclic.Int) (*St func (s *Store) save() error { requestIDList := make([]requestDisk, len(s.requests)) - index := 0 for pid, r := range s.requests { rDisk := requestDisk{ @@ -158,7 +156,6 @@ func (s *Store) save() error { if err != nil { return err } - obj := versioned.Object{ Version: requestMapVersion, Timestamp: netTime.Now(), @@ -206,6 +203,7 @@ func (s *Store) AddSent(partner *id.ID, partnerHistoricalPubKey, myPrivKey, jww.INFO.Printf("AddSent PUBKEY FINGERPRINT: %v", sr.fingerprint) jww.INFO.Printf("AddSent PUBKEY: %v", sr.myPubKey.Bytes()) + jww.INFO.Printf("AddSent Partner: %s", partner) s.fingerprints[sr.fingerprint] = fingerprint{ Type: Specific, @@ -219,7 +217,7 @@ func (s *Store) AddSent(partner *id.ID, partnerHistoricalPubKey, myPrivKey, func (s *Store) AddReceived(c contact.Contact) error { s.mux.Lock() defer s.mux.Unlock() - + jww.DEBUG.Printf("AddReceived new contact: %s", c.ID) if _, ok := s.requests[*c.ID]; ok { return errors.Errorf("Cannot add contact for partner "+ "%s, one already exists", c.ID) diff --git a/storage/e2e/manager.go b/storage/e2e/manager.go index 6d7691b2b8f402c92ea6d4615a175b0b14b4b61e..a978beeae77c2fffa723c15f495ebf1fa4925c5c 100644 --- a/storage/e2e/manager.go +++ b/storage/e2e/manager.go @@ -112,6 +112,25 @@ func loadManager(ctx *context, kv *versioned.KV, partnerID *id.ID) (*Manager, er return m, nil } +// clearManager removes the relationship between the partner +// and deletes the Send and Receive sessions. This includes the +// sessions and the key vectors +func clearManager(m *Manager, kv *versioned.KV) error { + kv = kv.Prefix(fmt.Sprintf(managerPrefix, m.partner)) + + if err := DeleteRelationship(m); err != nil { + return errors.WithMessage(err, + "Failed to delete relationship") + } + + if err := utility.DeleteCyclicKey(m.kv, originPartnerPubKey); err != nil { + jww.FATAL.Panicf("Failed to delete %s: %+v", originPartnerPubKey, + err) + } + + return nil +} + // NewReceiveSession creates a new Receive session using the latest private key // this user has sent and the new public key received from the partner. If the // session already exists, then it will not be overwritten and the extant diff --git a/storage/e2e/manager_test.go b/storage/e2e/manager_test.go index 195b0752a603002dcb395507fc0b7d361a0b235a..114afac841407a4c4f8a0116ac452b7bd01c68cc 100644 --- a/storage/e2e/manager_test.go +++ b/storage/e2e/manager_test.go @@ -70,6 +70,30 @@ func TestLoadManager(t *testing.T) { } } +// Unit test for clearManager +func TestManager_ClearManager(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatalf("clearManager error: " + + "Did not panic when loading deleted manager") + } + }() + + // Set up expected and test values + expectedM, kv := newTestManager(t) + + err := clearManager(expectedM, kv) + if err != nil { + t.Fatalf("clearManager returned an error: %v", err) + } + + // Attempt to load relationship + _, err = loadManager(expectedM.ctx, kv, expectedM.partner) + if err != nil { + t.Errorf("loadManager() returned an error: %v", err) + } +} + // Tests happy path of Manager.NewReceiveSession. func TestManager_NewReceiveSession(t *testing.T) { // Set up test values diff --git a/storage/e2e/relationship.go b/storage/e2e/relationship.go index f92e12b6901b206dcdf2cae00200555eb762894b..e0e7723d6c332c36a1ac9ceae0753e3174da4ba6 100644 --- a/storage/e2e/relationship.go +++ b/storage/e2e/relationship.go @@ -84,6 +84,33 @@ func NewRelationship(manager *Manager, t RelationshipType, return r } +// DeleteRelationship removes all relationship and +// relationship adjacent information from storage +func DeleteRelationship(manager *Manager) error { + + // Delete the send information + sendKv := manager.kv.Prefix(Send.prefix()) + manager.send.Delete() + if err := deleteRelationshipFingerprint(sendKv); err != nil { + return err + } + if err := sendKv.Delete(relationshipKey, currentRelationshipVersion); err != nil { + return errors.Errorf("Could not delete send relationship: %v", err) + } + + // Delete the receive information + receiveKv := manager.kv.Prefix(Receive.prefix()) + manager.receive.Delete() + if err := deleteRelationshipFingerprint(receiveKv); err != nil { + return err + } + if err := receiveKv.Delete(relationshipKey, currentRelationshipVersion); err != nil { + return errors.Errorf("Could not delete receive relationship: %v", err) + } + + return nil +} + func LoadRelationship(manager *Manager, t RelationshipType) (*relationship, error) { kv := manager.kv.Prefix(t.prefix()) @@ -166,6 +193,16 @@ func (r *relationship) unmarshal(b []byte) error { return nil } +func (r *relationship) Delete() { + r.mux.Lock() + defer r.mux.Unlock() + for _, s := range r.sessions { + delete(r.sessionByID, s.GetID()) + s.Delete() + } + +} + func (r *relationship) AddSession(myPrivKey, partnerPubKey, baseKey *cyclic.Int, trigger SessionID, negotiationStatus Negotiation, e2eParams params.E2ESessionParams) *Session { diff --git a/storage/e2e/relationshipFingerprint.go b/storage/e2e/relationshipFingerprint.go index a3702655319ccf0f14323d31f5b32b3fe59cb825..29b80896dfc078b8ab1a7cefa348a7860498aec2 100644 --- a/storage/e2e/relationshipFingerprint.go +++ b/storage/e2e/relationshipFingerprint.go @@ -57,3 +57,9 @@ func loadRelationshipFingerprint(kv *versioned.KV) []byte { } return obj.Data } + +// deleteRelationshipFingerprint is a helper function which deletes a fingerprint from store +func deleteRelationshipFingerprint(kv *versioned.KV) error { + return kv.Delete(relationshipFingerprintKey, + currentRelationshipVersion) +} diff --git a/storage/e2e/relationship_test.go b/storage/e2e/relationship_test.go index 457e4f6dcd508abd2014248db7738f46badf947a..1ad211fcd55a389e758711a22ac8bcff24fdd76e 100644 --- a/storage/e2e/relationship_test.go +++ b/storage/e2e/relationship_test.go @@ -68,6 +68,64 @@ func TestLoadRelationship(t *testing.T) { } } +// Shows that a deleted Relationship can no longer be pulled from store +func TestDeleteRelationship(t *testing.T) { + mgr := makeTestRelationshipManager(t) + + // Generate send relationship + mgr.send = NewRelationship(mgr, Send, params.GetDefaultE2ESessionParams()) + if err := mgr.send.save(); err != nil { + t.Fatal(err) + } + + // Generate receive relationship + mgr.receive = NewRelationship(mgr, Receive, params.GetDefaultE2ESessionParams()) + if err := mgr.receive.save(); err != nil { + t.Fatal(err) + } + + err := DeleteRelationship(mgr) + if err != nil { + t.Fatalf("DeleteRelationship error: Could not delete manager: %v", err) + } + + _, err = LoadRelationship(mgr, Send) + if err == nil { + t.Fatalf("DeleteRelationship error: Should not have loaded deleted relationship: %v", err) + } + + _, err = LoadRelationship(mgr, Receive) + if err == nil { + t.Fatalf("DeleteRelationship error: Should not have loaded deleted relationship: %v", err) + } +} + +// Shows that a deleted relationship fingerprint can no longer be pulled from store +func TestRelationship_deleteRelationshipFingerprint(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatalf("deleteRelationshipFingerprint error: " + + "Did not panic when loading deleted fingerprint") + } + }() + + mgr := makeTestRelationshipManager(t) + sb := NewRelationship(mgr, Send, params.GetDefaultE2ESessionParams()) + + err := sb.save() + if err != nil { + t.Fatal(err) + } + + err = deleteRelationshipFingerprint(mgr.kv) + if err != nil { + t.Fatalf("deleteRelationshipFingerprint error: "+ + "Could not delete fingerprint: %v", err) + } + + loadRelationshipFingerprint(mgr.kv) +} + // Shows that Relationship returns a valid session buff func TestNewRelationshipBuff(t *testing.T) { mgr := makeTestRelationshipManager(t) diff --git a/storage/e2e/session.go b/storage/e2e/session.go index 6e85d873e36768d2372b5f713c45527625233c3c..9f74ebf35efe310214237e6da36f9f4029c872be 100644 --- a/storage/e2e/session.go +++ b/storage/e2e/session.go @@ -201,7 +201,8 @@ func (s *Session) save() error { /*METHODS*/ // Done all unused key fingerprints -// delete this session and its key states from the storage + +// Delete removes this session and its key states from the storage func (s *Session) Delete() { s.mux.Lock() defer s.mux.Unlock() @@ -221,7 +222,7 @@ func (s *Session) Delete() { } } -//Gets the base key. +// GetBaseKey retrieves the base key. func (s *Session) GetBaseKey() *cyclic.Int { // no lock is needed because this cannot be edited return s.baseKey.DeepCopy() diff --git a/storage/e2e/store.go b/storage/e2e/store.go index b6dc289c9b83edfa42e1468a015203d67dd7c1cd..2a8006902a9adf5a94704175f55f548f61eb0d7a 100644 --- a/storage/e2e/store.go +++ b/storage/e2e/store.go @@ -188,6 +188,21 @@ func (s *Store) AddPartner(partnerID *id.ID, partnerPubKey, myPrivKey *cyclic.In return nil } +// DeletePartner removes the associated contact from the E2E store +func (s *Store) DeletePartner(partnerId *id.ID) error { + m, ok := s.managers[*partnerId] + if !ok { + return errors.New(NoPartnerErrorStr) + } + + if err := clearManager(m, s.kv); err != nil { + return errors.WithMessagef(err, "Could not remove partner %s from store", partnerId) + } + + delete(s.managers, *partnerId) + return s.save() +} + func (s *Store) GetPartner(partnerID *id.ID) (*Manager, error) { s.mux.RLock() defer s.mux.RUnlock() diff --git a/storage/e2e/store_test.go b/storage/e2e/store_test.go index ef23f381f0370afec21f22c75d65dee892c93163..016184b58e803a9b92d763be3c880b8a1b4dc05e 100644 --- a/storage/e2e/store_test.go +++ b/storage/e2e/store_test.go @@ -98,7 +98,10 @@ func TestStore_AddPartner(t *testing.T) { expectedManager := newManager(s.context, s.kv, partnerID, s.dhPrivateKey, pubKey, p, p) - s.AddPartner(partnerID, pubKey, s.dhPrivateKey, p, p) + err := s.AddPartner(partnerID, pubKey, s.dhPrivateKey, p, p) + if err != nil { + t.Fatalf("AddPartner returned an error: %v", err) + } m, exists := s.managers[*partnerID] if !exists { @@ -111,6 +114,30 @@ func TestStore_AddPartner(t *testing.T) { } } +// Unit test for DeletePartner +func TestStore_DeletePartner(t *testing.T) { + s, _, _ := makeTestStore() + partnerID := id.NewIdFromUInt(rand.Uint64(), id.User, t) + pubKey := diffieHellman.GeneratePublicKey(s.dhPrivateKey, s.grp) + p := params.GetDefaultE2ESessionParams() + + err := s.AddPartner(partnerID, pubKey, s.dhPrivateKey, p, p) + if err != nil { + t.Fatalf("DeletePartner error: Could not add partner in set up: %v", err) + } + + err = s.DeletePartner(partnerID) + if err != nil { + t.Fatalf("DeletePartner received an error: %v", err) + } + + _, err = s.GetPartner(partnerID) + if err == nil { + t.Errorf("DeletePartner error: Should not be able to pull deleted partner from store") + } + +} + // Tests happy path of Store.GetPartner. func TestStore_GetPartner(t *testing.T) { s, _, _ := makeTestStore() diff --git a/storage/partition/multiPartMessage.go b/storage/partition/multiPartMessage.go index 754c524702498c2c6586d54a42adedf3351a7cc5..3aa8cffb556c3bd23f8afb6f8730830919b3c07f 100644 --- a/storage/partition/multiPartMessage.go +++ b/storage/partition/multiPartMessage.go @@ -30,8 +30,11 @@ type multiPartMessage struct { MessageID uint64 NumParts uint8 PresentParts uint8 - Timestamp time.Time - MessageType message.Type + // Timestamp of message from sender + SenderTimestamp time.Time + // Timestamp in which message was stored in RAM + StorageTimestamp time.Time + MessageType message.Type parts [][]byte kv *versioned.KV @@ -48,13 +51,13 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, if err != nil { if !ekv.Exists(err) { mpm := &multiPartMessage{ - Sender: sender, - MessageID: messageID, - NumParts: 0, - PresentParts: 0, - Timestamp: time.Time{}, - MessageType: 0, - kv: kv, + Sender: sender, + MessageID: messageID, + NumParts: 0, + PresentParts: 0, + SenderTimestamp: time.Time{}, + MessageType: 0, + kv: kv, } if err = mpm.save(); err != nil { jww.FATAL.Panicf("Failed to save new multi part "+ @@ -119,7 +122,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { } func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8, - numParts uint8, timestamp time.Time, part []byte) { + numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte) { mpm.mux.Lock() defer mpm.mux.Unlock() @@ -129,10 +132,11 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8, } mpm.NumParts = numParts - mpm.Timestamp = timestamp + mpm.SenderTimestamp = senderTimestamp mpm.MessageType = mt mpm.parts[partNumber] = part mpm.PresentParts++ + mpm.StorageTimestamp = storageTimestamp if err := savePart(mpm.kv, partNumber, part); err != nil { jww.FATAL.Panicf("Failed to save multi part "+ @@ -159,27 +163,8 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message mpm.parts = append(mpm.parts, make([][]byte, int(mpm.NumParts)-len(mpm.parts))...) } - var err error - lenMsg := 0 - // Load all parts from disk, deleting files from disk as we go along - for i := uint8(0); i < mpm.NumParts; i++ { - if mpm.parts[i] == nil { - if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil { - jww.FATAL.Panicf("Failed to load multi part "+ - "message part %v from %s messageID %v: %s", i, mpm.Sender, - mpm.MessageID, err) - } - if err = deletePart(mpm.kv, i); err != nil { - jww.FATAL.Panicf("Failed to delete multi part "+ - "message part %v from %s messageID %v: %s", i, mpm.Sender, - mpm.MessageID, err) - } - } - lenMsg += len(mpm.parts[i]) - } - // delete the multipart message - mpm.delete() + lenMsg := mpm.delete() mpm.mux.Unlock() // Reconstruct the message @@ -200,7 +185,7 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message Payload: reconstructed, MessageType: mpm.MessageType, Sender: mpm.Sender, - Timestamp: mpm.Timestamp, + Timestamp: mpm.SenderTimestamp, // Encryption will be set externally Encryption: 0, ID: mid, @@ -209,7 +194,27 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message return m, true } -func (mpm *multiPartMessage) delete() { +// deletes all parts from disk and RAM. Returns the message length for reconstruction +func (mpm *multiPartMessage) delete() int { + // Load all parts from disk, deleting files from disk as we go along + var err error + lenMsg := 0 + for i := uint8(0); i < mpm.NumParts; i++ { + if mpm.parts[i] == nil { + if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil { + jww.FATAL.Panicf("Failed to load multi part "+ + "message part %v from %s messageID %v: %s", i, mpm.Sender, + mpm.MessageID, err) + } + if err = deletePart(mpm.kv, i); err != nil { + jww.FATAL.Panicf("Failed to delete multi part "+ + "message part %v from %s messageID %v: %s", i, mpm.Sender, + mpm.MessageID, err) + } + } + lenMsg += len(mpm.parts[i]) + } + //key := makeMultiPartMessageKey(mpm.MessageID) if err := mpm.kv.Delete(messageKey, currentMultiPartMessageVersion); err != nil { @@ -217,4 +222,6 @@ func (mpm *multiPartMessage) delete() { "message from %s messageID %v: %s", mpm.Sender, mpm.MessageID, err) } + + return lenMsg } diff --git a/storage/partition/multiPartMessage_test.go b/storage/partition/multiPartMessage_test.go index dff3c0fabb41cffd20f714497b487ea8bcbd4644..752d53272bb121f566e2917d746087020f68d14b 100644 --- a/storage/partition/multiPartMessage_test.go +++ b/storage/partition/multiPartMessage_test.go @@ -27,13 +27,13 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) { // Set up expected test value prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) expectedMpm := &multiPartMessage{ - Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), - MessageID: prng.Uint64(), - NumParts: 0, - PresentParts: 0, - Timestamp: time.Time{}, - MessageType: 0, - kv: versioned.NewKV(make(ekv.Memstore)), + Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), + MessageID: prng.Uint64(), + NumParts: 0, + PresentParts: 0, + SenderTimestamp: time.Time{}, + MessageType: 0, + kv: versioned.NewKV(make(ekv.Memstore)), } expectedData, err := json.Marshal(expectedMpm) if err != nil { @@ -63,13 +63,13 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) { // Set up expected test value prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) expectedMpm := &multiPartMessage{ - Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), - MessageID: prng.Uint64(), - NumParts: 0, - PresentParts: 0, - Timestamp: time.Time{}, - MessageType: 0, - kv: versioned.NewKV(make(ekv.Memstore)), + Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), + MessageID: prng.Uint64(), + NumParts: 0, + PresentParts: 0, + SenderTimestamp: time.Time{}, + MessageType: 0, + kv: versioned.NewKV(make(ekv.Memstore)), } err := expectedMpm.save() if err != nil { @@ -85,8 +85,8 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) { func CheckMultiPartMessages(expectedMpm *multiPartMessage, mpm *multiPartMessage, t *testing.T) { // The kv differs because it has prefix called, so we compare fields individually - if expectedMpm.Timestamp != mpm.Timestamp { - t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.Timestamp, mpm.Timestamp) + if expectedMpm.SenderTimestamp != mpm.SenderTimestamp { + t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.SenderTimestamp, mpm.SenderTimestamp) } if expectedMpm.MessageType != mpm.MessageType { t.Errorf("messagetype mismatch: expected %v, got %v", expectedMpm.MessageID, mpm.MessageID) @@ -159,21 +159,21 @@ func TestMultiPartMessage_AddFirst(t *testing.T) { // Generate test values prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) expectedMpm := &multiPartMessage{ - Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), - MessageID: prng.Uint64(), - NumParts: uint8(prng.Uint32()), - PresentParts: 1, - Timestamp: netTime.Now(), - MessageType: message.NoType, - parts: make([][]byte, 3), - kv: versioned.NewKV(make(ekv.Memstore)), + Sender: id.NewIdFromUInt(prng.Uint64(), id.User, t), + MessageID: prng.Uint64(), + NumParts: uint8(prng.Uint32()), + PresentParts: 1, + SenderTimestamp: netTime.Now(), + MessageType: message.NoType, + parts: make([][]byte, 3), + kv: versioned.NewKV(make(ekv.Memstore)), } expectedMpm.parts[2] = []byte{5, 8, 78, 9} npm := loadOrCreateMultiPartMessage(expectedMpm.Sender, expectedMpm.MessageID, expectedMpm.kv) npm.AddFirst(expectedMpm.MessageType, 2, expectedMpm.NumParts, - expectedMpm.Timestamp, expectedMpm.parts[2]) + expectedMpm.SenderTimestamp, netTime.Now(), expectedMpm.parts[2]) CheckMultiPartMessages(expectedMpm, npm, t) @@ -203,7 +203,7 @@ func TestMultiPartMessage_IsComplete(t *testing.T) { t.Error("IsComplete() returned true when NumParts == 0.") } - mpm.AddFirst(message.Text, partNums[0], 75, netTime.Now(), parts[0]) + mpm.AddFirst(message.Text, partNums[0], 75, netTime.Now(), netTime.Now(), parts[0]) for i := range partNums { if i > 0 { mpm.Add(partNums[i], parts[i]) diff --git a/storage/partition/store.go b/storage/partition/store.go index 52f7f330f06aa4887c64d991f3f384714073b3cb..f8a0e8e2c8054e6bbad61ee764c18b7664cee57f 100644 --- a/storage/partition/store.go +++ b/storage/partition/store.go @@ -9,9 +9,12 @@ package partition import ( "encoding/binary" + "encoding/json" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" "golang.org/x/crypto/blake2b" "sync" "time" @@ -20,29 +23,58 @@ import ( type multiPartID [16]byte const packagePrefix = "Partition" +const clearPartitionThreshold = 24 * time.Hour +const activePartitions = "activePartitions" +const activePartitionVersion = 0 type Store struct { - multiParts map[multiPartID]*multiPartMessage - kv *versioned.KV - mux sync.Mutex + multiParts map[multiPartID]*multiPartMessage + activeParts map[*multiPartMessage]bool + kv *versioned.KV + mux sync.Mutex } func New(kv *versioned.KV) *Store { return &Store{ - multiParts: make(map[multiPartID]*multiPartMessage), - kv: kv.Prefix(packagePrefix), + multiParts: make(map[multiPartID]*multiPartMessage), + activeParts: make(map[*multiPartMessage]bool), + kv: kv.Prefix(packagePrefix), } } +func Load(kv *versioned.KV) *Store { + partitionStore := &Store{ + multiParts: make(map[multiPartID]*multiPartMessage), + activeParts: make(map[*multiPartMessage]bool), + kv: kv.Prefix(packagePrefix), + } + + partitionStore.loadActivePartitions() + + partitionStore.prune() + + return partitionStore +} + func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64, - partNum, numParts uint8, timestamp time.Time, + partNum, numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte, relationshipFingerprint []byte) (message.Receive, bool) { mpm := s.load(partner, messageID) - mpm.AddFirst(mt, partNum, numParts, timestamp, part) + mpm.AddFirst(mt, partNum, numParts, senderTimestamp, storageTimestamp, part) + msg, ok := mpm.IsComplete(relationshipFingerprint) + s.mux.Lock() + defer s.mux.Unlock() + if !ok { + s.activeParts[mpm] = true + s.saveActiveParts() + } else { + mpID := getMultiPartID(mpm.Sender, mpm.MessageID) + delete(s.multiParts, mpID) + } - return mpm.IsComplete(relationshipFingerprint) + return msg, ok } func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8, @@ -52,7 +84,33 @@ func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8, mpm.Add(partNum, part) - return mpm.IsComplete(relationshipFingerprint) + msg, ok := mpm.IsComplete(relationshipFingerprint) + if !ok { + s.activeParts[mpm] = true + s.saveActiveParts() + } else { + mpID := getMultiPartID(mpm.Sender, mpm.MessageID) + delete(s.multiParts, mpID) + } + + return msg, ok +} + +// Prune clear old messages on it's stored timestamp +func (s *Store) prune() { + s.mux.Lock() + defer s.mux.Unlock() + now := netTime.Now() + for mpm, _ := range s.activeParts { + if now.Sub(mpm.StorageTimestamp) >= clearPartitionThreshold { + jww.INFO.Printf("prune partition: %v", mpm) + mpm.mux.Lock() + mpm.delete() + mpID := getMultiPartID(mpm.Sender, mpm.MessageID) + mpm.mux.Unlock() + delete(s.multiParts, mpID) + } + } } func (s *Store) load(partner *id.ID, messageID uint64) *multiPartMessage { @@ -68,6 +126,56 @@ func (s *Store) load(partner *id.ID, messageID uint64) *multiPartMessage { return mpm } +func (s *Store) saveActiveParts() { + jww.INFO.Printf("Saving %d active partitions", len(s.activeParts)) + activeList := make([]*multiPartMessage, 0, len(s.activeParts)) + for mpm := range s.activeParts { + mpm.mux.Lock() + jww.INFO.Printf("saveActiveParts saving %v", mpm) + activeList = append(activeList, mpm) + mpm.mux.Unlock() + } + + data, err := json.Marshal(&activeList) + if err != nil { + jww.FATAL.Panicf("Could not save active partitions: %v", err) + } + + obj := versioned.Object{ + Version: activePartitionVersion, + Timestamp: netTime.Now(), + Data: data, + } + + err = s.kv.Set(activePartitions, activePartitionVersion, &obj) + if err != nil { + jww.FATAL.Panicf("Could not save active partitions: %v", err) + } +} + +func (s *Store) loadActivePartitions() { + s.mux.Lock() + defer s.mux.Unlock() + obj, err := s.kv.Get(activePartitions, activePartitionVersion) + if err != nil { + jww.DEBUG.Printf("Could not load active partitions: %v", err) + return + } + + activeList := make([]*multiPartMessage, 0) + if err := json.Unmarshal(obj.Data, &activeList); err != nil { + jww.FATAL.Panicf("Failed to "+ + "unmarshal active partitions: %v", err) + } + jww.INFO.Printf("loadActivePartitions found %d active", len(activeList)) + + for _, activeMpm := range activeList { + mpm := loadOrCreateMultiPartMessage(activeMpm.Sender, activeMpm.MessageID, s.kv) + s.activeParts[mpm] = true + } + +} + func getMultiPartID(partner *id.ID, messageID uint64) multiPartID { h, _ := blake2b.New256(nil) diff --git a/storage/partition/store_test.go b/storage/partition/store_test.go index 9de6327d989d032e6ed987db0aff891d6be0fa3b..4e3d221a159be0f1fd73664c1d6d6a9e7b5097aa 100644 --- a/storage/partition/store_test.go +++ b/storage/partition/store_test.go @@ -22,8 +22,9 @@ import ( func TestNew(t *testing.T) { rootKv := versioned.NewKV(make(ekv.Memstore)) expectedStore := &Store{ - multiParts: make(map[multiPartID]*multiPartMessage), - kv: rootKv.Prefix(packagePrefix), + multiParts: make(map[multiPartID]*multiPartMessage), + activeParts: make(map[*multiPartMessage]bool), + kv: rootKv.Prefix(packagePrefix), } store := New(rootKv) @@ -40,7 +41,7 @@ func TestStore_AddFirst(t *testing.T) { s := New(versioned.NewKV(ekv.Memstore{})) msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t), - message.Text, 5, 0, 1, netTime.Now(), part, + message.Text, 5, 0, 1, netTime.Now(), netTime.Now(), part, []byte{0}) if !complete { @@ -60,7 +61,7 @@ func TestStore_Add(t *testing.T) { s := New(versioned.NewKV(ekv.Memstore{})) msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t), - message.Text, 5, 0, 2, netTime.Now(), part1, + message.Text, 5, 0, 2, netTime.Now(), netTime.Now(), part1, []byte{0}) if complete { @@ -79,3 +80,44 @@ func TestStore_Add(t *testing.T) { "\n\texpected: %v\n\treceived: %v", part, msg.Payload) } } + +// Unit test of prune +func TestStore_ClearMessages(t *testing.T) { + // Setup: Add 2 message to store: an old message past the threshold and a new message + part1 := []byte("Test message.") + part2 := []byte("Second Sentence.") + s := New(versioned.NewKV(ekv.Memstore{})) + + partner1 := id.NewIdFromString("User", id.User, t) + messageId1 := uint64(5) + oldTimestamp := netTime.Now().Add(-2 * clearPartitionThreshold) + s.AddFirst(partner1, + message.Text, messageId1, 0, 2, netTime.Now(), + oldTimestamp, part1, + []byte{0}) + s.Add(partner1, messageId1, 1, part2, []byte{0}) + + partner2 := id.NewIdFromString("User1", id.User, t) + messageId2 := uint64(6) + newTimestamp := netTime.Now() + s.AddFirst(partner2, message.Text, messageId2, 0, 2, netTime.Now(), + newTimestamp, part1, + []byte{0}) + + // Call clear messages + s.prune() + + // Check if old message cleared + mpmId := getMultiPartID(partner1, messageId1) + if _, ok := s.multiParts[mpmId]; ok { + t.Errorf("Prune error: " + + "Expected old message to be cleared out of store") + } + + // Check if new message remains + mpmId2 := getMultiPartID(partner2, messageId2) + if _, ok := s.multiParts[mpmId2]; !ok { + t.Errorf("Prune error: " + + "Expected new message to be remain in store") + } +} diff --git a/storage/session.go b/storage/session.go index 3c4c1932baf747b080a5272b3702876abb57c652..e42cad445594e30d7b85eafa688d90b80d4d62ed 100644 --- a/storage/session.go +++ b/storage/session.go @@ -220,7 +220,7 @@ func Load(baseDir, password string, currentVersion version.Version, } s.conversations = conversation.NewStore(s.kv) - s.partition = partition.New(s.kv) + s.partition = partition.Load(s.kv) s.reception = reception.LoadStore(s.kv) diff --git a/storage/user.go b/storage/user.go index a236f22d35e618d77c75277c9ece5999712832a3..975b574a2e9678a31eff7f0598b396ab6427a6af 100644 --- a/storage/user.go +++ b/storage/user.go @@ -14,18 +14,18 @@ func (s *Session) GetUser() user.User { defer s.mux.RUnlock() ci := s.user.GetCryptographicIdentity() return user.User{ - TransmissionID: ci.GetTransmissionID().DeepCopy(), - TransmissionSalt: copySlice(ci.GetTransmissionSalt()), - TransmissionRSA: ci.GetReceptionRSA(), - ReceptionID: ci.GetReceptionID().DeepCopy(), + TransmissionID: ci.GetTransmissionID().DeepCopy(), + TransmissionSalt: copySlice(ci.GetTransmissionSalt()), + TransmissionRSA: ci.GetReceptionRSA(), + ReceptionID: ci.GetReceptionID().DeepCopy(), RegistrationTimestamp: s.user.GetRegistrationTimestamp(), - ReceptionSalt: copySlice(ci.GetReceptionSalt()), - ReceptionRSA: ci.GetReceptionRSA(), - Precanned: ci.IsPrecanned(), - CmixDhPrivateKey: s.cmix.GetDHPrivateKey().DeepCopy(), - CmixDhPublicKey: s.cmix.GetDHPublicKey().DeepCopy(), - E2eDhPrivateKey: s.e2e.GetDHPrivateKey().DeepCopy(), - E2eDhPublicKey: s.e2e.GetDHPublicKey().DeepCopy(), + ReceptionSalt: copySlice(ci.GetReceptionSalt()), + ReceptionRSA: ci.GetReceptionRSA(), + Precanned: ci.IsPrecanned(), + CmixDhPrivateKey: s.cmix.GetDHPrivateKey().DeepCopy(), + CmixDhPublicKey: s.cmix.GetDHPublicKey().DeepCopy(), + E2eDhPrivateKey: s.e2e.GetDHPrivateKey().DeepCopy(), + E2eDhPublicKey: s.e2e.GetDHPublicKey().DeepCopy(), } } diff --git a/storage/utility/dh.go b/storage/utility/dh.go index 9b0280ed1ef4bd96e71d15fcc788b5633c51fe03..6295e446d3563127c4c1262733eb3abf8ac8b91e 100644 --- a/storage/utility/dh.go +++ b/storage/utility/dh.go @@ -42,3 +42,8 @@ func LoadCyclicKey(kv *versioned.KV, key string) (*cyclic.Int, error) { return cy, cy.GobDecode(vo.Data) } + +// DeleteCyclicKey deletes a given cyclic key from storage +func DeleteCyclicKey(kv *versioned.KV, key string) error { + return kv.Delete(key, currentCyclicVersion) +} diff --git a/storage/utility/dh_test.go b/storage/utility/dh_test.go index 4d4a31941faee3467f029ffca4dbc57f8fa9dafe..36fb3c5734af6ea7bd29479279f01b10522b803d 100644 --- a/storage/utility/dh_test.go +++ b/storage/utility/dh_test.go @@ -47,3 +47,27 @@ func TestLoadCyclicKey(t *testing.T) { t.Errorf("Stored int did not match received. Stored: %v, Received: %v", x, loaded) } } + +// Unit test for DeleteCyclicKey +func TestDeleteCyclicKey(t *testing.T) { + kv := make(ekv.Memstore) + vkv := versioned.NewKV(kv) + grp := getTestGroup() + x := grp.NewInt(77) + + intKey := "testKey" + err := StoreCyclicKey(vkv, x, intKey) + if err != nil { + t.Errorf("Failed to store cyclic key: %+v", err) + } + + err = DeleteCyclicKey(vkv, intKey) + if err != nil { + t.Fatalf("DeleteCyclicKey returned an error: %v", err) + } + + _, err = LoadCyclicKey(vkv, intKey) + if err == nil { + t.Errorf("DeleteCyclicKey error: Should not load deleted key: %+v", err) + } +}