diff --git a/fileTransfer/fileMessage_test.go b/fileTransfer/fileMessage_test.go index 7189d95facd053c1815a4a08fb210620538e13c5..77b72eac9fa78a04d6c4c36aceda9a0effbd8a3e 100644 --- a/fileTransfer/fileMessage_test.go +++ b/fileTransfer/fileMessage_test.go @@ -48,7 +48,8 @@ func Test_newPartMessage_SmallPayloadSizeError(t *testing.T) { // partMessage. func Test_mapPartMessage(t *testing.T) { // Generate expected values - _, expectedData, expectedPadding, expectedPartNum, expectedFile := newRandomFileMessage() + _, expectedData, expectedPadding, expectedPartNum, expectedFile := + newRandomFileMessage() fm := mapPartMessage(expectedData) @@ -78,7 +79,8 @@ func Test_mapPartMessage(t *testing.T) { // values. func Test_unmarshalPartMessage(t *testing.T) { // Generate expected values - _, expectedData, expectedPadding, expectedPartNumb, expectedFile := newRandomFileMessage() + _, expectedData, expectedPadding, expectedPartNumb, expectedFile := + newRandomFileMessage() fm, err := unmarshalPartMessage(expectedData) if err != nil { diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 9785856651ddc65e4dc13fdef42ec679c0c2a6f8..e1a82dab762f9f156172c1445b45463d24f16207 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -49,7 +49,7 @@ const ( sendQueueBuffLen = 10_000 // Size of the buffered channel that reports if the network is healthy - networkHealthBuffLen = 10_000 + networkHealthBuffLen = 100 ) // Error messages. @@ -59,7 +59,7 @@ const ( newManagerReceivedErr = "failed to load or create new list of received file transfers: %+v" // Manager.Send - sendNetworkHealthErr = "cannot initiate file transfer of %q to %s when network is not healthy." + sendNetworkHealthErr = "cannot initiate file transfer of %q when network is not healthy." fileNameSizeErr = "length of filename (%d) greater than max allowed length (%d)" fileTypeSizeErr = "length of file type (%d) greater than max allowed length (%d)" fileSizeErr = "size of file (%d bytes) greater than max allowed size (%d bytes)" @@ -193,12 +193,12 @@ func (m *Manager) startProcesses(newFtChan, filePartChan chan message.Receive) ( // Register network health channel that is used by the sending thread to // ensure the network is healthy before sending healthyRecover := make(chan bool, networkHealthBuffLen) - m.net.GetHealthTracker().AddChannel(healthyRecover) + healthyRecoverID := m.net.GetHealthTracker().AddChannel(healthyRecover) healthySend := make(chan bool, networkHealthBuffLen) - m.net.GetHealthTracker().AddChannel(healthySend) + healthySendID := m.net.GetHealthTracker().AddChannel(healthySend) // Recover unsent parts from storage - m.oldTransferRecovery(healthyRecover) + m.oldTransferRecovery(healthyRecover, healthyRecoverID) // Start the new file transfer message reception thread newFtStop := stoppable.NewSingle(newFtStoppableName) @@ -214,7 +214,7 @@ func (m *Manager) startProcesses(newFtChan, filePartChan chan message.Receive) ( // Start the file part sending thread sendStop := stoppable.NewSingle(sendStoppableName) - go m.sendThread(sendStop, healthySend, getRandomNumParts) + go m.sendThread(sendStop, healthySend, healthySendID, getRandomNumParts) // Create a multi stoppable multiStoppable := stoppable.NewMulti(fileTransferStoppableName) @@ -229,6 +229,7 @@ func (m *Manager) startProcesses(newFtChan, filePartChan chan message.Receive) ( // initial NewFileTransfer E2E message to the recipient to inform them of the // incoming file parts. It partitions the file, puts it into storage, and queues // each file for sending. Returns a unique ID identifying the file transfer. +// Returns an error if the network is not healthy. func (m Manager) Send(fileName, fileType string, fileData []byte, recipient *id.ID, retry float32, preview []byte, progressCB interfaces.SentProgressCallback, period time.Duration) ( @@ -236,8 +237,8 @@ func (m Manager) Send(fileName, fileType string, fileData []byte, // Return an error if the network is not healthy if !m.net.GetHealthTracker().IsHealthy() { - return ftCrypto.TransferID{}, errors.Errorf( - sendNetworkHealthErr, fileName, recipient) + return ftCrypto.TransferID{}, + errors.Errorf(sendNetworkHealthErr, fileName) } // Return an error if the file name is too long diff --git a/fileTransfer/manager_test.go b/fileTransfer/manager_test.go index 21f1bbd5068709d8ed92162b0b9bdcfe7030c55d..969be2b513239ccfae8e249619927e83b8d9b85b 100644 --- a/fileTransfer/manager_test.go +++ b/fileTransfer/manager_test.go @@ -155,11 +155,11 @@ func TestManager_Send_NetworkHealthError(t *testing.T) { m := newTestManager(false, nil, nil, nil, nil, t) fileName := "MySentFile" - recipient := id.NewIdFromString("recipient", id.User, t) - expectedErr := fmt.Sprintf(sendNetworkHealthErr, fileName, recipient) + expectedErr := fmt.Sprintf(sendNetworkHealthErr, fileName) m.net.(*testNetworkManager).health.healthy = false + recipient := id.NewIdFromString("recipient", id.User, t) _, err := m.Send(fileName, "", nil, recipient, 0, nil, nil, 0) if err == nil || err.Error() != expectedErr { t.Errorf("Send did not return the expected error when the network is "+ @@ -256,7 +256,7 @@ func TestManager_Send_SendE2eError(t *testing.T) { // transfer and is called when calling from the transfer. func TestManager_RegisterSentProgressCallback(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{12, 4, 1}, false, false, nil, nil, t) + []uint16{12, 4, 1}, false, false, nil, nil, nil, t) expectedErr := errors.New("CallbackError") // Create new callback and channel for the callback to trigger @@ -343,7 +343,7 @@ func TestManager_Resend_NoTransferError(t *testing.T) { // not run out of fingerprints. func TestManager_Resend_NoFingerprints(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{16}, false, false, nil, nil, t) + []uint16{16}, false, false, nil, nil, nil, t) expectedErr := fmt.Sprintf(transferNotFailedErr, sti[0].tid) // Delete the transfer err := m.Resend(sti[0].tid) @@ -358,7 +358,7 @@ func TestManager_Resend_NoFingerprints(t *testing.T) { // fingerprints but is not complete. func TestManager_CloseSend_NoFingerprints(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{16}, false, false, nil, nil, t) + []uint16{16}, false, false, nil, nil, nil, t) prng := NewPrng(42) partSize, _ := m.getPartSize() @@ -389,7 +389,7 @@ func TestManager_CloseSend_NoFingerprints(t *testing.T) { // fingerprints. func TestManager_CloseSend_Complete(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{3}, false, false, nil, nil, t) + []uint16{3}, false, false, nil, nil, nil, t) // Set all parts to finished transfer, _ := m.sent.GetTransfer(sti[0].tid) @@ -437,7 +437,7 @@ func TestManager_CloseSend_NoTransferError(t *testing.T) { // has not run out of fingerprints and is not complete func TestManager_CloseSend_NotCompleteErr(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{16}, false, false, nil, nil, t) + []uint16{16}, false, false, nil, nil, nil, t) expectedErr := fmt.Sprintf(transferInProgressErr, sti[0].tid) err := m.CloseSend(sti[0].tid) @@ -464,7 +464,7 @@ func TestManager_Receive_NoTransferError(t *testing.T) { // incomplete. func TestManager_Receive_GetFileError(t *testing.T) { m, _, rti := newTestManagerWithTransfers( - []uint16{12, 4, 1}, false, false, nil, nil, t) + []uint16{12, 4, 1}, false, false, nil, nil, nil, t) _, err := m.Receive(rti[0].tid) if err == nil || !strings.Contains(err.Error(), "missing") { @@ -478,7 +478,7 @@ func TestManager_Receive_GetFileError(t *testing.T) { // expected transfer and is called when calling from the transfer. func TestManager_RegisterReceivedProgressCallback(t *testing.T) { m, _, rti := newTestManagerWithTransfers( - []uint16{12, 4, 1}, false, false, nil, nil, t) + []uint16{12, 4, 1}, false, false, nil, nil, nil, t) expectedErr := errors.New("CallbackError") // Create new callback and channel for the callback to trigger @@ -715,7 +715,8 @@ func Test_FileTransfer(t *testing.T) { t.Error("Receive progress callback never reported file finishing to receive.") }() - err = m2.RegisterReceivedProgressCallback(receiveTid, receiveCb, time.Millisecond) + err = m2.RegisterReceivedProgressCallback( + receiveTid, receiveCb, time.Millisecond) if err != nil { t.Errorf("Failed to register receive progress callback: %+v", err) } diff --git a/fileTransfer/oldTransferRecovery.go b/fileTransfer/oldTransferRecovery.go index 3d6c6c0d0f9a733317b4f1ab3d723607b86feb90..18987f42b0ba0e0f4249ce232047b05dcdba9788 100644 --- a/fileTransfer/oldTransferRecovery.go +++ b/fileTransfer/oldTransferRecovery.go @@ -26,7 +26,7 @@ const roundResultsMaxAttempts = 5 // oldTransferRecovery adds all unsent file parts back into the queue and // updates the in-progress file parts by getting round updates. -func (m Manager) oldTransferRecovery(healthyChan chan bool) { +func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { // Exit if old transfers have already been recovered if m.oldTransfersRecovered { @@ -45,12 +45,16 @@ func (m Manager) oldTransferRecovery(healthyChan chan bool) { // Update parts that were sent by looking up the status of the rounds they // were sent on - go func() { + go func(healthyChan chan bool, chanID uint64, + sentRounds map[id.Round][]ftCrypto.TransferID) { err := m.updateSentRounds(healthyChan, sentRounds) if err != nil { jww.ERROR.Print(err) } - }() + + // Remove channel from tacker once done with it + m.net.GetHealthTracker().RemoveChannel(chanID) + }(healthyChan, chanID, sentRounds) } // updateSentRounds looks up the status of each round that parts were sent on diff --git a/fileTransfer/oldTransferRecovery_test.go b/fileTransfer/oldTransferRecovery_test.go index 4379843c84d7743929aad122b3e0cd8aa689519b..03c32e93947f06f20f11c2f8025f6a75745edbc3 100644 --- a/fileTransfer/oldTransferRecovery_test.go +++ b/fileTransfer/oldTransferRecovery_test.go @@ -8,6 +8,8 @@ package fileTransfer import ( + "fmt" + "github.com/pkg/errors" "gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/storage/versioned" @@ -18,6 +20,7 @@ import ( "reflect" "sort" "sync" + "sync/atomic" "testing" "time" ) @@ -26,7 +29,7 @@ import ( func TestManager_oldTransferRecovery(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) m, sti, _ := newTestManagerWithTransfers( - []uint16{6, 12, 18}, false, true, nil, kv, t) + []uint16{6, 12, 18}, false, true, nil, nil, kv, t) finishedRounds := make(map[id.Round][]ftCrypto.TransferID) expectedStatus := make( @@ -89,20 +92,29 @@ func TestManager_oldTransferRecovery(t *testing.T) { } // Load new manager from the original manager's storage + net := newTestNetworkManager(false, nil, nil, t) loadedManager, err := newManager( - nil, nil, nil, nil, nil, rr, kv, nil, DefaultParams()) + nil, nil, nil, net, nil, rr, kv, nil, DefaultParams()) if err != nil { t.Errorf("Failed to create new manager from KV: %+v", err) } - // Register new sent progress callbacks + // Create new progress callbacks with channels cbChans := make([]chan sentProgressResults, len(sti)) + numCbCalls2 := make(map[ftCrypto.TransferID]int, len(sti)) for i, st := range sti { // Create sent progress callback and channel - cbChan := make(chan sentProgressResults, 8) + cbChan := make(chan sentProgressResults, 32) + numCbCalls2[st.tid] = 0 + tid := st.tid + numCalls, maxNumCalls := int64(0), int64(numCbCalls[tid]) cb := func(completed bool, sent, arrived, total uint16, tr interfaces.FilePartTracker, err error) { - cbChan <- sentProgressResults{completed, sent, arrived, total, tr, err} + if atomic.CompareAndSwapInt64(&numCalls, maxNumCalls, maxNumCalls) { + cbChan <- sentProgressResults{ + completed, sent, arrived, total, tr, err} + } + atomic.AddInt64(&numCalls, 1) } cbChans[i] = cbChan @@ -113,10 +125,6 @@ func TestManager_oldTransferRecovery(t *testing.T) { } } - // Create health chan - healthyRecover := make(chan bool, networkHealthBuffLen) - healthyRecover <- true - // Wait until callbacks have been called to know the transfers have been // recovered var wg sync.WaitGroup @@ -124,19 +132,19 @@ func TestManager_oldTransferRecovery(t *testing.T) { wg.Add(1) go func(i, callNum int, cbChan chan sentProgressResults, st sentTransferInfo) { defer wg.Done() - for j := 0; j < callNum; j++ { - select { - case <-time.NewTimer(250 * time.Millisecond).C: - t.Errorf("Timed out waiting for SentProgressCallback #%d "+ - "for transfer #%d %s", j, i, st.tid) - return - case <-cbChan: - } + select { + case <-time.NewTimer(150 * time.Millisecond).C: + case <-cbChan: } }(i, numCbCalls[st.tid], cbChans[i], st) } - loadedManager.oldTransferRecovery(healthyRecover) + // Create health chan + healthyRecover := make(chan bool, networkHealthBuffLen) + chanID := net.GetHealthTracker().AddChannel(healthyRecover) + healthyRecover <- true + + loadedManager.oldTransferRecovery(healthyRecover, chanID) wg.Wait() @@ -186,7 +194,7 @@ func TestManager_oldTransferRecovery(t *testing.T) { func TestManager_updateSentRounds(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) m, sti, _ := newTestManagerWithTransfers( - []uint16{6, 12, 18}, false, true, nil, kv, t) + []uint16{6, 12, 18}, false, true, nil, nil, kv, t) finishedRounds := make(map[id.Round][]ftCrypto.TransferID) expectedStatus := make( @@ -304,6 +312,43 @@ func TestManager_updateSentRounds(t *testing.T) { } } +// Error path: tests that Manager.updateSentRounds returns the expected error +// when getRoundResults returns only errors. +func TestManager_updateSentRounds_Error(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + m, _, _ := newTestManagerWithTransfers( + []uint16{6, 12, 18}, false, true, nil, nil, kv, t) + + // Returns an error on function and round failure on callback if sendErr is + // set; otherwise, it reports round successes and returns nil + m.getRoundResults = func( + []id.Round, time.Duration, api.RoundEventCallback) error { + return errors.Errorf("GetRoundResults error") + } + + // Create health chan + healthyRecover := make(chan bool, roundResultsMaxAttempts) + for i := 0; i < roundResultsMaxAttempts; i++ { + healthyRecover <- true + } + + sentRounds := map[id.Round][]ftCrypto.TransferID{ + 0: {{1}, {2}, {3}}, + 5: {{4}, {2}, {6}}, + 9: {{3}, {9}, {8}}, + } + + expectedErr := fmt.Sprintf( + oldTransfersRoundResultsErr, len(sentRounds), roundResultsMaxAttempts) + err := m.updateSentRounds(healthyRecover, sentRounds) + if err == nil || err.Error() != expectedErr { + t.Errorf("updateSentRounds did not return the expected error when "+ + "getRoundResults returns only errors.\nexpected: %s\nreceived: %+v", + expectedErr, err) + } + +} + // Tests that roundIdMapToList returns all the round IDs in the map. func Test_roundIdMapToList(t *testing.T) { n := 10 diff --git a/fileTransfer/receive_test.go b/fileTransfer/receive_test.go index 2b51e234471f4478828dc63d63f688cc76746a8d..4ea9419b8b96e2125da246d71a216303e3bb0643 100644 --- a/fileTransfer/receive_test.go +++ b/fileTransfer/receive_test.go @@ -53,15 +53,10 @@ func TestManager_receive(t *testing.T) { } // Generate receive callback that should be called when a message is read - type progressResults struct { - completed bool - received, total uint16 - err error - } - cbChan := make(chan progressResults) + cbChan := make(chan receivedProgressResults) cb := func(completed bool, received, total uint16, - t interfaces.FilePartTracker, err error) { - cbChan <- progressResults{completed, received, total, err} + tr interfaces.FilePartTracker, err error) { + cbChan <- receivedProgressResults{completed, received, total, tr, err} } done0, done1 := make(chan bool), make(chan bool) @@ -171,15 +166,10 @@ func TestManager_receive_Stop(t *testing.T) { } // Generate receive callback that should be called when a message is read - type progressResults struct { - completed bool - received, total uint16 - err error - } - cbChan := make(chan progressResults) + cbChan := make(chan receivedProgressResults) cb := func(completed bool, received, total uint16, - t interfaces.FilePartTracker, err error) { - cbChan <- progressResults{completed, received, total, err} + tr interfaces.FilePartTracker, err error) { + cbChan <- receivedProgressResults{completed, received, total, tr, err} } done0, done1 := make(chan bool), make(chan bool) @@ -277,15 +267,10 @@ func TestManager_readMessage(t *testing.T) { } // Generate receive callback that should be called when a message is read - type progressResults struct { - completed bool - received, total uint16 - err error - } - cbChan := make(chan progressResults, 2) + cbChan := make(chan receivedProgressResults, 2) cb := func(completed bool, received, total uint16, - t interfaces.FilePartTracker, err error) { - cbChan <- progressResults{completed, received, total, err} + tr interfaces.FilePartTracker, err error) { + cbChan <- receivedProgressResults{completed, received, total, tr, err} } done0, done1 := make(chan bool), make(chan bool) diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 01ea18ae207e4526e2edb4cb182c0b7c1b5a1ae3..e3ef6afe54944f771c5d4e28fe7c6b904ced76b4 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -68,7 +68,8 @@ const pollSleepDuration = 100 * time.Millisecond // receives a random number between 1 and 11 of file parts, they are encrypted, // put into cMix messages, and sent to their recipients. Failed messages are // added to the end of the queue. -func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, getNumParts getRngNum) { +func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, + healthChanID uint64, getNumParts getRngNum) { jww.DEBUG.Print("Starting file part sending thread.") // Calculate the average amount of data sent via SendManyCMIX @@ -96,7 +97,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, getNu select { case <-stop.Quit(): // Close the thread when the stoppable is triggered - m.closeSendThread(partList, stop) + m.closeSendThread(partList, stop, healthChanID) return case healthy := <-healthChan: @@ -125,7 +126,8 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, getNu // If the batch is full, then send the parts if len(partList) == numParts { - quit := m.handleSend(&partList, &lastSend, delay, stop) + quit := m.handleSend( + &partList, &lastSend, delay, stop, healthChan, healthChanID) if quit { return } @@ -141,7 +143,8 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, getNu continue } - quit := m.handleSend(&partList, &lastSend, delay, stop) + quit := m.handleSend( + &partList, &lastSend, delay, stop, healthChan, healthChanID) if quit { return } @@ -151,7 +154,8 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, getNu // closeSendThread safely stops the sending thread by saving unsent parts to the // queue and setting the stoppable to stopped. -func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single) { +func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single, + healthChanID uint64) { // Exit the thread if the stoppable is triggered jww.DEBUG.Print("Stopping file part sending thread: stoppable triggered.") @@ -160,6 +164,10 @@ func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single) m.sendQueue <- part } + // Unregister network health channel + m.net.GetHealthTracker().RemoveChannel(healthChanID) + + // Mark stoppable as stopped stop.ToStopped() } @@ -169,16 +177,31 @@ func (m *Manager) closeSendThread(partList []queuedPart, stop *stoppable.Single) // Returns true if the stoppable has been triggered and the sending thread // should quit. func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, - delay time.Duration, stop *stoppable.Single) bool { + delay time.Duration, stop *stoppable.Single, _ chan bool, + healthChanID uint64) bool { // Bandwidth limiter: wait to send until the delay has been reached so that // the bandwidth is limited to the maximum throughput if netTime.Since(*lastSend) < delay { + waitingTime := delay - netTime.Since(*lastSend) + jww.TRACE.Printf("Suspending file part sending: "+ + "bandwidth limit reached; waiting %s to send.", waitingTime) select { case <-stop.Quit(): // Close the thread when the stoppable is triggered - m.closeSendThread(*partList, stop) + m.closeSendThread(*partList, stop, healthChanID) return true + // TODO: is this needed? + // case healthy := <-healthChan: + // // If the network is unhealthy, wait until it becomes healthy + // if !healthy { + // jww.TRACE.Print("Suspending file part sending: " + + // "network is unhealthy.") + // } + // for !healthy { + // healthy = <-healthChan + // } + // jww.TRACE.Print("File part sending continuing: network is healthy.") case <-time.NewTimer(delay - netTime.Since(*lastSend)).C: } } @@ -430,7 +453,6 @@ func (m *Manager) makeRoundEventCallback( // sendEndE2eMessage sends an E2E message to the recipient once the transfer // complete information them that all file parts have been sent. -// TODO: test func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // Get the partner partner, err := m.store.E2e().GetPartner(recipient) diff --git a/fileTransfer/send_test.go b/fileTransfer/send_test.go index 37e65378208963351de4c5b5e48170b23210f3db..9e0fda2ce98be89b248fbd3081ee39c08289d1ec 100644 --- a/fileTransfer/send_test.go +++ b/fileTransfer/send_test.go @@ -9,12 +9,16 @@ package fileTransfer import ( "bytes" + "errors" "fmt" "gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/interfaces" + "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/stoppable" ftStorage "gitlab.com/elixxir/client/storage/fileTransfer" "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/crypto/diffieHellman" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" "gitlab.com/elixxir/ekv" "gitlab.com/elixxir/primitives/format" @@ -33,7 +37,7 @@ import ( // progress on the callback. func TestManager_sendThread(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{12, 4, 1}, false, true, nil, nil, t) + []uint16{12, 4, 1}, false, true, nil, nil, nil, t) // Add three transfers partsToSend := [][]uint16{ @@ -79,18 +83,16 @@ func TestManager_sendThread(t *testing.T) { queuedParts[i], queuedParts[j] = queuedParts[j], queuedParts[i] }) - // Crate custom getRngNum function that always returns 11 + // Create custom RNG function that always returns 11 getNumParts := func(rng csprng.Source) int { return len(queuedParts) } - // Register channel for health tracking - healthyChan := make(chan bool) - m.net.GetHealthTracker().AddChannel(healthyChan) - // Start sending thread stop := stoppable.NewSingle("testSendThreadStoppable") - go m.sendThread(stop, healthyChan, getNumParts) + healthyChan := make(chan bool, 8) + healthyChan <- true + go m.sendThread(stop, healthyChan, 0, getNumParts) // Add parts to queue for _, part := range queuedParts { @@ -110,11 +112,53 @@ func TestManager_sendThread(t *testing.T) { } } +// Tests that Manager.sendThread successfully sends the parts and reports their +// progress on the callback. +func TestManager_sendThread_NetworkNotHealthy(t *testing.T) { + m, _, _ := newTestManagerWithTransfers( + []uint16{12, 4, 1}, false, true, nil, nil, nil, t) + + sendingChan := make(chan bool, 4) + getNumParts := func(csprng.Source) int { + sendingChan <- true + return 0 + } + + // Start sending thread + stop := stoppable.NewSingle("testSendThreadStoppable") + healthyChan := make(chan bool, 8) + go m.sendThread(stop, healthyChan, 0, getNumParts) + + for i := 0; i < 15; i++ { + healthyChan <- false + } + m.sendQueue <- queuedPart{ftCrypto.TransferID{5}, 0} + + select { + case <-time.NewTimer(150 * time.Millisecond).C: + healthyChan <- true + case r := <-sendingChan: + t.Errorf("sendThread tried to send even though the network is "+ + "unhealthy. %t", r) + } + + select { + case <-time.NewTimer(150 * time.Millisecond).C: + t.Errorf("Timed out waiting for sending to start.") + case <-sendingChan: + } + + err := stop.Close() + if err != nil { + t.Errorf("Failed to stop stoppable: %+v", err) + } +} + // Tests that Manager.sendThread successfully sends a partially filled batch // of the correct length when its times out waiting for messages. func TestManager_sendThread_Timeout(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{12, 4, 1}, false, false, nil, nil, t) + []uint16{12, 4, 1}, false, false, nil, nil, nil, t) // Add three transfers partsToSend := [][]uint16{ @@ -162,13 +206,9 @@ func TestManager_sendThread_Timeout(t *testing.T) { return len(queuedParts) } - // Register channel for health tracking - healthyChan := make(chan bool) - m.net.GetHealthTracker().AddChannel(healthyChan) - // Start sending thread stop := stoppable.NewSingle("testSendThreadStoppable") - go m.sendThread(stop, healthyChan, getNumParts) + go m.sendThread(stop, make(chan bool), 0, getNumParts) // Add parts to queue for _, part := range queuedParts[:5] { @@ -194,7 +234,7 @@ func TestManager_sendThread_Timeout(t *testing.T) { // the progress callbacks with the correct values. func TestManager_sendParts(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{12, 4, 1}, false, true, nil, nil, t) + []uint16{12, 4, 1}, false, true, nil, nil, nil, t) // Add three transfers partsToSend := [][]uint16{ @@ -279,7 +319,7 @@ func TestManager_sendParts(t *testing.T) { // the progress. func TestManager_sendParts_SendManyCmixError(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{12, 4, 1}, true, false, nil, nil, t) + []uint16{12, 4, 1}, true, false, nil, nil, nil, t) partsToSend := [][]uint16{ {0, 1, 3, 5, 6, 7}, {1, 2, 3, 0}, @@ -332,11 +372,47 @@ func TestManager_sendParts_SendManyCmixError(t *testing.T) { wg.Wait() } +// Error path: tests that Manager.sendParts returns the expected error whe +// getRoundResults returns an error. +func TestManager_sendParts_RoundResultsError(t *testing.T) { + m, sti, _ := newTestManagerWithTransfers( + []uint16{12}, false, true, nil, nil, nil, t) + + grrErr := errors.New("GetRoundResultsError") + m.getRoundResults = + func([]id.Round, time.Duration, api.RoundEventCallback) error { + return grrErr + } + + // Add three transfers + partsToSend := [][]uint16{ + {0, 1, 3, 5, 6, 7}, + } + + // Create queued part list, add parts from each transfer, and shuffle + queuedParts := make([]queuedPart, 0, 11) + tIDs := make([]ftCrypto.TransferID, 0, len(sti)) + for i, sendingParts := range partsToSend { + for _, part := range sendingParts { + queuedParts = append(queuedParts, queuedPart{sti[i].tid, part}) + } + tIDs = append(tIDs, sti[i].tid) + } + + expectedErr := fmt.Sprintf(getRoundResultsErr, 0, tIDs, grrErr) + err := m.sendParts(queuedParts) + if err == nil || err.Error() != expectedErr { + t.Errorf("sendParts did not return the expected error when "+ + "GetRoundResults should have returned an error."+ + "\nexpected: %s\nreceived: %+v", expectedErr, err) + } +} + // Tests that Manager.buildMessages returns the expected values for a group // of 11 file parts from three different transfers. func TestManager_buildMessages(t *testing.T) { m, sti, _ := newTestManagerWithTransfers( - []uint16{12, 4, 1}, false, false, nil, nil, t) + []uint16{12, 4, 1}, false, false, nil, nil, nil, t) partsToSend := [][]uint16{ {0, 1, 3, 5, 6, 7}, {1, 2, 3, 0}, @@ -432,18 +508,11 @@ func TestManager_buildMessages(t *testing.T) { func TestManager_buildMessages_MessageBuildFailureError(t *testing.T) { m := newTestManager(false, nil, nil, nil, nil, t) - // Add transfer - - type callbackResults struct { - completed bool - sent, arrived, total uint16 - err error - } - - callbackChan := make(chan callbackResults, 10) + callbackChan := make(chan sentProgressResults, 10) progressCB := func(completed bool, sent, arrived, total uint16, - t interfaces.FilePartTracker, err error) { - callbackChan <- callbackResults{completed, sent, arrived, total, err} + tr interfaces.FilePartTracker, err error) { + callbackChan <- sentProgressResults{ + completed, sent, arrived, total, tr, err} } done0, done1 := make(chan bool), make(chan bool) @@ -611,19 +680,14 @@ func TestManager_newCmixMessage(t *testing.T) { // Tests that Manager.makeRoundEventCallback returns a callback that calls the // progress callback when a round succeeds. func TestManager_makeRoundEventCallback(t *testing.T) { - m := newTestManager(false, nil, nil, nil, nil, t) + sendE2eChan := make(chan message.Receive, 10) + m := newTestManager(false, nil, sendE2eChan, nil, nil, t) - // Add transfer - type callbackResults struct { - completed bool - sent, arrived, total uint16 - err error - } - - callbackChan := make(chan callbackResults, 10) + callbackChan := make(chan sentProgressResults, 10) progressCB := func(completed bool, sent, arrived, total uint16, - t interfaces.FilePartTracker, err error) { - callbackChan <- callbackResults{completed, sent, arrived, total, err} + tr interfaces.FilePartTracker, err error) { + callbackChan <- sentProgressResults{ + completed, sent, arrived, total, tr, err} } done0, done1 := make(chan bool), make(chan bool) @@ -649,8 +713,18 @@ func TestManager_makeRoundEventCallback(t *testing.T) { } }() - prng := NewPrng(42) + // Add recipient as partner recipient := id.NewIdFromString("recipient", id.User, t) + grp := m.store.E2e().GetGroup() + dhKey := grp.NewInt(42) + pubKey := diffieHellman.GeneratePublicKey(dhKey, grp) + p := params.GetDefaultE2ESessionParams() + err := m.store.E2e().AddPartner(recipient, pubKey, dhKey, p, p) + if err != nil { + t.Errorf("Failed to add partner %s: %+v", recipient, err) + } + + prng := NewPrng(42) key, _ := ftCrypto.NewTransferKey(prng) _, parts := newFile(4, 64, prng, t) tid, err := m.sent.AddTransfer( @@ -684,6 +758,19 @@ func TestManager_makeRoundEventCallback(t *testing.T) { roundEventCB(true, false, map[id.Round]api.RoundResult{rid: api.Succeeded}) <-done1 + + select { + case <-time.NewTimer(50 * time.Millisecond).C: + t.Errorf("Timed out waiting for end E2E message.") + case msg := <-sendE2eChan: + if msg.MessageType != message.EndFileTransfer { + t.Errorf("E2E message has wrong type.\nexpected: %d\nreceived: %d", + message.EndFileTransfer, msg.MessageType) + } else if !msg.RecipientID.Cmp(recipient) { + t.Errorf("E2E message has wrong recipient."+ + "\nexpected: %d\nreceived: %d", recipient, msg.RecipientID) + } + } } // Tests that Manager.makeRoundEventCallback returns a callback that calls the @@ -693,17 +780,11 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { rid := id.Round(42) - // Add transfer - type callbackResults struct { - completed bool - sent, arrived, total uint16 - err error - } - - callbackChan := make(chan callbackResults, 10) + callbackChan := make(chan sentProgressResults, 10) progressCB := func(completed bool, sent, arrived, total uint16, - t interfaces.FilePartTracker, err error) { - callbackChan <- callbackResults{completed, sent, arrived, total, err} + tr interfaces.FilePartTracker, err error) { + callbackChan <- sentProgressResults{ + completed, sent, arrived, total, tr, err} } prng := NewPrng(42) @@ -763,6 +844,45 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { <-done1 } +// Tests that Manager.sendEndE2eMessage sends an E2E message with the expected +// recipient and message type. This does not test round tracking or critical +// messages. +func TestManager_sendEndE2eMessage(t *testing.T) { + sendE2eChan := make(chan message.Receive, 10) + m := newTestManager(false, nil, sendE2eChan, nil, nil, t) + + // Add recipient as partner + recipient := id.NewIdFromString("recipient", id.User, t) + grp := m.store.E2e().GetGroup() + dhKey := grp.NewInt(42) + pubKey := diffieHellman.GeneratePublicKey(dhKey, grp) + p := params.GetDefaultE2ESessionParams() + err := m.store.E2e().AddPartner(recipient, pubKey, dhKey, p, p) + if err != nil { + t.Errorf("Failed to add partner %s: %+v", recipient, err) + } + + go func() { + err = m.sendEndE2eMessage(recipient) + if err != nil { + t.Errorf("sendEndE2eMessage returned an error: %+v", err) + } + }() + + select { + case <-time.NewTimer(50 * time.Millisecond).C: + t.Errorf("Timed out waiting for end E2E message.") + case msg := <-sendE2eChan: + if msg.MessageType != message.EndFileTransfer { + t.Errorf("E2E message has wrong type.\nexpected: %d\nreceived: %d", + message.EndFileTransfer, msg.MessageType) + } else if !msg.RecipientID.Cmp(recipient) { + t.Errorf("E2E message has wrong recipient."+ + "\nexpected: %d\nreceived: %d", recipient, msg.RecipientID) + } + } +} + // Tests that Manager.queueParts adds all the expected parts to the sendQueue // channel. func TestManager_queueParts(t *testing.T) { diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index b80846b847158c9483efd8a84d8e3def1333ada9..eacde9f0df97798d9fffbae2ff04f4aeb8243932 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -206,9 +206,10 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive, // newTestManagerWithTransfers creates a new test manager with transfers added // to it. func newTestManagerWithTransfers(numParts []uint16, sendErr, addPartners bool, - receiveCB interfaces.ReceiveCallback, kv *versioned.KV, t *testing.T) ( - *Manager, []sentTransferInfo, []receivedTransferInfo) { - m := newTestManager(sendErr, nil, nil, receiveCB, kv, t) + sendE2eChan chan message.Receive, receiveCB interfaces.ReceiveCallback, + kv *versioned.KV, t *testing.T) (*Manager, []sentTransferInfo, + []receivedTransferInfo) { + m := newTestManager(sendErr, sendE2eChan, nil, receiveCB, kv, t) sti := make([]sentTransferInfo, len(numParts)) rti := make([]receivedTransferInfo, len(numParts)) var err error @@ -444,6 +445,8 @@ func (tnm *testNetworkManager) SendE2E(msg message.Send, _ params.E2E, _ *stoppa tnm.sendE2eChan <- message.Receive{ Payload: msg.Payload, MessageType: msg.MessageType, + Sender: &id.ID{}, + RecipientID: msg.Recipient, } } diff --git a/storage/fileTransfer/partStore_test.go b/storage/fileTransfer/partStore_test.go index 4a096e64cfbbdba216176185187079f895578d9c..5e3eabae208b88c8bfed5c2131727939c7c448e7 100644 --- a/storage/fileTransfer/partStore_test.go +++ b/storage/fileTransfer/partStore_test.go @@ -450,7 +450,8 @@ func Test_partStore_marshalList_unmarshalPartList(t *testing.T) { } sort.SliceStable(list, func(i, j int) bool { return list[i] < list[j] }) - sort.SliceStable(expected, func(i, j int) bool { return expected[i] < expected[j] }) + sort.SliceStable(expected, + func(i, j int) bool { return expected[i] < expected[j] }) if !reflect.DeepEqual(expected, list) { t.Errorf("Failed to marshal and unmarshal part list."+ @@ -543,7 +544,8 @@ func newRandomPartStore(numParts uint16, kv *versioned.KV, prng io.Reader, } // newRandomPartSlice returns a list of file parts and the file in one piece. -func newRandomPartSlice(numParts uint16, prng io.Reader, t *testing.T) ([][]byte, []byte) { +func newRandomPartSlice(numParts uint16, prng io.Reader, t *testing.T) ( + [][]byte, []byte) { partSize := 64 fileBuff := bytes.NewBuffer(make([]byte, 0, int(numParts)*partSize)) partList := make([][]byte, numParts) diff --git a/storage/fileTransfer/receiveFileTransfers.go b/storage/fileTransfer/receiveFileTransfers.go index c9acc3e54aef9b2562b762fd23e036eeb5357820..7c624e434d7dedc00706fb17c81137080f8dcc46 100644 --- a/storage/fileTransfer/receiveFileTransfers.go +++ b/storage/fileTransfer/receiveFileTransfers.go @@ -138,7 +138,7 @@ func (rft *ReceivedFileTransfersStore) DeleteTransfer(tid ftCrypto.TransferID) e } // Cancel any scheduled callbacks - err := rt.StopScheduledProgressCB() + err := rt.stopScheduledProgressCB() if err != nil { jww.WARN.Print(errors.Errorf(cancelCallbackErr, tid, err)) } @@ -206,23 +206,6 @@ func (rft *ReceivedFileTransfersStore) AddPart(encryptedPart, padding, return transfer, info.id, nil } -// GetFile returns the combined file parts as a single byte slice for the given -// transfer ID. An error is returned if no file with the given transfer ID is -// found or if the file is missing parts. -func (rft *ReceivedFileTransfersStore) GetFile(tid ftCrypto.TransferID) ([]byte, - error) { - rft.mux.Lock() - defer rft.mux.Unlock() - - // Check if the transfer exists - rt, exists := rft.transfers[tid] - if !exists { - return nil, errors.Errorf(getReceivedTransferErr, tid) - } - - return rt.GetFile() -} - //////////////////////////////////////////////////////////////////////////////// // Storage Functions // //////////////////////////////////////////////////////////////////////////////// @@ -266,7 +249,8 @@ func NewOrLoadReceivedFileTransfersStore(kv *versioned.KV) ( // If the transfer list cannot be loaded from storage, then create a new // ReceivedFileTransfersStore - vo, err := rft.kv.Get(receivedFileTransfersStoreKey, receivedFileTransfersStoreVersion) + vo, err := rft.kv.Get( + receivedFileTransfersStoreKey, receivedFileTransfersStoreVersion) if err != nil { return NewReceivedFileTransfersStore(kv) } diff --git a/storage/fileTransfer/receiveFileTransfers_test.go b/storage/fileTransfer/receiveFileTransfers_test.go index d0bde16aa2e75b14bba6d602a7dc30234c818a65..08571865ed434ba26f3fcfebf1549dbcc5947d42 100644 --- a/storage/fileTransfer/receiveFileTransfers_test.go +++ b/storage/fileTransfer/receiveFileTransfers_test.go @@ -15,7 +15,6 @@ import ( "gitlab.com/elixxir/ekv" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/netTime" - "math/rand" "reflect" "sort" "strings" @@ -471,70 +470,6 @@ func TestReceivedFileTransfersStore_AddPart_AddPartError(t *testing.T) { } } -// Tests that ReceivedFileTransfersStore.GetFile returns the complete file. -func TestReceivedFileTransfersStore_GetFile(t *testing.T) { - kv := versioned.NewKV(make(ekv.Memstore)) - rft, err := NewReceivedFileTransfersStore(kv) - if err != nil { - t.Fatalf("Failed to create new ReceivedFileTransfersStore: %+v", err) - } - - prng := NewPrng(42) - numParts := uint16(16) - - // Generate file parts and expected file - parts, expectedFile := newRandomPartStore( - numParts, kv.Prefix("test"), rand.New(rand.NewSource(42)), t) - - key, _ := ftCrypto.NewTransferKey(prng) - mac := ftCrypto.CreateTransferMAC(expectedFile, key) - - fileSize := uint32(len(expectedFile)) - - tid, err := rft.AddTransfer(key, mac, fileSize, numParts, 24, prng) - if err != nil { - t.Errorf("Failed to add new transfer: %+v", err) - } - - for partNum, part := range parts.parts { - fp := ftCrypto.GenerateFingerprint(key, partNum) - encPart, partMac, padding := newEncryptedPartData(key, part, partNum, t) - _, _, err = rft.AddPart(encPart, padding, partMac, partNum, fp) - if err != nil { - t.Errorf("AddPart encountered an error: %+v", err) - } - } - - receivedFile, err := rft.GetFile(tid) - if err != nil { - t.Errorf("GetFile returned an error: %+v", err) - } - - if !bytes.Equal(expectedFile, receivedFile) { - t.Errorf("Received file does not match expected."+ - "\nexpected: %q\nreceived: %q", expectedFile, receivedFile) - } -} - -// Error path: tests that ReceivedFileTransfersStore.GetFile returns the -// expected error when no transfer with the ID exists. -func TestReceivedFileTransfersStore_GetFile_NoTransferError(t *testing.T) { - kv := versioned.NewKV(make(ekv.Memstore)) - rft, err := NewReceivedFileTransfersStore(kv) - if err != nil { - t.Fatalf("Failed to create new ReceivedFileTransfersStore: %+v", err) - } - - tid := ftCrypto.UnmarshalTransferID([]byte("invalidID")) - - expectedErr := fmt.Sprintf(getReceivedTransferErr, tid) - _, err = rft.GetFile(tid) - if err == nil || err.Error() != expectedErr { - t.Errorf("GetFile did not return the expected error."+ - "\nexpected: %s\nreceived: %+v", expectedErr, err) - } -} - //////////////////////////////////////////////////////////////////////////////// // Storage Functions // //////////////////////////////////////////////////////////////////////////////// diff --git a/storage/fileTransfer/receiveTransfer.go b/storage/fileTransfer/receiveTransfer.go index 233278c0846685c7a3c68740c10a456e1c1682cd..3a29e5271e24e12da9e49047b6340e5bc38ba66a 100644 --- a/storage/fileTransfer/receiveTransfer.go +++ b/storage/fileTransfer/receiveTransfer.go @@ -53,7 +53,7 @@ const ( deleteReceivedFilePartsErr = "failed to delete received file parts from storage: %+v" deleteReceivedVectorErr = "failed to delete received status state vector from storage: %+v" - // ReceivedTransfer.StopScheduledProgressCB + // ReceivedTransfer.stopScheduledProgressCB cancelReceivedCallbacksErr = "could not cancel %d out of %d received progress callbacks: %d" ) @@ -198,7 +198,7 @@ func (rt *ReceivedTransfer) IsPartReceived(partNum uint16) bool { // total is the total number of parts excepted to be received, and t is a part // status tracker that can be used to get the status of individual file parts. func (rt *ReceivedTransfer) GetProgress() (completed bool, received, - total uint16, t ReceivedPartTracker) { + total uint16, t interfaces.FilePartTracker) { rt.mux.RLock() defer rt.mux.RUnlock() @@ -208,7 +208,7 @@ func (rt *ReceivedTransfer) GetProgress() (completed bool, received, // getProgress is the thread-unsafe helper function for GetProgress. func (rt *ReceivedTransfer) getProgress() (completed bool, received, - total uint16, t ReceivedPartTracker) { + total uint16, t interfaces.FilePartTracker) { received = uint16(rt.receivedStatus.GetNumUsed()) total = rt.numParts @@ -217,7 +217,7 @@ func (rt *ReceivedTransfer) getProgress() (completed bool, received, completed = true } - return completed, received, total, NewReceivedPartTracker(rt.receivedStatus) + return completed, received, total, newReceivedPartTracker(rt.receivedStatus) } // CallProgressCB calls all the progress callbacks with the most recent progress @@ -231,9 +231,9 @@ func (rt *ReceivedTransfer) CallProgressCB(err error) { } } -// StopScheduledProgressCB cancels all scheduled received progress callbacks +// stopScheduledProgressCB cancels all scheduled received progress callbacks // calls. -func (rt *ReceivedTransfer) StopScheduledProgressCB() error { +func (rt *ReceivedTransfer) stopScheduledProgressCB() error { rt.mux.Lock() defer rt.mux.Unlock() diff --git a/storage/fileTransfer/receiveTransfer_test.go b/storage/fileTransfer/receiveTransfer_test.go index ce8516a655df79bc707bce1af325b4009bf41895..728480c63756ce39bf681819eaa4ad5f24013f45 100644 --- a/storage/fileTransfer/receiveTransfer_test.go +++ b/storage/fileTransfer/receiveTransfer_test.go @@ -217,10 +217,11 @@ func checkReceivedProgress(completed bool, received, total uint16, return nil } -// checkReceivedTracker checks that the ReceivedPartTracker is reporting the -// correct values for each part. Also checks that ReceivedPartTracker.GetNumParts -// returns the expected value (make sure numParts comes from a correct source). -func checkReceivedTracker(track ReceivedPartTracker, numParts uint16, +// checkReceivedTracker checks that the receivedPartTracker is reporting the +// correct values for each part. Also checks that +// receivedPartTracker.GetNumParts returns the expected value (make sure +// numParts comes from a correct source). +func checkReceivedTracker(track interfaces.FilePartTracker, numParts uint16, received []uint16, t *testing.T) { if track.GetNumParts() != numParts { t.Errorf("Tracker reported incorrect number of parts."+ @@ -299,7 +300,8 @@ func TestReceivedTransfer_GetProgress(t *testing.T) { if err != nil { t.Error(err) } - checkReceivedTracker(track, rt.numParts, []uint16{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, t) + checkReceivedTracker( + track, rt.numParts, []uint16{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, t) for i := 0; i < 4; i++ { _, _ = rt.fpVector.Next() @@ -428,9 +430,9 @@ func TestReceivedTransfer_CallProgressCB(t *testing.T) { wg.Wait() } -// Tests that ReceivedTransfer.StopScheduledProgressCB stops a scheduled +// Tests that ReceivedTransfer.stopScheduledProgressCB stops a scheduled // callback from being triggered. -func TestReceivedTransfer_StopScheduledProgressCB(t *testing.T) { +func TestReceivedTransfer_stopScheduledProgressCB(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) _, rt, _ := newEmptyReceivedTransfer(16, 20, kv, t) @@ -455,9 +457,9 @@ func TestReceivedTransfer_StopScheduledProgressCB(t *testing.T) { case <-cbChan: } - err := rt.StopScheduledProgressCB() + err := rt.stopScheduledProgressCB() if err != nil { - t.Errorf("StopScheduledProgressCB returned an error: %+v", err) + t.Errorf("stopScheduledProgressCB returned an error: %+v", err) } select { diff --git a/storage/fileTransfer/receivedCallbackTracker.go b/storage/fileTransfer/receivedCallbackTracker.go index 9b49a06bf39cd780c134e0a51af685eb10a4eaf6..61e850c5b918ab97e427fe5dc6e75181a6508efb 100644 --- a/storage/fileTransfer/receivedCallbackTracker.go +++ b/storage/fileTransfer/receivedCallbackTracker.go @@ -127,11 +127,11 @@ type receivedProgressTracker interface { // GetProgress returns the received transfer progress in a thread-safe // manner. GetProgress() ( - completed bool, received, total uint16, t ReceivedPartTracker) + completed bool, received, total uint16, t interfaces.FilePartTracker) // getProgress returns the received transfer progress in a thread-unsafe // manner. This function should be used if a lock is already taken on the // sent transfer. getProgress() ( - completed bool, received, total uint16, t ReceivedPartTracker) + completed bool, received, total uint16, t interfaces.FilePartTracker) } diff --git a/storage/fileTransfer/receivedCallbackTracker_test.go b/storage/fileTransfer/receivedCallbackTracker_test.go index 8809f537ca733f78087aaaf4ab931ac034054152..ed5f23390ec4fe4ec01bb16a0125a3070d586111 100644 --- a/storage/fileTransfer/receivedCallbackTracker_test.go +++ b/storage/fileTransfer/receivedCallbackTracker_test.go @@ -80,7 +80,7 @@ func Test_receivedCallbackTracker_call(t *testing.T) { rct := newReceivedCallbackTracker(cbFunc, 50*time.Millisecond) - tracker := testReceiveTrack{false, 1, 3, ReceivedPartTracker{}} + tracker := testReceiveTrack{false, 1, 3, receivedPartTracker{}} rct.call(tracker, nil) select { @@ -93,7 +93,7 @@ func Test_receivedCallbackTracker_call(t *testing.T) { } } - tracker = testReceiveTrack{true, 3, 3, ReceivedPartTracker{}} + tracker = testReceiveTrack{true, 3, 3, receivedPartTracker{}} rct.call(tracker, nil) select { @@ -136,7 +136,7 @@ func Test_receivedCallbackTracker_stopThread(t *testing.T) { rct := newReceivedCallbackTracker(cbFunc, 50*time.Millisecond) - tracker := testReceiveTrack{false, 1, 3, ReceivedPartTracker{}} + tracker := testReceiveTrack{false, 1, 3, receivedPartTracker{}} rct.call(tracker, nil) select { @@ -149,7 +149,7 @@ func Test_receivedCallbackTracker_stopThread(t *testing.T) { } } - tracker = testReceiveTrack{true, 3, 3, ReceivedPartTracker{}} + tracker = testReceiveTrack{true, 3, 3, receivedPartTracker{}} rct.call(tracker, nil) select { @@ -187,16 +187,16 @@ func TestReceivedTransfer_ReceivedProgressTrackerInterface(t *testing.T) { type testReceiveTrack struct { completed bool received, total uint16 - t ReceivedPartTracker + t receivedPartTracker } func (trt testReceiveTrack) getProgress() (completed bool, received, - total uint16, t ReceivedPartTracker) { + total uint16, t interfaces.FilePartTracker) { return trt.completed, trt.received, trt.total, trt.t } // GetProgress returns the values in the testTrack. func (trt testReceiveTrack) GetProgress() (completed bool, received, - total uint16, t ReceivedPartTracker) { + total uint16, t interfaces.FilePartTracker) { return trt.completed, trt.received, trt.total, trt.t } diff --git a/storage/fileTransfer/receivedPartTracker.go b/storage/fileTransfer/receivedPartTracker.go index 78df88c987af35d04d56265e88784c0c37006d67..b509b39479dc2396742334af72346971915a5416 100644 --- a/storage/fileTransfer/receivedPartTracker.go +++ b/storage/fileTransfer/receivedPartTracker.go @@ -12,8 +12,8 @@ import ( "gitlab.com/elixxir/client/storage/utility" ) -// ReceivedPartTracker tracks the status of individual received file parts. -type ReceivedPartTracker struct { +// receivedPartTracker tracks the status of individual received file parts. +type receivedPartTracker struct { // The number of file parts in the file numParts uint16 @@ -21,20 +21,20 @@ type ReceivedPartTracker struct { receivedStatus *utility.StateVector } -// NewReceivedPartTracker creates a new ReceivedPartTracker with copies of the +// newReceivedPartTracker creates a new receivedPartTracker with copies of the // received status state vectors. -func NewReceivedPartTracker(received *utility.StateVector) ReceivedPartTracker { - return ReceivedPartTracker{ +func newReceivedPartTracker(received *utility.StateVector) receivedPartTracker { + return receivedPartTracker{ numParts: uint16(received.GetNumKeys()), receivedStatus: received.DeepCopy(), } } -// GetPartStatus returns the status of the received file part with the given part -// number. The possible values for the status are: +// GetPartStatus returns the status of the received file part with the given +// part number. The possible values for the status are: // 0 = unreceived // 3 = received (receiver has received a part) -func (rpt ReceivedPartTracker) GetPartStatus(partNum uint16) interfaces.FpStatus { +func (rpt receivedPartTracker) GetPartStatus(partNum uint16) interfaces.FpStatus { if rpt.receivedStatus.Used(uint32(partNum)) { return interfaces.FpReceived } else { @@ -43,6 +43,6 @@ func (rpt ReceivedPartTracker) GetPartStatus(partNum uint16) interfaces.FpStatus } // GetNumParts returns the total number of file parts in the transfer. -func (rpt ReceivedPartTracker) GetNumParts() uint16 { +func (rpt receivedPartTracker) GetNumParts() uint16 { return rpt.numParts } diff --git a/storage/fileTransfer/receivedPartTracker_test.go b/storage/fileTransfer/receivedPartTracker_test.go index 379c1d75f8e1f7cccf73038a7d8680e7a125ccd7..64f390058bda927f67d743d78e58cb15cb52f923 100644 --- a/storage/fileTransfer/receivedPartTracker_test.go +++ b/storage/fileTransfer/receivedPartTracker_test.go @@ -16,32 +16,32 @@ import ( "testing" ) -// Tests that ReceivedPartTracker satisfies the interfaces.FilePartTracker +// Tests that receivedPartTracker satisfies the interfaces.FilePartTracker // interface. func TestReceivedPartTracker_FilePartTrackerInterface(t *testing.T) { - var _ interfaces.FilePartTracker = ReceivedPartTracker{} + var _ interfaces.FilePartTracker = receivedPartTracker{} } -// Tests that NewReceivedPartTracker returns a new ReceivedPartTracker with the +// Tests that newReceivedPartTracker returns a new receivedPartTracker with the // expected values. -func TestNewReceivedPartTracker(t *testing.T) { +func Test_newReceivedPartTracker(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) _, rt, _ := newRandomReceivedTransfer(16, 24, kv, t) - expected := ReceivedPartTracker{ + expected := receivedPartTracker{ numParts: rt.numParts, receivedStatus: rt.receivedStatus.DeepCopy(), } - newRPT := NewReceivedPartTracker(rt.receivedStatus) + newRPT := newReceivedPartTracker(rt.receivedStatus) if !reflect.DeepEqual(expected, newRPT) { - t.Errorf("New ReceivedPartTracker does not match expected."+ + t.Errorf("New receivedPartTracker does not match expected."+ "\nexpected: %+v\nreceived: %+v", expected, newRPT) } } -// Tests that ReceivedPartTracker.GetPartStatus returns the expected status for +// Tests that receivedPartTracker.GetPartStatus returns the expected status for // each part loaded from a preconfigured ReceivedTransfer. func TestReceivedPartTracker_GetPartStatus(t *testing.T) { // Create new ReceivedTransfer @@ -52,15 +52,16 @@ func TestReceivedPartTracker_GetPartStatus(t *testing.T) { prng := rand.New(rand.NewSource(42)) partStatuses := make(map[uint16]interfaces.FpStatus, rt.numParts) for partNum := uint16(0); partNum < rt.numParts; partNum++ { - partStatuses[partNum] = interfaces.FpStatus(prng.Intn(2)) * interfaces.FpReceived + partStatuses[partNum] = + interfaces.FpStatus(prng.Intn(2)) * interfaces.FpReceived if partStatuses[partNum] == interfaces.FpReceived { rt.receivedStatus.Use(uint32(partNum)) } } - // Create a new ReceivedPartTracker from the ReceivedTransfer - rpt := NewReceivedPartTracker(rt.receivedStatus) + // Create a new receivedPartTracker from the ReceivedTransfer + rpt := newReceivedPartTracker(rt.receivedStatus) // Check that the statuses for each part matches the map for partNum := uint16(0); partNum < rt.numParts; partNum++ { @@ -72,15 +73,15 @@ func TestReceivedPartTracker_GetPartStatus(t *testing.T) { } } -// Tests that ReceivedPartTracker.GetNumParts returns the same number of parts -// as the ReceivedPartTracker it was created from. +// Tests that receivedPartTracker.GetNumParts returns the same number of parts +// as the receivedPartTracker it was created from. func TestReceivedPartTracker_GetNumParts(t *testing.T) { // Create new ReceivedTransfer kv := versioned.NewKV(make(ekv.Memstore)) _, rt, _ := newEmptyReceivedTransfer(16, 24, kv, t) - // Create a new ReceivedPartTracker from the ReceivedTransfer - rpt := NewReceivedPartTracker(rt.receivedStatus) + // Create a new receivedPartTracker from the ReceivedTransfer + rpt := newReceivedPartTracker(rt.receivedStatus) if rpt.GetNumParts() != rt.GetNumParts() { t.Errorf("Number of parts incorrect.\nexpected: %d\nreceived: %d", diff --git a/storage/fileTransfer/sentCallbackTracker.go b/storage/fileTransfer/sentCallbackTracker.go index f99540707ebd8b3ab7793264bdf93bfa2af329fa..7381422c0cf98f053e47896ae00831bd8eff1cb3 100644 --- a/storage/fileTransfer/sentCallbackTracker.go +++ b/storage/fileTransfer/sentCallbackTracker.go @@ -126,11 +126,11 @@ func (sct *sentCallbackTracker) callNowUnsafe(skipCompletedCheck bool, type sentProgressTracker interface { // GetProgress returns the sent transfer progress in a thread-safe manner. GetProgress() ( - completed bool, sent, arrived, total uint16, t SentPartTracker) + completed bool, sent, arrived, total uint16, t interfaces.FilePartTracker) // getProgress returns the sent transfer progress in a thread-unsafe manner. // This function should be used if a lock is already taken on the sent // transfer. getProgress() ( - completed bool, sent, arrived, total uint16, t SentPartTracker) + completed bool, sent, arrived, total uint16, t interfaces.FilePartTracker) } diff --git a/storage/fileTransfer/sentCallbackTracker_test.go b/storage/fileTransfer/sentCallbackTracker_test.go index 237cea82ff66c51e41b7dd98d8a60bc3ec524818..fc445d11475cf95d41b843e01f64381829577892 100644 --- a/storage/fileTransfer/sentCallbackTracker_test.go +++ b/storage/fileTransfer/sentCallbackTracker_test.go @@ -40,7 +40,7 @@ func Test_newSentCallbackTracker(t *testing.T) { receivedSCT := newSentCallbackTracker(expectedSCT.cb, expectedSCT.period) - go receivedSCT.cb(false, 0, 0, 0, SentPartTracker{}, nil) + go receivedSCT.cb(false, 0, 0, 0, sentPartTracker{}, nil) select { case <-time.NewTimer(time.Millisecond).C: @@ -83,7 +83,7 @@ func Test_sentCallbackTracker_call(t *testing.T) { sct := newSentCallbackTracker(cbFunc, 50*time.Millisecond) - tracker := testSentTrack{false, 1, 2, 3, SentPartTracker{}} + tracker := testSentTrack{false, 1, 2, 3, sentPartTracker{}} sct.call(tracker, nil) select { @@ -97,7 +97,7 @@ func Test_sentCallbackTracker_call(t *testing.T) { } } - tracker = testSentTrack{false, 1, 2, 3, SentPartTracker{}} + tracker = testSentTrack{false, 1, 2, 3, sentPartTracker{}} sct.call(tracker, nil) select { @@ -141,7 +141,7 @@ func Test_sentCallbackTracker_stopThread(t *testing.T) { sct := newSentCallbackTracker(cbFunc, 50*time.Millisecond) - tracker := testSentTrack{false, 1, 2, 3, SentPartTracker{}} + tracker := testSentTrack{false, 1, 2, 3, sentPartTracker{}} sct.call(tracker, nil) select { @@ -155,7 +155,7 @@ func Test_sentCallbackTracker_stopThread(t *testing.T) { } } - tracker = testSentTrack{false, 1, 2, 3, SentPartTracker{}} + tracker = testSentTrack{false, 1, 2, 3, sentPartTracker{}} sct.call(tracker, nil) select { @@ -193,16 +193,16 @@ func TestSentTransfer_SentProgressTrackerInterface(t *testing.T) { type testSentTrack struct { completed bool sent, arrived, total uint16 - t SentPartTracker + t sentPartTracker } func (tst testSentTrack) getProgress() (completed bool, sent, arrived, - total uint16, t SentPartTracker) { + total uint16, t interfaces.FilePartTracker) { return tst.completed, tst.sent, tst.arrived, tst.total, tst.t } // GetProgress returns the values in the testTrack. func (tst testSentTrack) GetProgress() (completed bool, sent, arrived, - total uint16, t SentPartTracker) { + total uint16, t interfaces.FilePartTracker) { return tst.completed, tst.sent, tst.arrived, tst.total, tst.t } diff --git a/storage/fileTransfer/sentFileTransfers.go b/storage/fileTransfer/sentFileTransfers.go index 8e095745bd57aad4097619b59cb41e59beaea2ba..0999627343989b6d50354f1e323e6739b2be5476 100644 --- a/storage/fileTransfer/sentFileTransfers.go +++ b/storage/fileTransfer/sentFileTransfers.go @@ -118,7 +118,7 @@ func (sft *SentFileTransfersStore) DeleteTransfer(tid ftCrypto.TransferID) error } // Cancel any scheduled callbacks - err := st.StopScheduledProgressCB() + err := st.stopScheduledProgressCB() if err != nil { jww.WARN.Print(errors.Errorf(cancelCallbackErr, tid, err)) } @@ -175,7 +175,7 @@ func (sft *SentFileTransfersStore) GetSentRounds() map[id.Round][]ftCrypto.Trans // GetUnsentPartsAndSentRounds returns two maps. The first is a map of all // transfers and a list of their parts that have not been sent (parts that were -// never marked as in-progress). The seconds is a map of all round IDs and which +// never marked as in-progress). The second is a map of all round IDs and which // transfers have parts sent on those rounds (parts marked in-progress). This // function performs the same operations as GetUnsentParts and GetSentRounds but // in a single loop. diff --git a/storage/fileTransfer/sentFileTransfers_test.go b/storage/fileTransfer/sentFileTransfers_test.go index 6d1658deed15baa29f5cff1f3dafc3e31283f3d4..f6bce9becbf43e159ca428839f4412d10285e917 100644 --- a/storage/fileTransfer/sentFileTransfers_test.go +++ b/storage/fileTransfer/sentFileTransfers_test.go @@ -465,7 +465,8 @@ func TestLoadSentFileTransfersStore(t *testing.T) { // Equalize all progressCallbacks because reflect.DeepEqual does not seem to // work on function pointers for _, tid := range list { - loadedSFT.transfers[tid].progressCallbacks = sft.transfers[tid].progressCallbacks + loadedSFT.transfers[tid].progressCallbacks = + sft.transfers[tid].progressCallbacks } if !reflect.DeepEqual(sft, loadedSFT) { diff --git a/storage/fileTransfer/sentPartTracker.go b/storage/fileTransfer/sentPartTracker.go index 3f2e95addeca5b255fffcbf9111c05f003607d96..a13ebd528f58cd14d545bc5975e27f5aafc1dd1a 100644 --- a/storage/fileTransfer/sentPartTracker.go +++ b/storage/fileTransfer/sentPartTracker.go @@ -12,8 +12,8 @@ import ( "gitlab.com/elixxir/client/storage/utility" ) -// SentPartTracker tracks the status of individual sent file parts. -type SentPartTracker struct { +// sentPartTracker tracks the status of individual sent file parts. +type sentPartTracker struct { // The number of file parts in the file numParts uint16 @@ -24,10 +24,10 @@ type SentPartTracker struct { finishedStatus *utility.StateVector } -// NewSentPartTracker creates a new SentPartTracker with copies of the +// newSentPartTracker creates a new sentPartTracker with copies of the // in-progress and finished status state vectors. -func NewSentPartTracker(inProgress, finished *utility.StateVector) SentPartTracker { - return SentPartTracker{ +func newSentPartTracker(inProgress, finished *utility.StateVector) sentPartTracker { + return sentPartTracker{ numParts: uint16(inProgress.GetNumKeys()), inProgressStatus: inProgress.DeepCopy(), finishedStatus: finished.DeepCopy(), @@ -39,7 +39,7 @@ func NewSentPartTracker(inProgress, finished *utility.StateVector) SentPartTrack // 0 = unsent // 1 = sent (sender has sent a part, but it has not arrived) // 2 = arrived (sender has sent a part, and it has arrived) -func (spt SentPartTracker) GetPartStatus(partNum uint16) interfaces.FpStatus { +func (spt sentPartTracker) GetPartStatus(partNum uint16) interfaces.FpStatus { if spt.inProgressStatus.Used(uint32(partNum)) { return interfaces.FpSent } else if spt.finishedStatus.Used(uint32(partNum)) { @@ -50,6 +50,6 @@ func (spt SentPartTracker) GetPartStatus(partNum uint16) interfaces.FpStatus { } // GetNumParts returns the total number of file parts in the transfer. -func (spt SentPartTracker) GetNumParts() uint16 { +func (spt sentPartTracker) GetNumParts() uint16 { return spt.numParts } diff --git a/storage/fileTransfer/sentPartTracker_test.go b/storage/fileTransfer/sentPartTracker_test.go index 89d37fc5213939c47ae92e02391c6a3952d32e95..fa6cf0850f20e3790d7f99e34daf49572527c66d 100644 --- a/storage/fileTransfer/sentPartTracker_test.go +++ b/storage/fileTransfer/sentPartTracker_test.go @@ -16,35 +16,35 @@ import ( "testing" ) -// Tests that SentPartTracker satisfies the interfaces.FilePartTracker +// Tests that sentPartTracker satisfies the interfaces.FilePartTracker // interface. -func TestSentPartTracker_FilePartTrackerInterface(t *testing.T) { - var _ interfaces.FilePartTracker = SentPartTracker{} +func Test_sentPartTracker_FilePartTrackerInterface(t *testing.T) { + var _ interfaces.FilePartTracker = sentPartTracker{} } -// Tests that NewSentPartTracker returns a new SentPartTracker with the expected +// Tests that newSentPartTracker returns a new sentPartTracker with the expected // values. -func TestNewSentPartTracker(t *testing.T) { +func Test_newSentPartTracker(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) _, st := newRandomSentTransfer(16, 24, kv, t) - expected := SentPartTracker{ + expected := sentPartTracker{ numParts: st.numParts, inProgressStatus: st.inProgressStatus.DeepCopy(), finishedStatus: st.finishedStatus.DeepCopy(), } - newSPT := NewSentPartTracker(st.inProgressStatus, st.finishedStatus) + newSPT := newSentPartTracker(st.inProgressStatus, st.finishedStatus) if !reflect.DeepEqual(expected, newSPT) { - t.Errorf("New SentPartTracker does not match expected."+ + t.Errorf("New sentPartTracker does not match expected."+ "\nexpected: %+v\nreceived: %+v", expected, newSPT) } } -// Tests that SentPartTracker.GetPartStatus returns the expected status for each +// Tests that sentPartTracker.GetPartStatus returns the expected status for each // part loaded from a preconfigured SentTransfer. -func TestSentPartTracker_GetPartStatus(t *testing.T) { +func Test_sentPartTracker_GetPartStatus(t *testing.T) { // Create new SentTransfer kv := versioned.NewKV(make(ekv.Memstore)) _, st := newRandomSentTransfer(16, 24, kv, t) @@ -63,8 +63,8 @@ func TestSentPartTracker_GetPartStatus(t *testing.T) { } } - // Create a new SentPartTracker from the SentTransfer - spt := NewSentPartTracker(st.inProgressStatus, st.finishedStatus) + // Create a new sentPartTracker from the SentTransfer + spt := newSentPartTracker(st.inProgressStatus, st.finishedStatus) // Check that the statuses for each part matches the map for partNum := uint16(0); partNum < st.numParts; partNum++ { @@ -76,15 +76,15 @@ func TestSentPartTracker_GetPartStatus(t *testing.T) { } } -// Tests that SentPartTracker.GetNumParts returns the same number of parts as +// Tests that sentPartTracker.GetNumParts returns the same number of parts as // the SentTransfer it was created from. -func TestSentPartTracker_GetNumParts(t *testing.T) { +func Test_sentPartTracker_GetNumParts(t *testing.T) { // Create new SentTransfer kv := versioned.NewKV(make(ekv.Memstore)) _, st := newRandomSentTransfer(16, 24, kv, t) - // Create a new SentPartTracker from the SentTransfer - spt := NewSentPartTracker(st.inProgressStatus, st.finishedStatus) + // Create a new sentPartTracker from the SentTransfer + spt := newSentPartTracker(st.inProgressStatus, st.finishedStatus) if spt.GetNumParts() != st.GetNumParts() { t.Errorf("Number of parts incorrect.\nexpected: %d\nreceived: %d", diff --git a/storage/fileTransfer/sentTransfer.go b/storage/fileTransfer/sentTransfer.go index 380f49e088cec9f29e67b3161308e090e74c0c42..53360da12df33f2b85699e9e3ce2f5260557f77f 100644 --- a/storage/fileTransfer/sentTransfer.go +++ b/storage/fileTransfer/sentTransfer.go @@ -51,7 +51,7 @@ const ( reInitSentInProgressVectorErr = "failed to overwrite in-progress state vector with new vector: %+v" reInitSentFinishedVectorErr = "failed to overwrite finished state vector with new vector: %+v" - // SentTransfer.StopScheduledProgressCB + // SentTransfer.stopScheduledProgressCB cancelSentCallbacksErr = "could not cancel %d out of %d sent progress callbacks: %d" // loadSentTransfer @@ -333,7 +333,7 @@ func (st *SentTransfer) IsPartFinished(partNum uint16) bool { // sent, and t is a part status tracker that can be used to get the status of // individual file parts. func (st *SentTransfer) GetProgress() (completed bool, sent, arrived, - total uint16, t SentPartTracker) { + total uint16, t interfaces.FilePartTracker) { st.mux.RLock() defer st.mux.RUnlock() @@ -343,7 +343,7 @@ func (st *SentTransfer) GetProgress() (completed bool, sent, arrived, // getProgress is the thread-unsafe helper function for GetProgress. func (st *SentTransfer) getProgress() (completed bool, sent, arrived, - total uint16, t SentPartTracker) { + total uint16, t interfaces.FilePartTracker) { arrived = uint16(st.finishedStatus.GetNumUsed()) sent = uint16(st.inProgressStatus.GetNumUsed()) total = st.numParts @@ -352,7 +352,7 @@ func (st *SentTransfer) getProgress() (completed bool, sent, arrived, completed = true } - partTracker := NewSentPartTracker(st.inProgressStatus, st.finishedStatus) + partTracker := newSentPartTracker(st.inProgressStatus, st.finishedStatus) return completed, sent, arrived, total, partTracker } @@ -375,8 +375,8 @@ func (st *SentTransfer) CallProgressCB(err error) { } } -// StopScheduledProgressCB cancels all scheduled sent progress callbacks calls. -func (st *SentTransfer) StopScheduledProgressCB() error { +// stopScheduledProgressCB cancels all scheduled sent progress callbacks calls. +func (st *SentTransfer) stopScheduledProgressCB() error { st.mux.Lock() defer st.mux.Unlock() @@ -466,7 +466,8 @@ func (st *SentTransfer) GetEncryptedPart(partNum uint16, partSize int, // SetInProgress adds the specified file part numbers to the in-progress // transfers for the given round ID. Returns whether the round already exists in // the list. -func (st *SentTransfer) SetInProgress(rid id.Round, partNums ...uint16) (error, bool) { +func (st *SentTransfer) SetInProgress(rid id.Round, partNums ...uint16) ( + error, bool) { st.mux.Lock() defer st.mux.Unlock() diff --git a/storage/fileTransfer/sentTransfer_test.go b/storage/fileTransfer/sentTransfer_test.go index 5ebd94c7fb1b818a9ed4756defdb9782dc739760..d0e6d81b446f6b3d1d2fa9bbecf6d1a014d5a1dc 100644 --- a/storage/fileTransfer/sentTransfer_test.go +++ b/storage/fileTransfer/sentTransfer_test.go @@ -651,9 +651,9 @@ func TestSentTransfer_CallProgressCB(t *testing.T) { wg.Wait() } -// Tests that SentTransfer.StopScheduledProgressCB stops a scheduled callback +// Tests that SentTransfer.stopScheduledProgressCB stops a scheduled callback // from being triggered. -func TestSentTransfer_StopScheduledProgressCB(t *testing.T) { +func TestSentTransfer_stopScheduledProgressCB(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) _, st := newRandomSentTransfer(16, 24, kv, t) @@ -678,9 +678,9 @@ func TestSentTransfer_StopScheduledProgressCB(t *testing.T) { case <-cbChan: } - err := st.StopScheduledProgressCB() + err := st.stopScheduledProgressCB() if err != nil { - t.Errorf("StopScheduledProgressCB returned an error: %+v", err) + t.Errorf("stopScheduledProgressCB returned an error: %+v", err) } select { @@ -1591,7 +1591,8 @@ func TestSentTransfer_marshal_unmarshalSentTransfer(t *testing.T) { marshaledData := st.marshal() - recipient, key, numParts, numFps, status := unmarshalSentTransfer(marshaledData) + recipient, key, numParts, numFps, status := + unmarshalSentTransfer(marshaledData) if !st.recipient.Cmp(recipient) { t.Errorf("Failed to get recipient ID.\nexpected: %s\nreceived: %s", @@ -1714,11 +1715,11 @@ func checkSentProgress(completed bool, sent, arrived, total uint16, return nil } -// checkSentTracker checks that the SentPartTracker is reporting the correct -// values for each part. Also checks that SentPartTracker.GetNumParts returns +// checkSentTracker checks that the sentPartTracker is reporting the correct +// values for each part. Also checks that sentPartTracker.GetNumParts returns // the expected value (make sure numParts comes from a correct source). -func checkSentTracker(track SentPartTracker, numParts uint16, inProgress, - finished []uint16, t *testing.T) { +func checkSentTracker(track interfaces.FilePartTracker, numParts uint16, + inProgress, finished []uint16, t *testing.T) { if track.GetNumParts() != numParts { t.Errorf("Tracker reported incorrect number of parts."+ "\nexpected: %d\nreceived: %d", numParts, track.GetNumParts()) diff --git a/storage/fileTransfer/transferStatus.go b/storage/fileTransfer/transferStatus.go index f223d735471ada1ec2e942b278e8330e80de4e08..ef4fe36dc5716c79be40fd21438101eb16307ae4 100644 --- a/storage/fileTransfer/transferStatus.go +++ b/storage/fileTransfer/transferStatus.go @@ -16,9 +16,16 @@ import ( type TransferStatus int const ( - Running TransferStatus = iota // Sending parts - Stopping // Sent last part but not callback - Stopped // Sent last part and callback + // Running indicates that the transfer is in the processes of sending + Running TransferStatus = iota + + // Stopping indicates that the last part has been sent but the callback + // indicating a completed transfer has not been called + Stopping + + // Stopped indicates that the last part in the transfer has been sent and + // the last callback has been called + Stopped ) const invalidTransferStatusStringErr = "INVALID TransferStatus: " diff --git a/storage/fileTransfer/transferredBundle.go b/storage/fileTransfer/transferredBundle.go index 58e0e72f787d64dcda024ce90b1799c388c5ae68..639c8a096fdd8bf6cb8fd8d111e40d27b8d85491 100644 --- a/storage/fileTransfer/transferredBundle.go +++ b/storage/fileTransfer/transferredBundle.go @@ -46,7 +46,8 @@ type transferredBundle struct { // newTransferredBundle generates a new transferredBundle and saves it to // storage. -func newTransferredBundle(key string, kv *versioned.KV) (*transferredBundle, error) { +func newTransferredBundle(key string, kv *versioned.KV) ( + *transferredBundle, error) { tb := &transferredBundle{ list: make(map[id.Round][]uint16), key: key, @@ -95,7 +96,8 @@ func (tb *transferredBundle) deletePartNums(rid id.Round) error { //////////////////////////////////////////////////////////////////////////////// // loadTransferredBundle loads a transferredBundle from storage. -func loadTransferredBundle(key string, kv *versioned.KV) (*transferredBundle, error) { +func loadTransferredBundle(key string, kv *versioned.KV) (*transferredBundle, + error) { vo, err := kv.Get(makeTransferredBundleKey(key), transferredBundleVersion) if err != nil { return nil, errors.Errorf(loadTransferredBundleErr, err)