diff --git a/cmix/client.go b/cmix/client.go index bc7eaeaecc7a6a8fdb28f189e2d9ad9488870f2d..675e44be877c277301acafbcb65320d4f5a5fa03 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -92,6 +92,8 @@ type client struct { // Storage of the max message length maxMsgLen int + + numNodes *uint64 } // NewClient builds a new reception client object using inputted key fields. @@ -103,6 +105,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, tracker := uint64(0) earliest := uint64(0) + numNodes := uint64(0) + netTime.SetTimeSource(localTime{}) // Create client object @@ -117,6 +121,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, maxMsgLen: tmpMsg.ContentsSize(), skewTracker: clockSkew.New(params.ClockSkewClamp), attemptTracker: attempts.NewSendAttempts(), + numNodes: &numNodes, } if params.VerboseRoundTracking { @@ -134,10 +139,20 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // initialize turns on network handlers, initializing a host pool and // network health monitors. This should be called before // network Follow command is called. 
-func (c *client) initialize(ndf *ndf.NetworkDefinition) error { +func (c *client) initialize(ndfile *ndf.NetworkDefinition) error { + + //set the number of nodes + numNodes := uint64(0) + for _, n := range ndfile.Nodes { + if n.Status != ndf.Stale { + numNodes++ + } + } + atomic.StoreUint64(c.numNodes, numNodes) + // Start network instance instance, err := commNetwork.NewInstance( - c.comms.ProtoComms, ndf, nil, nil, commNetwork.None, + c.comms.ProtoComms, ndfile, nil, nil, commNetwork.None, c.param.FastPolling) if err != nil { return errors.WithMessage( @@ -145,7 +160,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { } c.instance = instance - addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size + addrSize := ndfile.AddressSpace[len(ndfile.AddressSpace)-1].Size c.Space = address.NewAddressSpace(addrSize) /* Set up modules */ @@ -165,7 +180,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Enable optimized HostPool initialization poolParams.MaxPings = 50 poolParams.ForceConnection = true - sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms, + sender, err := gateway.NewSender(poolParams, c.rng, ndfile, c.comms, c.session, nodeChan) if err != nil { return err @@ -174,7 +189,9 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Set up the node registrar c.Registrar, err = nodes.LoadRegistrar( - c.session, c.Sender, c.comms, c.rng, nodeChan) + c.session, c.Sender, c.comms, c.rng, nodeChan, func() int { + return int(atomic.LoadUint64(c.numNodes)) + }) if err != nil { return err } diff --git a/cmix/follow.go b/cmix/follow.go index 8ac5d46841b8a61d0e9300d98375ed9adc0f7d30..87f45b1314eb91ce00b002c9fbb3f3fc9a0da707 100644 --- a/cmix/follow.go +++ b/cmix/follow.go @@ -27,6 +27,7 @@ import ( "encoding/binary" "fmt" "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/xx_network/primitives/ndf" "sync" "sync/atomic" "time" @@ -258,6 +259,15 @@ func (c *client) follow(identity 
receptionID.IdentityUse, return } + //set the number of nodes + numNodes := uint64(0) + for _, n := range c.instance.GetPartialNdf().Get().Nodes { + if n.Status != ndf.Stale { + numNodes++ + } + } + atomic.StoreUint64(c.numNodes, numNodes) + // update gateway connections c.UpdateNdf(c.GetInstance().GetPartialNdf().Get()) c.session.SetNDF(c.GetInstance().GetPartialNdf().Get()) diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go index 98b08d73fb0486f103d775685734bcf2d35d8b5e..f446c80a328ec966c7a101d5b83cc53943652c3c 100644 --- a/cmix/nodes/register.go +++ b/cmix/nodes/register.go @@ -32,7 +32,7 @@ import ( // before an interruption and how many registration attempts have // been attempted. func registerNodes(r *registrar, s session, stop *stoppable.Single, - inProgress, attempts *sync.Map) { + inProgress, attempts *sync.Map, index int) { atomic.AddInt64(r.numberRunning, 1) @@ -117,6 +117,13 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single, } rng.Close() } + if index >= 2 { + if float64(r.NumRegisteredNodes()) > (float64(r.numnodesGetter()) * .7) { + <-stop.Quit() + stop.ToStopped() + return + } + } } } diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index d0170e1a1c9d8202fd4d33f170a82184b54495c1..51a3b7d1d0d1c99f9d1bd0ccf8aec37102233ca9 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -25,7 +25,7 @@ import ( ) const InputChanLen = 1000 -const maxAttempts = 2 +const maxAttempts = 5 // Backoff for attempting to register with a cMix node. var delayTable = [5]time.Duration{ @@ -59,6 +59,8 @@ type registrar struct { runnerLock sync.Mutex + numnodesGetter func() int + c chan network.NodeGateway } @@ -66,17 +68,18 @@ type registrar struct { // exist. 
func LoadRegistrar(session session, sender gateway.Sender, comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator, - c chan network.NodeGateway) (Registrar, error) { + c chan network.NodeGateway, numNodesGetter func() int) (Registrar, error) { running := int64(0) kv := session.GetKV().Prefix(prefix) r := &registrar{ - nodes: make(map[id.ID]*key), - kv: kv, - pauser: make(chan interface{}), - resumer: make(chan interface{}), - numberRunning: &running, + nodes: make(map[id.ID]*key), + kv: kv, + pauser: make(chan interface{}), + resumer: make(chan interface{}), + numberRunning: &running, + numnodesGetter: numNodesGetter, } obj, err := kv.Get(storeKey, currentKeyVersion) @@ -116,7 +119,7 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { for i := uint(0); i < numParallel; i++ { stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) - go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) + go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts, int(i)) multi.Add(stop) } @@ -135,14 +138,11 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { for i := int64(0); i < numRegistrations; i++ { select { case r.pauser <- struct{}{}: - jww.INFO.Printf("PauseNodeRegistrations() - paused node %d", i) case <-timer.C: return errors.Errorf("Timed out on pausing node registration on %d", i) } } - jww.INFO.Printf("PauseNodeRegistrations() - Paused all nodes") - return nil }