From 4b9428977b63f0038a23c72452a77ce91346a331 Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Fri, 23 Apr 2021 20:39:17 +0000 Subject: [PATCH] XX-3187 / Store backed checked rounds --- network/checkedRounds.go | 74 ----- network/follow.go | 29 +- network/manager.go | 4 - network/rounds/check.go | 8 +- storage/reception/IdentityUse.go | 1 + storage/reception/fake_test.go | 6 +- storage/reception/registration.go | 21 ++ storage/reception/store.go | 1 + storage/rounds/checkedRounds.go | 164 +++++++++++ storage/rounds/checkedRounds_test.go | 217 ++++++++++++++ storage/utility/blockStore.go | 274 ++++++++++++++++++ storage/utility/blockStore_test.go | 414 +++++++++++++++++++++++++++ 12 files changed, 1114 insertions(+), 99 deletions(-) delete mode 100644 network/checkedRounds.go create mode 100644 storage/rounds/checkedRounds.go create mode 100644 storage/rounds/checkedRounds_test.go create mode 100644 storage/utility/blockStore.go create mode 100644 storage/utility/blockStore_test.go diff --git a/network/checkedRounds.go b/network/checkedRounds.go deleted file mode 100644 index d104ac958..000000000 --- a/network/checkedRounds.go +++ /dev/null @@ -1,74 +0,0 @@ -package network - -import ( - "container/list" - "crypto/md5" - "gitlab.com/elixxir/client/storage/reception" - "gitlab.com/xx_network/primitives/id" -) - -type idFingerprint [16]byte - -type checkedRounds struct { - lookup map[idFingerprint]*checklist -} - -type checklist struct { - m map[id.Round]interface{} - l *list.List -} - -func newCheckedRounds() *checkedRounds { - return &checkedRounds{ - lookup: make(map[idFingerprint]*checklist), - } -} - -func (cr *checkedRounds) Check(identity reception.IdentityUse, rid id.Round) bool { - idFp := getIdFingerprint(identity) - cl, exists := cr.lookup[idFp] - if !exists { - cl = &checklist{ - m: make(map[id.Round]interface{}), - l: list.New().Init(), - } - cr.lookup[idFp] = cl - } - - if _, exists := cl.m[rid]; !exists { - cl.m[rid] = nil - 
cl.l.PushBack(rid) - return true - } - return false -} - -func (cr *checkedRounds) Prune(identity reception.IdentityUse, earliestAllowed id.Round) { - idFp := getIdFingerprint(identity) - cl, exists := cr.lookup[idFp] - if !exists { - return - } - - e := cl.l.Front() - for e != nil { - if e.Value.(id.Round) < earliestAllowed { - delete(cl.m, e.Value.(id.Round)) - lastE := e - e = e.Next() - cl.l.Remove(lastE) - } else { - break - } - } -} - -func getIdFingerprint(identity reception.IdentityUse) idFingerprint { - h := md5.New() - h.Write(identity.EphId[:]) - h.Write(identity.Source[:]) - - fp := idFingerprint{} - copy(fp[:], h.Sum(nil)) - return fp -} diff --git a/network/follow.go b/network/follow.go index 698c7c5df..e761d2aa5 100644 --- a/network/follow.go +++ b/network/follow.go @@ -61,7 +61,7 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, quitCh <-ch m.follow(report, rng, m.Comms) case <-TrackTicker.C: numPolls := atomic.SwapUint64(m.tracker, 0) - jww.INFO.Printf("Polled the network %d times in the " + + jww.INFO.Printf("Polled the network %d times in the "+ "last %s", numPolls, debugTrackPeriod) } } @@ -232,7 +232,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, // are messages waiting in rounds and then sends signals to the appropriate // handling threads roundChecker := func(rid id.Round) bool { - return rounds.Checker(rid, filterList) + return rounds.Checker(rid, filterList, identity.CR) } // move the earliest unknown round tracker forward to the earliest @@ -254,30 +254,25 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, roundsWithMessages2 := identity.UR.Iterate(func(rid id.Round) bool { if gwRoundsState.Checked(rid) { - return rounds.Checker(rid, filterList) + return rounds.Checker(rid, filterList, identity.CR) } return false }, roundsUnknown) for _, rid := range roundsWithMessages { - if m.checked.Check(identity, rid) { + if identity.CR.Check(rid) { 
m.round.GetMessagesFromRound(rid, identity) } } - for _, rid := range roundsWithMessages2 { - m.round.GetMessagesFromRound(rid, identity) - } - - earliestToKeep := getEarliestToKeep(m.param.KnownRoundsThreshold, - gwRoundsState.GetLastChecked()) - - m.checked.Prune(identity, earliestToKeep) -} + identity.CR.Prune() + err = identity.CR.SaveCheckedRounds() + if err != nil { + jww.ERROR.Printf("Could not save rounds for identity %d (%s): %+v", + identity.EphId.Int64(), identity.Source, err) + } -func getEarliestToKeep(delta uint, lastchecked id.Round) id.Round { - if uint(lastchecked) < delta { - return 0 + for _, rid := range roundsWithMessages2 { + m.round.GetMessagesFromRound(rid, identity) } - return lastchecked - id.Round(delta) } diff --git a/network/manager.go b/network/manager.go index 89cf3a3a8..31adb4447 100644 --- a/network/manager.go +++ b/network/manager.go @@ -48,9 +48,6 @@ type manager struct { //number of polls done in a period of time tracker *uint64 - - //tracks already checked rounds - checked *checkedRounds } // NewManager builds a new reception manager object using inputted key fields @@ -76,7 +73,6 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, m := manager{ param: params, tracker: &tracker, - checked: newCheckedRounds(), } m.Internal = internal.Internal{ diff --git a/network/rounds/check.go b/network/rounds/check.go index a22bafd8e..03cd83718 100644 --- a/network/rounds/check.go +++ b/network/rounds/check.go @@ -11,6 +11,7 @@ import ( "encoding/binary" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage/reception" + "gitlab.com/elixxir/client/storage/rounds" "gitlab.com/xx_network/primitives/id" ) @@ -27,7 +28,12 @@ import ( // Retrieval // false: no message // true: message -func Checker(roundID id.Round, filters []*RemoteFilter) bool { +func Checker(roundID id.Round, filters []*RemoteFilter, cr *rounds.CheckedRounds) bool { + // Skip checking if the round is already checked + if 
cr.IsChecked(roundID) { + return true + } + //find filters that could have the round and check them serialRid := serializeRound(roundID) for _, filter := range filters { diff --git a/storage/reception/IdentityUse.go b/storage/reception/IdentityUse.go index 93fc6b8d6..2c8b9df3b 100644 --- a/storage/reception/IdentityUse.go +++ b/storage/reception/IdentityUse.go @@ -22,6 +22,7 @@ type IdentityUse struct { UR *rounds.UnknownRounds ER *rounds.EarliestRound + CR *rounds.CheckedRounds } // setSamplingPeriod add the Request mask as a random buffer around the sampling diff --git a/storage/reception/fake_test.go b/storage/reception/fake_test.go index 35ff0e172..2c748ac22 100644 --- a/storage/reception/fake_test.go +++ b/storage/reception/fake_test.go @@ -24,7 +24,7 @@ func Test_generateFakeIdentity(t *testing.T) { "\"EndValid\":" + string(endValid) + "," + "\"RequestMask\":86400000000000,\"Ephemeral\":true," + "\"StartRequest\":\"0001-01-01T00:00:00Z\"," + - "\"EndRequest\":\"0001-01-01T00:00:00Z\",\"Fake\":true,\"UR\":null,\"ER\":null}" + "\"EndRequest\":\"0001-01-01T00:00:00Z\",\"Fake\":true,\"UR\":null,\"ER\":null,\"CR\":null}" timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) @@ -36,8 +36,8 @@ func Test_generateFakeIdentity(t *testing.T) { receivedJson, _ := json.Marshal(received) if expected != string(receivedJson) { - t.Errorf("The fake identity was generated incorrectly.\n "+ - "expected: %s\nreceived: %s", expected, receivedJson) + t.Errorf("The fake identity was generated incorrectly."+ + "\nexpected: %s\nreceived: %s", expected, receivedJson) } } diff --git a/storage/reception/registration.go b/storage/reception/registration.go index 70647129c..0535f0923 100644 --- a/storage/reception/registration.go +++ b/storage/reception/registration.go @@ -2,6 +2,8 @@ package reception import ( "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/storage/rounds" 
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/xx_network/primitives/id" @@ -17,6 +19,7 @@ type registration struct { Identity UR *rounds.UnknownRounds ER *rounds.EarliestRound + CR *rounds.CheckedRounds kv *versioned.KV } @@ -46,6 +49,11 @@ func newRegistration(reg Identity, kv *versioned.KV) (*registration, error) { urParams.Stored = !reg.Ephemeral r.UR = rounds.NewUnknownRounds(kv, urParams) r.ER = rounds.NewEarliestRound(!reg.Ephemeral, kv) + cr, err := rounds.NewCheckedRounds(int(params.GetDefaultNetwork().KnownRoundsThreshold), kv) + if err != nil { + jww.FATAL.Printf("Failed to create new CheckedRounds for registration: %+v", err) + } + r.CR = cr // If this is not ephemeral, then store everything if !reg.Ephemeral { @@ -71,11 +79,24 @@ func loadRegistration(EphId ephemeral.Id, Source *id.ID, startValid time.Time, "for %s", regPrefix(EphId, Source, startValid)) } + cr, err := rounds.LoadCheckedRounds(int(params.GetDefaultNetwork().KnownRoundsThreshold), kv) + if err != nil { + jww.ERROR.Printf("Making new CheckedRounds, loading of CheckedRounds "+ + "failed: %+v", err) + + cr, err = rounds.NewCheckedRounds(int(params.GetDefaultNetwork().KnownRoundsThreshold), kv) + if err != nil { + jww.FATAL.Printf("Failed to create new CheckedRounds for "+ + "registration after CheckedRounds load failure: %+v", err) + } + } + r := ®istration{ Identity: reg, kv: kv, UR: rounds.LoadUnknownRounds(kv, rounds.DefaultUnknownRoundsParams()), ER: rounds.LoadEarliestRound(kv), + CR: cr, } return r, nil diff --git a/storage/reception/store.go b/storage/reception/store.go index f87ee416a..bd78f0b76 100644 --- a/storage/reception/store.go +++ b/storage/reception/store.go @@ -382,5 +382,6 @@ func (s *Store) selectIdentity(rng io.Reader, now time.Time) (IdentityUse, error Fake: false, UR: selected.UR, ER: selected.ER, + CR: selected.CR, }, nil } diff --git a/storage/rounds/checkedRounds.go b/storage/rounds/checkedRounds.go new file mode 100644 index 000000000..a7e3ebdec --- 
/dev/null +++ b/storage/rounds/checkedRounds.go @@ -0,0 +1,164 @@ +package rounds + +import ( + "container/list" + "encoding/binary" + "github.com/pkg/errors" + "gitlab.com/elixxir/client/storage/utility" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/id" +) + +const itemsPerBlock = 50 + +type CheckedRounds struct { + // Map of round IDs for quick lookup if an ID is stored + m map[id.Round]interface{} + + // List of round IDs in order of age; oldest in front and newest in back + l *list.List + + // List of recently added round IDs that need to be stored + recent []id.Round + + // Saves round IDs in blocks to storage + store *utility.BlockStore + + // The maximum number of round IDs to store before pruning the oldest + maxRounds int +} + +// NewCheckedRounds returns a new CheckedRounds with an initialized map. +func NewCheckedRounds(maxRounds int, kv *versioned.KV) (*CheckedRounds, error) { + // Calculate the number of blocks of size itemsPerBlock are needed to store + // numRoundsToKeep number of round IDs + numBlocks := maxRounds / itemsPerBlock + if maxRounds%itemsPerBlock != 0 { + numBlocks++ + } + + // Create a new BlockStore for storing the round IDs to storage + store, err := utility.NewBlockStore(itemsPerBlock, numBlocks, kv) + if err != nil { + return nil, errors.Errorf("failed to save new checked rounds to storage: %+v", err) + } + + // Create new CheckedRounds + return newCheckedRounds(maxRounds, store), nil +} + +// newCheckedRounds initialises the lists in CheckedRounds. +func newCheckedRounds(maxRounds int, store *utility.BlockStore) *CheckedRounds { + return &CheckedRounds{ + m: make(map[id.Round]interface{}), + l: list.New(), + recent: []id.Round{}, + store: store, + maxRounds: maxRounds, + } +} + +// LoadCheckedRounds restores the list from storage. 
+func LoadCheckedRounds(maxRounds int, kv *versioned.KV) (*CheckedRounds, error) { + // Get rounds from storage + store, rounds, err := utility.LoadBlockStore(kv) + if err != nil { + return nil, errors.Errorf("failed to load CheckedRounds from storage: %+v", err) + } + + // Create new CheckedRounds + cr := newCheckedRounds(maxRounds, store) + + // Unmarshal round ID byte list into the new CheckedRounds + cr.unmarshal(rounds) + + return cr, nil +} + +// SaveCheckedRounds stores the list to storage. +func (cr *CheckedRounds) SaveCheckedRounds() error { + + // Save to disk + err := cr.store.Store(cr) + if err != nil { + return errors.Errorf("failed to store recent CheckedRounds: %+v", err) + } + + // Save to disk + return nil +} + +// Next pops the oldest recent round ID from the list and returns it as bytes. +// Returns false if the list is empty +func (cr *CheckedRounds) Next() ([]byte, bool) { + if len(cr.recent) > 0 { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(cr.recent[0])) + cr.recent = cr.recent[1:] + return b, true + } + + return nil, false +} + +// Check determines if the round ID has been added to the checklist. If it has +// not, then it is added and the function returns true. Otherwise, if it already +// exists, then the function returns false. +func (cr *CheckedRounds) Check(rid id.Round) bool { + // Add the round ID to the checklist if it does not exist and return true + if _, exists := cr.m[rid]; !exists { + cr.m[rid] = nil // Add ID to the map + cr.l.PushBack(rid) // Add ID to the end of the list + cr.recent = append(cr.recent, rid) + + // The commented out code below works the same as the Prune function but + // occurs when adding a round ID to the list. It was decided to use + // Prune instead so that it does not block even though the savings are + // probably negligible. 
+ // // Remove the oldest round ID the list is full + // if cr.l.Len() > cr.maxRounds { + // oldestID := cr.l.Remove(cr.l.Front()) // Remove oldest from list + // delete(cr.m, oldestID.(id.Round)) // Remove oldest from map + // } + + return true + } + + return false +} + +// IsChecked determines if the round has been added to the checklist. +func (cr *CheckedRounds) IsChecked(rid id.Round) bool { + _, exists := cr.m[rid] + return exists +} + +// Prune any rounds that are earlier than the earliestAllowed. +func (cr *CheckedRounds) Prune() { + if len(cr.m) < cr.maxRounds { + return + } + earliestAllowed := cr.l.Back().Value.(id.Round) - id.Round(cr.maxRounds) + 1 + + // Iterate over all the round IDs and remove any that are too old + for e := cr.l.Front(); e != nil; { + if e.Value.(id.Round) < earliestAllowed { + delete(cr.m, e.Value.(id.Round)) + lastE := e + e = e.Next() + cr.l.Remove(lastE) + } else { + break + } + } +} + +// unmarshal unmarshalls the list of byte slices into the CheckedRounds map and +// list. +func (cr *CheckedRounds) unmarshal(rounds [][]byte) { + for _, round := range rounds { + rid := id.Round(binary.LittleEndian.Uint64(round)) + cr.m[rid] = nil + cr.l.PushBack(rid) + } +} diff --git a/storage/rounds/checkedRounds_test.go b/storage/rounds/checkedRounds_test.go new file mode 100644 index 000000000..816c6fce7 --- /dev/null +++ b/storage/rounds/checkedRounds_test.go @@ -0,0 +1,217 @@ +package rounds + +import ( + "container/list" + "encoding/binary" + "gitlab.com/elixxir/client/storage/utility" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/ekv" + "gitlab.com/xx_network/primitives/id" + "reflect" + "testing" +) + +// Happy path. 
+func Test_newCheckedRounds(t *testing.T) {
+	maxRounds := 230
+	kv := versioned.NewKV(make(ekv.Memstore))
+
+	// Build the BlockStore with the same arguments NewCheckedRounds passes (230/50+1 = 5)
+	store, err := utility.NewBlockStore(itemsPerBlock, maxRounds/itemsPerBlock+1, kv)
+	if err != nil {
+		t.Fatalf("Failed to create new BlockStore: %+v", err)
+	}
+
+	expected := &CheckedRounds{
+		m:         make(map[id.Round]interface{}),
+		l:         list.New(),
+		recent:    []id.Round{},
+		store:     store,
+		maxRounds: maxRounds,
+	}
+
+	received, err := NewCheckedRounds(maxRounds, kv)
+	if err != nil {
+		t.Fatalf("NewCheckedRounds() returned an error: %+v", err)
+	}
+
+	if !reflect.DeepEqual(expected, received) {
+		t.Errorf("NewCheckedRounds() did not return the expected CheckedRounds."+
+			"\nexpected: %+v\nreceived: %+v", expected, received)
+	}
+}
+
+// Tests that a CheckedRounds saved to storage and then pruned in memory matches
+// the CheckedRounds loaded back from that same storage.
+func TestCheckedRounds_SaveCheckedRounds_TestLoadCheckedRounds(t *testing.T) {
+	// Create new CheckedRounds (max 50) and add 100 rounds to it; stop on failure since cr would be nil
+	kv := versioned.NewKV(make(ekv.Memstore))
+	cr, err := NewCheckedRounds(50, kv)
+	if err != nil {
+		t.Fatalf("failed to make new CheckedRounds: %+v", err)
+	}
+	for i := id.Round(0); i < 100; i++ {
+		cr.Check(i)
+	}
+
+	err = cr.SaveCheckedRounds()
+	if err != nil {
+		t.Errorf("SaveCheckedRounds() returned an error: %+v", err)
+	}
+
+	cr.Prune()
+
+	newCR, err := LoadCheckedRounds(50, kv)
+	if err != nil {
+		t.Errorf("LoadCheckedRounds() returned an error: %+v", err)
+	}
+
+	if !reflect.DeepEqual(cr, newCR) {
+		t.Errorf("Failed to store and load CheckedRounds."+
+			"\nexpected: %+v\nreceived: %+v", cr, newCR)
+	}
+}
+
+// Happy path.
+func TestCheckedRounds_Next(t *testing.T) {
+	cr := newCheckedRounds(100, nil)
+	rounds := make([][]byte, 10)
+	for i := id.Round(0); i < 10; i++ {
+		cr.Check(i)
+	}
+
+	for i := id.Round(0); i < 10; i++ {
+		round, exists := cr.Next()
+		if !exists {
+			t.Error("Next() returned false when there should be more IDs.")
+		}
+
+		rounds[i] = round
+	}
+	round, exists := cr.Next()
+	if exists {
+		t.Errorf("Next() returned true when the list should be empty: %d", round)
+	}
+
+	testCR := newCheckedRounds(100, nil)
+	testCR.unmarshal(rounds)
+
+	if !reflect.DeepEqual(cr, testCR) {
+		t.Errorf("unmarshal() did not return the expected CheckedRounds."+
+			"\nexpected: %+v\nreceived: %+v", cr, testCR)
+	}
+}
+
+// Happy path: Check returns true and records unseen round IDs, and returns false for repeats.
+func Test_checkedRounds_Check(t *testing.T) {
+	cr := newCheckedRounds(100, nil)
+	var expected []id.Round
+	for i := id.Round(1); i < 11; i++ {
+		if i%2 == 0 {
+			if !cr.Check(i) {
+				t.Errorf("Check() returned false when the round ID should have been added (%d).", i)
+			}
+
+			val := cr.l.Back().Value.(id.Round)
+			if val != i {
+				t.Errorf("Check() did not add the round ID to the back of the list."+
+					"\nexpected: %d\nreceived: %d", i, val)
+			}
+			expected = append(expected, i)
+		}
+	}
+
+	if !reflect.DeepEqual(cr.recent, expected) {
+		t.Errorf("Unexpected list of recent rounds."+
+			"\nexpected: %+v\nreceived: %+v", expected, cr.recent)
+	}
+
+	for i := id.Round(1); i < 11; i++ {
+		result := cr.Check(i)
+		if i%2 == 0 {
+			if result {
+				t.Errorf("Check() returned true when the round ID should not have been added (%d).", i)
+			}
+		} else if !result {
+			t.Errorf("Check() returned false when the round ID should have been added (%d).", i)
+		} else {
+			expected = append(expected, i)
+		}
+	}
+
+	if !reflect.DeepEqual(cr.recent, expected) {
+		t.Errorf("Unexpected list of recent rounds."+
+			"\nexpected: %+v\nreceived: %+v", expected, cr.recent)
+	}
+}
+
+// Happy path.
+func TestCheckedRounds_IsChecked(t *testing.T) {
+	cr := newCheckedRounds(100, nil)
+
+	for i := id.Round(0); i < 100; i += 2 {
+		cr.Check(i)
+	}
+
+	for i := id.Round(0); i < 100; i++ {
+		if i%2 == 0 {
+			if !cr.IsChecked(i) {
+				t.Errorf("IsChecked() falsely reported round ID %d as not checked.", i)
+			}
+		} else if cr.IsChecked(i) {
+			t.Errorf("IsChecked() falsely reported round ID %d as checked.", i)
+		}
+	}
+}
+
+// Happy path: ten round IDs are added with maxRounds 5, so Prune must leave five in the map and list.
+func Test_checkedRounds_Prune(t *testing.T) {
+	cr := newCheckedRounds(5, nil)
+	for i := id.Round(0); i < 10; i++ {
+		cr.Check(i)
+	}
+
+	cr.Prune()
+
+	if len(cr.m) != 5 || cr.l.Len() != 5 {
+		t.Errorf("Prune() did not remove the correct number of round IDs."+
+			"\nexpected: %d\nmap: %d\nlist: %d", 5,
+			len(cr.m), cr.l.Len())
+	}
+}
+
+// Happy path: length of the list is not too long and does not need to be pruned.
+func Test_checkedRounds_Prune_NoChange(t *testing.T) {
+	cr := newCheckedRounds(100, nil)
+	for i := id.Round(0); i < 10; i++ {
+		cr.Check(i)
+	}
+
+	cr.Prune()
+
+	if len(cr.m) != 10 || cr.l.Len() != 10 {
+		t.Errorf("Prune() changed the number of round IDs when no pruning was needed."+
+			"\nexpected: %d\nmap: %d\nlist: %d", 10,
+			len(cr.m), cr.l.Len())
+	}
+}
+
+// Happy path.
+func TestCheckedRounds_unmarshal(t *testing.T) { + expected := newCheckedRounds(100, nil) + rounds := make([][]byte, 10) + for i := id.Round(0); i < 10; i++ { + expected.Check(i) + rounds[i] = make([]byte, 8) + binary.LittleEndian.PutUint64(rounds[i], uint64(i)) + } + expected.recent = []id.Round{} + + cr := newCheckedRounds(100, nil) + cr.unmarshal(rounds) + + if !reflect.DeepEqual(expected, cr) { + t.Errorf("unmarshal() did not return the expected CheckedRounds."+ + "\nexpected: %+v\nreceived: %+v", expected, cr) + } +} diff --git a/storage/utility/blockStore.go b/storage/utility/blockStore.go new file mode 100644 index 000000000..a405e9b96 --- /dev/null +++ b/storage/utility/blockStore.go @@ -0,0 +1,274 @@ +package utility + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/netTime" + "strconv" +) + +// Sizes in bytes +const ( + int64Size = 8 + marshalledSize = 4 * int64Size +) + +// Error messages +const ( + bsBuffLengthErr = "length of buffer %d != %d expected" + bsKvSaveErr = "failed to save blockStore to KV: %+v" + bsKvLoadErr = "failed to get BlockStore from storage: %+v" + bsKvUnmarshalErr = "failed to unmarshal BlockStore loaded from storage: %+v" + bJsonMarshalErr = "failed to JSON marshal block %d: %+v" + bKvSaveErr = "failed to save block %d to KV: %+v" + bKvDeleteErr = "failed to delete block %d from KV: %+v" + bKvLoadErr = "failed to get block %d from KV: %+v" + bJsonUnmarshalErr = "failed to JSON marshal block %d: %+v" +) + +// Storage keys and parts +const ( + delimiter = "/" + blockStoreKey = "blockStore" + blockStoreVersion = 0 + blockKey = "block" + blockVersion = 0 +) + +type Iterator interface { + Next() ([]byte, bool) +} + +type BlockStore struct { + block [][]byte + numBlocks int // The maximum number of blocks saved to the kv + blockSize int // The maximum number of items allowed 
in a block + firstSaved int // The index of the oldest block in the list + lastSaved int // The index of the newest block in the list + kv *versioned.KV +} + +// NewBlockStore returns a new BlockStore and saves it to storage. +func NewBlockStore(numBlocks, blockSize int, kv *versioned.KV) (*BlockStore, error) { + bs := &BlockStore{ + block: make([][]byte, 0, blockSize), + numBlocks: numBlocks, + blockSize: blockSize, + firstSaved: 0, + lastSaved: 0, + kv: kv, + } + + return bs, bs.save() +} + +// LoadBlockStore returns the BlockStore from storage and a concatenation of all +// blocks in storage. +func LoadBlockStore(kv *versioned.KV) (*BlockStore, [][]byte, error) { + bs := &BlockStore{kv: kv} + + // Get BlockStore parameters from storage + err := bs.load() + if err != nil { + return nil, nil, err + } + + // LoadBlockStore each block from storage and join together into single slice + var data, block [][]byte + for i := bs.firstSaved; i <= bs.lastSaved; i++ { + // Get the block from storage + block, err = bs.loadBlock(i) + if err != nil { + return nil, nil, err + } + + // Append block to the rest of the data + data = append(data, block...) + } + + // Save the last block into memory + bs.block = block + + return bs, data, nil +} + +// Store stores all items in the Iterator to storage in blocks. +func (bs *BlockStore) Store(iter Iterator) error { + // Iterate through all items in the Iterator and add each to the block in + // memory. When the block is full, it is saved to storage and a new block is + // added to until the iterator returns false. 
+ for item, exists := iter.Next(); exists; item, exists = iter.Next() { + // If the block is full, save it to storage and start a new block + if len(bs.block) >= bs.blockSize { + if err := bs.saveBlock(); err != nil { + return err + } + + bs.lastSaved++ + bs.block = make([][]byte, 0, bs.blockSize) + } + + // Append the item to the block in memory + bs.block = append(bs.block, item) + } + + // Save the current partially filled block to storage + if err := bs.saveBlock(); err != nil { + return err + } + + // Calculate the new first saved index + oldFirstSaved := bs.firstSaved + if (bs.lastSaved+1)-bs.firstSaved > bs.numBlocks { + bs.firstSaved = (bs.lastSaved + 1) - bs.numBlocks + } + + // Save the BlockStorage parameters to storage + if err := bs.save(); err != nil { + return err + } + + // Delete all old blocks + bs.pruneBlocks(oldFirstSaved) + + return nil +} + +// saveBlock saves the block to an indexed storage. +func (bs *BlockStore) saveBlock() error { + // JSON marshal block + data, err := json.Marshal(bs.block) + if err != nil { + return errors.Errorf(bJsonMarshalErr, bs.lastSaved, err) + } + + // Construct versioning object + obj := versioned.Object{ + Version: blockVersion, + Timestamp: netTime.Now(), + Data: data, + } + + // Save to storage + err = bs.kv.Set(bs.getKey(bs.lastSaved), blockVersion, &obj) + if err != nil { + return errors.Errorf(bKvSaveErr, bs.lastSaved, err) + } + + return nil +} + +// loadBlock loads the block with the index from storage. +func (bs *BlockStore) loadBlock(i int) ([][]byte, error) { + // Get the data from the kv + obj, err := bs.kv.Get(bs.getKey(i), blockVersion) + if err != nil { + return nil, errors.Errorf(bKvLoadErr, i, err) + } + + // Unmarshal the block + var block [][]byte + err = json.Unmarshal(obj.Data, &block) + if err != nil { + return nil, errors.Errorf(bJsonUnmarshalErr, i, err) + } + + return block, nil +} + +// pruneBlocks reduces the number of saved blocks to numBlocks by removing the +// oldest blocks. 
+func (bs *BlockStore) pruneBlocks(firstSaved int) {
+	// Exit early when fewer than numBlocks blocks exist counting from the old first index
+	if (bs.lastSaved+1)-firstSaved < bs.numBlocks {
+		return
+	}
+
+	// Delete every block from the old first index up to, but excluding, bs.firstSaved
+	for ; firstSaved < bs.firstSaved; firstSaved++ {
+		err := bs.kv.Delete(bs.getKey(firstSaved), blockVersion)
+		if err != nil {
+			jww.WARN.Printf(bKvDeleteErr, firstSaved, err)
+		}
+	}
+}
+
+// getKey produces a block storage key for the given index.
+func (bs *BlockStore) getKey(i int) string {
+	return blockKey + delimiter + strconv.Itoa(i)
+}
+
+// save saves the parameters in BlockStore to storage. It does not save any
+// block data.
+func (bs *BlockStore) save() error {
+	// Construct versioning object
+	obj := versioned.Object{
+		Version:   blockStoreVersion,
+		Timestamp: netTime.Now(),
+		Data:      bs.marshal(),
+	}
+
+	// Save to storage
+	err := bs.kv.Set(blockStoreKey, blockStoreVersion, &obj)
+	if err != nil {
+		return errors.Errorf(bsKvSaveErr, err)
+	}
+
+	return nil
+}
+
+// load loads BlockStore parameters from storage.
+func (bs *BlockStore) load() error {
+	// Get the data from the kv
+	obj, err := bs.kv.Get(blockStoreKey, blockStoreVersion)
+	if err != nil {
+		return errors.Errorf(bsKvLoadErr, err)
+	}
+
+	// Unmarshal the data into a BlockStore
+	err = bs.unmarshal(obj.Data)
+	if err != nil {
+		return errors.Errorf(bsKvUnmarshalErr, err)
+	}
+
+	return nil
+}
+
+// marshal marshals the BlockStore integer values to a byte slice.
+func (bs *BlockStore) marshal() []byte {
+	// Build list of values to store; unmarshal reads them back in this same order
+	values := []int{bs.numBlocks, bs.blockSize, bs.firstSaved, bs.lastSaved}
+
+	// Write each value as an 8-byte little-endian integer
+	var buff bytes.Buffer
+	for _, val := range values {
+		b := make([]byte, int64Size)
+		binary.LittleEndian.PutUint64(b, uint64(val))
+		buff.Write(b)
+	}
+
+	// Return the bytes
+	return buff.Bytes()
+}
+
+// unmarshal unmarshalls the BlockStore int values from the buffer.
An error is +// returned if the length of the bytes is incorrect. +func (bs *BlockStore) unmarshal(b []byte) error { + // Return an error if the buffer is not the expected length + if len(b) != marshalledSize { + return errors.Errorf(bsBuffLengthErr, len(b), marshalledSize) + } + + // Convert the byte slices to ints and store + buff := bytes.NewBuffer(b) + bs.numBlocks = int(binary.LittleEndian.Uint64(buff.Next(int64Size))) + bs.blockSize = int(binary.LittleEndian.Uint64(buff.Next(int64Size))) + bs.firstSaved = int(binary.LittleEndian.Uint64(buff.Next(int64Size))) + bs.lastSaved = int(binary.LittleEndian.Uint64(buff.Next(int64Size))) + + return nil +} diff --git a/storage/utility/blockStore_test.go b/storage/utility/blockStore_test.go new file mode 100644 index 000000000..c9e2fb56d --- /dev/null +++ b/storage/utility/blockStore_test.go @@ -0,0 +1,414 @@ +package utility + +import ( + "bytes" + "encoding/binary" + "fmt" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/ekv" + "gitlab.com/xx_network/primitives/netTime" + "math/rand" + "reflect" + "strings" + "testing" +) + +type iter [][]byte + +func (it *iter) Next() ([]byte, bool) { + if len(*it) > 0 { + item := (*it)[0] + *it = (*it)[1:] + return item, true + } + + return nil, false +} + +// Happy path. +func TestNewBlockStore(t *testing.T) { + expected := &BlockStore{ + block: make([][]byte, 0, 20), + numBlocks: 50, + blockSize: 20, + firstSaved: 0, + lastSaved: 0, + kv: versioned.NewKV(make(ekv.Memstore)), + } + + bs, err := NewBlockStore(expected.numBlocks, expected.blockSize, expected.kv) + if err != nil { + t.Errorf("NewBlockStore() returned an error: %+v", err) + } + + if !reflect.DeepEqual(expected, bs) { + t.Errorf("NewBlockStore() did not return the expected BlockStore."+ + "\nexpected: %+v\nreceived: %+v", expected, bs) + } +} + +// Tests BlockStore storing and loading data in multiple situations. 
+func TestBlockStore_Store_LoadBlockStore(t *testing.T) {
+	values := []struct {
+		blockSize, numBlocks                  int
+		expectedFirstSaved, expectedLastSaved int
+		dataCutIndex                          int
+	}{
+		{3, 5, 0, 3, 0},  // Multiple blocks, last block partial, no pruning
+		{10, 5, 0, 0, 0}, // Single block, last block full, no pruning
+		{15, 5, 0, 0, 0}, // Single block, last block partial, no pruning
+
+		{2, 3, 2, 4, 4}, // Multiple blocks, last block partial, pruned
+		{5, 1, 1, 1, 5}, // Single block, last block full, pruned
+		{4, 1, 2, 2, 8}, // Single block, last block partial, pruned
+	}
+
+	for i, v := range values {
+		// Create the initial data to store: ten 8-byte little-endian values 0-9
+		iter := make(iter, 10)
+		for i := uint64(0); i < 10; i++ {
+			iter[i] = make([]byte, 8)
+			binary.LittleEndian.PutUint64(iter[i], i)
+		}
+
+		// Copy the expected data now, because Store consumes the iterator
+		expected := make([][]byte, len(iter[v.dataCutIndex:]))
+		copy(expected, iter[v.dataCutIndex:])
+
+		bs, err := NewBlockStore(v.numBlocks, v.blockSize, versioned.NewKV(make(ekv.Memstore)))
+		if err != nil {
+			t.Errorf("Failed to create new BlockStore (%d): %+v", i, err)
+		}
+
+		// Attempt to store the data
+		err = bs.Store(&iter)
+		if err != nil {
+			t.Errorf("Store() returned an error (%d): %+v", i, err)
+		}
+
+		if bs.firstSaved != v.expectedFirstSaved {
+			t.Errorf("Store() did not return the expected firstSaved (%d)."+
+				"\nexpected: %d\nreceived: %d", i, v.expectedFirstSaved, bs.firstSaved)
+		}
+
+		if bs.lastSaved != v.expectedLastSaved {
+			t.Errorf("Store() did not return the expected lastSaved (%d)."+
+				"\nexpected: %d\nreceived: %d", i, v.expectedLastSaved, bs.lastSaved)
+		}
+
+		// Attempt to load the data
+		loadBS, data, err := LoadBlockStore(bs.kv)
+		if err != nil {
+			t.Errorf("LoadBlockStore() returned an error (%d): %+v", i, err)
+		}
+
+		// Check if the loaded BlockStore is correct
+		if !reflect.DeepEqual(bs, loadBS) {
+			t.Errorf("Loading wrong BlockStore from storage (%d)."+
+				"\nexpected: %+v\nreceived: %+v", i, bs, loadBS)
+		}
+
+		// Check if the loaded data is correct
+		if !reflect.DeepEqual(expected, data) {
+			t.Errorf("Loading wrong data from storage (%d)."+
+				"\nexpected: %+v\nreceived: %+v", i, expected, data)
+		}
+	}
+}
+
+// Tests that a block of 20 random 32-byte items is saved and loaded from storage intact.
+func TestBlockStore_saveBlock_loadBlock(t *testing.T) {
+	prng := rand.New(rand.NewSource(42))
+	bs := &BlockStore{
+		block:      make([][]byte, 20),
+		numBlocks:  50,
+		blockSize:  20,
+		firstSaved: 0,
+		lastSaved:  0,
+		kv:         versioned.NewKV(make(ekv.Memstore)),
+	}
+
+	for i := range bs.block {
+		bs.block[i] = make([]byte, 32)
+		prng.Read(bs.block[i])
+	}
+
+	err := bs.saveBlock()
+	if err != nil {
+		t.Errorf("saveBlock() returned an error: %+v", err)
+	}
+
+	newBS := &BlockStore{kv: bs.kv}
+	block, err := newBS.loadBlock(0)
+	if err != nil {
+		t.Errorf("loadBlock() returned an error: %+v", err)
+	}
+
+	if !reflect.DeepEqual(bs.block, block) {
+		t.Errorf("Failed to save and load block to storage."+
+			"\nexpected: %+v\nreceived: %+v", bs.block, block)
+	}
+}
+
+// NOTE(review): no KV save failure is induced here; this repeats the save/load round trip.
+func TestBlockStore_saveBlock_SaveError(t *testing.T) {
+	prng := rand.New(rand.NewSource(42))
+	bs := &BlockStore{
+		block:      make([][]byte, 20),
+		numBlocks:  50,
+		blockSize:  20,
+		firstSaved: 0,
+		lastSaved:  0,
+		kv:         versioned.NewKV(make(ekv.Memstore)),
+	}
+
+	for i := range bs.block {
+		bs.block[i] = make([]byte, 32)
+		prng.Read(bs.block[i])
+	}
+
+	err := bs.saveBlock()
+	if err != nil {
+		t.Errorf("saveBlock() returned an error: %+v", err)
+	}
+
+	newBS := &BlockStore{kv: bs.kv}
+	block, err := newBS.loadBlock(0)
+	if err != nil {
+		t.Errorf("loadBlock() returned an error: %+v", err)
+	}
+
+	if !reflect.DeepEqual(bs.block, block) {
+		t.Errorf("Failed to save and load block to storage."+
+			"\nexpected: %+v\nreceived: %+v", bs.block, block)
+	}
+}
+
+// Error path: loading of nonexistent key returns an error.
+func TestBlockStore_loadBlock_LoadStorageError(t *testing.T) {
+	// Only the text of the error format that precedes its first "%" is
+	// matched against the returned error
+	wantErr := strings.SplitN(bKvLoadErr, "%", 2)[0]
+	store := &BlockStore{kv: versioned.NewKV(make(ekv.Memstore))}
+
+	// Nothing has been written to the KV, so block 0 cannot be found
+	if _, err := store.loadBlock(0); err == nil ||
+		!strings.Contains(err.Error(), wantErr) {
+		t.Errorf("loadBlock() did not return the expected error."+
+			"\nexpected: %s\nreceived: %+v", wantErr, err)
+	}
+}
+
+// Error path: loadBlock fails when the stored object does not hold valid data.
+func TestBlockStore_loadBlock_UnmarshalError(t *testing.T) {
+	wantErr := strings.SplitN(bJsonUnmarshalErr, "%", 2)[0]
+	store := &BlockStore{kv: versioned.NewKV(make(ekv.Memstore))}
+
+	// Store an object with an unparsable payload under the key for index
+	// lastSaved, which is zero on this freshly constructed BlockStore
+	badObj := &versioned.Object{
+		Version:   blockVersion,
+		Timestamp: netTime.Now(),
+		Data:      []byte("invalid JSON"),
+	}
+	if err := store.kv.Set(store.getKey(store.lastSaved), blockVersion, badObj); err != nil {
+		t.Errorf("Failed to save data to KV: %+v", err)
+	}
+
+	// Reading that block back must surface the unmarshal error
+	if _, err := store.loadBlock(0); err == nil ||
+		!strings.Contains(err.Error(), wantErr) {
+		t.Errorf("loadBlock() did not return the expected error."+
+			"\nexpected: %s\nreceived: %+v", wantErr, err)
+	}
+}
+
+// Happy path: blocks before firstSaved are pruned and the rest are kept.
+func TestBlockStore_pruneBlocks(t *testing.T) {
+	store := &BlockStore{
+		block:      make([][]byte, 0, 32),
+		numBlocks:  5,
+		blockSize:  32,
+		firstSaved: 0,
+		lastSaved:  0,
+		kv:         versioned.NewKV(make(ekv.Memstore)),
+	}
+
+	// Call saveBlock once for every lastSaved value from 0 through 14; the
+	// loop leaves lastSaved at 15
+	for store.lastSaved < 15 {
+		if err := store.saveBlock(); err != nil {
+			t.Errorf("Failed to save block %d: %+v", store.lastSaved, err)
+		}
+		store.lastSaved++
+	}
+
+	// assertWindowKept reports an error for every block in the range
+	// [firstSaved, lastSaved) that can no longer be read from the KV.
+	assertWindowKept := func() {
+		for idx := store.firstSaved; idx < store.lastSaved; idx++ {
+			_, err := store.kv.Get(store.getKey(idx), blockVersion)
+			if err != nil {
+				t.Errorf("pruneBlocks() deleted block %d: %+v", idx, err)
+			}
+		}
+	}
+
+	// Move firstSaved forward so that only the last numBlocks blocks remain
+	// in the window, then prune starting from the previous firstSaved
+	prevFirst := store.firstSaved
+	store.firstSaved = store.lastSaved - store.numBlocks
+	store.pruneBlocks(prevFirst)
+
+	// Every block that precedes the window must be gone from the KV
+	for idx := 0; idx < store.lastSaved-store.numBlocks; idx++ {
+		_, err := store.kv.Get(store.getKey(idx), blockVersion)
+		if err == nil {
+			t.Errorf("pruneBlocks() failed to delete old block %d: %+v", idx, err)
+		}
+	}
+
+	// Every block inside the window must still be present
+	assertWindowKept()
+
+	// Prune again with nothing to remove: firstSaved is recomputed to the
+	// same value, so the old and new first indexes are equal
+	prevFirst = store.firstSaved
+	store.firstSaved = store.lastSaved - store.numBlocks
+	store.pruneBlocks(prevFirst)
+
+	// The window must be untouched by the second call
+	assertWindowKept()
+}
+
+// Consistency test: getKey must keep producing the same key for each index.
+func TestBlockStore_getKey_Consistency(t *testing.T) {
+	wantKeys := []string{
+		"block/0", "block/1", "block/2", "block/3", "block/4",
+		"block/5", "block/6", "block/7", "block/8", "block/9",
+	}
+	store := BlockStore{}
+
+	// The position of each expected key is the index passed to getKey
+	for idx := range wantKeys {
+		if got := store.getKey(idx); got != wantKeys[idx] {
+			t.Errorf("getKey did not return the correct key for the index %d."+
+				"\nexpected: %s\nreceived: %s", idx, wantKeys[idx], got)
+		}
+	}
+}
+
+// Tests that a BlockStore can be saved and loaded from the KV correctly.
+func TestBlockStore_save_load(t *testing.T) {
+	bs := &BlockStore{
+		numBlocks: 5, blockSize: 6, firstSaved: 7, lastSaved: 8,
+		kv: versioned.NewKV(make(ekv.Memstore)),
+	}
+
+	err := bs.save()
+	if err != nil {
+		t.Errorf("save() returned an error: %+v", err)
+	}
+
+	// Load into a fresh BlockStore that shares only the KV so that every
+	// other field has to come from storage
+	testBS := &BlockStore{kv: bs.kv}
+	err = testBS.load()
+	if err != nil {
+		t.Errorf("load() returned an error: %+v", err)
+	}
+
+	if !reflect.DeepEqual(bs, testBS) {
+		t.Errorf("Failed to save and load BlockStore to KV."+
+			"\nexpected: %+v\nreceived: %+v", bs, testBS)
+	}
+}
+
+// Error path: loading of unsaved BlockStore fails.
+func TestBlockStore_load_KvGetError(t *testing.T) {
+	// Only the text of the error format that precedes its first "%" is
+	// matched against the returned error
+	expectedErr := strings.SplitN(bsKvLoadErr, "%", 2)[0]
+
+	// Nothing was ever saved to this KV
+	testBS := &BlockStore{kv: versioned.NewKV(make(ekv.Memstore))}
+	err := testBS.load()
+	if err == nil || !strings.Contains(err.Error(), expectedErr) {
+		t.Errorf("load() did not return an error for a nonexistent item in storage."+
+			"\nexpected: %s\nreceived: %+v", expectedErr, err)
+	}
+}
+
+// Error path: unmarshalling of invalid data fails.
+func TestBlockStore_load_UnmarshalError(t *testing.T) {
+	expectedErr := strings.SplitN(bsKvUnmarshalErr, "%", 2)[0]
+	kv := versioned.NewKV(make(ekv.Memstore))
+
+	// Construct invalid versioning object
+	obj := versioned.Object{
+		Version:   blockStoreVersion,
+		Timestamp: netTime.Now(),
+		Data:      []byte("invalid data"),
+	}
+
+	// Save to storage; the rest of the test is meaningless if this fails
+	err := kv.Set(blockStoreKey, blockStoreVersion, &obj)
+	if err != nil {
+		t.Fatalf("failed to save object to storage: %+v", err)
+	}
+
+	// Try to retrieve invalid object. The item exists in storage here, so the
+	// failure message describes invalid data rather than a missing item.
+	testBS := &BlockStore{kv: kv}
+	err = testBS.load()
+	if err == nil || !strings.Contains(err.Error(), expectedErr) {
+		t.Errorf("load() did not return the expected error for invalid data in storage."+
+			"\nexpected: %s\nreceived: %+v", expectedErr, err)
+	}
+}
+
+// Consistency test.
+func TestBlockStore_unmarshal(t *testing.T) { + buff := []byte{5, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, + 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0} + expected := &BlockStore{numBlocks: 5, blockSize: 6, firstSaved: 7, lastSaved: 8} + + bs := &BlockStore{} + err := bs.unmarshal(buff) + if err != nil { + t.Errorf("unmarshal() returned an error: %+v", err) + } + + if !reflect.DeepEqual(expected, bs) { + t.Errorf("unmarshal() did not return the expected BlockStore."+ + "\nexpected: %+v\nreceived: %+v", expected, bs) + } +} + +// Error path: length of buffer incorrect. +func TestBlockStore_unmarshal_BuffLengthError(t *testing.T) { + expectedErr := fmt.Sprintf(bsBuffLengthErr, 0, marshalledSize) + bs := BlockStore{} + err := bs.unmarshal([]byte{}) + if err == nil || err.Error() != expectedErr { + t.Errorf("unmarshal() did not return the expected error for a buffer "+ + "of the wrong size.\nexpected: %s\nreceived: %+v", expectedErr, err) + } +} + +// Consistency test. +func TestBlockStore_marshal(t *testing.T) { + expected := []byte{5, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, + 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0} + bs := &BlockStore{numBlocks: 5, blockSize: 6, firstSaved: 7, lastSaved: 8} + + buff := bs.marshal() + + if !bytes.Equal(expected, buff) { + t.Errorf("marshal() did not return the expected bytes."+ + "\nexpected: %+v\nreceived: %+v", expected, buff) + } +} + +// Tests that marshal and unmarshal work together. +func TestBlockStore_marshal_unmarshal(t *testing.T) { + bs := &BlockStore{numBlocks: 5, blockSize: 6, firstSaved: 7, lastSaved: 8} + + buff := bs.marshal() + + testBS := &BlockStore{} + err := testBS.unmarshal(buff) + if err != nil { + t.Errorf("unmarshal() returned an error: %+v", err) + } + + if !reflect.DeepEqual(bs, testBS) { + t.Errorf("failed to marshal and unmarshal BlockStore."+ + "\nexpected: %+v\nreceived: %+v", bs, testBS) + } +} -- GitLab