Skip to content
Snippets Groups Projects
Commit 1ebef8ab authored by Jono Wenger's avatar Jono Wenger
Browse files

Add dynamic timeout to sendToPreferredFunc

parent 932486f3
No related branches found
No related tags found
3 merge requests!117Release,!107XX-3617 / Add timeout to send to preferred,!105XX-3617 / faster file transfer
...@@ -78,11 +78,14 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error ...@@ -78,11 +78,14 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
return nil, errors.Errorf("Unable to send to any proxies") return nil, errors.Errorf("Unable to send to any proxies")
} }
// sendToPreferredFunc is the send function passed into Sender.SendToPreferred.
type sendToPreferredFunc func(host *connect.Host, target *id.ID,
timeout time.Duration) (interface{}, error)
// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting // SendToPreferred Call given sendFunc to any Host in the HostPool, attempting
// with up to numProxies destinations. Returns an error if the timeout is // with up to numProxies destinations. Returns an error if the timeout is
// reached. // reached.
func (s *Sender) SendToPreferred(targets []*id.ID, func (s *Sender) SendToPreferred(targets []*id.ID, sendFunc sendToPreferredFunc,
sendFunc func(host *connect.Host, target *id.ID) (interface{}, error),
stop *stoppable.Single, timeout time.Duration) (interface{}, error) { stop *stoppable.Single, timeout time.Duration) (interface{}, error) {
startTime := netTime.Now() startTime := netTime.Now()
...@@ -98,7 +101,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID, ...@@ -98,7 +101,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
"sending to targets in HostPool timed out after %s", timeout) "sending to targets in HostPool timed out after %s", timeout)
} }
result, err := sendFunc(targetHosts[i], targets[i]) remainingTimeout := timeout - netTime.Since(startTime)
result, err := sendFunc(targetHosts[i], targets[i], remainingTimeout)
if stop != nil && !stop.IsRunning() { if stop != nil && !stop.IsRunning() {
return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred")
} else if err == nil { } else if err == nil {
...@@ -168,7 +172,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID, ...@@ -168,7 +172,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
continue continue
} }
result, err := sendFunc(proxy, target) remainingTimeout := timeout - netTime.Since(startTime)
result, err := sendFunc(proxy, target, remainingTimeout)
if stop != nil && !stop.IsRunning() { if stop != nil && !stop.IsRunning() {
return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred")
} else if err == nil { } else if err == nil {
......
...@@ -9,9 +9,11 @@ package gateway ...@@ -9,9 +9,11 @@ package gateway
import ( import (
"fmt" "fmt"
"github.com/pkg/errors"
"gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
"time"
) )
// Mock structure adhering to HostManager to be used for happy path // Mock structure adhering to HostManager to be used for happy path
...@@ -141,16 +143,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition { ...@@ -141,16 +143,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition {
const happyPathReturn = "happyPathReturn" const happyPathReturn = "happyPathReturn"
func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, error) { func SendToPreferred_HappyPath(*connect.Host, *id.ID, time.Duration) (interface{}, error) {
return happyPathReturn, nil return happyPathReturn, nil
} }
func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, error) { func SendToPreferred_KnownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) {
return nil, fmt.Errorf(errorsList[0]) return nil, errors.Errorf(errorsList[0])
} }
func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, error) { func SendToPreferred_UnknownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) {
return nil, fmt.Errorf("Unexpected error: Oopsie") return nil, errors.Errorf("Unexpected error: Oopsie")
} }
func SendToAny_HappyPath(host *connect.Host) (interface{}, error) { func SendToAny_HappyPath(host *connect.Host) (interface{}, error) {
......
...@@ -160,12 +160,17 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, ...@@ -160,12 +160,17 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message,
encMsg.Digest(), firstGateway.String()) encMsg.Digest(), firstGateway.String())
// Send the payload // Send the payload
sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) { sendFunc := func(host *connect.Host, target *id.ID,
timeout time.Duration) (interface{}, error) {
wrappedMsg.Target = target.Marshal() wrappedMsg.Target = target.Marshal()
timeout := calculateSendTimeout(bestRound, maxTimeout) // Use the smaller of the two timeout durations
result, err := comms.SendPutMessage(host, wrappedMsg, calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout)
timeout) if calculatedTimeout < timeout {
timeout = calculatedTimeout
}
result, err := comms.SendPutMessage(host, wrappedMsg, timeout)
if err != nil { if err != nil {
// fixme: should we provide as a slice the whole topology? // fixme: should we provide as a slice the whole topology?
err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipient.String(), bestRound, err) err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipient.String(), bestRound, err)
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
"strings" "strings"
"time"
) )
// SendManyCMIX sends many "raw" cMix message payloads to each of the provided // SendManyCMIX sends many "raw" cMix message payloads to each of the provided
...@@ -165,11 +166,17 @@ func sendManyCmixHelper(sender *gateway.Sender, ...@@ -165,11 +166,17 @@ func sendManyCmixHelper(sender *gateway.Sender,
} }
// Send the payload // Send the payload
sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) { sendFunc := func(host *connect.Host, target *id.ID,
timeout time.Duration) (interface{}, error) {
// Use the smaller of the two timeout durations
calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout)
if calculatedTimeout < timeout {
timeout = calculatedTimeout
}
wrappedMessage.Target = target.Marshal() wrappedMessage.Target = target.Marshal()
timeout := calculateSendTimeout(bestRound, maxTimeout) result, err := comms.SendPutManyMessages(
result, err := comms.SendPutManyMessages(host, host, wrappedMessage, timeout)
wrappedMessage, timeout)
if err != nil { if err != nil {
err := handlePutMessageError(firstGateway, instance, err := handlePutMessageError(firstGateway, instance,
session, nodeRegistration, recipientString, bestRound, err) session, nodeRegistration, recipientString, bestRound, err)
......
...@@ -148,7 +148,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, ...@@ -148,7 +148,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round,
stop *stoppable.Single) (message.Bundle, error) { stop *stoppable.Single) (message.Bundle, error) {
start := time.Now() start := time.Now()
// Send to the gateways using backup proxies // Send to the gateways using backup proxies
result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, error) { result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID, _ time.Duration) (interface{}, error) {
jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+ jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+
"via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId()) "via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment