Skip to content
Snippets Groups Projects
Commit be2fcb10 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

fixed health treacker, changed client thread handling, fixed tests

parent 9ca49ea5
No related branches found
No related tags found
No related merge requests found
......@@ -42,8 +42,8 @@ func newStatusTracker() *statusTracker {
func (s *statusTracker) toStarting() error {
if !atomic.CompareAndSwapUint32(s.s, uint32(Stopped), uint32(Starting)) {
return errors.Errorf("Failed to move to '%s' status, at '%s', "+
"must be at '%s' for transition", Starting, atomic.LoadUint32(s.s),
Stopped)
"must be at '%s' for transition", Starting,
Status(atomic.LoadUint32(s.s)), Stopped)
}
return nil
}
......@@ -51,8 +51,8 @@ func (s *statusTracker) toStarting() error {
func (s *statusTracker) toRunning() error {
if !atomic.CompareAndSwapUint32(s.s, uint32(Starting), uint32(Running)) {
return errors.Errorf("Failed to move to '%s' status, at '%s', "+
"must be at '%s' for transition", Running, atomic.LoadUint32(s.s),
Starting)
"must be at '%s' for transition",
Running, Status(atomic.LoadUint32(s.s)), Starting)
}
return nil
}
......@@ -60,8 +60,8 @@ func (s *statusTracker) toRunning() error {
func (s *statusTracker) toStopping() error {
if !atomic.CompareAndSwapUint32(s.s, uint32(Running), uint32(Stopping)) {
return errors.Errorf("Failed to move to '%s' status, at '%s',"+
" must be at '%s' for transition", Stopping, atomic.LoadUint32(s.s),
Running)
" must be at '%s' for transition", Stopping,
Status(atomic.LoadUint32(s.s)), Running)
}
return nil
}
......@@ -69,8 +69,8 @@ func (s *statusTracker) toStopping() error {
func (s *statusTracker) toStopped() error {
if !atomic.CompareAndSwapUint32(s.s, uint32(Stopping), uint32(Stopped)) {
return errors.Errorf("Failed to move to '%s' status, at '%s',"+
" must be at '%s' for transition", Stopped, atomic.LoadUint32(s.s),
Stopping)
" must be at '%s' for transition", Stopped,
Status(atomic.LoadUint32(s.s)), Stopping)
}
return nil
}
......
......
......@@ -16,7 +16,7 @@ func (c *Client) Version() version.Version {
func (c *Client) checkVersion() error {
clientVersion := c.Version()
jww.INFO.Printf("Client Version: %s", clientVersion)
jww.INFO.Printf("Client Version: %s", clientVersion.String())
has, netVersion, err := c.permissioning.GetNetworkVersion()
if err != nil {
......@@ -29,7 +29,7 @@ func (c *Client) checkVersion() error {
return errors.Errorf("Client and Minimum Network Version are "+
"incompatible\n"+
"\tMinimum Network: %s\n"+
"\tClient: %s", netVersion, clientVersion)
"\tClient: %s", netVersion.String(), clientVersion.String())
}
} else {
jww.WARN.Printf("Network requires no minnimim version")
......
......
......@@ -35,7 +35,7 @@ type BindingsClient struct {
// Users of this function should delete the storage directory on error.
func NewClient(network, storageDir string, password []byte) (Client, error) {
// TODO: This should wrap the bindings ClientImpl, when available.
client, err := api.NewClient(network, storageDir, password)
client, err := api.NewClient(network, storageDir, password, "")
if err != nil {
return nil, err
}
......
......
......@@ -8,7 +8,6 @@ package bindings
import (
"gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/stoppable"
)
// Client is defined inside the api package. At minimum, it implements all of
......@@ -121,7 +120,7 @@ type Client interface {
// and returns an object for checking state and stopping those threads.
// Call this when returning from sleep and close when going back to
// sleep.
StartNetworkRunner() stoppable.Stoppable
StartNetworkFollower() error
// RegisterRoundEventsHandler registers a callback interface for round
// events.
......
......
......@@ -9,6 +9,7 @@
package health
import (
"errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/comms/network"
......@@ -24,7 +25,7 @@ type Tracker struct {
channels []chan bool
funcs []func(isHealthy bool)
*stoppable.Single
running bool
isHealthy bool
mux sync.RWMutex
......@@ -46,7 +47,7 @@ func newTracker(timeout time.Duration) *Tracker {
channels: make([]chan bool, 0),
heartbeat: make(chan network.Heartbeat, 100),
isHealthy: false,
Single: stoppable.NewSingle("Health Tracker"),
running: false,
}
}
......@@ -75,13 +76,30 @@ func (t *Tracker) setHealth(h bool) {
t.transmit(h)
}
func (t *Tracker) Start() {
if t.Single.IsRunning() {
jww.FATAL.Panicf("Cannot start the health tracker when it " +
"is already running")
func (t *Tracker) Start() (stoppable.Stoppable, error) {
t.mux.Lock()
defer t.mux.Unlock()
if t.running {
return nil, errors.New("cannot start Health tracker threads, " +
"they are already running")
}
t.running = true
t.isHealthy = false
stop := stoppable.NewSingle("Health Tracker")
stopCleanup := stoppable.NewCleanup(stop, func(duration time.Duration) error {
t.mux.Lock()
defer t.mux.Unlock()
t.isHealthy = false
t.transmit(false)
t.running = false
return nil
})
go t.start(stop.Quit())
go t.start(t.Quit())
return stopCleanup, nil
}
// Long-running thread used to monitor and report on network health
......
......
......@@ -96,8 +96,11 @@ func (m *manager) Follow() (stoppable.Stoppable, error) {
multi := stoppable.NewMulti("networkManager")
// health tracker
m.Health.Start()
multi.Add(m.Health)
healthStop, err := m.Health.Start()
if err != nil {
return nil, errors.Errorf("failed to follow")
}
multi.Add(healthStop)
// Node Updates
multi.Add(node.StartRegistration(m.Instance, m.Session, m.Rng,
......
......
......@@ -29,7 +29,6 @@ type historicalRoundsComms interface {
func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) {
timerCh := make(<-chan time.Time)
hasTimer := false
rng := m.Rng.GetStream()
var rounds []uint64
......@@ -44,7 +43,6 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
done = true
// if the timer elapses process rounds to ensure the delay isn't too long
case <-timerCh:
hasTimer = false
if len(rounds) > 0 {
shouldProcess = true
}
......@@ -53,7 +51,8 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
rounds = append(rounds, uint64(rid))
if len(rounds) > int(m.params.MaxHistoricalRounds) {
shouldProcess = true
} else if !hasTimer {
} else if len(rounds) == 1 {
//if this is the first round, start the timeout
timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C
}
}
......@@ -61,6 +60,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
continue
}
//find a gateway to request about the rounds
gwHost, err := gateway.Get(m.Instance.GetPartialNdf().Get(), comm, rng)
if err != nil {
jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+
......@@ -75,9 +75,10 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
if err != nil {
jww.ERROR.Printf("Failed to request historical rounds "+
"data: %s", response)
// if the check fails to resolve, break the loop so they will be
// if the check fails to resolve, break the loop and so they will be
// checked again
break
timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C
continue
}
for i, roundInfo := range response.Rounds {
if roundInfo == nil {
......
......
......@@ -14,7 +14,7 @@ import (
// Returns the round ID of the round the payload was sent or an error
// if it fails.
func (m *manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) {
if !m.Health.IsRunning() {
if !m.Health.IsHealthy() {
return 0, errors.New("Cannot send cmix message when the " +
"network is not healthy")
}
......@@ -28,7 +28,7 @@ func (m *manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err
// NOTE: Do not use this function unless you know what you are doing.
// This function always produces an error message in client logging.
func (m *manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) {
if !m.Health.IsRunning() {
if !m.Health.IsHealthy() {
return nil, errors.New("cannot send unsafe message when the " +
"network is not healthy")
}
......@@ -46,7 +46,7 @@ func (m *manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round,
func (m *manager) SendE2E(msg message.Send, e2eP params.E2E) (
[]id.Round, error) {
if !m.Health.IsRunning() {
if !m.Health.IsHealthy() {
return nil, errors.New("Cannot send e2e message when the " +
"network is not healthy")
}
......
......
......@@ -37,7 +37,7 @@ func (c *Cleanup) IsRunning() bool {
// Name returns the name of the stoppable denoting it has cleanup.
func (c *Cleanup) Name() string {
return Name() + " with cleanup"
return c.stop.Name() + " with cleanup"
}
// Close stops the contained stoppable and runs the cleanup function after. The
......@@ -51,9 +51,9 @@ func (c *Cleanup) Close(timeout time.Duration) error {
start := time.Now()
// Run the stoppable
if err := Close(timeout); err != nil {
if err := c.stop.Close(timeout); err != nil {
err = errors.WithMessagef(err, "Cleanup for %s not executed",
Name())
c.stop.Name())
return
}
......@@ -71,10 +71,10 @@ func (c *Cleanup) Close(timeout time.Duration) error {
case err := <-complete:
if err != nil {
err = errors.WithMessagef(err, "Cleanup for %s failed",
Name())
c.stop.Name())
}
case <-timer.C:
err = errors.Errorf("Clean up for %s timeout", Name())
err = errors.Errorf("Clean up for %s timeout", c.stop.Name())
}
})
......
......
......@@ -43,7 +43,7 @@ func (m *Multi) Name() string {
m.mux.RLock()
names := m.name + ": {"
for _, s := range m.stoppables {
names += Name() + ", "
names += s.Name() + ", "
}
if len(m.stoppables) > 0 {
names = names[:len(names)-2]
......@@ -69,7 +69,7 @@ func (m *Multi) Close(timeout time.Duration) error {
for _, stoppable := range m.stoppables {
wg.Add(1)
go func(stoppable Stoppable) {
if Close(timeout) != nil {
if stoppable.Close(timeout) != nil {
atomic.AddUint32(&numErrors, 1)
}
wg.Done()
......
......
......@@ -14,34 +14,13 @@ import (
"gitlab.com/elixxir/ekv"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"os"
"testing"
)
// Most of these tests use the same Store
// So keep that in mind when designing tests
var testStore *Store
// Main testing function
func TestMain(m *testing.M) {
kv := make(ekv.Memstore)
vkv := versioned.NewKV(kv)
grp := cyclic.NewGroup(large.NewInt(173), large.NewInt(2))
priv := grp.NewInt(2)
testStore, _ = NewStore(grp, vkv, priv)
runFunc := func() int {
return m.Run()
}
os.Exit(runFunc())
}
// Happy path Add/Done test
func TestStore_AddRemove(t *testing.T) {
testStore, _ := makeTestStore()
nodeId := id.NewIdFromString("test", id.Node, t)
key := testStore.grp.NewInt(5)
......@@ -60,13 +39,16 @@ func TestStore_AddRemove(t *testing.T) {
// Happy path
func TestLoadStore(t *testing.T) {
testStore, kv := makeTestStore()
// Add a test node key
nodeId := id.NewIdFromString("test", id.Node, t)
key := testStore.grp.NewInt(5)
testStore.Add(nodeId, key)
// Load the store and check its attributes
store, err := LoadStore(testStore.kv)
store, err := LoadStore(kv)
if err != nil {
t.Errorf("Unable to load store: %+v", err)
}
......@@ -83,6 +65,7 @@ func TestLoadStore(t *testing.T) {
// Happy path
func TestStore_GetRoundKeys(t *testing.T) {
testStore, _ := makeTestStore()
// Set up the circuit
numIds := 10
nodeIds := make([]*id.ID, numIds)
......@@ -107,6 +90,7 @@ func TestStore_GetRoundKeys(t *testing.T) {
// Missing keys path
func TestStore_GetRoundKeys_Missing(t *testing.T) {
testStore, _ := makeTestStore()
// Set up the circuit
numIds := 10
nodeIds := make([]*id.ID, numIds)
......@@ -165,3 +149,17 @@ func TestNewStore(t *testing.T) {
t.Errorf("Failed to set store.kv")
}
}
// Main testing function
func makeTestStore() (*Store, *versioned.KV) {
kv := make(ekv.Memstore)
vkv := versioned.NewKV(kv)
grp := cyclic.NewGroup(large.NewInt(173), large.NewInt(2))
priv := grp.NewInt(2)
testStore, _ := NewStore(grp, vkv, priv)
return testStore, vkv
}
\ No newline at end of file
package storage
import (
"gitlab.com/elixxir/client/storage/versioned"
"github.com/pkg/errors"
"time"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/storage/versioned"
"time"
)
const regCodeKey = "regCode"
......
......
......@@ -155,7 +155,7 @@ func TestSwitchboard_RegisterFunc(t *testing.T) {
t.Errorf("Listener is not registered by Message Type")
}
Hear(message.Receive{})
lid.listener.Hear(message.Receive{})
if !heard {
t.Errorf("Func listener not registered correctly")
}
......@@ -224,7 +224,7 @@ func TestSwitchboard_RegisterChan(t *testing.T) {
t.Errorf("Listener is not registered by Message Type")
}
Hear(message.Receive{})
lid.listener.Hear(message.Receive{})
select {
case <-ch:
case <-time.After(5 * time.Millisecond):
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment