Skip to content
Snippets Groups Projects
Commit 770c3cb1 authored by Josh Brooks's avatar Josh Brooks
Browse files

Fix stored bucket implementation

parent 0d9e4bda
No related branches found
No related tags found
2 merge requests!117Release,!83Add EKV backed leaky bucket storage
......@@ -8,11 +8,12 @@
package utility
import (
"github.com/pkg/errors"
"encoding/json"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/xx_network/primitives/netTime"
"gitlab.com/xx_network/primitives/rateLimiting"
"sync"
"time"
)
const (
......@@ -24,92 +25,92 @@ const (
// BucketStore stores a leaky bucket into storage. The bucket
// is saved in a JSON-able format.
type BucketStore struct {
bucket *rateLimiting.Bucket
kv *versioned.KV
mux sync.Mutex
}
// NewBucketStore creates a new, empty BucketStore and saves it to storage.
// If the primary method of modifying your BucketStore.bucket is via the method
// BucketStore.AddWithExternalParams, then the params argument may be
// default or junk data.
func NewBucketStore(params *rateLimiting.BucketParams,
kv *versioned.KV) (*BucketStore, error) {
// bucketDisk is a JSON-able structure used to store
// a rateLimiting.Bucket parameters.
type bucketDisk struct {
capacity uint32
timestamp int64
}
// NewStoredBucket creates a new, empty Bucket and saves it to storage.
func NewStoredBucket(capacity, leaked uint32, leakDuration time.Duration,
kv *versioned.KV) *rateLimiting.Bucket {
bs := &BucketStore{
bucket: rateLimiting.CreateBucketFromParams(params, nil),
kv: kv.Prefix(bucketStorePrefix),
mux: sync.Mutex{},
}
return bs, bs.save()
bs.save(0, time.Now().UnixNano())
return rateLimiting.CreateBucket(capacity, leaked, leakDuration, bs.save)
}
// AddWithExternalParams adds the specified number of tokens to the bucket
// given external bucket parameters rather than the params specified in
// the bucket. If an add is unsuccessful, an error is returned.
// Else the bucket is saved to storage.
func (s *BucketStore) AddWithExternalParams(tokens uint32,
params *rateLimiting.MapParams) error {
s.mux.Lock()
defer s.mux.Unlock()
success, _ := s.bucket.AddWithExternalParams(tokens,
params.Capacity, params.LeakedTokens,
params.LeakDuration)
if err := s.save(); err != nil {
return errors.WithMessagef(err, "Failed to save")
// save stores the buckets values into storage.
func (s *BucketStore) save(inBucket uint32, timestamp int64) {
// Create
bd := bucketDisk{
capacity: inBucket,
timestamp: timestamp,
}
data, err := json.Marshal(&bd)
if err != nil {
jww.ERROR.Printf("Failed to marshal %s bucket data for"+
" storage: %v", s.kv.GetPrefix(), err)
}
if !success {
return errors.Errorf("Failed to add to bucket")
obj := versioned.Object{
Version: bucketStoreVersion,
Timestamp: netTime.Now(),
Data: data,
}
return nil
err = s.kv.Set(bucketStoreKey, bucketStoreVersion, &obj)
if err != nil {
jww.ERROR.Printf("Failed to store %s bucket data: %v",
s.kv.GetPrefix(), err)
}
}
////////////////////////////////////////////////////////////////////////////////
// Storage Functions //
////////////////////////////////////////////////////////////////////////////////
// LoadBucketStore is a storage operation which loads a bucket from storage.
func LoadBucketStore(params *rateLimiting.BucketParams,
kv *versioned.KV) (*BucketStore, error) {
// LoadBucket is a storage operation which loads a bucket from storage.
func LoadBucket(capacity, leaked uint32, leakDuration time.Duration,
kv *versioned.KV) (*rateLimiting.Bucket, error) {
bs := &BucketStore{
bucket: rateLimiting.CreateBucketFromParams(params, nil),
kv: kv.Prefix(bucketStorePrefix),
mux: sync.Mutex{},
}
return bs, bs.load()
}
// save is a non-thread-safe method of saving the bucket to storage. It is
// the responsibility of the caller to hold the lock for BucketStore.
func (s *BucketStore) save() error {
data, err := s.bucket.MarshalJSON()
inBucket, ts, err := bs.load()
if err != nil {
return errors.Errorf("Failed to marshal bucket: %v", err)
}
obj := versioned.Object{
Version: bucketStoreVersion,
Timestamp: netTime.Now(),
Data: data,
return nil, err
}
return s.kv.Set(bucketStoreKey, bucketStoreVersion, &obj)
return rateLimiting.CreateBucketFromDB(capacity,
leaked, leakDuration, inBucket, ts, bs.save), nil
}
// load is a helper function which extracts the bucket data from storage
// and loads it back into BucketStore.
func (s *BucketStore) load() error {
func (s *BucketStore) load() (uint32, int64, error) {
// Load the versioned object
vo, err := s.kv.Get(bucketStoreKey, bucketStoreVersion)
if err != nil {
return err
return 0, 0, err
}
bd := bucketDisk{}
err = json.Unmarshal(vo.Data, &bd)
if err != nil {
return 0, 0, err
}
return s.bucket.UnmarshalJSON(vo.Data)
return bd.capacity, bd.timestamp, err
}
......@@ -137,6 +137,11 @@ func (v *KV) Set(key string, version uint64, object *Object) error {
return v.r.data.Set(key, object)
}
// GetPrefix returns the prefix of the KV.
func (v *KV) GetPrefix() string {
return v.prefix
}
//Returns a new KV with the new prefix
func (v *KV) Prefix(prefix string) *KV {
kvPrefix := KV{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment