diff --git a/interfaces/params/network.go b/interfaces/params/network.go index f3de8ea10ee17e5408c2b72d335e8b2ea884e428..b73efc7a3ee56c5d2f24d18643194a22852dbc92 100644 --- a/interfaces/params/network.go +++ b/interfaces/params/network.go @@ -21,6 +21,8 @@ type Network struct { // Longest delay between network events for Health tracker to denote that // the network is in a bad state NetworkHealthTimeout time.Duration + //Number of parallel node registration the client is capable of + ParallelNodeRegistrations uint Rounds Messages @@ -36,6 +38,7 @@ func GetDefaultNetwork() Network { RegNodesBufferLen: 500, NetworkHealthTimeout: 30 * time.Second, E2EParams: GetDefaultE2ESessionParams(), + ParallelNodeRegistrations: 8, } n.Rounds = GetDefaultRounds() n.Messages = GetDefaultMessage() diff --git a/network/manager.go b/network/manager.go index 8e929f2e94e0272312911d976e4062f4e2332fa8..a9c9f1ea33db2ae667f85c8d0f0c6f11d352e18e 100644 --- a/network/manager.go +++ b/network/manager.go @@ -124,7 +124,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab // Node Updates multi.Add(node.StartRegistration(m.Instance, m.Session, m.Rng, - m.Comms, m.NodeRegistration)) // Adding/Keys + m.Comms, m.NodeRegistration, m.param.ParallelNodeRegistrations)) // Adding/Keys //TODO-remover //m.runners.Add(StartNodeRemover(m.Context)) // Removing diff --git a/network/node/register.go b/network/node/register.go index 9393ec97e3508e1db5b10dc3fae53ac20269e0de..fa1690aa2ac19230dc8284b9996cd7d158f51f11 100644 --- a/network/node/register.go +++ b/network/node/register.go @@ -41,14 +41,20 @@ type RegisterNodeCommsInterface interface { } func StartRegistration(instance *network.Instance, session *storage.Session, rngGen *fastRNG.StreamGenerator, comms RegisterNodeCommsInterface, - c chan network.NodeGateway) stoppable.Stoppable { - stop := stoppable.NewSingle("NodeRegistration") + c chan network.NodeGateway, numParallel uint) stoppable.Stoppable { + + multi := stoppable.NewMulti("NodeRegistrations") instance.SetAddGatewayChan(c) - go registerNodes(session, rngGen, comms, stop, c) + for i:=uint(0);i<numParallel;i++{ + stop := stoppable.NewSingle(fmt.Sprintf("NodeRegistration %d", i)) + + go registerNodes(session, rngGen, comms, stop, c) + multi.Add(stop) + } - return stop + return multi } func registerNodes(session *storage.Session, rngGen *fastRNG.StreamGenerator, comms RegisterNodeCommsInterface, @@ -74,7 +80,6 @@ func registerNodes(session *storage.Session, rngGen *fastRNG.StreamGenerator, co case <-t.C: } } - } //registerWithNode serves as a helper for RegisterWithNodes