Select Git revision
service.go
send.go 5.69 KiB
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package fileTransfer2
import (
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/fileTransfer2/sentRoundTracker"
"gitlab.com/elixxir/client/fileTransfer2/store"
"gitlab.com/elixxir/client/stoppable"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/xx_network/primitives/id"
"strconv"
"time"
)
// Error messages.
const (
// generateRandomPacketSize
getRandomNumPartsRandPanic = "[FT] Failed to generate random number of file parts to send: %+v"
// manager.sendCmix
errNoMoreRetries = "file transfer failed: ran our of retries."
)
const (
// Duration to wait for round to finish before timing out.
roundResultsTimeout = 15 * time.Second
// Age when rounds that files were sent from are deleted from the tracker.
clearSentRoundsAge = 10 * time.Second
// Number of concurrent sending threads
workerPoolThreads = 4
// Tag that prints with cMix sending logs.
cMixDebugTag = "FT.Part"
// Prefix used for the name of a stoppable used for a sending thread
sendThreadStoppableName = "FilePartSendingThread#"
)
// startSendingWorkerPool initialises a worker pool of file part sending
// threads.
func (m *manager) startSendingWorkerPool(multiStop *stoppable.Multi) {
// Set up cMix sending parameters
m.params.Cmix.SendTimeout = m.params.SendTimeout
m.params.Cmix.ExcludedRounds =
sentRoundTracker.NewManager(clearSentRoundsAge)
if m.params.Cmix.DebugTag == cmix.DefaultDebugTag ||
m.params.Cmix.DebugTag == "" {
m.params.Cmix.DebugTag = cMixDebugTag
}
for i := 0; i < workerPoolThreads; i++ {
stop := stoppable.NewSingle(sendThreadStoppableName + strconv.Itoa(i))
multiStop.Add(stop)
go m.sendingThread(stop)
}
}
// sendingThread sends part packets that become available oin the send queue.