diff --git a/api/permissioning.go b/api/permissioning.go index 5c681ce886325aadc514a389979e36b49cfdd2c7..8573366306087791f71ea806bd3110691037ce77 100644 --- a/api/permissioning.go +++ b/api/permissioning.go @@ -16,12 +16,12 @@ import ( ) // Returns an error if registration fails. -func RegisterWithPermissioning(ctx context.Context, comms client.Comms, registrationCode string) error { +func (c *Client) RegisterWithPermissioning(registrationCode string) error { instance := ctx.Manager.GetInstance() instance.GetPartialNdf() //Check the regState is in proper state for registration - regState := ctx.Session.GetRegistrationStatus() + regState := c.storage.GetRegistrationStatus() if regState != storage.KeyGenComplete { return errors.Errorf("Attempting to register before key generation!") } diff --git a/network/rounds/retreive.go b/network/rounds/retreive.go index 6ed794e53b99e8409b02d7ef964400827dc898d4..dcb08c525f7560425de26bc424409dc3dbe4ee68 100644 --- a/network/rounds/retreive.go +++ b/network/rounds/retreive.go @@ -37,7 +37,6 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, } } } - } func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo, @@ -56,7 +55,7 @@ func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo, msgReq := &pb.GetMessages{ ClientID: m.Uid.Marshal(), //TODO: fix this, should not be a byte slice - //RoundID: uint64(rid), + RoundID: uint64(rid), } msgResp, err := comms.RequestMessages(gwHost, msgReq) // Fail the round if an error occurs so it can be tried again later diff --git a/storage/e2e/store.go b/storage/e2e/store.go index b4129b6f1e8eda72978848d7a83114f27a67bd89..f26aadb15b8b7e7036b0a15e291d466c6b156f50 100644 --- a/storage/e2e/store.go +++ b/storage/e2e/store.go @@ -172,6 +172,11 @@ func (s *Store) PopKey(f format.Fingerprint) (*Key, bool) { return s.fingerprints.Pop(f) } +//Key exists for a key fingerprint +func (s *Store) CheckKey(f format.Fingerprint) bool { + return s.fingerprints.Check(f) +} + //Returns the diffie hellman private key func (s *Store) GetDHPrivateKey() *cyclic.Int { return s.dhPrivateKey @@ -272,6 +277,15 @@ func (f *fingerprints) remove(keys []*Key) { } } +func (f *fingerprints) Check(fingerprint format.Fingerprint) bool { + f.mux.RLock() + defer f.mux.RUnlock() + + _, ok := f.toKey[fingerprint] + return ok +} + + func (f *fingerprints) Pop(fingerprint format.Fingerprint) (*Key, bool) { f.mux.Lock() defer f.mux.Unlock() diff --git a/storage/session.go b/storage/session.go index f543ac4423965d7f941b24c1ac9926662aa3c480..3a9c8f51d77e39e5b825d1b3637b1e9869c76d41 100644 --- a/storage/session.go +++ b/storage/session.go @@ -45,7 +45,7 @@ type Session struct { conversations *conversation.Store partition *partition.Store criticalMessages *utility.E2eMessageBuffer - garbledMessages *utility.CmixMessageBuffer + garbledMessages *utility.MeteredCmixMessageBuffer checkedRounds *utility.KnownRounds } @@ -101,7 +101,7 @@ func New(baseDir, password string, uid *id.ID, salt []byte, rsaKey *rsa.PrivateK return nil, errors.WithMessage(err, "Failed to create session") } - s.garbledMessages, err = utility.NewCmixMessageBuffer(s.kv, garbledMessagesKey) + s.garbledMessages, err = utility.NewMeteredCmixMessageBuffer(s.kv, garbledMessagesKey) if err != nil { return nil, errors.WithMessage(err, "Failed to create session") } @@ -149,7 +149,7 @@ func Load(baseDir, password string, rng *fastRNG.StreamGenerator) (*Session, err return nil, errors.WithMessage(err, "Failed to load session") } - s.garbledMessages, err = utility.LoadCmixMessageBuffer(s.kv, garbledMessagesKey) + s.garbledMessages, err = utility.LoadMeteredCmixMessageBuffer(s.kv, garbledMessagesKey) if err != nil { return nil, errors.WithMessage(err, "Failed to load session") } @@ -189,7 +189,7 @@ func (s *Session) GetCriticalMessages() *utility.E2eMessageBuffer { return s.criticalMessages } -func (s *Session) GetGarbledMessages() *utility.CmixMessageBuffer { +func (s *Session) GetGarbledMessages() *utility.MeteredCmixMessageBuffer { s.mux.RLock() defer s.mux.RUnlock() return s.garbledMessages diff --git a/storage/utility/meteredCmixMessageBuffer.go b/storage/utility/meteredCmixMessageBuffer.go new file mode 100644 index 0000000000000000000000000000000000000000..688887f7d5db6059aaf5c3bbdbd4314e7e866843 --- /dev/null +++ b/storage/utility/meteredCmixMessageBuffer.go @@ -0,0 +1,151 @@ +package utility + +import ( + "crypto/md5" + "encoding/json" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/primitives/format" + "time" +) + +const currentMeteredCmixMessageVersion = 0 + +type meteredCmixMessageHandler struct{} + +type meteredCmixMessage struct { + M []byte + Count uint + Timestamp time.Time +} + +// SaveMessage saves the message as a versioned object at the specified key +// in the key value store. +func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { + msg := m.(meteredCmixMessage) + + marshaled, err := json.Marshal(&msg) + if err != nil { + return errors.WithMessage(err, "Failed to marshal metered "+ + "cmix message") + } + + // Create versioned object + obj := versioned.Object{ + Version: currentMeteredCmixMessageVersion, + Timestamp: time.Now(), + Data: marshaled, + } + + // Save versioned object + return kv.Set(key, &obj) +} + +// LoadMessage returns the message with the specified key from the key value +// store. An empty message and error are returned if the message could not be +// retrieved. +func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { + // Load the versioned object + vo, err := kv.Get(key) + if err != nil { + return format.Message{}, err + } + + msg := &meteredCmixMessage{} + err := json.Unmarshal(vo.Data, msg) + if err != nil { + return nil, errors.WithMessage(err, "Failed to unmarshal "+ + "metered cmix message") + } + + // Create message from data + return format.Unmarshal(vo.Data), nil +} + +// DeleteMessage deletes the message with the specified key from the key value +// store. +func (*meteredCmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error { + return kv.Delete(key) +} + +// HashMessage generates a hash of the message. +func (*meteredCmixMessageHandler) HashMessage(m interface{}) MessageHash { + msg := m.(meteredCmixMessage) + + return md5.Sum(msg.M) +} + +// CmixMessageBuffer wraps the message buffer to store and load raw cmix +// messages. +type MeteredCmixMessageBuffer struct { + mb *MessageBuffer + kv *versioned.KV + key string +} + +func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { + mb, err := NewMessageBuffer(kv, &cmixMessageHandler{}, key) + if err != nil { + return nil, err + } + + return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil +} + +func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) { + mb, err := LoadMessageBuffer(kv, &cmixMessageHandler{}, key) + if err != nil { + return nil, err + } + + return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil +} + +func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message) { + msg := meteredCmixMessage{ + M: m.Marshal(), + Count: 0, + Timestamp: time.Now(), + } + mcmb.mb.Add(msg) +} + +func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message) { + msg := meteredCmixMessage{ + M: m.Marshal(), + Count: 0, + Timestamp: time.Now(), + } + mcmb.mb.AddProcessing(msg) +} + +func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, bool) { + m, ok := mcmb.mb.Next() + if !ok { + return format.Message{}, 0, time.Time{}, false + } + + msg := m.(meteredCmixMessage) + rtnCnt := msg.Count + + //increment the count and save + msg.Count++ + mcmh := &meteredCmixMessageHandler{} + err := mcmh.SaveMessage(mcmb.kv, msg, makeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg))) + if err != nil { + jww.FATAL.Panicf("Failed to save metered message after count "+ + "update: %s", err) + } + + msfFormat := format.Unmarshal(msg.M) + return msfFormat, rtnCnt, msg.Timestamp, true +} + +func (mcmb *MeteredCmixMessageBuffer) Succeeded(m format.Message) { + mcmb.mb.Succeeded(m) +} + +func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message) { + mcmb.mb.Failed(m) +}