Skip to content
Snippets Groups Projects
Commit 278c7ad2 authored by Jono Wenger's avatar Jono Wenger
Browse files

Add cmix params to file transfer params

parent c2e08173
Branches
Tags
4 merge requests!510Release,!226WIP: Api2.0,!210XX-3880 / Generic File Transfer,!207WIP: Client Restructure
......@@ -54,7 +54,7 @@ func Test_callbackTracker_call(t *testing.T) {
if r != nil {
t.Errorf("Received error: %+v", r)
}
case <-time.After(25 * time.Millisecond):
case <-time.After(35 * time.Millisecond):
t.Error("Timed out waiting for callback.")
}
......@@ -66,7 +66,7 @@ func Test_callbackTracker_call(t *testing.T) {
case <-cbChan:
t.Error("Callback called too soon.")
case <-time.After(25 * time.Millisecond):
case <-time.After(35 * time.Millisecond):
ct.mux.RLock()
if !ct.scheduled {
t.Error("Callback is not scheduled when it should be.")
......@@ -99,7 +99,7 @@ func Test_callbackTracker_call(t *testing.T) {
if !ct.complete {
t.Error("Callback is not marked complete when it should be.")
}
case <-time.After(ct.period + 15*time.Millisecond):
case <-time.After(ct.period + 25*time.Millisecond):
t.Errorf("Callback not called after period %s.",
ct.period+15*time.Millisecond)
}
......
......@@ -7,7 +7,10 @@
package fileTransfer2
import "time"
import (
"gitlab.com/elixxir/client/cmix"
"time"
)
const (
defaultMaxThroughput = 150_000 // 150 kB per second
......@@ -24,6 +27,9 @@ type Params struct {
// times out. It is recommended that SendTimeout is not changed from its
// default.
SendTimeout time.Duration
// Cmix are the parameters used when sending a cMix message.
Cmix cmix.CMIXParams
}
// DefaultParams returns a Params object filled with the default values.
......@@ -31,5 +37,6 @@ func DefaultParams() Params {
return Params{
MaxThroughput: defaultMaxThroughput,
SendTimeout: defaultSendTimeout,
Cmix: cmix.GetDefaultCMIXParams(),
}
}
......@@ -8,6 +8,7 @@
package fileTransfer2
import (
"gitlab.com/elixxir/client/cmix"
"reflect"
"testing"
)
......@@ -17,8 +18,10 @@ func TestDefaultParams(t *testing.T) {
expected := Params{
MaxThroughput: defaultMaxThroughput,
SendTimeout: defaultSendTimeout,
Cmix: cmix.GetDefaultCMIXParams(),
}
received := DefaultParams()
received.Cmix.Stop = expected.Cmix.Stop
if !reflect.DeepEqual(expected, received) {
t.Errorf("Received Params does not match expected."+
......
......@@ -51,20 +51,24 @@ const (
// threads.
func (m *manager) startSendingWorkerPool(multiStop *stoppable.Multi) {
// Set up cMix sending parameters
params := cmix.GetDefaultCMIXParams()
params.SendTimeout = m.params.SendTimeout
params.ExcludedRounds = sentRoundTracker.NewManager(clearSentRoundsAge)
params.DebugTag = cMixDebugTag
m.params.Cmix.SendTimeout = m.params.SendTimeout
m.params.Cmix.ExcludedRounds =
sentRoundTracker.NewManager(clearSentRoundsAge)
if m.params.Cmix.DebugTag == cmix.DefaultDebugTag ||
m.params.Cmix.DebugTag == "" {
m.params.Cmix.DebugTag = cMixDebugTag
}
for i := 0; i < workerPoolThreads; i++ {
stop := stoppable.NewSingle(sendThreadStoppableName + strconv.Itoa(i))
multiStop.Add(stop)
go m.sendingThread(params, stop)
go m.sendingThread(stop)
}
}
// sendingThread sends part packets that become available oin the send queue.
func (m *manager) sendingThread(cMixParams cmix.CMIXParams, stop *stoppable.Single) {
func (m *manager) sendingThread(stop *stoppable.Single) {
healthChan := make(chan bool, 10)
healthChanID := m.cmix.AddHealthCallback(func(b bool) { healthChan <- b })
for {
......@@ -80,13 +84,13 @@ func (m *manager) sendingThread(cMixParams cmix.CMIXParams, stop *stoppable.Sing
healthy = <-healthChan
}
case packet := <-m.sendQueue:
m.sendCmix(packet, cMixParams)
m.sendCmix(packet)
}
}
}
// sendCmix sends the parts in the packet via Client.SendMany.
func (m *manager) sendCmix(packet []store.Part, cMixParams cmix.CMIXParams) {
func (m *manager) sendCmix(packet []store.Part) {
// validParts will contain all parts in the original packet excluding those
// that return an error from GetEncryptedPart
validParts := make([]store.Part, 0, len(packet))
......@@ -115,12 +119,12 @@ func (m *manager) sendCmix(packet []store.Part, cMixParams cmix.CMIXParams) {
}
// Clear all old rounds from the sent rounds list
cMixParams.ExcludedRounds.(*sentRoundTracker.Manager).RemoveOldRounds()
m.params.Cmix.ExcludedRounds.(*sentRoundTracker.Manager).RemoveOldRounds()
jww.DEBUG.Printf("[FT] Sending %d file parts via SendManyCMIX",
len(messages))
rid, _, err := m.cmix.SendMany(messages, cMixParams)
rid, _, err := m.cmix.SendMany(messages, m.params.Cmix)
if err != nil {
jww.WARN.Printf("[FT] Failed to send %d file parts via "+
"SendManyCMIX: %+v", len(messages), err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment