diff --git a/bindings/follow.go b/bindings/follow.go index 6f892e4151356b6bfa0906f2f746542aaae4c99e..4327964853f82e3f23146ecb3d017a5552e90146 100644 --- a/bindings/follow.go +++ b/bindings/follow.go @@ -188,6 +188,12 @@ func (c *Cmix) AddHealthCallback(nhc NetworkHealthCallback) int64 { return int64(c.api.GetCmix().AddHealthCallback(nhc.Callback)) } +// IncreaseParallelNodeRegistration increases the number of parallel node +// registrations by num +func (c *Cmix) IncreaseParallelNodeRegistration(num int) error { + return c.api.IncreaseParallelNodeRegistration(num) +} + // RemoveHealthCallback removes a health callback using its registration ID. func (c *Cmix) RemoveHealthCallback(funcID int64) { c.api.GetCmix().RemoveHealthCallback(uint64(funcID)) diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index 5ca5f108d201de75fcb739c43ea1bf2cea24951b..fedd7ddd338f2afe56139eaa8955f2e04ccd4d4d 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -46,6 +46,11 @@ type registrar struct { comms RegisterNodeCommsInterface rng *fastRNG.StreamGenerator + inProgress sync.Map + // We are relying on the in progress check to ensure there is only a single + // operator at a time, as a result this is a map of ID -> int + attempts sync.Map + c chan network.NodeGateway } @@ -91,22 +96,33 @@ func LoadRegistrar(session session, sender gateway.Sender, func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { multi := stoppable.NewMulti("NodeRegistrations") - inProgress := &sync.Map{} - - // We are relying on the in progress check to ensure there is only a single - // operator at a time, as a result this is a map of ID -> int - attempts := &sync.Map{} - for i := uint(0); i < numParallel; i++ { stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) - go registerNodes(r, r.session, stop, inProgress, attempts) + go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) multi.Add(stop) } return multi } +// IncreaseParallelNodeRegistration increases the number of parallel node +// registrations by num +func (r *registrar) IncreaseParallelNodeRegistration(numParallel uint) func() (stoppable.Stoppable, error) { + return func() (stoppable.Stoppable, error) { + multi := stoppable.NewMulti("NodeRegistrationsIncrease") + + for i := uint(0); i < numParallel; i++ { + stop := stoppable.NewSingle("NodeRegistration Increase" + strconv.Itoa(int(i))) + + go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) + multi.Add(stop) + } + + return multi, nil + } +} + // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // not have a key for. If there are missing keys, then returns nil. func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) { diff --git a/xxdk/cmix.go b/xxdk/cmix.go index a2b8b8b9ad3c6e7cc77c6eb325fabba17084cff0..9ccf239dbaf6da0d00cef0bf7638090821a3ec2b 100644 --- a/xxdk/cmix.go +++ b/xxdk/cmix.go @@ -484,6 +484,14 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) { return numRegistered, len(nodes) - numStale, nil } +// IncreaseParallelNodeRegistration increases the number of parallel node +// registrations by num +func (c *Cmix) IncreaseParallelNodeRegistration(num int) error { + jww.INFO.Printf("IncreaseParallelNodeRegistration(%d)", num) + svc := c.network.IncreaseParallelNodeRegistration(num) + return c.followerServices.add(svc) +} + // GetPreferredBins returns the geographic bin or bins that the provided two // character country code is a part of. func (c *Cmix) GetPreferredBins(countryCode string) ([]string, error) {