diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go index 72f9669c236453221a3a3c2dee6484c03458ee98..98b08d73fb0486f103d775685734bcf2d35d8b5e 100644 --- a/cmix/nodes/register.go +++ b/cmix/nodes/register.go @@ -36,24 +36,18 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single, atomic.AddInt64(r.numberRunning, 1) - interval := time.Duration(500) * time.Millisecond - t := time.NewTicker(interval) for { select { case <-r.pauser: atomic.AddInt64(r.numberRunning, -1) select { case <-stop.Quit(): - // On a stop signal, close the thread - t.Stop() stop.ToStopped() return case <-r.resumer: atomic.AddInt64(r.numberRunning, 1) } case <-stop.Quit(): - // On a stop signal, close the thread - t.Stop() stop.ToStopped() atomic.AddInt64(r.numberRunning, -1) return @@ -122,7 +116,6 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single, } } rng.Close() - case <-t.C: } } } diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index 77740debdc61c6053a0207aa775d8fa2fa6b84ac..090d44093d0335bc617d5ea217459cc8bbd456b0 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -57,6 +57,8 @@ type registrar struct { numberRunning *int64 maxRunning int + runnerLock sync.Mutex + c chan network.NodeGateway } @@ -105,6 +107,9 @@ func LoadRegistrar(session session, sender gateway.Sender, // StartProcesses initiates numParallel amount of threads // to register with nodes. func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() + multi := stoppable.NewMulti("NodeRegistrations") r.maxRunning = int(numParallel) @@ -121,6 +126,8 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { //PauseNodeRegistrations stops all node registrations //and returns a function to resume them func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() timer := time.NewTimer(timeout) defer timer.Stop() numRegistrations := atomic.LoadInt64(r.numberRunning) @@ -128,7 +135,7 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { select { case r.pauser <- struct{}{}: case <-timer.C: - return errors.New("Timed out on pausing node registration") + return errors.Errorf("Timed out on pausing node registration on %d", i) } } return nil @@ -138,6 +145,8 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { // registrations up to the initialized maximum func (r *registrar) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() numRunning := int(atomic.LoadInt64(r.numberRunning)) if toRun+numRunning > r.maxRunning {