diff --git a/api/client.go b/api/client.go index 4f31af1e887c5f3cbddea6090c22b5742cdaadf5..6ea04f78cc4b4156899f93cf67947c3170aae79a 100644 --- a/api/client.go +++ b/api/client.go @@ -579,6 +579,13 @@ func (c *Client) GetNetworkInterface() interfaces.NetworkManager { return c.network } +// GetRateLimitParams retrieves the rate limiting parameters. +func (c *Client) GetRateLimitParams() (uint32, uint32, int64) { + rateLimitParams := c.storage.GetBucketParams().Get() + return rateLimitParams.Capacity, rateLimitParams.LeakedTokens, + rateLimitParams.LeakDuration.Nanoseconds() +} + // GetNodeRegistrationStatus gets the current state of node registration. It // returns the total number of nodes in the NDF and the number of those which // are currently registers with. An error is returned if the network is not @@ -783,7 +790,7 @@ func checkVersionAndSetupStorage(def *ndf.NetworkDefinition, // Create Storage passwordStr := string(password) storageSess, err := storage.New(storageDir, passwordStr, protoUser, - currentVersion, cmixGrp, e2eGrp, rngStreamGen) + currentVersion, cmixGrp, e2eGrp, rngStreamGen, def.RateLimits) if err != nil { return nil, err } diff --git a/bindings/client.go b/bindings/client.go index 0c0a588d32e0f98786defb2b0a37de8e41fe6482..7d18a1f4339a6fbd81a72e57e5d827a1deb5f5f7 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -486,6 +486,11 @@ func (c *Client) GetPreferredBins(countryCode string) (string, error) { return buff.String(), nil } +// GetRateLimitParams retrieves the rate limiting parameters. +func (c *Client) GetRateLimitParams() (uint32, uint32, int64) { + return c.api.GetRateLimitParams() +} + /* // SearchWithHandler is a non-blocking search that also registers // a callback interface for user disovery events. diff --git a/go.mod b/go.mod index aca1594f285da11ba13dc2432922f14220300500..21efb0e58e54ee23279d03a487bee008cb0e2752 100644 --- a/go.mod +++ b/go.mod @@ -22,9 +22,9 @@ require ( gitlab.com/elixxir/crypto v0.0.7-0.20211218190454-ecf7e6e1f41f gitlab.com/elixxir/ekv v0.1.5 gitlab.com/elixxir/primitives v0.0.3-0.20211208211148-752546cf2e46 - gitlab.com/xx_network/comms v0.0.4-0.20211215181459-0918c1141509 - gitlab.com/xx_network/crypto v0.0.5-0.20211215175729-3c916d3c5344 - gitlab.com/xx_network/primitives v0.0.4-0.20211014163031-53405cf191fb + gitlab.com/xx_network/comms v0.0.4-0.20211202195810-9dfc0b6cdb28 + gitlab.com/xx_network/crypto v0.0.5-0.20211014163843-57b345890686 + gitlab.com/xx_network/primitives v0.0.4-0.20211215214631-b245b75e5caf golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 golang.org/x/net v0.0.0-20210525063256-abc453219eb5 google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect diff --git a/go.sum b/go.sum index 94723085a30493052bd898c44127b3785906c113..ddd1ca127a2db4a68924a2340d70112679f34ee6 100644 --- a/go.sum +++ b/go.sum @@ -283,6 +283,10 @@ gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da/go.mod h1:OK gitlab.com/xx_network/primitives v0.0.2/go.mod h1:cs0QlFpdMDI6lAo61lDRH2JZz+3aVkHy+QogOB6F/qc= gitlab.com/xx_network/primitives v0.0.4-0.20211014163031-53405cf191fb h1:0K9dyxFpDYzH9jYLwzg3+bRj9a0uJjwjQkMeIdTxduQ= gitlab.com/xx_network/primitives v0.0.4-0.20211014163031-53405cf191fb/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= +gitlab.com/xx_network/primitives v0.0.4-0.20211215010517-1917372c5ecc h1:a3FIvYTY5b5+wN6LzQpgkFSkdlfxR0u95xSSRFvi3Ho= +gitlab.com/xx_network/primitives v0.0.4-0.20211215010517-1917372c5ecc/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= +gitlab.com/xx_network/primitives v0.0.4-0.20211215214631-b245b75e5caf h1:lhkq8CQATHg2h/VQC0zFy/9iAelFlY2ciKetFcU8GUE= +gitlab.com/xx_network/primitives v0.0.4-0.20211215214631-b245b75e5caf/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= gitlab.com/xx_network/ring v0.0.3-0.20210527191221-ce3f170aabd5 h1:FY+4Rh1Q2rgLyv10aKJjhWApuKRCR/054XhreudfAvw= gitlab.com/xx_network/ring v0.0.3-0.20210527191221-ce3f170aabd5/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/network/follow.go b/network/follow.go index 50f98f0aca1371723361d838942a60b611ef575a..e25f5ff6cd43857aa7e5fd4cd5856f2a2eaa5172 100644 --- a/network/follow.go +++ b/network/follow.go @@ -194,6 +194,28 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, m.Session.SetNDF(m.GetInstance().GetPartialNdf().Get()) } + // Pull rate limiting parameter values from NDF + ndfRateLimitParam := m.Instance.GetPartialNdf().Get().RateLimits + ndfCapacity, ndfLeakedTokens, ndfLeakDuration := uint32(ndfRateLimitParam.Capacity), + uint32(ndfRateLimitParam.LeakedTokens), time.Duration(ndfRateLimitParam.LeakDuration) + + // Pull internal rate limiting parameters from RAM + internalRateLimitParams := m.Internal.Session.GetBucketParams().Get() + + // If any param value in our internal store does not + // match the NDF's corresponding value, update our internal store + if ndfCapacity != internalRateLimitParams.Capacity || + ndfLeakedTokens != internalRateLimitParams.LeakedTokens || + ndfLeakDuration != internalRateLimitParams.LeakDuration { + // Update internally stored params + err = m.Internal.Session.GetBucketParams(). + UpdateParams(ndfCapacity, ndfLeakedTokens, ndfLeakDuration) + if err != nil { + jww.ERROR.Printf("%+v", err) + return + } + } + // Update the address space size // todo: this is a fix for incompatibility with the live network // remove once the live network has been pushed to diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 5847e4e15631424d0b4165d11d49e0ec980cf98d..3c48ac49f2c9f624fc40f61d2d80744401e7fe7d 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -187,6 +187,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, elapsed, numRoundTries) jww.INFO.Print(m) events.Report(1, "MessageSend", "Metric", m) + onSend(1, session) return id.Round(bestRound.ID), ephID, nil } else { jww.FATAL.Panicf("Gateway %s returned no error, but failed "+ @@ -198,3 +199,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, return 0, ephemeral.Id{}, errors.New("failed to send the message, " + "unknown error") } + +// OnSend performs a bucket addition on a call to Manager.SendCMIX or +// Manager.SendManyCMIX, updating the bucket for the amount of messages sent. +func onSend(messages uint32, session *storage.Session) { + rateLimitingParam := session.GetBucketParams().Get() + session.GetBucket().AddWithExternalParams(messages, + rateLimitingParam.Capacity, rateLimitingParam.LeakedTokens, + rateLimitingParam.LeakDuration) + +} diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index 04b833344bf42e524cba6756bc4770785ffad18a..a99af6ddec241eb2f0b261f17b6a800f53fab5ca 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -203,6 +203,7 @@ func sendManyCmixHelper(sender *gateway.Sender, "in round %d", ephemeralIDsString, recipientString, bestRound.ID) jww.INFO.Print(m) events.Report(1, "MessageSendMany", "Metric", m) + onSend(uint32(len(msgs)), session) return id.Round(bestRound.ID), ephemeralIDs, nil } else { jww.FATAL.Panicf("Gateway %s returned no error, but failed to "+ diff --git a/storage/session.go b/storage/session.go index 8ee2b43d3119b222612389685360d53b26bc9abf..fea09353f9438ab0f3ca8c13094a3f307e6b7230 100644 --- a/storage/session.go +++ b/storage/session.go @@ -13,6 +13,7 @@ import ( "gitlab.com/elixxir/client/storage/edge" "gitlab.com/elixxir/client/storage/hostList" "gitlab.com/elixxir/client/storage/rounds" + "gitlab.com/xx_network/primitives/rateLimiting" "sync" "testing" "time" @@ -64,6 +65,8 @@ type Session struct { auth *auth.Store criticalMessages *utility.E2eMessageBuffer criticalRawMessages *utility.CmixMessageBuffer + bucketStore *rateLimiting.Bucket + bucketParamStore *utility.BucketParamStore garbledMessages *utility.MeteredCmixMessageBuffer reception *reception.Store clientVersion *clientVersion.Store @@ -89,8 +92,10 @@ func initStore(baseDir, password string) (*Session, error) { } // Creates new UserData in the session -func New(baseDir, password string, u userInterface.User, currentVersion version.Version, - cmixGrp, e2eGrp *cyclic.Group, rng *fastRNG.StreamGenerator) (*Session, error) { +func New(baseDir, password string, u userInterface.User, + currentVersion version.Version, cmixGrp, e2eGrp *cyclic.Group, + rng *fastRNG.StreamGenerator, + rateLimitParams ndf.RateLimiting) (*Session, error) { s, err := initStore(baseDir, password) if err != nil { @@ -159,8 +164,19 @@ func New(baseDir, password string, u userInterface.User, currentVersion version. s.edgeCheck, err = edge.NewStore(s.kv, u.ReceptionID) if err != nil { - return nil, errors.WithMessage(err, "Failed to edge check store") + return nil, errors.WithMessage(err, "Failed to create edge check store") } + + s.bucketParamStore, err = utility.NewBucketParamsStore( + uint32(rateLimitParams.Capacity), uint32(rateLimitParams.LeakedTokens), + time.Duration(rateLimitParams.LeakDuration), s.kv) + if err != nil { + return nil, errors.WithMessage(err, "Failed to create bucket params store") + } + + s.bucketStore = utility.NewStoredBucket(uint32(rateLimitParams.Capacity), uint32(rateLimitParams.LeakedTokens), + time.Duration(rateLimitParams.LeakDuration), s.kv) + return s, nil } @@ -244,6 +260,20 @@ func Load(baseDir, password string, currentVersion version.Version, return nil, errors.WithMessage(err, "Failed to load edge check store") } + s.bucketParamStore, err = utility.LoadBucketParamsStore(s.kv) + if err != nil { + return nil, errors.WithMessage(err, + "Failed to load bucket params store") + } + + params := s.bucketParamStore.Get() + s.bucketStore, err = utility.LoadBucket(params.Capacity, params.LeakedTokens, + params.LeakDuration, s.kv) + if err != nil { + return nil, errors.WithMessage(err, + "Failed to load bucket store") + } + return s, nil } @@ -333,6 +363,19 @@ func (s *Session) GetEdge() *edge.Store { return s.edgeCheck } +// GetBucketParams returns the bucket params store. +func (s *Session) GetBucketParams() *utility.BucketParamStore { + s.mux.RLock() + defer s.mux.RUnlock() + return s.bucketParamStore +} + +func (s *Session) GetBucket() *rateLimiting.Bucket { + s.mux.RLock() + defer s.mux.RUnlock() + return s.bucketStore +} + // Get an object from the session func (s *Session) Get(key string) (*versioned.Object, error) { return s.kv.Get(key, currentSessionVersion) @@ -407,6 +450,12 @@ func InitTestingSession(i interface{}) *Session { } s.cmix = cmixStore + s.bucketParamStore, err = utility.NewBucketParamsStore(10, 11, 12, kv) + if err != nil { + jww.FATAL.Panicf("InitTestingSession failed to create NewBucketParamsStore session: %+v", err) + } + s.bucketStore = utility.NewStoredBucket(10, 11, 12, kv) + e2eStore, err := e2e.NewStore(cmixGrp, kv, cmixGrp.NewInt(2), uid, fastRNG.NewStreamGenerator(7, 3, csprng.NewSystemRNG)) if err != nil { diff --git a/storage/utility/bucket.go b/storage/utility/bucket.go new file mode 100644 index 0000000000000000000000000000000000000000..5548cdfe52321d58ca181130dbca4010d5aa4b16 --- /dev/null +++ b/storage/utility/bucket.go @@ -0,0 +1,116 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package utility + +import ( + "encoding/json" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/netTime" + "gitlab.com/xx_network/primitives/rateLimiting" + "time" +) + +const ( + bucketStorePrefix = "bucketStore" + bucketStoreKey = "bucketStoreKey" + bucketStoreVersion = 0 +) + +// BucketStore stores a leaky bucket into storage. The bucket +// is saved in a JSON-able format. +type BucketStore struct { + kv *versioned.KV +} + +// bucketDisk is a JSON-able structure used to store +// a rateLimiting.Bucket parameters. +type bucketDisk struct { + capacity uint32 + timestamp int64 +} + +// NewStoredBucket creates a new, empty Bucket and saves it to storage. +func NewStoredBucket(capacity, leaked uint32, leakDuration time.Duration, + kv *versioned.KV) *rateLimiting.Bucket { + bs := &BucketStore{ + kv: kv.Prefix(bucketStorePrefix), + } + + bs.save(0, time.Now().UnixNano()) + + return rateLimiting.CreateBucket(capacity, leaked, leakDuration, bs.save) +} + +// save stores the buckets values into storage. +func (s *BucketStore) save(inBucket uint32, timestamp int64) { + + // Create + bd := bucketDisk{ + capacity: inBucket, + timestamp: timestamp, + } + + data, err := json.Marshal(&bd) + if err != nil { + jww.ERROR.Printf("Failed to marshal %s bucket data for"+ + " storage: %v", s.kv.GetPrefix(), err) + } + + obj := versioned.Object{ + Version: bucketStoreVersion, + Timestamp: netTime.Now(), + Data: data, + } + + err = s.kv.Set(bucketStoreKey, bucketStoreVersion, &obj) + + if err != nil { + jww.ERROR.Printf("Failed to store %s bucket data: %v", + s.kv.GetPrefix(), err) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Storage Functions // +//////////////////////////////////////////////////////////////////////////////// + +// LoadBucket is a storage operation which loads a bucket from storage. +func LoadBucket(capacity, leaked uint32, leakDuration time.Duration, + kv *versioned.KV) (*rateLimiting.Bucket, error) { + bs := &BucketStore{ + kv: kv.Prefix(bucketStorePrefix), + } + inBucket, ts, err := bs.load() + if err != nil { + return nil, err + } + + return rateLimiting.CreateBucketFromDB(capacity, + leaked, leakDuration, inBucket, ts, bs.save), nil +} + +// load is a helper function which extracts the bucket data from storage +// and loads it back into BucketStore. +func (s *BucketStore) load() (uint32, int64, error) { + // Load the versioned object + vo, err := s.kv.Get(bucketStoreKey, bucketStoreVersion) + if err != nil { + return 0, 0, err + } + + bd := bucketDisk{} + + err = json.Unmarshal(vo.Data, &bd) + if err != nil { + return 0, 0, err + } + + return bd.capacity, bd.timestamp, err + +} diff --git a/storage/utility/bucketParams.go b/storage/utility/bucketParams.go new file mode 100644 index 0000000000000000000000000000000000000000..e07797719349cb7fa2c809709d08d3207f95886c --- /dev/null +++ b/storage/utility/bucketParams.go @@ -0,0 +1,166 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package utility + +import ( + "bytes" + "encoding/binary" + "github.com/pkg/errors" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/netTime" + "gitlab.com/xx_network/primitives/rateLimiting" + "sync" + "time" +) + +const ( + bucketParamsPrefix = "bucketParamPrefix" + bucketParamsKey = "bucketParamKey" + bucketParamsVersion = 0 +) + +// BucketParamStore is the storage object for bucket params. Updated via the +// network follower. +type BucketParamStore struct { + params *rateLimiting.MapParams + mux sync.RWMutex + kv *versioned.KV +} + +// NewBucketParamsStore is the constructor for a BucketParamStore. +func NewBucketParamsStore(capacity, leakedTokens uint32, + leakDuration time.Duration, kv *versioned.KV) (*BucketParamStore, error) { + + bps := &BucketParamStore{ + params: &rateLimiting.MapParams{ + Capacity: capacity, + LeakedTokens: leakedTokens, + LeakDuration: leakDuration, + }, + mux: sync.RWMutex{}, + kv: kv.Prefix(bucketParamsPrefix), + } + + return bps, bps.save() +} + +// Get reads and returns te bucket params. +func (s *BucketParamStore) Get() *rateLimiting.MapParams { + s.mux.RLock() + defer s.mux.RUnlock() + return s.params +} + +// UpdateParams updates the parameters to store. +func (s *BucketParamStore) UpdateParams(capacity, leakedTokens uint32, + leakDuration time.Duration) error { + s.mux.Lock() + defer s.mux.Unlock() + + s.params = &rateLimiting.MapParams{ + Capacity: capacity, + LeakedTokens: leakedTokens, + LeakDuration: leakDuration, + } + + return s.save() +} + +//////////////////////////////////////////////////////////////////////////////// +// Storage Functions // +//////////////////////////////////////////////////////////////////////////////// + +// LoadBucketParamsStore loads the bucket params data from storage and constructs +// a BucketParamStore. +func LoadBucketParamsStore(kv *versioned.KV) (*BucketParamStore, error) { + bps := &BucketParamStore{ + params: &rateLimiting.MapParams{}, + mux: sync.RWMutex{}, + kv: kv.Prefix(bucketParamsPrefix), + } + + return bps, bps.load() + +} + +// save stores the bucket params into storage. +func (s *BucketParamStore) save() error { + + // Initiate stored object + object := &versioned.Object{ + Version: bucketParamsVersion, + Timestamp: netTime.Now(), + Data: s.marshal(), + } + + // Store object into storage + return s.kv.Set(bucketParamsKey, bucketParamsVersion, object) +} + +// load extracts the bucket params from store and loads it into the +// BucketParamStore. +func (s *BucketParamStore) load() error { + // Load params from KV + vo, err := s.kv.Get(bucketParamsKey, bucketParamsVersion) + if err != nil { + return errors.Errorf("Failed to load from KV: %v", err) + } + + // Unmarshal bucket params + loadedParams := unmarshalBucketParams(vo.Data) + + // Place params into RAM object + s.params = loadedParams + + return nil + +} + +// marshal serializes the rateLimiting.MapParams into byte data. +func (s *BucketParamStore) marshal() []byte { + buff := bytes.NewBuffer(nil) + + // Write capacity to buffer + b := make([]byte, 4) + binary.LittleEndian.PutUint32(b, s.params.Capacity) + buff.Write(b) + + // Write leakedTokens to buffer + b = make([]byte, 4) + binary.LittleEndian.PutUint32(b, s.params.LeakedTokens) + buff.Write(b) + + // Write leakDuration to buffer + b = make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(s.params.LeakDuration.Nanoseconds())) + buff.Write(b) + + return buff.Bytes() +} + +// unmarshalBucketParams deserializes the bucket params +// into a rateLimiting.MapParams. +func unmarshalBucketParams(b []byte) *rateLimiting.MapParams { + buff := bytes.NewBuffer(b) + + // Load capacity + capacity := binary.LittleEndian.Uint32(buff.Next(4)) + + // Load leakedTokens + leakedTokents := binary.LittleEndian.Uint32(buff.Next(4)) + + // Load leakDuration + leakDuration := time.Duration(binary.LittleEndian.Uint32(buff.Next(8))) + + return &rateLimiting.MapParams{ + Capacity: capacity, + LeakedTokens: leakedTokents, + LeakDuration: leakDuration, + } + +} diff --git a/storage/utility/bucketParams_test.go b/storage/utility/bucketParams_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c2ce1bdbfdb99bb0dd0dba059216556805cb03bc --- /dev/null +++ b/storage/utility/bucketParams_test.go @@ -0,0 +1,70 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package utility + +import ( + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/ekv" + "reflect" + "testing" + "time" +) + +// todo: write tests + +func TestNewBucketParamsStore(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + capacity, leakedTokens, leakDuration := uint32(10), uint32(11), time.Duration(12) + bps, err := NewBucketParamsStore(capacity, leakedTokens, leakDuration, kv) + if err != nil { + t.Fatalf("NewBucketParamsStore error: %v", err) + } + + newParams := bps.params + if newParams.Capacity != capacity || newParams.LeakedTokens != leakedTokens || + newParams.LeakDuration != leakDuration { + t.Fatalf("Unexpected values in BucketParamStore!"+ + "\n\tExpected params {capacity: %d, leakedToken %d, leakDuration: %d}"+ + "\n\tReceived params {capacity: %d, leakedToken %d, leakDuration: %d}", + capacity, leakedTokens, leakDuration, + newParams.Capacity, newParams.LeakedTokens, newParams.LeakDuration) + } + + vo, err := kv.Prefix(bucketParamsPrefix).Get(bucketParamsKey, bucketParamsVersion) + if err != nil { + t.Fatalf("Failed to load from KV: %v", err) + } + + loadedParams := unmarshalBucketParams(vo.Data) + + if !reflect.DeepEqual(newParams, loadedParams) { + t.Fatalf("Loaded params from store does not match initialized values."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", newParams, loadedParams) + } +} + +func TestLoadBucketParamsStore(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + capacity, leakedTokens, leakDuration := uint32(10), uint32(11), time.Duration(12) + bps, err := NewBucketParamsStore(capacity, leakedTokens, leakDuration, kv) + if err != nil { + t.Fatalf("NewBucketParamsStore error: %v", err) + } + + loadedBps, err := LoadBucketParamsStore(kv) + if err != nil { + t.Fatalf("LoadBucketParamsStore error: %v", err) + } + + if !reflect.DeepEqual(loadedBps, bps) { + t.Fatalf("Loaded params from store does not match initialized values."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", bps, loadedBps) + } +} diff --git a/storage/versioned/kv.go b/storage/versioned/kv.go index 083944e4db7c3ebefae3ea711811fc49fbc80c20..1e4267c5f330b6a3815b1c5ed0abafc810eda409 100644 --- a/storage/versioned/kv.go +++ b/storage/versioned/kv.go @@ -137,6 +137,11 @@ func (v *KV) Set(key string, version uint64, object *Object) error { return v.r.data.Set(key, object) } +// GetPrefix returns the prefix of the KV. +func (v *KV) GetPrefix() string { + return v.prefix +} + //Returns a new KV with the new prefix func (v *KV) Prefix(prefix string) *KV { kvPrefix := KV{