diff --git a/bindings/follow.go b/bindings/follow.go index b545981892576fc3136854e3af8a07cd3ae1c2e3..d456a181a7486c8c4b0cb360c41c2de94c9ccec6 100644 --- a/bindings/follow.go +++ b/bindings/follow.go @@ -69,6 +69,12 @@ func (c *Cmix) StopNetworkFollower() error { return nil } +// SetTrackNetworkPeriod allows changing the frequency that follower threads +// are started. +func (c *Cmix) SetTrackNetworkPeriod(d time.Duration) { + c.api.SetTrackNetworkPeriod(d) +} + // WaitForNetwork will block until either the network is healthy or the passed // timeout is reached. It will return true if the network is healthy. func (c *Cmix) WaitForNetwork(timeoutMS int) bool { diff --git a/cmix/client.go b/cmix/client.go index 6c3343e4f56b41d83627358968f8375113ac7f55..ce579f7fb09c6bf5f6a27f18128d16062a91838a 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -82,6 +82,9 @@ type client struct { // Earliest tracked round earliestRound *uint64 + // Current Period of the follower + followerPeriod *int64 + // Number of polls done in a period of time tracker *uint64 latencySum uint64 @@ -110,6 +113,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, netTime.SetTimeSource(localTime{}) + followerPeriod := int64(params.TrackNetworkPeriod) + // Create client object c := &client{ param: params, @@ -123,6 +128,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, skewTracker: clockSkew.New(params.ClockSkewClamp), attemptTracker: attempts.NewSendAttempts(), numNodes: &numNodes, + followerPeriod: &followerPeriod, } if params.VerboseRoundTracking { @@ -285,6 +291,17 @@ func (c *client) Follow(report ClientErrorReport) (stoppable.Stoppable, error) { return multi, nil } +// SetTrackNetworkPeriod allows changing the frequency that follower threads +// are started. +func (c *client) SetTrackNetworkPeriod(d time.Duration) { + atomic.StoreInt64(c.followerPeriod, int64(d)) +} + +// GetTrackNetworkPeriod returns the current tracked network period. +func (c *client) GetTrackNetworkPeriod() time.Duration { + return time.Duration(atomic.LoadInt64(c.followerPeriod)) +} + // GetInstance returns the network instance object (NDF state). func (c *client) GetInstance() *commNetwork.Instance { return c.instance diff --git a/cmix/follow.go b/cmix/follow.go index 3ba4f12de5c260a9cfff81e4fb70214a47eacd09..2a79df78825c239963c05e715b786ad83f5943c1 100644 --- a/cmix/follow.go +++ b/cmix/follow.go @@ -47,12 +47,6 @@ import ( const ( debugTrackPeriod = 1 * time.Minute - - // Estimate the number of rounds per second in the network. Will need - // updated someday in order to correctly determine how far back to search - // rounds for messages as the network continues to grow, otherwise message - // drops occur. - estimatedRoundsPerSecond = 5 ) // followNetworkComms is a comms interface to make testing easier. @@ -68,9 +62,11 @@ type followNetworkComms interface { // round status, and informs the client when messages can be retrieved. func (c *client) followNetwork(report ClientErrorReport, stop *stoppable.Single) { - ticker := time.NewTicker(c.param.TrackNetworkPeriod) - TrackTicker := time.NewTicker(debugTrackPeriod) - rng := c.rng.GetStream() + + // Keep track of the current tracker period in order to detect changes + currentTrackPeriod := c.GetTrackNetworkPeriod() + ticker := time.NewTicker(currentTrackPeriod) + trackTicker := time.NewTicker(debugTrackPeriod) // abandon tracks rounds which data was not found out about in // the verbose rounds debugging mode @@ -85,7 +81,6 @@ func (c *client) followNetwork(report ClientErrorReport, for { select { case <-stop.Quit(): - rng.Close() stop.ToStopped() return case <-ticker.C: @@ -98,7 +93,7 @@ func (c *client) followNetwork(report ClientErrorReport, // trigger the first separately because it will get network state // updates go func() { - c.follow(toTrack[0], report, rng, c.comms, stop, abandon, + c.follow(toTrack[0], report, c.comms, stop, abandon, true) wg.Done() }() @@ -106,7 +101,7 @@ func (c *client) followNetwork(report ClientErrorReport, //trigger all others without getting network state updates for i := 1; i < len(toTrack); i++ { go func(index int) { - c.follow(toTrack[index], report, rng, c.comms, stop, + c.follow(toTrack[index], report, c.comms, stop, dummyAbandon, false) wg.Done() }(i) @@ -127,6 +122,10 @@ func (c *client) followNetwork(report ClientErrorReport, stream, c.Space.GetAddressSpaceWithoutWait(), operator) + if err != nil { + jww.ERROR.Printf("failed to operate on identities to "+ + "track: %s", err) + } stream.Close() //update clock skew @@ -134,13 +133,14 @@ func (c *client) followNetwork(report ClientErrorReport, // invert the skew because we need to reverse it netTime.SetOffset(-estimatedSkew) - if err != nil { - jww.ERROR.Printf("failed to operate on identities to "+ - "track: %s", err) - continue + // Update ticker if tracker period changes + newTrackPeriod := c.GetTrackNetworkPeriod() + if newTrackPeriod != currentTrackPeriod { + currentTrackPeriod = newTrackPeriod + ticker.Reset(currentTrackPeriod) } - case <-TrackTicker.C: + case <-trackTicker.C: numPolls := atomic.SwapUint64(c.tracker, 0) if c.numLatencies != 0 { latencyAvg := time.Nanosecond * time.Duration( @@ -167,7 +167,7 @@ func (c *client) followNetwork(report ClientErrorReport, // follow executes an iteration of the follower for a specific identity func (c *client) follow(identity receptionID.IdentityUse, - report ClientErrorReport, rng csprng.Source, comms followNetworkComms, + report ClientErrorReport, comms followNetworkComms, stop *stoppable.Single, abandon func(round id.Round), getUpdates bool) { // While polling with a fake identity, it is necessary to have populated @@ -498,10 +498,10 @@ func (c *client) follow(identity receptionID.IdentityUse, func (c *client) getFakeEarliestRound() id.Round { rng := c.rng.GetStream() b, err := csprng.Generate(8, rng) + rng.Close() if err != nil { jww.FATAL.Panicf("Could not get random number: %v", err) } - rng.Close() rangeVal := binary.LittleEndian.Uint64(b) % 800 diff --git a/cmix/follow_test.go b/cmix/follow_test.go deleted file mode 100644 index 7b887cc1e874f49c2ae0226cfa28a4736956e689..0000000000000000000000000000000000000000 --- a/cmix/follow_test.go +++ /dev/null @@ -1,28 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2022 xx foundation // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file. // -//////////////////////////////////////////////////////////////////////////////// - -package cmix - -//func TestClient_Follow(t *testing.T) { -// m, err := newTestClient(t) -// if err != nil { -// t.Fatalf("Failed to create test client: %+v", err) -// } -// -// clientErrorReport := func(source, message, trace string) { -// -// } -// s, err := m.Follow(clientErrorReport) -// if err != nil { -// t.Errorf("Failed to follow network: %+v", err) -// } -// -// err = s.Close() -// if err != nil { -// t.Errorf("Failed to close follower: %+v", err) -// } -//} diff --git a/cmix/interface.go b/cmix/interface.go index 739c463f937b2d4cdac0a5e4c9294e44104e6c78..c5d1a45dc1f00f881e78d141a68645c41e84c176 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -29,6 +29,10 @@ type Client interface { // Only one follower may run at a time. Follow(report ClientErrorReport) (stoppable.Stoppable, error) + // SetTrackNetworkPeriod allows changing the frequency that follower threads + // are started. + SetTrackNetworkPeriod(d time.Duration) + /* === Sending ========================================================== */ // GetMaxMessageLength returns the max message size for the current network. diff --git a/cmix/params.go b/cmix/params.go index 74e639b3cb8023db379558ffe2caa732a5d4830b..c42beda6b8ff0ceba1b546e1fc369d42411a4d9f 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -22,7 +22,9 @@ import ( ) type Params struct { + // TrackNetworkPeriod determines how frequently follower threads are started. TrackNetworkPeriod time.Duration + // MaxCheckedRounds is the maximum number of rounds to check in a single // iterations network updates. MaxCheckedRounds uint diff --git a/connect/utils_test.go b/connect/utils_test.go index 8e7f31a8d6b2399cc01fbb1c43d0495723e41181..aa507deccf371582327d2d130d721676fec0c393 100644 --- a/connect/utils_test.go +++ b/connect/utils_test.go @@ -151,6 +151,11 @@ type mockCmix struct { instance *network.Instance } +func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newMockCmix() *mockCmix { return &mockCmix{} } diff --git a/dummy/mockCmix_test.go b/dummy/mockCmix_test.go index 3d6517f7961338fabea54127816e15bb5c9b7404..a7042bc087e996d3b26906b27b001559814419c1 100644 --- a/dummy/mockCmix_test.go +++ b/dummy/mockCmix_test.go @@ -30,6 +30,11 @@ type mockCmix struct { payloadSize int } +func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newMockCmix(payloadSize int) cmix.Client { return &mockCmix{ diff --git a/e2e/fpGenerator_test.go b/e2e/fpGenerator_test.go index 857fed38cd419c629cbb262c450a5a29ffddb7df..4d69dd69f353c4d51b8c180b04a48c26d1f772a1 100644 --- a/e2e/fpGenerator_test.go +++ b/e2e/fpGenerator_test.go @@ -110,6 +110,11 @@ type mockFpgCmix struct { sync.Mutex } +func (m *mockFpgCmix) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newMockFpgCmix() *mockFpgCmix { return &mockFpgCmix{ processors: make(map[id.ID]map[format.Fingerprint]message.Processor), diff --git a/e2e/rekey/utils_test.go b/e2e/rekey/utils_test.go index e6ba2671ba29dcd9a09a1ce1be74c35b5d3c728b..ca8ccef0816ef93de512f70df33576601c05ee03 100644 --- a/e2e/rekey/utils_test.go +++ b/e2e/rekey/utils_test.go @@ -221,6 +221,11 @@ func (m mockServiceHandler) DeleteService(clientID *id.ID, toDelete message.Serv type mockNetManager struct{} +func (m *mockNetManager) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func (m *mockNetManager) GetIdentity(get *id.ID) (identity.TrackedID, error) { // TODO implement me panic("implement me") diff --git a/e2e/utils_test.go b/e2e/utils_test.go index bb63b070419f5653154ac9f8aae6b7bf2d281ad7..1afd01b554a66e29e0142a5211be41f9f9ec7372 100644 --- a/e2e/utils_test.go +++ b/e2e/utils_test.go @@ -154,6 +154,11 @@ type mockCmix struct { instance *network.Instance } +func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newMockCmix(myID *id.ID, handler *mockCmixHandler, t testing.TB) *mockCmix { comms := &connect.ProtoComms{Manager: connect.NewManagerTesting(t)} def := getNDF() diff --git a/fileTransfer/connect/utils_test.go b/fileTransfer/connect/utils_test.go index f62768d6fb7f23d144131a2c8051c8ef744e116e..5ad7afff04191a8e82b0c8b2ac815e708bc2514e 100644 --- a/fileTransfer/connect/utils_test.go +++ b/fileTransfer/connect/utils_test.go @@ -95,6 +95,11 @@ type mockCmix struct { sync.Mutex } +func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newMockCmix( myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix { return &mockCmix{ diff --git a/fileTransfer/e2e/utils_test.go b/fileTransfer/e2e/utils_test.go index 47de2b6709a93c0fead7d4185b7aed5914ccd674..0e71b45232060feb1d1ea694793f41f0730e574a 100644 --- a/fileTransfer/e2e/utils_test.go +++ b/fileTransfer/e2e/utils_test.go @@ -98,6 +98,11 @@ type mockCmix struct { sync.Mutex } +func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newMockCmix(myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix { return &mockCmix{ myID: myID, diff --git a/fileTransfer/groupChat/utils_test.go b/fileTransfer/groupChat/utils_test.go index f0c76f713494b526bebe457a0fbc4ffe8801b2b2..055a45e6cb5a8a6c28e8aeacd94e07d780094547 100644 --- a/fileTransfer/groupChat/utils_test.go +++ b/fileTransfer/groupChat/utils_test.go @@ -92,6 +92,11 @@ type mockCmix struct { sync.Mutex } +func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newMockCmix( myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix { return &mockCmix{ diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 2d2e914311d6ab2cdf229d41d45daeccd8ddc788..1ccdb1792f16c0c9c7737bf1d135f7b71c4aedae 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -145,6 +145,11 @@ type mockCmix struct { sync.Mutex } +func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newMockCmix( myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix { return &mockCmix{ diff --git a/groupChat/networkManager_test.go b/groupChat/networkManager_test.go index c96ab995981c8d674df2c1175d6eda037d4aa7b6..5ea299aafecbb0cb4bda8808c8139eebdf185fb6 100644 --- a/groupChat/networkManager_test.go +++ b/groupChat/networkManager_test.go @@ -35,6 +35,11 @@ type testNetworkManager struct { sync.RWMutex } +func (tnm *testNetworkManager) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func newTestNetworkManager(sendErr int) cmix.Client { return &testNetworkManager{ receptionMessages: [][]format.Message{}, diff --git a/ud/networkManager_test.go b/ud/networkManager_test.go index 0e42e5006a4230bfd00be247aa083b2e32cb4066..97a5b165299ea8544dffb2e9767bebae848cad72 100644 --- a/ud/networkManager_test.go +++ b/ud/networkManager_test.go @@ -34,6 +34,11 @@ type testNetworkManager struct { responseProcessor message.Processor } +func (tnm *testNetworkManager) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + func (tnm *testNetworkManager) SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler, cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) { diff --git a/xxdk/cmix.go b/xxdk/cmix.go index 66ce1e4b946f0dd1ccb5fac6b9e1f3a15d54f1d7..b81ea31c8ebdef8e0ee4a15ec1d0f2cce508a90a 100644 --- a/xxdk/cmix.go +++ b/xxdk/cmix.go @@ -340,28 +340,28 @@ func (c *Cmix) GetErrorsChannel() <-chan interfaces.ClientError { // they are stopped if there is no internet access. // // Threads Started: -// - Network Follower (/network/follow.go) -// tracks the network events and hands them off to workers for handling. -// - Historical Round Retrieval (/network/rounds/historical.go) -// retrieves data about rounds that are too old to be stored by the client. -// - Message Retrieval Worker Group (/network/rounds/retrieve.go) -// requests all messages in a given round from the gateway of the last nodes. -// - Message Handling Worker Group (/network/message/handle.go) -// decrypts and partitions messages when signals via the Switchboard. -// - Health Tracker (/network/health), -// via the network instance, tracks the state of the network. -// - Garbled Messages (/network/message/garbled.go) -// can be signaled to check all recent messages that could be decoded. It -// uses a message store on disk for persistence. -// - Critical Messages (/network/message/critical.go) -// ensures all protocol layer mandatory messages are sent. It uses a message -// store on disk for persistence. -// - KeyExchange Trigger (/keyExchange/trigger.go) -// responds to sent rekeys and executes them. -// - KeyExchange Confirm (/keyExchange/confirm.go) -// responds to confirmations of successful rekey operations. -// - Auth Callback (/auth/callback.go) -// handles both auth confirm and requests. +// - Network Follower (/network/follow.go) +// tracks the network events and hands them off to workers for handling. +// - Historical Round Retrieval (/network/rounds/historical.go) +// retrieves data about rounds that are too old to be stored by the client. +// - Message Retrieval Worker Group (/network/rounds/retrieve.go) +// requests all messages in a given round from the gateway of the last nodes. +// - Message Handling Worker Group (/network/message/handle.go) +// decrypts and partitions messages when signals via the Switchboard. +// - Health Tracker (/network/health), +// via the network instance, tracks the state of the network. +// - Garbled Messages (/network/message/garbled.go) +// can be signaled to check all recent messages that could be decoded. It +// uses a message store on disk for persistence. +// - Critical Messages (/network/message/critical.go) +// ensures all protocol layer mandatory messages are sent. It uses a message +// store on disk for persistence. +// - KeyExchange Trigger (/keyExchange/trigger.go) +// responds to sent rekeys and executes them. +// - KeyExchange Confirm (/keyExchange/confirm.go) +// responds to confirmations of successful rekey operations. +// - Auth Callback (/auth/callback.go) +// handles both auth confirm and requests. func (c *Cmix) StartNetworkFollower(timeout time.Duration) error { jww.INFO.Printf( "StartNetworkFollower() \n\tTransmissionID: %s \n\tReceptionID: %s", @@ -381,11 +381,18 @@ func (c *Cmix) StopNetworkFollower() error { return c.followerServices.stop() } +// SetTrackNetworkPeriod allows changing the frequency that follower threads +// are started. +func (c *Cmix) SetTrackNetworkPeriod(d time.Duration) { + c.network.SetTrackNetworkPeriod(d) +} + // NetworkFollowerStatus gets the state of the network follower. It returns a // status with the following values: -// Stopped - 0 -// Running - 2000 -// Stopping - 3000 +// +// Stopped - 0 +// Running - 2000 +// Stopping - 3000 func (c *Cmix) NetworkFollowerStatus() Status { jww.INFO.Printf("NetworkFollowerStatus()") return c.followerServices.status() @@ -597,7 +604,7 @@ func DecodeGroups(ndf *ndf.NetworkDefinition) (cmixGrp, e2eGrp *cyclic.Group) { // CheckVersionAndSetupStorage checks the client version and creates a new // storage for user data. This function is common code shared by NewCmix, -//// NewPrecannedCmix and NewVanityCmix. +// // NewPrecannedCmix and NewVanityCmix. func CheckVersionAndSetupStorage(def *ndf.NetworkDefinition, storageDir string, password []byte, userInfo user.Info, cmixGrp, e2eGrp *cyclic.Group, registrationCode string) (storage.Session, error) { diff --git a/xxdk/utilsInterfaces_test.go b/xxdk/utilsInterfaces_test.go index d3d05eb1e3f1062b31ef4a9fbf7778d80fb1d3dc..0320ab726f764b22f261c7fc18ee92f103b4198a 100644 --- a/xxdk/utilsInterfaces_test.go +++ b/xxdk/utilsInterfaces_test.go @@ -85,6 +85,12 @@ type testNetworkManagerGeneric struct { instance *network.Instance sender gateway.Sender } + +func (t *testNetworkManagerGeneric) SetTrackNetworkPeriod(d time.Duration) { + //TODO implement me + panic("implement me") +} + type dummyEventMgr struct{} func (d *dummyEventMgr) Report(p int, a, b, c string) {}