Skip to content
Snippets Groups Projects
Commit ab7a84fa authored by Jono Wenger's avatar Jono Wenger
Browse files

Merge remote-tracking branch 'origin/delayedRegistration' into delayedRegistration

parents e57a0436 876a8e2d
No related branches found
No related tags found
2 merge requests!510Release,!431made lower paralell regs unique to js and hacked increase in num registrations...
......@@ -188,6 +188,12 @@ func (c *Cmix) AddHealthCallback(nhc NetworkHealthCallback) int64 {
return int64(c.api.GetCmix().AddHealthCallback(nhc.Callback))
}
// IncreaseParallelNodeRegistration increases the number of parallel node
// registrations by num
func (c *Cmix) IncreaseParallelNodeRegistration(num int) error {
return c.api.IncreaseParallelNodeRegistration(num)
}
// RemoveHealthCallback removes a health callback using its registration ID.
func (c *Cmix) RemoveHealthCallback(funcID int64) {
c.api.GetCmix().RemoveHealthCallback(uint64(funcID))
......
......@@ -46,6 +46,11 @@ type registrar struct {
comms RegisterNodeCommsInterface
rng *fastRNG.StreamGenerator
inProgress sync.Map
// We are relying on the in progress check to ensure there is only a single
// operator at a time, as a result this is a map of ID -> int
attempts sync.Map
c chan network.NodeGateway
}
......@@ -91,22 +96,33 @@ func LoadRegistrar(session session, sender gateway.Sender,
func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
multi := stoppable.NewMulti("NodeRegistrations")
inProgress := &sync.Map{}
// We are relying on the in progress check to ensure there is only a single
// operator at a time, as a result this is a map of ID -> int
attempts := &sync.Map{}
for i := uint(0); i < numParallel; i++ {
stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i)))
go registerNodes(r, r.session, stop, inProgress, attempts)
go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts)
multi.Add(stop)
}
return multi
}
// IncreaseParallelNodeRegistration increases the number of parallel node
// registrations by num
func (r *registrar) IncreaseParallelNodeRegistration(numParallel uint) func() (stoppable.Stoppable, error) {
return func() (stoppable.Stoppable, error) {
multi := stoppable.NewMulti("NodeRegistrationsIncrease")
for i := uint(0); i < numParallel; i++ {
stop := stoppable.NewSingle("NodeRegistration Increase" + strconv.Itoa(int(i)))
go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts)
multi.Add(stop)
}
return multi, nil
}
}
// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did
// not have a key for. If there are missing keys, then returns nil.
func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) {
......
......@@ -484,6 +484,14 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) {
return numRegistered, len(nodes) - numStale, nil
}
// IncreaseParallelNodeRegistration increases the number of parallel node
// registrations by num
func (c *Cmix) IncreaseParallelNodeRegistration(num int) error {
jww.INFO.Printf("IncreaseParallelNodeRegistration(%d)", num)
svc := c.network.IncreaseParallelNodeRegistration(num)
return c.followerServices.add(svc)
}
// GetPreferredBins returns the geographic bin or bins that the provided two
// character country code is a part of.
func (c *Cmix) GetPreferredBins(countryCode string) ([]string, error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment