Skip to content
Snippets Groups Projects
Commit 876a8e2d authored by benjamin's avatar benjamin
Browse files

added the system to increase the number of paralell node registrations

parent bcac57af
No related branches found
No related tags found
2 merge requests!510Release,!431made lower paralell regs unique to js and hacked increase in num registrations...
...@@ -188,6 +188,12 @@ func (c *Cmix) AddHealthCallback(nhc NetworkHealthCallback) int64 { ...@@ -188,6 +188,12 @@ func (c *Cmix) AddHealthCallback(nhc NetworkHealthCallback) int64 {
return int64(c.api.GetCmix().AddHealthCallback(nhc.Callback)) return int64(c.api.GetCmix().AddHealthCallback(nhc.Callback))
} }
// IncreaseParallelNodeRegistration increases the number of parallel node
// registrations by num
func (c *Cmix) IncreaseParallelNodeRegistration(num int) error {
return c.api.IncreaseParallelNodeRegistration(num)
}
// RemoveHealthCallback removes a health callback using its registration ID. // RemoveHealthCallback removes a health callback using its registration ID.
func (c *Cmix) RemoveHealthCallback(funcID int64) { func (c *Cmix) RemoveHealthCallback(funcID int64) {
c.api.GetCmix().RemoveHealthCallback(uint64(funcID)) c.api.GetCmix().RemoveHealthCallback(uint64(funcID))
......
...@@ -46,6 +46,11 @@ type registrar struct { ...@@ -46,6 +46,11 @@ type registrar struct {
comms RegisterNodeCommsInterface comms RegisterNodeCommsInterface
rng *fastRNG.StreamGenerator rng *fastRNG.StreamGenerator
inProgress sync.Map
// We are relying on the in progress check to ensure there is only a single
// operator at a time, as a result this is a map of ID -> int
attempts sync.Map
c chan network.NodeGateway c chan network.NodeGateway
} }
...@@ -91,22 +96,33 @@ func LoadRegistrar(session session, sender gateway.Sender, ...@@ -91,22 +96,33 @@ func LoadRegistrar(session session, sender gateway.Sender,
func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
multi := stoppable.NewMulti("NodeRegistrations") multi := stoppable.NewMulti("NodeRegistrations")
inProgress := &sync.Map{}
// We are relying on the in progress check to ensure there is only a single
// operator at a time, as a result this is a map of ID -> int
attempts := &sync.Map{}
for i := uint(0); i < numParallel; i++ { for i := uint(0); i < numParallel; i++ {
stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i)))
go registerNodes(r, r.session, stop, inProgress, attempts) go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts)
multi.Add(stop) multi.Add(stop)
} }
return multi return multi
} }
// IncreaseParallelNodeRegistration increases the number of parallel node
// registrations by num
func (r *registrar) IncreaseParallelNodeRegistration(numParallel uint) func() (stoppable.Stoppable, error) {
return func() (stoppable.Stoppable, error) {
multi := stoppable.NewMulti("NodeRegistrationsIncrease")
for i := uint(0); i < numParallel; i++ {
stop := stoppable.NewSingle("NodeRegistration Increase" + strconv.Itoa(int(i)))
go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts)
multi.Add(stop)
}
return multi, nil
}
}
// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did
// not have a key for. If there are missing keys, then returns nil. // not have a key for. If there are missing keys, then returns nil.
func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) { func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) {
......
...@@ -484,6 +484,14 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) { ...@@ -484,6 +484,14 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) {
return numRegistered, len(nodes) - numStale, nil return numRegistered, len(nodes) - numStale, nil
} }
// IncreaseParallelNodeRegistration increases the number of parallel node
// registrations by num
func (c *Cmix) IncreaseParallelNodeRegistration(num int) error {
jww.INFO.Printf("IncreaseParallelNodeRegistration(%d)", num)
svc := c.network.IncreaseParallelNodeRegistration(num)
return c.followerServices.add(svc)
}
// GetPreferredBins returns the geographic bin or bins that the provided two // GetPreferredBins returns the geographic bin or bins that the provided two
// character country code is a part of. // character country code is a part of.
func (c *Cmix) GetPreferredBins(countryCode string) ([]string, error) { func (c *Cmix) GetPreferredBins(countryCode string) ([]string, error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment