diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index 254e788abf70a69b85df28dc3e5e15ca750ab826..d383011b1f715c22863e46e7b3cc361ede00feb6 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -93,11 +93,11 @@ var ftCmd = &cobra.Command{ for done := false; !done; { select { case <-sendDone: - jww.DEBUG.Printf("Finished sending file. Stopping threads " + - "and network follower.") + jww.DEBUG.Printf("[FT] Finished sending file. Stopping " + + "threads and network follower.") done = true case <-receiveDone: - jww.DEBUG.Printf("Finished receiving file. Stopping " + + jww.DEBUG.Printf("[FT] Finished receiving file. Stopping " + "threads and network follower.") done = true } @@ -109,11 +109,11 @@ var ftCmd = &cobra.Command{ // Stop network follower err = client.StopNetworkFollower() if err != nil { - jww.WARN.Printf("Failed to stop network follower: %+v", err) + jww.WARN.Printf("[FT] "+"Failed to stop network follower: %+v", err) } - jww.DEBUG.Print("File transfer finished stopping threads and network " + - "follower.") + jww.DEBUG.Print("[FT] File transfer finished stopping threads and " + + "network follower.") }, } @@ -193,14 +193,16 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, // Get recipient contact from file recipient := getContactFromFile(recipientContactPath) - jww.DEBUG.Printf("Sending file %q of size %d to recipient %s.", - fileName, len(fileData), recipient.ID) + jww.DEBUG.Printf("[FT] Sending file %q to recipient %s: "+ + "{size: %d, type: %q, path: %q, previewPath: %q, preview: %q}", + fileName, recipient.ID, len(fileData), fileType, filePath, + filePreviewPath, filePreviewData) // Create sent progress callback that prints the results progressCB := func(completed bool, sent, arrived, total uint16, t interfaces.FilePartTracker, err error) { - jww.DEBUG.Printf("Sent progress callback for %q "+ - "{completed: %t, sent: %d, arrived: %d, total: %d, err: %v}\n", + jww.DEBUG.Printf("[FT] Sent progress callback for %q "+ + "{completed: %t, sent: %d, arrived: %d, total: %d, err: %v}", fileName, completed, sent, arrived, total, err) if (sent == 0 && arrived == 0) || (arrived == total) || completed || err != nil { @@ -231,17 +233,17 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, // information to the log. func receiveNewFileTransfers(receive chan receivedFtResults, done, quit chan struct{}, m *ft.Manager) { - jww.DEBUG.Print("Starting thread waiting to receive NewFileTransfer " + + jww.DEBUG.Print("[FT] Starting thread waiting to receive NewFileTransfer " + "E2E message.") for { select { case <-quit: - jww.DEBUG.Print("Quitting thread waiting for NewFileTransfer E2E " + - "message.") + jww.DEBUG.Print("[FT] Quitting thread waiting for NewFileTransfer " + + "E2E message.") return case r := <-receive: - jww.DEBUG.Printf("Received new file %q transfer %s of type %q "+ - "from %s of size %d bytes with preview: %q", + jww.DEBUG.Printf("[FT] Received new file %q transfer %s of type "+ + "%q from %s of size %d bytes with preview: %q", r.fileName, r.tid, r.fileType, r.sender, r.size, r.preview) fmt.Printf("Received new file transfer %q of size %d "+ "bytes with preview: %q\n", r.fileName, r.size, r.preview) @@ -262,7 +264,7 @@ func newReceiveProgressCB(tid ftCrypto.TransferID, done chan struct{}, m *ft.Manager) interfaces.ReceivedProgressCallback { return func(completed bool, received, total uint16, t interfaces.FilePartTracker, err error) { - jww.DEBUG.Printf("Receive progress callback for transfer %s "+ + jww.DEBUG.Printf("[FT] Receive progress callback for transfer %s "+ "{completed: %t, received: %d, total: %d, err: %v}", tid, completed, received, total, err) diff --git a/fileTransfer/oldTransferRecovery.go b/fileTransfer/oldTransferRecovery.go index 8c7aa961c12f7d2efa9d55aacb9903e09ee7423c..b6d3c6beb2dd8fdea40353621eae4c1946d835b4 100644 --- a/fileTransfer/oldTransferRecovery.go +++ b/fileTransfer/oldTransferRecovery.go @@ -36,7 +36,6 @@ func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { } // Get list of unsent parts and rounds that parts were sent on - // TODO: handle error unsentParts, sentRounds, err := m.sent.GetUnsentPartsAndSentRounds() // Add all unsent parts to the queue diff --git a/fileTransfer/send.go b/fileTransfer/send.go index aad1d93cd79a0ae19383220b27eecdefc5a3c7da..f6c04551fb3ddb6eab858c1ae6638bfd7c5aafc7 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -32,37 +32,41 @@ import ( // Error messages. const ( // Manager.sendParts - sendManyCmixWarn = "Failed to send %d file parts %v via SendManyCMIX: %+v" - setInProgressErr = "Failed to set parts %v to in-progress for transfer %s" - getRoundResultsErr = "Failed to get round results for round %d for file transfers %v: %+v" + sendManyCmixWarn = "[FT] Failed to send %d file parts %v via SendManyCMIX: %+v" + setInProgressErr = "[FT] Failed to set parts %v to in-progress for transfer %s" + getRoundResultsErr = "[FT] Failed to get round results for round %d for file transfers %v: %+v" // Manager.buildMessages - noSentTransferWarn = "Could not get transfer %s for part %d: %+v" + noSentTransferWarn = "[FT] Could not get transfer %s for part %d: %+v" maxRetriesErr = "Stopping message transfer: %+v" - newCmixMessageErr = "Failed to assemble cMix message for file part %d on transfer %s: %+v" + newCmixMessageErr = "[FT] Failed to assemble cMix message for file part %d on transfer %s: %+v" // Manager.makeRoundEventCallback - finishPassNoTransferErr = "Failed to mark in-progress parts as finished on success of round %d for transfer %s: %+v" - finishTransferErr = "Failed to set part(s) to finished for transfer %s: %+v" - finishedEndE2eMsfErr = "Failed to send E2E message to %s on completion of file transfer %s: %+v" - finishFailNoTransferErr = "Failed to requeue in-progress parts on failure of round %d for transfer %s: %+v" - roundFailureWarn = "Failed to send file parts for file transfers %v on round %d: round %s" - unsetInProgressErr = "Failed to remove parts from in-progress list for transfer %s: round %s" - roundFailureCbErr = "Failed to send parts %d for transfer %s on round %d: round %s" + finishPassNoTransferErr = "[FT] Failed to mark in-progress parts as finished on success of round %d for transfer %s: %+v" + finishTransferErr = "[FT] Failed to set part(s) to finished for transfer %s: %+v" + finishedEndE2eMsfErr = "[FT] Failed to send E2E message to %s on completion of file transfer %s: %+v" + roundFailureWarn = "[FT] Failed to send file parts for file transfers %v on round %d: round %s" + finishFailNoTransferErr = "[FT] Failed to requeue in-progress parts on failure of round %d for transfer %s: %+v" + unsetInProgressErr = "[FT] Failed to remove parts from in-progress list for transfer %s: round %s" // Manager.sendEndE2eMessage endE2eGetPartnerErr = "failed to get file transfer partner %s: %+v" endE2eSendErr = "failed to send end file transfer message: %+v" // getRandomNumParts - getRandomNumPartsRandPanic = "Failed to generate random number of file parts to send: %+v" + getRandomNumPartsRandPanic = "[FT] Failed to generate random number of file parts to send: %+v" ) -// Duration to wait for round to finish before timing out. -const roundResultsTimeout = 60 * time.Second +const ( + // Duration to wait for round to finish before timing out. + roundResultsTimeout = 60 * time.Second + + // Duration to wait for send batch to fill before sending partial batch. + pollSleepDuration = 100 * time.Millisecond -// Duration to wait for send batch to fill before sending partial batch. -const pollSleepDuration = 100 * time.Millisecond + // Age when rounds that files were sent from are deleted from the tracker + clearSentRoundsAge = 10 * time.Second +) // sendThread waits on the sendQueue channel for parts to send. Once its // receives a random number between 1 and 11 of file parts, they are encrypted, @@ -70,7 +74,7 @@ const pollSleepDuration = 100 * time.Millisecond // added to the end of the queue. func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, healthChanID uint64, getNumParts getRngNum) { - jww.DEBUG.Print("Starting file part sending thread.") + jww.DEBUG.Print("[FT] Starting file part sending thread.") // Calculate the average amount of data sent via SendManyCMIX avgNumMessages := (minPartsSendPerRound + maxPartsSendPerRound) / 2 @@ -82,11 +86,19 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, // Batch of parts read from the queue to be sent var partList []queuedPart + // Create new sent round tracker that tracks which recent rounds file parts + // were sent on so that they can be avoided on subsequent sends + sentRounds := newSentRoundTracker(clearSentRoundsAge) + // The size of each batch var numParts int + // Timer triggers sending of unfilled batch to prevent hanging when the + // file part queue has fewer items then the batch size timer := time.NewTimer(pollSleepDuration) - lastSend := time.Time{} + + // Tracks time that the last send completed + var lastSend time.Time // Loop forever polling the sendQueue channel for new file parts to send. If // the channel is empty, then polling is suspended for pollSleepDuration. If @@ -103,13 +115,13 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, case healthy := <-healthChan: // If the network is unhealthy, wait until it becomes healthy if !healthy { - jww.TRACE.Print("Suspending file part sending thread: " + + jww.TRACE.Print("[FT] Suspending file part sending thread: " + "network is unhealthy.") } for !healthy { healthy = <-healthChan } - jww.TRACE.Print("File part sending thread: network is healthy.") + jww.TRACE.Print("[FT] File part sending thread: network is healthy.") case part := <-m.sendQueue: // When a part is received from the queue, add it to the list of // parts to be sent @@ -127,7 +139,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, // If the batch is full, then send the parts if len(partList) == numParts { quit := m.handleSend( - &partList, &lastSend, delay, stop, healthChan, healthChanID) + &partList, &lastSend, delay, stop, healthChanID, sentRounds) if quit { return } @@ -144,7 +156,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, } quit := m.handleSend( - &partList, &lastSend, delay, stop, healthChan, healthChanID) + &partList, &lastSend, delay, stop, healthChanID, sentRounds) if quit { return } @@ -157,7 +169,8 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single, healthChanID uint64) { // Exit the thread if the stoppable is triggered - jww.DEBUG.Print("Stopping file part sending thread: stoppable triggered.") + jww.DEBUG.Print("[FT] Stopping file part sending thread: stoppable " + + "triggered.") // Add all the unsent parts back in the queue for _, part := range partList { @@ -177,13 +190,13 @@ func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single, // Returns true if the stoppable has been triggered and the sending thread // should quit. func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, - delay time.Duration, stop *stoppable.Single, _ chan bool, - healthChanID uint64) bool { + delay time.Duration, stop *stoppable.Single, healthChanID uint64, + sentRounds *sentRoundTracker) bool { // Bandwidth limiter: wait to send until the delay has been reached so that // the bandwidth is limited to the maximum throughput if netTime.Since(*lastSend) < delay { waitingTime := delay - netTime.Since(*lastSend) - jww.TRACE.Printf("Suspending file part sending: "+ + jww.TRACE.Printf("[FT] Suspending file part sending: "+ "bandwidth limit reached; waiting %s to send.", waitingTime) select { case <-stop.Quit(): @@ -191,28 +204,19 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, m.closeSendThread(*partList, stop, healthChanID) return true - // TODO: is this needed? - // case healthy := <-healthChan: - // // If the network is unhealthy, wait until it becomes healthy - // if !healthy { - // jww.TRACE.Print("Suspending file part sending: " + - // "network is unhealthy.") - // } - // for !healthy { - // healthy = <-healthChan - // } - // jww.TRACE.Print("File part sending continuing: network is healthy.") case <-time.NewTimer(delay - netTime.Since(*lastSend)).C: } } // Send all the messages - err := m.sendParts(*partList) - *lastSend = netTime.Now() + err := m.sendParts(*partList, sentRounds) if err != nil { jww.FATAL.Panic(err) } + // Update the timestamp of the send + *lastSend = netTime.Now() + // Clear partList once done *partList = nil @@ -221,7 +225,8 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, // sendParts handles the composing and sending of a cMix message for each part // in the list. All errors returned are fatal errors. -func (m *Manager) sendParts(partList []queuedPart) error { +func (m *Manager) sendParts(partList []queuedPart, + sentRounds *sentRoundTracker) error { // Build cMix messages messages, transfers, groupedParts, partsToResend, err := @@ -235,8 +240,15 @@ func (m *Manager) sendParts(partList []queuedPart) error { return nil } + // Clear all old rounds from the sent rounds list + sentRounds.removeOldRounds() + + // Create cMix parameters with round exclusion list + p := params.GetDefaultCMIX() + p.ExcludedRounds = sentRounds + // Send parts - rid, _, err := m.net.SendManyCMIX(messages, params.GetDefaultCMIX()) + rid, _, err := m.net.SendManyCMIX(messages, p) if err != nil { // If an error occurs, then print a warning and add the file parts back // to the queue to try sending again @@ -416,6 +428,7 @@ func (m *Manager) makeRoundEventCallback( }(tid, transfer.GetRecipient()) } + // Call progress callback after change in progress transfer.CallProgressCB(nil) } } else { @@ -438,10 +451,8 @@ func (m *Manager) makeRoundEventCallback( jww.ERROR.Printf(unsetInProgressErr, tid, roundResult) } - // Return the error on the progress callback - cbErr := errors.Errorf( - roundFailureCbErr, partsToResend, tid, rid, roundResult) - transfer.CallProgressCB(cbErr) + // Call progress callback after change in progress + transfer.CallProgressCB(nil) // Add all the unsent parts back in the queue m.queueParts(tid, partsToResend) @@ -494,9 +505,9 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // If a single partition of the end file transfer message does not transmit, // then the partner will not be able to read the confirmation if !success { - jww.ERROR.Printf("Sending E2E message %s to end file transfer with "+ - "%s failed to transmit %d/%d partitions: %d round failures, %d "+ - "timeouts", recipient, e2eMsgID, numRoundFail+numTimeOut, + jww.ERROR.Printf("[FT] Sending E2E message %s to end file transfer "+ + "with %s failed to transmit %d/%d partitions: %d round failures, "+ + "%d timeouts", recipient, e2eMsgID, numRoundFail+numTimeOut, len(rounds), numRoundFail, numTimeOut) m.store.GetCriticalMessages().Failed(sendMsg, e2eParams) return nil @@ -505,8 +516,8 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // Otherwise, the transmission is a success and this should be denoted in // the session and the log m.store.GetCriticalMessages().Succeeded(sendMsg, e2eParams) - jww.INFO.Printf("Sending of message %s informing %s that a transfer ended"+ - "successful.", e2eMsgID, recipient) + jww.INFO.Printf("[FT] Sending of message %s informing %s that a transfer "+ + "ended successful.", e2eMsgID, recipient) return nil } diff --git a/fileTransfer/send_test.go b/fileTransfer/send_test.go index 94897c3a2de7ed141c29c0b379101c9ee7904929..45c273e90bb41657c4ff9042808bcb1497bf93be 100644 --- a/fileTransfer/send_test.go +++ b/fileTransfer/send_test.go @@ -282,7 +282,7 @@ func TestManager_sendParts(t *testing.T) { queuedParts[i], queuedParts[j] = queuedParts[j], queuedParts[i] }) - err := m.sendParts(queuedParts) + err := m.sendParts(queuedParts, newSentRoundTracker(clearSentRoundsAge)) if err != nil { t.Errorf("sendParts returned an error: %+v", err) } @@ -356,7 +356,7 @@ func TestManager_sendParts_SendManyCmixError(t *testing.T) { } } - err := m.sendParts(queuedParts) + err := m.sendParts(queuedParts, newSentRoundTracker(clearSentRoundsAge)) if err != nil { t.Errorf("sendParts returned an error: %+v", err) } @@ -402,7 +402,7 @@ func TestManager_sendParts_RoundResultsError(t *testing.T) { } expectedErr := fmt.Sprintf(getRoundResultsErr, 0, tIDs, grrErr) - err := m.sendParts(queuedParts) + err := m.sendParts(queuedParts, newSentRoundTracker(clearSentRoundsAge)) if err == nil || err.Error() != expectedErr { t.Errorf("sendParts did not return the expected error when "+ "GetRoundResults should have returned an error."+ @@ -784,7 +784,8 @@ func TestManager_makeRoundEventCallback(t *testing.T) { } // Tests that Manager.makeRoundEventCallback returns a callback that calls the -// progress callback with the correct error when a round fails. +// progress callback with no parts sent on round failure. Also checks that the +// file parts were added back into the queue. func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { m := newTestManager(false, nil, nil, nil, nil, t) @@ -806,6 +807,7 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { if err != nil { t.Errorf("Failed to add new transfer: %+v", err) } + partsToSend := []uint16{0, 1, 2, 3} done0, done1 := make(chan bool), make(chan bool) @@ -819,11 +821,11 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { case 0: done0 <- true case 1: - expectedErr := fmt.Sprintf( - roundFailureCbErr, partsToSend, tid, rid, api.Failed) - if r.err == nil || !strings.Contains(r.err.Error(), expectedErr) { - t.Errorf("Callback received unexpected error when round "+ - "failed.\nexpected: %s\nreceived: %v", expectedErr, r.err) + expectedResult := sentProgressResults{ + false, 0, 0, uint16(len(partsToSend)), r.tracker, nil} + if !reflect.DeepEqual(expectedResult, r) { + t.Errorf("Callback returned unexpected values."+ + "\nexpected: %+v\nreceived: %+v", expectedResult, r) } done1 <- true } @@ -832,9 +834,11 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { }() // Create queued part list add parts + partsMap := make(map[uint16]queuedPart, len(partsToSend)) queuedParts := make([]queuedPart, len(partsToSend)) for i := range queuedParts { queuedParts[i] = queuedPart{tid, uint16(i)} + partsMap[uint16(i)] = queuedParts[i] } _, transfers, groupedParts, _, err := m.buildMessages(queuedParts) @@ -852,6 +856,21 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { roundEventCB(false, false, map[id.Round]api.RoundResult{rid: api.Failed}) <-done1 + + // Check that the parts were added to the queue + for i := range partsToSend { + select { + case <-time.NewTimer(10 * time.Millisecond).C: + t.Errorf("Timed out waiting for part %d.", i) + case r := <-m.sendQueue: + if partsMap[r.partNum] != r { + t.Errorf("Incorrect part in queue (%d)."+ + "\nexpected: %+v\nreceived: %+v", i, partsMap[r.partNum], r) + } else { + delete(partsMap, r.partNum) + } + } + } } // Tests that Manager.sendEndE2eMessage sends an E2E message with the expected diff --git a/fileTransfer/sentRoundTracker.go b/fileTransfer/sentRoundTracker.go index 05ce4b1c3f84369344236db55cbcbd00c1e3f762..80fa1da210befa6d3cc7cec698d197259a903ecb 100644 --- a/fileTransfer/sentRoundTracker.go +++ b/fileTransfer/sentRoundTracker.go @@ -15,7 +15,6 @@ package fileTransfer import ( - jww "github.com/spf13/jwalterweatherman" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" "sync" @@ -66,8 +65,6 @@ func (srt *sentRoundTracker) Insert(rid id.Round) { srt.mux.Lock() defer srt.mux.Unlock() - jww.DEBUG.Printf("[FT]\tInsert round %d into tracker at time %s\n", rid, timeNow.Format("03:04:05.9999999 PM")) - srt.rounds[rid] = timeNow }