diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index e91303742ebc27ccb95810e004cd69b2d8449d62..b188bcc203fe5e286507a5f7e7de4cb34485a836 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -101,8 +101,8 @@ type Manager struct { sendQueue chan queuedPart // Indicates if old transfers saved to storage have been recovered after - // file transfer is closed and reopened - oldTransfersRecovered bool + // file transfer is closed and reopened; this is an atomic + oldTransfersRecovered *uint32 // File transfer parameters p Params @@ -159,14 +159,16 @@ func newManager(client *api.Client, store *storage.Session, } jww.DEBUG.Printf(""+ - "[FT] Created mew file transfer manager with params: %+v", p) + "[FT] Created new file transfer manager with params: %+v", p) + + oldTransfersRecovered := uint32(0) return &Manager{ receiveCB: receiveCB, sent: sent, received: received, sendQueue: make(chan queuedPart, sendQueueBuffLen), - oldTransfersRecovered: false, + oldTransfersRecovered: &oldTransfersRecovered, p: p, client: client, store: store, diff --git a/fileTransfer/oldTransferRecovery.go b/fileTransfer/oldTransferRecovery.go index 2957e1b156835be260c470910b8767d0653a8b4b..7fc4d41c61915d070717a3001c4154237cf0f81c 100644 --- a/fileTransfer/oldTransferRecovery.go +++ b/fileTransfer/oldTransferRecovery.go @@ -12,6 +12,7 @@ import ( jww "github.com/spf13/jwalterweatherman" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" "gitlab.com/xx_network/primitives/id" + "sync/atomic" ) // Error messages. @@ -29,7 +30,9 @@ const roundResultsMaxAttempts = 5 func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { // Exit if old transfers have already been recovered - if m.oldTransfersRecovered { + // TODO: move GetUnsentPartsAndSentRounds to manager creation and remove the + // atomic + if !atomic.CompareAndSwapUint32(m.oldTransfersRecovered, 0, 1) { jww.DEBUG.Printf("[FT] Old file transfer recovery thread not " + "starting: none to recover (app was not closed)") return @@ -38,6 +41,9 @@ func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { // Get list of unsent parts and rounds that parts were sent on unsentParts, sentRounds, err := m.sent.GetUnsentPartsAndSentRounds() + jww.DEBUG.Printf("Adding unsent parts from %d recovered transfers: %v", + len(unsentParts), unsentParts) + // Add all unsent parts to the queue for tid, partNums := range unsentParts { m.queueParts(tid, partNums) @@ -106,7 +112,9 @@ func (m Manager) updateSentRounds(healthyChan chan bool, roundList, err) } else { jww.INFO.Printf( - "[FT] Successfully recovered old file transfers.") + "[FT] Successfully recovered old file transfers: %v", + sentRounds) + return nil } getRoundResultsAttempts++ diff --git a/fileTransfer/send.go b/fileTransfer/send.go index dbab0994774046b8ce5fa62ab8030fbde4335fd8..d954811cd8495906b8c5519f8d07911f54113240 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -33,7 +33,7 @@ import ( const ( // Manager.sendParts sendManyCmixWarn = "[FT] Failed to send %d file parts %v via SendManyCMIX: %+v" - setInProgressErr = "[FT] Failed to set parts %v to in-progress for transfer %s" + setInProgressErr = "[FT] Failed to set parts %v to in-progress for transfer %s: %+v" getRoundResultsErr = "[FT] Failed to get round results for round %d for file transfers %v: %+v" // Manager.buildMessages @@ -47,7 +47,7 @@ const ( finishedEndE2eMsfErr = "[FT] Failed to send E2E message to %s on completion of file transfer %s: %+v" roundFailureWarn = "[FT] Failed to send file parts for file transfers %v on round %d: round %s" finishFailNoTransferErr = "[FT] Failed to requeue in-progress parts on failure of round %d for transfer %s: %+v" - unsetInProgressErr = "[FT] Failed to remove parts from in-progress list for transfer %s: round %s" + unsetInProgressErr = "[FT] Failed to remove parts from in-progress list for transfer %s: round %s: %+v" // Manager.sendEndE2eMessage endE2eGetPartnerErr = "failed to get file transfer partner %s: %+v" @@ -211,7 +211,7 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, // Send all the messages err := m.sendParts(*partList, sentRounds) if err != nil { - jww.FATAL.Panic(err) + jww.ERROR.Print(err) } // Update the timestamp of the send @@ -270,7 +270,7 @@ func (m *Manager) sendParts(partList []queuedPart, for tid, transfer := range transfers { exists, err := transfer.SetInProgress(rid, groupedParts[tid]...) if err != nil { - return errors.Errorf(setInProgressErr, groupedParts[tid], tid) + return errors.Errorf(setInProgressErr, groupedParts[tid], tid, err) } transfer.CallProgressCB(nil) @@ -460,7 +460,8 @@ func (m *Manager) makeRoundEventCallback( // Remove parts from in-progress list partsToResend, err := st.UnsetInProgress(rid) if err != nil { - jww.ERROR.Printf(unsetInProgressErr, tid, roundResult) + jww.ERROR.Printf( + unsetInProgressErr, tid, roundResult, err) } // Call progress callback after change in progress @@ -529,7 +530,7 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // the session and the log m.store.GetCriticalMessages().Succeeded(sendMsg, e2eParams) jww.INFO.Printf("[FT] Sending of message %s informing %s that a transfer "+ - "ended successful.", e2eMsgID, recipient) + "completed successfully.", e2eMsgID, recipient) return nil } diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 7738b4529052317bf77a952da2d0e5dea311249d..a982d3ba226565db9e4886132f164998560826a6 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -191,17 +191,20 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive, avgSendSize := avgNumMessages * (8192 / 8) p.MaxThroughput = int(time.Second) * avgSendSize + oldTransfersRecovered := uint32(0) + m := &Manager{ - receiveCB: receiveCB, - sent: sent, - received: received, - sendQueue: make(chan queuedPart, sendQueueBuffLen), - p: p, - store: storage.InitTestingSession(t), - swb: switchboard.New(), - net: net, - rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), - getRoundResults: rr, + receiveCB: receiveCB, + sent: sent, + received: received, + sendQueue: make(chan queuedPart, sendQueueBuffLen), + oldTransfersRecovered: &oldTransfersRecovered, + p: p, + store: storage.InitTestingSession(t), + swb: switchboard.New(), + net: net, + rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), + getRoundResults: rr, } return m @@ -495,6 +498,7 @@ func (tnm *testNetworkManager) SendManyCMIX(messages []message.TargetedCmixMessa for _, msg := range messages { tnm.sendChan <- message.Receive{ Payload: msg.Message.Marshal(), + Sender: &id.ID{0}, RoundId: tnm.rid, } } diff --git a/network/message/garbled.go b/network/message/garbled.go index 9388f4dcfb2c42f3f9c106cc949c893207451ced..537fedc042014ee8e2d686c2c978f00020e22b2f 100644 --- a/network/message/garbled.go +++ b/network/message/garbled.go @@ -57,65 +57,72 @@ func (m *Manager) handleGarbledMessages() { var failedMsgs []format.Message //try to decrypt every garbled message, excising those who's counts are too high for grbldMsg, count, timestamp, has := garbledMsgs.Next(); has; grbldMsg, count, timestamp, has = garbledMsgs.Next() { - fingerprint := grbldMsg.GetKeyFP() - // Check if the key is there, process it if it is - if key, isE2E := e2eKv.PopKey(fingerprint); isE2E { - jww.INFO.Printf("[GARBLE] Check E2E for %s, KEYFP: %s", - grbldMsg.Digest(), grbldMsg.GetKeyFP()) - // Decrypt encrypted message - msg, err := key.Decrypt(grbldMsg) - if err == nil { - // get the sender - sender := key.GetSession().GetPartner() - //remove from the buffer if decryption is successful - garbledMsgs.Remove(grbldMsg) + //if it exists, check against all in the list + modifiedContents := append([]byte{0}, grbldMsg.GetContents()...) + identity := m.Session.GetUser().GetContact().ID + _, forMe, _ := m.Session.GetEdge().Check(identity, grbldMsg.GetIdentityFP(), modifiedContents) + if forMe { + fingerprint := grbldMsg.GetKeyFP() + // Check if the key is there, process it if it is + if key, isE2E := e2eKv.PopKey(fingerprint); isE2E { + jww.INFO.Printf("[GARBLE] Check E2E for %s, KEYFP: %s", + grbldMsg.Digest(), grbldMsg.GetKeyFP()) + // Decrypt encrypted message + msg, err := key.Decrypt(grbldMsg) + if err == nil { + // get the sender + sender := key.GetSession().GetPartner() + //remove from the buffer if decryption is successful + garbledMsgs.Remove(grbldMsg) - jww.INFO.Printf("[GARBLE] message decoded as E2E from "+ - "%s, msgDigest: %s", sender, grbldMsg.Digest()) + jww.INFO.Printf("[GARBLE] message decoded as E2E from "+ + "%s, msgDigest: %s", sender, grbldMsg.Digest()) - //handle the successfully decrypted message - xxMsg, ok := m.partitioner.HandlePartition(sender, message.E2E, - msg.GetContents(), - key.GetSession().GetRelationshipFingerprint()) - if ok { - m.Switchboard.Speak(xxMsg) - continue - } - } - } else { - // todo: figure out how to get the ephermal reception id in here. - // we have the raw data, but do not know what address space was - // used int he round - // todo: figure out how to get the round id, the recipient id, and the round timestamp - /* - ephid, err := ephemeral.Marshal(garbledMsg.GetEphemeralRID()) - if err!=nil{ - jww.WARN.Printf("failed to get the ephemeral id for a garbled " + - "message, clearing the message: %+v", err) - garbledMsgs.Remove(garbledMsg) - continue + //handle the successfully decrypted message + xxMsg, ok := m.partitioner.HandlePartition(sender, message.E2E, + msg.GetContents(), + key.GetSession().GetRelationshipFingerprint()) + if ok { + m.Switchboard.Speak(xxMsg) + continue + } } + } else { + // todo: figure out how to get the ephermal reception id in here. + // we have the raw data, but do not know what address space was + // used int he round + // todo: figure out how to get the round id, the recipient id, and the round timestamp + /* + ephid, err := ephemeral.Marshal(garbledMsg.GetEphemeralRID()) + if err!=nil{ + jww.WARN.Printf("failed to get the ephemeral id for a garbled " + + "message, clearing the message: %+v", err) + garbledMsgs.Remove(garbledMsg) + continue + } - ephid.Clear(m.)*/ + ephid.Clear(m.)*/ - raw := message.Receive{ - Payload: grbldMsg.Marshal(), - MessageType: message.Raw, - Sender: &id.ID{}, - EphemeralID: ephemeral.Id{}, - Timestamp: time.Time{}, - Encryption: message.None, - RecipientID: &id.ID{}, - RoundId: 0, - RoundTimestamp: time.Time{}, + raw := message.Receive{ + Payload: grbldMsg.Marshal(), + MessageType: message.Raw, + Sender: &id.ID{}, + EphemeralID: ephemeral.Id{}, + Timestamp: time.Time{}, + Encryption: message.None, + RecipientID: &id.ID{}, + RoundId: 0, + RoundTimestamp: time.Time{}, + } + im := fmt.Sprintf("[GARBLE] RAW Message reprecessed: keyFP: %v, "+ + "msgDigest: %s", grbldMsg.GetKeyFP(), grbldMsg.Digest()) + jww.INFO.Print(im) + m.Internal.Events.Report(1, "MessageReception", "Garbled", im) + m.Session.GetGarbledMessages().Add(grbldMsg) + m.Switchboard.Speak(raw) } - im := fmt.Sprintf("[GARBLE] RAW Message reprecessed: keyFP: %v, "+ - "msgDigest: %s", grbldMsg.GetKeyFP(), grbldMsg.Digest()) - jww.INFO.Print(im) - m.Internal.Events.Report(1, "MessageReception", "Garbled", im) - m.Session.GetGarbledMessages().Add(grbldMsg) - m.Switchboard.Speak(raw) } + // fail the message if any part of the decryption fails, // unless it is the last attempts and has been in the buffer long // enough, in which case remove it diff --git a/network/message/garbled_test.go b/network/message/garbled_test.go index 8656eda570f80f0a7f9333cbaa6f82b6ea80594e..b32e53dd238ab5801b482395da7ab498c867068b 100644 --- a/network/message/garbled_test.go +++ b/network/message/garbled_test.go @@ -11,10 +11,12 @@ import ( "gitlab.com/elixxir/client/network/message/parse" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/client/storage/edge" util "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/switchboard" "gitlab.com/elixxir/comms/client" "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/crypto/fingerprint" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/crypto/csprng" @@ -104,6 +106,13 @@ func TestManager_CheckGarbledMessages(t *testing.T) { t.FailNow() } + preimage := edge.Preimage{ + Data: []byte{0}, + Type: "test", + Source: nil, + } + m.Session.GetEdge().Add(preimage, sess2.GetUser().ReceptionID) + err = sess2.E2e().AddPartner(sess1.GetUser().TransmissionID, sess1.E2e().GetDHPublicKey(), sess2.E2e().GetDHPrivateKey(), mySIDHPubKey, partnerSIDHPrivKey, @@ -143,6 +152,7 @@ func TestManager_CheckGarbledMessages(t *testing.T) { copy(fmp.Timestamp, ts) msg.SetContents(fmp.Bytes()) encryptedMsg := key.Encrypt(msg) + msg.SetIdentityFP(fingerprint.IdentityFP(append([]byte{0}, msg.GetContents()...), preimage.Data)) // TODO: back this out after network update i.Session.GetGarbledMessages().Add(encryptedMsg) stop := stoppable.NewSingle("stop") diff --git a/network/message/handler.go b/network/message/handler.go index ac8f9d35edde36cfca3475b9f897bb06c0906a65..a72dece83b97ddaf4c63acd561fdefc9b06477af 100644 --- a/network/message/handler.go +++ b/network/message/handler.go @@ -54,23 +54,27 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle, edge *edge var relationshipFingerprint []byte //if it exists, check against all in the list - has, forMe, _ := m.Session.GetEdge().Check(identity.Source, ecrMsg.GetIdentityFP(), ecrMsg.GetContents()) + modifiedContents := append([]byte{0}, ecrMsg.GetContents()...) + has, forMe, _ := m.Session.GetEdge().Check(identity.Source, ecrMsg.GetIdentityFP(), modifiedContents) if !has { jww.INFO.Printf("checking backup %v", preimage.MakeDefault(identity.Source)) //if it doesnt exist, check against the default fingerprint for the identity forMe = fingerprint2.CheckIdentityFP(ecrMsg.GetIdentityFP(), - ecrMsg.GetContents(), preimage.MakeDefault(identity.Source)) + modifiedContents, preimage.MakeDefault(identity.Source)) } if !forMe { if jww.GetLogThreshold() == jww.LevelTrace { - expectedFP := fingerprint2.IdentityFP(ecrMsg.GetContents(), + expectedFP := fingerprint2.IdentityFP(modifiedContents, preimage.MakeDefault(identity.Source)) jww.TRACE.Printf("Message for %d (%s) failed identity "+ "check: %v (expected-default) vs %v (received)", identity.EphId, identity.Source, expectedFP, ecrMsg.GetIdentityFP()) } - + im := fmt.Sprintf("Garbled/RAW Message: keyFP: %v, round: %d"+ + "msgDigest: %s, not determined to be for client", msg.GetKeyFP(), bundle.Round, msg.Digest()) + m.Internal.Events.Report(1, "MessageReception", "Garbled", im) + m.Session.GetGarbledMessages().Add(msg) return } diff --git a/network/message/sendCmixUtils.go b/network/message/sendCmixUtils.go index 3911fe0ddbaef888ce558361450c085aff0e2a81..b39acd4373a3b0392cd22cd2847cb1309519c745 100644 --- a/network/message/sendCmixUtils.go +++ b/network/message/sendCmixUtils.go @@ -154,7 +154,8 @@ func buildSlotMessage(msg format.Message, recipient *id.ID, target *id.ID, } // Set the identity fingerprint - ifp := fingerprint.IdentityFP(msg.GetContents(), preimage) + + ifp := fingerprint.IdentityFP(append([]byte{0}, msg.GetContents()...), preimage) msg.SetIdentityFP(ifp) diff --git a/storage/fileTransfer/sentFileTransfers.go b/storage/fileTransfer/sentFileTransfers.go index 485eb67d062eca8ccb75f9b53bda262d64996eb3..e672ce184fa4674a47b94c89bb47abdc2b420eb8 100644 --- a/storage/fileTransfer/sentFileTransfers.go +++ b/storage/fileTransfer/sentFileTransfers.go @@ -193,11 +193,6 @@ func (sft *SentFileTransfersStore) GetUnsentPartsAndSentRounds() ( sentRounds := map[id.Round][]ftCrypto.TransferID{} for tid, st := range sft.transfers { - // Get list of round IDs that transfers have in-progress rounds on - for _, rid := range st.GetSentRounds() { - sentRounds[rid] = append(sentRounds[rid], tid) - } - // Get list of unsent part numbers for each transfer stUnsentParts, err := st.GetUnsentPartNums() if err != nil { @@ -206,6 +201,11 @@ func (sft *SentFileTransfersStore) GetUnsentPartsAndSentRounds() ( if len(stUnsentParts) > 0 { unsentParts[tid] = stUnsentParts } + + // Get list of round IDs that transfers have in-progress rounds on + for _, rid := range st.GetSentRounds() { + sentRounds[rid] = append(sentRounds[rid], tid) + } } return unsentParts, sentRounds, nil diff --git a/storage/fileTransfer/sentPartTracker_test.go b/storage/fileTransfer/sentPartTracker_test.go index 8d5a476ed542563b7524e9f3da9d668680f9ff22..af50953b889415c3c11bf4969199ee6d318408fb 100644 --- a/storage/fileTransfer/sentPartTracker_test.go +++ b/storage/fileTransfer/sentPartTracker_test.go @@ -56,21 +56,20 @@ func Test_sentPartTracker_GetPartStatus(t *testing.T) { switch partStatuses[partNum] { case interfaces.FpSent: - err := st.partStats.Set(partNum, uint8(interfaces.FpSent)) + err := st.partStats.Set(partNum, inProgress) if err != nil { - t.Errorf("Failed to set part %d to %s: %+v", - partNum, interfaces.FpSent, err) + t.Errorf( + "Failed to set part %d to in-progress: %+v", partNum, err) } case interfaces.FpArrived: - err := st.partStats.Set(partNum, uint8(interfaces.FpSent)) + err := st.partStats.Set(partNum, inProgress) if err != nil { - t.Errorf("Failed to set part %d to %s: %+v", - partNum, interfaces.FpSent, err) + t.Errorf( + "Failed to set part %d to in-progress: %+v", partNum, err) } - err = st.partStats.Set(partNum, uint8(interfaces.FpArrived)) + err = st.partStats.Set(partNum, finished) if err != nil { - t.Errorf("Failed to set part %d to %s: %+v", - partNum, interfaces.FpArrived, err) + t.Errorf("Failed to set part %d to finished: %+v", partNum, err) } } } diff --git a/storage/fileTransfer/sentTransfer.go b/storage/fileTransfer/sentTransfer.go index a91a59847fee21466f0bff6a80a49ad01b67c82d..35b8a450f738d78c8aaef199d858d7e1f21abaf3 100644 --- a/storage/fileTransfer/sentTransfer.go +++ b/storage/fileTransfer/sentTransfer.go @@ -91,6 +91,14 @@ const ( // been used. var MaxRetriesErr = errors.New(maxRetriesErr) +// States for parts in the partStats MultiStateVector. +const ( + unsent = iota + inProgress + finished + numStates // The number of part states (for initialisation of the vector) +) + // sentTransferStateMap prevents illegal state changes for part statuses. var sentTransferStateMap = [][]bool{ {false, true, false}, @@ -188,8 +196,8 @@ func NewSentTransfer(recipient *id.ID, tid ftCrypto.TransferID, } // Create new MultiStateVector for storing part statuses - st.partStats, err = utility.NewMultiStateVector( - st.numParts, 3, sentTransferStateMap, sentPartStatsVectorKey, st.kv) + st.partStats, err = utility.NewMultiStateVector(st.numParts, numStates, + sentTransferStateMap, sentPartStatsVectorKey, st.kv) if err != nil { return nil, errors.Errorf(newSentPartStatusVectorErr, err) } @@ -236,8 +244,8 @@ func (st *SentTransfer) ReInit(numFps uint16, } // Overwrite new part status MultiStateVector - st.partStats, err = utility.NewMultiStateVector( - st.numParts, 3, sentTransferStateMap, sentPartStatsVectorKey, st.kv) + st.partStats, err = utility.NewMultiStateVector(st.numParts, numStates, + sentTransferStateMap, sentPartStatsVectorKey, st.kv) if err != nil { return errors.Errorf(reInitSentPartStatusVectorErr, err) } @@ -314,7 +322,7 @@ func (st *SentTransfer) IsPartInProgress(partNum uint16) (bool, error) { if err != nil { return false, errors.Errorf(getStatusErr, partNum, err) } - return status == 1, nil + return status == inProgress, nil } // IsPartFinished returns true if the part has successfully arrived. Returns @@ -325,7 +333,7 @@ func (st *SentTransfer) IsPartFinished(partNum uint16) (bool, error) { if err != nil { return false, errors.Errorf(getStatusErr, partNum, err) } - return status == 2, nil + return status == finished, nil } // GetProgress returns the current progress of the transfer. Completed is true @@ -345,8 +353,8 @@ func (st *SentTransfer) GetProgress() (completed bool, sent, arrived, // getProgress is the thread-unsafe helper function for GetProgress. func (st *SentTransfer) getProgress() (completed bool, sent, arrived, total uint16, t interfaces.FilePartTracker) { - arrived, _ = st.partStats.GetCount(2) - sent, _ = st.partStats.GetCount(1) + arrived, _ = st.partStats.GetCount(finished) + sent, _ = st.partStats.GetCount(inProgress) total = st.numParts if sent == 0 && arrived == total { @@ -476,7 +484,7 @@ func (st *SentTransfer) SetInProgress(rid id.Round, partNums ...uint16) (bool, _, exists := st.inProgressTransfers.getPartNums(rid) // Set parts as in-progress in part status vector - err := st.partStats.SetMany(partNums, 1) + err := st.partStats.SetMany(partNums, inProgress) if err != nil { return false, err } @@ -503,8 +511,16 @@ func (st *SentTransfer) UnsetInProgress(rid id.Round) ([]uint16, error) { // Get the list of part numbers to be removed from list partNums, _ := st.inProgressTransfers.getPartNums(rid) + // The part status is set in partStats before the parts and round ID so that + // in the event of recovery after a crash, the parts will be resent on a new + // round and the parts in the inProgressTransfers will be left until deleted + // with the rest of the storage on transfer completion. The side effect is + // that on recovery, the status of the round will be looked up again and the + // progress callback will be called for an event that has already been + // called on the callback. + // Set parts as unsent in part status vector - err := st.partStats.SetMany(partNums, 0) + err := st.partStats.SetMany(partNums, unsent) if err != nil { return nil, err } @@ -539,7 +555,7 @@ func (st *SentTransfer) FinishTransfer(rid id.Round) (bool, error) { } // Set parts as finished in part status vector - err = st.partStats.SetMany(partNums, 2) + err = st.partStats.SetMany(partNums, finished) if err != nil { return false, err } @@ -561,7 +577,7 @@ func (st *SentTransfer) GetUnsentPartNums() ([]uint16, error) { defer st.mux.RUnlock() // Get list of parts with a status of unsent - unsentPartNums, err := st.partStats.GetKeys(0) + unsentPartNums, err := st.partStats.GetKeys(unsent) if err != nil { return nil, errors.Errorf(getUnsentPartsErr, err) } diff --git a/storage/fileTransfer/sentTransfer_test.go b/storage/fileTransfer/sentTransfer_test.go index 79415befe0632d48092d75f078969f28f86f5693..da835f58a6c16aa6bf2955584b441d710d0bb8d5 100644 --- a/storage/fileTransfer/sentTransfer_test.go +++ b/storage/fileTransfer/sentTransfer_test.go @@ -962,15 +962,15 @@ func TestSentTransfer_SetInProgress(t *testing.T) { // Check that the part numbers were set on the in-progress status vector for i, partNum := range expectedPartNums { - if status, _ := st.partStats.Get(partNum); status != 1 { - t.Errorf("Part number %d not marked as used in status vector (%d).", - partNum, i) + if status, _ := st.partStats.Get(partNum); status != inProgress { + t.Errorf("Part number %d not marked as in-progress in status "+ + "vector (%d).", partNum, i) } } // Check that the correct number of parts were marked as in-progress in the // status vector - count, _ := st.partStats.GetCount(1) + count, _ := st.partStats.GetCount(inProgress) if int(count) != len(expectedPartNums) { t.Errorf("Incorrect number of parts marked as in-progress."+ "\nexpected: %d\nreceived: %d", len(expectedPartNums), count) @@ -989,7 +989,7 @@ func TestSentTransfer_SetInProgress(t *testing.T) { } // Check that the number of parts were marked as in-progress is unchanged - count, _ = st.partStats.GetCount(1) + count, _ = st.partStats.GetCount(inProgress) if int(count) != len(expectedPartNums2)+len(expectedPartNums) { t.Errorf("Incorrect number of parts marked as in-progress."+ "\nexpected: %d\nreceived: %d", @@ -1075,10 +1075,10 @@ func TestSentTransfer_UnsetInProgress(t *testing.T) { } // Check that there are no set parts in the in-progress status vector - status, _ := st.partStats.Get(1) - if status != 0 { + status, _ := st.partStats.Get(inProgress) + if status != unsent { t.Errorf("Failed to unset all parts in the in-progress vector."+ - "\nexpected: %d\nreceived: %d", 0, status) + "\nexpected: %d\nreceived: %d", unsent, status) } } @@ -1134,7 +1134,7 @@ func TestSentTransfer_FinishTransfer(t *testing.T) { } // Check that there are no set parts in the in-progress status vector - count, _ := st.partStats.GetCount(1) + count, _ := st.partStats.GetCount(inProgress) if count != 0 { t.Errorf("Failed to unset all parts in the in-progress vector."+ "\nexpected: %d\nreceived: %d", 0, count) @@ -1142,17 +1142,16 @@ func TestSentTransfer_FinishTransfer(t *testing.T) { // Check that the part numbers were set on the finished status vector for i, partNum := range expectedPartNums { - - status, _ := st.partStats.Get(1) - if status != 2 { - t.Errorf("Part number %d not marked as used in status vector (%d).", - partNum, i) + status, _ := st.partStats.Get(inProgress) + if status != finished { + t.Errorf("Part number %d not marked as finished in status vector "+ + "(%d).", partNum, i) } } // Check that the correct number of parts were marked as finished in the // status vector - count, _ = st.partStats.GetCount(2) + count, _ = st.partStats.GetCount(finished) if int(count) != len(expectedPartNums) { t.Errorf("Incorrect number of parts marked as finished."+ "\nexpected: %d\nreceived: %d", len(expectedPartNums), count) diff --git a/storage/utility/multiStateVector.go b/storage/utility/multiStateVector.go index 0a882ada564d7044ae1a5b42e914b1b311692146..77cf24ad1435c6c83ce431a13080a4d9d27cae68 100644 --- a/storage/utility/multiStateVector.go +++ b/storage/utility/multiStateVector.go @@ -43,7 +43,7 @@ const ( saveSetStateErr = "failed to save MultiStateVector after setting key %d state to %d: %+v" // MultiStateVector.SetMany - setManyStateErr = "failed to set state of key %d (%d/%d): %+v" + setManyStateErr = "failed to set state of key %d (%d of %d): %+v" saveManySetStateErr = "failed to save MultiStateVector after setting keys %d state to %d: %+v" // MultiStateVector.set