diff --git a/e2e/manager.go b/e2e/manager.go index cbfcddd6ec0d4b4126983d800e96dce03b25a375..ceac6b8c95944ba7afe7e15c373f903b3336eff6 100644 --- a/e2e/manager.go +++ b/e2e/manager.go @@ -27,7 +27,7 @@ import ( type manager struct { *ratchet.Ratchet *receive.Switchboard - partitioner parse.Partitioner + partitioner *parse.Partitioner net cmix.Client myID *id.ID rng *fastRNG.StreamGenerator @@ -35,6 +35,7 @@ type manager struct { grp *cyclic.Group crit *critical rekeyParams rekey.Params + kv *versioned.KV } const e2eRekeyParamsKey = "e2eRekeyParams" @@ -69,6 +70,8 @@ func initE2E(kv *versioned.KV, myID *id.ID, privKey *cyclic.Int, // Load returns an e2e manager from storage. It uses an ID to prefix the kv // and is used for partner relationships. // You can use a memkv for an ephemeral e2e id +// Can be initialized with a nil cmix.Client, but will crash on start - use when +// prebuilding e2e identity to be used later func Load(kv *versioned.KV, net cmix.Client, myID *id.ID, grp *cyclic.Group, rng *fastRNG.StreamGenerator, events event.Reporter) (Handler, error) { @@ -82,6 +85,8 @@ func Load(kv *versioned.KV, net cmix.Client, myID *id.ID, // Does not modify the kv prefix in any way to maintain backwards compatibility // before multiple IDs were supported // You can use a memkv for an ephemeral e2e id +// Can be initialized with a nil cmix.Client, but will crash on start - use when +// prebuilding e2e identity to be used later func LoadLegacy(kv *versioned.KV, net cmix.Client, myID *id.ID, grp *cyclic.Group, rng *fastRNG.StreamGenerator, events event.Reporter, params rekey.Params) (Handler, error) { @@ -118,12 +123,12 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID, m := &manager{ Switchboard: receive.New(), - partitioner: parse.NewPartitioner(kv, net.GetMaxMessageLength()), net: net, myID: myDefaultID, events: events, grp: grp, rekeyParams: rekey.Params{}, + kv: kv, } var err error @@ -144,14 +149,20 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID, "Failed to unmarshal rekeyParams data") } - m.crit = newCritical(kv, net.AddHealthCallback, m.SendE2E) - return m, nil } func (m *manager) StartProcesses() (stoppable.Stoppable, error) { multi := stoppable.NewMulti("e2eManager") + if m.partitioner == nil { + m.partitioner = parse.NewPartitioner(m.kv, m.net.GetMaxMessageLength()) + } + + if m.crit == nil { + m.crit = newCritical(m.kv, m.net.AddHealthCallback, m.SendE2E) + } + critcalNetworkStopper := stoppable.NewSingle( "e2eCriticalMessagesStopper") m.crit.runCriticalMessages(critcalNetworkStopper, diff --git a/e2e/parse/partition.go b/e2e/parse/partition.go index c0f5c0b95338bd6044063fbe7cecbac114fbbd3f..da7628381175971f748d456761ccc836345bd227 100644 --- a/e2e/parse/partition.go +++ b/e2e/parse/partition.go @@ -32,7 +32,7 @@ type Partitioner struct { partition *partition.Store } -func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner { +func NewPartitioner(kv *versioned.KV, messageSize int) *Partitioner { p := Partitioner{ baseMessageSize: messageSize, firstContentsSize: messageSize - firstHeaderLen, @@ -43,10 +43,10 @@ func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner { } p.maxSize = p.firstContentsSize + (MaxMessageParts-1)*p.partContentsSize - return p + return &p } -func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, +func (p *Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, timestamp time.Time, payload []byte) ([][]byte, uint64, error) { if len(payload) > p.maxSize { @@ -77,7 +77,7 @@ func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, return parts, fullMessageID, nil } -func (p Partitioner) HandlePartition(sender *id.ID, +func (p *Partitioner) HandlePartition(sender *id.ID, contents []byte, relationshipFingerprint []byte) (receive.Message, bool) { if isFirst(contents) { @@ -106,19 +106,19 @@ func (p Partitioner) HandlePartition(sender *id.ID, // FirstPartitionSize returns the max partition payload size for the // first payload -func (p Partitioner) FirstPartitionSize() uint { +func (p *Partitioner) FirstPartitionSize() uint { return uint(p.firstContentsSize) } // SecondPartitionSize returns the max partition payload size for all // payloads after the first payload -func (p Partitioner) SecondPartitionSize() uint { +func (p *Partitioner) SecondPartitionSize() uint { return uint(p.partContentsSize) } // PayloadSize Returns the max payload size for a partitionable E2E // message -func (p Partitioner) PayloadSize() uint { +func (p *Partitioner) PayloadSize() uint { return uint(p.maxSize) }