diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index c259a6250e769808921694d0d828a0072107664b..6892a9cb9538673ee7351bcd0c74ed439d11bbf1 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -18,6 +18,7 @@ import ( "gitlab.com/elixxir/crypto/contact" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/utils" "io/ioutil" "time" @@ -62,7 +63,7 @@ var ftCmd = &cobra.Command{ numReg, total, err = client.GetNodeRegistrationStatus() if err != nil { - jww.FATAL.Panicf("Failed to get nodes registration status: %+v", + jww.FATAL.Panicf("Failed to get node registration status: %+v", err) } @@ -90,17 +91,13 @@ var ftCmd = &cobra.Command{ // Wait until either the file finishes sending or the file finishes // being received, stop the receiving thread, and exit - for done := false; !done; { - select { - case <-sendDone: - jww.INFO.Printf("[FT] Finished sending file. Stopping " + - "threads and network follower.") - done = true - case <-receiveDone: - jww.INFO.Printf("[FT] Finished receiving file. Stopping " + - "threads and network follower.") - done = true - } + select { + case <-sendDone: + jww.INFO.Printf("[FT] Finished sending file. Stopping threads " + + "and network follower.") + case <-receiveDone: + jww.INFO.Printf("[FT] Finished receiving file. Stopping threads " + + "and network follower.") } // Stop reception thread @@ -170,13 +167,13 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, recipientContactPath string, retry float32, m *ft.Manager, done chan struct{}) { - // get file from path + // Get file from path fileData, err := utils.ReadFile(filePath) if err != nil { jww.FATAL.Panicf("[FT] Failed to read file %q: %+v", filePath, err) } - // get file preview from path + // Get file preview from path filePreviewData := []byte(filePreviewString) if filePreviewPath != "" { filePreviewData, err = utils.ReadFile(filePreviewPath) @@ -192,7 +189,7 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, fileName = fileName[:ft.FileNameMaxLen] } - // get recipient contact from file + // Get recipient contact from file recipient := getContactFromFile(recipientContactPath) jww.INFO.Printf("[FT] Going to start sending file %q to %s {type: %q, "+ @@ -200,6 +197,8 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, fileName, recipient.ID, fileType, len(fileData), retry, filePath, filePreviewPath, filePreviewData) + var sendStart time.Time + // Create sent progress callback that prints the results progressCB := func(completed bool, sent, arrived, total uint16, t interfaces.FilePartTracker, err error) { @@ -214,14 +213,24 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, } if completed { + fileSize := len(fileData) + sendTime := netTime.Since(sendStart) + fileSizeKb := float32(fileSize) * .001 + speed := fileSizeKb * float32(time.Second) / (float32(sendTime)) + jww.INFO.Printf("[FT] Completed sending file %q in %s (%.2f kb @ %.2f kb/s).", + fileName, sendTime, fileSizeKb, speed) fmt.Printf("Completed sending file.\n") done <- struct{}{} } else if err != nil { + jww.ERROR.Printf("[FT] Failed sending file %q in %s: %+v", + fileName, netTime.Since(sendStart), err) fmt.Printf("Failed sending file: %+v\n", err) done <- struct{}{} } } + sendStart = netTime.Now() + // Send the file tid, err := m.Send(fileName, fileType, fileData, recipient.ID, retry, filePreviewData, progressCB, callbackPeriod) @@ -248,13 +257,14 @@ func receiveNewFileTransfers(receive chan receivedFtResults, done, "E2E message.") return case r := <-receive: + receiveStart := netTime.Now() jww.INFO.Printf("[FT] Received new file %q transfer %s of type "+ "%q from %s of size %d bytes with preview: %q", r.fileName, r.tid, r.fileType, r.sender, r.size, r.preview) fmt.Printf("Received new file transfer %q of size %d "+ "bytes with preview: %q\n", r.fileName, r.size, r.preview) - cb := newReceiveProgressCB(r.tid, done, m) + cb := newReceiveProgressCB(r.tid, r.fileName, done, receiveStart, m) err := m.RegisterReceivedProgressCallback(r.tid, cb, callbackPeriod) if err != nil { jww.FATAL.Panicf("[FT] Failed to register new receive "+ @@ -266,7 +276,8 @@ func receiveNewFileTransfers(receive chan receivedFtResults, done, // newReceiveProgressCB creates a new reception progress callback that prints // the results to the log. -func newReceiveProgressCB(tid ftCrypto.TransferID, done chan struct{}, +func newReceiveProgressCB(tid ftCrypto.TransferID, fileName string, + done chan struct{}, receiveStart time.Time, m *ft.Manager) interfaces.ReceivedProgressCallback { return func(completed bool, received, total uint16, t interfaces.FilePartTracker, err error) { @@ -286,9 +297,13 @@ func newReceiveProgressCB(tid ftCrypto.TransferID, done chan struct{}, jww.FATAL.Panicf( "[FT] Failed to receive file %s: %+v", tid, err) } + jww.INFO.Printf("[FT] Completed receiving file %q in %s.", + fileName, netTime.Since(receiveStart)) fmt.Printf("Completed receiving file:\n%s\n", receivedFile) done <- struct{}{} } else if err != nil { + jww.INFO.Printf("[FT] Failed receiving file %q in %s.", + fileName, netTime.Since(receiveStart)) fmt.Printf("Failed sending file: %+v\n", err) done <- struct{}{} } diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 236881b0693d24eabf43f2684f36eda90641cc05..713e230c96a3076669fbd718115f62b97b65a8af 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -50,7 +50,7 @@ const ( sendQueueBuffLen = 10_000 // Size of the buffered channel that reports if the network is healthy - networkHealthBuffLen = 100 + networkHealthBuffLen = 10 ) // Error messages. @@ -280,7 +280,7 @@ func (m Manager) Send(fileName, fileType string, fileData []byte, } rng.Close() - // get the size of each file part + // Get the size of each file part partSize, err := m.getPartSize() if err != nil { return ftCrypto.TransferID{}, errors.Errorf(getPartSizeErr, err) @@ -329,7 +329,7 @@ func (m Manager) Send(fileName, fileType string, fileData []byte, // most once per period. func (m Manager) RegisterSentProgressCallback(tid ftCrypto.TransferID, progressCB interfaces.SentProgressCallback, period time.Duration) error { - // get the transfer for the given ID + // Get the transfer for the given ID transfer, err := m.sent.GetTransfer(tid) if err != nil { return err @@ -350,7 +350,7 @@ func (m Manager) RegisterSentProgressCallback(tid ftCrypto.TransferID, // - Can you reuse fingerprints? // - What to do if sendE2E fails? func (m Manager) Resend(tid ftCrypto.TransferID) error { - // get the transfer for the given ID + // Get the transfer for the given ID transfer, err := m.sent.GetTransfer(tid) if err != nil { return err @@ -369,7 +369,7 @@ func (m Manager) Resend(tid ftCrypto.TransferID) error { // storage once a transfer has completed or reached the retry limit. Returns an // error if the transfer has not run out of retries. func (m Manager) CloseSend(tid ftCrypto.TransferID) error { - // get the transfer for the given ID + // Get the transfer for the given ID st, err := m.sent.GetTransfer(tid) if err != nil { return err @@ -395,13 +395,13 @@ func (m Manager) CloseSend(tid ftCrypto.TransferID) error { // Returns an error if the transfer is not complete, the full file cannot be // verified, or if the transfer cannot be found. func (m Manager) Receive(tid ftCrypto.TransferID) ([]byte, error) { - // get the transfer for the given ID + // Get the transfer for the given ID rt, err := m.received.GetTransfer(tid) if err != nil { return nil, err } - // get the file from the transfer + // Get the file from the transfer file, err := rt.GetFile() if err != nil { return nil, err @@ -421,7 +421,7 @@ func (m Manager) Receive(tid ftCrypto.TransferID) ([]byte, error) { // updates, at most once per period. func (m Manager) RegisterReceivedProgressCallback(tid ftCrypto.TransferID, progressCB interfaces.ReceivedProgressCallback, period time.Duration) error { - // get the transfer for the given ID + // Get the transfer for the given ID transfer, err := m.received.GetTransfer(tid) if err != nil { return err diff --git a/fileTransfer/manager_test.go b/fileTransfer/manager_test.go index a4b06b61db081fd98e8ee7b8b643f42f49e73e7c..63952f154487962fc6e504c56d58f6d0d00cda02 100644 --- a/fileTransfer/manager_test.go +++ b/fileTransfer/manager_test.go @@ -125,7 +125,7 @@ func TestManager_Send(t *testing.T) { } //// - // get NewFileTransfer E2E message + // Get NewFileTransfer E2E message //// sendMsg := m.net.(*testNetworkManager).GetE2eMsg(0) if sendMsg.MessageType != message.NewFileTransfer { diff --git a/fileTransfer/oldTransferRecovery.go b/fileTransfer/oldTransferRecovery.go index 9bcb95f74719bd721a1c4b7e3591c237ac5f3304..f47e9ba0d004272b8605cf6b5a23332df9e3c67d 100644 --- a/fileTransfer/oldTransferRecovery.go +++ b/fileTransfer/oldTransferRecovery.go @@ -38,10 +38,10 @@ func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { return } - // get list of unsent parts and rounds that parts were sent on + // Get list of unsent parts and rounds that parts were sent on unsentParts, sentRounds, err := m.sent.GetUnsentPartsAndSentRounds() - jww.DEBUG.Printf("Adding unsent parts from %d recovered transfers: %v", + jww.DEBUG.Printf("[FT] Adding unsent parts from %d recovered transfers: %v", len(unsentParts), unsentParts) // Add all unsent parts to the queue diff --git a/fileTransfer/oldTransferRecovery_test.go b/fileTransfer/oldTransferRecovery_test.go index 252df434401beb2a524b5541c39f0011e080d962..341b39542ae7540282669a1cb293ca763cb8e92f 100644 --- a/fileTransfer/oldTransferRecovery_test.go +++ b/fileTransfer/oldTransferRecovery_test.go @@ -264,7 +264,7 @@ func TestManager_updateSentRounds(t *testing.T) { healthyRecover := make(chan bool, networkHealthBuffLen) healthyRecover <- true - // get list of rounds that parts were sent on + // Get list of rounds that parts were sent on _, loadedSentRounds, _ := m.sent.GetUnsentPartsAndSentRounds() err = loadedManager.updateSentRounds(healthyRecover, loadedSentRounds) diff --git a/fileTransfer/receive.go b/fileTransfer/receive.go index 3552fdf5635a819f66cbecfb9945ca01ab529846..c122541ee778c2a7943af2ecb69268f1f94ca7ab 100644 --- a/fileTransfer/receive.go +++ b/fileTransfer/receive.go @@ -15,12 +15,6 @@ import ( "strings" ) -// Error messages. -const ( - // Manager.readMessage - unmarshalPartMessageErr = "failed to unmarshal cMix message contents into file part message: %+v" -) - // receive runs a loop that receives file message parts and stores them in their // appropriate transfer. func (m *Manager) receive(rawMsgs chan message.Receive, stop *stoppable.Single) { @@ -40,9 +34,9 @@ func (m *Manager) receive(rawMsgs chan message.Receive, stop *stoppable.Single) // which means this message is not of the correct type and will // be ignored if strings.Contains(err.Error(), "fingerprint") { - jww.TRACE.Printf("[FT] %+v", err) + jww.TRACE.Printf("[FT] %v", err) } else { - jww.WARN.Printf("[FT] %+v", err) + jww.WARN.Printf("[FT] %v", err) } continue } diff --git a/fileTransfer/receiveNew.go b/fileTransfer/receiveNew.go index e35539d1fa490e67306311104cdad0da469acfb0..67f3ce3e44570f4f6647c69b7ba098f965b19c62 100644 --- a/fileTransfer/receiveNew.go +++ b/fileTransfer/receiveNew.go @@ -83,7 +83,7 @@ func (m *Manager) readNewFileTransferMessage(msg message.Receive) ( return } - // get RNG from stream + // Get RNG from stream rng := m.rng.GetStream() defer rng.Close() diff --git a/fileTransfer/receive_test.go b/fileTransfer/receive_test.go index 31ac0d40913bb7ddfc08eaca34b51bcd1eeed466..cc28fc24f1b910128ceba15d1486b1ef9e120966 100644 --- a/fileTransfer/receive_test.go +++ b/fileTransfer/receive_test.go @@ -332,7 +332,7 @@ func TestManager_readMessage(t *testing.T) { <-done1 - // get the file and check that the part was added to it + // Get the file and check that the part was added to it fileData, err := rt.GetFile() if err == nil { t.Error("GetFile did not return an error when parts are missing.") diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 27c81207399056999af73731532d29389c816102..b42c16d13cb3c589a3b8dc620878e206aa73b65e 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -51,7 +51,8 @@ const ( // Manager.sendEndE2eMessage endE2eGetPartnerErr = "failed to get file transfer partner %s: %+v" - endE2eSendErr = "failed to send end file transfer message: %+v" + endE2eHealthTimeout = "waiting for network to become healthy timed out after %s." + endE2eSendErr = "failed to send end file transfer message via E2E to recipient %s: %+v" // getRandomNumParts getRandomNumPartsRandPanic = "[FT] Failed to generate random number of file parts to send: %+v" @@ -64,8 +65,15 @@ const ( // Duration to wait for send batch to fill before sending partial batch. pollSleepDuration = 100 * time.Millisecond - // Age when rounds that files were sent from are deleted from the tracker + // Age when rounds that files were sent from are deleted from the tracker. clearSentRoundsAge = 10 * time.Second + + // Duration to wait for network to become healthy to send end E2E message + // before timing out. + sendEndE2eHealthTimeout = 5 * time.Second + + // Tag that prints with cMix sending logs. + cMixDebugTag = "FT.Part" ) // sendThread waits on the sendQueue channel for parts to send. Once its @@ -108,20 +116,27 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, timer = time.NewTimer(pollSleepDuration) select { case <-stop.Quit(): + timer.Stop() + // Close the thread when the stoppable is triggered m.closeSendThread(partList, stop, healthChanID) return case healthy := <-healthChan: + var wasNotHealthy bool // If the network is unhealthy, wait until it becomes healthy if !healthy { jww.TRACE.Print("[FT] Suspending file part sending thread: " + "network is unhealthy.") + wasNotHealthy = true } for !healthy { healthy = <-healthChan } - jww.TRACE.Print("[FT] File part sending thread: network is healthy.") + if wasNotHealthy { + jww.TRACE.Print("[FT] File part sending thread: " + + "network is healthy.") + } case part := <-m.sendQueue: // When a part is received from the queue, add it to the list of // parts to be sent @@ -141,17 +156,15 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, quit := m.handleSend( &partList, &lastSend, delay, stop, healthChanID, sentRounds) if quit { + timer.Stop() return } - } else { - timer = time.NewTimer(pollSleepDuration) } case <-timer.C: // If the timeout is reached, send an incomplete batch // Skip if there are no parts to send if len(partList) == 0 { - timer = time.NewTimer(pollSleepDuration) continue } @@ -196,23 +209,33 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, // the bandwidth is limited to the maximum throughput if netTime.Since(*lastSend) < delay { waitingTime := delay - netTime.Since(*lastSend) - jww.TRACE.Printf("[FT] Suspending file part sending: "+ - "bandwidth limit reached; waiting %s to send.", waitingTime) + jww.TRACE.Printf("[FT] Suspending file part sending (%d parts): "+ + "bandwidth limit reached; waiting %s to send.", + len(*partList), waitingTime) + + waitingTimer := time.NewTimer(waitingTime) select { case <-stop.Quit(): + waitingTimer.Stop() + // Close the thread when the stoppable is triggered m.closeSendThread(*partList, stop, healthChanID) return true - case <-time.NewTimer(delay - netTime.Since(*lastSend)).C: + case <-waitingTimer.C: + jww.TRACE.Printf("[FT] Resuming file part sending (%d parts) "+ + "after waiting %s for bandwidth limiting.", + len(*partList), waitingTime) } } // Send all the messages - err := m.sendParts(*partList, sentRounds) - if err != nil { - jww.ERROR.Print(err) - } + go func(partList []queuedPart, sentRounds *sentRoundTracker) { + err := m.sendParts(partList, sentRounds) + if err != nil { + jww.ERROR.Print(err) + } + }(copyPartList(*partList), sentRounds) // Update the timestamp of the send *lastSend = netTime.Now() @@ -223,6 +246,13 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, return false } +// copyPartList makes a copy of the list of queuedPart. +func copyPartList(partList []queuedPart) []queuedPart { + newPartList := make([]queuedPart, len(partList)) + copy(newPartList, partList) + return newPartList +} + // sendParts handles the composing and sending of a cMix message for each part // in the list. All errors returned are fatal errors. func (m *Manager) sendParts(partList []queuedPart, @@ -247,7 +277,10 @@ func (m *Manager) sendParts(partList []queuedPart, p := params.GetDefaultCMIX() p.SendTimeout = m.p.SendTimeout p.ExcludedRounds = sentRounds - p.DebugTag = "ft.Part" + p.DebugTag = cMixDebugTag + + jww.TRACE.Printf("[FT] Sending %d file parts via SendManyCMIX with "+ + "parameters %+v", len(messages), p) // Send parts rid, _, err := m.net.SendManyCMIX(messages, p) @@ -365,7 +398,7 @@ func (m *Manager) newCmixMessage(transfer *ftStorage.SentTransfer, // Create new empty cMix message cmixMsg := format.NewMessage(m.store.Cmix().GetGroup().GetP().ByteLen()) - // get encrypted file part, file part MAC, nonce (nonce), and fingerprint + // Get encrypted file part, file part MAC, nonce (nonce), and fingerprint encPart, mac, fp, err := transfer.GetEncryptedPart(partNum, cmixMsg.ContentsSize()) if err != nil { return format.Message{}, err @@ -407,6 +440,9 @@ func (m *Manager) makeRoundEventCallback( continue } + // Call progress callback after change in progress + st.CallProgressCB(nil) + // If the transfer is complete, send an E2E message to the // recipient informing them if completed { @@ -424,9 +460,6 @@ func (m *Manager) makeRoundEventCallback( } }(tid, st.GetRecipient()) } - - // Call progress callback after change in progress - st.CallProgressCB(nil) } } else { @@ -463,7 +496,7 @@ func (m *Manager) makeRoundEventCallback( // sendEndE2eMessage sends an E2E message to the recipient once the transfer // complete information them that all file parts have been sent. func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { - // get the partner + // Get the partner partner, err := m.store.E2e().GetPartner(recipient) if err != nil { return errors.Errorf(endE2eGetPartnerErr, recipient, err) @@ -478,15 +511,31 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // Send the message under file transfer preimage e2eParams := params.GetDefaultE2E() e2eParams.IdentityPreimage = partner.GetFileTransferPreimage() - e2eParams.DebugTag = "ft.End" + e2eParams.DebugTag = "FT.End" // Store the message in the critical messages buffer first to ensure it is // present if the send fails m.store.GetCriticalMessages().AddProcessing(sendMsg, e2eParams) + // Register health channel and wait for network to become healthy + healthChan := make(chan bool, networkHealthBuffLen) + healthChanID := m.net.GetHealthTracker().AddChannel(healthChan) + defer m.net.GetHealthTracker().RemoveChannel(healthChanID) + isHealthy := m.net.GetHealthTracker().IsHealthy() + healthCheckTimer := time.NewTimer(sendEndE2eHealthTimeout) + for !isHealthy { + select { + case isHealthy = <-healthChan: + case <-healthCheckTimer.C: + return errors.Errorf(endE2eHealthTimeout, sendEndE2eHealthTimeout) + } + } + healthCheckTimer.Stop() + + // Send E2E message rounds, e2eMsgID, _, err := m.net.SendE2E(sendMsg, e2eParams, nil) if err != nil { - return errors.Errorf(endE2eSendErr, err) + return errors.Errorf(endE2eSendErr, recipient, err) } // Register the event for all rounds diff --git a/fileTransfer/sendNew.go b/fileTransfer/sendNew.go index 4dce71d6e3b900d5584c8c940f240f626d2cbb0c..91c979dc806324d5a4077a902605c239239c9926 100644 --- a/fileTransfer/sendNew.go +++ b/fileTransfer/sendNew.go @@ -34,7 +34,7 @@ func (m *Manager) sendNewFileTransfer(recipient *id.ID, fileName, return errors.Errorf(newFtProtoMarshalErr, err) } - // get partner relationship so that the silent preimage can be generated + // Get partner relationship so that the silent preimage can be generated relationship, err := m.store.E2e().GetPartner(recipient) if err != nil { return err @@ -43,11 +43,11 @@ func (m *Manager) sendNewFileTransfer(recipient *id.ID, fileName, // Sends as a silent message to avoid a notification p := params.GetDefaultE2E() p.CMIX.IdentityPreimage = relationship.GetSilentPreimage() - p.DebugTag = "ft.New" + p.DebugTag = "FT.New" // Send E2E message - rounds, _, _, err := m.net.SendE2E(sendMsg, p, nil) - if err != nil && len(rounds) == 0 { + _, _, _, err = m.net.SendE2E(sendMsg, p, nil) + if err != nil { return errors.Errorf(newFtSendE2eErr, recipient, err) } diff --git a/fileTransfer/send_test.go b/fileTransfer/send_test.go index 813c50fa83bc0de0bc239fee4ff75e6a40287f2b..d090d4dca6aca58f5fae3a3b6fbfa10d42cf3b2a 100644 --- a/fileTransfer/send_test.go +++ b/fileTransfer/send_test.go @@ -888,13 +888,11 @@ func TestManager_sendEndE2eMessage(t *testing.T) { p := params.GetDefaultE2ESessionParams() rng := csprng.NewSystemRNG() - _, mySidhPriv := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhA, - rng) - theirSidhPub, _ := util.GenerateSIDHKeyPair( - sidh.KeyVariantSidhB, rng) + _, mySidhPriv := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhA, rng) + theirSidhPub, _ := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhB, rng) - err := m.store.E2e().AddPartner(recipient, pubKey, dhKey, mySidhPriv, - theirSidhPub, p, p) + err := m.store.E2e().AddPartner( + recipient, pubKey, dhKey, mySidhPriv, theirSidhPub, p, p) if err != nil { t.Errorf("Failed to add partner %s: %+v", recipient, err) } @@ -1009,7 +1007,7 @@ func TestManager_getPartSize(t *testing.T) { filePartMsgUsedLen := ftStorage.FmMinSize expected := 2*primeByteLen - cmixMsgUsedLen - filePartMsgUsedLen - 1 - // get the part size + // Get the part size partSize, err := m.getPartSize() if err != nil { t.Errorf("GetPartSize returned an error: %+v", err) diff --git a/fileTransfer/sentRoundTracker.go b/fileTransfer/sentRoundTracker.go index 80fa1da210befa6d3cc7cec698d197259a903ecb..75a06eb13e593f205b470b30f2dadc76a1c2c869 100644 --- a/fileTransfer/sentRoundTracker.go +++ b/fileTransfer/sentRoundTracker.go @@ -59,13 +59,20 @@ func (srt *sentRoundTracker) Has(rid id.Round) bool { return exists } -// Insert adds the round to the tracker with the current time. -func (srt *sentRoundTracker) Insert(rid id.Round) { +// Insert adds the round to the tracker with the current time. Returns true if +// the round was added. +func (srt *sentRoundTracker) Insert(rid id.Round) bool { timeNow := netTime.Now() srt.mux.Lock() defer srt.mux.Unlock() + _, exists := srt.rounds[rid] + if exists { + return false + } + srt.rounds[rid] = timeNow + return true } // Remove deletes a round ID from the tracker. diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 5e4271c13796e22d93aabd2f1f13836066f29df8..a982d3ba226565db9e4886132f164998560826a6 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -538,7 +538,7 @@ type testHealthTracker struct { } //////////////////////////////////////////////////////////////////////////////// -// Test health Tracker // +// Test Health Tracker // //////////////////////////////////////////////////////////////////////////////// func newTestHealthTracker() testHealthTracker { diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index 6624066ff872a71908c4a72235bc0078593bb0d2..4012868b0af841deb2219bef7eb6ae0ed116084b 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -35,9 +35,17 @@ import ( ) // List of errors that initiate a Host replacement -var errorsList = []string{context.DeadlineExceeded.Error(), "connection refused", "host disconnected", - "transport is closing", balancer.ErrTransientFailure.Error(), "Last try to connect", - ndf.NO_NDF, "Host is in cool down", grpc.ErrClientConnClosing.Error()} +var errorsList = []string{ + context.DeadlineExceeded.Error(), + "connection refused", + "host disconnected", + "transport is closing", + balancer.ErrTransientFailure.Error(), + "Last try to connect", + ndf.NO_NDF, + "Host is in cool down", + grpc.ErrClientConnClosing.Error(), +} // HostManager Interface allowing storage and retrieval of Host objects type HostManager interface { @@ -47,7 +55,7 @@ type HostManager interface { } // Filter filters out IDs from the provided map based on criteria in the NDF. -// The passed in map is a map of the NDF for easier acesss. The map is ID -> index in the NDF +// The passed in map is a map of the NDF for easier access. The map is ID -> index in the NDF // There is no multithreading, the filter function can either edit the passed map or make a new one // and return it. The general pattern is to loop through the map, then look up data about the nodes // in the ndf to make a filtering decision, then add them to a new map if they are accepted. @@ -100,8 +108,8 @@ func DefaultPoolParams() PoolParams { p.HostParams.EnableCoolOff = false p.HostParams.NumSendsBeforeCoolOff = 1 p.HostParams.CoolOffTimeout = 5 * time.Minute - p.HostParams.SendTimeout = 1 * time.Second - p.HostParams.PingTimeout = 1 * time.Second + p.HostParams.SendTimeout = 1000 * time.Millisecond + p.HostParams.PingTimeout = 1000 * time.Millisecond return p } @@ -190,8 +198,16 @@ func (h *HostPool) initialize(startIdx uint32) error { } // Randomly shuffle gateways in NDF - randomGateways := make([]ndf.Gateway, len(h.ndf.Gateways)) - copy(randomGateways, h.ndf.Gateways) + randomGateways := make([]ndf.Gateway, 0, len(h.ndf.Gateways)) + + // Filter out not active gateways + for i := 0; i < len(h.ndf.Gateways); i++ { + if h.ndf.Nodes[i].Status == ndf.Active { + randomGateways = append(randomGateways, h.ndf.Gateways[i]) + } + } + + // Randomize the gateway order var rndBytes [32]byte stream := h.rng.GetStream() _, err := stream.Read(rndBytes[:]) @@ -249,14 +265,15 @@ func (h *HostPool) initialize(startIdx uint32) error { } // Ping the Host latency and send the result - jww.DEBUG.Printf("Testing host %s...", gwId.String()) + jww.DEBUG.Printf("Testing host %s...", gwId) latency, _ := newHost.IsOnline() c <- gatewayDuration{gwId, latency} }() } // Collect ping results - timer := time.NewTimer(2 * h.poolParams.HostParams.PingTimeout) + pingTimeout := 2 * h.poolParams.HostParams.PingTimeout + timer := time.NewTimer(pingTimeout) innerLoop: for { select { @@ -265,16 +282,18 @@ func (h *HostPool) initialize(startIdx uint32) error { if gw.latency > 0 { resultList = append(resultList, gw) jww.DEBUG.Printf("Adding HostPool result %d/%d: %s: %d", - len(resultList), numGatewaysToTry, gw.id.String(), gw.latency) + len(resultList), numGatewaysToTry, gw.id, gw.latency) } // Break if we have all needed slots if uint32(len(resultList)) == numGatewaysToTry { exit = true + timer.Stop() break innerLoop } case <-timer.C: - jww.INFO.Printf("HostPool initialization timed out!") + jww.INFO.Printf("HostPool initialization timed out after %s.", + pingTimeout) break innerLoop } } @@ -491,9 +510,9 @@ func (h *HostPool) selectGateway() *id.ID { nodeId := gwId.DeepCopy() nodeId.SetType(id.Node) nodeNdfIdx := h.ndfMap[*nodeId] - isNodeStale := h.ndf.Nodes[nodeNdfIdx].Status == ndf.Stale - if isNodeStale { - jww.DEBUG.Printf("Ignoring stale nodes: %s", nodeId.String()) + isNodeIsNotActive := h.ndf.Nodes[nodeNdfIdx].Status != ndf.Active + if isNodeIsNotActive { + jww.DEBUG.Printf("Ignoring stale node: %s", nodeId) continue } @@ -556,13 +575,14 @@ func (h *HostPool) replaceHostNoStore(newId *id.ID, oldPoolIndex uint32) error { go func() { err := newHost.Connect() if err != nil { - jww.WARN.Printf("Unable to initialize Host connection: %+v", err) + jww.WARN.Printf("Unable to initialize Host connection to %s: "+ + "%+v", newId, err) } }() } - jww.DEBUG.Printf("Replaced Host at %d [%s] with new Host %s", oldPoolIndex, oldHostIDStr, - newId.String()) + jww.DEBUG.Printf("Replaced Host at %d [%s] with new Host %s", + oldPoolIndex, oldHostIDStr, newId) return nil } @@ -688,7 +708,8 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { select { case h.addGatewayChan <- ng: default: - jww.WARN.Printf("Unable to send AddGateway event for id %s", gwId.String()) + jww.WARN.Printf( + "Unable to send AddGateway event for id %s", gwId) } } else if host.GetAddress() != gw.Address { @@ -724,9 +745,9 @@ func readUint32(rng io.Reader) uint32 { // readRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end func readRangeUint32(start, end uint32, rng io.Reader) uint32 { size := end - start - // note we could just do the part inside the () here, but then extra - // can == size which means a little range is wasted, either - // choice seems negligible, so we went with the "more correct" + // Note that we could just do the part inside the () here, but then extra + // can == size which means a little range is wasted; either choice seems + // negligible, so we went with the "more correct" extra := (math.MaxUint32%size + 1) % size limit := math.MaxUint32 - extra // Loop until we read something inside the limit diff --git a/network/message/pickup.go b/network/message/pickup.go index 49ebca38504d0207e5b4a7f53e0ef84e3db9e190..4e9195973f9bc575d265536866a713460e71173c 100644 --- a/network/message/pickup.go +++ b/network/message/pickup.go @@ -72,7 +72,8 @@ func NewPickup(param params.Network, kv *versioned.KV, events interfaces.EventMa for _, nodeId := range param.BlacklistedNodes { decodedId, err := base64.StdEncoding.DecodeString(nodeId) if err != nil { - jww.ERROR.Printf("Unable to decode blacklisted Node ID %s: %+v", decodedId, err) + jww.ERROR.Printf("Unable to decode blacklisted Node ID %s: %+v", + decodedId, err) continue } m.blacklistedNodes[string(decodedId)] = nil @@ -92,7 +93,7 @@ func (p *pickup) GetMessageReceptionChannel() chan<- Bundle { func (p *pickup) StartProcesses() stoppable.Stoppable { multi := stoppable.NewMulti("MessageReception") - // create the message handler workers + // Create the message handler workers for i := uint(0); i < p.param.MessageReceptionWorkerPoolSize; i++ { stop := stoppable.NewSingle( "MessageReception Worker " + strconv.Itoa(int(i))) @@ -100,7 +101,7 @@ func (p *pickup) StartProcesses() stoppable.Stoppable { multi.Add(stop) } - // create the in progress messages thread + // Create the in progress messages thread garbledStop := stoppable.NewSingle("GarbledMessages") go p.recheckInProgressRunner(garbledStop) multi.Add(garbledStop) diff --git a/network/sendCmix.go b/network/sendCmix.go index 96f0c6479d1068fa5d04714c6aebfd6b23cba946..28570dad1567acca0ec7f87a6eda74aebfadc934 100644 --- a/network/sendCmix.go +++ b/network/sendCmix.go @@ -115,9 +115,6 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, } jww.TRACE.Printf("[SendCMIX-%s] bestRound: %v", cmixParams.DebugTag, bestRound) - // add the round on to the list of attempted, so it is not tried again - attempted.Insert(bestRound.GetRoundId()) - // Determine whether the selected round contains any Nodes // that are blacklisted by the params.Network object containsBlacklisted := false @@ -176,7 +173,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, timeout = calculatedTimeout } - //send the message + // Send the message result, err := comms.SendPutMessage(host, wrappedMsg, timeout) jww.TRACE.Printf("[SendCMIX-%s]sendFunc %s putmsg", cmixParams.DebugTag, host) diff --git a/network/sendCmixUtils.go b/network/sendCmixUtils.go index 07a0c8884dd616e0182213298b8c6c0b675e9fb4..6b0a59d1992c54dd97849424083dfdf74150c6cb 100644 --- a/network/sendCmixUtils.go +++ b/network/sendCmixUtils.go @@ -249,10 +249,10 @@ func ephemeralIdListToString(idList []ephemeral.Id) string { } func calculateSendTimeout(best *pb.RoundInfo, max time.Duration) time.Duration { - RoundStartTime := time.Unix(0, + roundStartTime := time.Unix(0, int64(best.Timestamps[states.QUEUED])) // 250ms AFTER the round starts to hear the response. - timeout := RoundStartTime.Sub( + timeout := roundStartTime.Sub( netTime.Now().Add(250 * time.Millisecond)) if timeout > max { timeout = max diff --git a/network/sendManyCmix.go b/network/sendManyCmix.go index 9e03583b4bb0ecaadf7a2f6c7e73096af8ef7a0e..ea9a3cb5ca8fd0f8300854bb189df83e6084c11a 100644 --- a/network/sendManyCmix.go +++ b/network/sendManyCmix.go @@ -102,10 +102,6 @@ func sendManyCmixHelper(sender *gateway.Sender, continue } - // Add the round on to the list of attempted rounds so that it is not - // tried again - attempted.Insert(bestRound.GetRoundId()) - // Determine whether the selected round contains any nodes that are // blacklisted by the params.Network object containsBlacklisted := false @@ -207,8 +203,9 @@ func sendManyCmixHelper(sender *gateway.Sender, param.DebugTag, ephemeralIDsString, recipientString, bestRound.ID, err) jww.INFO.Printf("[SendManyCMIX-%s]error received, continuing: %v", param.DebugTag, err) continue + } else { + jww.INFO.Printf("[SendManyCMIX-%s]Error received: %v", param.DebugTag, err) } - jww.INFO.Printf("error received: %v", err) return 0, []ephemeral.Id{}, err } @@ -216,13 +213,13 @@ func sendManyCmixHelper(sender *gateway.Sender, gwSlotResp := result.(*pb.GatewaySlotResponse) if gwSlotResp.Accepted { m := fmt.Sprintf("[SendManyCMIX-%s]Successfully sent to EphIDs %s (sources: [%s]) "+ - "in round %d", param.DebugTag, ephemeralIDsString, recipientString, bestRound.ID) + "in round %d (msgDigest: %s)", param.DebugTag, ephemeralIDsString, recipientString, bestRound.ID, msgDigests) jww.INFO.Print(m) events.Report(1, "MessageSendMany", "Metric", m) return id.Round(bestRound.ID), ephemeralIDs, nil } else { - jww.FATAL.Panicf("Gateway %s returned no error, but failed to "+ - "accept message when sending to EphIDs [%s] (%s) on round %d", + jww.FATAL.Panicf("[SendManyCMIX-%s]Gateway %s returned no error, but failed to "+ + "accept message when sending to EphIDs [%s] (%s) on round %d", param.DebugTag, firstGateway, ephemeralIDsString, recipientString, bestRound.ID) } }