Skip to content
Snippets Groups Projects
Commit af3a6160 authored by benjamin's avatar benjamin
Browse files

minor fixes to node reg

parent 9ba01110
No related branches found
No related tags found
2 merge requests!510Release,!432Control node reg
......@@ -36,24 +36,18 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single,
atomic.AddInt64(r.numberRunning, 1)
interval := time.Duration(500) * time.Millisecond
t := time.NewTicker(interval)
for {
select {
case <-r.pauser:
atomic.AddInt64(r.numberRunning, -1)
select {
case <-stop.Quit():
// On a stop signal, close the thread
t.Stop()
stop.ToStopped()
return
case <-r.resumer:
atomic.AddInt64(r.numberRunning, 1)
}
case <-stop.Quit():
// On a stop signal, close the thread
t.Stop()
stop.ToStopped()
atomic.AddInt64(r.numberRunning, -1)
return
......@@ -122,7 +116,6 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single,
}
}
rng.Close()
case <-t.C:
}
}
}
......
......@@ -57,6 +57,8 @@ type registrar struct {
numberRunning *int64
maxRunning int
runnerLock sync.Mutex
c chan network.NodeGateway
}
......@@ -105,6 +107,9 @@ func LoadRegistrar(session session, sender gateway.Sender,
// StartProcesses initiates numParallel amount of threads
// to register with nodes.
func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
r.runnerLock.Lock()
defer r.runnerLock.Unlock()
multi := stoppable.NewMulti("NodeRegistrations")
r.maxRunning = int(numParallel)
......@@ -121,6 +126,8 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
//PauseNodeRegistrations stops all node registrations
//and returns a function to resume them
func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error {
r.runnerLock.Lock()
defer r.runnerLock.Unlock()
timer := time.NewTimer(timeout)
defer timer.Stop()
numRegistrations := atomic.LoadInt64(r.numberRunning)
......@@ -128,7 +135,7 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error {
select {
case r.pauser <- struct{}{}:
case <-timer.C:
return errors.New("Timed out on pausing node registration")
return errors.Errorf("Timed out on pausing node registration on %d", i)
}
}
return nil
......@@ -138,6 +145,8 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error {
// registrations up to the initialized maximum
func (r *registrar) ChangeNumberOfNodeRegistrations(toRun int,
timeout time.Duration) error {
r.runnerLock.Lock()
defer r.runnerLock.Unlock()
numRunning := int(atomic.LoadInt64(r.numberRunning))
if toRun+numRunning > r.maxRunning {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment