From af3a616086b374b5c866ff24bfcec436eab9e833 Mon Sep 17 00:00:00 2001 From: benjamin <ben@elixxir.io> Date: Thu, 27 Oct 2022 13:07:10 -0700 Subject: [PATCH] minor fixes to node reg --- cmix/nodes/register.go | 7 ------- cmix/nodes/registrar.go | 11 ++++++++++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go index 72f9669c2..98b08d73f 100644 --- a/cmix/nodes/register.go +++ b/cmix/nodes/register.go @@ -36,24 +36,18 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single, atomic.AddInt64(r.numberRunning, 1) - interval := time.Duration(500) * time.Millisecond - t := time.NewTicker(interval) for { select { case <-r.pauser: atomic.AddInt64(r.numberRunning, -1) select { case <-stop.Quit(): - // On a stop signal, close the thread - t.Stop() stop.ToStopped() return case <-r.resumer: atomic.AddInt64(r.numberRunning, 1) } case <-stop.Quit(): - // On a stop signal, close the thread - t.Stop() stop.ToStopped() atomic.AddInt64(r.numberRunning, -1) return @@ -122,7 +116,6 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single, } } rng.Close() - case <-t.C: } } } diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index 77740debd..090d44093 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -57,6 +57,8 @@ type registrar struct { numberRunning *int64 maxRunning int + runnerLock sync.Mutex + c chan network.NodeGateway } @@ -105,6 +107,9 @@ func LoadRegistrar(session session, sender gateway.Sender, // StartProcesses initiates numParallel amount of threads // to register with nodes. func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() + multi := stoppable.NewMulti("NodeRegistrations") r.maxRunning = int(numParallel) @@ -121,6 +126,8 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { //PauseNodeRegistrations stops all node registrations //and returns a function to resume them func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() timer := time.NewTimer(timeout) defer timer.Stop() numRegistrations := atomic.LoadInt64(r.numberRunning) @@ -128,7 +135,7 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { select { case r.pauser <- struct{}{}: case <-timer.C: - return errors.New("Timed out on pausing node registration") + return errors.Errorf("Timed out on pausing node registration on %d", i) } } return nil @@ -138,6 +145,8 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { // registrations up to the initialized maximum func (r *registrar) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() numRunning := int(atomic.LoadInt64(r.numberRunning)) if toRun+numRunning > r.maxRunning { -- GitLab