Skip to content
Snippets Groups Projects
Commit a4dba2f2 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

finished parse

parent 1b813f5d
No related branches found
No related tags found
3 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast
Showing
with 118 additions and 38 deletions
package e2e
import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/e2e/ratchet/partner/session"
)
type fpGenerator struct {
*Manager
}
func (fp *fpGenerator) AddKey(k *session.Cypher) {
err := fp.net.AddFingerprint(fp.myID, k.Fingerprint(), &processor{
cy: k,
m: fp.Manager,
})
if err != nil {
jww.ERROR.Printf("Could not add fingerprint %s: %+v",
k.Fingerprint(), err)
}
}
func (fp *fpGenerator) DeleteKey(k *session.Cypher) {
fp.net.DeleteFingerprint(fp.myID, k.Fingerprint())
}
...@@ -24,5 +24,3 @@ type Handler interface { ...@@ -24,5 +24,3 @@ type Handler interface {
AddService(tag string, source []byte, processor message2.Processor) AddService(tag string, source []byte, processor message2.Processor)
RemoveService(tag string, source []byte) RemoveService(tag string, source []byte)
} }
type Manager interface {
}
package e2e
import (
"gitlab.com/elixxir/client/e2e/parse"
"gitlab.com/elixxir/client/e2e/ratchet"
"gitlab.com/elixxir/client/e2e/receive"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/xx_network/primitives/id"
)
type Manager struct {
*ratchet.Ratchet
*receive.Switchboard
partitioner parse.Partitioner
net network.Manager
myID *id.ID
}
func InitManager(kv *versioned.KV, myID *id.ID, privKey *cyclic.Int) {
}
File moved
File moved
File moved
File moved
...@@ -10,8 +10,10 @@ package parse ...@@ -10,8 +10,10 @@ package parse
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e/parse/conversation"
"gitlab.com/elixxir/client/e2e/parse/partition"
"gitlab.com/elixxir/client/e2e/receive" "gitlab.com/elixxir/client/e2e/receive"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
"time" "time"
...@@ -25,16 +27,18 @@ type Partitioner struct { ...@@ -25,16 +27,18 @@ type Partitioner struct {
partContentsSize int partContentsSize int
deltaFirstPart int deltaFirstPart int
maxSize int maxSize int
session *storage.Session conversation *conversation.Store
partition *partition.Store
} }
func NewPartitioner(messageSize int, session *storage.Session) Partitioner { func NewPartitioner(messageSize int, kv *versioned.KV) Partitioner {
p := Partitioner{ p := Partitioner{
baseMessageSize: messageSize, baseMessageSize: messageSize,
firstContentsSize: messageSize - firstHeaderLen, firstContentsSize: messageSize - firstHeaderLen,
partContentsSize: messageSize - headerLen, partContentsSize: messageSize - headerLen,
deltaFirstPart: firstHeaderLen - headerLen, deltaFirstPart: firstHeaderLen - headerLen,
session: session, conversation: conversation.NewStore(kv),
partition: partition.NewOrLoad(kv),
} }
p.maxSize = p.firstContentsSize + (MaxMessageParts-1)*p.partContentsSize p.maxSize = p.firstContentsSize + (MaxMessageParts-1)*p.partContentsSize
return p return p
...@@ -49,7 +53,7 @@ func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, ...@@ -49,7 +53,7 @@ func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType,
} }
// Get the ID of the sent message // Get the ID of the sent message
fullMessageID, messageID := p.session.Conversations().Get(recipient).GetNextSendID() fullMessageID, messageID := p.conversation.Get(recipient).GetNextSendID()
// Get the number of parts of the message; this equates to just a linear // Get the number of parts of the message; this equates to just a linear
// equation // equation
...@@ -80,19 +84,19 @@ func (p Partitioner) HandlePartition(sender *id.ID, ...@@ -80,19 +84,19 @@ func (p Partitioner) HandlePartition(sender *id.ID,
fm := FirstMessagePartFromBytes(contents) fm := FirstMessagePartFromBytes(contents)
// Handle the message ID // Handle the message ID
messageID := p.session.Conversations().Get(sender). messageID := p.conversation.Get(sender).
ProcessReceivedMessageID(fm.GetID()) ProcessReceivedMessageID(fm.GetID())
storeageTimestamp := netTime.Now() storeageTimestamp := netTime.Now()
return p.session.Partition().AddFirst(sender, fm.GetType(), return p.partition.AddFirst(sender, fm.GetType(),
messageID, fm.GetPart(), fm.GetNumParts(), fm.GetTimestamp(), storeageTimestamp, messageID, fm.GetPart(), fm.GetNumParts(), fm.GetTimestamp(), storeageTimestamp,
fm.GetSizedContents(), relationshipFingerprint) fm.GetSizedContents(), relationshipFingerprint)
} else { } else {
// If it is a subsequent message part, handle it as so // If it is a subsequent message part, handle it as so
mp := messagePartFromBytes(contents) mp := messagePartFromBytes(contents)
messageID := p.session.Conversations().Get(sender). messageID := p.conversation.Get(sender).
ProcessReceivedMessageID(mp.GetID()) ProcessReceivedMessageID(mp.GetID())
return p.session.Partition().Add(sender, messageID, mp.GetPart(), return p.partition.Add(sender, messageID, mp.GetPart(),
mp.GetSizedContents(), relationshipFingerprint) mp.GetSizedContents(), relationshipFingerprint)
} }
} }
......
...@@ -12,7 +12,8 @@ import ( ...@@ -12,7 +12,8 @@ import (
"fmt" "fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e/receive"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/e2e" "gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/ekv" "gitlab.com/elixxir/ekv"
...@@ -34,7 +35,7 @@ type multiPartMessage struct { ...@@ -34,7 +35,7 @@ type multiPartMessage struct {
SenderTimestamp time.Time SenderTimestamp time.Time
// Timestamp in which message was stored in RAM // Timestamp in which message was stored in RAM
StorageTimestamp time.Time StorageTimestamp time.Time
MessageType message.Type MessageType catalog.MessageType
parts [][]byte parts [][]byte
kv *versioned.KV kv *versioned.KV
...@@ -121,7 +122,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { ...@@ -121,7 +122,7 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) {
} }
} }
func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8, func (mpm *multiPartMessage) AddFirst(mt catalog.MessageType, partNumber uint8,
numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte) { numParts uint8, senderTimestamp, storageTimestamp time.Time, part []byte) {
mpm.mux.Lock() mpm.mux.Lock()
defer mpm.mux.Unlock() defer mpm.mux.Unlock()
...@@ -151,11 +152,11 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8, ...@@ -151,11 +152,11 @@ func (mpm *multiPartMessage) AddFirst(mt message.Type, partNumber uint8,
} }
} }
func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message.Receive, bool) { func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (receive.Message, bool) {
mpm.mux.Lock() mpm.mux.Lock()
if mpm.NumParts == 0 || mpm.NumParts != mpm.PresentParts { if mpm.NumParts == 0 || mpm.NumParts != mpm.PresentParts {
mpm.mux.Unlock() mpm.mux.Unlock()
return message.Receive{}, false return receive.Message{}, false
} }
// Make sure the parts buffer is large enough to load all parts from disk // Make sure the parts buffer is large enough to load all parts from disk
...@@ -181,13 +182,11 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message ...@@ -181,13 +182,11 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (message
} }
// Return the message // Return the message
m := message.Receive{ m := receive.Message{
Payload: reconstructed, Payload: reconstructed,
MessageType: mpm.MessageType, MessageType: mpm.MessageType,
Sender: mpm.Sender, Sender: mpm.Sender,
Timestamp: mpm.SenderTimestamp, Timestamp: mpm.SenderTimestamp,
// Encryption will be set externally
Encryption: 0,
ID: mid, ID: mid,
} }
......
File moved
File moved
...@@ -11,7 +11,8 @@ import ( ...@@ -11,7 +11,8 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e/receive"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -34,15 +35,7 @@ type Store struct { ...@@ -34,15 +35,7 @@ type Store struct {
mux sync.Mutex mux sync.Mutex
} }
func New(kv *versioned.KV) *Store { func NewOrLoad(kv *versioned.KV) *Store {
return &Store{
multiParts: make(map[multiPartID]*multiPartMessage),
activeParts: make(map[*multiPartMessage]bool),
kv: kv.Prefix(packagePrefix),
}
}
func Load(kv *versioned.KV) *Store {
partitionStore := &Store{ partitionStore := &Store{
multiParts: make(map[multiPartID]*multiPartMessage), multiParts: make(map[multiPartID]*multiPartMessage),
activeParts: make(map[*multiPartMessage]bool), activeParts: make(map[*multiPartMessage]bool),
...@@ -56,9 +49,9 @@ func Load(kv *versioned.KV) *Store { ...@@ -56,9 +49,9 @@ func Load(kv *versioned.KV) *Store {
return partitionStore return partitionStore
} }
func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64, func (s *Store) AddFirst(partner *id.ID, mt catalog.MessageType, messageID uint64,
partNum, numParts uint8, senderTimestamp, storageTimestamp time.Time, partNum, numParts uint8, senderTimestamp, storageTimestamp time.Time,
part []byte, relationshipFingerprint []byte) (message.Receive, bool) { part []byte, relationshipFingerprint []byte) (receive.Message, bool) {
mpm := s.load(partner, messageID) mpm := s.load(partner, messageID)
...@@ -78,7 +71,7 @@ func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64, ...@@ -78,7 +71,7 @@ func (s *Store) AddFirst(partner *id.ID, mt message.Type, messageID uint64,
} }
func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8, func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8,
part []byte, relationshipFingerprint []byte) (message.Receive, bool) { part []byte, relationshipFingerprint []byte) (receive.Message, bool) {
mpm := s.load(partner, messageID) mpm := s.load(partner, messageID)
......
...@@ -27,7 +27,7 @@ func TestNew(t *testing.T) { ...@@ -27,7 +27,7 @@ func TestNew(t *testing.T) {
kv: rootKv.Prefix(packagePrefix), kv: rootKv.Prefix(packagePrefix),
} }
store := New(rootKv) store := NewOrLoad(rootKv)
if !reflect.DeepEqual(expectedStore, store) { if !reflect.DeepEqual(expectedStore, store) {
t.Errorf("New() did not return the expecte Store."+ t.Errorf("New() did not return the expecte Store."+
...@@ -38,7 +38,7 @@ func TestNew(t *testing.T) { ...@@ -38,7 +38,7 @@ func TestNew(t *testing.T) {
// Tests happy path of Store.AddFirst(). // Tests happy path of Store.AddFirst().
func TestStore_AddFirst(t *testing.T) { func TestStore_AddFirst(t *testing.T) {
part := []byte("Test message.") part := []byte("Test message.")
s := New(versioned.NewKV(ekv.Memstore{})) s := NewOrLoad(versioned.NewKV(ekv.Memstore{}))
msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t), msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t),
message.XxMessage, 5, 0, 1, netTime.Now(), netTime.Now(), part, message.XxMessage, 5, 0, 1, netTime.Now(), netTime.Now(), part,
......
...@@ -56,9 +56,6 @@ func TestNewPartitioner(t *testing.T) { ...@@ -56,9 +56,6 @@ func TestNewPartitioner(t *testing.T) {
4088, p.partContentsSize) 4088, p.partContentsSize)
} }
if p.session != storeSession {
t.Errorf("session content mismatch")
}
} }
// Test that no error is returned running Partition // Test that no error is returned running Partition
......
package e2e
import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/e2e/ratchet/partner/session"
"gitlab.com/elixxir/client/network/historical"
"gitlab.com/elixxir/client/network/identity/receptionID"
"gitlab.com/elixxir/primitives/format"
)
type processor struct {
cy *session.Cypher
m *Manager
}
func (p *processor) Process(ecrMsg format.Message, receptionID receptionID.EphemeralIdentity,
round historical.Round) {
//ensure the key is used before returning
defer p.cy.Use()
//decrypt
msg, err := p.cy.Decrypt(ecrMsg)
if err != nil {
jww.ERROR.Printf("Decryption Failed of %s (fp: %s), dropping: %+v",
ecrMsg.Digest(), p.cy.Fingerprint(), err)
return
}
//Parse
sess := p.cy.GetSession()
message, done := p.m.partitioner.HandlePartition(sess.GetPartner(),
msg.GetContents(), sess.GetRelationshipFingerprint())
if done {
message.RecipientID = receptionID.Source
message.EphemeralID = receptionID.EphId
message.Round = round
message.Encrypted = true
p.m.Switchboard.Speak(message)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment