diff --git a/api/client.go b/api/client.go index ac47598a9869e0be6a74f0937979de4cd909c362..95a05f6bd1c0308d50bbf06071bd7338071cc732 100644 --- a/api/client.go +++ b/api/client.go @@ -30,6 +30,7 @@ import ( "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "math" "sync" "time" ) @@ -235,6 +236,8 @@ func Login(storageDir string, password []byte, parameters params.Network) (*Clie if def.Notification.Address != "" { hp := connect.GetDefaultHostParams() + // Client will not send KeepAlive packets + hp.KaClientOpts.Time = time.Duration(math.MaxInt64) hp.AuthEnabled = false hp.MaxRetries = 5 _, err = c.comms.AddHost(&id.NotificationBot, def.Notification.Address, []byte(def.Notification.TlsCertificate), hp) @@ -392,11 +395,10 @@ func (c *Client) StartNetworkFollower(timeout time.Duration) (<-chan interfaces. jww.INFO.Printf("StartNetworkFollower() \n\tTransmisstionID: %s "+ "\n\tReceptionID: %s", u.TransmissionID, u.ReceptionID) - if status := c.status.get(); status != Stopped{ + if status := c.status.get(); status != Stopped { return nil, errors.Errorf("Cannot Stop the Network Follower when it is not running, status: %s", status) } - c.clientErrorChannel = make(chan interfaces.ClientError, 1000) cer := func(source, message, trace string) { @@ -456,7 +458,7 @@ func (c *Client) StopNetworkFollower() error { c.followerLock.Lock() defer c.followerLock.Unlock() - if status := c.status.get(); status != Running{ + if status := c.status.get(); status != Running { return errors.Errorf("Cannot Stop the Network Follower when it is not running, status: %s", status) } diff --git a/api/version_vars.go b/api/version_vars.go index ab6b7356d36db61e2e5d548fa1893167ddcd05d5..494f2d6a7cee377c706e92d3268951f85a464490 100644 --- a/api/version_vars.go +++ b/api/version_vars.go @@ -1,9 +1,9 @@ // Code generated by go generate; DO NOT EDIT. // This file was generated by robots at -// 2021-05-24 13:15:09.546928 -0500 CDT m=+0.071553794 +// 2021-06-22 11:16:35.397077 -0500 CDT m=+0.025546669 package api -const GITVERSION = `c85adeb9 Merge branch 'Anne/CI2' into 'release'` +const GITVERSION = `b7692cd7 added Keepalive opts` const SEMVER = "2.7.0" const DEPENDENCIES = `module gitlab.com/elixxir/client @@ -11,7 +11,7 @@ go 1.13 require ( github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 - github.com/golang/protobuf v1.4.3 + github.com/golang/protobuf v1.5.2 github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect github.com/magiconair/properties v1.8.4 // indirect github.com/mitchellh/mapstructure v1.4.0 // indirect @@ -24,21 +24,18 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/viper v1.7.1 gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 - gitlab.com/elixxir/comms v0.0.4-0.20210524170509-89dd425cb228 - gitlab.com/elixxir/crypto v0.0.7-0.20210524170447-264b215ce90b + gitlab.com/elixxir/comms v0.0.4-0.20210622161439-b694033c9507 + gitlab.com/elixxir/crypto v0.0.7-0.20210614155844-c1e9c23a6ba7 gitlab.com/elixxir/ekv v0.1.5 - gitlab.com/elixxir/primitives v0.0.3-0.20210524170524-9780695d2b55 - gitlab.com/xx_network/comms v0.0.4-0.20210524170426-175f698a7b07 - gitlab.com/xx_network/crypto v0.0.5-0.20210524170434-dc9a398a2581 - gitlab.com/xx_network/primitives v0.0.4-0.20210524170438-ab712af183db + gitlab.com/elixxir/primitives v0.0.3-0.20210614155726-ebcf2d47a527 + gitlab.com/xx_network/comms v0.0.4-0.20210622161535-4f3d927d4c8c + gitlab.com/xx_network/crypto v0.0.5-0.20210614155554-8c333814205b + gitlab.com/xx_network/primitives v0.0.4-0.20210617180018-6472489fd418 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 - golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 - golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect + golang.org/x/net v0.0.0-20210525063256-abc453219eb5 google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect - google.golang.org/grpc v1.34.0 // indirect - google.golang.org/protobuf v1.26.0-rc.1 + google.golang.org/protobuf v1.26.0 gopkg.in/ini.v1 v1.62.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) - -replace google.golang.org/grpc => github.com/grpc/grpc-go v1.27.1 ` diff --git a/bindings/client.go b/bindings/client.go index 898c609bdfa1e61f1b40fd25186b427cf8a8f949..869ccfe1d03d7089bb38712c9d01d265da287a14 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -232,7 +232,7 @@ func (c *Client) StopNetworkFollower() error { func (c *Client) WaitForNetwork(timeoutMS int) bool { start := netTime.Now() timeout := time.Duration(timeoutMS) * time.Millisecond - for netTime.Now().Sub(start) < timeout { + for netTime.Since(start) < timeout { if c.api.GetHealth().IsHealthy() { return true } @@ -256,10 +256,15 @@ func (c *Client) IsNetworkHealthy() bool { return c.api.GetHealth().IsHealthy() } -// registers the network health callback to be called any time the network -// health changes -func (c *Client) RegisterNetworkHealthCB(nhc NetworkHealthCallback) { - c.api.GetHealth().AddFunc(nhc.Callback) +// RegisterNetworkHealthCB registers the network health callback to be called +// any time the network health changes. Returns a unique ID that can be used to +// unregister the network health callback. +func (c *Client) RegisterNetworkHealthCB(nhc NetworkHealthCallback) int64 { + return int64(c.api.GetHealth().AddFunc(nhc.Callback)) +} + +func (c *Client) UnregisterNetworkHealthCB(funcID int64) { + c.api.GetHealth().RemoveFunc(uint64(funcID)) } // RegisterListener records and installs a listener for messages diff --git a/cmd/getndf.go b/cmd/getndf.go index 7ac150020c911d695f646993e3a409c18d6cb8da..7562acd153adc1257667faacb4d4a0ccbfbc2e74 100644 --- a/cmd/getndf.go +++ b/cmd/getndf.go @@ -70,8 +70,8 @@ var getNDFCmd = &cobra.Command{ Partial: &pb.NDFHash{ Hash: nil, }, - LastUpdate: uint64(0), - ReceptionID: dummyID[:], + LastUpdate: uint64(0), + ReceptionID: dummyID[:], ClientVersion: []byte(api.SEMVER), } resp, err := comms.SendPoll(host, pollMsg) diff --git a/go.mod b/go.mod index 751d38d99f09935e88760efb7eb2ba47be7d2755..a1e3dcadd27b247e95f93aa918215e8ac4abb852 100644 --- a/go.mod +++ b/go.mod @@ -17,13 +17,13 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/viper v1.7.1 gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 - gitlab.com/elixxir/comms v0.0.4-0.20210614160131-de90e88a68cc + gitlab.com/elixxir/comms v0.0.4-0.20210622161439-b694033c9507 gitlab.com/elixxir/crypto v0.0.7-0.20210614155844-c1e9c23a6ba7 gitlab.com/elixxir/ekv v0.1.5 gitlab.com/elixxir/primitives v0.0.3-0.20210614155726-ebcf2d47a527 - gitlab.com/xx_network/comms v0.0.4-0.20210614155654-191473de2702 + gitlab.com/xx_network/comms v0.0.4-0.20210622161535-4f3d927d4c8c gitlab.com/xx_network/crypto v0.0.5-0.20210614155554-8c333814205b - gitlab.com/xx_network/primitives v0.0.4-0.20210608160426-670aab2d82cf + gitlab.com/xx_network/primitives v0.0.4-0.20210617180018-6472489fd418 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/net v0.0.0-20210525063256-abc453219eb5 google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect diff --git a/go.sum b/go.sum index 7f1f0b4b1edfbdfcf92ef60042ae87ba814e74ce..be6929544fff09ff9ef8ff85bdda4fa4e8144b65 100644 --- a/go.sum +++ b/go.sum @@ -247,8 +247,8 @@ github.com/zeebo/pcg v1.0.0 h1:dt+dx+HvX8g7Un32rY9XWoYnd0NmKmrIzpHF7qiTDj0= github.com/zeebo/pcg v1.0.0/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 h1:Gi6rj4mAlK0BJIk1HIzBVMjWNjIUfstrsXC2VqLYPcA= gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228/go.mod h1:H6jztdm0k+wEV2QGK/KYA+MY9nj9Zzatux/qIvDDv3k= -gitlab.com/elixxir/comms v0.0.4-0.20210614160131-de90e88a68cc h1:MB+Ixmz/0eWt2akBYajjH2h53GF0NjWn4Oy4Q7x4QVg= -gitlab.com/elixxir/comms v0.0.4-0.20210614160131-de90e88a68cc/go.mod h1:JeCKUXRS9xP3YYGPl4+OMFdvtt7ySJIxEsL9AzgeCu0= +gitlab.com/elixxir/comms v0.0.4-0.20210622161439-b694033c9507 h1:uH64CKk3PVzo0v9FbII3XYNiZIoQz2pOb7Fnu5L/Ba8= +gitlab.com/elixxir/comms v0.0.4-0.20210622161439-b694033c9507/go.mod h1:vVAO+8dSm/sikL66Qx/+CEBVVzzDIdYru+VLcrIm+tA= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= gitlab.com/elixxir/crypto v0.0.7-0.20210614155844-c1e9c23a6ba7 h1:UBq4/xMUWkYmEzUN2F7nLw5qQeiNKoLaoX3vZ/flz1c= @@ -262,8 +262,9 @@ gitlab.com/elixxir/primitives v0.0.1/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2Y gitlab.com/elixxir/primitives v0.0.3-0.20210614155726-ebcf2d47a527 h1:kBNAGFy5Ylz7F0K3DmyzuHLf1npBg7a3t4qKvfqPL3Y= gitlab.com/elixxir/primitives v0.0.3-0.20210614155726-ebcf2d47a527/go.mod h1:nSmBXcw4hkBLFdhu+araAPvf9szCDQF1fpRZ9/BgBec= gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTRl7gsoM8c3RQ5KAm5GstxrJp5tn+6JfQ4z5Hw= -gitlab.com/xx_network/comms v0.0.4-0.20210614155654-191473de2702 h1:ydi8FaAjFGfxMcvmIGlvnng491K2uEl3ymALC2Hh8Vw= -gitlab.com/xx_network/comms v0.0.4-0.20210614155654-191473de2702/go.mod h1:ehwxZxcAQHkJjP5BNkwPNK8/o6avUn0j0iDDiu+nMFc= +gitlab.com/xx_network/comms v0.0.4-0.20210617183321-d5f4fd71033c/go.mod h1:ehwxZxcAQHkJjP5BNkwPNK8/o6avUn0j0iDDiu+nMFc= +gitlab.com/xx_network/comms v0.0.4-0.20210622161535-4f3d927d4c8c h1:/vUWvCEGL9dI73Cv0eRIxgYnicSCwXYduV6PJVcdsus= +gitlab.com/xx_network/comms v0.0.4-0.20210622161535-4f3d927d4c8c/go.mod h1:ehwxZxcAQHkJjP5BNkwPNK8/o6avUn0j0iDDiu+nMFc= gitlab.com/xx_network/crypto v0.0.3/go.mod h1:DF2HYvvCw9wkBybXcXAgQMzX+MiGbFPjwt3t17VRqRE= gitlab.com/xx_network/crypto v0.0.4/go.mod h1:+lcQEy+Th4eswFgQDwT0EXKp4AXrlubxalwQFH5O0Mk= gitlab.com/xx_network/crypto v0.0.5-0.20210614155554-8c333814205b h1:X2Hhg9/IYowxMdI6TTnWj6WW3pnO2vMB/7f4mnu6Muw= @@ -271,8 +272,9 @@ gitlab.com/xx_network/crypto v0.0.5-0.20210614155554-8c333814205b/go.mod h1:wiaQ gitlab.com/xx_network/primitives v0.0.0-20200803231956-9b192c57ea7c/go.mod h1:wtdCMr7DPePz9qwctNoAUzZtbOSHSedcK++3Df3psjA= gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug= gitlab.com/xx_network/primitives v0.0.2/go.mod h1:cs0QlFpdMDI6lAo61lDRH2JZz+3aVkHy+QogOB6F/qc= -gitlab.com/xx_network/primitives v0.0.4-0.20210608160426-670aab2d82cf h1:4snRmLHx/mgbdMtuXT2ITxyCElgvlOd7bJA1QSx0aI4= gitlab.com/xx_network/primitives v0.0.4-0.20210608160426-670aab2d82cf/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= +gitlab.com/xx_network/primitives v0.0.4-0.20210617180018-6472489fd418 h1:F52R0wvFobjkmB8YaPNHZIu0VYqwjesMBCb9T14ygW8= +gitlab.com/xx_network/primitives v0.0.4-0.20210617180018-6472489fd418/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= gitlab.com/xx_network/ring v0.0.3-0.20210527191221-ce3f170aabd5 h1:FY+4Rh1Q2rgLyv10aKJjhWApuKRCR/054XhreudfAvw= gitlab.com/xx_network/ring v0.0.3-0.20210527191221-ce3f170aabd5/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/interfaces/healthTracker.go b/interfaces/healthTracker.go index 39441984ee0088b9e82e33e7b7a11ab689288f00..0d746d50f73bc1215201c413e5d6d83e54bbbb55 100644 --- a/interfaces/healthTracker.go +++ b/interfaces/healthTracker.go @@ -8,8 +8,10 @@ package interfaces type HealthTracker interface { - AddChannel(chan bool) - AddFunc(f func(bool)) + AddChannel(chan bool) uint64 + RemoveChannel(uint64) + AddFunc(f func(bool)) uint64 + RemoveFunc(uint64) IsHealthy() bool WasHealthy() bool } diff --git a/network/health/tracker.go b/network/health/tracker.go index 1fc5e8cfb7285916192e34918e5fa9068b06a928..1e6dfdd97a0087d304f942ae2c6c3c0edf8f1991 100644 --- a/network/health/tracker.go +++ b/network/health/tracker.go @@ -5,7 +5,8 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -// Contains functionality related to the event model driven network health tracker +// Contains functionality related to the event model driven network health +// tracker. package health @@ -23,70 +24,108 @@ type Tracker struct { heartbeat chan network.Heartbeat - channels []chan bool - funcs []func(isHealthy bool) + channels map[uint64]chan bool + funcs map[uint64]func(isHealthy bool) + channelsID uint64 + funcsID uint64 running bool // Determines the current health status isHealthy bool - // Denotes the past health status - // wasHealthy is true if isHealthy has ever been true + + // Denotes that the past health status wasHealthy is true if isHealthy has + // ever been true wasHealthy bool mux sync.RWMutex } -// Creates a single HealthTracker thread, starts it, and returns a tracker and a stoppable +// Init creates a single HealthTracker thread, starts it, and returns a tracker +// and a stoppable. func Init(instance *network.Instance, timeout time.Duration) *Tracker { - tracker := newTracker(timeout) instance.SetNetworkHealthChan(tracker.heartbeat) return tracker } -// Builds and returns a new Tracker object given a Context +// newTracker builds and returns a new Tracker object given a Context. func newTracker(timeout time.Duration) *Tracker { return &Tracker{ timeout: timeout, - channels: make([]chan bool, 0), + channels: map[uint64]chan bool{}, + funcs: map[uint64]func(isHealthy bool){}, heartbeat: make(chan network.Heartbeat, 100), isHealthy: false, running: false, } } -// Add a channel to the list of Tracker channels -// such that each channel can be notified of network changes -func (t *Tracker) AddChannel(c chan bool) { +// AddChannel adds a channel to the list of Tracker channels such that each +// channel can be notified of network changes. Returns a unique ID for the +// channel. +func (t *Tracker) AddChannel(c chan bool) uint64 { + var currentID uint64 + t.mux.Lock() - t.channels = append(t.channels, c) + t.channels[t.channelsID] = c + currentID = t.channelsID + t.channelsID++ t.mux.Unlock() + select { case c <- t.IsHealthy(): default: } + + return currentID } -// Add a function to the list of Tracker function -// such that each function can be run after network changes -func (t *Tracker) AddFunc(f func(isHealthy bool)) { +// RemoveChannel removes the channel with the given ID from the list of Tracker +// channels so that it will not longer be notified of network changes. +func (t *Tracker) RemoveChannel(chanID uint64) { t.mux.Lock() - t.funcs = append(t.funcs, f) + delete(t.channels, chanID) t.mux.Unlock() +} + +// AddFunc adds a function to the list of Tracker functions such that each +// function can be run after network changes. Returns a unique ID for the +// function. +func (t *Tracker) AddFunc(f func(isHealthy bool)) uint64 { + var currentID uint64 + + t.mux.Lock() + t.funcs[t.funcsID] = f + currentID = t.funcsID + t.funcsID++ + t.mux.Unlock() + go f(t.IsHealthy()) + + return currentID +} + +// RemoveFunc removes the function with the given ID from the list of Tracker +// functions so that it will not longer be run. +func (t *Tracker) RemoveFunc(chanID uint64) { + t.mux.Lock() + delete(t.channels, chanID) + t.mux.Unlock() } func (t *Tracker) IsHealthy() bool { t.mux.RLock() defer t.mux.RUnlock() + return t.isHealthy } -// Returns true if isHealthy has ever been true +// WasHealthy returns true if isHealthy has ever been true. func (t *Tracker) WasHealthy() bool { t.mux.RLock() defer t.mux.RUnlock() + return t.wasHealthy } @@ -94,10 +133,11 @@ func (t *Tracker) setHealth(h bool) { t.mux.Lock() // Only set wasHealthy to true if either // wasHealthy is true or - // wasHealthy false but h value is true + // wasHealthy is false but h value is true t.wasHealthy = t.wasHealthy || h t.isHealthy = h t.mux.Unlock() + t.transmit(h) } @@ -119,7 +159,8 @@ func (t *Tracker) Start() (stoppable.Stoppable, error) { return stop, nil } -// Long-running thread used to monitor and report on network health +// start starts a long-running thread used to monitor and report on network +// health. func (t *Tracker) start(stop *stoppable.Single) { timer := time.NewTimer(t.timeout) @@ -131,8 +172,10 @@ func (t *Tracker) start(stop *stoppable.Single) { t.isHealthy = false t.running = false t.mux.Unlock() + t.transmit(false) stop.ToStopped() + return case heartbeat = <-t.heartbeat: if healthy(heartbeat) { diff --git a/network/health/tracker_test.go b/network/health/tracker_test.go index 4a10843c36ef23bcd3dc8cfbb5699e71a9643e78..a2e20651adaa06781f4d685cb5502cd5b56faae0 100644 --- a/network/health/tracker_test.go +++ b/network/health/tracker_test.go @@ -9,12 +9,11 @@ package health import ( "gitlab.com/elixxir/comms/network" - // "gitlab.com/elixxir/comms/network" "testing" "time" ) -// Happy path smoke test +// Happy path smoke test. func TestNewTracker(t *testing.T) { // Initialize required variables timeout := 250 * time.Millisecond @@ -49,8 +48,7 @@ func TestNewTracker(t *testing.T) { // Begin the health tracker _, err := tracker.Start() if err != nil { - t.Errorf("Unable to start tracker: %+v", err) - return + t.Fatalf("Unable to start tracker: %+v", err) } // Send a positive health heartbeat @@ -68,14 +66,12 @@ func TestNewTracker(t *testing.T) { // Verify the network was marked as healthy if !tracker.IsHealthy() { - t.Errorf("Tracker did not become healthy") - return + t.Fatal("Tracker did not become healthy.") } // Check if the tracker was ever healthy if !tracker.WasHealthy() { - t.Errorf("Tracker did not become healthy") - return + t.Fatal("Tracker did not become healthy.") } // Verify the heartbeat triggered the listening chan/func @@ -89,15 +85,12 @@ func TestNewTracker(t *testing.T) { // Verify the network was marked as NOT healthy if tracker.IsHealthy() { - t.Errorf("Tracker should not report healthy") - return + t.Fatal("Tracker should not report healthy.") } - // Check if the tracker was ever healthy, - // after setting healthy to false + // Check if the tracker was ever healthy, after setting healthy to false if !tracker.WasHealthy() { - t.Errorf("Tracker was healthy previously but not reported healthy") - return + t.Fatal("Tracker was healthy previously but not reported healthy.") } // Verify the timeout triggered the listening chan/func diff --git a/network/manager.go b/network/manager.go index fcb20f2d77f94a57c4c2f01e581b03fca06b6178..d480f4afe6c2b23da229c4c20b0b752c3e4d936f 100644 --- a/network/manager.go +++ b/network/manager.go @@ -28,6 +28,8 @@ import ( "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/xx_network/primitives/ndf" + "math" + "time" ) // Manager implements the NetworkManager interface inside context. It @@ -95,6 +97,8 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, // Set up gateway.Sender poolParams := gateway.DefaultPoolParams() + // Client will not send KeepAlive packets + poolParams.HostParams.KaClientOpts.Time = time.Duration(math.MaxInt64) m.sender, err = gateway.NewSender(poolParams, rng, ndf, comms, session, m.NodeRegistration) if err != nil { diff --git a/network/message/garbled.go b/network/message/garbled.go index 3a828b5eed2584ff323df7cdbb6016726c9dfa18..e8fb10cbe49845f7370639877c4dcdba45342a39 100644 --- a/network/message/garbled.go +++ b/network/message/garbled.go @@ -12,7 +12,7 @@ import ( "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/primitives/format" - "time" + "gitlab.com/xx_network/primitives/netTime" ) // Messages can arrive in the network out of order. When message handling fails @@ -81,7 +81,7 @@ func (m *Manager) handleGarbledMessages() { // unless it is the last attempts and has been in the buffer long // enough, in which case remove it if count == m.param.MaxChecksGarbledMessage && - time.Since(timestamp) > m.param.GarbledMessageWait { + netTime.Since(timestamp) > m.param.GarbledMessageWait { garbledMsgs.Remove(grbldMsg) } else { failedMsgs = append(failedMsgs, grbldMsg) diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index d011a533cb05aad5ecdf801267cb0db7a5f6055d..d70cd6221a32fd84d40e9b4afba0d60f22691456 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -57,7 +57,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, "(msgDigest: %s)", recipient, msg.Digest()) for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ { - elapsed := netTime.Now().Sub(timeStart) + elapsed := netTime.Since(timeStart) if elapsed > cmixParams.Timeout { jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+ @@ -73,11 +73,15 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, remainingTime := cmixParams.Timeout - elapsed //find the best round to send to, excluding attempted rounds - bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) + bestRound, err := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) + if err!=nil{ + jww.WARN.Printf("Failed to GetUpcomingRealtime (msgDigest: %s): %+v", msg.Digest(), err) + } if bestRound == nil { continue } + //add the round on to the list of attempted so it is not tried again attempted.Insert(bestRound) diff --git a/network/message/sendCmixUtils.go b/network/message/sendCmixUtils.go index c1d8e295885b1dadaa018ea881d07b1896625d87..d3f2c74a01725e191403748fb51608ccf36b7d8c 100644 --- a/network/message/sendCmixUtils.go +++ b/network/message/sendCmixUtils.go @@ -32,8 +32,8 @@ type sendCmixCommsInterface interface { SendPutManyMessages(host *connect.Host, messages *pb.GatewaySlots) (*pb.GatewaySlotResponse, error) } -// 2.5 seconds -const sendTimeBuffer = 2500 * time.Millisecond +// how much in the future a round needs to be to send to it +const sendTimeBuffer = 1000 * time.Millisecond const unrecoverableError = "failed with an unrecoverable error" // handlePutMessageError handles errors received from a PutMessage or a diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index e9d598182bb805fc3220804389af43456eccf022..d9e098715a5032c90ed2bef7438f15ce2714f890 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -71,7 +71,7 @@ func sendManyCmixHelper(sender *gateway.Sender, msgs map[id.ID]format.Message, "(msgDigest: %s)", recipientString, msgDigests) for numRoundTries := uint(0); numRoundTries < param.RoundTries; numRoundTries++ { - elapsed := netTime.Now().Sub(timeStart) + elapsed := netTime.Since(timeStart) if elapsed > param.Timeout { jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) were found "+ diff --git a/permissioning/permissioning.go b/permissioning/permissioning.go index ebe00b3c3d41c3c9fcaf844e65d87fc6033dd829..87c3b6fff40c147c59a4534f7e52f588a9ad45fd 100644 --- a/permissioning/permissioning.go +++ b/permissioning/permissioning.go @@ -13,6 +13,8 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "math" + "time" ) type Permissioning struct { @@ -31,7 +33,8 @@ func Init(comms *client.Comms, def *ndf.NetworkDefinition) (*Permissioning, erro //add the permissioning host to comms hParam := connect.GetDefaultHostParams() hParam.AuthEnabled = false - + // Client will not send KeepAlive packets + hParam.KaClientOpts.Time = time.Duration(math.MaxInt64) perm.host, err = comms.AddHost(&id.Permissioning, def.Registration.Address, []byte(def.Registration.TlsCertificate), hParam) diff --git a/single/transmission.go b/single/transmission.go index bbe41371d7252b29bf78b87bed37088af95c733c..c896eb7e954e55fddc7b99513635461dc755de2f 100644 --- a/single/transmission.go +++ b/single/transmission.go @@ -138,7 +138,7 @@ func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte, } // Update the timeout for the elapsed time - roundEventTimeout := timeout - netTime.Now().Sub(timeStart) - time.Millisecond + roundEventTimeout := timeout - netTime.Since(timeStart) - time.Millisecond // Check message delivery sendResults := make(chan ds.EventReturn, 1) diff --git a/ud/manager.go b/ud/manager.go index f3b7afc643c27af41d2d1591de2ff89755c6104d..1b80b1ad22bcb07fb94a5944a78000c2ef56f45d 100644 --- a/ud/manager.go +++ b/ud/manager.go @@ -15,6 +15,7 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" + "math" "time" ) @@ -50,9 +51,9 @@ type Manager struct { // updated NDF is available and will error if one is not. func NewManager(client *api.Client, single *single.Manager) (*Manager, error) { jww.INFO.Println("ud.NewManager()") - if !client.GetHealth().IsHealthy() { - return nil, errors.New("cannot start UD Manager when network was " + - "never healthy.") + if client.NetworkFollowerStatus() != api.Running { + return nil, errors.New("cannot start UD Manager when network follower is not " + + "running.") } m := &Manager{ @@ -89,6 +90,8 @@ func NewManager(client *api.Client, single *single.Manager) (*Manager, error) { // Create the user discovery host object hp := connect.GetDefaultHostParams() + // Client will not send KeepAlive packets + hp.KaClientOpts.Time = time.Duration(math.MaxInt64) hp.MaxRetries = 3 hp.SendTimeout = 3 * time.Second m.host, err = m.comms.AddHost(&id.UDB, def.UDB.Address, []byte(def.UDB.Cert), hp)