diff --git a/bindings/channels.go b/bindings/channels.go index 78182a190bb63b9f851049c85cb944da50a89454..f58af96029bfeb55ff800e13d6be032185e2823e 100644 --- a/bindings/channels.go +++ b/bindings/channels.go @@ -11,7 +11,6 @@ import ( "crypto/ed25519" "encoding/base64" "encoding/json" - jww "github.com/spf13/jwalterweatherman" "sync" "time" @@ -235,16 +234,6 @@ func NewChannelsManagerGoEventModel(cmixID int, privateIdentity []byte, return nil, err } - // fixme: remove this, make it called by the javascript - // hack to get to release late at night - go func() { - time.Sleep(3 * time.Second) - localErr := user.IncreaseParallelNodeRegistration(13) - if localErr != nil { - jww.ERROR.Printf(localErr.Error()) - } - }() - // Add channel to singleton and return return channelManagerTrackerSingleton.make(m), nil } @@ -278,16 +267,6 @@ func LoadChannelsManagerGoEventModel(cmixID int, storageTag string, return nil, err } - // fixme: remove this, make it called by the javascript - // hack to get to release late at night - go func() { - time.Sleep(3 * time.Second) - localErr := user.IncreaseParallelNodeRegistration(13) - if localErr != nil { - jww.ERROR.Printf(localErr.Error()) - } - }() - // Add channel to singleton and return return channelManagerTrackerSingleton.make(m), nil } diff --git a/bindings/follow.go b/bindings/follow.go index 4327964853f82e3f23146ecb3d017a5552e90146..08a9d0bffe2ed250c076b918c76926b0e2544cfa 100644 --- a/bindings/follow.go +++ b/bindings/follow.go @@ -99,13 +99,22 @@ func (c *Cmix) ReadyToSend() bool { jww.FATAL.Panicf("Failed to get node registration status: %+v", err) } - // FIXME: This is a fix put in place because not all nodes in the NDF are - // online. This should be fixed. - total = 340 - return numReg >= total*7/10 } +// IsReadyInfo contains information on if the network is ready and how close it +// is to being ready. +// +// Example JSON: +// { +// "IsReady": true, +// "HowClose": 0.534 +// } +type IsReadyInfo struct { + IsReady bool + HowClose float64 +} + // NetworkFollowerStatus gets the state of the network follower. It returns a // status with the following values: // Stopped - 0 @@ -144,6 +153,44 @@ func (c *Cmix) GetNodeRegistrationStatus() ([]byte, error) { return json.Marshal(nodeRegReport) } +// IsReady returns true if at least percentReady of node registrations has +// completed. If not all have completed, then it returns false and howClose will +// be a percent (0-1) of node registrations completed. +// +// Parameters: +// - percentReady - The percentage of nodes required to be registered with to +// be ready. This is a number between 0 and 1. +// +// Returns: +// - JSON of [IsReadyInfo]. +func (c *Cmix) IsReady(percentReady float64) ([]byte, error) { + isReady, howClose := c.api.IsReady(percentReady) + return json.Marshal(&IsReadyInfo{isReady, howClose}) +} + +// PauseNodeRegistrations stops all node registrations and returns a function to +// resume them. +// +// Parameters: +// - timeoutMS - The timeout, in milliseconds, to wait when stopping threads +// before failing. +func (c *Cmix) PauseNodeRegistrations(timeoutMS int) error { + timeout := time.Duration(timeoutMS) * time.Millisecond + return c.api.PauseNodeRegistrations(timeout) +} + +// ChangeNumberOfNodeRegistrations changes the number of parallel node +// registrations up to the initialized maximum. +// +// Parameters: +// - toRun - The number of parallel node registrations. +// - timeoutMS - The timeout, in milliseconds, to wait when changing node +// registrations before failing. +func (c *Cmix) ChangeNumberOfNodeRegistrations(toRun, timeoutMS int) error { + timeout := time.Duration(timeoutMS) * time.Millisecond + return c.api.ChangeNumberOfNodeRegistrations(toRun, timeout) +} + // HasRunningProcessies checks if any background threads are running and returns // true if one or more are. // @@ -188,12 +235,6 @@ func (c *Cmix) AddHealthCallback(nhc NetworkHealthCallback) int64 { return int64(c.api.GetCmix().AddHealthCallback(nhc.Callback)) } -// IncreaseParallelNodeRegistration increases the number of parallel node -// registrations by num -func (c *Cmix) IncreaseParallelNodeRegistration(num int) error { - return c.api.IncreaseParallelNodeRegistration(num) -} - // RemoveHealthCallback removes a health callback using its registration ID. func (c *Cmix) RemoveHealthCallback(funcID int64) { c.api.GetCmix().RemoveHealthCallback(uint64(funcID)) diff --git a/cmix/client.go b/cmix/client.go index bc7eaeaecc7a6a8fdb28f189e2d9ad9488870f2d..675e44be877c277301acafbcb65320d4f5a5fa03 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -92,6 +92,8 @@ type client struct { // Storage of the max message length maxMsgLen int + + numNodes *uint64 } // NewClient builds a new reception client object using inputted key fields. @@ -103,6 +105,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, tracker := uint64(0) earliest := uint64(0) + numNodes := uint64(0) + netTime.SetTimeSource(localTime{}) // Create client object @@ -117,6 +121,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, maxMsgLen: tmpMsg.ContentsSize(), skewTracker: clockSkew.New(params.ClockSkewClamp), attemptTracker: attempts.NewSendAttempts(), + numNodes: &numNodes, } if params.VerboseRoundTracking { @@ -134,10 +139,20 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // initialize turns on network handlers, initializing a host pool and // network health monitors. This should be called before // network Follow command is called. -func (c *client) initialize(ndf *ndf.NetworkDefinition) error { +func (c *client) initialize(ndfile *ndf.NetworkDefinition) error { + + //set the number of nodes + numNodes := uint64(0) + for _, n := range ndfile.Nodes { + if n.Status != ndf.Stale { + numNodes++ + } + } + atomic.StoreUint64(c.numNodes, numNodes) + // Start network instance instance, err := commNetwork.NewInstance( - c.comms.ProtoComms, ndf, nil, nil, commNetwork.None, + c.comms.ProtoComms, ndfile, nil, nil, commNetwork.None, c.param.FastPolling) if err != nil { return errors.WithMessage( @@ -145,7 +160,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { } c.instance = instance - addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size + addrSize := ndfile.AddressSpace[len(ndfile.AddressSpace)-1].Size c.Space = address.NewAddressSpace(addrSize) /* Set up modules */ @@ -165,7 +180,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Enable optimized HostPool initialization poolParams.MaxPings = 50 poolParams.ForceConnection = true - sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms, + sender, err := gateway.NewSender(poolParams, c.rng, ndfile, c.comms, c.session, nodeChan) if err != nil { return err @@ -174,7 +189,9 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Set up the node registrar c.Registrar, err = nodes.LoadRegistrar( - c.session, c.Sender, c.comms, c.rng, nodeChan) + c.session, c.Sender, c.comms, c.rng, nodeChan, func() int { + return int(atomic.LoadUint64(c.numNodes)) + }) if err != nil { return err } diff --git a/cmix/follow.go b/cmix/follow.go index 8ac5d46841b8a61d0e9300d98375ed9adc0f7d30..87f45b1314eb91ce00b002c9fbb3f3fc9a0da707 100644 --- a/cmix/follow.go +++ b/cmix/follow.go @@ -27,6 +27,7 @@ import ( "encoding/binary" "fmt" "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/xx_network/primitives/ndf" "sync" "sync/atomic" "time" @@ -258,6 +259,15 @@ func (c *client) follow(identity receptionID.IdentityUse, return } + //set the number of nodes + numNodes := uint64(0) + for _, n := range c.instance.GetPartialNdf().Get().Nodes { + if n.Status != ndf.Stale { + numNodes++ + } + } + atomic.StoreUint64(c.numNodes, numNodes) + // update gateway connections c.UpdateNdf(c.GetInstance().GetPartialNdf().Get()) c.session.SetNDF(c.GetInstance().GetPartialNdf().Get()) diff --git a/cmix/interface.go b/cmix/interface.go index 0e01001ba00835213f1bf3d868a049bf427380d2..8fd4d51dbcc8e73513bb048ab2933397d2235f13 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -191,9 +191,13 @@ type Client interface { AddService(clientID *id.ID, newService message.Service, response message.Processor) - // IncreaseParallelNodeRegistration increases the number of parallel node - // registrations by num - IncreaseParallelNodeRegistration(num int) func() (stoppable.Stoppable, error) + // PauseNodeRegistrations stops all node registrations and returns a + // function to resume them. + PauseNodeRegistrations(timeout time.Duration) error + + // ChangeNumberOfNodeRegistrations changes the number of parallel node + // registrations up to the initialized maximum. + ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error // DeleteService deletes a message service. If only a single response is // associated with the preimage, the entire preimage is removed. If there is diff --git a/cmix/nodes/interfaces.go b/cmix/nodes/interfaces.go index ddce4b8f21c98f4ed6824e56a6d755be63ee6f82..77520a2ec0c74b7d5903b92889450f3dfbf32671 100644 --- a/cmix/nodes/interfaces.go +++ b/cmix/nodes/interfaces.go @@ -27,9 +27,13 @@ type Registrar interface { // to register with nodes. StartProcesses(numParallel uint) stoppable.Stoppable - // IncreaseParallelNodeRegistration increases the number of parallel node - // registrations by num - IncreaseParallelNodeRegistration(num int) func() (stoppable.Stoppable, error) + //PauseNodeRegistrations stops all node registrations + //and returns a function to resume them + PauseNodeRegistrations(timeout time.Duration) error + + // ChangeNumberOfNodeRegistrations changes the number of parallel node + // registrations up to the initialized maximum + ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // not have a key for. If there are missing keys, then returns nil. diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go index 8cada7d120e663cf9ef075d11fe72c3c409c7859..f446c80a328ec966c7a101d5b83cc53943652c3c 100644 --- a/cmix/nodes/register.go +++ b/cmix/nodes/register.go @@ -13,6 +13,7 @@ import ( "gitlab.com/xx_network/crypto/csprng" "strconv" "sync" + "sync/atomic" "time" "github.com/pkg/errors" @@ -31,16 +32,24 @@ import ( // before an interruption and how many registration attempts have // been attempted. func registerNodes(r *registrar, s session, stop *stoppable.Single, - inProgress, attempts *sync.Map) { + inProgress, attempts *sync.Map, index int) { + + atomic.AddInt64(r.numberRunning, 1) - interval := time.Duration(500) * time.Millisecond - t := time.NewTicker(interval) for { select { + case <-r.pauser: + atomic.AddInt64(r.numberRunning, -1) + select { + case <-stop.Quit(): + stop.ToStopped() + return + case <-r.resumer: + atomic.AddInt64(r.numberRunning, 1) + } case <-stop.Quit(): - // On a stop signal, close the thread - t.Stop() stop.ToStopped() + atomic.AddInt64(r.numberRunning, -1) return case gw := <-r.c: @@ -107,7 +116,13 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single, } } rng.Close() - case <-t.C: + } + if index >= 2 { + if float64(r.NumRegisteredNodes()) > (float64(r.numnodesGetter()) * .7) { + <-stop.Quit() + stop.ToStopped() + return + } } } } diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index 0c942b8de091ab1d64b2eb2cc0d67b73c19d0194..51a3b7d1d0d1c99f9d1bd0ccf8aec37102233ca9 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -20,11 +20,12 @@ import ( "gitlab.com/xx_network/primitives/ndf" "strconv" "sync" + "sync/atomic" "time" ) const InputChanLen = 1000 -const maxAttempts = 2 +const maxAttempts = 5 // Backoff for attempting to register with a cMix node. var delayTable = [5]time.Duration{ @@ -51,6 +52,15 @@ type registrar struct { // operator at a time, as a result this is a map of ID -> int attempts sync.Map + pauser chan interface{} + resumer chan interface{} + numberRunning *int64 + maxRunning int + + runnerLock sync.Mutex + + numnodesGetter func() int + c chan network.NodeGateway } @@ -58,12 +68,18 @@ type registrar struct { // exist. func LoadRegistrar(session session, sender gateway.Sender, comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator, - c chan network.NodeGateway) (Registrar, error) { + c chan network.NodeGateway, numNodesGetter func() int) (Registrar, error) { + + running := int64(0) kv := session.GetKV().Prefix(prefix) r := ®istrar{ - nodes: make(map[id.ID]*key), - kv: kv, + nodes: make(map[id.ID]*key), + kv: kv, + pauser: make(chan interface{}), + resumer: make(chan interface{}), + numberRunning: &running, + numnodesGetter: numNodesGetter, } obj, err := kv.Get(storeKey, currentKeyVersion) @@ -94,33 +110,77 @@ func LoadRegistrar(session session, sender gateway.Sender, // StartProcesses initiates numParallel amount of threads // to register with nodes. func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() + multi := stoppable.NewMulti("NodeRegistrations") + r.maxRunning = int(numParallel) for i := uint(0); i < numParallel; i++ { stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) - go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) + go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts, int(i)) multi.Add(stop) } return multi } -// IncreaseParallelNodeRegistration increases the number of parallel node -// registrations by num -func (r *registrar) IncreaseParallelNodeRegistration(numParallel int) func() (stoppable.Stoppable, error) { - return func() (stoppable.Stoppable, error) { - multi := stoppable.NewMulti("NodeRegistrationsIncrease") +//PauseNodeRegistrations stops all node registrations +//and returns a function to resume them +func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() + timer := time.NewTimer(timeout) + defer timer.Stop() + numRegistrations := atomic.LoadInt64(r.numberRunning) + jww.INFO.Printf("PauseNodeRegistrations() - Pausing %d registrations", numRegistrations) + for i := int64(0); i < numRegistrations; i++ { + select { + case r.pauser <- struct{}{}: + case <-timer.C: + return errors.Errorf("Timed out on pausing node registration on %d", i) + } + } - for i := uint(0); i < uint(numParallel); i++ { - stop := stoppable.NewSingle("NodeRegistration Increase" + strconv.Itoa(int(i))) + return nil +} - go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts) - multi.Add(stop) +// ChangeNumberOfNodeRegistrations changes the number of parallel node +// registrations up to the initialized maximum +func (r *registrar) ChangeNumberOfNodeRegistrations(toRun int, + timeout time.Duration) error { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() + numRunning := int(atomic.LoadInt64(r.numberRunning)) + if toRun+numRunning > r.maxRunning { + return errors.Errorf("Cannot change number of " + + "running node registration to number greater than the max") + } + timer := time.NewTimer(timeout) + defer timer.Stop() + if numRunning < toRun { + jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Reducing number "+ + "of node registrations from %d to %d", toRun, numRunning, toRun) + for i := 0; i < toRun-numRunning; i++ { + select { + case r.pauser <- struct{}{}: + case <-timer.C: + return errors.New("Timed out on reducing node registration") + } + } + } else if numRunning > toRun { + jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Increasing number "+ + "of node registrations from %d to %d", toRun, numRunning, toRun) + for i := 0; i < toRun-numRunning; i++ { + select { + case r.resumer <- struct{}{}: + case <-timer.C: + return errors.New("Timed out on increasing node registration") + } } - - return multi, nil } + return nil } // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did diff --git a/cmix/nodes/registrar_test.go b/cmix/nodes/registrar_test.go index 922adfad68bc2b52f999a92561c834803be4f16b..55b684495decc278b4eb3147c3b0398676879901 100644 --- a/cmix/nodes/registrar_test.go +++ b/cmix/nodes/registrar_test.go @@ -38,7 +38,7 @@ func TestLoadRegistrar_New(t *testing.T) { nodeChan := make(chan commNetwork.NodeGateway, InputChanLen) r, err := LoadRegistrar(session, sender, &MockClientComms{}, - rngGen, nodeChan) + rngGen, nodeChan, func() int { return 100 }) if err != nil { t.Fatalf("Failed to create new registrar: %+v", err) } @@ -71,7 +71,7 @@ func TestLoadRegistrar_Load(t *testing.T) { // Load the store and check its attributes r, err := LoadRegistrar( - testR.session, testR.sender, testR.comms, testR.rng, testR.c) + testR.session, testR.sender, testR.comms, testR.rng, testR.c, func() int { return 100 }) if err != nil { t.Fatalf("Unable to load store: %+v", err) } diff --git a/cmix/nodes/utils_test.go b/cmix/nodes/utils_test.go index 96eacda72794e3f66c1b4513e7f359624cf0cbe1..9f9657db116623508c86db323425feaf293982a9 100644 --- a/cmix/nodes/utils_test.go +++ b/cmix/nodes/utils_test.go @@ -101,7 +101,7 @@ func makeTestRegistrar(mockComms *MockClientComms, t *testing.T) *registrar { nodeChan := make(chan commNetwork.NodeGateway, InputChanLen) r, err := LoadRegistrar( - session, sender, mockComms, rngGen, nodeChan) + session, sender, mockComms, rngGen, nodeChan, func() int { return 100 }) if err != nil { t.Fatalf("Failed to create new registrar: %+v", err) } diff --git a/cmix/parallelNodeRegistrations_js.go b/cmix/parallelNodeRegistrations_js.go index 8e90835507059eb67d8b8c5baacdce7561b35129..f30ec9f44a988a1bfc14e4d647d2c26c8bfd12b9 100644 --- a/cmix/parallelNodeRegistrations_js.go +++ b/cmix/parallelNodeRegistrations_js.go @@ -1,3 +1,3 @@ package cmix -const defaultParallelNodeRegistration = 2 +const defaultParallelNodeRegistration = 12 diff --git a/cmix/params.go b/cmix/params.go index 01a606e5b44fdb5d72d1ed851a82598f41ddff6c..ec4148ea782f62079898dc829f0b28a495cd9032 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -106,7 +106,7 @@ func GetDefaultParams() Params { VerboseRoundTracking: false, RealtimeOnly: false, ReplayRequests: true, - MaxParallelIdentityTracks: 20, + MaxParallelIdentityTracks: 5, ClockSkewClamp: 50 * time.Millisecond, } n.Rounds = rounds.GetDefaultParams() diff --git a/connect/utils_test.go b/connect/utils_test.go index 9e38793d8423a74299197d8e3a43d00180df20ab..edbf629f8a6f7dccb89d5fe791c2239f398bea1f 100644 --- a/connect/utils_test.go +++ b/connect/utils_test.go @@ -180,22 +180,24 @@ func (m *mockCmix) GetIdentity(*id.ID) (identity.TrackedID, error) { return identity.TrackedID{Creation: netTime.Now().Add(-time.Minute)}, nil } -func (m *mockCmix) AddFingerprint(*id.ID, format.Fingerprint, message.Processor) error { return nil } -func (m *mockCmix) DeleteFingerprint(*id.ID, format.Fingerprint) {} -func (m *mockCmix) DeleteClientFingerprints(*id.ID) {} -func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) {} -func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) {return nil} -func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) {} -func (m *mockCmix) DeleteClientService(*id.ID) {} -func (m *mockCmix) TrackServices(message.ServicesTracker) {} -func (m *mockCmix) CheckInProgressMessages() {} -func (m *mockCmix) IsHealthy() bool { return true } -func (m *mockCmix) WasHealthy() bool { return true } -func (m *mockCmix) AddHealthCallback(func(bool)) uint64 { return 0 } -func (m *mockCmix) RemoveHealthCallback(uint64) {} -func (m *mockCmix) HasNode(*id.ID) bool { return true } -func (m *mockCmix) NumRegisteredNodes() int { return 24 } -func (m *mockCmix) TriggerNodeRegistration(*id.ID) {} +func (m *mockCmix) AddFingerprint(*id.ID, format.Fingerprint, message.Processor) error { return nil } +func (m *mockCmix) DeleteFingerprint(*id.ID, format.Fingerprint) {} +func (m *mockCmix) DeleteClientFingerprints(*id.ID) {} +func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) {} +func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + return nil +} +func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) {} +func (m *mockCmix) DeleteClientService(*id.ID) {} +func (m *mockCmix) TrackServices(message.ServicesTracker) {} +func (m *mockCmix) CheckInProgressMessages() {} +func (m *mockCmix) IsHealthy() bool { return true } +func (m *mockCmix) WasHealthy() bool { return true } +func (m *mockCmix) AddHealthCallback(func(bool)) uint64 { return 0 } +func (m *mockCmix) RemoveHealthCallback(uint64) {} +func (m *mockCmix) HasNode(*id.ID) bool { return true } +func (m *mockCmix) NumRegisteredNodes() int { return 24 } +func (m *mockCmix) TriggerNodeRegistration(*id.ID) {} func (m *mockCmix) GetRoundResults(_ time.Duration, roundCallback cmix.RoundEventCallback, _ ...id.Round) { roundCallback(true, false, nil) @@ -215,6 +217,10 @@ func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) func (m *mockCmix) UnregisterAddressSpaceNotification(string) {} func (m *mockCmix) GetInstance() *network.Instance { return m.instance } func (m *mockCmix) GetVerboseRounds() string { return "" } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // Misc set-up utils // diff --git a/dummy/mockCmix_test.go b/dummy/mockCmix_test.go index 60e7de512b840412d9290fdf0cde190f88cf3d74..34b7d5b36dc41cccf13a2737b20b7c5ea37636e1 100644 --- a/dummy/mockCmix_test.go +++ b/dummy/mockCmix_test.go @@ -131,7 +131,6 @@ func (m mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppa panic("implement me") } - func (m mockCmix) DeleteService(clientID *id.ID, toDelete message.Service, processor message.Processor) { //TODO implement me panic("implement me") @@ -241,3 +240,7 @@ func (m mockCmix) GetVerboseRounds() string { //TODO implement me panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/e2e/fpGenerator_test.go b/e2e/fpGenerator_test.go index adb178e669a8ca94c79bae2909d009530e6f168d..75b03c161d502bef8bb21a84ab681af0c3f47b23 100644 --- a/e2e/fpGenerator_test.go +++ b/e2e/fpGenerator_test.go @@ -191,3 +191,7 @@ func (m *mockFpgCmix) RegisterAddressSpaceNotification(string) (chan uint8, erro func (m *mockFpgCmix) UnregisterAddressSpaceNotification(string) {} func (m *mockFpgCmix) GetInstance() *network.Instance { return nil } func (m *mockFpgCmix) GetVerboseRounds() string { return "" } +func (m *mockFpgCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockFpgCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/e2e/rekey/utils_test.go b/e2e/rekey/utils_test.go index 58512b33aa0dc41c9a17f0ab22605c5002e2d013..08c7ec154ed2b8b85ee900a3d52a3f7475964e65 100644 --- a/e2e/rekey/utils_test.go +++ b/e2e/rekey/utils_test.go @@ -351,3 +351,8 @@ func (m *mockNetManager) GetInstance() *network2.Instance { func (m *mockNetManager) GetVerboseRounds() string { return "" } + +func (m *mockNetManager) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockNetManager) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/e2e/utils_test.go b/e2e/utils_test.go index 8ba510fe0e66471878310440ca0de0da7980fdeb..52579e52870adbdfeac35087e9cf6558a60549d8 100644 --- a/e2e/utils_test.go +++ b/e2e/utils_test.go @@ -270,6 +270,10 @@ func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) func (m *mockCmix) UnregisterAddressSpaceNotification(string) { return } func (m *mockCmix) GetInstance() *network.Instance { return m.instance } func (m *mockCmix) GetVerboseRounds() string { return "" } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // NDF // diff --git a/fileTransfer/connect/utils_test.go b/fileTransfer/connect/utils_test.go index 14707813e1a1a2361171b1c6a9412cd4dee8b57e..eb365c0ec7b3737532c33d776e570dc1ef2bb5c5 100644 --- a/fileTransfer/connect/utils_test.go +++ b/fileTransfer/connect/utils_test.go @@ -214,9 +214,13 @@ func (m *mockCmix) GetAddressSpace() uint8 { panic("implement me") } func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) { panic("implement me") } -func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } -func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } -func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } +func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } +func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // Mock Connection Handler // diff --git a/fileTransfer/e2e/utils_test.go b/fileTransfer/e2e/utils_test.go index b77985aed45ac7dd36fa600a3684f41aac98a0ae..7635d9a88e32e9a6cb2a719bd6435813a210d804 100644 --- a/fileTransfer/e2e/utils_test.go +++ b/fileTransfer/e2e/utils_test.go @@ -220,6 +220,11 @@ func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} + //////////////////////////////////////////////////////////////////////////////// // Mock E2E Handler // //////////////////////////////////////////////////////////////////////////////// diff --git a/fileTransfer/groupChat/utils_test.go b/fileTransfer/groupChat/utils_test.go index 4475c6a0d45de084a214f73210681b9346132899..ee03acb237a2c83c7703448ae0a412bf2d5f8748 100644 --- a/fileTransfer/groupChat/utils_test.go +++ b/fileTransfer/groupChat/utils_test.go @@ -155,9 +155,11 @@ func (m *mockCmix) DeleteFingerprint(_ *id.ID, fp format.Fingerprint) { m.handler.Unlock() } -func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } -func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } -func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) {panic("implement me")} +func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } +func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + panic("implement me") +} func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) { panic("implement me") } func (m *mockCmix) DeleteClientService(*id.ID) { panic("implement me") } func (m *mockCmix) TrackServices(message.ServicesTracker) { panic("implement me") } @@ -209,9 +211,13 @@ func (m *mockCmix) GetAddressSpace() uint8 { panic("implement me") } func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) { panic("implement me") } -func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } -func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } -func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } +func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } +func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // Mock Group Chat Manager // diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 3c884076acf535232f8f4da54a59ac1cc7b4ec5e..6eb83a34d60fe5ff6c37cf08f2587ef9c04bf731 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -267,9 +267,13 @@ func (m *mockCmix) GetAddressSpace() uint8 { panic("implement me") } func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) { panic("implement me") } -func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } -func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } -func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } +func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } +func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // Mock Storage Session // diff --git a/groupChat/networkManager_test.go b/groupChat/networkManager_test.go index 2cdd1a28a596b8646032417c17d8a5722d3bee94..108ae47a5c55a05292a547fd3f5b4a9d1e69053b 100644 --- a/groupChat/networkManager_test.go +++ b/groupChat/networkManager_test.go @@ -236,3 +236,8 @@ func (tnm *testNetworkManager) GetVerboseRounds() string { func (tnm *testNetworkManager) GetMaxMessageLength() int { return format.NewMessage(tnm.grp.GetP().ByteLen()).ContentsSize() } + +func (tnm *testNetworkManager) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (tnm *testNetworkManager) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/ud/networkManager_test.go b/ud/networkManager_test.go index 9cb6866c512b86b33dd9b9b4b8af0a1172d87389..56186c12c849eef83fde6bba26c89d8dee5b6abc 100644 --- a/ud/networkManager_test.go +++ b/ud/networkManager_test.go @@ -115,8 +115,6 @@ func (tnm *testNetworkManager) AddService(clientID *id.ID, return } -func (tnm *testNetworkManager) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) {return nil} - func (tnm *testNetworkManager) CheckInProgressMessages() { return } @@ -271,3 +269,8 @@ func (tnm *testNetworkManager) UnregisterAddressSpaceNotification(tag string) { //TODO implement me panic("implement me") } + +func (tnm *testNetworkManager) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (tnm *testNetworkManager) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/xxdk/cmix.go b/xxdk/cmix.go index 9ccf239dbaf6da0d00cef0bf7638090821a3ec2b..aa8fb76833fc97e2e73190294082b38a0c42d6f4 100644 --- a/xxdk/cmix.go +++ b/xxdk/cmix.go @@ -182,7 +182,7 @@ func NewProtoCmix_Unsafe(ndfJSON, storageDir string, password []byte, storageSess.SetRegistrationTimestamp(protoUser.RegistrationTimestamp) // Move the registration state to indicate registered with registration on - // roto client + // proto client err = storageSess.ForwardRegistrationStatus(storage.PermissioningComplete) if err != nil { return err @@ -484,12 +484,39 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) { return numRegistered, len(nodes) - numStale, nil } -// IncreaseParallelNodeRegistration increases the number of parallel node -// registrations by num -func (c *Cmix) IncreaseParallelNodeRegistration(num int) error { - jww.INFO.Printf("IncreaseParallelNodeRegistration(%d)", num) - svc := c.network.IncreaseParallelNodeRegistration(num) - return c.followerServices.add(svc) +// IsReady returns true if at least percentReady of node registrations has +// completed. If not all have completed, then it returns false and howClose will +// be a percent (0-1) of node registrations completed. +func (c *Cmix) IsReady(percentReady float64) (isReady bool, howClose float64) { + // Check if the network is currently healthy + if !c.network.IsHealthy() { + return false, 0 + } + + numReg, numNodes, err := c.GetNodeRegistrationStatus() + if err != nil { + jww.FATAL.Panicf("Failed to get node registration status: %+v", err) + } + + isReady = (float64(numReg) / float64(numNodes)) >= percentReady + howClose = float64(numReg) / (float64(numNodes) * percentReady) + if howClose > 1 { + howClose = 1 + } + + return isReady, howClose +} + +// PauseNodeRegistrations stops all node registrations and returns a function to +// resume them. +func (c *Cmix) PauseNodeRegistrations(timeout time.Duration) error { + return c.network.PauseNodeRegistrations(timeout) +} + +// ChangeNumberOfNodeRegistrations changes the number of parallel node +// registrations up to the initialized maximum. +func (c *Cmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return c.network.ChangeNumberOfNodeRegistrations(toRun, timeout) } // GetPreferredBins returns the geographic bin or bins that the provided two diff --git a/xxdk/utilsInterfaces_test.go b/xxdk/utilsInterfaces_test.go index da4afd17210ebad04a2908f783375af35a513331..ba08622046afcee2c7268202b2ba7fd1785d1539 100644 --- a/xxdk/utilsInterfaces_test.go +++ b/xxdk/utilsInterfaces_test.go @@ -224,3 +224,7 @@ func (t *testNetworkManagerGeneric) TriggerNodeRegistration(nid *id.ID) {} func (t *testNetworkManagerGeneric) UnregisterAddressSpaceNotification( tag string) { } +func (t *testNetworkManagerGeneric) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (t *testNetworkManagerGeneric) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +}