Skip to content
Snippets Groups Projects
Commit 3e915471 authored by Jono Wenger's avatar Jono Wenger
Browse files

Merge branch 'XX-3617/SendToPreferredTimeout' of git.xx.network:elixxir/client...

Merge branch 'XX-3617/SendToPreferredTimeout' of git.xx.network:elixxir/client into XX-3617/fasterFileTransfer
parents c3681daf 1ebef8ab
No related branches found
No related tags found
3 merge requests!117Release,!107XX-3617 / Add timeout to send to preferred,!105XX-3617 / faster file transfer
...@@ -78,11 +78,14 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error ...@@ -78,11 +78,14 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
return nil, errors.Errorf("Unable to send to any proxies") return nil, errors.Errorf("Unable to send to any proxies")
} }
// sendToPreferredFunc is the send function passed into Sender.SendToPreferred.
type sendToPreferredFunc func(host *connect.Host, target *id.ID,
timeout time.Duration) (interface{}, error)
// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting // SendToPreferred Call given sendFunc to any Host in the HostPool, attempting
// with up to numProxies destinations. Returns an error if the timeout is // with up to numProxies destinations. Returns an error if the timeout is
// reached. // reached.
func (s *Sender) SendToPreferred(targets []*id.ID, func (s *Sender) SendToPreferred(targets []*id.ID, sendFunc sendToPreferredFunc,
sendFunc func(host *connect.Host, target *id.ID) (interface{}, error),
stop *stoppable.Single, timeout time.Duration) (interface{}, error) { stop *stoppable.Single, timeout time.Duration) (interface{}, error) {
startTime := netTime.Now() startTime := netTime.Now()
...@@ -98,7 +101,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID, ...@@ -98,7 +101,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
"sending to targets in HostPool timed out after %s", timeout) "sending to targets in HostPool timed out after %s", timeout)
} }
result, err := sendFunc(targetHosts[i], targets[i]) remainingTimeout := timeout - netTime.Since(startTime)
result, err := sendFunc(targetHosts[i], targets[i], remainingTimeout)
if stop != nil && !stop.IsRunning() { if stop != nil && !stop.IsRunning() {
return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred")
} else if err == nil { } else if err == nil {
...@@ -168,7 +172,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID, ...@@ -168,7 +172,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
continue continue
} }
result, err := sendFunc(proxy, target) remainingTimeout := timeout - netTime.Since(startTime)
result, err := sendFunc(proxy, target, remainingTimeout)
if stop != nil && !stop.IsRunning() { if stop != nil && !stop.IsRunning() {
return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred")
} else if err == nil { } else if err == nil {
......
...@@ -9,9 +9,11 @@ package gateway ...@@ -9,9 +9,11 @@ package gateway
import ( import (
"fmt" "fmt"
"github.com/pkg/errors"
"gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
"time"
) )
// Mock structure adhering to HostManager to be used for happy path // Mock structure adhering to HostManager to be used for happy path
...@@ -141,16 +143,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition { ...@@ -141,16 +143,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition {
const happyPathReturn = "happyPathReturn" const happyPathReturn = "happyPathReturn"
func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, error) { func SendToPreferred_HappyPath(*connect.Host, *id.ID, time.Duration) (interface{}, error) {
return happyPathReturn, nil return happyPathReturn, nil
} }
func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, error) { func SendToPreferred_KnownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) {
return nil, fmt.Errorf(errorsList[0]) return nil, errors.Errorf(errorsList[0])
} }
func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, error) { func SendToPreferred_UnknownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) {
return nil, fmt.Errorf("Unexpected error: Oopsie") return nil, errors.Errorf("Unexpected error: Oopsie")
} }
func SendToAny_HappyPath(host *connect.Host) (interface{}, error) { func SendToAny_HappyPath(host *connect.Host) (interface{}, error) {
......
...@@ -160,12 +160,17 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, ...@@ -160,12 +160,17 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message,
encMsg.Digest(), firstGateway.String()) encMsg.Digest(), firstGateway.String())
// Send the payload // Send the payload
sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) { sendFunc := func(host *connect.Host, target *id.ID,
timeout time.Duration) (interface{}, error) {
wrappedMsg.Target = target.Marshal() wrappedMsg.Target = target.Marshal()
timeout := calculateSendTimeout(bestRound, maxTimeout) // Use the smaller of the two timeout durations
result, err := comms.SendPutMessage(host, wrappedMsg, calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout)
timeout) if calculatedTimeout < timeout {
timeout = calculatedTimeout
}
result, err := comms.SendPutMessage(host, wrappedMsg, timeout)
if err != nil { if err != nil {
// fixme: should we provide as a slice the whole topology? // fixme: should we provide as a slice the whole topology?
err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipient.String(), bestRound, err) err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipient.String(), bestRound, err)
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
"strings" "strings"
"time"
) )
// SendManyCMIX sends many "raw" cMix message payloads to each of the provided // SendManyCMIX sends many "raw" cMix message payloads to each of the provided
...@@ -165,11 +166,17 @@ func sendManyCmixHelper(sender *gateway.Sender, ...@@ -165,11 +166,17 @@ func sendManyCmixHelper(sender *gateway.Sender,
} }
// Send the payload // Send the payload
sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) { sendFunc := func(host *connect.Host, target *id.ID,
timeout time.Duration) (interface{}, error) {
// Use the smaller of the two timeout durations
calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout)
if calculatedTimeout < timeout {
timeout = calculatedTimeout
}
wrappedMessage.Target = target.Marshal() wrappedMessage.Target = target.Marshal()
timeout := calculateSendTimeout(bestRound, maxTimeout) result, err := comms.SendPutManyMessages(
result, err := comms.SendPutManyMessages(host, host, wrappedMessage, timeout)
wrappedMessage, timeout)
if err != nil { if err != nil {
err := handlePutMessageError(firstGateway, instance, err := handlePutMessageError(firstGateway, instance,
session, nodeRegistration, recipientString, bestRound, err) session, nodeRegistration, recipientString, bestRound, err)
......
...@@ -148,7 +148,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, ...@@ -148,7 +148,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round,
stop *stoppable.Single) (message.Bundle, error) { stop *stoppable.Single) (message.Bundle, error) {
start := time.Now() start := time.Now()
// Send to the gateways using backup proxies // Send to the gateways using backup proxies
result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, error) { result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID, _ time.Duration) (interface{}, error) {
jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+ jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+
"via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId()) "via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment