Skip to content
Snippets Groups Projects
Commit b5218334 authored by benjamin's avatar benjamin
Browse files

Made the system stop all but 2 node registration threads once 70% of nodes have been registered with.

parent a0b2277a
No related branches found
No related tags found
2 merge requests!510Release,!432Control node reg
......@@ -92,6 +92,8 @@ type client struct {
// Storage of the max message length
maxMsgLen int
numNodes *uint64
}
// NewClient builds a new reception client object using inputted key fields.
......@@ -103,6 +105,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
tracker := uint64(0)
earliest := uint64(0)
numNodes := uint64(0)
netTime.SetTimeSource(localTime{})
// Create client object
......@@ -117,6 +121,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
maxMsgLen: tmpMsg.ContentsSize(),
skewTracker: clockSkew.New(params.ClockSkewClamp),
attemptTracker: attempts.NewSendAttempts(),
numNodes: &numNodes,
}
if params.VerboseRoundTracking {
......@@ -134,10 +139,20 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
// initialize turns on network handlers, initializing a host pool and
// network health monitors. This should be called before the
// network Follow command is called.
func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
func (c *client) initialize(ndfile *ndf.NetworkDefinition) error {
// Count the nodes in the NDF that are not stale and atomically record the total
numNodes := uint64(0)
for _, n := range ndfile.Nodes {
if n.Status != ndf.Stale {
numNodes++
}
}
atomic.StoreUint64(c.numNodes, numNodes)
// Start network instance
instance, err := commNetwork.NewInstance(
c.comms.ProtoComms, ndf, nil, nil, commNetwork.None,
c.comms.ProtoComms, ndfile, nil, nil, commNetwork.None,
c.param.FastPolling)
if err != nil {
return errors.WithMessage(
......@@ -145,7 +160,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
}
c.instance = instance
addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size
addrSize := ndfile.AddressSpace[len(ndfile.AddressSpace)-1].Size
c.Space = address.NewAddressSpace(addrSize)
/* Set up modules */
......@@ -165,7 +180,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
// Enable optimized HostPool initialization
poolParams.MaxPings = 50
poolParams.ForceConnection = true
sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms,
sender, err := gateway.NewSender(poolParams, c.rng, ndfile, c.comms,
c.session, nodeChan)
if err != nil {
return err
......@@ -174,7 +189,9 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
// Set up the node registrar
c.Registrar, err = nodes.LoadRegistrar(
c.session, c.Sender, c.comms, c.rng, nodeChan)
c.session, c.Sender, c.comms, c.rng, nodeChan, func() int {
return int(atomic.LoadUint64(c.numNodes))
})
if err != nil {
return err
}
......
......@@ -27,6 +27,7 @@ import (
"encoding/binary"
"fmt"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/xx_network/primitives/ndf"
"sync"
"sync/atomic"
"time"
......@@ -258,6 +259,15 @@ func (c *client) follow(identity receptionID.IdentityUse,
return
}
// Recount the non-stale nodes in the updated partial NDF and atomically record the total
numNodes := uint64(0)
for _, n := range c.instance.GetPartialNdf().Get().Nodes {
if n.Status != ndf.Stale {
numNodes++
}
}
atomic.StoreUint64(c.numNodes, numNodes)
// update gateway connections
c.UpdateNdf(c.GetInstance().GetPartialNdf().Get())
c.session.SetNDF(c.GetInstance().GetPartialNdf().Get())
......
......@@ -32,7 +32,7 @@ import (
// before an interruption and how many registration attempts have
// been attempted.
func registerNodes(r *registrar, s session, stop *stoppable.Single,
inProgress, attempts *sync.Map) {
inProgress, attempts *sync.Map, index int) {
atomic.AddInt64(r.numberRunning, 1)
......@@ -117,6 +117,13 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single,
}
rng.Close()
}
if index >= 2 {
if float64(r.NumRegisteredNodes()) > (float64(r.numnodesGetter()) * .7) {
<-stop.Quit()
stop.ToStopped()
return
}
}
}
}
......
......@@ -25,7 +25,7 @@ import (
)
const InputChanLen = 1000
const maxAttempts = 2
const maxAttempts = 5
// Backoff for attempting to register with a cMix node.
var delayTable = [5]time.Duration{
......@@ -59,6 +59,8 @@ type registrar struct {
runnerLock sync.Mutex
numnodesGetter func() int
c chan network.NodeGateway
}
......@@ -66,17 +68,18 @@ type registrar struct {
// exist.
func LoadRegistrar(session session, sender gateway.Sender,
comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator,
c chan network.NodeGateway) (Registrar, error) {
c chan network.NodeGateway, numNodesGetter func() int) (Registrar, error) {
running := int64(0)
kv := session.GetKV().Prefix(prefix)
r := &registrar{
nodes: make(map[id.ID]*key),
kv: kv,
pauser: make(chan interface{}),
resumer: make(chan interface{}),
numberRunning: &running,
nodes: make(map[id.ID]*key),
kv: kv,
pauser: make(chan interface{}),
resumer: make(chan interface{}),
numberRunning: &running,
numnodesGetter: numNodesGetter,
}
obj, err := kv.Get(storeKey, currentKeyVersion)
......@@ -116,7 +119,7 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
for i := uint(0); i < numParallel; i++ {
stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i)))
go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts)
go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts, int(i))
multi.Add(stop)
}
......@@ -135,14 +138,11 @@ func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error {
for i := int64(0); i < numRegistrations; i++ {
select {
case r.pauser <- struct{}{}:
jww.INFO.Printf("PauseNodeRegistrations() - paused node %d", i)
case <-timer.C:
return errors.Errorf("Timed out on pausing node registration on %d", i)
}
}
jww.INFO.Printf("PauseNodeRegistrations() - Paused all nodes")
return nil
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment