diff --git a/bindings/follow.go b/bindings/follow.go index 6f892e4151356b6bfa0906f2f746542aaae4c99e..08a9d0bffe2ed250c076b918c76926b0e2544cfa 100644 --- a/bindings/follow.go +++ b/bindings/follow.go @@ -99,13 +99,22 @@ func (c *Cmix) ReadyToSend() bool { jww.FATAL.Panicf("Failed to get node registration status: %+v", err) } - // FIXME: This is a fix put in place because not all nodes in the NDF are - // online. This should be fixed. - total = 340 - return numReg >= total*7/10 } +// IsReadyInfo contains information on if the network is ready and how close it +// is to being ready. +// +// Example JSON: +// { +// "IsReady": true, +// "HowClose": 0.534 +// } +type IsReadyInfo struct { + IsReady bool + HowClose float64 +} + // NetworkFollowerStatus gets the state of the network follower. It returns a // status with the following values: // Stopped - 0 @@ -144,6 +153,44 @@ func (c *Cmix) GetNodeRegistrationStatus() ([]byte, error) { return json.Marshal(nodeRegReport) } +// IsReady returns true if at least percentReady of node registrations has +// completed. If not all have completed, then it returns false and howClose will +// be a percent (0-1) of node registrations completed. +// +// Parameters: +// - percentReady - The percentage of nodes required to be registered with to +// be ready. This is a number between 0 and 1. +// +// Returns: +// - JSON of [IsReadyInfo]. +func (c *Cmix) IsReady(percentReady float64) ([]byte, error) { + isReady, howClose := c.api.IsReady(percentReady) + return json.Marshal(&IsReadyInfo{isReady, howClose}) +} + +// PauseNodeRegistrations stops all node registrations and returns a function to +// resume them. +// +// Parameters: +// - timeoutMS - The timeout, in milliseconds, to wait when stopping threads +// before failing. +func (c *Cmix) PauseNodeRegistrations(timeoutMS int) error { + timeout := time.Duration(timeoutMS) * time.Millisecond + return c.api.PauseNodeRegistrations(timeout) +} + +// ChangeNumberOfNodeRegistrations changes the number of parallel node +// registrations up to the initialized maximum. +// +// Parameters: +// - toRun - The number of parallel node registrations. +// - timeoutMS - The timeout, in milliseconds, to wait when changing node +// registrations before failing. +func (c *Cmix) ChangeNumberOfNodeRegistrations(toRun, timeoutMS int) error { + timeout := time.Duration(timeoutMS) * time.Millisecond + return c.api.ChangeNumberOfNodeRegistrations(toRun, timeout) +} + // HasRunningProcessies checks if any background threads are running and returns // true if one or more are. // diff --git a/cmd/version.go b/cmd/version.go index fcfa3b927bbe2f5669ee881fce0dfd30beb3d20a..b3433a75f90c27472ac60b5a131d44170a7ce6bb 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -18,7 +18,7 @@ import ( ) // Change this value to set the version for this build -const currentVersion = "4.3.0" +const currentVersion = "4.3.2" func Version() string { out := fmt.Sprintf("Elixxir Client v%s -- %s\n\n", xxdk.SEMVER, diff --git a/cmix/client.go b/cmix/client.go index bc7eaeaecc7a6a8fdb28f189e2d9ad9488870f2d..675e44be877c277301acafbcb65320d4f5a5fa03 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -92,6 +92,8 @@ type client struct { // Storage of the max message length maxMsgLen int + + numNodes *uint64 } // NewClient builds a new reception client object using inputted key fields. @@ -103,6 +105,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, tracker := uint64(0) earliest := uint64(0) + numNodes := uint64(0) + netTime.SetTimeSource(localTime{}) // Create client object @@ -117,6 +121,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, maxMsgLen: tmpMsg.ContentsSize(), skewTracker: clockSkew.New(params.ClockSkewClamp), attemptTracker: attempts.NewSendAttempts(), + numNodes: &numNodes, } if params.VerboseRoundTracking { @@ -134,10 +139,20 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // initialize turns on network handlers, initializing a host pool and // network health monitors. This should be called before // network Follow command is called. -func (c *client) initialize(ndf *ndf.NetworkDefinition) error { +func (c *client) initialize(ndfile *ndf.NetworkDefinition) error { + + //set the number of nodes + numNodes := uint64(0) + for _, n := range ndfile.Nodes { + if n.Status != ndf.Stale { + numNodes++ + } + } + atomic.StoreUint64(c.numNodes, numNodes) + // Start network instance instance, err := commNetwork.NewInstance( - c.comms.ProtoComms, ndf, nil, nil, commNetwork.None, + c.comms.ProtoComms, ndfile, nil, nil, commNetwork.None, c.param.FastPolling) if err != nil { return errors.WithMessage( @@ -145,7 +160,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { } c.instance = instance - addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size + addrSize := ndfile.AddressSpace[len(ndfile.AddressSpace)-1].Size c.Space = address.NewAddressSpace(addrSize) /* Set up modules */ @@ -165,7 +180,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Enable optimized HostPool initialization poolParams.MaxPings = 50 poolParams.ForceConnection = true - sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms, + sender, err := gateway.NewSender(poolParams, c.rng, ndfile, c.comms, c.session, nodeChan) if err != nil { return err @@ -174,7 +189,9 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Set up the node registrar c.Registrar, err = nodes.LoadRegistrar( - c.session, c.Sender, c.comms, c.rng, nodeChan) + c.session, c.Sender, c.comms, c.rng, nodeChan, func() int { + return int(atomic.LoadUint64(c.numNodes)) + }) if err != nil { return err } diff --git a/cmix/follow.go b/cmix/follow.go index 8ac5d46841b8a61d0e9300d98375ed9adc0f7d30..87f45b1314eb91ce00b002c9fbb3f3fc9a0da707 100644 --- a/cmix/follow.go +++ b/cmix/follow.go @@ -27,6 +27,7 @@ import ( "encoding/binary" "fmt" "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/xx_network/primitives/ndf" "sync" "sync/atomic" "time" @@ -258,6 +259,15 @@ func (c *client) follow(identity receptionID.IdentityUse, return } + //set the number of nodes + numNodes := uint64(0) + for _, n := range c.instance.GetPartialNdf().Get().Nodes { + if n.Status != ndf.Stale { + numNodes++ + } + } + atomic.StoreUint64(c.numNodes, numNodes) + // update gateway connections c.UpdateNdf(c.GetInstance().GetPartialNdf().Get()) c.session.SetNDF(c.GetInstance().GetPartialNdf().Get()) diff --git a/cmix/interface.go b/cmix/interface.go index 1d0bfdaeb09c25ab4b26018233ab0b8b52e93964..8fd4d51dbcc8e73513bb048ab2933397d2235f13 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -191,6 +191,14 @@ type Client interface { AddService(clientID *id.ID, newService message.Service, response message.Processor) + // PauseNodeRegistrations stops all node registrations and returns a + // function to resume them. + PauseNodeRegistrations(timeout time.Duration) error + + // ChangeNumberOfNodeRegistrations changes the number of parallel node + // registrations up to the initialized maximum. + ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error + // DeleteService deletes a message service. If only a single response is // associated with the preimage, the entire preimage is removed. If there is // more than one response, only the given response is removed. If nil is diff --git a/cmix/nodes/interfaces.go b/cmix/nodes/interfaces.go index 5d08e6db9a6c97712621ed02ced970e8344e1f00..77520a2ec0c74b7d5903b92889450f3dfbf32671 100644 --- a/cmix/nodes/interfaces.go +++ b/cmix/nodes/interfaces.go @@ -27,6 +27,14 @@ type Registrar interface { // to register with nodes. StartProcesses(numParallel uint) stoppable.Stoppable + //PauseNodeRegistrations stops all node registrations + //and returns a function to resume them + PauseNodeRegistrations(timeout time.Duration) error + + // ChangeNumberOfNodeRegistrations changes the number of parallel node + // registrations up to the initialized maximum + ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error + // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // not have a key for. If there are missing keys, then returns nil. GetNodeKeys(topology *connect.Circuit) (MixCypher, error) diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go index 8cada7d120e663cf9ef075d11fe72c3c409c7859..f446c80a328ec966c7a101d5b83cc53943652c3c 100644 --- a/cmix/nodes/register.go +++ b/cmix/nodes/register.go @@ -13,6 +13,7 @@ import ( "gitlab.com/xx_network/crypto/csprng" "strconv" "sync" + "sync/atomic" "time" "github.com/pkg/errors" @@ -31,16 +32,24 @@ import ( // before an interruption and how many registration attempts have // been attempted. func registerNodes(r *registrar, s session, stop *stoppable.Single, - inProgress, attempts *sync.Map) { + inProgress, attempts *sync.Map, index int) { + + atomic.AddInt64(r.numberRunning, 1) - interval := time.Duration(500) * time.Millisecond - t := time.NewTicker(interval) for { select { + case <-r.pauser: + atomic.AddInt64(r.numberRunning, -1) + select { + case <-stop.Quit(): + stop.ToStopped() + return + case <-r.resumer: + atomic.AddInt64(r.numberRunning, 1) + } case <-stop.Quit(): - // On a stop signal, close the thread - t.Stop() stop.ToStopped() + atomic.AddInt64(r.numberRunning, -1) return case gw := <-r.c: @@ -107,7 +116,13 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single, } } rng.Close() - case <-t.C: + } + if index >= 2 { + if float64(r.NumRegisteredNodes()) > (float64(r.numnodesGetter()) * .7) { + <-stop.Quit() + stop.ToStopped() + return + } } } } diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index 5ca5f108d201de75fcb739c43ea1bf2cea24951b..51a3b7d1d0d1c99f9d1bd0ccf8aec37102233ca9 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -20,11 +20,12 @@ import ( "gitlab.com/xx_network/primitives/ndf" "strconv" "sync" + "sync/atomic" "time" ) const InputChanLen = 1000 -const maxAttempts = 2 +const maxAttempts = 5 // Backoff for attempting to register with a cMix node. var delayTable = [5]time.Duration{ @@ -46,6 +47,20 @@ type registrar struct { comms RegisterNodeCommsInterface rng *fastRNG.StreamGenerator + inProgress sync.Map + // We are relying on the in progress check to ensure there is only a single + // operator at a time, as a result this is a map of ID -> int + attempts sync.Map + + pauser chan interface{} + resumer chan interface{} + numberRunning *int64 + maxRunning int + + runnerLock sync.Mutex + + numnodesGetter func() int + c chan network.NodeGateway } @@ -53,12 +68,18 @@ type registrar struct { // exist. func LoadRegistrar(session session, sender gateway.Sender, comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator, - c chan network.NodeGateway) (Registrar, error) { + c chan network.NodeGateway, numNodesGetter func() int) (Registrar, error) { + + running := int64(0) kv := session.GetKV().Prefix(prefix) r := ®istrar{ - nodes: make(map[id.ID]*key), - kv: kv, + nodes: make(map[id.ID]*key), + kv: kv, + pauser: make(chan interface{}), + resumer: make(chan interface{}), + numberRunning: &running, + numnodesGetter: numNodesGetter, } obj, err := kv.Get(storeKey, currentKeyVersion) @@ -89,24 +110,79 @@ func LoadRegistrar(session session, sender gateway.Sender, // StartProcesses initiates numParallel amount of threads // to register with nodes. func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { - multi := stoppable.NewMulti("NodeRegistrations") - - inProgress := &sync.Map{} + r.runnerLock.Lock() + defer r.runnerLock.Unlock() - // We are relying on the in progress check to ensure there is only a single - // operator at a time, as a result this is a map of ID -> int - attempts := &sync.Map{} + multi := stoppable.NewMulti("NodeRegistrations") + r.maxRunning = int(numParallel) for i := uint(0); i < numParallel; i++ { stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) - go registerNodes(r, r.session, stop, inProgress, attempts) + go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts, int(i)) multi.Add(stop) } return multi } +//PauseNodeRegistrations stops all node registrations +//and returns a function to resume them +func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() + timer := time.NewTimer(timeout) + defer timer.Stop() + numRegistrations := atomic.LoadInt64(r.numberRunning) + jww.INFO.Printf("PauseNodeRegistrations() - Pausing %d registrations", numRegistrations) + for i := int64(0); i < numRegistrations; i++ { + select { + case r.pauser <- struct{}{}: + case <-timer.C: + return errors.Errorf("Timed out on pausing node registration on %d", i) + } + } + + return nil +} + +// ChangeNumberOfNodeRegistrations changes the number of parallel node +// registrations up to the initialized maximum +func (r *registrar) ChangeNumberOfNodeRegistrations(toRun int, + timeout time.Duration) error { + r.runnerLock.Lock() + defer r.runnerLock.Unlock() + numRunning := int(atomic.LoadInt64(r.numberRunning)) + if toRun+numRunning > r.maxRunning { + return errors.Errorf("Cannot change number of " + + "running node registration to number greater than the max") + } + timer := time.NewTimer(timeout) + defer timer.Stop() + if numRunning < toRun { + jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Reducing number "+ + "of node registrations from %d to %d", toRun, numRunning, toRun) + for i := 0; i < toRun-numRunning; i++ { + select { + case r.pauser <- struct{}{}: + case <-timer.C: + return errors.New("Timed out on reducing node registration") + } + } + } else if numRunning > toRun { + jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Increasing number "+ + "of node registrations from %d to %d", toRun, numRunning, toRun) + for i := 0; i < toRun-numRunning; i++ { + select { + case r.resumer <- struct{}{}: + case <-timer.C: + return errors.New("Timed out on increasing node registration") + } + } + } + return nil +} + // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // not have a key for. If there are missing keys, then returns nil. func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) { diff --git a/cmix/nodes/registrar_test.go b/cmix/nodes/registrar_test.go index 922adfad68bc2b52f999a92561c834803be4f16b..55b684495decc278b4eb3147c3b0398676879901 100644 --- a/cmix/nodes/registrar_test.go +++ b/cmix/nodes/registrar_test.go @@ -38,7 +38,7 @@ func TestLoadRegistrar_New(t *testing.T) { nodeChan := make(chan commNetwork.NodeGateway, InputChanLen) r, err := LoadRegistrar(session, sender, &MockClientComms{}, - rngGen, nodeChan) + rngGen, nodeChan, func() int { return 100 }) if err != nil { t.Fatalf("Failed to create new registrar: %+v", err) } @@ -71,7 +71,7 @@ func TestLoadRegistrar_Load(t *testing.T) { // Load the store and check its attributes r, err := LoadRegistrar( - testR.session, testR.sender, testR.comms, testR.rng, testR.c) + testR.session, testR.sender, testR.comms, testR.rng, testR.c, func() int { return 100 }) if err != nil { t.Fatalf("Unable to load store: %+v", err) } diff --git a/cmix/nodes/utils_test.go b/cmix/nodes/utils_test.go index 96eacda72794e3f66c1b4513e7f359624cf0cbe1..9f9657db116623508c86db323425feaf293982a9 100644 --- a/cmix/nodes/utils_test.go +++ b/cmix/nodes/utils_test.go @@ -101,7 +101,7 @@ func makeTestRegistrar(mockComms *MockClientComms, t *testing.T) *registrar { nodeChan := make(chan commNetwork.NodeGateway, InputChanLen) r, err := LoadRegistrar( - session, sender, mockComms, rngGen, nodeChan) + session, sender, mockComms, rngGen, nodeChan, func() int { return 100 }) if err != nil { t.Fatalf("Failed to create new registrar: %+v", err) } diff --git a/cmix/parallelNodeRegistrations.go b/cmix/parallelNodeRegistrations.go new file mode 100644 index 0000000000000000000000000000000000000000..48f232901263118fffdc12c7c1ec16d3a2f35aa0 --- /dev/null +++ b/cmix/parallelNodeRegistrations.go @@ -0,0 +1,13 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +// This file is compiled for all architectures except WebAssembly. +//go:build !js || !wasm + +package cmix + +const defaultParallelNodeRegistration = 20 diff --git a/cmix/parallelNodeRegistrations_js.go b/cmix/parallelNodeRegistrations_js.go new file mode 100644 index 0000000000000000000000000000000000000000..f30ec9f44a988a1bfc14e4d647d2c26c8bfd12b9 --- /dev/null +++ b/cmix/parallelNodeRegistrations_js.go @@ -0,0 +1,3 @@ +package cmix + +const defaultParallelNodeRegistration = 12 diff --git a/cmix/params.go b/cmix/params.go index 3ef813f0d9ca6e7903696f83cb6fea72e62a7962..ec4148ea782f62079898dc829f0b28a495cd9032 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -100,13 +100,13 @@ func GetDefaultParams() Params { MaxCheckedRounds: 500, RegNodesBufferLen: 1000, NetworkHealthTimeout: 15 * time.Second, - ParallelNodeRegistrations: 20, + ParallelNodeRegistrations: defaultParallelNodeRegistration, KnownRoundsThreshold: 1500, // 5 rounds/sec * 60 sec/min * 5 min FastPolling: true, VerboseRoundTracking: false, RealtimeOnly: false, ReplayRequests: true, - MaxParallelIdentityTracks: 20, + MaxParallelIdentityTracks: 5, ClockSkewClamp: 50 * time.Millisecond, } n.Rounds = rounds.GetDefaultParams() diff --git a/cmix/pickup/params.go b/cmix/pickup/params.go index 84754d38f4f741350da8c37d0a6f708e3e41bcee..bdddf820e786caebc3c8912fb7a33786eb022ce0 100644 --- a/cmix/pickup/params.go +++ b/cmix/pickup/params.go @@ -57,7 +57,7 @@ type paramsDisk struct { // GetDefaultParams returns a default set of Params. func GetDefaultParams() Params { return Params{ - NumMessageRetrievalWorkers: 8, + NumMessageRetrievalWorkers: 4, LookupRoundsBufferLen: 2000, MaxHistoricalRoundsRetries: 3, UncheckRoundPeriod: 20 * time.Second, diff --git a/connect/utils_test.go b/connect/utils_test.go index 3aab375cca15e1385a2fa7c621751a0ab508d0cf..edbf629f8a6f7dccb89d5fe791c2239f398bea1f 100644 --- a/connect/utils_test.go +++ b/connect/utils_test.go @@ -184,17 +184,20 @@ func (m *mockCmix) AddFingerprint(*id.ID, format.Fingerprint, message.Processor) func (m *mockCmix) DeleteFingerprint(*id.ID, format.Fingerprint) {} func (m *mockCmix) DeleteClientFingerprints(*id.ID) {} func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) {} -func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) {} -func (m *mockCmix) DeleteClientService(*id.ID) {} -func (m *mockCmix) TrackServices(message.ServicesTracker) {} -func (m *mockCmix) CheckInProgressMessages() {} -func (m *mockCmix) IsHealthy() bool { return true } -func (m *mockCmix) WasHealthy() bool { return true } -func (m *mockCmix) AddHealthCallback(func(bool)) uint64 { return 0 } -func (m *mockCmix) RemoveHealthCallback(uint64) {} -func (m *mockCmix) HasNode(*id.ID) bool { return true } -func (m *mockCmix) NumRegisteredNodes() int { return 24 } -func (m *mockCmix) TriggerNodeRegistration(*id.ID) {} +func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + return nil +} +func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) {} +func (m *mockCmix) DeleteClientService(*id.ID) {} +func (m *mockCmix) TrackServices(message.ServicesTracker) {} +func (m *mockCmix) CheckInProgressMessages() {} +func (m *mockCmix) IsHealthy() bool { return true } +func (m *mockCmix) WasHealthy() bool { return true } +func (m *mockCmix) AddHealthCallback(func(bool)) uint64 { return 0 } +func (m *mockCmix) RemoveHealthCallback(uint64) {} +func (m *mockCmix) HasNode(*id.ID) bool { return true } +func (m *mockCmix) NumRegisteredNodes() int { return 24 } +func (m *mockCmix) TriggerNodeRegistration(*id.ID) {} func (m *mockCmix) GetRoundResults(_ time.Duration, roundCallback cmix.RoundEventCallback, _ ...id.Round) { roundCallback(true, false, nil) @@ -214,6 +217,10 @@ func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) func (m *mockCmix) UnregisterAddressSpaceNotification(string) {} func (m *mockCmix) GetInstance() *network.Instance { return m.instance } func (m *mockCmix) GetVerboseRounds() string { return "" } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // Misc set-up utils // diff --git a/dummy/manager.go b/dummy/manager.go index 83feb81d324b560dc3850469f0a1acc2c852e52c..4a7d19f9e1766dfd1dcf1db7e5f2134c7df646a8 100644 --- a/dummy/manager.go +++ b/dummy/manager.go @@ -61,6 +61,8 @@ type Manager struct { // Pauses/Resumes the dummy send thread when triggered statusChan chan bool + totalSent *uint64 + // Interfaces net cmix.Client store storage.Session @@ -96,6 +98,7 @@ func NewManager(maxNumMessages int, // function is a helper function for NewManager. func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, net cmix.Client, store storage.Session, rng *fastRNG.StreamGenerator) *Manager { + numSent := uint64(8) return &Manager{ maxNumMessages: maxNumMessages, avgSendDelta: avgSendDelta, @@ -105,6 +108,7 @@ func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, net: net, store: store, rng: rng, + totalSent: &numSent, } } diff --git a/dummy/manager_test.go b/dummy/manager_test.go index c36b2a596292f35104e69be797ef999f3f3ada3e..13b46275a85429bc4a542a91f9cea58d7fa7bc84 100644 --- a/dummy/manager_test.go +++ b/dummy/manager_test.go @@ -35,6 +35,7 @@ func Test_newManager(t *testing.T) { statusChanLen, cap(received.statusChan)) } received.statusChan = expected.statusChan + received.totalSent = nil if !reflect.DeepEqual(expected, received) { t.Errorf("New manager does not match expected."+ diff --git a/dummy/mockCmix_test.go b/dummy/mockCmix_test.go index 75cdee581be7e2b08de783f8e27d9d58de078093..34b7d5b36dc41cccf13a2737b20b7c5ea37636e1 100644 --- a/dummy/mockCmix_test.go +++ b/dummy/mockCmix_test.go @@ -126,6 +126,11 @@ func (m mockCmix) AddService(clientID *id.ID, newService message.Service, respon panic("implement me") } +func (m mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + //TODO implement me + panic("implement me") +} + func (m mockCmix) DeleteService(clientID *id.ID, toDelete message.Service, processor message.Processor) { //TODO implement me panic("implement me") @@ -235,3 +240,7 @@ func (m mockCmix) GetVerboseRounds() string { //TODO implement me panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/dummy/send.go b/dummy/send.go index 77b0f4e7de569e36b985c60521c6083b40006ee3..c35910feeebf0272ff6d27c3ab67c069dd431883 100644 --- a/dummy/send.go +++ b/dummy/send.go @@ -21,7 +21,12 @@ import ( // Error messages for the Manager.sendThread and its helper functions. const ( - numMsgsRngErr = "failed to generate random number of messages to send: %+v" + numMsgsRngErr = "failed to generate random number of messages to send: %+v" + overrideAvgSendDelta = 10 * time.Minute + overrideRandomRange = 8 * time.Minute + overrideMaxNumMessages = 2 + + numSendsToOverride = 20 ) // sendThread is a thread that sends the dummy messages at random intervals. @@ -32,6 +37,13 @@ func (m *Manager) sendThread(stop *stoppable.Single) { nextSendChanPtr := &(nextSendChan) for { + + if numSent := atomic.LoadUint64(m.totalSent); numSent > numSendsToOverride { + m.avgSendDelta = overrideAvgSendDelta + m.randomRange = overrideRandomRange + m.maxNumMessages = overrideMaxNumMessages + } + select { case status := <-m.statusChan: if status { @@ -71,6 +83,8 @@ func (m *Manager) sendThread(stop *stoppable.Single) { err := m.sendMessages() if err != nil { jww.ERROR.Printf("Failed to send dummy messages: %+v", err) + } else { + atomic.AddUint64(m.totalSent, 1) } }() case <-stop.Quit(): diff --git a/dummy/utils_test.go b/dummy/utils_test.go index 4ddccebf7cf5ccb4371fa31f6811bb319612fac1..4e787f88699de65ecc9878a95db0609649c7b4cd 100644 --- a/dummy/utils_test.go +++ b/dummy/utils_test.go @@ -40,6 +40,7 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, t *testing.T) *Manager { store := storage.InitTestingSession(t) payloadSize := store.GetCmixGroup().GetP().ByteLen() + n := uint64(0) m := &Manager{ maxNumMessages: maxNumMessages, avgSendDelta: avgSendDelta, @@ -48,6 +49,7 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration, store: store, net: newMockCmix(payloadSize), rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), + totalSent: &n, } return m diff --git a/e2e/fpGenerator_test.go b/e2e/fpGenerator_test.go index 657fc16462548659dcd8e409c5acb6da928ffeef..75b03c161d502bef8bb21a84ab681af0c3f47b23 100644 --- a/e2e/fpGenerator_test.go +++ b/e2e/fpGenerator_test.go @@ -159,8 +159,11 @@ func (m *mockFpgCmix) DeleteFingerprint(uid *id.ID, fp format.Fingerprint) { } } -func (m *mockFpgCmix) DeleteClientFingerprints(*id.ID) {} -func (m *mockFpgCmix) AddService(*id.ID, message.Service, message.Processor) {} +func (m *mockFpgCmix) DeleteClientFingerprints(*id.ID) {} +func (m *mockFpgCmix) AddService(*id.ID, message.Service, message.Processor) {} +func (m *mockFpgCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + return nil +} func (m *mockFpgCmix) DeleteService(*id.ID, message.Service, message.Processor) {} func (m *mockFpgCmix) DeleteClientService(*id.ID) {} func (m *mockFpgCmix) TrackServices(message.ServicesTracker) {} @@ -188,3 +191,7 @@ func (m *mockFpgCmix) RegisterAddressSpaceNotification(string) (chan uint8, erro func (m *mockFpgCmix) UnregisterAddressSpaceNotification(string) {} func (m *mockFpgCmix) GetInstance() *network.Instance { return nil } func (m *mockFpgCmix) GetVerboseRounds() string { return "" } +func (m *mockFpgCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockFpgCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/e2e/rekey/utils_test.go b/e2e/rekey/utils_test.go index aeccacbe316832b436901c654981a6d6fc1138e5..08c7ec154ed2b8b85ee900a3d52a3f7475964e65 100644 --- a/e2e/rekey/utils_test.go +++ b/e2e/rekey/utils_test.go @@ -222,7 +222,7 @@ func (m mockServiceHandler) DeleteService(clientID *id.ID, toDelete message.Serv type mockNetManager struct{} func (m *mockNetManager) GetIdentity(get *id.ID) (identity.TrackedID, error) { - //TODO implement me + // TODO implement me panic("implement me") } @@ -270,6 +270,11 @@ func (m *mockNetManager) AddService(clientID *id.ID, newService message.Service, response message.Processor) { } +func (m *mockNetManager) IncreaseParallelNodeRegistration( + int) func() (stoppable.Stoppable, error) { + return nil +} + func (m *mockNetManager) DeleteService(clientID *id.ID, toDelete message.Service, processor message.Processor) { } @@ -346,3 +351,8 @@ func (m *mockNetManager) GetInstance() *network2.Instance { func (m *mockNetManager) GetVerboseRounds() string { return "" } + +func (m *mockNetManager) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockNetManager) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/e2e/utils_test.go b/e2e/utils_test.go index 78a0eaa7c57a7cd0520f42c260e3968fdbf6ff6a..52579e52870adbdfeac35087e9cf6558a60549d8 100644 --- a/e2e/utils_test.go +++ b/e2e/utils_test.go @@ -239,7 +239,9 @@ func (m *mockCmix) AddService(myId *id.ID, srv message.Service, proc message.Pro m.handler.Unlock() } - +func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + return nil +} func (m *mockCmix) DeleteClientFingerprints(*id.ID) {} func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) {} func (m *mockCmix) DeleteClientService(*id.ID) {} @@ -268,6 +270,10 @@ func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) func (m *mockCmix) UnregisterAddressSpaceNotification(string) { return } func (m *mockCmix) GetInstance() *network.Instance { return m.instance } func (m *mockCmix) GetVerboseRounds() string { return "" } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // NDF // diff --git a/fileTransfer/connect/utils_test.go b/fileTransfer/connect/utils_test.go index b707e548ec2bbe96d43b9d7d3fa6f19205bee424..eb365c0ec7b3737532c33d776e570dc1ef2bb5c5 100644 --- a/fileTransfer/connect/utils_test.go +++ b/fileTransfer/connect/utils_test.go @@ -158,8 +158,11 @@ func (m *mockCmix) DeleteFingerprint(_ *id.ID, fp format.Fingerprint) { m.handler.Unlock() } -func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } -func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } +func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + panic("implement me") +} func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) { panic("implement me") } func (m *mockCmix) DeleteClientService(*id.ID) { panic("implement me") } func (m *mockCmix) TrackServices(message.ServicesTracker) { panic("implement me") } @@ -211,9 +214,13 @@ func (m *mockCmix) GetAddressSpace() uint8 { panic("implement me") } func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) { panic("implement me") } -func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } -func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } -func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } +func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } +func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // Mock Connection Handler // diff --git a/fileTransfer/e2e/utils_test.go b/fileTransfer/e2e/utils_test.go index a7f50a19525cd2360dfb698fbca8281f2572d520..7635d9a88e32e9a6cb2a719bd6435813a210d804 100644 --- a/fileTransfer/e2e/utils_test.go +++ b/fileTransfer/e2e/utils_test.go @@ -160,8 +160,11 @@ func (m *mockCmix) DeleteFingerprint(_ *id.ID, fp format.Fingerprint) { m.handler.Unlock() } -func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } -func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } +func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + panic("implement me") +} func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) { panic("implement me") } func (m *mockCmix) DeleteClientService(*id.ID) { panic("implement me") } func (m *mockCmix) TrackServices(message.ServicesTracker) { panic("implement me") } @@ -217,6 +220,11 @@ func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} + //////////////////////////////////////////////////////////////////////////////// // Mock E2E Handler // //////////////////////////////////////////////////////////////////////////////// diff --git a/fileTransfer/groupChat/utils_test.go b/fileTransfer/groupChat/utils_test.go index a616d65c19783c0bfd9789fb0e3399a61a253656..ee03acb237a2c83c7703448ae0a412bf2d5f8748 100644 --- a/fileTransfer/groupChat/utils_test.go +++ b/fileTransfer/groupChat/utils_test.go @@ -155,8 +155,11 @@ func (m *mockCmix) DeleteFingerprint(_ *id.ID, fp format.Fingerprint) { m.handler.Unlock() } -func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } -func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } +func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + panic("implement me") +} func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) { panic("implement me") } func (m *mockCmix) DeleteClientService(*id.ID) { panic("implement me") } func (m *mockCmix) TrackServices(message.ServicesTracker) { panic("implement me") } @@ -208,9 +211,13 @@ func (m *mockCmix) GetAddressSpace() uint8 { panic("implement me") } func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) { panic("implement me") } -func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } -func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } -func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } +func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } +func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // Mock Group Chat Manager // diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index daba7a06489ec8dac51f9a016d304f779ae94ec8..6eb83a34d60fe5ff6c37cf08f2587ef9c04bf731 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -211,8 +211,11 @@ func (m *mockCmix) DeleteFingerprint(_ *id.ID, fp format.Fingerprint) { delete(m.handler.processorMap, fp) } -func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } -func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) DeleteClientFingerprints(*id.ID) { panic("implement me") } +func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) { panic("implement me") } +func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + return nil +} func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) { panic("implement me") } func (m *mockCmix) DeleteClientService(*id.ID) { panic("implement me") } func (m *mockCmix) TrackServices(message.ServicesTracker) { panic("implement me") } @@ -264,9 +267,13 @@ func (m *mockCmix) GetAddressSpace() uint8 { panic("implement me") } func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error) { panic("implement me") } -func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } -func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } -func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) UnregisterAddressSpaceNotification(string) { panic("implement me") } +func (m *mockCmix) GetInstance() *network.Instance { panic("implement me") } +func (m *mockCmix) GetVerboseRounds() string { panic("implement me") } +func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} //////////////////////////////////////////////////////////////////////////////// // Mock Storage Session // diff --git a/go.mod b/go.mod index 0a4656a2958ba3c9f0297e4c691293f0b68826e0..d038a54e8c0e46b1f1bbdac3991b4a9c3da46368 100644 --- a/go.mod +++ b/go.mod @@ -14,11 +14,11 @@ require ( github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.8.0 gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f - gitlab.com/elixxir/comms v0.0.4-0.20221025202829-c33bbe4d8838 - gitlab.com/elixxir/crypto v0.0.7-0.20221025180839-3d8948607447 - gitlab.com/elixxir/ekv v0.2.2-0.20221026171556-67265b7e9f6a + gitlab.com/elixxir/comms v0.0.4-0.20221027214216-31527f2bb34c + gitlab.com/elixxir/crypto v0.0.7-0.20221027040529-dbcb6eb2b087 + gitlab.com/elixxir/ekv v0.2.1 gitlab.com/elixxir/primitives v0.0.3-0.20221025020430-f5d2eb330fbc - gitlab.com/xx_network/comms v0.0.4-0.20221025202603-3043ba10b2a2 + gitlab.com/xx_network/comms v0.0.4-0.20221027213956-c8a8cf68ee4f gitlab.com/xx_network/crypto v0.0.5-0.20221025020316-517fa8f91d2c gitlab.com/xx_network/primitives v0.0.4-0.20221025020003-cbec15a71b8f go.uber.org/ratelimit v0.2.0 @@ -29,7 +29,7 @@ require ( ) require ( - git.xx.network/elixxir/grpc-web-go-client v0.0.0-20220908170150-ef04339ffe65 // indirect + git.xx.network/elixxir/grpc-web-go-client v0.0.0-20221027213839-7343cd399f14 // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/badoux/checkmail v1.2.1 // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect diff --git a/go.sum b/go.sum index 6dcaa511f5c9a104dee64ebb623221ea57efdefa..e4cc4801afa0770c2ff142630e1d046f3931a6e6 100644 --- a/go.sum +++ b/go.sum @@ -54,8 +54,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -git.xx.network/elixxir/grpc-web-go-client v0.0.0-20220908170150-ef04339ffe65 h1:ksB3ZiMeFplqlaCjMDqKegbGzDZdS5pU0Z5GgFeBdZ0= -git.xx.network/elixxir/grpc-web-go-client v0.0.0-20220908170150-ef04339ffe65/go.mod h1:uFKw2wmgtlYMdiIm08dM0Vj4XvX9ZKVCj71c8O7SAPo= +git.xx.network/elixxir/grpc-web-go-client v0.0.0-20221027213839-7343cd399f14 h1:D2yUHVsLgCmHrWA7yDTw9+VhQGWsG8+MOuaxU+TVrho= +git.xx.network/elixxir/grpc-web-go-client v0.0.0-20221027213839-7343cd399f14/go.mod h1:uFKw2wmgtlYMdiIm08dM0Vj4XvX9ZKVCj71c8O7SAPo= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= @@ -631,16 +631,14 @@ github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo= github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f h1:yXGvNBqzZwAhDYlSnxPRbgor6JWoOt1Z7s3z1O9JR40= gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f/go.mod h1:H6jztdm0k+wEV2QGK/KYA+MY9nj9Zzatux/qIvDDv3k= -gitlab.com/elixxir/comms v0.0.4-0.20221025202829-c33bbe4d8838 h1:KCoOeLJxIsX5ZEzqXMJzw2Ph9bI2vqy8LDYk0Y7mrjM= -gitlab.com/elixxir/comms v0.0.4-0.20221025202829-c33bbe4d8838/go.mod h1:GJD9nek4eiKePSmSA0EwS1AoBunn0+N/Zm30Pu0eug4= +gitlab.com/elixxir/comms v0.0.4-0.20221027214216-31527f2bb34c h1:Xqi7zsiet2yvquYc6WuQ3fcpOAUYwHnigCRLH/e5H6I= +gitlab.com/elixxir/comms v0.0.4-0.20221027214216-31527f2bb34c/go.mod h1:TkgvoInYGMz9x2FSv9rZHsfJ54/A30uWQuQ1AIx8sQI= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= -gitlab.com/elixxir/crypto v0.0.7-0.20221025180839-3d8948607447 h1:hFOOOGiVB9XtzSM//aClSJ+5nf4JSmmCkoWcc635W8I= -gitlab.com/elixxir/crypto v0.0.7-0.20221025180839-3d8948607447/go.mod h1:NImDa7951+jSolkYN/BBUm6qG6f+k0hsFZOCIxBlLhE= +gitlab.com/elixxir/crypto v0.0.7-0.20221027040529-dbcb6eb2b087 h1:e9cVSSFMmvnTj4PVRa1kCKhwWCekMNTEbeUply4NUGc= +gitlab.com/elixxir/crypto v0.0.7-0.20221027040529-dbcb6eb2b087/go.mod h1:NImDa7951+jSolkYN/BBUm6qG6f+k0hsFZOCIxBlLhE= gitlab.com/elixxir/ekv v0.2.1 h1:dtwbt6KmAXG2Tik5d60iDz2fLhoFBgWwST03p7T+9Is= gitlab.com/elixxir/ekv v0.2.1/go.mod h1:USLD7xeDnuZEavygdrgzNEwZXeLQJK/w1a+htpN+JEU= -gitlab.com/elixxir/ekv v0.2.2-0.20221026171556-67265b7e9f6a h1:E3bzKZcAxBS9UaQFcpXvVS/IQaEG1C2F/O/H9lNjqKc= -gitlab.com/elixxir/ekv v0.2.2-0.20221026171556-67265b7e9f6a/go.mod h1:InofHM7VvqerATEZqhEvfALSI/0zlQFI7ON9KWHNHjc= gitlab.com/elixxir/primitives v0.0.0-20200731184040-494269b53b4d/go.mod h1:OQgUZq7SjnE0b+8+iIAT2eqQF+2IFHn73tOo+aV11mg= gitlab.com/elixxir/primitives v0.0.0-20200804170709-a1896d262cd9/go.mod h1:p0VelQda72OzoUckr1O+vPW0AiFe0nyKQ6gYcmFSuF8= gitlab.com/elixxir/primitives v0.0.0-20200804182913-788f47bded40/go.mod h1:tzdFFvb1ESmuTCOl1z6+yf6oAICDxH2NPUemVgoNLxc= @@ -648,8 +646,8 @@ gitlab.com/elixxir/primitives v0.0.1/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2Y gitlab.com/elixxir/primitives v0.0.3-0.20221025020430-f5d2eb330fbc h1:o/qBYkVm9hBCgsA01y/8iwZN9z2ODC8jVrPQF2ZB1I0= gitlab.com/elixxir/primitives v0.0.3-0.20221025020430-f5d2eb330fbc/go.mod h1:2JdloMJCUyXaaGcIJ4ogKrjlI48Nop0Zuzkk3edhIZU= gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTRl7gsoM8c3RQ5KAm5GstxrJp5tn+6JfQ4z5Hw= -gitlab.com/xx_network/comms v0.0.4-0.20221025202603-3043ba10b2a2 h1:jfO+F6gnS8jtX/1BJ7quYhIp8zp3/wYQZr6r3+5xODI= -gitlab.com/xx_network/comms v0.0.4-0.20221025202603-3043ba10b2a2/go.mod h1:iYiRy1i4+dPlsmBtAfaCmw0QFXWv5pn4WY26g/x3ZPA= +gitlab.com/xx_network/comms v0.0.4-0.20221027213956-c8a8cf68ee4f h1:gwhm0a9S9wk5M79Z99xiVu5XAQu9JDe3Vb1jjru+Hd4= +gitlab.com/xx_network/comms v0.0.4-0.20221027213956-c8a8cf68ee4f/go.mod h1:/RX6ZuRlQQL0IVnMdkgsS9RyXkVeNJcBDpAeEPUZqA4= gitlab.com/xx_network/crypto v0.0.3/go.mod h1:DF2HYvvCw9wkBybXcXAgQMzX+MiGbFPjwt3t17VRqRE= gitlab.com/xx_network/crypto v0.0.4/go.mod h1:+lcQEy+Th4eswFgQDwT0EXKp4AXrlubxalwQFH5O0Mk= gitlab.com/xx_network/crypto v0.0.5-0.20221025020316-517fa8f91d2c h1:MR2dwUhl3BFZRNkvd+UP1Ymiu+pxbAv5wk6OIsOXKvU= diff --git a/groupChat/networkManager_test.go b/groupChat/networkManager_test.go index 934744be0b08341ba95cc8e4e4af25bbcb5bae0d..108ae47a5c55a05292a547fd3f5b4a9d1e69053b 100644 --- a/groupChat/networkManager_test.go +++ b/groupChat/networkManager_test.go @@ -66,7 +66,10 @@ func (tnm *testNetworkManager) SendMany(messages []cmix.TargetedCmixMessage, _ c return rounds.Round{}, nil, nil } -func (*testNetworkManager) AddService(*id.ID, message.Service, message.Processor) {} +func (*testNetworkManager) AddService(*id.ID, message.Service, message.Processor) {} +func (*testNetworkManager) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + return nil +} func (*testNetworkManager) DeleteService(*id.ID, message.Service, message.Processor) {} ///////////////////////////////////////////////////////////////////////////////////// @@ -74,162 +77,167 @@ func (*testNetworkManager) DeleteService(*id.ID, message.Service, message.Proces ///////////////////////////////////////////////////////////////////////////////////// func (tnm *testNetworkManager) Follow(report cmix.ClientErrorReport) (stoppable.Stoppable, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler, cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) Send(recipient *id.ID, fingerprint format.Fingerprint, service message.Service, payload, mac []byte, cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) RemoveIdentity(id *id.ID) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) GetIdentity(get *id.ID) (identity.TrackedID, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) AddFingerprint(identity *id.ID, fingerprint format.Fingerprint, mp message.Processor) error { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) DeleteFingerprint(identity *id.ID, fingerprint format.Fingerprint) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) DeleteClientFingerprints(identity *id.ID) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) DeleteClientService(clientID *id.ID) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) TrackServices(tracker message.ServicesTracker) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) CheckInProgressMessages() { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) IsHealthy() bool { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) WasHealthy() bool { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) AddHealthCallback(f func(bool)) uint64 { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) RemoveHealthCallback(u uint64) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) HasNode(nid *id.ID) bool { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) NumRegisteredNodes() int { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) TriggerNodeRegistration(nid *id.ID) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) LookupHistoricalRound(rid id.Round, callback rounds.RoundResultCallback) error { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) SendToPreferred(targets []*id.ID, sendFunc gateway.SendToPreferredFunc, stop *stoppable.Single, timeout time.Duration) (interface{}, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) SetGatewayFilter(f gateway.Filter) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) GetHostParams() connect.HostParams { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) GetAddressSpace() uint8 { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) RegisterAddressSpaceNotification(tag string) (chan uint8, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) UnregisterAddressSpaceNotification(tag string) { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) GetInstance() *network.Instance { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) GetVerboseRounds() string { - //TODO implement me + // TODO implement me panic("implement me") } func (tnm *testNetworkManager) GetMaxMessageLength() int { return format.NewMessage(tnm.grp.GetP().ByteLen()).ContentsSize() } + +func (tnm *testNetworkManager) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (tnm *testNetworkManager) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/storage/utility/NDF.go b/storage/utility/NDF.go index c01485f26856e0ad111dab4614992045d7d585e4..3c5f9b9d48bb2fe6fc55bb07221f12d8add79538 100644 --- a/storage/utility/NDF.go +++ b/storage/utility/NDF.go @@ -5,6 +5,9 @@ // LICENSE file. // //////////////////////////////////////////////////////////////////////////////// +// This file is compiled for all architectures except WebAssembly. +//go:build !js || !wasm + package utility import ( diff --git a/storage/utility/NDF_js.go b/storage/utility/NDF_js.go new file mode 100644 index 0000000000000000000000000000000000000000..f958bd350143e6773e5aaf5f35ee495c46bce04c --- /dev/null +++ b/storage/utility/NDF_js.go @@ -0,0 +1,40 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package utility + +import ( + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/ndf" + "os" + "syscall/js" +) + +const NdfStorageKeyNamePrefix = "ndfStorageKey/" + +var localStorage = js.Global().Get("localStorage") + +func LoadNDF(_ *versioned.KV, key string) (*ndf.NetworkDefinition, error) { + keyValue := localStorage.Call("getItem", NdfStorageKeyNamePrefix+key) + if keyValue.IsNull() { + return nil, os.ErrNotExist + } + + return ndf.Unmarshal([]byte(keyValue.String())) +} + +func SaveNDF(_ *versioned.KV, key string, ndf *ndf.NetworkDefinition) error { + marshaled, err := ndf.Marshal() + if err != nil { + return err + } + + localStorage.Call("setItem", + NdfStorageKeyNamePrefix+key, string(marshaled)) + + return nil +} diff --git a/ud/networkManager_test.go b/ud/networkManager_test.go index e58b3cf1fe3ef539e4c19bcb8ca2f95e24a15901..56186c12c849eef83fde6bba26c89d8dee5b6abc 100644 --- a/ud/networkManager_test.go +++ b/ud/networkManager_test.go @@ -269,3 +269,8 @@ func (tnm *testNetworkManager) UnregisterAddressSpaceNotification(tag string) { //TODO implement me panic("implement me") } + +func (tnm *testNetworkManager) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (tnm *testNetworkManager) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/xxdk/cmix.go b/xxdk/cmix.go index a2b8b8b9ad3c6e7cc77c6eb325fabba17084cff0..aa8fb76833fc97e2e73190294082b38a0c42d6f4 100644 --- a/xxdk/cmix.go +++ b/xxdk/cmix.go @@ -182,7 +182,7 @@ func NewProtoCmix_Unsafe(ndfJSON, storageDir string, password []byte, storageSess.SetRegistrationTimestamp(protoUser.RegistrationTimestamp) // Move the registration state to indicate registered with registration on - // roto client + // proto client err = storageSess.ForwardRegistrationStatus(storage.PermissioningComplete) if err != nil { return err @@ -484,6 +484,41 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) { return numRegistered, len(nodes) - numStale, nil } +// IsReady returns true if at least percentReady of node registrations has +// completed. If not all have completed, then it returns false and howClose will +// be a percent (0-1) of node registrations completed. +func (c *Cmix) IsReady(percentReady float64) (isReady bool, howClose float64) { + // Check if the network is currently healthy + if !c.network.IsHealthy() { + return false, 0 + } + + numReg, numNodes, err := c.GetNodeRegistrationStatus() + if err != nil { + jww.FATAL.Panicf("Failed to get node registration status: %+v", err) + } + + isReady = (float64(numReg) / float64(numNodes)) >= percentReady + howClose = float64(numReg) / (float64(numNodes) * percentReady) + if howClose > 1 { + howClose = 1 + } + + return isReady, howClose +} + +// PauseNodeRegistrations stops all node registrations and returns a function to +// resume them. +func (c *Cmix) PauseNodeRegistrations(timeout time.Duration) error { + return c.network.PauseNodeRegistrations(timeout) +} + +// ChangeNumberOfNodeRegistrations changes the number of parallel node +// registrations up to the initialized maximum. +func (c *Cmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return c.network.ChangeNumberOfNodeRegistrations(toRun, timeout) +} + // GetPreferredBins returns the geographic bin or bins that the provided two // character country code is a part of. func (c *Cmix) GetPreferredBins(countryCode string) ([]string, error) { diff --git a/xxdk/utilsInterfaces_test.go b/xxdk/utilsInterfaces_test.go index a6be0ca8d7143ed748efe04058930a47392aa08c..ba08622046afcee2c7268202b2ba7fd1785d1539 100644 --- a/xxdk/utilsInterfaces_test.go +++ b/xxdk/utilsInterfaces_test.go @@ -168,6 +168,10 @@ func (t *testNetworkManagerGeneric) RemoveIdentity(id *id.ID) {} func (t *testNetworkManagerGeneric) AddService(clientID *id.ID, newService message.Service, response message.Processor) { } +func (t *testNetworkManagerGeneric) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) { + return nil +} + func (t *testNetworkManagerGeneric) DeleteService(clientID *id.ID, toDelete message.Service, processor message.Processor) { } @@ -220,3 +224,7 @@ func (t *testNetworkManagerGeneric) TriggerNodeRegistration(nid *id.ID) {} func (t *testNetworkManagerGeneric) UnregisterAddressSpaceNotification( tag string) { } +func (t *testNetworkManagerGeneric) PauseNodeRegistrations(timeout time.Duration) error { return nil } +func (t *testNetworkManagerGeneric) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error { + return nil +} diff --git a/xxdk/version_vars.go b/xxdk/version_vars.go index 76a41f587dcf4dfd5fa2e3ba8b7ebea7510c4d58..dfa74d63598860b1551a263d123448427a49a64f 100644 --- a/xxdk/version_vars.go +++ b/xxdk/version_vars.go @@ -1,11 +1,11 @@ // Code generated by go generate; DO NOT EDIT. // This file was generated by robots at -// 2022-10-17 12:52:49.822998 -0500 CDT m=+0.037589931 +// 2022-10-28 17:15:49.212852925 +0000 UTC m=+0.009994080 package xxdk -const GITVERSION = `a401d138 update deps` -const SEMVER = "4.3.0" +const GITVERSION = `30a95769 yRevert "Add /v4 modpath"` +const SEMVER = "4.3.2" const DEPENDENCIES = `module gitlab.com/elixxir/client go 1.17 @@ -22,13 +22,13 @@ require ( github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.8.0 gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f - gitlab.com/elixxir/comms v0.0.4-0.20221017173926-4eaa6061dfaa - gitlab.com/elixxir/crypto v0.0.7-0.20221017173452-565da4101a3b + gitlab.com/elixxir/comms v0.0.4-0.20221027214216-31527f2bb34c + gitlab.com/elixxir/crypto v0.0.7-0.20221027040529-dbcb6eb2b087 gitlab.com/elixxir/ekv v0.2.1 - gitlab.com/elixxir/primitives v0.0.3-0.20221017172918-6176818d1aba - gitlab.com/xx_network/comms v0.0.4-0.20221017172508-09e33697dc15 - gitlab.com/xx_network/crypto v0.0.5-0.20221017172404-b384a8d8b171 - gitlab.com/xx_network/primitives v0.0.4-0.20221017171439-42169a3e5c0d + gitlab.com/elixxir/primitives v0.0.3-0.20221025020430-f5d2eb330fbc + gitlab.com/xx_network/comms v0.0.4-0.20221027213956-c8a8cf68ee4f + gitlab.com/xx_network/crypto v0.0.5-0.20221025020316-517fa8f91d2c + gitlab.com/xx_network/primitives v0.0.4-0.20221025020003-cbec15a71b8f go.uber.org/ratelimit v0.2.0 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c @@ -37,7 +37,7 @@ require ( ) require ( - git.xx.network/elixxir/grpc-web-go-client v0.0.0-20220908170150-ef04339ffe65 // indirect + git.xx.network/elixxir/grpc-web-go-client v0.0.0-20221027213839-7343cd399f14 // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/badoux/checkmail v1.2.1 // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect @@ -69,7 +69,7 @@ require ( github.com/ttacon/libphonenumber v1.2.1 // indirect github.com/tyler-smith/go-bip39 v1.1.0 // indirect github.com/zeebo/blake3 v0.2.3 // indirect - gitlab.com/xx_network/ring v0.0.3-0.20220222211904-da613960ad93 // indirect + gitlab.com/xx_network/ring v0.0.3-0.20220902183151-a7d3b15bc981 // indirect go.uber.org/atomic v1.10.0 // indirect golang.org/x/sys v0.0.0-20220731174439-a90be440212d // indirect golang.org/x/text v0.3.7 // indirect