From be43294aaf563adcc0aaf538258dc13f2cdbb99a Mon Sep 17 00:00:00 2001 From: benjamin <ben@elixxir.io> Date: Thu, 27 Oct 2022 10:17:48 -0700 Subject: [PATCH] implemented, needs to be thread though and exposed --- cmix/interface.go | 10 ++++-- cmix/nodes/interfaces.go | 10 ++++-- cmix/nodes/register.go | 15 +++++++++ cmix/nodes/registrar.go | 73 +++++++++++++++++++++++++++++++++------- 4 files changed, 89 insertions(+), 19 deletions(-) diff --git a/cmix/interface.go b/cmix/interface.go index 0e01001ba..2f1ee3db8 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -191,9 +191,13 @@ type Client interface { AddService(clientID *id.ID, newService message.Service, response message.Processor) - // IncreaseParallelNodeRegistration increases the number of parallel node - // registrations by num - IncreaseParallelNodeRegistration(num int) func() (stoppable.Stoppable, error) + //PauseNodeRegistrations stops all node registrations + //and returns a function to resume them + PauseNodeRegistrations(timeout time.Duration) error + + // ChangeNumberOfNodeRegistrations changes the number of parallel node + // registrations up to the initialized maximum + ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error // DeleteService deletes a message service. If only a single response is // associated with the preimage, the entire preimage is removed. If there is diff --git a/cmix/nodes/interfaces.go b/cmix/nodes/interfaces.go index ddce4b8f2..77520a2ec 100644 --- a/cmix/nodes/interfaces.go +++ b/cmix/nodes/interfaces.go @@ -27,9 +27,13 @@ type Registrar interface { // to register with nodes. StartProcesses(numParallel uint) stoppable.Stoppable - // IncreaseParallelNodeRegistration increases the number of parallel node - // registrations by num - IncreaseParallelNodeRegistration(num int) func() (stoppable.Stoppable, error) + //PauseNodeRegistrations stops all node registrations + //and returns a function to resume them + PauseNodeRegistrations(timeout time.Duration) error + + // ChangeNumberOfNodeRegistrations changes the number of parallel node + // registrations up to the initialized maximum + ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // not have a key for. If there are missing keys, then returns nil. diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go index 8cada7d12..72f9669c2 100644 --- a/cmix/nodes/register.go +++ b/cmix/nodes/register.go @@ -13,6 +13,7 @@ import ( "gitlab.com/xx_network/crypto/csprng" "strconv" "sync" + "sync/atomic" "time" "github.com/pkg/errors" @@ -33,14 +34,28 @@ import ( func registerNodes(r *registrar, s session, stop *stoppable.Single, inProgress, attempts *sync.Map) { + atomic.AddInt64(r.numberRunning, 1) + interval := time.Duration(500) * time.Millisecond t := time.NewTicker(interval) for { select { + case <-r.pauser: + atomic.AddInt64(r.numberRunning, -1) + select { + case <-stop.Quit(): + // On a stop signal, close the thread + t.Stop() + stop.ToStopped() + return + case <-r.resumer: + atomic.AddInt64(r.numberRunning, 1) + } case <-stop.Quit(): // On a stop signal, close the thread t.Stop() stop.ToStopped() + atomic.AddInt64(r.numberRunning, -1) return case gw := <-r.c: diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index 0c942b8de..b76907351 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -20,6 +20,7 @@ import ( "gitlab.com/xx_network/primitives/ndf" "strconv" "sync" + "sync/atomic" "time" ) @@ -51,6 +52,11 @@ type registrar struct { // operator at a time, as a result this is a map of ID -> int attempts sync.Map + pauser chan interface{} + resumer chan interface{} + numberRunning *int64 + maxRunning int + c chan network.NodeGateway } @@ -60,10 +66,15 @@ func LoadRegistrar(session session, sender gateway.Sender, comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator, c chan network.NodeGateway) (Registrar, error) { + running := int64(0) + kv := session.GetKV().Prefix(prefix) r := ®istrar{ - nodes: make(map[id.ID]*key), - kv: kv, + nodes: make(map[id.ID]*key), + kv: kv, + pauser: make(chan interface{}), + resumer: make(chan interface{}), + numberRunning: &running, } obj, err := kv.Get(storeKey, currentKeyVersion) @@ -95,6 +106,7 @@ func LoadRegistrar(session session, sender gateway.Sender, // to register with nodes. func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { multi := stoppable.NewMulti("NodeRegistrations") + r.maxRunning = int(numParallel) for i := uint(0); i < numParallel; i++ { stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) @@ -106,21 +118,56 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { return multi } -// IncreaseParallelNodeRegistration increases the number of parallel node -// registrations by num -func (r *registrar) IncreaseParallelNodeRegistration(numParallel int) func() (stoppable.Stoppable, error) { - return func() (stoppable.Stoppable, error) { - multi := stoppable.NewMulti("NodeRegistrationsIncrease") +//PauseNodeRegistrations stops all node registrations +//and returns a function to resume them +func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + numRegistrations := atomic.LoadInt64(r.numberRunning) + for i := int64(0); i < numRegistrations; i++ { + select { + case r.pauser <- struct{}{}: + case <-timer.C: + return errors.New("Timed out on pausing node registration") + } + } + return nil +} - for i := uint(0); i < uint(numParallel); i++ { - stop := stoppable.NewSingle("NodeRegistration Increase" + strconv.Itoa(int(i))) +// ChangeNumberOfNodeRegistrations changes the number of parallel node +// registrations up to the initialized maximum +func (r *registrar) ChangeNumberOfNodeRegistrations(toRun int, + timeout time.Duration) error { + numRunning := int(atomic.LoadInt64(r.numberRunning)) - go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) - multi.Add(stop) + if toRun+numRunning > r.maxRunning { + return errors.Errorf("Cannot change number of " + + "running node registration to number greater than the max") + } + timer := time.NewTimer(timeout) + defer timer.Stop() + if numRunning < toRun { + jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Reducing number "+ + "of node registrations from %d to %d", numRunning, toRun) + for i := 0; i < toRun-numRunning; i++ { + select { + case r.pauser <- struct{}{}: + case <-timer.C: + return errors.New("Timed out on reducing node registration") + } + } + } else if numRunning > toRun { + jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Increasing number "+ + "of node registrations from %d to %d", numRunning, toRun) + for i := 0; i < toRun-numRunning; i++ { + select { + case r.resumer <- struct{}{}: + case <-timer.C: + return errors.New("Timed out on increasing node registration") + } } - - return multi, nil } + return nil } // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did -- GitLab