diff --git a/storage/utility/bucket.go b/storage/utility/bucket.go index db2a3e805c7a1519c9cc50fce55923a59377800b..5548cdfe52321d58ca181130dbca4010d5aa4b16 100644 --- a/storage/utility/bucket.go +++ b/storage/utility/bucket.go @@ -8,11 +8,12 @@ package utility import ( - "github.com/pkg/errors" + "encoding/json" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/rateLimiting" - "sync" + "time" ) const ( @@ -24,92 +25,92 @@ const ( // BucketStore stores a leaky bucket into storage. The bucket // is saved in a JSON-able format. type BucketStore struct { - bucket *rateLimiting.Bucket - kv *versioned.KV - mux sync.Mutex + kv *versioned.KV } -// NewBucketStore creates a new, empty BucketStore and saves it to storage. -// If the primary method of modifying your BucketStore.bucket is via the method -// BucketStore.AddWithExternalParams, then the params argument may be -// default or junk data. -func NewBucketStore(params *rateLimiting.BucketParams, - kv *versioned.KV) (*BucketStore, error) { +// bucketDisk is a JSON-able structure used to store +// a rateLimiting.Bucket parameters. +type bucketDisk struct { + capacity uint32 + timestamp int64 +} + +// NewStoredBucket creates a new, empty Bucket and saves it to storage. +func NewStoredBucket(capacity, leaked uint32, leakDuration time.Duration, + kv *versioned.KV) *rateLimiting.Bucket { bs := &BucketStore{ - bucket: rateLimiting.CreateBucketFromParams(params, nil), - kv: kv.Prefix(bucketStorePrefix), - mux: sync.Mutex{}, + kv: kv.Prefix(bucketStorePrefix), } - return bs, bs.save() + bs.save(0, time.Now().UnixNano()) + + return rateLimiting.CreateBucket(capacity, leaked, leakDuration, bs.save) } -// AddWithExternalParams adds the specified number of tokens to the bucket -// given external bucket parameters rather than the params specified in -// the bucket. If an add is unsuccessful, an error is returned. -// Else the bucket is saved to storage. -func (s *BucketStore) AddWithExternalParams(tokens uint32, - params *rateLimiting.MapParams) error { - s.mux.Lock() - defer s.mux.Unlock() - - success, _ := s.bucket.AddWithExternalParams(tokens, - params.Capacity, params.LeakedTokens, - params.LeakDuration) - if err := s.save(); err != nil { - return errors.WithMessagef(err, "Failed to save") +// save stores the buckets values into storage. +func (s *BucketStore) save(inBucket uint32, timestamp int64) { + + // Create + bd := bucketDisk{ + capacity: inBucket, + timestamp: timestamp, + } + + data, err := json.Marshal(&bd) + if err != nil { + jww.ERROR.Printf("Failed to marshal %s bucket data for"+ + " storage: %v", s.kv.GetPrefix(), err) } - if !success { - return errors.Errorf("Failed to add to bucket") + obj := versioned.Object{ + Version: bucketStoreVersion, + Timestamp: netTime.Now(), + Data: data, } - return nil + err = s.kv.Set(bucketStoreKey, bucketStoreVersion, &obj) + + if err != nil { + jww.ERROR.Printf("Failed to store %s bucket data: %v", + s.kv.GetPrefix(), err) + } } //////////////////////////////////////////////////////////////////////////////// // Storage Functions // //////////////////////////////////////////////////////////////////////////////// -// LoadBucketStore is a storage operation which loads a bucket from storage. -func LoadBucketStore(params *rateLimiting.BucketParams, - kv *versioned.KV) (*BucketStore, error) { +// LoadBucket is a storage operation which loads a bucket from storage. +func LoadBucket(capacity, leaked uint32, leakDuration time.Duration, + kv *versioned.KV) (*rateLimiting.Bucket, error) { bs := &BucketStore{ - bucket: rateLimiting.CreateBucketFromParams(params, nil), - kv: kv.Prefix(bucketStorePrefix), - mux: sync.Mutex{}, + kv: kv.Prefix(bucketStorePrefix), } - - return bs, bs.load() - -} - -// save is a non-thread-safe method of saving the bucket to storage. It is -// the responsibility of the caller to hold the lock for BucketStore. -func (s *BucketStore) save() error { - data, err := s.bucket.MarshalJSON() + inBucket, ts, err := bs.load() if err != nil { - return errors.Errorf("Failed to marshal bucket: %v", err) - } - - obj := versioned.Object{ - Version: bucketStoreVersion, - Timestamp: netTime.Now(), - Data: data, + return nil, err } - return s.kv.Set(bucketStoreKey, bucketStoreVersion, &obj) + return rateLimiting.CreateBucketFromDB(capacity, + leaked, leakDuration, inBucket, ts, bs.save), nil } // load is a helper function which extracts the bucket data from storage // and loads it back into BucketStore. -func (s *BucketStore) load() error { +func (s *BucketStore) load() (uint32, int64, error) { // Load the versioned object vo, err := s.kv.Get(bucketStoreKey, bucketStoreVersion) if err != nil { - return err + return 0, 0, err + } + + bd := bucketDisk{} + + err = json.Unmarshal(vo.Data, &bd) + if err != nil { + return 0, 0, err } - return s.bucket.UnmarshalJSON(vo.Data) + return bd.capacity, bd.timestamp, err } diff --git a/storage/versioned/kv.go b/storage/versioned/kv.go index 083944e4db7c3ebefae3ea711811fc49fbc80c20..1e4267c5f330b6a3815b1c5ed0abafc810eda409 100644 --- a/storage/versioned/kv.go +++ b/storage/versioned/kv.go @@ -137,6 +137,11 @@ func (v *KV) Set(key string, version uint64, object *Object) error { return v.r.data.Set(key, object) } +// GetPrefix returns the prefix of the KV. +func (v *KV) GetPrefix() string { + return v.prefix +} + //Returns a new KV with the new prefix func (v *KV) Prefix(prefix string) *KV { kvPrefix := KV{