From c818a0e4fa0a5a697c9ba3cc8fc2eab0dcd2be0b Mon Sep 17 00:00:00 2001 From: "Richard T. Carback III" <rick.carback@gmail.com> Date: Tue, 9 Mar 2021 17:57:39 +0000 Subject: [PATCH] Use object level versioning This changes the global verioned object upgrade table to per-object upgrade tables, necessitating a version being sent to the get/set calls. It's not pretty but we decided it would have better flexibility moving forward and could be cleaned up in the future. --- storage/auth/sentRequest.go | 9 ++-- storage/auth/store.go | 4 +- storage/cmix/key.go | 6 +-- storage/cmix/store.go | 4 +- storage/conversation/partner.go | 6 +-- storage/e2e/relationship.go | 7 +-- storage/e2e/relationshipFingerprint.go | 9 ++-- storage/e2e/session.go | 13 +++--- storage/e2e/session_test.go | 14 +++--- storage/e2e/stateVector.go | 6 +-- storage/e2e/store.go | 4 +- storage/e2e/store_test.go | 2 +- storage/partition/multiPartMessage.go | 7 +-- storage/partition/multiPartMessage_test.go | 6 +-- storage/partition/part.go | 6 +-- storage/partition/part_test.go | 4 +- storage/reception/identity.go | 6 +-- storage/reception/registration_test.go | 4 +- storage/reception/store.go | 22 +++++---- storage/reception/store_test.go | 4 +- storage/reception/unknownRound.go | 7 +-- storage/session.go | 7 +-- storage/user/cryptographic.go | 6 ++- storage/user/cryptographic_test.go | 2 +- storage/user/regValidationSig.go | 12 +++-- storage/user/regValidationSig_test.go | 26 ++++++----- storage/user/username.go | 4 +- storage/user/username_test.go | 4 +- storage/utility/NDF.go | 4 +- storage/utility/cmixMessageBuffer.go | 6 +-- storage/utility/cmixMessageBuffer_test.go | 2 +- storage/utility/contact.go | 6 +-- storage/utility/dh.go | 4 +- storage/utility/e2eMessageBuffer.go | 6 +-- storage/utility/e2eMessageBuffer_test.go | 2 +- storage/utility/group.go | 4 +- storage/utility/messageBuffer.go | 4 +- storage/utility/messageBuffer_test.go | 4 +- storage/utility/meteredCmixMessageBuffer.go | 6 +-- .../utility/meteredCmixMessageBuffer_test.go | 4 +- storage/versioned/kv.go | 45 +++++++++---------- storage/versioned/kv_test.go | 43 +++++++++--------- 42 files changed, 186 insertions(+), 165 deletions(-) diff --git a/storage/auth/sentRequest.go b/storage/auth/sentRequest.go index d96fcaf0c..09251401c 100644 --- a/storage/auth/sentRequest.go +++ b/storage/auth/sentRequest.go @@ -37,7 +37,8 @@ type sentRequestDisk struct { } func loadSentRequest(kv *versioned.KV, partner *id.ID, grp *cyclic.Group) (*SentRequest, error) { - obj, err := kv.Get(versioned.MakePartnerPrefix(partner)) + obj, err := kv.Get(versioned.MakePartnerPrefix(partner), + currentSentRequestVersion) if err != nil { return nil, errors.WithMessagef(err, "Failed to Load "+ "SentRequest Auth with %s", partner) @@ -115,11 +116,13 @@ func (sr *SentRequest) save() error { Data: data, } - return sr.kv.Set(versioned.MakePartnerPrefix(sr.partner), &obj) + return sr.kv.Set(versioned.MakePartnerPrefix(sr.partner), + currentSentRequestVersion, &obj) } func (sr *SentRequest) delete() error { - return sr.kv.Delete(versioned.MakePartnerPrefix(sr.partner)) + return sr.kv.Delete(versioned.MakePartnerPrefix(sr.partner), + currentSentRequestVersion) } func (sr *SentRequest) GetPartner() *id.ID { diff --git a/storage/auth/store.go b/storage/auth/store.go index 4d67ac616..1d8f07443 100644 --- a/storage/auth/store.go +++ b/storage/auth/store.go @@ -65,7 +65,7 @@ func NewStore(kv *versioned.KV, grp *cyclic.Group, privKeys []*cyclic.Int) (*Sto // fingerprints so they can be used to trigger requests. func LoadStore(kv *versioned.KV, grp *cyclic.Group, privKeys []*cyclic.Int) (*Store, error) { kv = kv.Prefix(storePrefix) - sentObj, err := kv.Get(requestMapKey) + sentObj, err := kv.Get(requestMapKey, requestMapVersion) if err != nil { return nil, errors.WithMessagef(err, "Failed to load requestMap") } @@ -165,7 +165,7 @@ func (s *Store) save() error { Data: data, } - return s.kv.Set(requestMapKey, &obj) + return s.kv.Set(requestMapKey, requestMapVersion, &obj) } func (s *Store) AddSent(partner *id.ID, partnerHistoricalPubKey, myPrivKey, diff --git a/storage/cmix/key.go b/storage/cmix/key.go index ae57a4d77..3a7f6400e 100644 --- a/storage/cmix/key.go +++ b/storage/cmix/key.go @@ -49,7 +49,7 @@ func loadKey(kv *versioned.KV, id *id.ID) (*key, error) { key := keyKey(id) - obj, err := kv.Get(key) + obj, err := kv.Get(key, currentKeyVersion) if err != nil { return nil, err } @@ -78,13 +78,13 @@ func (k *key) save() error { Data: data, } - return k.kv.Set(k.storeKey, &obj) + return k.kv.Set(k.storeKey, currentKeyVersion, &obj) } // deletes the key from the versioned keystore func (k *key) delete(kv *versioned.KV, id *id.ID) { key := keyKey(id) - if err := kv.Delete(key); err != nil { + if err := kv.Delete(key, currentKeyVersion); err != nil { jww.FATAL.Panicf("Failed to delete key %s: %s", k, err) } } diff --git a/storage/cmix/store.go b/storage/cmix/store.go index cc03ebcf3..e4cf5ec3f 100644 --- a/storage/cmix/store.go +++ b/storage/cmix/store.go @@ -82,7 +82,7 @@ func LoadStore(kv *versioned.KV) (*Store, error) { kv: kv, } - obj, err := kv.Get(storeKey) + obj, err := kv.Get(storeKey, currentKeyVersion) if err != nil { return nil, err } @@ -200,7 +200,7 @@ func (s *Store) save() error { Data: data, } - return s.kv.Set(storeKey, &obj) + return s.kv.Set(storeKey, currentKeyVersion, &obj) } // marshal builds a byte representation of the Store. diff --git a/storage/conversation/partner.go b/storage/conversation/partner.go index 50650a1ac..3703f77ea 100644 --- a/storage/conversation/partner.go +++ b/storage/conversation/partner.go @@ -133,7 +133,7 @@ func (c *Conversation) GetNextSendID() (uint64, uint32) { func loadConversation(kv *versioned.KV, partner *id.ID) (*Conversation, error) { key := makeConversationKey(partner) - obj, err := kv.Get(key) + obj, err := kv.Get(key, 0) if err != nil { return nil, errors.WithMessage(err, "Failed to Load conversation") } @@ -164,13 +164,13 @@ func (c *Conversation) save() error { } key := makeConversationKey(c.partner) - return c.kv.Set(key, &obj) + return c.kv.Set(key, 0, &obj) } // delete removes the Conversation from KV storage. func (c *Conversation) delete() error { key := makeConversationKey(c.partner) - return c.kv.Delete(key) + return c.kv.Delete(key, currentConversationVersion) } func (c *Conversation) unmarshal(b []byte) error { diff --git a/storage/e2e/relationship.go b/storage/e2e/relationship.go index 8af504600..0ff938ab6 100644 --- a/storage/e2e/relationship.go +++ b/storage/e2e/relationship.go @@ -95,7 +95,7 @@ func LoadRelationship(manager *Manager, t RelationshipType) (*relationship, erro kv: kv, } - obj, err := kv.Get(relationshipKey) + obj, err := kv.Get(relationshipKey, currentRelationshipVersion) if err != nil { return nil, err } @@ -124,7 +124,7 @@ func (r *relationship) save() error { Data: data, } - return r.kv.Set(relationshipKey, &obj) + return r.kv.Set(relationshipKey, currentRelationshipVersion, &obj) } //ekv functions @@ -362,7 +362,8 @@ func (r *relationship) clean() { if err := r.save(); err != nil { jww.FATAL.Printf("Failed to save Session Buffer %s after "+ - "clean: %s", r.kv.GetFullKey(relationshipKey), err) + "clean: %s", r.kv.GetFullKey(relationshipKey, + currentRelationshipVersion), err) } } } diff --git a/storage/e2e/relationshipFingerprint.go b/storage/e2e/relationshipFingerprint.go index 5da176299..f071e5ffb 100644 --- a/storage/e2e/relationshipFingerprint.go +++ b/storage/e2e/relationshipFingerprint.go @@ -43,14 +43,17 @@ func storeRelationshipFingerprint(fp []byte, kv *versioned.KV) error { Data: fp, } - return kv.Set(relationshipFingerprintKey, &obj) + return kv.Set(relationshipFingerprintKey, currentRelationshipVersion, + &obj) } func loadRelationshipFingerprint(kv *versioned.KV) []byte { - obj, err := kv.Get(relationshipFingerprintKey) + obj, err := kv.Get(relationshipFingerprintKey, + currentRelationshipVersion) if err != nil { jww.FATAL.Panicf("Failed to load relationshipFingerprint at %s: "+ - "%s", kv.GetFullKey(relationshipFingerprintKey), err) + "%s", kv.GetFullKey(relationshipFingerprintKey, + currentRelationshipFingerprintVersion), err) } return obj.Data } diff --git a/storage/e2e/session.go b/storage/e2e/session.go index 6172f56ad..fbfe0d0bd 100644 --- a/storage/e2e/session.go +++ b/storage/e2e/session.go @@ -149,13 +149,14 @@ func loadSession(ship *relationship, kv *versioned.KV, kv: kv, } - obj, err := kv.Get(sessionKey) + obj, err := kv.Get(sessionKey, currentSessionVersion) if err != nil { - return nil, errors.WithMessagef(err, "Failed to load %s", kv.GetFullKey(sessionKey)) + return nil, errors.WithMessagef(err, "Failed to load %s", + kv.GetFullKey(sessionKey, currentSessionVersion)) } - obj, err := sessionUpgradeTable.Upgrade(obj) - + // TODO: Not necessary until we have versions on this object... + //obj, err := sessionUpgradeTable.Upgrade(obj) err = session.unmarshal(obj.Data) if err != nil { @@ -186,7 +187,7 @@ func (s *Session) save() error { Data: data, } - return s.kv.Set(sessionKey, &obj) + return s.kv.Set(sessionKey, currentSessionVersion, &obj) } /*METHODS*/ @@ -199,7 +200,7 @@ func (s *Session) Delete() { s.relationship.manager.ctx.fa.remove(s.getUnusedKeys()) stateVectorErr := s.keyState.Delete() - sessionErr := s.kv.Delete(sessionKey) + sessionErr := s.kv.Delete(sessionKey, currentSessionVersion) if stateVectorErr != nil && sessionErr != nil { jww.ERROR.Printf("Error deleting state vector with key %v: %v", stateVectorKey, stateVectorErr.Error()) diff --git a/storage/e2e/session_test.go b/storage/e2e/session_test.go index 63126229c..495dc3909 100644 --- a/storage/e2e/session_test.go +++ b/storage/e2e/session_test.go @@ -310,11 +310,11 @@ func TestSession_Delete(t *testing.T) { s.Delete() // Getting the keys that should have been stored should now result in an error - _, err = s.kv.Get(stateVectorKey) + _, err = s.kv.Get(stateVectorKey, 0) if err == nil { t.Error("State vector was gettable") } - _, err = s.kv.Get(sessionKey) + _, err = s.kv.Get(sessionKey, 0) if err == nil { t.Error("Session was gettable") } @@ -466,7 +466,7 @@ func TestSession_SetNegotiationStatus(t *testing.T) { if s.negotiationStatus != Sent { t.Error("SetNegotiationStatus didn't set the negotiation status") } - object, err := s.kv.Get(sessionKey) + object, err := s.kv.Get(sessionKey, 0) if err != nil { t.Fatal(err) } @@ -480,7 +480,7 @@ func TestSession_SetNegotiationStatus(t *testing.T) { if s.negotiationStatus != Confirmed { t.Error("SetNegotiationStatus didn't set the negotiation status") } - object, err = s.kv.Get(sessionKey) + object, err = s.kv.Get(sessionKey, 0) if err != nil { t.Fatal(err) } @@ -495,7 +495,7 @@ func TestSession_SetNegotiationStatus(t *testing.T) { if s.negotiationStatus != NewSessionCreated { t.Error("SetNegotiationStatus didn't set the negotiation status") } - object, err = s.kv.Get(sessionKey) + object, err = s.kv.Get(sessionKey, 0) if err != nil { t.Fatal(err) } @@ -512,7 +512,7 @@ func TestSession_SetNegotiationStatus(t *testing.T) { if s.negotiationStatus != Unconfirmed { t.Error("SetNegotiationStatus didn't set the negotiation status") } - object, err = s.kv.Get(sessionKey) + object, err = s.kv.Get(sessionKey, 0) if err != nil { t.Fatal(err) } @@ -525,7 +525,7 @@ func TestSession_SetNegotiationStatus(t *testing.T) { if s.negotiationStatus != Confirmed { t.Error("SetNegotiationStatus didn't set the negotiation status") } - object, err = s.kv.Get(sessionKey) + object, err = s.kv.Get(sessionKey, 0) if err != nil { t.Fatal(err) } diff --git a/storage/e2e/stateVector.go b/storage/e2e/stateVector.go index d72d5c28f..4a2e6271c 100644 --- a/storage/e2e/stateVector.go +++ b/storage/e2e/stateVector.go @@ -65,7 +65,7 @@ func loadStateVector(kv *versioned.KV, key string) (*stateVector, error) { key: stateVectorKey + key, } - obj, err := kv.Get(sv.key) + obj, err := kv.Get(sv.key, currentStateVectorVersion) if err != nil { return nil, err } @@ -92,7 +92,7 @@ func (sv *stateVector) save() error { Data: data, } - return sv.kv.Set(sv.key, &obj) + return sv.kv.Set(sv.key, currentStateVectorVersion, &obj) } func (sv *stateVector) Use(keynum uint32) { @@ -205,7 +205,7 @@ func (sv *stateVector) String() string { //Deletes the state vector from storage func (sv *stateVector) Delete() error { - return sv.kv.Delete(sv.key) + return sv.kv.Delete(sv.key, currentStateVectorVersion) } // finds the next used state and sets that as firstAvailable. This does not diff --git a/storage/e2e/store.go b/storage/e2e/store.go index cff582cc6..ae06cb56b 100644 --- a/storage/e2e/store.go +++ b/storage/e2e/store.go @@ -128,7 +128,7 @@ func LoadStore(kv *versioned.KV, myID *id.ID, rng *fastRNG.StreamGenerator) (*St e2eParams: params.GetDefaultE2ESessionParams(), } - obj, err := kv.Get(storeKey) + obj, err := kv.Get(storeKey, currentStoreVersion) if err != nil { return nil, err } @@ -157,7 +157,7 @@ func (s *Store) save() error { Data: data, } - return s.kv.Set(storeKey, &obj) + return s.kv.Set(storeKey, currentStoreVersion, &obj) } func (s *Store) AddPartner(partnerID *id.ID, partnerPubKey, myPrivKey *cyclic.Int, diff --git a/storage/e2e/store_test.go b/storage/e2e/store_test.go index 0d7f760b9..944b2517d 100644 --- a/storage/e2e/store_test.go +++ b/storage/e2e/store_test.go @@ -62,7 +62,7 @@ func TestNewStore(t *testing.T) { "\n\texpected: %+v\n\treceived: %+v", expectedStore, store) } - key, err := expectedStore.kv.Get(storeKey) + key, err := expectedStore.kv.Get(storeKey, 0) if err != nil { t.Errorf("Get() encoutnered an error when getting Store from KV: %v", err) } diff --git a/storage/partition/multiPartMessage.go b/storage/partition/multiPartMessage.go index 257013b8e..7dc6cbb88 100644 --- a/storage/partition/multiPartMessage.go +++ b/storage/partition/multiPartMessage.go @@ -43,7 +43,7 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, kv *versioned.KV) *multiPartMessage { kv = kv.Prefix(versioned.MakePartnerPrefix(sender)).Prefix(fmt.Sprintf("MessageID:%d", messageID)) - obj, err := kv.Get(messageKey) + obj, err := kv.Get(messageKey, currentMultiPartMessageVersion) if err != nil { if !ekv.Exists(err) { mpm := &multiPartMessage{ @@ -89,7 +89,7 @@ func (mpm *multiPartMessage) save() error { Data: data, } - return mpm.kv.Set(messageKey, &obj) + return mpm.kv.Set(messageKey, currentMultiPartMessageVersion, &obj) } func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { @@ -210,7 +210,8 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message func (mpm *multiPartMessage) delete() { //key := makeMultiPartMessageKey(mpm.MessageID) - if err := mpm.kv.Delete(messageKey); err != nil { + if err := mpm.kv.Delete(messageKey, + currentMultiPartMessageVersion); err != nil { jww.FATAL.Panicf("Failed to delete multi part "+ "message from %s messageID %v: %s", mpm.Sender, mpm.MessageID, err) diff --git a/storage/partition/multiPartMessage_test.go b/storage/partition/multiPartMessage_test.go index 103351af6..ec871783b 100644 --- a/storage/partition/multiPartMessage_test.go +++ b/storage/partition/multiPartMessage_test.go @@ -45,7 +45,7 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) { CheckMultiPartMessages(expectedMpm, mpm, t) - obj, err := mpm.kv.Get(messageKey) + obj, err := mpm.kv.Get(messageKey, 0) if err != nil { t.Errorf("Get() failed to get multiPartMessage from key value store: %v", err) } @@ -141,7 +141,7 @@ func TestMultiPartMessage_Add(t *testing.T) { t.Fatalf("Failed to marshal expected multiPartMessage: %v", err) } - obj, err := mpm.kv.Get(messageKey) + obj, err := mpm.kv.Get(messageKey, 0) if err != nil { t.Errorf("Get() failed to get multiPartMessage from key value store: %v", err) } @@ -243,7 +243,7 @@ func TestMultiPartMessage_delete(t *testing.T) { prng.Uint64(), kv) mpm.delete() - obj, err := kv.Get(messageKey) + obj, err := kv.Get(messageKey, 0) if ekv.Exists(err) { t.Errorf("delete() did not properly delete key %s."+ "\n\tobject received: %+v", messageKey, obj) diff --git a/storage/partition/part.go b/storage/partition/part.go index 307dbfb40..66206fd43 100644 --- a/storage/partition/part.go +++ b/storage/partition/part.go @@ -18,7 +18,7 @@ const currentMultiPartMessagePartVersion = 0 func loadPart(kv *versioned.KV, partNum uint8) ([]byte, error) { key := makeMultiPartMessagePartKey(partNum) - obj, err := kv.Get(key) + obj, err := kv.Get(key, currentMultiPartMessageVersion) if err != nil { return nil, err } @@ -35,12 +35,12 @@ func savePart(kv *versioned.KV, partNum uint8, part []byte) error { Data: part, } - return kv.Set(key, &obj) + return kv.Set(key, currentMultiPartMessageVersion, &obj) } func deletePart(kv *versioned.KV, partNum uint8) error { key := makeMultiPartMessagePartKey(partNum) - return kv.Delete(key) + return kv.Delete(key, currentMultiPartMessageVersion) } // Make the key for a part diff --git a/storage/partition/part_test.go b/storage/partition/part_test.go index f9298ca92..22dfa1115 100644 --- a/storage/partition/part_test.go +++ b/storage/partition/part_test.go @@ -33,7 +33,7 @@ func Test_savePart(t *testing.T) { } // Attempt to get from key value store - obj, err := kv.Get(key) + obj, err := kv.Get(key, 0) if err != nil { t.Errorf("Get() produced an error: %v", err) } @@ -56,7 +56,7 @@ func Test_loadPart(t *testing.T) { key := makeMultiPartMessagePartKey(partNum) // Save part to key value store - err := rootKv.Set(key, &versioned.Object{Timestamp: time.Now(), Data: part}) + err := rootKv.Set(key, 0, &versioned.Object{Timestamp: time.Now(), Data: part}) if err != nil { t.Fatalf("Failed to set object: %v", err) } diff --git a/storage/reception/identity.go b/storage/reception/identity.go index 3aded8192..e4fa2556c 100644 --- a/storage/reception/identity.go +++ b/storage/reception/identity.go @@ -34,7 +34,7 @@ type Identity struct { } func loadIdentity(kv *versioned.KV) (Identity, error) { - obj, err := kv.Get(identityStorageKey) + obj, err := kv.Get(identityStorageKey, identityStorageVersion) if err != nil { return Identity{}, errors.WithMessage(err, "Failed to load Identity") } @@ -62,7 +62,7 @@ func (i Identity) store(kv *versioned.KV) error { } // Store the data - err = kv.Set(identityStorageKey, obj) + err = kv.Set(identityStorageKey, identityStorageVersion, obj) if err != nil { return errors.WithMessage(err, "Failed to store Identity") } @@ -71,7 +71,7 @@ func (i Identity) store(kv *versioned.KV) error { } func (i Identity) delete(kv *versioned.KV) error { - return kv.Delete(identityStorageKey) + return kv.Delete(identityStorageKey, identityStorageVersion) } func (i *Identity) String() string { diff --git a/storage/reception/registration_test.go b/storage/reception/registration_test.go index 20d03c159..77475258a 100644 --- a/storage/reception/registration_test.go +++ b/storage/reception/registration_test.go @@ -48,7 +48,7 @@ func TestNewRegistration_Ephemeral(t *testing.T) { t.Error("Ephemeral identity does not have a known rounds.") } - if _, err = reg.kv.Get(identityStorageKey); err == nil { + if _, err = reg.kv.Get(identityStorageKey, 0); err == nil { t.Error("Ephemeral identity stored the identity when it should not have.") } } @@ -79,7 +79,7 @@ func TestNewRegistration_Persistent(t *testing.T) { // if it isnt LoadUnknownRound(reg.kv) - if _, err = reg.kv.Get(identityStorageKey); err != nil { + if _, err = reg.kv.Get(identityStorageKey, 0); err != nil { t.Errorf("Persistent identity did not store the identity when "+ "it should: %+v.", err) } diff --git a/storage/reception/store.go b/storage/reception/store.go index 6317fdbc5..dcf7dcb79 100644 --- a/storage/reception/store.go +++ b/storage/reception/store.go @@ -26,7 +26,7 @@ const defaultIDSize = 12 type Store struct { // Identities which are being actively checked active []*registration - present map[idHash]interface{} + present map[idHash]interface{} idSize int idSizeCond *sync.Cond isIdSizeSet bool @@ -43,7 +43,8 @@ type storedReference struct { } type idHash [16]byte -func makeIdHash(ephID ephemeral.Id, source *id.ID)idHash{ + +func makeIdHash(ephID ephemeral.Id, source *id.ID) idHash { h := md5.New() h.Write(ephID[:]) h.Write(source.Bytes()) @@ -58,7 +59,7 @@ func NewStore(kv *versioned.KV) *Store { kv = kv.Prefix(receptionPrefix) s := &Store{ active: make([]*registration, 0), - present: make(map[idHash]interface{}), + present: make(map[idHash]interface{}), idSize: defaultIDSize * 2, kv: kv, idSizeCond: sync.NewCond(&sync.Mutex{}), @@ -79,12 +80,13 @@ func LoadStore(kv *versioned.KV) *Store { kv = kv.Prefix(receptionPrefix) s := &Store{ kv: kv, - present: make(map[idHash]interface{}), + present: make(map[idHash]interface{}), idSizeCond: sync.NewCond(&sync.Mutex{}), } // Load the versioned object for the reception list - vo, err := kv.Get(receptionStoreStorageKey) + vo, err := kv.Get(receptionStoreStorageKey, + receptionStoreStorageVersion) if err != nil { jww.FATAL.Panicf("Failed to get the reception storage list: %+v", err) } @@ -106,7 +108,8 @@ func LoadStore(kv *versioned.KV) *Store { } // Load the ephemeral ID length - vo, err = kv.Get(receptionIDSizeStorageKey) + vo, err = kv.Get(receptionIDSizeStorageKey, + receptionIDSizeStorageVersion) if err != nil { jww.FATAL.Panicf("Failed to get the reception ID size: %+v", err) } @@ -134,7 +137,7 @@ func (s *Store) save() error { Data: data, } - err = s.kv.Set(receptionStoreStorageKey, obj) + err = s.kv.Set(receptionStoreStorageKey, receptionStoreStorageVersion, obj) if err != nil { return errors.WithMessage(err, "Failed to store reception store") } @@ -207,7 +210,7 @@ func (s *Store) AddIdentity(identity Identity) error { defer s.mux.Unlock() //do not make duplicates of IDs - if _, ok := s.present[idH]; ok{ + if _, ok := s.present[idH]; ok { jww.DEBUG.Printf("Ignoring duplicate identity for %d (%s)", identity.EphId, identity.Source) return nil @@ -308,7 +311,8 @@ func (s *Store) UpdateIdSize(idSize uint) { Data: []byte(strconv.Itoa(s.idSize)), } - err := s.kv.Set(receptionIDSizeStorageKey, obj) + err := s.kv.Set(receptionIDSizeStorageKey, + receptionIDSizeStorageVersion, obj) if err != nil { jww.FATAL.Panicf("Failed to store reception ID size: %+v", err) } diff --git a/storage/reception/store_test.go b/storage/reception/store_test.go index 635ad0771..7f5dcd089 100644 --- a/storage/reception/store_test.go +++ b/storage/reception/store_test.go @@ -26,7 +26,7 @@ func TestNewStore(t *testing.T) { "\nexpected: %+v\nreceived: %+v", expected, s) } - obj, err := s.kv.Get(receptionStoreStorageKey) + obj, err := s.kv.Get(receptionStoreStorageKey, 0) if err != nil { t.Fatalf("Failed to load store from KV: %+v", err) } @@ -96,7 +96,7 @@ func TestStore_save(t *testing.T) { t.Errorf("save() produced an error: %+v", err) } - obj, err := kv.Prefix(receptionPrefix).Get(receptionStoreStorageKey) + obj, err := kv.Prefix(receptionPrefix).Get(receptionStoreStorageKey, 0) if err != nil { t.Errorf("Get() produced an error: %+v", err) } diff --git a/storage/reception/unknownRound.go b/storage/reception/unknownRound.go index 7912b9d5c..94feb079b 100644 --- a/storage/reception/unknownRound.go +++ b/storage/reception/unknownRound.go @@ -36,7 +36,7 @@ func LoadUnknownRound(kv *versioned.KV) *UnknownRound { rid: 0, } - obj, err := kv.Get(unknownRoundStorageKey) + obj, err := kv.Get(unknownRoundStorageKey, unknownRoundStorageVersion) if err != nil { jww.FATAL.Panicf("Failed to get the unknown round: %+v", err) } @@ -62,7 +62,8 @@ func (ur *UnknownRound) save() { Data: urStr, } - err = ur.kv.Set(unknownRoundStorageKey, obj) + err = ur.kv.Set(unknownRoundStorageKey, + unknownRoundStorageVersion, obj) if err != nil { jww.FATAL.Panicf("Failed to store the unknown round: %+v", err) } @@ -89,7 +90,7 @@ func (ur *UnknownRound) Get() id.Round { func (ur *UnknownRound) delete() { ur.mux.Lock() defer ur.mux.Unlock() - err := ur.kv.Delete(unknownRoundStorageKey) + err := ur.kv.Delete(unknownRoundStorageKey, unknownRoundStorageVersion) if err != nil { jww.FATAL.Panicf("Failed to delete unknownRound storage: %+v", err) } diff --git a/storage/session.go b/storage/session.go index de3768d13..1dab1ae4a 100644 --- a/storage/session.go +++ b/storage/session.go @@ -36,6 +36,7 @@ import ( // Number of rounds to store in the CheckedRound buffer const CheckRoundsMaxSize = 1000000 / 64 +const currentSessionVersion = 0 // Session object, backed by encrypted filestore type Session struct { @@ -255,17 +256,17 @@ func (s *Session) Partition() *partition.Store { // Get an object from the session func (s *Session) Get(key string) (*versioned.Object, error) { - return s.kv.Get(key) + return s.kv.Get(key, currentSessionVersion) } // Set a value in the session func (s *Session) Set(key string, object *versioned.Object) error { - return s.kv.Set(key, object) + return s.kv.Set(key, currentSessionVersion, object) } // delete a value in the session func (s *Session) Delete(key string) error { - return s.kv.Delete(key) + return s.kv.Delete(key, currentSessionVersion) } // Initializes a Session object wrapped around a MemStore object. diff --git a/storage/user/cryptographic.go b/storage/user/cryptographic.go index b444ea719..c01b6ffa1 100644 --- a/storage/user/cryptographic.go +++ b/storage/user/cryptographic.go @@ -63,7 +63,8 @@ func newCryptographicIdentity(transmissionID, receptionID *id.ID, transmissionSa } func loadCryptographicIdentity(kv *versioned.KV) (*CryptographicIdentity, error) { - obj, err := kv.Get(cryptographicIdentityKey) + obj, err := kv.Get(cryptographicIdentityKey, + currentCryptographicIdentityVersion) if err != nil { return nil, errors.WithMessage(err, "Failed to get user "+ "cryptographic identity from EKV") @@ -115,7 +116,8 @@ func (ci *CryptographicIdentity) save(kv *versioned.KV) error { Data: userDataBuffer.Bytes(), } - return kv.Set(cryptographicIdentityKey, obj) + return kv.Set(cryptographicIdentityKey, + currentCryptographicIdentityVersion, obj) } func (ci *CryptographicIdentity) GetTransmissionID() *id.ID { diff --git a/storage/user/cryptographic_test.go b/storage/user/cryptographic_test.go index b1d80bb46..0d2fab208 100644 --- a/storage/user/cryptographic_test.go +++ b/storage/user/cryptographic_test.go @@ -24,7 +24,7 @@ func TestNewCryptographicIdentity(t *testing.T) { salt := []byte("salt") _ = newCryptographicIdentity(uid, uid, salt, salt, &rsa.PrivateKey{}, &rsa.PrivateKey{}, false, kv) - _, err := kv.Get(cryptographicIdentityKey) + _, err := kv.Get(cryptographicIdentityKey, 0) if err != nil { t.Errorf("Did not store cryptographic identity") } diff --git a/storage/user/regValidationSig.go b/storage/user/regValidationSig.go index a64b33d4d..4b5e09957 100644 --- a/storage/user/regValidationSig.go +++ b/storage/user/regValidationSig.go @@ -36,7 +36,8 @@ func (u *User) GetReceptionRegistrationValidationSignature() []byte { // Loads the transmission Identity Validation Signature if it exists in the ekv func (u *User) loadTransmissionRegistrationValidationSignature() { u.rvsMux.Lock() - obj, err := u.kv.Get(transmissionRegValidationSigKey) + obj, err := u.kv.Get(transmissionRegValidationSigKey, + currentRegValidationSigVersion) if err == nil { u.transmissionRegValidationSig = obj.Data } @@ -46,7 +47,8 @@ func (u *User) loadTransmissionRegistrationValidationSignature() { // Loads the reception Identity Validation Signature if it exists in the ekv func (u *User) loadReceptionRegistrationValidationSignature() { u.rvsMux.Lock() - obj, err := u.kv.Get(receptionRegValidationSigKey) + obj, err := u.kv.Get(receptionRegValidationSigKey, + currentRegValidationSigVersion) if err == nil { u.receptionRegValidationSig = obj.Data } @@ -70,7 +72,8 @@ func (u *User) SetTransmissionRegistrationValidationSignature(b []byte) { Data: b, } - err := u.kv.Set(transmissionRegValidationSigKey, obj) + err := u.kv.Set(transmissionRegValidationSigKey, + currentRegValidationSigVersion, obj) if err != nil { jww.FATAL.Panicf("Failed to store the transmission Identity Validation "+ "Signature: %s", err) @@ -96,7 +99,8 @@ func (u *User) SetReceptionRegistrationValidationSignature(b []byte) { Data: b, } - err := u.kv.Set(receptionRegValidationSigKey, obj) + err := u.kv.Set(receptionRegValidationSigKey, + currentRegValidationSigVersion, obj) if err != nil { jww.FATAL.Panicf("Failed to store the reception Identity Validation "+ "Signature: %s", err) diff --git a/storage/user/regValidationSig_test.go b/storage/user/regValidationSig_test.go index 84cde96f5..c824ad889 100644 --- a/storage/user/regValidationSig_test.go +++ b/storage/user/regValidationSig_test.go @@ -69,7 +69,7 @@ func TestUser_SetRegistrationValidationSignature(t *testing.T) { sig, u.transmissionRegValidationSig) } - obj, err := u.kv.Get(transmissionRegValidationSigKey) + obj, err := u.kv.Get(transmissionRegValidationSigKey, 0) if err != nil { t.Errorf("Failed to get reg vaildation signature key: %+v", err) } @@ -85,7 +85,7 @@ func TestUser_SetRegistrationValidationSignature(t *testing.T) { sig, u.receptionRegValidationSig) } - obj, err = u.kv.Get(receptionRegValidationSigKey) + obj, err = u.kv.Get(receptionRegValidationSigKey, 0) if err != nil { t.Errorf("Failed to get reg vaildation signature key: %+v", err) } @@ -106,11 +106,12 @@ func TestUser_loadRegistrationValidationSignature(t *testing.T) { } sig := []byte("transmissionsignature") - err = kv.Set(transmissionRegValidationSigKey, &versioned.Object{ - Version: currentRegValidationSigVersion, - Timestamp: time.Now(), - Data: sig, - }) + err = kv.Set(transmissionRegValidationSigKey, + currentRegValidationSigVersion, &versioned.Object{ + Version: currentRegValidationSigVersion, + Timestamp: time.Now(), + Data: sig, + }) if err != nil { t.Errorf("Failed to set reg validation sig key in kv store: %+v", err) } @@ -121,11 +122,12 @@ func TestUser_loadRegistrationValidationSignature(t *testing.T) { } sig = []byte("receptionsignature") - err = kv.Set(receptionRegValidationSigKey, &versioned.Object{ - Version: currentRegValidationSigVersion, - Timestamp: time.Now(), - Data: sig, - }) + err = kv.Set(receptionRegValidationSigKey, + currentRegValidationSigVersion, &versioned.Object{ + Version: currentRegValidationSigVersion, + Timestamp: time.Now(), + Data: sig, + }) if err != nil { t.Errorf("Failed to set reg validation sig key in kv store: %+v", err) } diff --git a/storage/user/username.go b/storage/user/username.go index 3e6862c39..268025e2e 100644 --- a/storage/user/username.go +++ b/storage/user/username.go @@ -19,7 +19,7 @@ const usernameKey = "username" func (u *User) loadUsername() { u.usernameMux.Lock() - obj, err := u.kv.Get(usernameKey) + obj, err := u.kv.Get(usernameKey, currentUsernameVersion) if err == nil { u.username = string(obj.Data) } @@ -39,7 +39,7 @@ func (u *User) SetUsername(username string) error { Data: []byte(username), } - err := u.kv.Set(usernameKey, obj) + err := u.kv.Set(usernameKey, currentUsernameVersion, obj) if err != nil { jww.FATAL.Panicf("Failed to store the username: %s", err) } diff --git a/storage/user/username_test.go b/storage/user/username_test.go index 486234153..1da1ce0eb 100644 --- a/storage/user/username_test.go +++ b/storage/user/username_test.go @@ -40,7 +40,7 @@ func TestUser_SetUsername(t *testing.T) { t.Error("Did not error when attempting to set a new username") } - o, err := u.kv.Get(usernameKey) + o, err := u.kv.Get(usernameKey, 0) if err != nil { t.Errorf("Didn't get username from user kv store: %+v", err) } @@ -92,7 +92,7 @@ func TestUser_loadUsername(t *testing.T) { u1 := "zezima" - err = u.kv.Set(usernameKey, &versioned.Object{ + err = u.kv.Set(usernameKey, currentUsernameVersion, &versioned.Object{ Version: currentUsernameVersion, Timestamp: time.Now(), Data: []byte(u1), diff --git a/storage/utility/NDF.go b/storage/utility/NDF.go index c7d3800de..728ec90c9 100644 --- a/storage/utility/NDF.go +++ b/storage/utility/NDF.go @@ -16,7 +16,7 @@ import ( const currentNDFVersion = 0 func LoadNDF(kv *versioned.KV, key string) (*ndf.NetworkDefinition, error) { - vo, err := kv.Get(key) + vo, err := kv.Get(key, currentNDFVersion) if err != nil { return nil, err } @@ -43,5 +43,5 @@ func SaveNDF(kv *versioned.KV, key string, ndf *ndf.NetworkDefinition) error { Data: marshaled, } - return kv.Set(key, &obj) + return kv.Set(key, currentNDFVersion, &obj) } diff --git a/storage/utility/cmixMessageBuffer.go b/storage/utility/cmixMessageBuffer.go index ce53a60f5..367d4fd53 100644 --- a/storage/utility/cmixMessageBuffer.go +++ b/storage/utility/cmixMessageBuffer.go @@ -50,7 +50,7 @@ func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key } // Save versioned object - return kv.Set(key, &obj) + return kv.Set(key, currentCmixMessageVersion, &obj) } // LoadMessage returns the message with the specified key from the key value @@ -58,7 +58,7 @@ func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key // retrieved. func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { // Load the versioned object - vo, err := kv.Get(key) + vo, err := kv.Get(key, currentCmixMessageVersion) if err != nil { return format.Message{}, err } @@ -75,7 +75,7 @@ func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interf // DeleteMessage deletes the message with the specified key from the key value // store. func (cmh *cmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error { - return kv.Delete(key) + return kv.Delete(key, currentCmixMessageVersion) } // HashMessage generates a hash of the message. diff --git a/storage/utility/cmixMessageBuffer_test.go b/storage/utility/cmixMessageBuffer_test.go index 9e42103dd..f884d73d7 100644 --- a/storage/utility/cmixMessageBuffer_test.go +++ b/storage/utility/cmixMessageBuffer_test.go @@ -41,7 +41,7 @@ func TestCmixMessageHandler_SaveMessage(t *testing.T) { } // Try to get message - obj, err := kv.Get(key) + obj, err := kv.Get(key, 0) if err != nil { t.Errorf("Get() returned an error: %v", err) } diff --git a/storage/utility/contact.go b/storage/utility/contact.go index a9174efd1..cef79e2ff 100644 --- a/storage/utility/contact.go +++ b/storage/utility/contact.go @@ -26,11 +26,11 @@ func StoreContact(kv *versioned.KV, c contact.Contact) error { Data: c.Marshal(), } - return kv.Set(makeContactKey(c.ID), &obj) + return kv.Set(makeContactKey(c.ID), currentContactVersion, &obj) } func LoadContact(kv *versioned.KV, cid *id.ID) (contact.Contact, error) { - vo, err := kv.Get(makeContactKey(cid)) + vo, err := kv.Get(makeContactKey(cid), currentContactVersion) if err != nil { return contact.Contact{}, err } @@ -39,7 +39,7 @@ func LoadContact(kv *versioned.KV, cid *id.ID) (contact.Contact, error) { } func DeleteContact(kv *versioned.KV, cid *id.ID) error { - return kv.Delete(makeContactKey(cid)) + return kv.Delete(makeContactKey(cid), currentContactVersion) } func makeContactKey(cid *id.ID) string { diff --git a/storage/utility/dh.go b/storage/utility/dh.go index 2a840489d..ea2f6296d 100644 --- a/storage/utility/dh.go +++ b/storage/utility/dh.go @@ -29,11 +29,11 @@ func StoreCyclicKey(kv *versioned.KV, cy *cyclic.Int, key string) error { Data: data, } - return kv.Set(key, &obj) + return kv.Set(key, currentCyclicVersion, &obj) } func LoadCyclicKey(kv *versioned.KV, key string) (*cyclic.Int, error) { - vo, err := kv.Get(key) + vo, err := kv.Get(key, currentCyclicVersion) if err != nil { return nil, err } diff --git a/storage/utility/e2eMessageBuffer.go b/storage/utility/e2eMessageBuffer.go index 975f2f21a..9f0d3f417 100644 --- a/storage/utility/e2eMessageBuffer.go +++ b/storage/utility/e2eMessageBuffer.go @@ -49,7 +49,7 @@ func (emh *e2eMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key s } // Save versioned object - return kv.Set(key, &obj) + return kv.Set(key, currentE2EMessageVersion, &obj) } // LoadMessage returns the e2eMessage with the specified key from the key value @@ -57,7 +57,7 @@ func (emh *e2eMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key s // retrieved. func (emh *e2eMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { // Load the versioned object - vo, err := kv.Get(key) + vo, err := kv.Get(key, currentE2EMessageVersion) if err != nil { return nil, err } @@ -74,7 +74,7 @@ func (emh *e2eMessageHandler) LoadMessage(kv *versioned.KV, key string) (interfa // DeleteMessage deletes the message with the specified key from the key value // store. func (emh *e2eMessageHandler) DeleteMessage(kv *versioned.KV, key string) error { - return kv.Delete(key) + return kv.Delete(key, currentE2EMessageVersion) } // HashMessage generates a hash of the e2eMessage. diff --git a/storage/utility/e2eMessageBuffer_test.go b/storage/utility/e2eMessageBuffer_test.go index 7232ca0a1..aa7366ced 100644 --- a/storage/utility/e2eMessageBuffer_test.go +++ b/storage/utility/e2eMessageBuffer_test.go @@ -38,7 +38,7 @@ func TestE2EMessageHandler_SaveMessage(t *testing.T) { } // Try to get message - obj, err := kv.Get(key) + obj, err := kv.Get(key, 0) if err != nil { t.Errorf("Get() returned an error: %v", err) } diff --git a/storage/utility/group.go b/storage/utility/group.go index fbe18db68..20a05e837 100644 --- a/storage/utility/group.go +++ b/storage/utility/group.go @@ -29,11 +29,11 @@ func StoreGroup(kv *versioned.KV, grp *cyclic.Group, key string) error { Data: data, } - return kv.Set(key, &obj) + return kv.Set(key, currentE2EMessageVersion, &obj) } func LoadGroup(kv *versioned.KV, key string) (*cyclic.Group, error) { - vo, err := kv.Get(key) + vo, err := kv.Get(key, currentE2EMessageVersion) if err != nil { return nil, err } diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index bbfc1727d..59743c855 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -124,7 +124,7 @@ func (mb *MessageBuffer) save() error { } // Save versioned object - return mb.kv.Set(mb.key, &obj) + return mb.kv.Set(mb.key, currentMessageBufferVersion, &obj) } // getMessageList returns a list of all message hashes stored in messages and @@ -154,7 +154,7 @@ func (mb *MessageBuffer) getMessageList() []MessageHash { func (mb *MessageBuffer) load() error { // Load the versioned object - vo, err := mb.kv.Get(mb.key) + vo, err := mb.kv.Get(mb.key, currentMessageBufferVersion) if err != nil { return err } diff --git a/storage/utility/messageBuffer_test.go b/storage/utility/messageBuffer_test.go index 6aff00a4c..f331078fd 100644 --- a/storage/utility/messageBuffer_test.go +++ b/storage/utility/messageBuffer_test.go @@ -133,7 +133,7 @@ func TestMessageBuffer_save_NewMB(t *testing.T) { t.Errorf("save() returned an error."+ "\n\texpected: %v\n\treceived: %v", nil, err) } - obj, err := kv.Get(key) + obj, err := kv.Get(key, 0) if err != nil { t.Errorf("save() did not correctly save buffer with key %+v to storage."+ "\n\terror: %v", key, err) @@ -164,7 +164,7 @@ func TestMessageBuffer_save(t *testing.T) { t.Errorf("save() returned an error."+ "\n\texpected: %v\n\treceived: %v", nil, err) } - obj, err := kv.Get(key) + obj, err := kv.Get(key, 0) if err != nil { t.Errorf("save() did not correctly save buffer with key %+v to storage."+ "\n\terror: %v", key, err) diff --git a/storage/utility/meteredCmixMessageBuffer.go b/storage/utility/meteredCmixMessageBuffer.go index 584f6e225..e4b7ab4fb 100644 --- a/storage/utility/meteredCmixMessageBuffer.go +++ b/storage/utility/meteredCmixMessageBuffer.go @@ -45,7 +45,7 @@ func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, k } // Save versioned object - return kv.Set(key, &obj) + return kv.Set(key, currentMessageBufferVersion, &obj) } // LoadMessage returns the message with the specified key from the key value @@ -53,7 +53,7 @@ func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, k // retrieved. func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { // Load the versioned object - vo, err := kv.Get(key) + vo, err := kv.Get(key, currentMeteredCmixMessageVersion) if err != nil { return format.Message{}, err } @@ -71,7 +71,7 @@ func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (int // DeleteMessage deletes the message with the specified key from the key value // store. func (*meteredCmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error { - return kv.Delete(key) + return kv.Delete(key, currentMeteredCmixMessageVersion) } // HashMessage generates a hash of the message. diff --git a/storage/utility/meteredCmixMessageBuffer_test.go b/storage/utility/meteredCmixMessageBuffer_test.go index d21f3ca6d..bd2edcd25 100644 --- a/storage/utility/meteredCmixMessageBuffer_test.go +++ b/storage/utility/meteredCmixMessageBuffer_test.go @@ -36,7 +36,7 @@ func Test_meteredCmixMessageHandler_SaveMessage(t *testing.T) { } // Try to get message - obj, err := kv.Get(key) + obj, err := kv.Get(key, 0) if err != nil { t.Errorf("Get() returned an error: %v", err) } @@ -110,7 +110,7 @@ func Test_meteredCmixMessageHandler_DeleteMessage(t *testing.T) { } // Try to get message - _, err = kv.Get(key) + _, err = kv.Get(key, 0) if err == nil { t.Error("Get() did not return an error.") } diff --git a/storage/versioned/kv.go b/storage/versioned/kv.go index d2c0f5061..9861f510d 100644 --- a/storage/versioned/kv.go +++ b/storage/versioned/kv.go @@ -17,12 +17,6 @@ import ( const PrefixSeparator = "/" -// MakeKeyWithPrefix creates a key for a type of data with a unique -// identifier using a globally defined separator character. -func MakeKeyWithPrefix(dataType string, uniqueID string) string { - return fmt.Sprintf("%s%s%s", dataType, PrefixSeparator, uniqueID) -} - // MakePartnerPrefix creates a string prefix // to denote who a conversation or relationship is with func MakePartnerPrefix(id *id.ID) string { @@ -33,9 +27,8 @@ func MakePartnerPrefix(id *id.ID) string { type Upgrade func(oldObject *Object) (*Object, error) - type root struct { - data ekv.KeyValue + data ekv.KeyValue } // KV stores versioned data and Upgrade functions @@ -70,9 +63,9 @@ func (v *KV) Get(key string, version uint64) (*Object, error) { return &result, nil } -type UpgradeTable struct{ +type UpgradeTable struct { CurrentVersion uint64 - Table []Upgrade + Table []Upgrade } // Get gets and upgrades data stored in the key/value store @@ -81,39 +74,42 @@ func (v *KV) GetUpgrade(key string, ut UpgradeTable) (*Object, error) { version := ut.CurrentVersion key = v.makeKey(key, version) - if uint64(len(ut.Table))!=version{ - jww.FATAL.Panicf("Cannot get upgrade for %s: table lengh (%d) " + + if uint64(len(ut.Table)) != version { + jww.FATAL.Panicf("Cannot get upgrade for %s: table lengh (%d) "+ "does not match current version (%d)", key, len(ut.Table), version) } var result *Object - for ;version>=0;version--{ - + // NOTE: Upgrades do not happen on the current version, so we check to + // see if version-1, version-2, and so on exist to find out if an + // earlier version of this object exists. + for version != 0 { + version-- key = v.makeKey(key, version) jww.TRACE.Printf("Get %p with key %v", v.r.data, key) // Get raw data result = &Object{} err := v.r.data.Get(key, result) - if err != nil { - jww.WARN.Printf("Failed to get keyvalue %s: %s", key, err) - }else{ + // Break when we find the *newest* version of the object + // in the data store. + if err == nil { break } } - if version < 0{ + if version < 0 { return nil, errors.Errorf("Failed to get key and upgrade it for %s", v.makeKey(key, ut.CurrentVersion)) } var err error initialVersion := result.Version - for result.Version<uint64(len(ut.Table)){ + for result.Version < uint64(len(ut.Table)) { oldVersion := result.Version result, err = ut.Table[oldVersion](result) - if err!=nil{ - jww.FATAL.Panicf("failed to upgrade key %s from " + - "version %v, initla version %v", key, oldVersion, + if err != nil || oldVersion == result.Version { + jww.FATAL.Panicf("failed to upgrade key %s from "+ + "version %v, initla version %v", key, oldVersion, initialVersion) } } @@ -121,7 +117,6 @@ func (v *KV) GetUpgrade(key string, ut UpgradeTable) (*Object, error) { return result, nil } - // delete removes a given key from the data store func (v *KV) Delete(key string, version uint64) error { key = v.makeKey(key, version) @@ -152,6 +147,6 @@ func (v *KV) GetFullKey(key string, version uint64) string { return v.makeKey(key, version) } -func (v *KV)makeKey(key string, version uint64)string{ +func (v *KV) makeKey(key string, version uint64) string { return fmt.Sprintf("%s%s_%d", v.prefix, key, version) -} \ No newline at end of file +} diff --git a/storage/versioned/kv_test.go b/storage/versioned/kv_test.go index d311109b6..2b6bb8e46 100644 --- a/storage/versioned/kv_test.go +++ b/storage/versioned/kv_test.go @@ -20,8 +20,8 @@ import ( func TestVersionedKV_Get_Err(t *testing.T) { kv := make(ekv.Memstore) vkv := NewKV(kv) - key := MakeKeyWithPrefix("test", "12345") - result, err := vkv.Get(key) + key := vkv.GetFullKey("test", 0) + result, err := vkv.Get(key, 0) if err == nil { t.Error("Getting a key that didn't exist should have" + " returned an error") @@ -37,7 +37,7 @@ func TestVersionedKV_GetUpgrade(t *testing.T) { // Set up a dummy KV with the required data kv := make(ekv.Memstore) vkv := NewKV(kv) - key := MakeKeyWithPrefix("test", "12345") + key := vkv.GetFullKey("test", 0) original := Object{ Version: 0, Timestamp: time.Now(), @@ -54,7 +54,8 @@ func TestVersionedKV_GetUpgrade(t *testing.T) { }, nil }} - result, err := vkv.GetUpgrade(key, upgrade) + result, err := vkv.GetUpgrade(key, UpgradeTable{CurrentVersion: 1, + Table: upgrade}) if err != nil { t.Fatalf("Error getting something that should have been in: %v", err) @@ -71,7 +72,7 @@ func TestVersionedKV_GetUpgrade_KeyNotFound(t *testing.T) { // Set up a dummy KV with the required data kv := make(ekv.Memstore) vkv := NewKV(kv) - key := MakeKeyWithPrefix("test", "12345") + key := "test" upgrade := []Upgrade{func(oldObject *Object) (*Object, error) { return &Object{ @@ -81,8 +82,9 @@ func TestVersionedKV_GetUpgrade_KeyNotFound(t *testing.T) { }, nil }} - _, err := vkv.GetUpgrade(key, upgrade) - if err == nil { + _, err := vkv.GetUpgrade(key, UpgradeTable{CurrentVersion: 1, + Table: upgrade}) + if err != nil { t.Fatalf("Error getting something that should have been in: %v", err) } @@ -93,7 +95,7 @@ func TestVersionedKV_GetUpgrade_UpgradeReturnsError(t *testing.T) { // Set up a dummy KV with the required data kv := make(ekv.Memstore) vkv := NewKV(kv) - key := MakeKeyWithPrefix("test", "12345") + key := vkv.GetFullKey("test", 0) original := Object{ Version: 0, Timestamp: time.Now(), @@ -107,12 +109,13 @@ func TestVersionedKV_GetUpgrade_UpgradeReturnsError(t *testing.T) { }} defer func() { - if r := recover(); r == nil { - t.Errorf("The code did not panic") - } - }() + if r := recover(); r == nil { + t.Errorf("The code did not panic") + } + }() - _, _ = vkv.GetUpgrade(key, upgrade) + _, _ = vkv.GetUpgrade("test", UpgradeTable{CurrentVersion: 1, + Table: upgrade}) } // Test delete key happy path @@ -120,7 +123,7 @@ func TestVersionedKV_Delete(t *testing.T) { // Set up a dummy KV with the required data kv := make(ekv.Memstore) vkv := NewKV(kv) - key := MakeKeyWithPrefix("test", "12345") + key := vkv.GetFullKey("test", 0) original := Object{ Version: 0, Timestamp: time.Now(), @@ -129,7 +132,7 @@ func TestVersionedKV_Delete(t *testing.T) { originalSerialized := original.Marshal() kv[key] = originalSerialized - err := vkv.Delete(key) + err := vkv.Delete("test", 0) if err != nil { t.Fatalf("Error getting something that should have been in: %v", err) @@ -145,8 +148,8 @@ func TestVersionedKV_Get(t *testing.T) { // Set up a dummy KV with the required data kv := make(ekv.Memstore) vkv := NewKV(kv) - originalVersion := uint64(1) - key := MakeKeyWithPrefix("test", "12345") + originalVersion := uint64(0) + key := vkv.GetFullKey("test", originalVersion) original := Object{ Version: originalVersion, Timestamp: time.Now(), @@ -155,7 +158,7 @@ func TestVersionedKV_Get(t *testing.T) { originalSerialized := original.Marshal() kv[key] = originalSerialized - result, err := vkv.Get(key) + result, err := vkv.Get("test", originalVersion) if err != nil { t.Fatalf("Error getting something that should have been in: %v", err) @@ -171,13 +174,13 @@ func TestVersionedKV_Set(t *testing.T) { kv := make(ekv.Memstore) vkv := NewKV(kv) originalVersion := uint64(1) - key := MakeKeyWithPrefix("test", "12345") + key := vkv.GetFullKey("test", originalVersion) original := Object{ Version: originalVersion, Timestamp: time.Now(), Data: []byte("not upgraded"), } - err := vkv.Set(key, &original) + err := vkv.Set("test", originalVersion, &original) if err != nil { t.Fatal(err) } -- GitLab