Skip to content
Snippets Groups Projects
Commit 23f9637a authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

added meteredCmixBuffer

parent f21a23a2
No related branches found
No related tags found
No related merge requests found
......@@ -16,12 +16,12 @@ import (
)
// Returns an error if registration fails.
func RegisterWithPermissioning(ctx context.Context, comms client.Comms, registrationCode string) error {
func (c *Client) RegisterWithPermissioning(registrationCode string) error {
instance := ctx.Manager.GetInstance()
instance.GetPartialNdf()
//Check the regState is in proper state for registration
regState := ctx.Session.GetRegistrationStatus()
regState := c.storage.GetRegistrationStatus()
if regState != storage.KeyGenComplete {
return errors.Errorf("Attempting to register before key generation!")
}
......
......@@ -37,7 +37,6 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
}
}
}
}
func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo,
......@@ -56,7 +55,7 @@ func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo,
msgReq := &pb.GetMessages{
ClientID: m.Uid.Marshal(),
//TODO: fix this, should not be a byte slice
//RoundID: uint64(rid),
RoundID: uint64(rid),
}
msgResp, err := comms.RequestMessages(gwHost, msgReq)
// Fail the round if an error occurs so it can be tried again later
......
......@@ -172,6 +172,11 @@ func (s *Store) PopKey(f format.Fingerprint) (*Key, bool) {
return s.fingerprints.Pop(f)
}
//Key exists for a key fingerprint
func (s *Store) CheckKey(f format.Fingerprint) bool {
return s.fingerprints.Check(f)
}
//Returns the diffie hellman private key
func (s *Store) GetDHPrivateKey() *cyclic.Int {
return s.dhPrivateKey
......@@ -272,6 +277,15 @@ func (f *fingerprints) remove(keys []*Key) {
}
}
func (f *fingerprints) Check(fingerprint format.Fingerprint) bool {
f.mux.RLock()
defer f.mux.RUnlock()
_, ok := f.toKey[fingerprint]
return ok
}
func (f *fingerprints) Pop(fingerprint format.Fingerprint) (*Key, bool) {
f.mux.Lock()
defer f.mux.Unlock()
......
......@@ -45,7 +45,7 @@ type Session struct {
conversations *conversation.Store
partition *partition.Store
criticalMessages *utility.E2eMessageBuffer
garbledMessages *utility.CmixMessageBuffer
garbledMessages *utility.MeteredCmixMessageBuffer
checkedRounds *utility.KnownRounds
}
......@@ -101,7 +101,7 @@ func New(baseDir, password string, uid *id.ID, salt []byte, rsaKey *rsa.PrivateK
return nil, errors.WithMessage(err, "Failed to create session")
}
s.garbledMessages, err = utility.NewCmixMessageBuffer(s.kv, garbledMessagesKey)
s.garbledMessages, err = utility.NewMeteredCmixMessageBuffer(s.kv, garbledMessagesKey)
if err != nil {
return nil, errors.WithMessage(err, "Failed to create session")
}
......@@ -149,7 +149,7 @@ func Load(baseDir, password string, rng *fastRNG.StreamGenerator) (*Session, err
return nil, errors.WithMessage(err, "Failed to load session")
}
s.garbledMessages, err = utility.LoadCmixMessageBuffer(s.kv, garbledMessagesKey)
s.garbledMessages, err = utility.LoadMeteredCmixMessageBuffer(s.kv, garbledMessagesKey)
if err != nil {
return nil, errors.WithMessage(err, "Failed to load session")
}
......@@ -189,7 +189,7 @@ func (s *Session) GetCriticalMessages() *utility.E2eMessageBuffer {
return s.criticalMessages
}
func (s *Session) GetGarbledMessages() *utility.CmixMessageBuffer {
func (s *Session) GetGarbledMessages() *utility.MeteredCmixMessageBuffer {
s.mux.RLock()
defer s.mux.RUnlock()
return s.garbledMessages
......
package utility
import (
"crypto/md5"
"encoding/json"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/primitives/format"
"time"
)
const currentMeteredCmixMessageVersion = 0
type meteredCmixMessageHandler struct{}
type meteredCmixMessage struct {
M []byte
Count uint
Timestamp time.Time
}
// SaveMessage saves the message as a versioned object at the specified key
// in the key value store.
func (*meteredCmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error {
msg := m.(meteredCmixMessage)
marshaled, err := json.Marshal(&msg)
if err != nil {
return errors.WithMessage(err, "Failed to marshal metered "+
"cmix message")
}
// Create versioned object
obj := versioned.Object{
Version: currentMeteredCmixMessageVersion,
Timestamp: time.Now(),
Data: marshaled,
}
// Save versioned object
return kv.Set(key, &obj)
}
// LoadMessage returns the message with the specified key from the key value
// store. An empty message and error are returned if the message could not be
// retrieved.
func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) {
// Load the versioned object
vo, err := kv.Get(key)
if err != nil {
return format.Message{}, err
}
msg := &meteredCmixMessage{}
err := json.Unmarshal(vo.Data, msg)
if err != nil {
return nil, errors.WithMessage(err, "Failed to unmarshal "+
"metered cmix message")
}
// Create message from data
return format.Unmarshal(vo.Data), nil
}
// DeleteMessage deletes the message with the specified key from the key value
// store.
func (*meteredCmixMessageHandler) DeleteMessage(kv *versioned.KV, key string) error {
return kv.Delete(key)
}
// HashMessage generates a hash of the message.
func (*meteredCmixMessageHandler) HashMessage(m interface{}) MessageHash {
msg := m.(meteredCmixMessage)
return md5.Sum(msg.M)
}
// CmixMessageBuffer wraps the message buffer to store and load raw cmix
// messages.
type MeteredCmixMessageBuffer struct {
mb *MessageBuffer
kv *versioned.KV
key string
}
func NewMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) {
mb, err := NewMessageBuffer(kv, &cmixMessageHandler{}, key)
if err != nil {
return nil, err
}
return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil
}
func LoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (*MeteredCmixMessageBuffer, error) {
mb, err := LoadMessageBuffer(kv, &cmixMessageHandler{}, key)
if err != nil {
return nil, err
}
return &MeteredCmixMessageBuffer{mb: mb, kv: kv, key: key}, nil
}
func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message) {
msg := meteredCmixMessage{
M: m.Marshal(),
Count: 0,
Timestamp: time.Now(),
}
mcmb.mb.Add(msg)
}
func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message) {
msg := meteredCmixMessage{
M: m.Marshal(),
Count: 0,
Timestamp: time.Now(),
}
mcmb.mb.AddProcessing(msg)
}
func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, bool) {
m, ok := mcmb.mb.Next()
if !ok {
return format.Message{}, 0, time.Time{}, false
}
msg := m.(meteredCmixMessage)
rtnCnt := msg.Count
//increment the count and save
msg.Count++
mcmh := &meteredCmixMessageHandler{}
err := mcmh.SaveMessage(mcmb.kv, msg, makeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg)))
if err != nil {
jww.FATAL.Panicf("Failed to save metered message after count "+
"update: %s", err)
}
msfFormat := format.Unmarshal(msg.M)
return msfFormat, rtnCnt, msg.Timestamp, true
}
func (mcmb *MeteredCmixMessageBuffer) Succeeded(m format.Message) {
mcmb.mb.Succeeded(m)
}
func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message) {
mcmb.mb.Failed(m)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment