Skip to content
Snippets Groups Projects
Commit 22eaa388 authored by Jono Wenger's avatar Jono Wenger
Browse files

Add SendTimeout to Params and add it to the manager

parent 150a1eac
No related branches found
No related tags found
2 merge requests!117Release,!105XX-3617 / faster file transfer
......@@ -47,9 +47,13 @@ type FileTransferReceiveFunc interface {
// NewFileTransferManager creates a new file transfer manager and starts the
// sending and receiving threads. The receiveFunc is called everytime a new file
// transfer is received. The parameters string is a JSON formatted string of the
// fileTransfer.Params object. If it is left empty, then defaults are used. It
// must match the following format: {"MaxThroughput":150000}
// transfer is received.
// The parameters string contains file transfer network configuration options
// and is a JSON formatted string of the fileTransfer.Params object. If it is
// left empty, then defaults are used. It is highly recommended that defaults
// are used. If it is set, it must match the following format:
// {"MaxThroughput":150000,"SendTimeout":500000000}
// MaxThroughput is in bytes/sec and SendTimeout is in nanoseconds.
func NewFileTransferManager(client *Client, receiveFunc FileTransferReceiveFunc,
parameters string) (*FileTransfer, error) {
......@@ -89,7 +93,7 @@ func NewFileTransferManager(client *Client, receiveFunc FileTransferReceiveFunc,
// 48 bytes.
// The file type identifies what type of file is being sent. It has a max length
// of 8 bytes.
// The file data cannot be larger than 4 mB
// The file data cannot be larger than 256 kB
// The retry float is the total amount of data to send relative to the data
// size. Data will be resent on error and will resend up to [(1 + retry) *
// fileSize].
......
......@@ -144,8 +144,9 @@ func initFileTransferManager(client *api.Client, maxThroughput int) (
// Create new parameters
p := ft.DefaultParams()
p.SendTimeout = 10 * time.Second
if maxThroughput != 0 {
p = ft.NewParams(maxThroughput)
p.MaxThroughput = maxThroughput
}
// Create new manager
......
......@@ -18,7 +18,7 @@ message NewFileTransfer {
bytes transferKey = 3; // 256 bit encryption key to identify the transfer
bytes transferMac = 4; // 256 bit MAC of the entire file
uint32 numParts = 5; // Number of file parts
uint32 size = 6; // The size of the file; max of 4 mB
uint32 size = 6; // The size of the file; max of 250 kB
float retry = 7; // Used to determine how many times to retry sending
bytes preview = 8; // A preview of the file; max of 4 kB
}
\ No newline at end of file
......@@ -37,8 +37,8 @@ const (
FileTypeMaxLen = 8
// FileMaxSize is the maximum file size that can be transferred. Currently,
// it is set to 4 mB.
FileMaxSize = 4_000_000
// it is set to 250 kB.
FileMaxSize = 250_000
// minPartsSendPerRound is the minimum number of file parts sent each round.
minPartsSendPerRound = 1
......@@ -100,13 +100,13 @@ type Manager struct {
// Queue of parts to send
sendQueue chan queuedPart
// Maximum data transfer speed in bytes per second
maxThroughput int
// Indicates if old transfers saved to storage have been recovered after
// file transfer is closed and reopened
oldTransfersRecovered bool
// File transfer parameters
p Params
// Client interfaces
client *api.Client
store *storage.Session
......@@ -166,8 +166,8 @@ func newManager(client *api.Client, store *storage.Session,
sent: sent,
received: received,
sendQueue: make(chan queuedPart, sendQueueBuffLen),
maxThroughput: p.MaxThroughput,
oldTransfersRecovered: false,
p: p,
client: client,
store: store,
swb: swb,
......
......@@ -7,8 +7,11 @@
package fileTransfer
import "time"
const (
defaultMaxThroughput = 150_000 // 150 kB per second
defaultSendTimeout = 500 * time.Millisecond
)
// Params contains parameters used for file transfer.
......@@ -16,18 +19,17 @@ type Params struct {
// MaxThroughput is the maximum data transfer speed to send file parts (in
// bytes per second)
MaxThroughput int
}
// NewParams generates a new Params object filled with the given parameters.
func NewParams(maxThroughput int) Params {
return Params{
MaxThroughput: maxThroughput,
}
// SendTimeout is the duration, in nanoseconds, before sending on a round
// times out. It is recommended that SendTimeout is not changed from its
// default.
SendTimeout time.Duration
}
// DefaultParams returns a Params object filled with the default values.
func DefaultParams() Params {
return Params{
MaxThroughput: defaultMaxThroughput,
SendTimeout: defaultSendTimeout,
}
}
......@@ -12,23 +12,12 @@ import (
"testing"
)
// Tests that NewParams returns the expected Params object.
func TestNewParams(t *testing.T) {
expected := Params{
MaxThroughput: 42,
}
received := NewParams(expected.MaxThroughput)
if !reflect.DeepEqual(expected, received) {
t.Errorf("Received Params does not match expected."+
"\nexpected: %+v\nreceived: %+v", expected, received)
}
}
// Tests that DefaultParams returns a Params object with the expected defaults.
func TestDefaultParams(t *testing.T) {
expected := Params{MaxThroughput: defaultMaxThroughput}
expected := Params{
MaxThroughput: defaultMaxThroughput,
SendTimeout: defaultSendTimeout,
}
received := DefaultParams()
if !reflect.DeepEqual(expected, received) {
......
......@@ -59,16 +59,13 @@ const (
const (
// Duration to wait for round to finish before timing out.
roundResultsTimeout = 60 * time.Second
roundResultsTimeout = 15 * time.Second
// Duration to wait for send batch to fill before sending partial batch.
pollSleepDuration = 100 * time.Millisecond
// Age when rounds that files were sent from are deleted from the tracker
clearSentRoundsAge = 10 * time.Second
// Duration before sending on a round times out
sendTimeout = 500 * time.Millisecond
)
// sendThread waits on the sendQueue channel for parts to send. Once its
......@@ -84,7 +81,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool,
avgSendSize := avgNumMessages * (8192 / 8)
// Calculate the delay needed to reach max throughput
delay := time.Duration((int(time.Second) * avgSendSize) / m.maxThroughput)
delay := time.Duration((int(time.Second) * avgSendSize) / m.p.MaxThroughput)
// Batch of parts read from the queue to be sent
var partList []queuedPart
......@@ -248,7 +245,7 @@ func (m *Manager) sendParts(partList []queuedPart,
// Create cMix parameters with round exclusion list
p := params.GetDefaultCMIX()
p.SendTimeout = sendTimeout
p.SendTimeout = m.p.SendTimeout
p.ExcludedRounds = sentRounds
// Send parts
......@@ -319,24 +316,24 @@ func (m *Manager) buildMessages(partList []queuedPart) (
for i, part := range partList {
// Lookup the transfer by the ID; if the transfer does not exist, then
// print a warning and skip this message
transfer, err := m.sent.GetTransfer(part.tid)
st, err := m.sent.GetTransfer(part.tid)
if err != nil {
jww.WARN.Printf(noSentTransferWarn, part.tid, part.partNum, err)
continue
}
// Generate new cMix message with encrypted file part
cmixMsg, err := m.newCmixMessage(transfer, part.partNum, rng)
cmixMsg, err := m.newCmixMessage(st, part.partNum, rng)
if err == ftStorage.MaxRetriesErr {
jww.DEBUG.Printf("[FT] File transfer %s sent to %s ran out of "+
"retries {parts: %d, fingerprints: %d}",
part.tid, transfer.GetRecipient(), transfer.GetNumParts(),
transfer.GetNumFps())
"retries {parts: %d, numFps: %d/%d}",
part.tid, st.GetRecipient(), st.GetNumParts(),
st.GetNumFps()-st.GetNumAvailableFps(), st.GetNumFps())
// If the max number of retries has been reached, then report the
// error on the callback, delete the transfer, and skip to the next
// message
go transfer.CallProgressCB(errors.Errorf(maxRetriesErr, err))
go st.CallProgressCB(errors.Errorf(maxRetriesErr, err))
continue
} else if err != nil {
// For all other errors, return an error
......@@ -346,13 +343,13 @@ func (m *Manager) buildMessages(partList []queuedPart) (
// Construct TargetedCmixMessage
msg := message.TargetedCmixMessage{
Recipient: transfer.GetRecipient(),
Recipient: st.GetRecipient(),
Message: cmixMsg,
}
// Add to list of messages to send
messages = append(messages, msg)
transfers[part.tid] = transfer
transfers[part.tid] = st
groupedParts[part.tid] = append(groupedParts[part.tid], part.partNum)
partsToResend = append(partsToResend, i)
}
......
......@@ -189,12 +189,15 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive,
avgNumMessages := (minPartsSendPerRound + maxPartsSendPerRound) / 2
avgSendSize := avgNumMessages * (8192 / 8)
p := DefaultParams()
p.MaxThroughput = int(time.Second) * avgSendSize
m := &Manager{
receiveCB: receiveCB,
sent: sent,
received: received,
sendQueue: make(chan queuedPart, sendQueueBuffLen),
maxThroughput: int(time.Second) * avgSendSize,
p: p,
store: storage.InitTestingSession(t),
swb: switchboard.New(),
net: net,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment