From d4d6d4659bb5799894cb603f8c9af8be14e80684 Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Wed, 29 Dec 2021 11:49:51 -0800 Subject: [PATCH] Add timeout to SendToPreferred and add parameters for it in params.CMIX and params.Rounds --- interfaces/params/CMIX.go | 12 +++++++++--- interfaces/params/rounds.go | 5 +++++ network/gateway/sender.go | 22 ++++++++++++++++++++-- network/gateway/sender_test.go | 10 +++++++--- network/message/sendCmix.go | 3 ++- network/message/sendManyCmix.go | 2 +- network/rounds/retrieve.go | 6 +++--- 7 files changed, 47 insertions(+), 13 deletions(-) diff --git a/interfaces/params/CMIX.go b/interfaces/params/CMIX.go index 83a287fc4..5e772d036 100644 --- a/interfaces/params/CMIX.go +++ b/interfaces/params/CMIX.go @@ -19,6 +19,11 @@ type CMIX struct { Timeout time.Duration RetryDelay time.Duration ExcludedRounds excludedRounds.ExcludedRounds + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration + // an alternate identity preimage to use on send. If not set, the default // for the sending identity will be used IdentityPreimage []byte @@ -26,9 +31,10 @@ type CMIX struct { func GetDefaultCMIX() CMIX { return CMIX{ - RoundTries: 10, - Timeout: 25 * time.Second, - RetryDelay: 1 * time.Second, + RoundTries: 10, + Timeout: 25 * time.Second, + RetryDelay: 1 * time.Second, + SendTimeout: 3 * time.Second, } } diff --git a/interfaces/params/rounds.go b/interfaces/params/rounds.go index 3e39ad478..40bd41bfb 100644 --- a/interfaces/params/rounds.go +++ b/interfaces/params/rounds.go @@ -39,6 +39,10 @@ type Rounds struct { // Toggles if message pickup retrying mechanism if forced // by intentionally not looking up messages ForceMessagePickupRetry bool + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration } func GetDefaultRounds() Rounds { @@ -53,5 +57,6 @@ func GetDefaultRounds() Rounds { MaxHistoricalRoundsRetries: 3, UncheckRoundPeriod: 20 * time.Second, ForceMessagePickupRetry: false, + SendTimeout: 1 * time.Second, } } diff --git a/network/gateway/sender.go b/network/gateway/sender.go index c3c7bff82..311bcf3dd 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -18,7 +18,9 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "gitlab.com/xx_network/primitives/netTime" "strings" + "time" ) // Sender Object used for sending that wraps the HostPool for providing destinations @@ -76,16 +78,26 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error return nil, errors.Errorf("Unable to send to any proxies") } -// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations +// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting +// with up to numProxies destinations. Returns an error if the timeout is +// reached. func (s *Sender) SendToPreferred(targets []*id.ID, sendFunc func(host *connect.Host, target *id.ID) (interface{}, error), - stop *stoppable.Single) (interface{}, error) { + stop *stoppable.Single, timeout time.Duration) (interface{}, error) { + + startTime := netTime.Now() // Get the hosts and shuffle randomly targetHosts := s.getPreferred(targets) // Attempt to send directly to targets if they are in the HostPool for i := range targetHosts { + // Return an error if the timeout duration is reached + if netTime.Since(startTime) > timeout { + return nil, errors.Errorf( + "sending to targets in HostPool timed out after %s", timeout) + } + result, err := sendFunc(targetHosts[i], targets[i]) if stop != nil && !stop.IsRunning() { return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") @@ -135,6 +147,12 @@ func (s *Sender) SendToPreferred(targets []*id.ID, for proxyIdx := uint32(0); proxyIdx < s.poolParams.ProxyAttempts; proxyIdx++ { for targetIdx := range proxies { + // Return an error if the timeout duration is reached + if netTime.Since(startTime) > timeout { + return nil, errors.Errorf("iterating over target's procies "+ + "timed out after %s", timeout) + } + target := targets[targetIdx] targetProxies := proxies[targetIdx] if !(int(proxyIdx) < len(targetProxies)) { diff --git a/network/gateway/sender_test.go b/network/gateway/sender_test.go index d8dbf1622..771df6172 100644 --- a/network/gateway/sender_test.go +++ b/network/gateway/sender_test.go @@ -16,6 +16,7 @@ import ( "gitlab.com/xx_network/primitives/id" "reflect" "testing" + "time" ) // Unit test @@ -139,7 +140,8 @@ func TestSender_SendToPreferred(t *testing.T) { preferredHost := sender.hostList[preferredIndex] // Happy path - result, err := sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_HappyPath, nil) + result, err := sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_HappyPath, nil, 250*time.Millisecond) if err != nil { t.Errorf("Should not error in SendToPreferred happy path: %v", err) } @@ -151,7 +153,8 @@ func TestSender_SendToPreferred(t *testing.T) { } // Call a send which returns an error which triggers replacement - _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_KnownError, nil) + _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_KnownError, nil, 250*time.Millisecond) if err == nil { t.Fatalf("Expected error path did not receive error") } @@ -171,7 +174,8 @@ func TestSender_SendToPreferred(t *testing.T) { preferredHost = sender.hostList[preferredIndex] // Unknown error return will not trigger replacement - _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_UnknownError, nil) + _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_UnknownError, nil, 250*time.Millisecond) if err == nil { t.Fatalf("Expected error path did not receive error") } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index c127168de..0c8aa4533 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -174,7 +174,8 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, } return result, err } - result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc, stop) + result, err := sender.SendToPreferred( + []*id.ID{firstGateway}, sendFunc, stop, cmixParams.SendTimeout) // Exit if the thread has been stopped if stoppable.CheckErr(err) { diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index 69043e60f..4a5df6d10 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -181,7 +181,7 @@ func sendManyCmixHelper(sender *gateway.Sender, return result, err } result, err := sender.SendToPreferred( - []*id.ID{firstGateway}, sendFunc, stop) + []*id.ID{firstGateway}, sendFunc, stop, param.SendTimeout) // Exit if the thread has been stopped if stoppable.CheckErr(err) { diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 6f0f5e15a..bcebcf5a4 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -163,7 +163,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, msgResp, err := comms.RequestMessages(host, msgReq) if err != nil { - //you need to default to a retryable errors because otherwise we cannot enumerate all errors + // you need to default to a retryable errors because otherwise we cannot enumerate all errors return nil, errors.WithMessage(err, gateway.RetryableError) } @@ -173,7 +173,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, } return msgResp, nil - }, stop) + }, stop, m.params.SendTimeout) jww.INFO.Printf("Received message for round %d, processing...", roundID) // Fail the round if an error occurs so it can be tried again later if err != nil { @@ -202,7 +202,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, jww.INFO.Printf("Received %d messages in Round %v for %d (%s) in %s", len(msgs), roundID, identity.EphId.Int64(), identity.Source, time.Now().Sub(start)) - //build the bundle of messages to send to the message processor + // build the bundle of messages to send to the message processor bundle := message.Bundle{ Round: roundID, Messages: make([]format.Message, len(msgs)), -- GitLab