Skip to content
Snippets Groups Projects
Commit be43294a authored by benjamin's avatar benjamin
Browse files

implemented, needs to be thread though and exposed

parent 36530a48
No related branches found
No related tags found
2 merge requests!510Release,!432Control node reg
...@@ -191,9 +191,13 @@ type Client interface { ...@@ -191,9 +191,13 @@ type Client interface {
AddService(clientID *id.ID, newService message.Service, AddService(clientID *id.ID, newService message.Service,
response message.Processor) response message.Processor)
// IncreaseParallelNodeRegistration increases the number of parallel node //PauseNodeRegistrations stops all node registrations
// registrations by num //and returns a function to resume them
IncreaseParallelNodeRegistration(num int) func() (stoppable.Stoppable, error) PauseNodeRegistrations(timeout time.Duration) error
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum
ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error
// DeleteService deletes a message service. If only a single response is // DeleteService deletes a message service. If only a single response is
// associated with the preimage, the entire preimage is removed. If there is // associated with the preimage, the entire preimage is removed. If there is
......
...@@ -27,9 +27,13 @@ type Registrar interface { ...@@ -27,9 +27,13 @@ type Registrar interface {
// to register with nodes. // to register with nodes.
StartProcesses(numParallel uint) stoppable.Stoppable StartProcesses(numParallel uint) stoppable.Stoppable
// IncreaseParallelNodeRegistration increases the number of parallel node //PauseNodeRegistrations stops all node registrations
// registrations by num //and returns a function to resume them
IncreaseParallelNodeRegistration(num int) func() (stoppable.Stoppable, error) PauseNodeRegistrations(timeout time.Duration) error
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum
ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error
// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did
// not have a key for. If there are missing keys, then returns nil. // not have a key for. If there are missing keys, then returns nil.
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
...@@ -33,14 +34,28 @@ import ( ...@@ -33,14 +34,28 @@ import (
func registerNodes(r *registrar, s session, stop *stoppable.Single, func registerNodes(r *registrar, s session, stop *stoppable.Single,
inProgress, attempts *sync.Map) { inProgress, attempts *sync.Map) {
atomic.AddInt64(r.numberRunning, 1)
interval := time.Duration(500) * time.Millisecond interval := time.Duration(500) * time.Millisecond
t := time.NewTicker(interval) t := time.NewTicker(interval)
for { for {
select { select {
case <-r.pauser:
atomic.AddInt64(r.numberRunning, -1)
select {
case <-stop.Quit():
// On a stop signal, close the thread
t.Stop()
stop.ToStopped()
return
case <-r.resumer:
atomic.AddInt64(r.numberRunning, 1)
}
case <-stop.Quit(): case <-stop.Quit():
// On a stop signal, close the thread // On a stop signal, close the thread
t.Stop() t.Stop()
stop.ToStopped() stop.ToStopped()
atomic.AddInt64(r.numberRunning, -1)
return return
case gw := <-r.c: case gw := <-r.c:
......
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
...@@ -51,6 +52,11 @@ type registrar struct { ...@@ -51,6 +52,11 @@ type registrar struct {
// operator at a time, as a result this is a map of ID -> int // operator at a time, as a result this is a map of ID -> int
attempts sync.Map attempts sync.Map
pauser chan interface{}
resumer chan interface{}
numberRunning *int64
maxRunning int
c chan network.NodeGateway c chan network.NodeGateway
} }
...@@ -60,10 +66,15 @@ func LoadRegistrar(session session, sender gateway.Sender, ...@@ -60,10 +66,15 @@ func LoadRegistrar(session session, sender gateway.Sender,
comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator, comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator,
c chan network.NodeGateway) (Registrar, error) { c chan network.NodeGateway) (Registrar, error) {
running := int64(0)
kv := session.GetKV().Prefix(prefix) kv := session.GetKV().Prefix(prefix)
r := &registrar{ r := &registrar{
nodes: make(map[id.ID]*key), nodes: make(map[id.ID]*key),
kv: kv, kv: kv,
pauser: make(chan interface{}),
resumer: make(chan interface{}),
numberRunning: &running,
} }
obj, err := kv.Get(storeKey, currentKeyVersion) obj, err := kv.Get(storeKey, currentKeyVersion)
...@@ -95,6 +106,7 @@ func LoadRegistrar(session session, sender gateway.Sender, ...@@ -95,6 +106,7 @@ func LoadRegistrar(session session, sender gateway.Sender,
// to register with nodes. // to register with nodes.
func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
multi := stoppable.NewMulti("NodeRegistrations") multi := stoppable.NewMulti("NodeRegistrations")
r.maxRunning = int(numParallel)
for i := uint(0); i < numParallel; i++ { for i := uint(0); i < numParallel; i++ {
stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i)))
...@@ -106,21 +118,56 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { ...@@ -106,21 +118,56 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
return multi return multi
} }
// IncreaseParallelNodeRegistration increases the number of parallel node //PauseNodeRegistrations stops all node registrations
// registrations by num //and returns a function to resume them
func (r *registrar) IncreaseParallelNodeRegistration(numParallel int) func() (stoppable.Stoppable, error) { func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error {
return func() (stoppable.Stoppable, error) { timer := time.NewTimer(timeout)
multi := stoppable.NewMulti("NodeRegistrationsIncrease") defer timer.Stop()
numRegistrations := atomic.LoadInt64(r.numberRunning)
for i := int64(0); i < numRegistrations; i++ {
select {
case r.pauser <- struct{}{}:
case <-timer.C:
return errors.New("Timed out on pausing node registration")
}
}
return nil
}
for i := uint(0); i < uint(numParallel); i++ { // ChangeNumberOfNodeRegistrations changes the number of parallel node
stop := stoppable.NewSingle("NodeRegistration Increase" + strconv.Itoa(int(i))) // registrations up to the initialized maximum
func (r *registrar) ChangeNumberOfNodeRegistrations(toRun int,
timeout time.Duration) error {
numRunning := int(atomic.LoadInt64(r.numberRunning))
go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) if toRun+numRunning > r.maxRunning {
multi.Add(stop) return errors.Errorf("Cannot change number of " +
"running node registration to number greater than the max")
}
timer := time.NewTimer(timeout)
defer timer.Stop()
if numRunning < toRun {
jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Reducing number "+
"of node registrations from %d to %d", numRunning, toRun)
for i := 0; i < toRun-numRunning; i++ {
select {
case r.pauser <- struct{}{}:
case <-timer.C:
return errors.New("Timed out on reducing node registration")
}
}
} else if numRunning > toRun {
jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Increasing number "+
"of node registrations from %d to %d", numRunning, toRun)
for i := 0; i < toRun-numRunning; i++ {
select {
case r.resumer <- struct{}{}:
case <-timer.C:
return errors.New("Timed out on increasing node registration")
}
} }
return multi, nil
} }
return nil
} }
// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment