From be2fcb10312d3e0e78597a8f035503ce9cae2e9e Mon Sep 17 00:00:00 2001 From: Benjamin Wenger <ben@elixxir.ioo> Date: Wed, 23 Sep 2020 17:28:40 -0700 Subject: [PATCH] fixed health treacker, changed client thread handling, fixed tests --- api/status.go | 16 ++++++------ api/version.go | 4 +-- bindings/api.go | 2 +- bindings/interfaces.go | 3 +-- network/health/tracker.go | 32 ++++++++++++++++++----- network/manager.go | 7 +++-- network/rounds/historical.go | 11 ++++---- network/send.go | 6 ++--- stoppable/cleanup.go | 10 +++---- stoppable/multi.go | 4 +-- storage/cmix/store_test.go | 46 ++++++++++++++++----------------- storage/regCode.go | 4 +-- switchboard/switchboard_test.go | 4 +-- 13 files changed, 84 insertions(+), 65 deletions(-) diff --git a/api/status.go b/api/status.go index 1b3ee608d..cebfac0f4 100644 --- a/api/status.go +++ b/api/status.go @@ -42,8 +42,8 @@ func newStatusTracker() *statusTracker { func (s *statusTracker) toStarting() error { if !atomic.CompareAndSwapUint32(s.s, uint32(Stopped), uint32(Starting)) { return errors.Errorf("Failed to move to '%s' status, at '%s', "+ - "must be at '%s' for transition", Starting, atomic.LoadUint32(s.s), - Stopped) + "must be at '%s' for transition", Starting, + Status(atomic.LoadUint32(s.s)), Stopped) } return nil } @@ -51,8 +51,8 @@ func (s *statusTracker) toStarting() error { func (s *statusTracker) toRunning() error { if !atomic.CompareAndSwapUint32(s.s, uint32(Starting), uint32(Running)) { return errors.Errorf("Failed to move to '%s' status, at '%s', "+ - "must be at '%s' for transition", Running, atomic.LoadUint32(s.s), - Starting) + "must be at '%s' for transition", + Running, Status(atomic.LoadUint32(s.s)), Starting) } return nil } @@ -60,8 +60,8 @@ func (s *statusTracker) toRunning() error { func (s *statusTracker) toStopping() error { if !atomic.CompareAndSwapUint32(s.s, uint32(Running), uint32(Stopping)) { return errors.Errorf("Failed to move to '%s' status, at '%s',"+ - " must be at '%s' for transition", Stopping, atomic.LoadUint32(s.s), - Running) + " must be at '%s' for transition", Stopping, + Status(atomic.LoadUint32(s.s)), Running) } return nil } @@ -69,8 +69,8 @@ func (s *statusTracker) toStopping() error { func (s *statusTracker) toStopped() error { if !atomic.CompareAndSwapUint32(s.s, uint32(Stopping), uint32(Stopped)) { return errors.Errorf("Failed to move to '%s' status, at '%s',"+ - " must be at '%s' for transition", Stopped, atomic.LoadUint32(s.s), - Stopping) + " must be at '%s' for transition", Stopped, + Status(atomic.LoadUint32(s.s)), Stopping) } return nil } diff --git a/api/version.go b/api/version.go index 639329b21..d8a8e2a05 100644 --- a/api/version.go +++ b/api/version.go @@ -16,7 +16,7 @@ func (c *Client) Version() version.Version { func (c *Client) checkVersion() error { clientVersion := c.Version() - jww.INFO.Printf("Client Version: %s", clientVersion) + jww.INFO.Printf("Client Version: %s", clientVersion.String()) has, netVersion, err := c.permissioning.GetNetworkVersion() if err != nil { @@ -29,7 +29,7 @@ func (c *Client) checkVersion() error { return errors.Errorf("Client and Minimum Network Version are "+ "incompatible\n"+ "\tMinimum Network: %s\n"+ - "\tClient: %s", netVersion, clientVersion) + "\tClient: %s", netVersion.String(), clientVersion.String()) } } else { jww.WARN.Printf("Network requires no minnimim version") diff --git a/bindings/api.go b/bindings/api.go index b8351406a..3195423f1 100644 --- a/bindings/api.go +++ b/bindings/api.go @@ -35,7 +35,7 @@ type BindingsClient struct { // Users of this function should delete the storage directory on error. func NewClient(network, storageDir string, password []byte) (Client, error) { // TODO: This should wrap the bindings ClientImpl, when available. - client, err := api.NewClient(network, storageDir, password) + client, err := api.NewClient(network, storageDir, password, "") if err != nil { return nil, err } diff --git a/bindings/interfaces.go b/bindings/interfaces.go index e7d3f4e27..4149cfad1 100644 --- a/bindings/interfaces.go +++ b/bindings/interfaces.go @@ -8,7 +8,6 @@ package bindings import ( "gitlab.com/elixxir/client/api" - "gitlab.com/elixxir/client/stoppable" ) // Client is defined inside the api package. At minimum, it implements all of @@ -121,7 +120,7 @@ type Client interface { // and returns an object for checking state and stopping those threads. // Call this when returning from sleep and close when going back to // sleep. - StartNetworkRunner() stoppable.Stoppable + StartNetworkFollower() error // RegisterRoundEventsHandler registers a callback interface for round // events. diff --git a/network/health/tracker.go b/network/health/tracker.go index 3fd4f78db..3a297ba9b 100644 --- a/network/health/tracker.go +++ b/network/health/tracker.go @@ -9,6 +9,7 @@ package health import ( + "errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/comms/network" @@ -24,7 +25,7 @@ type Tracker struct { channels []chan bool funcs []func(isHealthy bool) - *stoppable.Single + running bool isHealthy bool mux sync.RWMutex @@ -46,7 +47,7 @@ func newTracker(timeout time.Duration) *Tracker { channels: make([]chan bool, 0), heartbeat: make(chan network.Heartbeat, 100), isHealthy: false, - Single: stoppable.NewSingle("Health Tracker"), + running: false, } } @@ -75,13 +76,30 @@ func (t *Tracker) setHealth(h bool) { t.transmit(h) } -func (t *Tracker) Start() { - if t.Single.IsRunning() { - jww.FATAL.Panicf("Cannot start the health tracker when it " + - "is already running") +func (t *Tracker) Start() (stoppable.Stoppable, error) { + t.mux.Lock() + defer t.mux.Unlock() + if t.running { + return nil, errors.New("cannot start Health tracker threads, " + + "they are already running") } + t.running = true + + t.isHealthy = false + + stop := stoppable.NewSingle("Health Tracker") + stopCleanup := stoppable.NewCleanup(stop, func(duration time.Duration) error { + t.mux.Lock() + defer t.mux.Unlock() + t.isHealthy = false + t.transmit(false) + t.running = false + return nil + }) + + go t.start(stop.Quit()) - go t.start(t.Quit()) + return stopCleanup, nil } // Long-running thread used to monitor and report on network health diff --git a/network/manager.go b/network/manager.go index 1482b1dba..dd7990cc7 100644 --- a/network/manager.go +++ b/network/manager.go @@ -96,8 +96,11 @@ func (m *manager) Follow() (stoppable.Stoppable, error) { multi := stoppable.NewMulti("networkManager") // health tracker - m.Health.Start() - multi.Add(m.Health) + healthStop, err := m.Health.Start() + if err != nil { + return nil, errors.Errorf("failed to follow") + } + multi.Add(healthStop) // Node Updates multi.Add(node.StartRegistration(m.Instance, m.Session, m.Rng, diff --git a/network/rounds/historical.go b/network/rounds/historical.go index 65623f771..e04483840 100644 --- a/network/rounds/historical.go +++ b/network/rounds/historical.go @@ -29,7 +29,6 @@ type historicalRoundsComms interface { func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) { timerCh := make(<-chan time.Time) - hasTimer := false rng := m.Rng.GetStream() var rounds []uint64 @@ -44,7 +43,6 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c done = true // if the timer elapses process rounds to ensure the delay isn't too long case <-timerCh: - hasTimer = false if len(rounds) > 0 { shouldProcess = true } @@ -53,7 +51,8 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c rounds = append(rounds, uint64(rid)) if len(rounds) > int(m.params.MaxHistoricalRounds) { shouldProcess = true - } else if !hasTimer { + } else if len(rounds) == 1 { + //if this is the first round, start the timeout timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C } } @@ -61,6 +60,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c continue } + //find a gateway to request about the rounds gwHost, err := gateway.Get(m.Instance.GetPartialNdf().Get(), comm, rng) if err != nil { jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+ @@ -75,9 +75,10 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c if err != nil { jww.ERROR.Printf("Failed to request historical rounds "+ "data: %s", response) - // if the check fails to resolve, break the loop so they will be + // if the check fails to resolve, break the loop and so they will be // checked again - break + timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C + continue } for i, roundInfo := range response.Rounds { if roundInfo == nil { diff --git a/network/send.go b/network/send.go index 7e6aeb84e..904776c9d 100644 --- a/network/send.go +++ b/network/send.go @@ -14,7 +14,7 @@ import ( // Returns the round ID of the round the payload was sent or an error // if it fails. func (m *manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { - if !m.Health.IsRunning() { + if !m.Health.IsHealthy() { return 0, errors.New("Cannot send cmix message when the " + "network is not healthy") } @@ -28,7 +28,7 @@ func (m *manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err // NOTE: Do not use this function unless you know what you are doing. // This function always produces an error message in client logging. func (m *manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { - if !m.Health.IsRunning() { + if !m.Health.IsHealthy() { return nil, errors.New("cannot send unsafe message when the " + "network is not healthy") } @@ -46,7 +46,7 @@ func (m *manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, func (m *manager) SendE2E(msg message.Send, e2eP params.E2E) ( []id.Round, error) { - if !m.Health.IsRunning() { + if !m.Health.IsHealthy() { return nil, errors.New("Cannot send e2e message when the " + "network is not healthy") } diff --git a/stoppable/cleanup.go b/stoppable/cleanup.go index ffe3cdae6..a733d74bc 100644 --- a/stoppable/cleanup.go +++ b/stoppable/cleanup.go @@ -37,7 +37,7 @@ func (c *Cleanup) IsRunning() bool { // Name returns the name of the stoppable denoting it has cleanup. func (c *Cleanup) Name() string { - return Name() + " with cleanup" + return c.stop.Name() + " with cleanup" } // Close stops the contained stoppable and runs the cleanup function after. The @@ -51,9 +51,9 @@ func (c *Cleanup) Close(timeout time.Duration) error { start := time.Now() // Run the stoppable - if err := Close(timeout); err != nil { + if err := c.stop.Close(timeout); err != nil { err = errors.WithMessagef(err, "Cleanup for %s not executed", - Name()) + c.stop.Name()) return } @@ -71,10 +71,10 @@ func (c *Cleanup) Close(timeout time.Duration) error { case err := <-complete: if err != nil { err = errors.WithMessagef(err, "Cleanup for %s failed", - Name()) + c.stop.Name()) } case <-timer.C: - err = errors.Errorf("Clean up for %s timeout", Name()) + err = errors.Errorf("Clean up for %s timeout", c.stop.Name()) } }) diff --git a/stoppable/multi.go b/stoppable/multi.go index a0f947d02..061be492d 100644 --- a/stoppable/multi.go +++ b/stoppable/multi.go @@ -43,7 +43,7 @@ func (m *Multi) Name() string { m.mux.RLock() names := m.name + ": {" for _, s := range m.stoppables { - names += Name() + ", " + names += s.Name() + ", " } if len(m.stoppables) > 0 { names = names[:len(names)-2] @@ -69,7 +69,7 @@ func (m *Multi) Close(timeout time.Duration) error { for _, stoppable := range m.stoppables { wg.Add(1) go func(stoppable Stoppable) { - if Close(timeout) != nil { + if stoppable.Close(timeout) != nil { atomic.AddUint32(&numErrors, 1) } wg.Done() diff --git a/storage/cmix/store_test.go b/storage/cmix/store_test.go index df305ebb3..1605abe94 100644 --- a/storage/cmix/store_test.go +++ b/storage/cmix/store_test.go @@ -14,34 +14,13 @@ import ( "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" - "os" "testing" ) -// Most of these tests use the same Store -// So keep that in mind when designing tests -var testStore *Store - -// Main testing function -func TestMain(m *testing.M) { - - kv := make(ekv.Memstore) - vkv := versioned.NewKV(kv) - - grp := cyclic.NewGroup(large.NewInt(173), large.NewInt(2)) - priv := grp.NewInt(2) - - testStore, _ = NewStore(grp, vkv, priv) - - runFunc := func() int { - return m.Run() - } - - os.Exit(runFunc()) -} - // Happy path Add/Done test func TestStore_AddRemove(t *testing.T) { + testStore, _ := makeTestStore() + nodeId := id.NewIdFromString("test", id.Node, t) key := testStore.grp.NewInt(5) @@ -60,13 +39,16 @@ func TestStore_AddRemove(t *testing.T) { // Happy path func TestLoadStore(t *testing.T) { + testStore, kv := makeTestStore() + // Add a test node key nodeId := id.NewIdFromString("test", id.Node, t) key := testStore.grp.NewInt(5) + testStore.Add(nodeId, key) // Load the store and check its attributes - store, err := LoadStore(testStore.kv) + store, err := LoadStore(kv) if err != nil { t.Errorf("Unable to load store: %+v", err) } @@ -83,6 +65,7 @@ func TestLoadStore(t *testing.T) { // Happy path func TestStore_GetRoundKeys(t *testing.T) { + testStore, _ := makeTestStore() // Set up the circuit numIds := 10 nodeIds := make([]*id.ID, numIds) @@ -107,6 +90,7 @@ func TestStore_GetRoundKeys(t *testing.T) { // Missing keys path func TestStore_GetRoundKeys_Missing(t *testing.T) { + testStore, _ := makeTestStore() // Set up the circuit numIds := 10 nodeIds := make([]*id.ID, numIds) @@ -165,3 +149,17 @@ func TestNewStore(t *testing.T) { t.Errorf("Failed to set store.kv") } } + +// Main testing function +func makeTestStore() (*Store, *versioned.KV) { + + kv := make(ekv.Memstore) + vkv := versioned.NewKV(kv) + + grp := cyclic.NewGroup(large.NewInt(173), large.NewInt(2)) + priv := grp.NewInt(2) + + testStore, _ := NewStore(grp, vkv, priv) + + return testStore, vkv +} \ No newline at end of file diff --git a/storage/regCode.go b/storage/regCode.go index c2d7c2fad..acc03a360 100644 --- a/storage/regCode.go +++ b/storage/regCode.go @@ -1,10 +1,10 @@ package storage import ( - "gitlab.com/elixxir/client/storage/versioned" "github.com/pkg/errors" - "time" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/storage/versioned" + "time" ) const regCodeKey = "regCode" diff --git a/switchboard/switchboard_test.go b/switchboard/switchboard_test.go index 27a7635c2..5c097cdfa 100644 --- a/switchboard/switchboard_test.go +++ b/switchboard/switchboard_test.go @@ -155,7 +155,7 @@ func TestSwitchboard_RegisterFunc(t *testing.T) { t.Errorf("Listener is not registered by Message Type") } - Hear(message.Receive{}) + lid.listener.Hear(message.Receive{}) if !heard { t.Errorf("Func listener not registered correctly") } @@ -224,7 +224,7 @@ func TestSwitchboard_RegisterChan(t *testing.T) { t.Errorf("Listener is not registered by Message Type") } - Hear(message.Receive{}) + lid.listener.Hear(message.Receive{}) select { case <-ch: case <-time.After(5 * time.Millisecond): -- GitLab