diff --git a/interfaces/params/CMIX.go b/interfaces/params/CMIX.go index 83a287fc460d5ce0254496d98610379fd2f8bee8..5e772d0360a94350ff6b99b6db33d22934e5410d 100644 --- a/interfaces/params/CMIX.go +++ b/interfaces/params/CMIX.go @@ -19,6 +19,11 @@ type CMIX struct { Timeout time.Duration RetryDelay time.Duration ExcludedRounds excludedRounds.ExcludedRounds + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration + // an alternate identity preimage to use on send. If not set, the default // for the sending identity will be used IdentityPreimage []byte @@ -26,9 +31,10 @@ type CMIX struct { func GetDefaultCMIX() CMIX { return CMIX{ - RoundTries: 10, - Timeout: 25 * time.Second, - RetryDelay: 1 * time.Second, + RoundTries: 10, + Timeout: 25 * time.Second, + RetryDelay: 1 * time.Second, + SendTimeout: 3 * time.Second, } } diff --git a/interfaces/params/rounds.go b/interfaces/params/rounds.go index 3e39ad47827e5f5f6f9dc526fcfffa200f5cf5a8..40bd41bfbd80a6cef2ecf30d8a1c645782f7b305 100644 --- a/interfaces/params/rounds.go +++ b/interfaces/params/rounds.go @@ -39,6 +39,10 @@ type Rounds struct { // Toggles if message pickup retrying mechanism if forced // by intentionally not looking up messages ForceMessagePickupRetry bool + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration } func GetDefaultRounds() Rounds { @@ -53,5 +57,6 @@ func GetDefaultRounds() Rounds { MaxHistoricalRoundsRetries: 3, UncheckRoundPeriod: 20 * time.Second, ForceMessagePickupRetry: false, + SendTimeout: 1 * time.Second, } } diff --git a/network/gateway/sender.go b/network/gateway/sender.go index c3c7bff8259feb2f477d658791c39c42e079064c..311bcf3dd86ac3df8ac6b0a4afe012e0656ad4e3 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -18,7 +18,9 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "gitlab.com/xx_network/primitives/netTime" "strings" + "time" ) // Sender Object used for sending that wraps the HostPool for providing destinations @@ -76,16 +78,26 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error return nil, errors.Errorf("Unable to send to any proxies") } -// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations +// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting +// with up to numProxies destinations. Returns an error if the timeout is +// reached. func (s *Sender) SendToPreferred(targets []*id.ID, sendFunc func(host *connect.Host, target *id.ID) (interface{}, error), - stop *stoppable.Single) (interface{}, error) { + stop *stoppable.Single, timeout time.Duration) (interface{}, error) { + + startTime := netTime.Now() // Get the hosts and shuffle randomly targetHosts := s.getPreferred(targets) // Attempt to send directly to targets if they are in the HostPool for i := range targetHosts { + // Return an error if the timeout duration is reached + if netTime.Since(startTime) > timeout { + return nil, errors.Errorf( + "sending to targets in HostPool timed out after %s", timeout) + } + result, err := sendFunc(targetHosts[i], targets[i]) if stop != nil && !stop.IsRunning() { return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") @@ -135,6 +147,12 @@ func (s *Sender) SendToPreferred(targets []*id.ID, for proxyIdx := uint32(0); proxyIdx < s.poolParams.ProxyAttempts; proxyIdx++ { for targetIdx := range proxies { + // Return an error if the timeout duration is reached + if netTime.Since(startTime) > timeout { + return nil, errors.Errorf("iterating over target's procies "+ + "timed out after %s", timeout) + } + target := targets[targetIdx] targetProxies := proxies[targetIdx] if !(int(proxyIdx) < len(targetProxies)) { diff --git a/network/gateway/sender_test.go b/network/gateway/sender_test.go index d8dbf16227e8724ef509589bb7133efea888cc26..771df617287adcc15cc095ddcab4a5112206f162 100644 --- a/network/gateway/sender_test.go +++ b/network/gateway/sender_test.go @@ -16,6 +16,7 @@ import ( "gitlab.com/xx_network/primitives/id" "reflect" "testing" + "time" ) // Unit test @@ -139,7 +140,8 @@ func TestSender_SendToPreferred(t *testing.T) { preferredHost := sender.hostList[preferredIndex] // Happy path - result, err := sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_HappyPath, nil) + result, err := sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_HappyPath, nil, 250*time.Millisecond) if err != nil { t.Errorf("Should not error in SendToPreferred happy path: %v", err) } @@ -151,7 +153,8 @@ func TestSender_SendToPreferred(t *testing.T) { } // Call a send which returns an error which triggers replacement - _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_KnownError, nil) + _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_KnownError, nil, 250*time.Millisecond) if err == nil { t.Fatalf("Expected error path did not receive error") } @@ -171,7 +174,8 @@ func TestSender_SendToPreferred(t *testing.T) { preferredHost = sender.hostList[preferredIndex] // Unknown error return will not trigger replacement - _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_UnknownError, nil) + _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_UnknownError, nil, 250*time.Millisecond) if err == nil { t.Fatalf("Expected error path did not receive error") } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index c127168decc0838a25702e488e314ea072ee3f6e..0c8aa453368e545a737817de6bed97ea3daa24af 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -174,7 +174,8 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, } return result, err } - result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc, stop) + result, err := sender.SendToPreferred( + []*id.ID{firstGateway}, sendFunc, stop, cmixParams.SendTimeout) // Exit if the thread has been stopped if stoppable.CheckErr(err) { diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index 69043e60f24504bc1c00c37a2ec97dc807fd4d67..4a5df6d1068a31e3894e05c16412b41d22d48ff0 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -181,7 +181,7 @@ func sendManyCmixHelper(sender *gateway.Sender, return result, err } result, err := sender.SendToPreferred( - []*id.ID{firstGateway}, sendFunc, stop) + []*id.ID{firstGateway}, sendFunc, stop, param.SendTimeout) // Exit if the thread has been stopped if stoppable.CheckErr(err) { diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 6f0f5e15a994317e9bbd342cf406198d020fbab9..bcebcf5a486d0c2f641b1db0bc6d3bf3c21dca01 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -163,7 +163,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, msgResp, err := comms.RequestMessages(host, msgReq) if err != nil { - //you need to default to a retryable errors because otherwise we cannot enumerate all errors + // you need to default to a retryable errors because otherwise we cannot enumerate all errors return nil, errors.WithMessage(err, gateway.RetryableError) } @@ -173,7 +173,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, } return msgResp, nil - }, stop) + }, stop, m.params.SendTimeout) jww.INFO.Printf("Received message for round %d, processing...", roundID) // Fail the round if an error occurs so it can be tried again later if err != nil { @@ -202,7 +202,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, jww.INFO.Printf("Received %d messages in Round %v for %d (%s) in %s", len(msgs), roundID, identity.EphId.Int64(), identity.Source, time.Now().Sub(start)) - //build the bundle of messages to send to the message processor + // build the bundle of messages to send to the message processor bundle := message.Bundle{ Round: roundID, Messages: make([]format.Message, len(msgs)),