Skip to content
Snippets Groups Projects
Commit a632099c authored by Josh Brooks's avatar Josh Brooks
Browse files

Add bucket to session object

parent e3e02982
Branches
Tags
2 merge requests!117Release,!83Add EKV backed leaky bucket storage
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"gitlab.com/elixxir/client/storage/edge" "gitlab.com/elixxir/client/storage/edge"
"gitlab.com/elixxir/client/storage/hostList" "gitlab.com/elixxir/client/storage/hostList"
"gitlab.com/elixxir/client/storage/rounds" "gitlab.com/elixxir/client/storage/rounds"
"gitlab.com/xx_network/primitives/rateLimiting"
"sync" "sync"
"testing" "testing"
"time" "time"
...@@ -64,6 +65,8 @@ type Session struct { ...@@ -64,6 +65,8 @@ type Session struct {
auth *auth.Store auth *auth.Store
criticalMessages *utility.E2eMessageBuffer criticalMessages *utility.E2eMessageBuffer
criticalRawMessages *utility.CmixMessageBuffer criticalRawMessages *utility.CmixMessageBuffer
bucketStore *rateLimiting.Bucket
bucketParamStore *utility.BucketParamStore
garbledMessages *utility.MeteredCmixMessageBuffer garbledMessages *utility.MeteredCmixMessageBuffer
reception *reception.Store reception *reception.Store
clientVersion *clientVersion.Store clientVersion *clientVersion.Store
...@@ -89,8 +92,10 @@ func initStore(baseDir, password string) (*Session, error) { ...@@ -89,8 +92,10 @@ func initStore(baseDir, password string) (*Session, error) {
} }
// Creates new UserData in the session // Creates new UserData in the session
func New(baseDir, password string, u userInterface.User, currentVersion version.Version, func New(baseDir, password string, u userInterface.User,
cmixGrp, e2eGrp *cyclic.Group, rng *fastRNG.StreamGenerator) (*Session, error) { currentVersion version.Version, cmixGrp, e2eGrp *cyclic.Group,
rng *fastRNG.StreamGenerator,
rateLimitParams ndf.RateLimiting) (*Session, error) {
s, err := initStore(baseDir, password) s, err := initStore(baseDir, password)
if err != nil { if err != nil {
...@@ -159,8 +164,19 @@ func New(baseDir, password string, u userInterface.User, currentVersion version. ...@@ -159,8 +164,19 @@ func New(baseDir, password string, u userInterface.User, currentVersion version.
s.edgeCheck, err = edge.NewStore(s.kv, u.ReceptionID) s.edgeCheck, err = edge.NewStore(s.kv, u.ReceptionID)
if err != nil { if err != nil {
return nil, errors.WithMessage(err, "Failed to edge check store") return nil, errors.WithMessage(err, "Failed to create edge check store")
} }
s.bucketParamStore, err = utility.NewBucketParamsStore(
uint32(rateLimitParams.Capacity), uint32(rateLimitParams.LeakedTokens),
time.Duration(rateLimitParams.LeakDuration), s.kv)
if err != nil {
return nil, errors.WithMessage(err, "Failed to create bucket params store")
}
s.bucketStore = utility.NewStoredBucket(uint32(rateLimitParams.Capacity), uint32(rateLimitParams.LeakedTokens),
time.Duration(rateLimitParams.LeakDuration), s.kv)
return s, nil return s, nil
} }
...@@ -244,6 +260,20 @@ func Load(baseDir, password string, currentVersion version.Version, ...@@ -244,6 +260,20 @@ func Load(baseDir, password string, currentVersion version.Version,
return nil, errors.WithMessage(err, "Failed to load edge check store") return nil, errors.WithMessage(err, "Failed to load edge check store")
} }
s.bucketParamStore, err = utility.LoadBucketParamsStore(s.kv)
if err != nil {
return nil, errors.WithMessage(err,
"Failed to load bucket params store")
}
params := s.bucketParamStore.Get()
s.bucketStore, err = utility.LoadBucket(params.Capacity, params.LeakedTokens,
params.LeakDuration, s.kv)
if err != nil {
return nil, errors.WithMessage(err,
"Failed to load bucket store")
}
return s, nil return s, nil
} }
...@@ -333,6 +363,19 @@ func (s *Session) GetEdge() *edge.Store { ...@@ -333,6 +363,19 @@ func (s *Session) GetEdge() *edge.Store {
return s.edgeCheck return s.edgeCheck
} }
// GetBucketParams returns the bucket params store.
func (s *Session) GetBucketParams() *utility.BucketParamStore {
s.mux.RLock()
defer s.mux.RUnlock()
return s.bucketParamStore
}
func (s *Session) GetBucket() *rateLimiting.Bucket {
s.mux.RLock()
defer s.mux.RUnlock()
return s.bucketStore
}
// Get an object from the session // Get an object from the session
func (s *Session) Get(key string) (*versioned.Object, error) { func (s *Session) Get(key string) (*versioned.Object, error) {
return s.kv.Get(key, currentSessionVersion) return s.kv.Get(key, currentSessionVersion)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment