diff --git a/bindings/fileTransfer.go b/bindings/fileTransfer.go index 1bf1258271867e25cdf5a0d93893e1416f6c2938..099cecc8496f254704267cb1c286b34a3355361b 100644 --- a/bindings/fileTransfer.go +++ b/bindings/fileTransfer.go @@ -47,9 +47,13 @@ type FileTransferReceiveFunc interface { // NewFileTransferManager creates a new file transfer manager and starts the // sending and receiving threads. The receiveFunc is called everytime a new file -// transfer is received. The parameters string is a JSON formatted string of the -// fileTransfer.Params object. If it is left empty, then defaults are used. It -// must match the following format: {"MaxThroughput":150000} +// transfer is received. +// The parameters string contains file transfer network configuration options +// and is a JSON formatted string of the fileTransfer.Params object. If it is +// left empty, then defaults are used. It is highly recommended that defaults +// are used. If it is set, it must match the following format: +// {"MaxThroughput":150000,"SendTimeout":500000000} +// MaxThroughput is in bytes/sec and SendTimeout is in nanoseconds. func NewFileTransferManager(client *Client, receiveFunc FileTransferReceiveFunc, parameters string) (*FileTransfer, error) { @@ -89,7 +93,7 @@ func NewFileTransferManager(client *Client, receiveFunc FileTransferReceiveFunc, // 48 bytes. // The file type identifies what type of file is being sent. It has a max length // of 8 bytes. -// The file data cannot be larger than 4 mB +// The file data cannot be larger than 256 kB // The retry float is the total amount of data to send relative to the data // size. Data will be resent on error and will resend up to [(1 + retry) * // fileSize]. diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index 04214377d23c0fdbf6642159888cb65c933f0e9f..932abff2a8cc1a99574fb4128c6762ba6f84a535 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -144,8 +144,9 @@ func initFileTransferManager(client *api.Client, maxThroughput int) ( // Create new parameters p := ft.DefaultParams() + p.SendTimeout = 10 * time.Second if maxThroughput != 0 { - p = ft.NewParams(maxThroughput) + p.MaxThroughput = maxThroughput } // Create new manager diff --git a/fileTransfer/ftMessages.proto b/fileTransfer/ftMessages.proto index 22976e3697ba8e34692f0a94f0225098f36ec33c..dae0d2f2a4d72784965924a547f7e5d599386172 100644 --- a/fileTransfer/ftMessages.proto +++ b/fileTransfer/ftMessages.proto @@ -18,7 +18,7 @@ message NewFileTransfer { bytes transferKey = 3; // 256 bit encryption key to identify the transfer bytes transferMac = 4; // 256 bit MAC of the entire file uint32 numParts = 5; // Number of file parts - uint32 size = 6; // The size of the file; max of 4 mB + uint32 size = 6; // The size of the file; max of 250 kB float retry = 7; // Used to determine how many times to retry sending bytes preview = 8; // A preview of the file; max of 4 kB } \ No newline at end of file diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 56e2a21fc023e69faf66bdb77d28ce2ad65df931..e91303742ebc27ccb95810e004cd69b2d8449d62 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -37,8 +37,8 @@ const ( FileTypeMaxLen = 8 // FileMaxSize is the maximum file size that can be transferred. Currently, - // it is set to 4 mB. - FileMaxSize = 4_000_000 + // it is set to 250 kB. + FileMaxSize = 250_000 // minPartsSendPerRound is the minimum number of file parts sent each round. minPartsSendPerRound = 1 @@ -100,13 +100,13 @@ type Manager struct { // Queue of parts to send sendQueue chan queuedPart - // Maximum data transfer speed in bytes per second - maxThroughput int - // Indicates if old transfers saved to storage have been recovered after // file transfer is closed and reopened oldTransfersRecovered bool + // File transfer parameters + p Params + // Client interfaces client *api.Client store *storage.Session @@ -166,8 +166,8 @@ func newManager(client *api.Client, store *storage.Session, sent: sent, received: received, sendQueue: make(chan queuedPart, sendQueueBuffLen), - maxThroughput: p.MaxThroughput, oldTransfersRecovered: false, + p: p, client: client, store: store, swb: swb, diff --git a/fileTransfer/params.go b/fileTransfer/params.go index 76fa1cf2d871f78aeba37b6b08e8b73e2dcdfb8f..754bb88db3d45c75f3e47454f45bf4cf6170e830 100644 --- a/fileTransfer/params.go +++ b/fileTransfer/params.go @@ -7,8 +7,11 @@ package fileTransfer +import "time" + const ( defaultMaxThroughput = 150_000 // 150 kB per second + defaultSendTimeout = 500 * time.Millisecond ) // Params contains parameters used for file transfer. @@ -16,18 +19,17 @@ type Params struct { // MaxThroughput is the maximum data transfer speed to send file parts (in // bytes per second) MaxThroughput int -} -// NewParams generates a new Params object filled with the given parameters. -func NewParams(maxThroughput int) Params { - return Params{ - MaxThroughput: maxThroughput, - } + // SendTimeout is the duration, in nanoseconds, before sending on a round + // times out. It is recommended that SendTimeout is not changed from its + // default. + SendTimeout time.Duration } // DefaultParams returns a Params object filled with the default values. func DefaultParams() Params { return Params{ MaxThroughput: defaultMaxThroughput, + SendTimeout: defaultSendTimeout, } } diff --git a/fileTransfer/params_test.go b/fileTransfer/params_test.go index 8a41f1fc8de56b6c906ccecb6d9761b4096402f0..b60676c994610796744fe326bce6cff09775abc4 100644 --- a/fileTransfer/params_test.go +++ b/fileTransfer/params_test.go @@ -12,23 +12,12 @@ import ( "testing" ) -// Tests that NewParams returns the expected Params object. -func TestNewParams(t *testing.T) { - expected := Params{ - MaxThroughput: 42, - } - - received := NewParams(expected.MaxThroughput) - - if !reflect.DeepEqual(expected, received) { - t.Errorf("Received Params does not match expected."+ - "\nexpected: %+v\nreceived: %+v", expected, received) - } -} - // Tests that DefaultParams returns a Params object with the expected defaults. func TestDefaultParams(t *testing.T) { - expected := Params{MaxThroughput: defaultMaxThroughput} + expected := Params{ + MaxThroughput: defaultMaxThroughput, + SendTimeout: defaultSendTimeout, + } received := DefaultParams() if !reflect.DeepEqual(expected, received) { diff --git a/fileTransfer/send.go b/fileTransfer/send.go index e35e80541d0936a70417517a0ecc454305c28e07..dbab0994774046b8ce5fa62ab8030fbde4335fd8 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -59,16 +59,13 @@ const ( const ( // Duration to wait for round to finish before timing out. - roundResultsTimeout = 60 * time.Second + roundResultsTimeout = 15 * time.Second // Duration to wait for send batch to fill before sending partial batch. pollSleepDuration = 100 * time.Millisecond // Age when rounds that files were sent from are deleted from the tracker clearSentRoundsAge = 10 * time.Second - - // Duration before sending on a round times out - sendTimeout = 500 * time.Millisecond ) // sendThread waits on the sendQueue channel for parts to send. Once its @@ -84,7 +81,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, avgSendSize := avgNumMessages * (8192 / 8) // Calculate the delay needed to reach max throughput - delay := time.Duration((int(time.Second) * avgSendSize) / m.maxThroughput) + delay := time.Duration((int(time.Second) * avgSendSize) / m.p.MaxThroughput) // Batch of parts read from the queue to be sent var partList []queuedPart @@ -248,7 +245,7 @@ func (m *Manager) sendParts(partList []queuedPart, // Create cMix parameters with round exclusion list p := params.GetDefaultCMIX() - p.SendTimeout = sendTimeout + p.SendTimeout = m.p.SendTimeout p.ExcludedRounds = sentRounds // Send parts @@ -319,24 +316,24 @@ func (m *Manager) buildMessages(partList []queuedPart) ( for i, part := range partList { // Lookup the transfer by the ID; if the transfer does not exist, then // print a warning and skip this message - transfer, err := m.sent.GetTransfer(part.tid) + st, err := m.sent.GetTransfer(part.tid) if err != nil { jww.WARN.Printf(noSentTransferWarn, part.tid, part.partNum, err) continue } // Generate new cMix message with encrypted file part - cmixMsg, err := m.newCmixMessage(transfer, part.partNum, rng) + cmixMsg, err := m.newCmixMessage(st, part.partNum, rng) if err == ftStorage.MaxRetriesErr { jww.DEBUG.Printf("[FT] File transfer %s sent to %s ran out of "+ - "retries {parts: %d, fingerprints: %d}", - part.tid, transfer.GetRecipient(), transfer.GetNumParts(), - transfer.GetNumFps()) + "retries {parts: %d, numFps: %d/%d}", + part.tid, st.GetRecipient(), st.GetNumParts(), + st.GetNumFps()-st.GetNumAvailableFps(), st.GetNumFps()) // If the max number of retries has been reached, then report the // error on the callback, delete the transfer, and skip to the next // message - go transfer.CallProgressCB(errors.Errorf(maxRetriesErr, err)) + go st.CallProgressCB(errors.Errorf(maxRetriesErr, err)) continue } else if err != nil { // For all other errors, return an error @@ -346,13 +343,13 @@ func (m *Manager) buildMessages(partList []queuedPart) ( // Construct TargetedCmixMessage msg := message.TargetedCmixMessage{ - Recipient: transfer.GetRecipient(), + Recipient: st.GetRecipient(), Message: cmixMsg, } // Add to list of messages to send messages = append(messages, msg) - transfers[part.tid] = transfer + transfers[part.tid] = st groupedParts[part.tid] = append(groupedParts[part.tid], part.partNum) partsToResend = append(partsToResend, i) } diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 85bd6f9e040be6298429ee72a13cf78233192e84..4321e4be273a74d600c402bbd602a7d88554f611 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -189,12 +189,15 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive, avgNumMessages := (minPartsSendPerRound + maxPartsSendPerRound) / 2 avgSendSize := avgNumMessages * (8192 / 8) + p := DefaultParams() + p.MaxThroughput = int(time.Second) * avgSendSize + m := &Manager{ receiveCB: receiveCB, sent: sent, received: received, sendQueue: make(chan queuedPart, sendQueueBuffLen), - maxThroughput: int(time.Second) * avgSendSize, + p: p, store: storage.InitTestingSession(t), swb: switchboard.New(), net: net,