diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index e91303742ebc27ccb95810e004cd69b2d8449d62..b188bcc203fe5e286507a5f7e7de4cb34485a836 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -101,8 +101,8 @@ type Manager struct { sendQueue chan queuedPart // Indicates if old transfers saved to storage have been recovered after - // file transfer is closed and reopened - oldTransfersRecovered bool + // file transfer is closed and reopened; this is an atomic + oldTransfersRecovered *uint32 // File transfer parameters p Params @@ -159,14 +159,16 @@ func newManager(client *api.Client, store *storage.Session, } jww.DEBUG.Printf(""+ - "[FT] Created mew file transfer manager with params: %+v", p) + "[FT] Created new file transfer manager with params: %+v", p) + + oldTransfersRecovered := uint32(0) return &Manager{ receiveCB: receiveCB, sent: sent, received: received, sendQueue: make(chan queuedPart, sendQueueBuffLen), - oldTransfersRecovered: false, + oldTransfersRecovered: &oldTransfersRecovered, p: p, client: client, store: store, diff --git a/fileTransfer/oldTransferRecovery.go b/fileTransfer/oldTransferRecovery.go index 2957e1b156835be260c470910b8767d0653a8b4b..7fc4d41c61915d070717a3001c4154237cf0f81c 100644 --- a/fileTransfer/oldTransferRecovery.go +++ b/fileTransfer/oldTransferRecovery.go @@ -12,6 +12,7 @@ import ( jww "github.com/spf13/jwalterweatherman" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" "gitlab.com/xx_network/primitives/id" + "sync/atomic" ) // Error messages. @@ -29,7 +30,9 @@ const roundResultsMaxAttempts = 5 func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { // Exit if old transfers have already been recovered - if m.oldTransfersRecovered { + // TODO: move GetUnsentPartsAndSentRounds to manager creation and remove the + // atomic + if !atomic.CompareAndSwapUint32(m.oldTransfersRecovered, 0, 1) { jww.DEBUG.Printf("[FT] Old file transfer recovery thread not " + "starting: none to recover (app was not closed)") return @@ -38,6 +41,9 @@ func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { // Get list of unsent parts and rounds that parts were sent on unsentParts, sentRounds, err := m.sent.GetUnsentPartsAndSentRounds() + jww.DEBUG.Printf("Adding unsent parts from %d recovered transfers: %v", + len(unsentParts), unsentParts) + // Add all unsent parts to the queue for tid, partNums := range unsentParts { m.queueParts(tid, partNums) @@ -106,7 +112,9 @@ func (m Manager) updateSentRounds(healthyChan chan bool, roundList, err) } else { jww.INFO.Printf( - "[FT] Successfully recovered old file transfers.") + "[FT] Successfully recovered old file transfers: %v", + sentRounds) + return nil } getRoundResultsAttempts++ diff --git a/fileTransfer/send.go b/fileTransfer/send.go index dbab0994774046b8ce5fa62ab8030fbde4335fd8..d954811cd8495906b8c5519f8d07911f54113240 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -33,7 +33,7 @@ import ( const ( // Manager.sendParts sendManyCmixWarn = "[FT] Failed to send %d file parts %v via SendManyCMIX: %+v" - setInProgressErr = "[FT] Failed to set parts %v to in-progress for transfer %s" + setInProgressErr = "[FT] Failed to set parts %v to in-progress for transfer %s: %+v" getRoundResultsErr = "[FT] Failed to get round results for round %d for file transfers %v: %+v" // Manager.buildMessages @@ -47,7 +47,7 @@ const ( finishedEndE2eMsfErr = "[FT] Failed to send E2E message to %s on completion of file transfer %s: %+v" roundFailureWarn = "[FT] Failed to send file parts for file transfers %v on round %d: round %s" finishFailNoTransferErr = "[FT] Failed to requeue in-progress parts on failure of round %d for transfer %s: %+v" - unsetInProgressErr = "[FT] Failed to remove parts from in-progress list for transfer %s: round %s" + unsetInProgressErr = "[FT] Failed to remove parts from in-progress list for transfer %s: round %s: %+v" // Manager.sendEndE2eMessage endE2eGetPartnerErr = "failed to get file transfer partner %s: %+v" @@ -211,7 +211,7 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, // Send all the messages err := m.sendParts(*partList, sentRounds) if err != nil { - jww.FATAL.Panic(err) + jww.ERROR.Print(err) } // Update the timestamp of the send @@ -270,7 +270,7 @@ func (m *Manager) sendParts(partList []queuedPart, for tid, transfer := range transfers { exists, err := transfer.SetInProgress(rid, groupedParts[tid]...) if err != nil { - return errors.Errorf(setInProgressErr, groupedParts[tid], tid) + return errors.Errorf(setInProgressErr, groupedParts[tid], tid, err) } transfer.CallProgressCB(nil) @@ -460,7 +460,8 @@ func (m *Manager) makeRoundEventCallback( // Remove parts from in-progress list partsToResend, err := st.UnsetInProgress(rid) if err != nil { - jww.ERROR.Printf(unsetInProgressErr, tid, roundResult) + jww.ERROR.Printf( + unsetInProgressErr, tid, roundResult, err) } // Call progress callback after change in progress @@ -529,7 +530,7 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // the session and the log m.store.GetCriticalMessages().Succeeded(sendMsg, e2eParams) jww.INFO.Printf("[FT] Sending of message %s informing %s that a transfer "+ - "ended successful.", e2eMsgID, recipient) + "completed successfully.", e2eMsgID, recipient) return nil } diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 7738b4529052317bf77a952da2d0e5dea311249d..a982d3ba226565db9e4886132f164998560826a6 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -191,17 +191,20 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive, avgSendSize := avgNumMessages * (8192 / 8) p.MaxThroughput = int(time.Second) * avgSendSize + oldTransfersRecovered := uint32(0) + m := &Manager{ - receiveCB: receiveCB, - sent: sent, - received: received, - sendQueue: make(chan queuedPart, sendQueueBuffLen), - p: p, - store: storage.InitTestingSession(t), - swb: switchboard.New(), - net: net, - rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), - getRoundResults: rr, + receiveCB: receiveCB, + sent: sent, + received: received, + sendQueue: make(chan queuedPart, sendQueueBuffLen), + oldTransfersRecovered: &oldTransfersRecovered, + p: p, + store: storage.InitTestingSession(t), + swb: switchboard.New(), + net: net, + rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), + getRoundResults: rr, } return m @@ -495,6 +498,7 @@ func (tnm *testNetworkManager) SendManyCMIX(messages []message.TargetedCmixMessa for _, msg := range messages { tnm.sendChan <- message.Receive{ Payload: msg.Message.Marshal(), + Sender: &id.ID{0}, RoundId: tnm.rid, } } diff --git a/go.mod b/go.mod index e4762cd35b89dc5ca2a563a8bcaac06f38f08d55..a0f5409a9057faaac33ef1c6855c1c256b1269b0 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 gitlab.com/elixxir/comms v0.0.4-0.20220104174855-044783c5c1e6 gitlab.com/elixxir/crypto v0.0.7-0.20220104174238-dbd761b30553 - gitlab.com/elixxir/ekv v0.1.5 + gitlab.com/elixxir/ekv v0.1.6 gitlab.com/elixxir/primitives v0.0.3-0.20220104173924-275cb9d7834f gitlab.com/xx_network/comms v0.0.4-0.20211227194445-c099754b3cda gitlab.com/xx_network/crypto v0.0.5-0.20211227194420-f311e8920467 diff --git a/go.sum b/go.sum index f331b2abbba23eafdf7369c14a046d90de1dada3..2b6c2766a664eb94c9e04c459af8aa099c8cdd77 100644 --- a/go.sum +++ b/go.sum @@ -279,6 +279,8 @@ gitlab.com/elixxir/crypto v0.0.7-0.20220104174238-dbd761b30553 h1:BPwepGZspxgiY4 gitlab.com/elixxir/crypto v0.0.7-0.20220104174238-dbd761b30553/go.mod h1:fexaw14nwGMlT6vL9eIJ1ixgiomyAp88hSHl0Yx0/xU= gitlab.com/elixxir/ekv v0.1.5 h1:R8M1PA5zRU1HVnTyrtwybdABh7gUJSCvt1JZwUSeTzk= gitlab.com/elixxir/ekv v0.1.5/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4= +gitlab.com/elixxir/ekv v0.1.6 h1:M2hUSNhH/ChxDd+s8xBqSEKgoPtmE6hOEBqQ73KbN6A= +gitlab.com/elixxir/ekv v0.1.6/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4= gitlab.com/elixxir/primitives v0.0.0-20200731184040-494269b53b4d/go.mod h1:OQgUZq7SjnE0b+8+iIAT2eqQF+2IFHn73tOo+aV11mg= gitlab.com/elixxir/primitives v0.0.0-20200804170709-a1896d262cd9/go.mod h1:p0VelQda72OzoUckr1O+vPW0AiFe0nyKQ6gYcmFSuF8= gitlab.com/elixxir/primitives v0.0.0-20200804182913-788f47bded40/go.mod h1:tzdFFvb1ESmuTCOl1z6+yf6oAICDxH2NPUemVgoNLxc= diff --git a/storage/fileTransfer/sentFileTransfers.go b/storage/fileTransfer/sentFileTransfers.go index 485eb67d062eca8ccb75f9b53bda262d64996eb3..e672ce184fa4674a47b94c89bb47abdc2b420eb8 100644 --- a/storage/fileTransfer/sentFileTransfers.go +++ b/storage/fileTransfer/sentFileTransfers.go @@ -193,11 +193,6 @@ func (sft *SentFileTransfersStore) GetUnsentPartsAndSentRounds() ( sentRounds := map[id.Round][]ftCrypto.TransferID{} for tid, st := range sft.transfers { - // Get list of round IDs that transfers have in-progress rounds on - for _, rid := range st.GetSentRounds() { - sentRounds[rid] = append(sentRounds[rid], tid) - } - // Get list of unsent part numbers for each transfer stUnsentParts, err := st.GetUnsentPartNums() if err != nil { @@ -206,6 +201,11 @@ func (sft *SentFileTransfersStore) GetUnsentPartsAndSentRounds() ( if len(stUnsentParts) > 0 { unsentParts[tid] = stUnsentParts } + + // Get list of round IDs that transfers have in-progress rounds on + for _, rid := range st.GetSentRounds() { + sentRounds[rid] = append(sentRounds[rid], tid) + } } return unsentParts, sentRounds, nil diff --git a/storage/fileTransfer/sentPartTracker_test.go b/storage/fileTransfer/sentPartTracker_test.go index 8d5a476ed542563b7524e9f3da9d668680f9ff22..af50953b889415c3c11bf4969199ee6d318408fb 100644 --- a/storage/fileTransfer/sentPartTracker_test.go +++ b/storage/fileTransfer/sentPartTracker_test.go @@ -56,21 +56,20 @@ func Test_sentPartTracker_GetPartStatus(t *testing.T) { switch partStatuses[partNum] { case interfaces.FpSent: - err := st.partStats.Set(partNum, uint8(interfaces.FpSent)) + err := st.partStats.Set(partNum, inProgress) if err != nil { - t.Errorf("Failed to set part %d to %s: %+v", - partNum, interfaces.FpSent, err) + t.Errorf( + "Failed to set part %d to in-progress: %+v", partNum, err) } case interfaces.FpArrived: - err := st.partStats.Set(partNum, uint8(interfaces.FpSent)) + err := st.partStats.Set(partNum, inProgress) if err != nil { - t.Errorf("Failed to set part %d to %s: %+v", - partNum, interfaces.FpSent, err) + t.Errorf( + "Failed to set part %d to in-progress: %+v", partNum, err) } - err = st.partStats.Set(partNum, uint8(interfaces.FpArrived)) + err = st.partStats.Set(partNum, finished) if err != nil { - t.Errorf("Failed to set part %d to %s: %+v", - partNum, interfaces.FpArrived, err) + t.Errorf("Failed to set part %d to finished: %+v", partNum, err) } } } diff --git a/storage/fileTransfer/sentTransfer.go b/storage/fileTransfer/sentTransfer.go index a91a59847fee21466f0bff6a80a49ad01b67c82d..35b8a450f738d78c8aaef199d858d7e1f21abaf3 100644 --- a/storage/fileTransfer/sentTransfer.go +++ b/storage/fileTransfer/sentTransfer.go @@ -91,6 +91,14 @@ const ( // been used. var MaxRetriesErr = errors.New(maxRetriesErr) +// States for parts in the partStats MultiStateVector. +const ( + unsent = iota + inProgress + finished + numStates // The number of part states (for initialisation of the vector) +) + // sentTransferStateMap prevents illegal state changes for part statuses. var sentTransferStateMap = [][]bool{ {false, true, false}, @@ -188,8 +196,8 @@ func NewSentTransfer(recipient *id.ID, tid ftCrypto.TransferID, } // Create new MultiStateVector for storing part statuses - st.partStats, err = utility.NewMultiStateVector( - st.numParts, 3, sentTransferStateMap, sentPartStatsVectorKey, st.kv) + st.partStats, err = utility.NewMultiStateVector(st.numParts, numStates, + sentTransferStateMap, sentPartStatsVectorKey, st.kv) if err != nil { return nil, errors.Errorf(newSentPartStatusVectorErr, err) } @@ -236,8 +244,8 @@ func (st *SentTransfer) ReInit(numFps uint16, } // Overwrite new part status MultiStateVector - st.partStats, err = utility.NewMultiStateVector( - st.numParts, 3, sentTransferStateMap, sentPartStatsVectorKey, st.kv) + st.partStats, err = utility.NewMultiStateVector(st.numParts, numStates, + sentTransferStateMap, sentPartStatsVectorKey, st.kv) if err != nil { return errors.Errorf(reInitSentPartStatusVectorErr, err) } @@ -314,7 +322,7 @@ func (st *SentTransfer) IsPartInProgress(partNum uint16) (bool, error) { if err != nil { return false, errors.Errorf(getStatusErr, partNum, err) } - return status == 1, nil + return status == inProgress, nil } // IsPartFinished returns true if the part has successfully arrived. Returns @@ -325,7 +333,7 @@ func (st *SentTransfer) IsPartFinished(partNum uint16) (bool, error) { if err != nil { return false, errors.Errorf(getStatusErr, partNum, err) } - return status == 2, nil + return status == finished, nil } // GetProgress returns the current progress of the transfer. Completed is true @@ -345,8 +353,8 @@ func (st *SentTransfer) GetProgress() (completed bool, sent, arrived, // getProgress is the thread-unsafe helper function for GetProgress. func (st *SentTransfer) getProgress() (completed bool, sent, arrived, total uint16, t interfaces.FilePartTracker) { - arrived, _ = st.partStats.GetCount(2) - sent, _ = st.partStats.GetCount(1) + arrived, _ = st.partStats.GetCount(finished) + sent, _ = st.partStats.GetCount(inProgress) total = st.numParts if sent == 0 && arrived == total { @@ -476,7 +484,7 @@ func (st *SentTransfer) SetInProgress(rid id.Round, partNums ...uint16) (bool, _, exists := st.inProgressTransfers.getPartNums(rid) // Set parts as in-progress in part status vector - err := st.partStats.SetMany(partNums, 1) + err := st.partStats.SetMany(partNums, inProgress) if err != nil { return false, err } @@ -503,8 +511,16 @@ func (st *SentTransfer) UnsetInProgress(rid id.Round) ([]uint16, error) { // Get the list of part numbers to be removed from list partNums, _ := st.inProgressTransfers.getPartNums(rid) + // The part status is set in partStats before the parts and round ID so that + // in the event of recovery after a crash, the parts will be resent on a new + // round and the parts in the inProgressTransfers will be left until deleted + // with the rest of the storage on transfer completion. The side effect is + // that on recovery, the status of the round will be looked up again and the + // progress callback will be called for an event that has already been + // called on the callback. + // Set parts as unsent in part status vector - err := st.partStats.SetMany(partNums, 0) + err := st.partStats.SetMany(partNums, unsent) if err != nil { return nil, err } @@ -539,7 +555,7 @@ func (st *SentTransfer) FinishTransfer(rid id.Round) (bool, error) { } // Set parts as finished in part status vector - err = st.partStats.SetMany(partNums, 2) + err = st.partStats.SetMany(partNums, finished) if err != nil { return false, err } @@ -561,7 +577,7 @@ func (st *SentTransfer) GetUnsentPartNums() ([]uint16, error) { defer st.mux.RUnlock() // Get list of parts with a status of unsent - unsentPartNums, err := st.partStats.GetKeys(0) + unsentPartNums, err := st.partStats.GetKeys(unsent) if err != nil { return nil, errors.Errorf(getUnsentPartsErr, err) } diff --git a/storage/fileTransfer/sentTransfer_test.go b/storage/fileTransfer/sentTransfer_test.go index 79415befe0632d48092d75f078969f28f86f5693..da835f58a6c16aa6bf2955584b441d710d0bb8d5 100644 --- a/storage/fileTransfer/sentTransfer_test.go +++ b/storage/fileTransfer/sentTransfer_test.go @@ -962,15 +962,15 @@ func TestSentTransfer_SetInProgress(t *testing.T) { // Check that the part numbers were set on the in-progress status vector for i, partNum := range expectedPartNums { - if status, _ := st.partStats.Get(partNum); status != 1 { - t.Errorf("Part number %d not marked as used in status vector (%d).", - partNum, i) + if status, _ := st.partStats.Get(partNum); status != inProgress { + t.Errorf("Part number %d not marked as in-progress in status "+ + "vector (%d).", partNum, i) } } // Check that the correct number of parts were marked as in-progress in the // status vector - count, _ := st.partStats.GetCount(1) + count, _ := st.partStats.GetCount(inProgress) if int(count) != len(expectedPartNums) { t.Errorf("Incorrect number of parts marked as in-progress."+ "\nexpected: %d\nreceived: %d", len(expectedPartNums), count) @@ -989,7 +989,7 @@ func TestSentTransfer_SetInProgress(t *testing.T) { } // Check that the number of parts were marked as in-progress is unchanged - count, _ = st.partStats.GetCount(1) + count, _ = st.partStats.GetCount(inProgress) if int(count) != len(expectedPartNums2)+len(expectedPartNums) { t.Errorf("Incorrect number of parts marked as in-progress."+ "\nexpected: %d\nreceived: %d", @@ -1075,10 +1075,10 @@ func TestSentTransfer_UnsetInProgress(t *testing.T) { } // Check that there are no set parts in the in-progress status vector - status, _ := st.partStats.Get(1) - if status != 0 { + status, _ := st.partStats.Get(inProgress) + if status != unsent { t.Errorf("Failed to unset all parts in the in-progress vector."+ - "\nexpected: %d\nreceived: %d", 0, status) + "\nexpected: %d\nreceived: %d", unsent, status) } } @@ -1134,7 +1134,7 @@ func TestSentTransfer_FinishTransfer(t *testing.T) { } // Check that there are no set parts in the in-progress status vector - count, _ := st.partStats.GetCount(1) + count, _ := st.partStats.GetCount(inProgress) if count != 0 { t.Errorf("Failed to unset all parts in the in-progress vector."+ "\nexpected: %d\nreceived: %d", 0, count) @@ -1142,17 +1142,16 @@ func TestSentTransfer_FinishTransfer(t *testing.T) { // Check that the part numbers were set on the finished status vector for i, partNum := range expectedPartNums { - - status, _ := st.partStats.Get(1) - if status != 2 { - t.Errorf("Part number %d not marked as used in status vector (%d).", - partNum, i) + status, _ := st.partStats.Get(inProgress) + if status != finished { + t.Errorf("Part number %d not marked as finished in status vector "+ + "(%d).", partNum, i) } } // Check that the correct number of parts were marked as finished in the // status vector - count, _ = st.partStats.GetCount(2) + count, _ = st.partStats.GetCount(finished) if int(count) != len(expectedPartNums) { t.Errorf("Incorrect number of parts marked as finished."+ "\nexpected: %d\nreceived: %d", len(expectedPartNums), count) diff --git a/storage/utility/multiStateVector.go b/storage/utility/multiStateVector.go index 0a882ada564d7044ae1a5b42e914b1b311692146..77cf24ad1435c6c83ce431a13080a4d9d27cae68 100644 --- a/storage/utility/multiStateVector.go +++ b/storage/utility/multiStateVector.go @@ -43,7 +43,7 @@ const ( saveSetStateErr = "failed to save MultiStateVector after setting key %d state to %d: %+v" // MultiStateVector.SetMany - setManyStateErr = "failed to set state of key %d (%d/%d): %+v" + setManyStateErr = "failed to set state of key %d (%d of %d): %+v" saveManySetStateErr = "failed to save MultiStateVector after setting keys %d state to %d: %+v" // MultiStateVector.set