From b521833463ec79f85c23af23b500b448124d184a Mon Sep 17 00:00:00 2001 From: benjamin <ben@elixxir.io> Date: Thu, 27 Oct 2022 14:22:37 -0700 Subject: [PATCH] made the system stop all but 2 node registration threads one 70% of nodes have been registered with --- cmix/client.go | 27 ++++++++++++++++++++++----- cmix/follow.go | 10 ++++++++++ cmix/nodes/register.go | 9 ++++++++- cmix/nodes/registrar.go | 22 +++++++++++----------- 4 files changed, 51 insertions(+), 17 deletions(-) diff --git a/cmix/client.go b/cmix/client.go index bc7eaeaec..675e44be8 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -92,6 +92,8 @@ type client struct { // Storage of the max message length maxMsgLen int + + numNodes *uint64 } // NewClient builds a new reception client object using inputted key fields. @@ -103,6 +105,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, tracker := uint64(0) earliest := uint64(0) + numNodes := uint64(0) + netTime.SetTimeSource(localTime{}) // Create client object @@ -117,6 +121,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, maxMsgLen: tmpMsg.ContentsSize(), skewTracker: clockSkew.New(params.ClockSkewClamp), attemptTracker: attempts.NewSendAttempts(), + numNodes: &numNodes, } if params.VerboseRoundTracking { @@ -134,10 +139,20 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // initialize turns on network handlers, initializing a host pool and // network health monitors. This should be called before // network Follow command is called. -func (c *client) initialize(ndf *ndf.NetworkDefinition) error { +func (c *client) initialize(ndfile *ndf.NetworkDefinition) error { + + //set the number of nodes + numNodes := uint64(0) + for _, n := range ndfile.Nodes { + if n.Status != ndf.Stale { + numNodes++ + } + } + atomic.StoreUint64(c.numNodes, numNodes) + // Start network instance instance, err := commNetwork.NewInstance( - c.comms.ProtoComms, ndf, nil, nil, commNetwork.None, + c.comms.ProtoComms, ndfile, nil, nil, commNetwork.None, c.param.FastPolling) if err != nil { return errors.WithMessage( @@ -145,7 +160,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { } c.instance = instance - addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size + addrSize := ndfile.AddressSpace[len(ndfile.AddressSpace)-1].Size c.Space = address.NewAddressSpace(addrSize) /* Set up modules */ @@ -165,7 +180,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Enable optimized HostPool initialization poolParams.MaxPings = 50 poolParams.ForceConnection = true - sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms, + sender, err := gateway.NewSender(poolParams, c.rng, ndfile, c.comms, c.session, nodeChan) if err != nil { return err @@ -174,7 +189,9 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Set up the node registrar c.Registrar, err = nodes.LoadRegistrar( - c.session, c.Sender, c.comms, c.rng, nodeChan) + c.session, c.Sender, c.comms, c.rng, nodeChan, func() int { + return int(atomic.LoadUint64(c.numNodes)) + }) if err != nil { return err } diff --git a/cmix/follow.go b/cmix/follow.go index 8ac5d4684..87f45b131 100644 --- a/cmix/follow.go +++ b/cmix/follow.go @@ -27,6 +27,7 @@ import ( "encoding/binary" "fmt" "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/xx_network/primitives/ndf" "sync" "sync/atomic" "time" @@ -258,6 +259,15 @@ func (c *client) follow(identity receptionID.IdentityUse, return } + //set the number of nodes + numNodes := uint64(0) + for _, n := range c.instance.GetPartialNdf().Get().Nodes { + if n.Status != ndf.Stale { + numNodes++ + } + } + atomic.StoreUint64(c.numNodes, numNodes) + // update gateway connections c.UpdateNdf(c.GetInstance().GetPartialNdf().Get()) c.session.SetNDF(c.GetInstance().GetPartialNdf().Get()) diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go index 98b08d73f..f446c80a3 100644 --- a/cmix/nodes/register.go +++ b/cmix/nodes/register.go @@ -32,7 +32,7 @@ import ( // before an interruption and how many registration attempts have // been attempted. func registerNodes(r *registrar, s session, stop *stoppable.Single, - inProgress, attempts *sync.Map) { + inProgress, attempts *sync.Map, index int) { atomic.AddInt64(r.numberRunning, 1) @@ -117,6 +117,13 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single, } rng.Close() } + if index >= 2 { + if float64(r.NumRegisteredNodes()) > (float64(r.numnodesGetter()) * .7) { + <-stop.Quit() + stop.ToStopped() + return + } + } } } diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index d0170e1a1..51a3b7d1d 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -25,7 +25,7 @@ import ( ) const InputChanLen = 1000 -const maxAttempts = 2 +const maxAttempts = 5 // Backoff for attempting to register with a cMix node. var delayTable = [5]time.Duration{ @@ -59,6 +59,8 @@ type registrar struct { runnerLock sync.Mutex + numnodesGetter func() int + c chan network.NodeGateway } @@ -66,17 +68,18 @@ type registrar struct { // exist. func LoadRegistrar(session session, sender gateway.Sender, comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator, - c chan network.NodeGateway) (Registrar, error) { + c chan network.NodeGateway, numNodesGetter func() int) (Registrar, error) { running := int64(0) kv := session.GetKV().Prefix(prefix) r := ®istrar{ - nodes: make(map[id.ID]*key), - kv: kv, - pauser: make(chan interface{}), - resumer: make(chan interface{}), - numberRunning: &running, + nodes: make(map[id.ID]*key), + kv: kv, + pauser: make(chan interface{}), + resumer: make(chan interface{}), + numberRunning: &running, + numnodesGetter: numNodesGetter, } obj, err := kv.Get(storeKey, currentKeyVersion) @@ -116,7 +119,7 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { for i := uint(0); i < numParallel; i++ { stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) - go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) + go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts, int(i)) multi.Add(stop) } @@ -135,14 +138,11 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { for i := int64(0); i < numRegistrations; i++ { select { case r.pauser <- struct{}{}: - jww.INFO.Printf("PauseNodeRegistrations() - paused node %d", i) case <-timer.C: return errors.Errorf("Timed out on pausing node registration on %d", i) } } - jww.INFO.Printf("PauseNodeRegistrations() - Paused all nodes") - return nil } -- GitLab