Skip to content
Snippets Groups Projects
Commit 61bb833f authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

Merge branch 'hotfix/AdjustableFollowerInterval' into 'release'

early attempt at making the network follower period adjustable via the bindings

See merge request !485
parents 5417f1aa 73828824
No related branches found
No related tags found
2 merge requests!510Release,!485early attempt at making the network follower period adjustable via the bindings
Showing
with 142 additions and 73 deletions
......@@ -69,6 +69,12 @@ func (c *Cmix) StopNetworkFollower() error {
return nil
}
// SetTrackNetworkPeriod allows changing the frequency that follower threads
// are started.
func (c *Cmix) SetTrackNetworkPeriod(d time.Duration) {
c.api.SetTrackNetworkPeriod(d)
}
// WaitForNetwork will block until either the network is healthy or the passed
// timeout is reached. It will return true if the network is healthy.
func (c *Cmix) WaitForNetwork(timeoutMS int) bool {
......
......@@ -82,6 +82,9 @@ type client struct {
// Earliest tracked round
earliestRound *uint64
// Current Period of the follower
followerPeriod *int64
// Number of polls done in a period of time
tracker *uint64
latencySum uint64
......@@ -110,6 +113,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
netTime.SetTimeSource(localTime{})
followerPeriod := int64(params.TrackNetworkPeriod)
// Create client object
c := &client{
param: params,
......@@ -123,6 +128,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
skewTracker: clockSkew.New(params.ClockSkewClamp),
attemptTracker: attempts.NewSendAttempts(),
numNodes: &numNodes,
followerPeriod: &followerPeriod,
}
if params.VerboseRoundTracking {
......@@ -285,6 +291,17 @@ func (c *client) Follow(report ClientErrorReport) (stoppable.Stoppable, error) {
return multi, nil
}
// SetTrackNetworkPeriod allows changing the frequency that follower threads
// are started.
func (c *client) SetTrackNetworkPeriod(d time.Duration) {
atomic.StoreInt64(c.followerPeriod, int64(d))
}
// GetTrackNetworkPeriod returns the current tracked network period.
func (c *client) GetTrackNetworkPeriod() time.Duration {
return time.Duration(atomic.LoadInt64(c.followerPeriod))
}
// GetInstance returns the network instance object (NDF state).
func (c *client) GetInstance() *commNetwork.Instance {
return c.instance
......
......@@ -47,12 +47,6 @@ import (
const (
debugTrackPeriod = 1 * time.Minute
// Estimate the number of rounds per second in the network. Will need
// updated someday in order to correctly determine how far back to search
// rounds for messages as the network continues to grow, otherwise message
// drops occur.
estimatedRoundsPerSecond = 5
)
// followNetworkComms is a comms interface to make testing easier.
......@@ -68,9 +62,11 @@ type followNetworkComms interface {
// round status, and informs the client when messages can be retrieved.
func (c *client) followNetwork(report ClientErrorReport,
stop *stoppable.Single) {
ticker := time.NewTicker(c.param.TrackNetworkPeriod)
TrackTicker := time.NewTicker(debugTrackPeriod)
rng := c.rng.GetStream()
// Keep track of the current tracker period in order to detect changes
currentTrackPeriod := c.GetTrackNetworkPeriod()
ticker := time.NewTicker(currentTrackPeriod)
trackTicker := time.NewTicker(debugTrackPeriod)
// abandon tracks rounds which data was not found out about in
// the verbose rounds debugging mode
......@@ -85,7 +81,6 @@ func (c *client) followNetwork(report ClientErrorReport,
for {
select {
case <-stop.Quit():
rng.Close()
stop.ToStopped()
return
case <-ticker.C:
......@@ -98,7 +93,7 @@ func (c *client) followNetwork(report ClientErrorReport,
// trigger the first separately because it will get network state
// updates
go func() {
c.follow(toTrack[0], report, rng, c.comms, stop, abandon,
c.follow(toTrack[0], report, c.comms, stop, abandon,
true)
wg.Done()
}()
......@@ -106,7 +101,7 @@ func (c *client) followNetwork(report ClientErrorReport,
//trigger all others without getting network state updates
for i := 1; i < len(toTrack); i++ {
go func(index int) {
c.follow(toTrack[index], report, rng, c.comms, stop,
c.follow(toTrack[index], report, c.comms, stop,
dummyAbandon, false)
wg.Done()
}(i)
......@@ -127,6 +122,10 @@ func (c *client) followNetwork(report ClientErrorReport,
stream,
c.Space.GetAddressSpaceWithoutWait(),
operator)
if err != nil {
jww.ERROR.Printf("failed to operate on identities to "+
"track: %s", err)
}
stream.Close()
//update clock skew
......@@ -134,13 +133,14 @@ func (c *client) followNetwork(report ClientErrorReport,
// invert the skew because we need to reverse it
netTime.SetOffset(-estimatedSkew)
if err != nil {
jww.ERROR.Printf("failed to operate on identities to "+
"track: %s", err)
continue
// Update ticker if tracker period changes
newTrackPeriod := c.GetTrackNetworkPeriod()
if newTrackPeriod != currentTrackPeriod {
currentTrackPeriod = newTrackPeriod
ticker.Reset(currentTrackPeriod)
}
case <-TrackTicker.C:
case <-trackTicker.C:
numPolls := atomic.SwapUint64(c.tracker, 0)
if c.numLatencies != 0 {
latencyAvg := time.Nanosecond * time.Duration(
......@@ -167,7 +167,7 @@ func (c *client) followNetwork(report ClientErrorReport,
// follow executes an iteration of the follower for a specific identity
func (c *client) follow(identity receptionID.IdentityUse,
report ClientErrorReport, rng csprng.Source, comms followNetworkComms,
report ClientErrorReport, comms followNetworkComms,
stop *stoppable.Single, abandon func(round id.Round), getUpdates bool) {
// While polling with a fake identity, it is necessary to have populated
......@@ -498,10 +498,10 @@ func (c *client) follow(identity receptionID.IdentityUse,
func (c *client) getFakeEarliestRound() id.Round {
rng := c.rng.GetStream()
b, err := csprng.Generate(8, rng)
rng.Close()
if err != nil {
jww.FATAL.Panicf("Could not get random number: %v", err)
}
rng.Close()
rangeVal := binary.LittleEndian.Uint64(b) % 800
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
package cmix
//func TestClient_Follow(t *testing.T) {
// m, err := newTestClient(t)
// if err != nil {
// t.Fatalf("Failed to create test client: %+v", err)
// }
//
// clientErrorReport := func(source, message, trace string) {
//
// }
// s, err := m.Follow(clientErrorReport)
// if err != nil {
// t.Errorf("Failed to follow network: %+v", err)
// }
//
// err = s.Close()
// if err != nil {
// t.Errorf("Failed to close follower: %+v", err)
// }
//}
......@@ -29,6 +29,10 @@ type Client interface {
// Only one follower may run at a time.
Follow(report ClientErrorReport) (stoppable.Stoppable, error)
// SetTrackNetworkPeriod allows changing the frequency that follower threads
// are started.
SetTrackNetworkPeriod(d time.Duration)
/* === Sending ========================================================== */
// GetMaxMessageLength returns the max message size for the current network.
......
......@@ -22,7 +22,9 @@ import (
)
type Params struct {
// TrackNetworkPeriod determines how frequently follower threads are started.
TrackNetworkPeriod time.Duration
// MaxCheckedRounds is the maximum number of rounds to check in a single
// iterations network updates.
MaxCheckedRounds uint
......
......@@ -151,6 +151,11 @@ type mockCmix struct {
instance *network.Instance
}
func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newMockCmix() *mockCmix {
return &mockCmix{}
}
......
......@@ -30,6 +30,11 @@ type mockCmix struct {
payloadSize int
}
func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newMockCmix(payloadSize int) cmix.Client {
return &mockCmix{
......
......@@ -110,6 +110,11 @@ type mockFpgCmix struct {
sync.Mutex
}
func (m *mockFpgCmix) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newMockFpgCmix() *mockFpgCmix {
return &mockFpgCmix{
processors: make(map[id.ID]map[format.Fingerprint]message.Processor),
......
......@@ -221,6 +221,11 @@ func (m mockServiceHandler) DeleteService(clientID *id.ID, toDelete message.Serv
type mockNetManager struct{}
func (m *mockNetManager) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func (m *mockNetManager) GetIdentity(get *id.ID) (identity.TrackedID, error) {
// TODO implement me
panic("implement me")
......
......@@ -154,6 +154,11 @@ type mockCmix struct {
instance *network.Instance
}
func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newMockCmix(myID *id.ID, handler *mockCmixHandler, t testing.TB) *mockCmix {
comms := &connect.ProtoComms{Manager: connect.NewManagerTesting(t)}
def := getNDF()
......
......@@ -95,6 +95,11 @@ type mockCmix struct {
sync.Mutex
}
func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newMockCmix(
myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix {
return &mockCmix{
......
......@@ -98,6 +98,11 @@ type mockCmix struct {
sync.Mutex
}
func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newMockCmix(myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix {
return &mockCmix{
myID: myID,
......
......@@ -92,6 +92,11 @@ type mockCmix struct {
sync.Mutex
}
func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newMockCmix(
myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix {
return &mockCmix{
......
......@@ -145,6 +145,11 @@ type mockCmix struct {
sync.Mutex
}
func (m *mockCmix) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newMockCmix(
myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix {
return &mockCmix{
......
......@@ -35,6 +35,11 @@ type testNetworkManager struct {
sync.RWMutex
}
func (tnm *testNetworkManager) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func newTestNetworkManager(sendErr int) cmix.Client {
return &testNetworkManager{
receptionMessages: [][]format.Message{},
......
......@@ -34,6 +34,11 @@ type testNetworkManager struct {
responseProcessor message.Processor
}
func (tnm *testNetworkManager) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
func (tnm *testNetworkManager) SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler,
cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) {
......
......@@ -381,8 +381,15 @@ func (c *Cmix) StopNetworkFollower() error {
return c.followerServices.stop()
}
// SetTrackNetworkPeriod allows changing the frequency that follower threads
// are started.
func (c *Cmix) SetTrackNetworkPeriod(d time.Duration) {
c.network.SetTrackNetworkPeriod(d)
}
// NetworkFollowerStatus gets the state of the network follower. It returns a
// status with the following values:
//
// Stopped - 0
// Running - 2000
// Stopping - 3000
......
......@@ -85,6 +85,12 @@ type testNetworkManagerGeneric struct {
instance *network.Instance
sender gateway.Sender
}
func (t *testNetworkManagerGeneric) SetTrackNetworkPeriod(d time.Duration) {
//TODO implement me
panic("implement me")
}
type dummyEventMgr struct{}
func (d *dummyEventMgr) Report(p int, a, b, c string) {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment