diff --git a/bindings/client.go b/bindings/client.go index aaaf1ac8faccce2855b491a3d5f04764e7d0cc83..432ba95160cf5373b8664e044b5d61e3098a60d5 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -232,7 +232,7 @@ func (c *Client) StopNetworkFollower() error { func (c *Client) WaitForNetwork(timeoutMS int) bool { start := netTime.Now() timeout := time.Duration(timeoutMS) * time.Millisecond - for netTime.Now().Sub(start) < timeout { + for netTime.Since(start) < timeout { if c.api.GetHealth().IsHealthy() { return true } @@ -256,10 +256,15 @@ func (c *Client) IsNetworkHealthy() bool { return c.api.GetHealth().IsHealthy() } -// registers the network health callback to be called any time the network -// health changes -func (c *Client) RegisterNetworkHealthCB(nhc NetworkHealthCallback) { - c.api.GetHealth().AddFunc(nhc.Callback) +// RegisterNetworkHealthCB registers the network health callback to be called +// any time the network health changes. Returns a unique ID that can be used to +// unregister the network health callback. +func (c *Client) RegisterNetworkHealthCB(nhc NetworkHealthCallback) int64 { + return int64(c.api.GetHealth().AddFunc(nhc.Callback)) +} + +func (c *Client) UnregisterNetworkHealthCB(funcID int64) { + c.api.GetHealth().RemoveFunc(uint64(funcID)) } // RegisterListener records and installs a listener for messages diff --git a/interfaces/healthTracker.go b/interfaces/healthTracker.go index 39441984ee0088b9e82e33e7b7a11ab689288f00..0d746d50f73bc1215201c413e5d6d83e54bbbb55 100644 --- a/interfaces/healthTracker.go +++ b/interfaces/healthTracker.go @@ -8,8 +8,10 @@ package interfaces type HealthTracker interface { - AddChannel(chan bool) - AddFunc(f func(bool)) + AddChannel(chan bool) uint64 + RemoveChannel(uint64) + AddFunc(f func(bool)) uint64 + RemoveFunc(uint64) IsHealthy() bool WasHealthy() bool } diff --git a/network/health/tracker.go b/network/health/tracker.go index 1fc5e8cfb7285916192e34918e5fa9068b06a928..1e6dfdd97a0087d304f942ae2c6c3c0edf8f1991 100644 --- a/network/health/tracker.go +++ b/network/health/tracker.go @@ -5,7 +5,8 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -// Contains functionality related to the event model driven network health tracker +// Contains functionality related to the event model driven network health +// tracker. package health @@ -23,70 +24,108 @@ type Tracker struct { heartbeat chan network.Heartbeat - channels []chan bool - funcs []func(isHealthy bool) + channels map[uint64]chan bool + funcs map[uint64]func(isHealthy bool) + channelsID uint64 + funcsID uint64 running bool // Determines the current health status isHealthy bool - // Denotes the past health status - // wasHealthy is true if isHealthy has ever been true + + // Denotes that the past health status wasHealthy is true if isHealthy has + // ever been true wasHealthy bool mux sync.RWMutex } -// Creates a single HealthTracker thread, starts it, and returns a tracker and a stoppable +// Init creates a single HealthTracker thread, starts it, and returns a tracker +// and a stoppable. func Init(instance *network.Instance, timeout time.Duration) *Tracker { - tracker := newTracker(timeout) instance.SetNetworkHealthChan(tracker.heartbeat) return tracker } -// Builds and returns a new Tracker object given a Context +// newTracker builds and returns a new Tracker object given a Context. func newTracker(timeout time.Duration) *Tracker { return &Tracker{ timeout: timeout, - channels: make([]chan bool, 0), + channels: map[uint64]chan bool{}, + funcs: map[uint64]func(isHealthy bool){}, heartbeat: make(chan network.Heartbeat, 100), isHealthy: false, running: false, } } -// Add a channel to the list of Tracker channels -// such that each channel can be notified of network changes -func (t *Tracker) AddChannel(c chan bool) { +// AddChannel adds a channel to the list of Tracker channels such that each +// channel can be notified of network changes. Returns a unique ID for the +// channel. +func (t *Tracker) AddChannel(c chan bool) uint64 { + var currentID uint64 + t.mux.Lock() - t.channels = append(t.channels, c) + t.channels[t.channelsID] = c + currentID = t.channelsID + t.channelsID++ t.mux.Unlock() + select { case c <- t.IsHealthy(): default: } + + return currentID } -// Add a function to the list of Tracker function -// such that each function can be run after network changes -func (t *Tracker) AddFunc(f func(isHealthy bool)) { +// RemoveChannel removes the channel with the given ID from the list of Tracker +// channels so that it will not longer be notified of network changes. +func (t *Tracker) RemoveChannel(chanID uint64) { t.mux.Lock() - t.funcs = append(t.funcs, f) + delete(t.channels, chanID) t.mux.Unlock() +} + +// AddFunc adds a function to the list of Tracker functions such that each +// function can be run after network changes. Returns a unique ID for the +// function. +func (t *Tracker) AddFunc(f func(isHealthy bool)) uint64 { + var currentID uint64 + + t.mux.Lock() + t.funcs[t.funcsID] = f + currentID = t.funcsID + t.funcsID++ + t.mux.Unlock() + go f(t.IsHealthy()) + + return currentID +} + +// RemoveFunc removes the function with the given ID from the list of Tracker +// functions so that it will not longer be run. +func (t *Tracker) RemoveFunc(chanID uint64) { + t.mux.Lock() + delete(t.channels, chanID) + t.mux.Unlock() } func (t *Tracker) IsHealthy() bool { t.mux.RLock() defer t.mux.RUnlock() + return t.isHealthy } -// Returns true if isHealthy has ever been true +// WasHealthy returns true if isHealthy has ever been true. func (t *Tracker) WasHealthy() bool { t.mux.RLock() defer t.mux.RUnlock() + return t.wasHealthy } @@ -94,10 +133,11 @@ func (t *Tracker) setHealth(h bool) { t.mux.Lock() // Only set wasHealthy to true if either // wasHealthy is true or - // wasHealthy false but h value is true + // wasHealthy is false but h value is true t.wasHealthy = t.wasHealthy || h t.isHealthy = h t.mux.Unlock() + t.transmit(h) } @@ -119,7 +159,8 @@ func (t *Tracker) Start() (stoppable.Stoppable, error) { return stop, nil } -// Long-running thread used to monitor and report on network health +// start starts a long-running thread used to monitor and report on network +// health. func (t *Tracker) start(stop *stoppable.Single) { timer := time.NewTimer(t.timeout) @@ -131,8 +172,10 @@ func (t *Tracker) start(stop *stoppable.Single) { t.isHealthy = false t.running = false t.mux.Unlock() + t.transmit(false) stop.ToStopped() + return case heartbeat = <-t.heartbeat: if healthy(heartbeat) { diff --git a/network/health/tracker_test.go b/network/health/tracker_test.go index 4a10843c36ef23bcd3dc8cfbb5699e71a9643e78..a2e20651adaa06781f4d685cb5502cd5b56faae0 100644 --- a/network/health/tracker_test.go +++ b/network/health/tracker_test.go @@ -9,12 +9,11 @@ package health import ( "gitlab.com/elixxir/comms/network" - // "gitlab.com/elixxir/comms/network" "testing" "time" ) -// Happy path smoke test +// Happy path smoke test. func TestNewTracker(t *testing.T) { // Initialize required variables timeout := 250 * time.Millisecond @@ -49,8 +48,7 @@ func TestNewTracker(t *testing.T) { // Begin the health tracker _, err := tracker.Start() if err != nil { - t.Errorf("Unable to start tracker: %+v", err) - return + t.Fatalf("Unable to start tracker: %+v", err) } // Send a positive health heartbeat @@ -68,14 +66,12 @@ func TestNewTracker(t *testing.T) { // Verify the network was marked as healthy if !tracker.IsHealthy() { - t.Errorf("Tracker did not become healthy") - return + t.Fatal("Tracker did not become healthy.") } // Check if the tracker was ever healthy if !tracker.WasHealthy() { - t.Errorf("Tracker did not become healthy") - return + t.Fatal("Tracker did not become healthy.") } // Verify the heartbeat triggered the listening chan/func @@ -89,15 +85,12 @@ func TestNewTracker(t *testing.T) { // Verify the network was marked as NOT healthy if tracker.IsHealthy() { - t.Errorf("Tracker should not report healthy") - return + t.Fatal("Tracker should not report healthy.") } - // Check if the tracker was ever healthy, - // after setting healthy to false + // Check if the tracker was ever healthy, after setting healthy to false if !tracker.WasHealthy() { - t.Errorf("Tracker was healthy previously but not reported healthy") - return + t.Fatal("Tracker was healthy previously but not reported healthy.") } // Verify the timeout triggered the listening chan/func diff --git a/network/message/garbled.go b/network/message/garbled.go index 3a828b5eed2584ff323df7cdbb6016726c9dfa18..e8fb10cbe49845f7370639877c4dcdba45342a39 100644 --- a/network/message/garbled.go +++ b/network/message/garbled.go @@ -12,7 +12,7 @@ import ( "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/primitives/format" - "time" + "gitlab.com/xx_network/primitives/netTime" ) // Messages can arrive in the network out of order. When message handling fails @@ -81,7 +81,7 @@ func (m *Manager) handleGarbledMessages() { // unless it is the last attempts and has been in the buffer long // enough, in which case remove it if count == m.param.MaxChecksGarbledMessage && - time.Since(timestamp) > m.param.GarbledMessageWait { + netTime.Since(timestamp) > m.param.GarbledMessageWait { garbledMsgs.Remove(grbldMsg) } else { failedMsgs = append(failedMsgs, grbldMsg) diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index d011a533cb05aad5ecdf801267cb0db7a5f6055d..d70cd6221a32fd84d40e9b4afba0d60f22691456 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -57,7 +57,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, "(msgDigest: %s)", recipient, msg.Digest()) for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ { - elapsed := netTime.Now().Sub(timeStart) + elapsed := netTime.Since(timeStart) if elapsed > cmixParams.Timeout { jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+ @@ -73,11 +73,15 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, remainingTime := cmixParams.Timeout - elapsed //find the best round to send to, excluding attempted rounds - bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) + bestRound, err := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) + if err!=nil{ + jww.WARN.Printf("Failed to GetUpcomingRealtime (msgDigest: %s): %+v", msg.Digest(), err) + } if bestRound == nil { continue } + //add the round on to the list of attempted so it is not tried again attempted.Insert(bestRound) diff --git a/network/message/sendCmixUtils.go b/network/message/sendCmixUtils.go index c1d8e295885b1dadaa018ea881d07b1896625d87..d3f2c74a01725e191403748fb51608ccf36b7d8c 100644 --- a/network/message/sendCmixUtils.go +++ b/network/message/sendCmixUtils.go @@ -32,8 +32,8 @@ type sendCmixCommsInterface interface { SendPutManyMessages(host *connect.Host, messages *pb.GatewaySlots) (*pb.GatewaySlotResponse, error) } -// 2.5 seconds -const sendTimeBuffer = 2500 * time.Millisecond +// how much in the future a round needs to be to send to it +const sendTimeBuffer = 1000 * time.Millisecond const unrecoverableError = "failed with an unrecoverable error" // handlePutMessageError handles errors received from a PutMessage or a diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index e9d598182bb805fc3220804389af43456eccf022..d9e098715a5032c90ed2bef7438f15ce2714f890 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -71,7 +71,7 @@ func sendManyCmixHelper(sender *gateway.Sender, msgs map[id.ID]format.Message, "(msgDigest: %s)", recipientString, msgDigests) for numRoundTries := uint(0); numRoundTries < param.RoundTries; numRoundTries++ { - elapsed := netTime.Now().Sub(timeStart) + elapsed := netTime.Since(timeStart) if elapsed > param.Timeout { jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) were found "+ diff --git a/single/transmission.go b/single/transmission.go index bbe41371d7252b29bf78b87bed37088af95c733c..c896eb7e954e55fddc7b99513635461dc755de2f 100644 --- a/single/transmission.go +++ b/single/transmission.go @@ -138,7 +138,7 @@ func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte, } // Update the timeout for the elapsed time - roundEventTimeout := timeout - netTime.Now().Sub(timeStart) - time.Millisecond + roundEventTimeout := timeout - netTime.Since(timeStart) - time.Millisecond // Check message delivery sendResults := make(chan ds.EventReturn, 1) diff --git a/ud/manager.go b/ud/manager.go index 1397b8e2d8d8acc7081c307ef3ea2610e1aaae1c..1b80b1ad22bcb07fb94a5944a78000c2ef56f45d 100644 --- a/ud/manager.go +++ b/ud/manager.go @@ -51,9 +51,9 @@ type Manager struct { // updated NDF is available and will error if one is not. func NewManager(client *api.Client, single *single.Manager) (*Manager, error) { jww.INFO.Println("ud.NewManager()") - if !client.GetHealth().IsHealthy() { - return nil, errors.New("cannot start UD Manager when network was " + - "never healthy.") + if client.NetworkFollowerStatus() != api.Running { + return nil, errors.New("cannot start UD Manager when network follower is not " + + "running.") } m := &Manager{