From 876a8e2dc807f1d7f7ba5d169fe5cff17c38a2da Mon Sep 17 00:00:00 2001 From: benjamin <ben@elixxir.io> Date: Wed, 26 Oct 2022 20:38:42 -0700 Subject: [PATCH] added the system to increase the number of paralell node registrations --- bindings/follow.go | 6 ++++++ cmix/nodes/registrar.go | 30 +++++++++++++++++++++++------- xxdk/cmix.go | 8 ++++++++ 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/bindings/follow.go b/bindings/follow.go index 6f892e415..432796485 100644 --- a/bindings/follow.go +++ b/bindings/follow.go @@ -188,6 +188,12 @@ func (c *Cmix) AddHealthCallback(nhc NetworkHealthCallback) int64 { return int64(c.api.GetCmix().AddHealthCallback(nhc.Callback)) } +// IncreaseParallelNodeRegistration increases the number of parallel node +// registrations by num +func (c *Cmix) IncreaseParallelNodeRegistration(num int) error { + return c.api.IncreaseParallelNodeRegistration(num) +} + // RemoveHealthCallback removes a health callback using its registration ID. func (c *Cmix) RemoveHealthCallback(funcID int64) { c.api.GetCmix().RemoveHealthCallback(uint64(funcID)) diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index 5ca5f108d..fedd7ddd3 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -46,6 +46,11 @@ type registrar struct { comms RegisterNodeCommsInterface rng *fastRNG.StreamGenerator + inProgress sync.Map + // We are relying on the in progress check to ensure there is only a single + // operator at a time, as a result this is a map of ID -> int + attempts sync.Map + c chan network.NodeGateway } @@ -91,22 +96,33 @@ func LoadRegistrar(session session, sender gateway.Sender, func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { multi := stoppable.NewMulti("NodeRegistrations") - inProgress := &sync.Map{} - - // We are relying on the in progress check to ensure there is only a single - // operator at a time, as a result this is a map of ID -> int - attempts := &sync.Map{} - for i := uint(0); i < numParallel; i++ { stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) - go registerNodes(r, r.session, stop, inProgress, attempts) + go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) multi.Add(stop) } return multi } +// IncreaseParallelNodeRegistration increases the number of parallel node +// registrations by num +func (r *registrar) IncreaseParallelNodeRegistration(numParallel uint) func() (stoppable.Stoppable, error) { + return func() (stoppable.Stoppable, error) { + multi := stoppable.NewMulti("NodeRegistrationsIncrease") + + for i := uint(0); i < numParallel; i++ { + stop := stoppable.NewSingle("NodeRegistration Increase" + strconv.Itoa(int(i))) + + go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) + multi.Add(stop) + } + + return multi, nil + } +} + // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // not have a key for. If there are missing keys, then returns nil. func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) { diff --git a/xxdk/cmix.go b/xxdk/cmix.go index a2b8b8b9a..9ccf239db 100644 --- a/xxdk/cmix.go +++ b/xxdk/cmix.go @@ -484,6 +484,14 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) { return numRegistered, len(nodes) - numStale, nil } +// IncreaseParallelNodeRegistration increases the number of parallel node +// registrations by num +func (c *Cmix) IncreaseParallelNodeRegistration(num int) error { + jww.INFO.Printf("IncreaseParallelNodeRegistration(%d)", num) + svc := c.network.IncreaseParallelNodeRegistration(num) + return c.followerServices.add(svc) +} + // GetPreferredBins returns the geographic bin or bins that the provided two // character country code is a part of. func (c *Cmix) GetPreferredBins(countryCode string) ([]string, error) { -- GitLab