diff --git a/api/client.go b/api/client.go index 0271887422ee1171f9d9cd28cadcce7f26af5986..1e53bd0fadde17f9070806ce22d52398bc5ece0c 100644 --- a/api/client.go +++ b/api/client.go @@ -330,7 +330,6 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, //Open the client c, err := OpenClient(storageDir, password, parameters) - if err != nil { return nil, err } @@ -374,6 +373,58 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, return c, nil } +// LoginWithProtoClient creates a client object with a protoclient JSON containing the +// cryptographic primitives. This is designed for some specific deployment +//// procedures and is generally unsafe. +func LoginWithProtoClient(storageDir string, password []byte, protoClientJSON []byte, + newBaseNdf string, parameters params.Network) (*Client, error) { + jww.INFO.Printf("LoginWithNewBaseNDF_UNSAFE()") + + // Parse the NDF + def, err := parseNDF(newBaseNdf) + if err != nil { + return nil, err + } + + //Open the client + err = NewProtoClient_Unsafe(newBaseNdf, storageDir, password, protoClientJSON) + if err != nil { + return nil, err + } + + //Open the client + c, err := OpenClient(storageDir, password, parameters) + if err != nil { + return nil, err + } + + //initialize comms + err = c.initComms() + if err != nil { + return nil, err + } + + //store the updated base NDF + c.storage.SetNDF(def) + + // Initialize network and link it to context + c.network, err = network.NewManager(c.storage, c.switchboard, c.rng, + c.events, c.comms, parameters, def) + if err != nil { + return nil, err + } + + // initialize the auth tracker + c.auth = auth.NewManager(c.switchboard, c.storage, c.network) + + err = c.registerFollower() + if err != nil { + return nil, err + } + + return c, nil +} + func (c *Client) initComms() error { var err error diff --git a/bindings/fileTransfer.go b/bindings/fileTransfer.go index 1bf1258271867e25cdf5a0d93893e1416f6c2938..099cecc8496f254704267cb1c286b34a3355361b 100644 --- a/bindings/fileTransfer.go +++ b/bindings/fileTransfer.go @@ -47,9 +47,13 @@ type FileTransferReceiveFunc interface { // NewFileTransferManager creates a new file transfer manager and starts the // sending and receiving threads. The receiveFunc is called everytime a new file -// transfer is received. The parameters string is a JSON formatted string of the -// fileTransfer.Params object. If it is left empty, then defaults are used. It -// must match the following format: {"MaxThroughput":150000} +// transfer is received. +// The parameters string contains file transfer network configuration options +// and is a JSON formatted string of the fileTransfer.Params object. If it is +// left empty, then defaults are used. It is highly recommended that defaults +// are used. If it is set, it must match the following format: +// {"MaxThroughput":150000,"SendTimeout":500000000} +// MaxThroughput is in bytes/sec and SendTimeout is in nanoseconds. func NewFileTransferManager(client *Client, receiveFunc FileTransferReceiveFunc, parameters string) (*FileTransfer, error) { @@ -89,7 +93,7 @@ func NewFileTransferManager(client *Client, receiveFunc FileTransferReceiveFunc, // 48 bytes. // The file type identifies what type of file is being sent. It has a max length // of 8 bytes. -// The file data cannot be larger than 4 mB +// The file data cannot be larger than 256 kB // The retry float is the total amount of data to send relative to the data // size. Data will be resent on error and will resend up to [(1 + retry) * // fileSize]. diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index ca0e7da681929d98bad35871bb28d579c9cffa16..932abff2a8cc1a99574fb4128c6762ba6f84a535 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -144,6 +144,7 @@ func initFileTransferManager(client *api.Client, maxThroughput int) ( // Create new parameters p := ft.DefaultParams() + p.SendTimeout = 10 * time.Second if maxThroughput != 0 { p.MaxThroughput = maxThroughput } diff --git a/fileTransfer/ftMessages.proto b/fileTransfer/ftMessages.proto index 22976e3697ba8e34692f0a94f0225098f36ec33c..dae0d2f2a4d72784965924a547f7e5d599386172 100644 --- a/fileTransfer/ftMessages.proto +++ b/fileTransfer/ftMessages.proto @@ -18,7 +18,7 @@ message NewFileTransfer { bytes transferKey = 3; // 256 bit encryption key to identify the transfer bytes transferMac = 4; // 256 bit MAC of the entire file uint32 numParts = 5; // Number of file parts - uint32 size = 6; // The size of the file; max of 4 mB + uint32 size = 6; // The size of the file; max of 250 kB float retry = 7; // Used to determine how many times to retry sending bytes preview = 8; // A preview of the file; max of 4 kB } \ No newline at end of file diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 6191cd96c8adbd1d4f31e6bf7f6f3a73ac32ac6f..e91303742ebc27ccb95810e004cd69b2d8449d62 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -37,8 +37,8 @@ const ( FileTypeMaxLen = 8 // FileMaxSize is the maximum file size that can be transferred. Currently, - // it is set to 4 mB. - FileMaxSize = 4_000_000 + // it is set to 250 kB. + FileMaxSize = 250_000 // minPartsSendPerRound is the minimum number of file parts sent each round. minPartsSendPerRound = 1 diff --git a/fileTransfer/params.go b/fileTransfer/params.go index ec2d3b53a0734e8ceed0da9f66553a4b283ca80b..754bb88db3d45c75f3e47454f45bf4cf6170e830 100644 --- a/fileTransfer/params.go +++ b/fileTransfer/params.go @@ -7,8 +7,11 @@ package fileTransfer +import "time" + const ( defaultMaxThroughput = 150_000 // 150 kB per second + defaultSendTimeout = 500 * time.Millisecond ) // Params contains parameters used for file transfer. @@ -16,11 +19,17 @@ type Params struct { // MaxThroughput is the maximum data transfer speed to send file parts (in // bytes per second) MaxThroughput int + + // SendTimeout is the duration, in nanoseconds, before sending on a round + // times out. It is recommended that SendTimeout is not changed from its + // default. + SendTimeout time.Duration } // DefaultParams returns a Params object filled with the default values. func DefaultParams() Params { return Params{ MaxThroughput: defaultMaxThroughput, + SendTimeout: defaultSendTimeout, } } diff --git a/fileTransfer/params_test.go b/fileTransfer/params_test.go index 4cf19e52ca927e098b299229d6918f83c3c6f67a..b60676c994610796744fe326bce6cff09775abc4 100644 --- a/fileTransfer/params_test.go +++ b/fileTransfer/params_test.go @@ -16,6 +16,7 @@ import ( func TestDefaultParams(t *testing.T) { expected := Params{ MaxThroughput: defaultMaxThroughput, + SendTimeout: defaultSendTimeout, } received := DefaultParams() diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 6b742a7e618538f2b08c51d2c60849ad5fc86cb2..dbab0994774046b8ce5fa62ab8030fbde4335fd8 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -59,10 +59,13 @@ const ( const ( // Duration to wait for round to finish before timing out. - roundResultsTimeout = 60 * time.Second + roundResultsTimeout = 15 * time.Second // Duration to wait for send batch to fill before sending partial batch. pollSleepDuration = 100 * time.Millisecond + + // Age when rounds that files were sent from are deleted from the tracker + clearSentRoundsAge = 10 * time.Second ) // sendThread waits on the sendQueue channel for parts to send. Once its @@ -83,6 +86,10 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, // Batch of parts read from the queue to be sent var partList []queuedPart + // Create new sent round tracker that tracks which recent rounds file parts + // were sent on so that they can be avoided on subsequent sends + sentRounds := newSentRoundTracker(clearSentRoundsAge) + // The size of each batch var numParts int @@ -132,7 +139,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, // If the batch is full, then send the parts if len(partList) == numParts { quit := m.handleSend( - &partList, &lastSend, delay, stop, healthChanID) + &partList, &lastSend, delay, stop, healthChanID, sentRounds) if quit { return } @@ -149,7 +156,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, } quit := m.handleSend( - &partList, &lastSend, delay, stop, healthChanID) + &partList, &lastSend, delay, stop, healthChanID, sentRounds) if quit { return } @@ -183,7 +190,8 @@ func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single, // Returns true if the stoppable has been triggered and the sending thread // should quit. func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, - delay time.Duration, stop *stoppable.Single, healthChanID uint64) bool { + delay time.Duration, stop *stoppable.Single, healthChanID uint64, + sentRounds *sentRoundTracker) bool { // Bandwidth limiter: wait to send until the delay has been reached so that // the bandwidth is limited to the maximum throughput if netTime.Since(*lastSend) < delay { @@ -201,7 +209,7 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, } // Send all the messages - err := m.sendParts(*partList) + err := m.sendParts(*partList, sentRounds) if err != nil { jww.FATAL.Panic(err) } @@ -217,7 +225,8 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, // sendParts handles the composing and sending of a cMix message for each part // in the list. All errors returned are fatal errors. -func (m *Manager) sendParts(partList []queuedPart) error { +func (m *Manager) sendParts(partList []queuedPart, + sentRounds *sentRoundTracker) error { // Build cMix messages messages, transfers, groupedParts, partsToResend, err := @@ -231,8 +240,16 @@ func (m *Manager) sendParts(partList []queuedPart) error { return nil } + // Clear all old rounds from the sent rounds list + sentRounds.removeOldRounds() + + // Create cMix parameters with round exclusion list + p := params.GetDefaultCMIX() + p.SendTimeout = m.p.SendTimeout + p.ExcludedRounds = sentRounds + // Send parts - rid, _, err := m.net.SendManyCMIX(messages, params.GetDefaultCMIX()) + rid, _, err := m.net.SendManyCMIX(messages, p) if err != nil { // If an error occurs, then print a warning and add the file parts back // to the queue to try sending again diff --git a/fileTransfer/send_test.go b/fileTransfer/send_test.go index a1360b215e610841d5c164610a1ee728a390f906..45c273e90bb41657c4ff9042808bcb1497bf93be 100644 --- a/fileTransfer/send_test.go +++ b/fileTransfer/send_test.go @@ -282,7 +282,7 @@ func TestManager_sendParts(t *testing.T) { queuedParts[i], queuedParts[j] = queuedParts[j], queuedParts[i] }) - err := m.sendParts(queuedParts) + err := m.sendParts(queuedParts, newSentRoundTracker(clearSentRoundsAge)) if err != nil { t.Errorf("sendParts returned an error: %+v", err) } @@ -356,7 +356,7 @@ func TestManager_sendParts_SendManyCmixError(t *testing.T) { } } - err := m.sendParts(queuedParts) + err := m.sendParts(queuedParts, newSentRoundTracker(clearSentRoundsAge)) if err != nil { t.Errorf("sendParts returned an error: %+v", err) } @@ -402,7 +402,7 @@ func TestManager_sendParts_RoundResultsError(t *testing.T) { } expectedErr := fmt.Sprintf(getRoundResultsErr, 0, tIDs, grrErr) - err := m.sendParts(queuedParts) + err := m.sendParts(queuedParts, newSentRoundTracker(clearSentRoundsAge)) if err == nil || err.Error() != expectedErr { t.Errorf("sendParts did not return the expected error when "+ "GetRoundResults should have returned an error."+ diff --git a/fileTransfer/sentRoundTracker.go b/fileTransfer/sentRoundTracker.go new file mode 100644 index 0000000000000000000000000000000000000000..80fa1da210befa6d3cc7cec698d197259a903ecb --- /dev/null +++ b/fileTransfer/sentRoundTracker.go @@ -0,0 +1,98 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package fileTransfer + +import ( + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" + "sync" + "time" +) + +// sentRoundTracker keeps track of rounds that file parts were sent on and when +// those rounds occurred. Rounds past the given age can be deleted manually. +type sentRoundTracker struct { + rounds map[id.Round]time.Time + age time.Duration + mux sync.RWMutex +} + +// newSentRoundTracker returns an empty sentRoundTracker. +func newSentRoundTracker(interval time.Duration) *sentRoundTracker { + return &sentRoundTracker{ + rounds: make(map[id.Round]time.Time), + age: interval, + } +} + +// removeOldRounds removes any rounds that are older than the max round age. +func (srt *sentRoundTracker) removeOldRounds() { + srt.mux.Lock() + defer srt.mux.Unlock() + deleteBefore := netTime.Now().Add(-srt.age) + + for rid, timeStamp := range srt.rounds { + if timeStamp.Before(deleteBefore) { + delete(srt.rounds, rid) + } + } +} + +// Has indicates if the round ID is in the tracker. +func (srt *sentRoundTracker) Has(rid id.Round) bool { + srt.mux.RLock() + defer srt.mux.RUnlock() + + _, exists := srt.rounds[rid] + return exists +} + +// Insert adds the round to the tracker with the current time. +func (srt *sentRoundTracker) Insert(rid id.Round) { + timeNow := netTime.Now() + srt.mux.Lock() + defer srt.mux.Unlock() + + srt.rounds[rid] = timeNow +} + +// Remove deletes a round ID from the tracker. +func (srt *sentRoundTracker) Remove(rid id.Round) { + srt.mux.Lock() + defer srt.mux.Unlock() + delete(srt.rounds, rid) +} + +// Len returns the number of round IDs in the tracker. +func (srt *sentRoundTracker) Len() int { + srt.mux.RLock() + defer srt.mux.RUnlock() + + return len(srt.rounds) +} + +// GetRoundIDs returns a list of all round IDs in the tracker. +func (srt *sentRoundTracker) GetRoundIDs() []id.Round { + srt.mux.RLock() + defer srt.mux.RUnlock() + + roundIDs := make([]id.Round, 0, len(srt.rounds)) + + for rid := range srt.rounds { + roundIDs = append(roundIDs, rid) + } + + return roundIDs +} diff --git a/fileTransfer/sentRoundTracker_test.go b/fileTransfer/sentRoundTracker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..451ec1eff967d387e3c904791c5f3f8bf82abffb --- /dev/null +++ b/fileTransfer/sentRoundTracker_test.go @@ -0,0 +1,184 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package fileTransfer + +import ( + "gitlab.com/xx_network/primitives/id" + "reflect" + "testing" + "time" +) + +// Tests that newSentRoundTracker returns the expected new sentRoundTracker. +func Test_newSentRoundTracker(t *testing.T) { + interval := 10 * time.Millisecond + expected := &sentRoundTracker{ + rounds: make(map[id.Round]time.Time), + age: interval, + } + + srt := newSentRoundTracker(interval) + + if !reflect.DeepEqual(expected, srt) { + t.Errorf("New sentRoundTracker does not match expected."+ + "\nexpected: %+v\nreceived: %+v", expected, srt) + } +} + +// Tests that sentRoundTracker.removeOldRounds removes only old rounds and not +// newer rounds. +func Test_sentRoundTracker_removeOldRounds(t *testing.T) { + srt := newSentRoundTracker(50 * time.Millisecond) + + // Add odd round to tracker + for rid := id.Round(0); rid < 100; rid++ { + if rid%2 != 0 { + srt.Insert(rid) + } + } + + time.Sleep(50 * time.Millisecond) + + // Add even round to tracker + for rid := id.Round(0); rid < 100; rid++ { + if rid%2 == 0 { + srt.Insert(rid) + } + } + + // Remove all old rounds (should be all odd rounds) + srt.removeOldRounds() + + // Check that only even rounds exist + for rid := id.Round(0); rid < 100; rid++ { + if srt.Has(rid) { + if rid%2 != 0 { + t.Errorf("Round %d exists.", rid) + } + } else if rid%2 == 0 { + t.Errorf("Round %d does not exist.", rid) + } + } +} + +// Tests that sentRoundTracker.Has returns true for all the even rounds and +// false for all odd rounds. +func Test_sentRoundTracker_Has(t *testing.T) { + srt := newSentRoundTracker(0) + + // Insert even rounds into the tracker + for rid := id.Round(0); rid < 100; rid++ { + if rid%2 == 0 { + srt.Insert(rid) + } + } + + // Check that only even rounds exist + for rid := id.Round(0); rid < 100; rid++ { + if srt.Has(rid) { + if rid%2 != 0 { + t.Errorf("Round %d exists.", rid) + } + } else if rid%2 == 0 { + t.Errorf("Round %d does not exist.", rid) + } + } +} + +// Tests that sentRoundTracker.Insert adds all the expected rounds. +func Test_sentRoundTracker_Insert(t *testing.T) { + srt := newSentRoundTracker(0) + + // Insert even rounds into the tracker + for rid := id.Round(0); rid < 100; rid++ { + if rid%2 == 0 { + srt.Insert(rid) + } + } + + // Check that only even rounds were added + for rid := id.Round(0); rid < 100; rid++ { + _, exists := srt.rounds[rid] + if exists { + if rid%2 != 0 { + t.Errorf("Round %d exists.", rid) + } + } else if rid%2 == 0 { + t.Errorf("Round %d does not exist.", rid) + } + } +} + +// Tests that sentRoundTracker.Remove removes all even rounds. +func Test_sentRoundTracker_Remove(t *testing.T) { + srt := newSentRoundTracker(0) + + // Add all round to tracker + for rid := id.Round(0); rid < 100; rid++ { + srt.Insert(rid) + } + + // Remove even rounds from the tracker + for rid := id.Round(0); rid < 100; rid++ { + if rid%2 == 0 { + srt.Remove(rid) + } + } + + // Check that only even rounds were removed + for rid := id.Round(0); rid < 100; rid++ { + _, exists := srt.rounds[rid] + if exists { + if rid%2 == 0 { + t.Errorf("Round %d does not exist.", rid) + } + } else if rid%2 != 0 { + t.Errorf("Round %d exists.", rid) + } + } +} + +// Tests that sentRoundTracker.Len returns the expected length when the tracker +// is empty, filled, and then modified. +func Test_sentRoundTracker_Len(t *testing.T) { + srt := newSentRoundTracker(0) + + if srt.Len() != 0 { + t.Errorf("Length of tracker incorrect.\nexpected: %d\nreceived: %d", + 0, srt.Len()) + } + + // Add all round to tracker + for rid := id.Round(0); rid < 100; rid++ { + srt.Insert(rid) + } + + if srt.Len() != 100 { + t.Errorf("Length of tracker incorrect.\nexpected: %d\nreceived: %d", + 100, srt.Len()) + } + + // Remove even rounds from the tracker + for rid := id.Round(0); rid < 100; rid++ { + if rid%2 == 0 { + srt.Remove(rid) + } + } + + if srt.Len() != 50 { + t.Errorf("Length of tracker incorrect.\nexpected: %d\nreceived: %d", + 50, srt.Len()) + } +} diff --git a/interfaces/params/CMIX.go b/interfaces/params/CMIX.go index 83a287fc460d5ce0254496d98610379fd2f8bee8..5e772d0360a94350ff6b99b6db33d22934e5410d 100644 --- a/interfaces/params/CMIX.go +++ b/interfaces/params/CMIX.go @@ -19,6 +19,11 @@ type CMIX struct { Timeout time.Duration RetryDelay time.Duration ExcludedRounds excludedRounds.ExcludedRounds + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration + // an alternate identity preimage to use on send. If not set, the default // for the sending identity will be used IdentityPreimage []byte @@ -26,9 +31,10 @@ type CMIX struct { func GetDefaultCMIX() CMIX { return CMIX{ - RoundTries: 10, - Timeout: 25 * time.Second, - RetryDelay: 1 * time.Second, + RoundTries: 10, + Timeout: 25 * time.Second, + RetryDelay: 1 * time.Second, + SendTimeout: 3 * time.Second, } } diff --git a/interfaces/params/rounds.go b/interfaces/params/rounds.go index 3e39ad47827e5f5f6f9dc526fcfffa200f5cf5a8..40bd41bfbd80a6cef2ecf30d8a1c645782f7b305 100644 --- a/interfaces/params/rounds.go +++ b/interfaces/params/rounds.go @@ -39,6 +39,10 @@ type Rounds struct { // Toggles if message pickup retrying mechanism if forced // by intentionally not looking up messages ForceMessagePickupRetry bool + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration } func GetDefaultRounds() Rounds { @@ -53,5 +57,6 @@ func GetDefaultRounds() Rounds { MaxHistoricalRoundsRetries: 3, UncheckRoundPeriod: 20 * time.Second, ForceMessagePickupRetry: false, + SendTimeout: 1 * time.Second, } } diff --git a/network/gateway/sender.go b/network/gateway/sender.go index c3c7bff8259feb2f477d658791c39c42e079064c..d09decce432779a13271667851159580dbc6f824 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -18,7 +18,9 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "gitlab.com/xx_network/primitives/netTime" "strings" + "time" ) // Sender Object used for sending that wraps the HostPool for providing destinations @@ -76,17 +78,31 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error return nil, errors.Errorf("Unable to send to any proxies") } -// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations -func (s *Sender) SendToPreferred(targets []*id.ID, - sendFunc func(host *connect.Host, target *id.ID) (interface{}, error), - stop *stoppable.Single) (interface{}, error) { +// sendToPreferredFunc is the send function passed into Sender.SendToPreferred. +type sendToPreferredFunc func(host *connect.Host, target *id.ID, + timeout time.Duration) (interface{}, error) + +// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting +// with up to numProxies destinations. Returns an error if the timeout is +// reached. +func (s *Sender) SendToPreferred(targets []*id.ID, sendFunc sendToPreferredFunc, + stop *stoppable.Single, timeout time.Duration) (interface{}, error) { + + startTime := netTime.Now() // Get the hosts and shuffle randomly targetHosts := s.getPreferred(targets) // Attempt to send directly to targets if they are in the HostPool for i := range targetHosts { - result, err := sendFunc(targetHosts[i], targets[i]) + // Return an error if the timeout duration is reached + if netTime.Since(startTime) > timeout { + return nil, errors.Errorf( + "sending to targets in HostPool timed out after %s", timeout) + } + + remainingTimeout := timeout - netTime.Since(startTime) + result, err := sendFunc(targetHosts[i], targets[i], remainingTimeout) if stop != nil && !stop.IsRunning() { return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") } else if err == nil { @@ -135,6 +151,12 @@ func (s *Sender) SendToPreferred(targets []*id.ID, for proxyIdx := uint32(0); proxyIdx < s.poolParams.ProxyAttempts; proxyIdx++ { for targetIdx := range proxies { + // Return an error if the timeout duration is reached + if netTime.Since(startTime) > timeout { + return nil, errors.Errorf("iterating over target's procies "+ + "timed out after %s", timeout) + } + target := targets[targetIdx] targetProxies := proxies[targetIdx] if !(int(proxyIdx) < len(targetProxies)) { @@ -150,7 +172,8 @@ func (s *Sender) SendToPreferred(targets []*id.ID, continue } - result, err := sendFunc(proxy, target) + remainingTimeout := timeout - netTime.Since(startTime) + result, err := sendFunc(proxy, target, remainingTimeout) if stop != nil && !stop.IsRunning() { return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") } else if err == nil { diff --git a/network/gateway/sender_test.go b/network/gateway/sender_test.go index d8dbf16227e8724ef509589bb7133efea888cc26..771df617287adcc15cc095ddcab4a5112206f162 100644 --- a/network/gateway/sender_test.go +++ b/network/gateway/sender_test.go @@ -16,6 +16,7 @@ import ( "gitlab.com/xx_network/primitives/id" "reflect" "testing" + "time" ) // Unit test @@ -139,7 +140,8 @@ func TestSender_SendToPreferred(t *testing.T) { preferredHost := sender.hostList[preferredIndex] // Happy path - result, err := sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_HappyPath, nil) + result, err := sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_HappyPath, nil, 250*time.Millisecond) if err != nil { t.Errorf("Should not error in SendToPreferred happy path: %v", err) } @@ -151,7 +153,8 @@ func TestSender_SendToPreferred(t *testing.T) { } // Call a send which returns an error which triggers replacement - _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_KnownError, nil) + _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_KnownError, nil, 250*time.Millisecond) if err == nil { t.Fatalf("Expected error path did not receive error") } @@ -171,7 +174,8 @@ func TestSender_SendToPreferred(t *testing.T) { preferredHost = sender.hostList[preferredIndex] // Unknown error return will not trigger replacement - _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, SendToPreferred_UnknownError, nil) + _, err = sender.SendToPreferred([]*id.ID{preferredHost.GetId()}, + SendToPreferred_UnknownError, nil, 250*time.Millisecond) if err == nil { t.Fatalf("Expected error path did not receive error") } diff --git a/network/gateway/utils_test.go b/network/gateway/utils_test.go index 2a90528ff10dc53a2375349d87cbcffba3aa77e4..7bd89f1237cac73ec5beb37cf6a0870621815bff 100644 --- a/network/gateway/utils_test.go +++ b/network/gateway/utils_test.go @@ -13,6 +13,7 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "time" ) // Mock structure adhering to HostManager to be used for happy path @@ -142,15 +143,15 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition { const happyPathReturn = "happyPathReturn" -func SendToPreferred_HappyPath(*connect.Host, *id.ID) (interface{}, error) { +func SendToPreferred_HappyPath(*connect.Host, *id.ID, time.Duration) (interface{}, error) { return happyPathReturn, nil } -func SendToPreferred_KnownError(*connect.Host, *id.ID) (interface{}, error) { +func SendToPreferred_KnownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) { return nil, errors.Errorf(errorsList[0]) } -func SendToPreferred_UnknownError(*connect.Host, *id.ID) (interface{}, error) { +func SendToPreferred_UnknownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) { return nil, errors.Errorf("Unexpected error: Oopsie") } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index c38948368440cff9e6cc69f530aee032d0b8a117..6f3aa5eb1f395ccaf26772473566afddb2b38371 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -167,13 +167,23 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, encMsg.Digest(), firstGateway.String()) // Send the payload - sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) { + sendFunc := func(host *connect.Host, target *id.ID, + timeout time.Duration) (interface{}, error) { wrappedMsg.Target = target.Marshal() jww.TRACE.Printf("[sendCMIX] sendFunc %s", host) timeout := calculateSendTimeout(bestRound, maxTimeout) jww.TRACE.Printf("[sendCMIX] sendFunc %s timeout %s", host, timeout) + result, err := comms.SendPutMessage(host, wrappedMsg, + timeout) + jww.TRACE.Printf("[sendCMIX] sendFunc %s putmsg", host) + // Use the smaller of the two timeout durations + calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout) + if calculatedTimeout < timeout { + timeout = calculatedTimeout + } + result, err := comms.SendPutMessage(host, wrappedMsg, timeout) jww.TRACE.Printf("[sendCMIX] sendFunc %s putmsg", host) @@ -190,7 +200,8 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, return result, err } jww.DEBUG.Printf("[sendCMIX] sendToPreferred %s", firstGateway) - result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc, stop) + result, err := sender.SendToPreferred( + []*id.ID{firstGateway}, sendFunc, stop, cmixParams.SendTimeout) jww.DEBUG.Printf("[sendCMIX] sendToPreferred %s returned", firstGateway) diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index 69043e60f24504bc1c00c37a2ec97dc807fd4d67..58edff8076be9e365f0c4873281c78141136f68c 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -27,6 +27,7 @@ import ( "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/netTime" "strings" + "time" ) // SendManyCMIX sends many "raw" cMix message payloads to each of the provided @@ -165,11 +166,17 @@ func sendManyCmixHelper(sender *gateway.Sender, } // Send the payload - sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) { + sendFunc := func(host *connect.Host, target *id.ID, + timeout time.Duration) (interface{}, error) { + // Use the smaller of the two timeout durations + calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout) + if calculatedTimeout < timeout { + timeout = calculatedTimeout + } + wrappedMessage.Target = target.Marshal() - timeout := calculateSendTimeout(bestRound, maxTimeout) - result, err := comms.SendPutManyMessages(host, - wrappedMessage, timeout) + result, err := comms.SendPutManyMessages( + host, wrappedMessage, timeout) if err != nil { err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipientString, bestRound, err) @@ -181,7 +188,7 @@ func sendManyCmixHelper(sender *gateway.Sender, return result, err } result, err := sender.SendToPreferred( - []*id.ID{firstGateway}, sendFunc, stop) + []*id.ID{firstGateway}, sendFunc, stop, param.SendTimeout) // Exit if the thread has been stopped if stoppable.CheckErr(err) { diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 3682de1a2970c30f5f816aaa8cde22cf221faac8..1db833b43ccf1b415efe44b1d0f10aefe3aa2bed 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -148,7 +148,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, stop *stoppable.Single) (message.Bundle, error) { start := time.Now() // Send to the gateways using backup proxies - result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, error) { + result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID, _ time.Duration) (interface{}, error) { jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+ "via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId()) @@ -173,7 +173,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, } return msgResp, nil - }, stop) + }, stop, m.params.SendTimeout) jww.INFO.Printf("Received message for round %d, processing...", roundID) // Fail the round if an error occurs so it can be tried again later if err != nil { diff --git a/network/rounds/utils_test.go b/network/rounds/utils_test.go index f323c266aa5744034fc369637ba1dcce217caa08..8779a68acdf4f0c929186ddb59b6a0ad26bc3f05 100644 --- a/network/rounds/utils_test.go +++ b/network/rounds/utils_test.go @@ -9,6 +9,7 @@ package rounds import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/storage" @@ -28,6 +29,7 @@ func newManager(face interface{}) *Manager { sess1 := storage.InitTestingSession(face) testManager := &Manager{ + params: params.GetDefaultRounds(), lookupRoundMessages: make(chan roundLookup), messageBundles: make(chan message.Bundle), Internal: internal.Internal{