diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index 254e788abf70a69b85df28dc3e5e15ca750ab826..d383011b1f715c22863e46e7b3cc361ede00feb6 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -93,11 +93,11 @@ var ftCmd = &cobra.Command{ for done := false; !done; { select { case <-sendDone: - jww.DEBUG.Printf("Finished sending file. Stopping threads " + - "and network follower.") + jww.DEBUG.Printf("[FT] Finished sending file. Stopping " + + "threads and network follower.") done = true case <-receiveDone: - jww.DEBUG.Printf("Finished receiving file. Stopping " + + jww.DEBUG.Printf("[FT] Finished receiving file. Stopping " + "threads and network follower.") done = true } @@ -109,11 +109,11 @@ var ftCmd = &cobra.Command{ // Stop network follower err = client.StopNetworkFollower() if err != nil { - jww.WARN.Printf("Failed to stop network follower: %+v", err) + jww.WARN.Printf("[FT] "+"Failed to stop network follower: %+v", err) } - jww.DEBUG.Print("File transfer finished stopping threads and network " + - "follower.") + jww.DEBUG.Print("[FT] File transfer finished stopping threads and " + + "network follower.") }, } @@ -193,14 +193,16 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, // Get recipient contact from file recipient := getContactFromFile(recipientContactPath) - jww.DEBUG.Printf("Sending file %q of size %d to recipient %s.", - fileName, len(fileData), recipient.ID) + jww.DEBUG.Printf("[FT] Sending file %q to recipient %s: "+ + "{size: %d, type: %q, path: %q, previewPath: %q, preview: %q}", + fileName, recipient.ID, len(fileData), fileType, filePath, + filePreviewPath, filePreviewData) // Create sent progress callback that prints the results progressCB := func(completed bool, sent, arrived, total uint16, t interfaces.FilePartTracker, err error) { - jww.DEBUG.Printf("Sent progress callback for %q "+ - "{completed: %t, sent: %d, arrived: %d, total: %d, err: %v}\n", + jww.DEBUG.Printf("[FT] Sent progress callback for %q "+ + "{completed: %t, sent: %d, arrived: %d, total: %d, err: %v}", fileName, completed, sent, arrived, total, err) if (sent == 0 && arrived == 0) || (arrived == total) || completed || err != nil { @@ -231,17 +233,17 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, // information to the log. func receiveNewFileTransfers(receive chan receivedFtResults, done, quit chan struct{}, m *ft.Manager) { - jww.DEBUG.Print("Starting thread waiting to receive NewFileTransfer " + + jww.DEBUG.Print("[FT] Starting thread waiting to receive NewFileTransfer " + "E2E message.") for { select { case <-quit: - jww.DEBUG.Print("Quitting thread waiting for NewFileTransfer E2E " + - "message.") + jww.DEBUG.Print("[FT] Quitting thread waiting for NewFileTransfer " + + "E2E message.") return case r := <-receive: - jww.DEBUG.Printf("Received new file %q transfer %s of type %q "+ - "from %s of size %d bytes with preview: %q", + jww.DEBUG.Printf("[FT] Received new file %q transfer %s of type "+ + "%q from %s of size %d bytes with preview: %q", r.fileName, r.tid, r.fileType, r.sender, r.size, r.preview) fmt.Printf("Received new file transfer %q of size %d "+ "bytes with preview: %q\n", r.fileName, r.size, r.preview) @@ -262,7 +264,7 @@ func newReceiveProgressCB(tid ftCrypto.TransferID, done chan struct{}, m *ft.Manager) interfaces.ReceivedProgressCallback { return func(completed bool, received, total uint16, t interfaces.FilePartTracker, err error) { - jww.DEBUG.Printf("Receive progress callback for transfer %s "+ + jww.DEBUG.Printf("[FT] Receive progress callback for transfer %s "+ "{completed: %t, received: %d, total: %d, err: %v}", tid, completed, received, total, err) diff --git a/fileTransfer/oldTransferRecovery.go b/fileTransfer/oldTransferRecovery.go index 8c7aa961c12f7d2efa9d55aacb9903e09ee7423c..2957e1b156835be260c470910b8767d0653a8b4b 100644 --- a/fileTransfer/oldTransferRecovery.go +++ b/fileTransfer/oldTransferRecovery.go @@ -16,8 +16,8 @@ import ( // Error messages. const ( - oldTransfersRoundResultsErr = "failed to recover round information for " + - "%d rounds for old file transfers after %d attempts" + oldTransfersRoundResultsErr = "[FT] failed to recover round information " + + "for %d rounds for old file transfers after %d attempts" ) // roundResultsMaxAttempts is the maximum number of attempts to get round @@ -30,13 +30,12 @@ func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { // Exit if old transfers have already been recovered if m.oldTransfersRecovered { - jww.DEBUG.Printf("Old file transfer recovery thread not starting: " + - "none to recover (app was not closed)") + jww.DEBUG.Printf("[FT] Old file transfer recovery thread not " + + "starting: none to recover (app was not closed)") return } // Get list of unsent parts and rounds that parts were sent on - // TODO: handle error unsentParts, sentRounds, err := m.sent.GetUnsentPartsAndSentRounds() // Add all unsent parts to the queue @@ -45,11 +44,18 @@ func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { } if err != nil { - jww.ERROR.Printf("Failed to get sent rounds: %+v", err) + jww.ERROR.Printf("[FT] Failed to get sent rounds: %+v", err) m.net.GetHealthTracker().RemoveChannel(chanID) return } + // Return if there are no parts to recover + if len(sentRounds) == 0 { + jww.DEBUG.Print( + "[FT] No in-progress rounds from old transfers to recover.") + return + } + // Update parts that were sent by looking up the status of the rounds they // were sent on go func(healthyChan chan bool, chanID uint64, @@ -72,7 +78,7 @@ func (m Manager) updateSentRounds(healthyChan chan bool, // Tracks the number of attempts to get round results var getRoundResultsAttempts int - jww.DEBUG.Print("Starting old file transfer recovery thread.") + jww.DEBUG.Print("[FT] Starting old file transfer recovery thread.") // Wait for network to be healthy to attempt to get round states for getRoundResultsAttempts < roundResultsMaxAttempts { @@ -80,13 +86,13 @@ func (m Manager) updateSentRounds(healthyChan chan bool, case healthy := <-healthyChan: // If the network is unhealthy, wait until it becomes healthy if !healthy { - jww.DEBUG.Print("Suspending old file transfer recovery " + + jww.DEBUG.Print("[FT] Suspending old file transfer recovery " + "thread: network is unhealthy.") } for !healthy { healthy = <-healthyChan } - jww.DEBUG.Print("Old file transfer recovery thread: " + + jww.DEBUG.Print("[FT] Old file transfer recovery thread: " + "network is healthy.") // Register callback to get Round results and retry on error @@ -94,12 +100,13 @@ func (m Manager) updateSentRounds(healthyChan chan bool, err := m.getRoundResults(roundList, roundResultsTimeout, m.makeRoundEventCallback(sentRounds)) if err != nil { - jww.WARN.Printf("Failed to get round results for old "+ + jww.WARN.Printf("[FT] Failed to get round results for old "+ "transfers for rounds %d (attempt %d/%d): %+v", getRoundResultsAttempts, roundResultsMaxAttempts, roundList, err) } else { - jww.INFO.Printf("Successfully recovered old file transfers.") + jww.INFO.Printf( + "[FT] Successfully recovered old file transfers.") return nil } getRoundResultsAttempts++ diff --git a/fileTransfer/receive.go b/fileTransfer/receive.go index f1b2257dc43d384332f3d970caa4fda0b118d317..020f21bf39e6bce001c4f0951851578e28d21038 100644 --- a/fileTransfer/receive.go +++ b/fileTransfer/receive.go @@ -25,12 +25,12 @@ const ( // receive runs a loop that receives file message parts and stores them in their // appropriate transfer. func (m *Manager) receive(rawMsgs chan message.Receive, stop *stoppable.Single) { - jww.DEBUG.Print("Starting file part reception thread.") + jww.DEBUG.Print("[FT] Starting file part reception thread.") for { select { case <-stop.Quit(): - jww.DEBUG.Print("Stopping file part reception thread: stoppable " + + jww.DEBUG.Print("[FT] Stopping file part reception thread: stoppable " + "triggered.") stop.ToStopped() return @@ -41,9 +41,9 @@ func (m *Manager) receive(rawMsgs chan message.Receive, stop *stoppable.Single) // which means this message is not of the correct type and will // be ignored if strings.Contains(err.Error(), "fingerprint") { - jww.INFO.Print(err) + jww.INFO.Printf("[FT] %+v", err) } else { - jww.WARN.Print(err) + jww.WARN.Printf("[FT] %+v", err) } continue } diff --git a/fileTransfer/receiveNew.go b/fileTransfer/receiveNew.go index 47c873c807d3044801f6168e6bda4e4bcbcd83b5..4aadd9d3341f1b5056c45bb01124a3941eb5574b 100644 --- a/fileTransfer/receiveNew.go +++ b/fileTransfer/receiveNew.go @@ -27,26 +27,27 @@ const ( // messages. func (m *Manager) receiveNewFileTransfer(rawMsgs chan message.Receive, stop *stoppable.Single) { - jww.DEBUG.Print("Starting new file transfer message reception thread.") + jww.DEBUG.Print("[FT] Starting new file transfer message reception thread.") for { select { case <-stop.Quit(): - jww.DEBUG.Print("Stopping new file transfer message reception " + - "thread: stoppable triggered") + jww.DEBUG.Print("[FT] Stopping new file transfer message " + + "reception thread: stoppable triggered") stop.ToStopped() return case receivedMsg := <-rawMsgs: - jww.DEBUG.Print("New file transfer message thread received message.") + jww.DEBUG.Print( + "[FT] New file transfer message thread received message.") tid, fileName, fileType, sender, size, preview, err := m.readNewFileTransferMessage(receivedMsg) if err != nil { if err.Error() == receiveMessageTypeErr { - jww.INFO.Printf("Failed to read message as new file "+ + jww.INFO.Printf("[FT] Failed to read message as new file "+ "transfer message: %+v", err) } else { - jww.WARN.Printf("Failed to read message as new file "+ + jww.WARN.Printf("[FT] Failed to read message as new file "+ "transfer message: %+v", err) } continue diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 3dfe633caca4fa886b2326a85ee997bae6a5a3a9..b92133a4ca870a59cc7df42600f227cf1fedf679 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -32,37 +32,38 @@ import ( // Error messages. const ( // Manager.sendParts - sendManyCmixWarn = "Failed to send %d file parts %v via SendManyCMIX: %+v" - setInProgressErr = "Failed to set parts %v to in-progress for transfer %s" - getRoundResultsErr = "Failed to get round results for round %d for file transfers %v: %+v" + sendManyCmixWarn = "[FT] Failed to send %d file parts %v via SendManyCMIX: %+v" + setInProgressErr = "[FT] Failed to set parts %v to in-progress for transfer %s" + getRoundResultsErr = "[FT] Failed to get round results for round %d for file transfers %v: %+v" // Manager.buildMessages - noSentTransferWarn = "Could not get transfer %s for part %d: %+v" + noSentTransferWarn = "[FT] Could not get transfer %s for part %d: %+v" maxRetriesErr = "Stopping message transfer: %+v" - newCmixMessageErr = "Failed to assemble cMix message for file part %d on transfer %s: %+v" + newCmixMessageErr = "[FT] Failed to assemble cMix message for file part %d on transfer %s: %+v" // Manager.makeRoundEventCallback - finishPassNoTransferErr = "Failed to mark in-progress parts as finished on success of round %d for transfer %s: %+v" - finishTransferErr = "Failed to set part(s) to finished for transfer %s: %+v" - finishedEndE2eMsfErr = "Failed to send E2E message to %s on completion of file transfer %s: %+v" - finishFailNoTransferErr = "Failed to requeue in-progress parts on failure of round %d for transfer %s: %+v" - roundFailureWarn = "Failed to send file parts for file transfers %v on round %d: round %s" - unsetInProgressErr = "Failed to remove parts from in-progress list for transfer %s: round %s" - roundFailureCbErr = "Failed to send parts %d for transfer %s on round %d: round %s" + finishPassNoTransferErr = "[FT] Failed to mark in-progress parts as finished on success of round %d for transfer %s: %+v" + finishTransferErr = "[FT] Failed to set part(s) to finished for transfer %s: %+v" + finishedEndE2eMsfErr = "[FT] Failed to send E2E message to %s on completion of file transfer %s: %+v" + roundFailureWarn = "[FT] Failed to send file parts for file transfers %v on round %d: round %s" + finishFailNoTransferErr = "[FT] Failed to requeue in-progress parts on failure of round %d for transfer %s: %+v" + unsetInProgressErr = "[FT] Failed to remove parts from in-progress list for transfer %s: round %s" // Manager.sendEndE2eMessage endE2eGetPartnerErr = "failed to get file transfer partner %s: %+v" endE2eSendErr = "failed to send end file transfer message: %+v" // getRandomNumParts - getRandomNumPartsRandPanic = "Failed to generate random number of file parts to send: %+v" + getRandomNumPartsRandPanic = "[FT] Failed to generate random number of file parts to send: %+v" ) -// Duration to wait for round to finish before timing out. -const roundResultsTimeout = 60 * time.Second +const ( + // Duration to wait for round to finish before timing out. + roundResultsTimeout = 60 * time.Second -// Duration to wait for send batch to fill before sending partial batch. -const pollSleepDuration = 100 * time.Millisecond + // Duration to wait for send batch to fill before sending partial batch. + pollSleepDuration = 100 * time.Millisecond +) // sendThread waits on the sendQueue channel for parts to send. Once its // receives a random number between 1 and 11 of file parts, they are encrypted, @@ -70,7 +71,7 @@ const pollSleepDuration = 100 * time.Millisecond // added to the end of the queue. func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, healthChanID uint64, getNumParts getRngNum) { - jww.DEBUG.Print("Starting file part sending thread.") + jww.DEBUG.Print("[FT] Starting file part sending thread.") // Calculate the average amount of data sent via SendManyCMIX avgNumMessages := (minPartsSendPerRound + maxPartsSendPerRound) / 2 @@ -85,8 +86,12 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, // The size of each batch var numParts int + // Timer triggers sending of unfilled batch to prevent hanging when the + // file part queue has fewer items then the batch size timer := time.NewTimer(pollSleepDuration) - lastSend := time.Time{} + + // Tracks time that the last send completed + var lastSend time.Time // Loop forever polling the sendQueue channel for new file parts to send. If // the channel is empty, then polling is suspended for pollSleepDuration. If @@ -103,13 +108,13 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, case healthy := <-healthChan: // If the network is unhealthy, wait until it becomes healthy if !healthy { - jww.TRACE.Print("Suspending file part sending thread: " + + jww.TRACE.Print("[FT] Suspending file part sending thread: " + "network is unhealthy.") } for !healthy { healthy = <-healthChan } - jww.TRACE.Print("File part sending thread: network is healthy.") + jww.TRACE.Print("[FT] File part sending thread: network is healthy.") case part := <-m.sendQueue: // When a part is received from the queue, add it to the list of // parts to be sent @@ -127,7 +132,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, // If the batch is full, then send the parts if len(partList) == numParts { quit := m.handleSend( - &partList, &lastSend, delay, stop, healthChan, healthChanID) + &partList, &lastSend, delay, stop, healthChanID) if quit { return } @@ -144,7 +149,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, } quit := m.handleSend( - &partList, &lastSend, delay, stop, healthChan, healthChanID) + &partList, &lastSend, delay, stop, healthChanID) if quit { return } @@ -157,7 +162,8 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single, healthChanID uint64) { // Exit the thread if the stoppable is triggered - jww.DEBUG.Print("Stopping file part sending thread: stoppable triggered.") + jww.DEBUG.Print("[FT] Stopping file part sending thread: stoppable " + + "triggered.") // Add all the unsent parts back in the queue for _, part := range partList { @@ -177,13 +183,12 @@ func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single, // Returns true if the stoppable has been triggered and the sending thread // should quit. func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, - delay time.Duration, stop *stoppable.Single, _ chan bool, - healthChanID uint64) bool { + delay time.Duration, stop *stoppable.Single, healthChanID uint64) bool { // Bandwidth limiter: wait to send until the delay has been reached so that // the bandwidth is limited to the maximum throughput if netTime.Since(*lastSend) < delay { waitingTime := delay - netTime.Since(*lastSend) - jww.TRACE.Printf("Suspending file part sending: "+ + jww.TRACE.Printf("[FT] Suspending file part sending: "+ "bandwidth limit reached; waiting %s to send.", waitingTime) select { case <-stop.Quit(): @@ -191,28 +196,19 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, m.closeSendThread(*partList, stop, healthChanID) return true - // TODO: is this needed? - // case healthy := <-healthChan: - // // If the network is unhealthy, wait until it becomes healthy - // if !healthy { - // jww.TRACE.Print("Suspending file part sending: " + - // "network is unhealthy.") - // } - // for !healthy { - // healthy = <-healthChan - // } - // jww.TRACE.Print("File part sending continuing: network is healthy.") case <-time.NewTimer(delay - netTime.Since(*lastSend)).C: } } // Send all the messages err := m.sendParts(*partList) - *lastSend = netTime.Now() if err != nil { jww.FATAL.Panic(err) } + // Update the timestamp of the send + *lastSend = netTime.Now() + // Clear partList once done *partList = nil @@ -421,6 +417,7 @@ func (m *Manager) makeRoundEventCallback( }(tid, transfer.GetRecipient()) } + // Call progress callback after change in progress transfer.CallProgressCB(nil) } } else { @@ -443,10 +440,8 @@ func (m *Manager) makeRoundEventCallback( jww.ERROR.Printf(unsetInProgressErr, tid, roundResult) } - // Return the error on the progress callback - cbErr := errors.Errorf( - roundFailureCbErr, partsToResend, tid, rid, roundResult) - transfer.CallProgressCB(cbErr) + // Call progress callback after change in progress + transfer.CallProgressCB(nil) // Add all the unsent parts back in the queue m.queueParts(tid, partsToResend) @@ -499,9 +494,9 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // If a single partition of the end file transfer message does not transmit, // then the partner will not be able to read the confirmation if !success { - jww.ERROR.Printf("Sending E2E message %s to end file transfer with "+ - "%s failed to transmit %d/%d partitions: %d round failures, %d "+ - "timeouts", recipient, e2eMsgID, numRoundFail+numTimeOut, + jww.ERROR.Printf("[FT] Sending E2E message %s to end file transfer "+ + "with %s failed to transmit %d/%d partitions: %d round failures, "+ + "%d timeouts", recipient, e2eMsgID, numRoundFail+numTimeOut, len(rounds), numRoundFail, numTimeOut) m.store.GetCriticalMessages().Failed(sendMsg, e2eParams) return nil @@ -510,8 +505,8 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // Otherwise, the transmission is a success and this should be denoted in // the session and the log m.store.GetCriticalMessages().Succeeded(sendMsg, e2eParams) - jww.INFO.Printf("Sending of message %s informing %s that a transfer ended"+ - "successful.", e2eMsgID, recipient) + jww.INFO.Printf("[FT] Sending of message %s informing %s that a transfer "+ + "ended successful.", e2eMsgID, recipient) return nil } diff --git a/fileTransfer/send_test.go b/fileTransfer/send_test.go index 94897c3a2de7ed141c29c0b379101c9ee7904929..a1360b215e610841d5c164610a1ee728a390f906 100644 --- a/fileTransfer/send_test.go +++ b/fileTransfer/send_test.go @@ -784,7 +784,8 @@ func TestManager_makeRoundEventCallback(t *testing.T) { } // Tests that Manager.makeRoundEventCallback returns a callback that calls the -// progress callback with the correct error when a round fails. +// progress callback with no parts sent on round failure. Also checks that the +// file parts were added back into the queue. func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { m := newTestManager(false, nil, nil, nil, nil, t) @@ -806,6 +807,7 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { if err != nil { t.Errorf("Failed to add new transfer: %+v", err) } + partsToSend := []uint16{0, 1, 2, 3} done0, done1 := make(chan bool), make(chan bool) @@ -819,11 +821,11 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { case 0: done0 <- true case 1: - expectedErr := fmt.Sprintf( - roundFailureCbErr, partsToSend, tid, rid, api.Failed) - if r.err == nil || !strings.Contains(r.err.Error(), expectedErr) { - t.Errorf("Callback received unexpected error when round "+ - "failed.\nexpected: %s\nreceived: %v", expectedErr, r.err) + expectedResult := sentProgressResults{ + false, 0, 0, uint16(len(partsToSend)), r.tracker, nil} + if !reflect.DeepEqual(expectedResult, r) { + t.Errorf("Callback returned unexpected values."+ + "\nexpected: %+v\nreceived: %+v", expectedResult, r) } done1 <- true } @@ -832,9 +834,11 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { }() // Create queued part list add parts + partsMap := make(map[uint16]queuedPart, len(partsToSend)) queuedParts := make([]queuedPart, len(partsToSend)) for i := range queuedParts { queuedParts[i] = queuedPart{tid, uint16(i)} + partsMap[uint16(i)] = queuedParts[i] } _, transfers, groupedParts, _, err := m.buildMessages(queuedParts) @@ -852,6 +856,21 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { roundEventCB(false, false, map[id.Round]api.RoundResult{rid: api.Failed}) <-done1 + + // Check that the parts were added to the queue + for i := range partsToSend { + select { + case <-time.NewTimer(10 * time.Millisecond).C: + t.Errorf("Timed out waiting for part %d.", i) + case r := <-m.sendQueue: + if partsMap[r.partNum] != r { + t.Errorf("Incorrect part in queue (%d)."+ + "\nexpected: %+v\nreceived: %+v", i, partsMap[r.partNum], r) + } else { + delete(partsMap, r.partNum) + } + } + } } // Tests that Manager.sendEndE2eMessage sends an E2E message with the expected