Skip to content
Snippets Groups Projects
Commit 684b11b3 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

made e2e object capable of initlizing without a cmix.client

parent 7411854d
Branches
Tags
2 merge requests!510Release,!250Hotfix/refactor connect
...@@ -27,7 +27,7 @@ import ( ...@@ -27,7 +27,7 @@ import (
type manager struct { type manager struct {
*ratchet.Ratchet *ratchet.Ratchet
*receive.Switchboard *receive.Switchboard
partitioner parse.Partitioner partitioner *parse.Partitioner
net cmix.Client net cmix.Client
myID *id.ID myID *id.ID
rng *fastRNG.StreamGenerator rng *fastRNG.StreamGenerator
...@@ -35,6 +35,7 @@ type manager struct { ...@@ -35,6 +35,7 @@ type manager struct {
grp *cyclic.Group grp *cyclic.Group
crit *critical crit *critical
rekeyParams rekey.Params rekeyParams rekey.Params
kv *versioned.KV
} }
const e2eRekeyParamsKey = "e2eRekeyParams" const e2eRekeyParamsKey = "e2eRekeyParams"
...@@ -69,6 +70,8 @@ func initE2E(kv *versioned.KV, myID *id.ID, privKey *cyclic.Int, ...@@ -69,6 +70,8 @@ func initE2E(kv *versioned.KV, myID *id.ID, privKey *cyclic.Int,
// Load returns an e2e manager from storage. It uses an ID to prefix the kv // Load returns an e2e manager from storage. It uses an ID to prefix the kv
// and is used for partner relationships. // and is used for partner relationships.
// You can use a memkv for an ephemeral e2e id // You can use a memkv for an ephemeral e2e id
// Can be initialized with a nil cmix.Client, but will crash on start - use when
// prebuilding e2e identity to be used later
func Load(kv *versioned.KV, net cmix.Client, myID *id.ID, func Load(kv *versioned.KV, net cmix.Client, myID *id.ID,
grp *cyclic.Group, rng *fastRNG.StreamGenerator, grp *cyclic.Group, rng *fastRNG.StreamGenerator,
events event.Reporter) (Handler, error) { events event.Reporter) (Handler, error) {
...@@ -82,6 +85,8 @@ func Load(kv *versioned.KV, net cmix.Client, myID *id.ID, ...@@ -82,6 +85,8 @@ func Load(kv *versioned.KV, net cmix.Client, myID *id.ID,
// Does not modify the kv prefix in any way to maintain backwards compatibility // Does not modify the kv prefix in any way to maintain backwards compatibility
// before multiple IDs were supported // before multiple IDs were supported
// You can use a memkv for an ephemeral e2e id // You can use a memkv for an ephemeral e2e id
// Can be initialized with a nil cmix.Client, but will crash on start - use when
// prebuilding e2e identity to be used later
func LoadLegacy(kv *versioned.KV, net cmix.Client, myID *id.ID, func LoadLegacy(kv *versioned.KV, net cmix.Client, myID *id.ID,
grp *cyclic.Group, rng *fastRNG.StreamGenerator, grp *cyclic.Group, rng *fastRNG.StreamGenerator,
events event.Reporter, params rekey.Params) (Handler, error) { events event.Reporter, params rekey.Params) (Handler, error) {
...@@ -118,12 +123,12 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID, ...@@ -118,12 +123,12 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID,
m := &manager{ m := &manager{
Switchboard: receive.New(), Switchboard: receive.New(),
partitioner: parse.NewPartitioner(kv, net.GetMaxMessageLength()),
net: net, net: net,
myID: myDefaultID, myID: myDefaultID,
events: events, events: events,
grp: grp, grp: grp,
rekeyParams: rekey.Params{}, rekeyParams: rekey.Params{},
kv: kv,
} }
var err error var err error
...@@ -144,14 +149,20 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID, ...@@ -144,14 +149,20 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID,
"Failed to unmarshal rekeyParams data") "Failed to unmarshal rekeyParams data")
} }
m.crit = newCritical(kv, net.AddHealthCallback, m.SendE2E)
return m, nil return m, nil
} }
func (m *manager) StartProcesses() (stoppable.Stoppable, error) { func (m *manager) StartProcesses() (stoppable.Stoppable, error) {
multi := stoppable.NewMulti("e2eManager") multi := stoppable.NewMulti("e2eManager")
if m.partitioner == nil {
m.partitioner = parse.NewPartitioner(m.kv, m.net.GetMaxMessageLength())
}
if m.crit == nil {
m.crit = newCritical(m.kv, m.net.AddHealthCallback, m.SendE2E)
}
critcalNetworkStopper := stoppable.NewSingle( critcalNetworkStopper := stoppable.NewSingle(
"e2eCriticalMessagesStopper") "e2eCriticalMessagesStopper")
m.crit.runCriticalMessages(critcalNetworkStopper, m.crit.runCriticalMessages(critcalNetworkStopper,
......
...@@ -32,7 +32,7 @@ type Partitioner struct { ...@@ -32,7 +32,7 @@ type Partitioner struct {
partition *partition.Store partition *partition.Store
} }
func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner { func NewPartitioner(kv *versioned.KV, messageSize int) *Partitioner {
p := Partitioner{ p := Partitioner{
baseMessageSize: messageSize, baseMessageSize: messageSize,
firstContentsSize: messageSize - firstHeaderLen, firstContentsSize: messageSize - firstHeaderLen,
...@@ -43,10 +43,10 @@ func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner { ...@@ -43,10 +43,10 @@ func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner {
} }
p.maxSize = p.firstContentsSize + (MaxMessageParts-1)*p.partContentsSize p.maxSize = p.firstContentsSize + (MaxMessageParts-1)*p.partContentsSize
return p return &p
} }
func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, func (p *Partitioner) Partition(recipient *id.ID, mt catalog.MessageType,
timestamp time.Time, payload []byte) ([][]byte, uint64, error) { timestamp time.Time, payload []byte) ([][]byte, uint64, error) {
if len(payload) > p.maxSize { if len(payload) > p.maxSize {
...@@ -77,7 +77,7 @@ func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, ...@@ -77,7 +77,7 @@ func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType,
return parts, fullMessageID, nil return parts, fullMessageID, nil
} }
func (p Partitioner) HandlePartition(sender *id.ID, func (p *Partitioner) HandlePartition(sender *id.ID,
contents []byte, relationshipFingerprint []byte) (receive.Message, bool) { contents []byte, relationshipFingerprint []byte) (receive.Message, bool) {
if isFirst(contents) { if isFirst(contents) {
...@@ -106,19 +106,19 @@ func (p Partitioner) HandlePartition(sender *id.ID, ...@@ -106,19 +106,19 @@ func (p Partitioner) HandlePartition(sender *id.ID,
// FirstPartitionSize returns the max partition payload size for the // FirstPartitionSize returns the max partition payload size for the
// first payload // first payload
func (p Partitioner) FirstPartitionSize() uint { func (p *Partitioner) FirstPartitionSize() uint {
return uint(p.firstContentsSize) return uint(p.firstContentsSize)
} }
// SecondPartitionSize returns the max partition payload size for all // SecondPartitionSize returns the max partition payload size for all
// payloads after the first payload // payloads after the first payload
func (p Partitioner) SecondPartitionSize() uint { func (p *Partitioner) SecondPartitionSize() uint {
return uint(p.partContentsSize) return uint(p.partContentsSize)
} }
// PayloadSize Returns the max payload size for a partitionable E2E // PayloadSize Returns the max payload size for a partitionable E2E
// message // message
func (p Partitioner) PayloadSize() uint { func (p *Partitioner) PayloadSize() uint {
return uint(p.maxSize) return uint(p.maxSize)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment