diff --git a/cmix/health/callback.go b/cmix/health/callback.go new file mode 100644 index 0000000000000000000000000000000000000000..26dc504327d2f649875df0cb4c0a6d965001051c --- /dev/null +++ b/cmix/health/callback.go @@ -0,0 +1,53 @@ +package health + +import "sync" + +type trackerCallback struct { + funcs map[uint64]func(isHealthy bool) + funcsID uint64 + + mux sync.RWMutex +} + +func initTrackerCallback() *trackerCallback { + return &trackerCallback{ + funcs: map[uint64]func(isHealthy bool){}, + funcsID: 0, + } +} + +// addHealthCallback adds a function to the list of tracker functions such that +// each function can be run after network changes. Returns a unique ID for the +// function. +func (t *trackerCallback) addHealthCallback(f func(isHealthy bool), health bool) uint64 { + var currentID uint64 + + t.mux.Lock() + t.funcs[t.funcsID] = f + currentID = t.funcsID + t.funcsID++ + t.mux.Unlock() + + go f(health) + + return currentID +} + +// RemoveHealthCallback removes the function with the given ID from the list of +// tracker functions so that it will no longer be run. +func (t *trackerCallback) RemoveHealthCallback(chanID uint64) { + t.mux.Lock() + delete(t.funcs, chanID) + t.mux.Unlock() +} + +// callback calls every function with the new health state +func (t *trackerCallback) callback(health bool) { + t.mux.Lock() + defer t.mux.Unlock() + + // Run all listening functions + for _, f := range t.funcs { + go f(health) + } +} diff --git a/cmix/health/tracker.go b/cmix/health/tracker.go index 537ec320cae8341b341653e4af07c93fc37c50ce..bb4b5e3418e28cae8f5873a3566db6284f05125a 100644 --- a/cmix/health/tracker.go +++ b/cmix/health/tracker.go @@ -11,8 +11,7 @@ package health import ( - "errors" - "sync" + "sync/atomic" "time" jww "github.com/spf13/jwalterweatherman" @@ -29,108 +28,130 @@ type Monitor interface { } type tracker struct { + // timeout parameter describes how long + // without good news until the network is considered unhealthy timeout time.Duration + // channel on which new status updates are received from the network handler heartbeat chan network.Heartbeat - funcs map[uint64]func(isHealthy bool) - channelsID uint64 - funcsID uint64 - - running bool - - // Determines the current health status - isHealthy bool + // denotes the last time news was heard. Both hold ns since unix epoc + // in an atomic + lastCompletedRound *int64 + lastWaitingRound *int64 // Denotes that the past health status wasHealthy is true if isHealthy has - // ever been true - wasHealthy bool - mux sync.RWMutex + // ever been true in an atomic. + wasHealthy *uint32 + + // stores registered callbacks to receive event updates + *trackerCallback } // Init creates a single HealthTracker thread, starts it, and returns a tracker // and a stoppable. func Init(instance *network.Instance, timeout time.Duration) Monitor { - tracker := newTracker(timeout) - instance.SetNetworkHealthChan(tracker.heartbeat) - return tracker + trkr := newTracker(timeout) + instance.SetNetworkHealthChan(trkr.heartbeat) + + return trkr } // newTracker builds and returns a new tracker object given a Context. func newTracker(timeout time.Duration) *tracker { - return &tracker{ - timeout: timeout, - funcs: map[uint64]func(isHealthy bool){}, - heartbeat: make(chan network.Heartbeat, 100), - isHealthy: false, - running: false, - } -} -// AddHealthCallback adds a function to the list of tracker functions such that -// each function can be run after network changes. Returns a unique ID for the -// function. -func (t *tracker) AddHealthCallback(f func(isHealthy bool)) uint64 { - var currentID uint64 + lastCompletedRound := int64(0) + lastWaitingRound := int64(0) - t.mux.Lock() - t.funcs[t.funcsID] = f - currentID = t.funcsID - t.funcsID++ - t.mux.Unlock() + wasHealthy := uint32(0) - go f(t.IsHealthy()) + t := &tracker{ + timeout: timeout, + heartbeat: make(chan network.Heartbeat, 100), + lastCompletedRound: &lastCompletedRound, + lastWaitingRound: &lastWaitingRound, + wasHealthy: &wasHealthy, + } + t.trackerCallback = initTrackerCallback() + return t +} - return currentID +// getLastCompletedRoundTimestamp atomically loads the completed round timestamp +// and converts it to a time object, then returns it +func (t *tracker) getLastCompletedRoundTimestamp() time.Time { + return time.Unix(0, atomic.LoadInt64(t.lastCompletedRound)) } -// RemoveHealthCallback removes the function with the given ID from the list of -// tracker functions so that it will no longer be run. -func (t *tracker) RemoveHealthCallback(chanID uint64) { - t.mux.Lock() - delete(t.funcs, chanID) - t.mux.Unlock() +// getLastWaitingRoundTimestamp atomically loads the waiting round timestamp +// and converts it to a time object, then returns it +func (t *tracker) getLastWaitingRoundTimestamp() time.Time { + return time.Unix(0, atomic.LoadInt64(t.lastWaitingRound)) } +// IsHealthy returns true if the network is healthy, which is +// defined as the client having knowledge of both valid queued rounds +// and completed rounds within the last tracker.timeout seconds func (t *tracker) IsHealthy() bool { - t.mux.RLock() - defer t.mux.RUnlock() + // use the system time instead of netTime.Now() which can + // include an offset because local monotonicity is what + // matters here, not correctness relative to absolute time + now := time.Now() + + completedRecently := false + if now.Sub(t.getLastCompletedRoundTimestamp()) < t.timeout { + completedRecently = true + } + + waitingRecently := false + if now.Sub(t.getLastWaitingRoundTimestamp()) < t.timeout { + waitingRecently = true + } - return t.isHealthy + return completedRecently && waitingRecently } -// WasHealthy returns true if isHealthy has ever been true. -func (t *tracker) WasHealthy() bool { - t.mux.RLock() - defer t.mux.RUnlock() +// updateHealth atomically updates the internal +// timestamps to now if there are new waiting / completed +// rounds +func (t *tracker) updateHealth(hasWaiting, hasCompleted bool) { + // use the system time instead of netTime.Now() which can + // include an offset because local monotonicity is what + // matters here, not correctness relative to absolute time + now := time.Now().UnixNano() + + if hasWaiting { + atomic.StoreInt64(t.lastWaitingRound, now) + } - return t.wasHealthy + if hasCompleted { + atomic.StoreInt64(t.lastCompletedRound, now) + } } -func (t *tracker) setHealth(h bool) { - t.mux.Lock() - // Only set wasHealthy to true if either - // wasHealthy is true or - // wasHealthy is false but h value is true - t.wasHealthy = t.wasHealthy || h - t.isHealthy = h - t.mux.Unlock() +// forceUnhealthy cleats the internal timestamps, forcing the +// tracker into unhealthy +func (t *tracker) forceUnhealthy() { + atomic.StoreInt64(t.lastWaitingRound, 0) + atomic.StoreInt64(t.lastCompletedRound, 0) +} - t.transmit(h) +// WasHealthy returns true if isHealthy has ever been true. +func (t *tracker) WasHealthy() bool { + return atomic.LoadUint32(t.wasHealthy) == 1 } +// AddHealthCallback adds a function to the list of tracker functions such that +// each function can be run after network changes. Returns a unique ID for the +// function. +func (t *tracker) AddHealthCallback(f func(isHealthy bool)) uint64 { + return t.addHealthCallback(f, t.IsHealthy()) +} + +// StartProcesses starts running the func (t *tracker) StartProcesses() (stoppable.Stoppable, error) { - t.mux.Lock() - if t.running { - t.mux.Unlock() - return nil, errors.New( - "cannot start health tracker threads, they are already running") - } - t.running = true - t.isHealthy = false - t.mux.Unlock() + atomic.StoreUint32(t.wasHealthy, 0) stop := stoppable.NewSingle("health tracker") @@ -139,45 +160,55 @@ func (t *tracker) StartProcesses() (stoppable.Stoppable, error) { return stop, nil } -// start starts a long-running thread used to monitor and report on network +// start begins a long-running thread used to monitor and report on network // health. func (t *tracker) start(stop *stoppable.Single) { + + hasSetWasHealthy := false + lastState := false + quit := false + for { - var heartbeat network.Heartbeat + + /* wait for an event */ select { case <-stop.Quit(): - t.mux.Lock() - t.isHealthy = false - t.running = false - t.mux.Unlock() + t.forceUnhealthy() - t.transmit(false) - stop.ToStopped() + // flag the quit instead of quitting here so the + // joint signaling handler code can be triggered + quit = true - return - case heartbeat = <-t.heartbeat: - // FIXME: There's no transition to unhealthy here and there needs to - // be after some number of bad polls - if healthy(heartbeat) { - t.setHealth(true) - } + case heartbeat := <-t.heartbeat: + t.updateHealth(heartbeat.HasWaitingRound, heartbeat.IsRoundComplete) case <-time.After(t.timeout): - if !t.isHealthy { - jww.WARN.Printf("Network health tracker timed out, network " + - "is no longer healthy...") + jww.ERROR.Printf("Network health tracker timed out, network " + + "is no longer healthy, follower likely has stopped...") + // note: no need to force to unhealthy because by definition the + // timestamps will be stale + } + + /* handle the state change resulting from an event */ + + // send signals if the state has changed + newHealthState := t.IsHealthy() + if newHealthState != lastState { + // set was healthy if we are healthy and it was never set before + if newHealthState && !hasSetWasHealthy { + atomic.StoreUint32(t.wasHealthy, 1) + hasSetWasHealthy = true } - t.setHealth(false) + + //trigger downstream events + t.callback(newHealthState) + + lastState = newHealthState } - } -} -func (t *tracker) transmit(health bool) { - // Run all listening functions - for _, f := range t.funcs { - go f(health) + // quit if required to quit + if quit { + stop.ToStopped() + return + } } } - -func healthy(a network.Heartbeat) bool { - return a.IsRoundComplete -} diff --git a/cmix/params.go b/cmix/params.go index 8eb6c6c3c96d7edd28dac004bfc57df0a1d0d2f3..df692139efe88af82753e6a26453a757e5b3a37f 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -94,7 +94,7 @@ func GetDefaultParams() Params { TrackNetworkPeriod: 100 * time.Millisecond, MaxCheckedRounds: 500, RegNodesBufferLen: 1000, - NetworkHealthTimeout: 30 * time.Second, + NetworkHealthTimeout: 15 * time.Second, ParallelNodeRegistrations: 20, KnownRoundsThreshold: 1500, // 5 rounds/sec * 60 sec/min * 5 min FastPolling: true, diff --git a/go.sum b/go.sum index 398547585158c8b9af50df15e192eba16eca6239..1adbdd57d4b358f3ae55e06ee433da944b2e4824 100644 --- a/go.sum +++ b/go.sum @@ -635,6 +635,8 @@ gitlab.com/elixxir/comms v0.0.4-0.20220916185715-f1e9a5eda939 h1:+VRx2ULHKs040bB gitlab.com/elixxir/comms v0.0.4-0.20220916185715-f1e9a5eda939/go.mod h1:AO6XkMhaHJW8eXlgL5m3UUcJqsSP8F5Wm1GX+wyq/rw= gitlab.com/elixxir/comms v0.0.4-0.20221005205938-10f2defa5b33 h1:mtn/b+/+cMoZNSEo6293U48uqz+aE0si90mPwlhh08w= gitlab.com/elixxir/comms v0.0.4-0.20221005205938-10f2defa5b33/go.mod h1:oRteMH+R5t1j/FZ+KJJnZUcqJO2sLXnWksN5HPkZUIo= +gitlab.com/elixxir/comms v0.0.4-0.20221010233602-6ed8c94ddac0 h1:Z8VcCdfmA1AHlGdPe/L8QSGhbjSW2NyCNrDxzByfuqI= +gitlab.com/elixxir/comms v0.0.4-0.20221010233602-6ed8c94ddac0/go.mod h1:oRteMH+R5t1j/FZ+KJJnZUcqJO2sLXnWksN5HPkZUIo= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= gitlab.com/elixxir/crypto v0.0.7-0.20220913220142-ab0771bad0af/go.mod h1:QF8SzsrYh9Elip9EUYUDAhPjqO9DGrrrQxYHvn+VXok= @@ -663,6 +665,7 @@ gitlab.com/xx_network/comms v0.0.4-0.20220913215811-c4bf83b27de3 h1:7mReTvEUVoI5 gitlab.com/xx_network/comms v0.0.4-0.20220913215811-c4bf83b27de3/go.mod h1:E2QKOKyPKLRjLUwMxgZpTKueEsHDEqshfqOHJ54ttxU= gitlab.com/xx_network/comms v0.0.4-0.20220916185248-8a984b8594de h1:44VKuVgT6X1l+MX8/oNmYORA+pa4nkOWV8hYxi4SCzc= gitlab.com/xx_network/comms v0.0.4-0.20220916185248-8a984b8594de/go.mod h1:E2QKOKyPKLRjLUwMxgZpTKueEsHDEqshfqOHJ54ttxU= +gitlab.com/xx_network/comms v0.0.4-0.20221005205845-b34d538ffd85 h1:bX2IYFnEbWTNGhZHfzHME19pkfD4Q7oTxFGI70PM2PM= gitlab.com/xx_network/comms v0.0.4-0.20221005205845-b34d538ffd85/go.mod h1:E2QKOKyPKLRjLUwMxgZpTKueEsHDEqshfqOHJ54ttxU= gitlab.com/xx_network/crypto v0.0.3/go.mod h1:DF2HYvvCw9wkBybXcXAgQMzX+MiGbFPjwt3t17VRqRE= gitlab.com/xx_network/crypto v0.0.4/go.mod h1:+lcQEy+Th4eswFgQDwT0EXKp4AXrlubxalwQFH5O0Mk=