From 23f9637a863fd1df227684d0dbc51f9c8b71fd9b Mon Sep 17 00:00:00 2001
From: Benjamin Wenger <ben@elixxir.ioo>
Date: Tue, 22 Sep 2020 11:40:32 -0700
Subject: [PATCH] added meteredCmixBuffer

---
 api/permissioning.go                        |   4 +-
 network/rounds/retreive.go                  |   3 +-
 storage/e2e/store.go                        |  14 ++
 storage/session.go                          |   8 +-
 storage/utility/meteredCmixMessageBuffer.go | 151 ++++++++++++++++++++
 5 files changed, 172 insertions(+), 8 deletions(-)
 create mode 100644 storage/utility/meteredCmixMessageBuffer.go

diff --git a/api/permissioning.go b/api/permissioning.go
index 5c681ce88..857336630 100644
--- a/api/permissioning.go
+++ b/api/permissioning.go
@@ -16,12 +16,12 @@ import (
 )
 
 // Returns an error if registration fails.
-func RegisterWithPermissioning(ctx context.Context, comms client.Comms, registrationCode string) error {
+func (c *Client) RegisterWithPermissioning(registrationCode string) error {
 	instance := ctx.Manager.GetInstance()
 	instance.GetPartialNdf()
 
 	//Check the regState is in proper state for registration
-	regState := ctx.Session.GetRegistrationStatus()
+	regState := c.storage.GetRegistrationStatus()
 	if regState != storage.KeyGenComplete {
 		return errors.Errorf("Attempting to register before key generation!")
 	}
diff --git a/network/rounds/retreive.go b/network/rounds/retreive.go
index 6ed794e53..dcb08c525 100644
--- a/network/rounds/retreive.go
+++ b/network/rounds/retreive.go
@@ -37,7 +37,6 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
 			}
 		}
 	}
-
 }
 
 func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo,
@@ -56,7 +55,7 @@ func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo,
 	msgReq := &pb.GetMessages{
 		ClientID: m.Uid.Marshal(),
 		//TODO: fix this, should not be a byte slice
-		//RoundID:  uint64(rid),
+		RoundID: uint64(rid),
 	}
 	msgResp, err := comms.RequestMessages(gwHost, msgReq)
 	// Fail the round if an error occurs so it can be tried again later
diff --git a/storage/e2e/store.go b/storage/e2e/store.go
index b4129b6f1..f26aadb15 100644
--- a/storage/e2e/store.go
+++ b/storage/e2e/store.go
@@ -172,6 +172,11 @@ func (s *Store) PopKey(f format.Fingerprint) (*Key, bool) {
 	return s.fingerprints.Pop(f)
 }
 
+//Key exists for a key fingerprint
+func (s *Store) CheckKey(f format.Fingerprint) bool {
+	return s.fingerprints.Check(f)
+}
+
 //Returns the diffie hellman private key
 func (s *Store) GetDHPrivateKey() *cyclic.Int {
 	return s.dhPrivateKey
@@ -272,6 +277,15 @@ func (f *fingerprints) remove(keys []*Key) {
 	}
 }
 
+func (f *fingerprints) Check(fingerprint format.Fingerprint) bool {
+	f.mux.RLock()
+	defer f.mux.RUnlock()
+
+	_, ok := f.toKey[fingerprint]
+	return ok
+}
+
+
 func (f *fingerprints) Pop(fingerprint format.Fingerprint) (*Key, bool) {
 	f.mux.Lock()
 	defer f.mux.Unlock()
diff --git a/storage/session.go b/storage/session.go
index f543ac442..3a9c8f51d 100644
--- a/storage/session.go
+++ b/storage/session.go
@@ -45,7 +45,7 @@ type Session struct {
 	conversations    *conversation.Store
 	partition        *partition.Store
 	criticalMessages *utility.E2eMessageBuffer
-	garbledMessages  *utility.CmixMessageBuffer
+	garbledMessages  *utility.MeteredCmixMessageBuffer
 	checkedRounds    *utility.KnownRounds
 }
 
@@ -101,7 +101,7 @@ func New(baseDir, password string, uid *id.ID, salt []byte, rsaKey *rsa.PrivateK
 		return nil, errors.WithMessage(err, "Failed to create session")
 	}
 
-	s.garbledMessages, err = utility.NewCmixMessageBuffer(s.kv, garbledMessagesKey)
+	s.garbledMessages, err = utility.NewMeteredCmixMessageBuffer(s.kv, garbledMessagesKey)
 	if err != nil {
 		return nil, errors.WithMessage(err, "Failed to create session")
 	}
@@ -149,7 +149,7 @@ func Load(baseDir, password string, rng *fastRNG.StreamGenerator) (*Session, err
 		return nil, errors.WithMessage(err, "Failed to load session")
 	}
 
-	s.garbledMessages, err = utility.LoadCmixMessageBuffer(s.kv, garbledMessagesKey)
+	s.garbledMessages, err = utility.LoadMeteredCmixMessageBuffer(s.kv, garbledMessagesKey)
 	if err != nil {
 		return nil, errors.WithMessage(err, "Failed to load session")
 	}
@@ -189,7 +189,7 @@ func (s *Session) GetCriticalMessages() *utility.E2eMessageBuffer {
 	return s.criticalMessages
 }
 
-func (s *Session) GetGarbledMessages() *utility.CmixMessageBuffer {
+func (s *Session) GetGarbledMessages() *utility.MeteredCmixMessageBuffer {
 	s.mux.RLock()
 	defer s.mux.RUnlock()
 	return s.garbledMessages
diff --git a/storage/utility/meteredCmixMessageBuffer.go b/storage/utility/meteredCmixMessageBuffer.go
new file mode 100644
index 000000000..688887f7d
--- /dev/null
+++ b/storage/utility/meteredCmixMessageBuffer.go
@@ -0,0 +1,151 @@
+package utility
+
+import (
+	"crypto/md5"
+	"encoding/json"
+	"github.com/pkg/errors"
+	jww "github.com/spf13/jwalterweatherman"
+	"gitlab.com/elixxir/client/storage/versioned"
+	"gitlab.com/elixxir/primitives/format"
+	"time"
+)
+
+const currentMeteredCmixMessageVersion = 0
+
+type meteredCmixMessageHandler struct{}
+
+type meteredCmixMessage struct {
+	M         []byte
+	Count     uint
+	Timestamp time.Time
+}
+
+// SaveMessage saves the message as a versioned object at the specified key
+// in the key value store.
+func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error {
+	msg := m.(meteredCmixMessage)
+
+	marshaled, err := json.Marshal(&msg)
+	if err != nil {
+		return errors.WithMessage(err, "Failed to marshal metered "+
+			"cmix message")
+	}
+
+	// Create versioned object
+	obj := versioned.Object{
+		Version:   currentMeteredCmixMessageVersion,
+		Timestamp: time.Now(),
+		Data:      marshaled,
+	}
+
+	// Save versioned object
+	return kv.Set(key, &obj)
+}
+
+// LoadMessage returns the message with the specified key from the key value
+// store. An empty message and error are returned if the message could not be
+// retrieved.
+func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) {
+	// Load the versioned object
+	vo, err := kv.Get(key)
+	if err != nil {
+		return format.Message{}, err
+	}
+
+	msg := &meteredCmixMessage{}
+	err := json.Unmarshal(vo.Data, msg)
+	if err != nil {
+		return nil, errors.WithMessage(err, "Failed to unmarshal "+
+			"metered cmix message")
+	}
+
+	// Create message from data
+	return format.Unmarshal(vo.Data), nil
+}
+
+// DeleteMessage deletes the message with the specified key from the key value
+// store.
+func (*meteredCmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error {
+	return kv.Delete(key)
+}
+
+// HashMessage generates a hash of the message.
+func (*meteredCmixMessageHandler) HashMessage(m interface{}) MessageHash {
+	msg := m.(meteredCmixMessage)
+
+	return md5.Sum(msg.M)
+}
+
+// CmixMessageBuffer wraps the message buffer to store and load raw cmix
+// messages.
+type MeteredCmixMessageBuffer struct {
+	mb  *MessageBuffer
+	kv  *versioned.KV
+	key string
+}
+
+func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) {
+	mb, err := NewMessageBuffer(kv, &cmixMessageHandler{}, key)
+	if err != nil {
+		return nil, err
+	}
+
+	return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil
+}
+
+func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) {
+	mb, err := LoadMessageBuffer(kv, &cmixMessageHandler{}, key)
+	if err != nil {
+		return nil, err
+	}
+
+	return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil
+}
+
+func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message) {
+	msg := meteredCmixMessage{
+		M:         m.Marshal(),
+		Count:     0,
+		Timestamp: time.Now(),
+	}
+	mcmb.mb.Add(msg)
+}
+
+func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message) {
+	msg := meteredCmixMessage{
+		M:         m.Marshal(),
+		Count:     0,
+		Timestamp: time.Now(),
+	}
+	mcmb.mb.AddProcessing(msg)
+}
+
+func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, bool) {
+	m, ok := mcmb.mb.Next()
+	if !ok {
+		return format.Message{}, 0, time.Time{}, false
+	}
+
+	msg := m.(meteredCmixMessage)
+	rtnCnt := msg.Count
+
+	//increment the count and save
+	msg.Count++
+	mcmh := &meteredCmixMessageHandler{}
+	err := mcmh.SaveMessage(mcmb.kv, msg, makeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg)))
+	if err != nil {
+		jww.FATAL.Panicf("Failed to save metered message after count "+
+			"update: %s", err)
+	}
+
+	msfFormat := format.Unmarshal(msg.M)
+	return msfFormat, rtnCnt, msg.Timestamp, true
+}
+
+func (mcmb *MeteredCmixMessageBuffer) Succeeded(m format.Message) {
+	mcmb.mb.Succeeded(m)
+}
+
+func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message) {
+	mcmb.mb.Failed(m)
+}
-- 
GitLab