diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index 932abff2a8cc1a99574fb4128c6762ba6f84a535..6892a9cb9538673ee7351bcd0c74ed439d11bbf1 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -18,6 +18,7 @@ import ( "gitlab.com/elixxir/crypto/contact" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/utils" "io/ioutil" "time" @@ -90,17 +91,13 @@ var ftCmd = &cobra.Command{ // Wait until either the file finishes sending or the file finishes // being received, stop the receiving thread, and exit - for done := false; !done; { - select { - case <-sendDone: - jww.INFO.Printf("[FT] Finished sending file. Stopping " + - "threads and network follower.") - done = true - case <-receiveDone: - jww.INFO.Printf("[FT] Finished receiving file. Stopping " + - "threads and network follower.") - done = true - } + select { + case <-sendDone: + jww.INFO.Printf("[FT] Finished sending file. Stopping threads " + + "and network follower.") + case <-receiveDone: + jww.INFO.Printf("[FT] Finished receiving file. Stopping threads " + + "and network follower.") } // Stop reception thread @@ -200,6 +197,8 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, fileName, recipient.ID, fileType, len(fileData), retry, filePath, filePreviewPath, filePreviewData) + var sendStart time.Time + // Create sent progress callback that prints the results progressCB := func(completed bool, sent, arrived, total uint16, t interfaces.FilePartTracker, err error) { @@ -214,14 +213,24 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, } if completed { + fileSize := len(fileData) + sendTime := netTime.Since(sendStart) + fileSizeKb := float32(fileSize) * .001 + speed := fileSizeKb * float32(time.Second) / (float32(sendTime)) + jww.INFO.Printf("[FT] Completed sending file %q in %s (%.2f kb @ %.2f kb/s).", + fileName, sendTime, fileSizeKb, speed) fmt.Printf("Completed sending file.\n") done <- struct{}{} } else if err != nil { + jww.ERROR.Printf("[FT] Failed sending file %q in %s: %+v", + fileName, netTime.Since(sendStart), err) fmt.Printf("Failed sending file: %+v\n", err) done <- struct{}{} } } + sendStart = netTime.Now() + // Send the file tid, err := m.Send(fileName, fileType, fileData, recipient.ID, retry, filePreviewData, progressCB, callbackPeriod) @@ -248,13 +257,14 @@ func receiveNewFileTransfers(receive chan receivedFtResults, done, "E2E message.") return case r := <-receive: + receiveStart := netTime.Now() jww.INFO.Printf("[FT] Received new file %q transfer %s of type "+ "%q from %s of size %d bytes with preview: %q", r.fileName, r.tid, r.fileType, r.sender, r.size, r.preview) fmt.Printf("Received new file transfer %q of size %d "+ "bytes with preview: %q\n", r.fileName, r.size, r.preview) - cb := newReceiveProgressCB(r.tid, done, m) + cb := newReceiveProgressCB(r.tid, r.fileName, done, receiveStart, m) err := m.RegisterReceivedProgressCallback(r.tid, cb, callbackPeriod) if err != nil { jww.FATAL.Panicf("[FT] Failed to register new receive "+ @@ -266,7 +276,8 @@ func receiveNewFileTransfers(receive chan receivedFtResults, done, // newReceiveProgressCB creates a new reception progress callback that prints // the results to the log. -func newReceiveProgressCB(tid ftCrypto.TransferID, done chan struct{}, +func newReceiveProgressCB(tid ftCrypto.TransferID, fileName string, + done chan struct{}, receiveStart time.Time, m *ft.Manager) interfaces.ReceivedProgressCallback { return func(completed bool, received, total uint16, t interfaces.FilePartTracker, err error) { @@ -286,9 +297,13 @@ func newReceiveProgressCB(tid ftCrypto.TransferID, done chan struct{}, jww.FATAL.Panicf( "[FT] Failed to receive file %s: %+v", tid, err) } + jww.INFO.Printf("[FT] Completed receiving file %q in %s.", + fileName, netTime.Since(receiveStart)) fmt.Printf("Completed receiving file:\n%s\n", receivedFile) done <- struct{}{} } else if err != nil { + jww.INFO.Printf("[FT] Failed receiving file %q in %s.", + fileName, netTime.Since(receiveStart)) fmt.Printf("Failed sending file: %+v\n", err) done <- struct{}{} } diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index b188bcc203fe5e286507a5f7e7de4cb34485a836..713e230c96a3076669fbd718115f62b97b65a8af 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -50,7 +50,7 @@ const ( sendQueueBuffLen = 10_000 // Size of the buffered channel that reports if the network is healthy - networkHealthBuffLen = 100 + networkHealthBuffLen = 10 ) // Error messages. diff --git a/fileTransfer/oldTransferRecovery.go b/fileTransfer/oldTransferRecovery.go index 7fc4d41c61915d070717a3001c4154237cf0f81c..f47e9ba0d004272b8605cf6b5a23332df9e3c67d 100644 --- a/fileTransfer/oldTransferRecovery.go +++ b/fileTransfer/oldTransferRecovery.go @@ -41,7 +41,7 @@ func (m Manager) oldTransferRecovery(healthyChan chan bool, chanID uint64) { // Get list of unsent parts and rounds that parts were sent on unsentParts, sentRounds, err := m.sent.GetUnsentPartsAndSentRounds() - jww.DEBUG.Printf("Adding unsent parts from %d recovered transfers: %v", + jww.DEBUG.Printf("[FT] Adding unsent parts from %d recovered transfers: %v", len(unsentParts), unsentParts) // Add all unsent parts to the queue diff --git a/fileTransfer/receive.go b/fileTransfer/receive.go index 3552fdf5635a819f66cbecfb9945ca01ab529846..9c46cb257fe733ad1731874d4ca8f5db2ee60529 100644 --- a/fileTransfer/receive.go +++ b/fileTransfer/receive.go @@ -40,9 +40,9 @@ func (m *Manager) receive(rawMsgs chan message.Receive, stop *stoppable.Single) // which means this message is not of the correct type and will // be ignored if strings.Contains(err.Error(), "fingerprint") { - jww.TRACE.Printf("[FT] %+v", err) + jww.TRACE.Printf("[FT] %v", err) } else { - jww.WARN.Printf("[FT] %+v", err) + jww.WARN.Printf("[FT] %v", err) } continue } diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 875e01890fba131b0acc1273b388158ea3c06698..b42c16d13cb3c589a3b8dc620878e206aa73b65e 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -51,7 +51,8 @@ const ( // Manager.sendEndE2eMessage endE2eGetPartnerErr = "failed to get file transfer partner %s: %+v" - endE2eSendErr = "failed to send end file transfer message: %+v" + endE2eHealthTimeout = "waiting for network to become healthy timed out after %s." + endE2eSendErr = "failed to send end file transfer message via E2E to recipient %s: %+v" // getRandomNumParts getRandomNumPartsRandPanic = "[FT] Failed to generate random number of file parts to send: %+v" @@ -64,8 +65,15 @@ const ( // Duration to wait for send batch to fill before sending partial batch. pollSleepDuration = 100 * time.Millisecond - // Age when rounds that files were sent from are deleted from the tracker + // Age when rounds that files were sent from are deleted from the tracker. clearSentRoundsAge = 10 * time.Second + + // Duration to wait for network to become healthy to send end E2E message + // before timing out. + sendEndE2eHealthTimeout = 5 * time.Second + + // Tag that prints with cMix sending logs. + cMixDebugTag = "FT.Part" ) // sendThread waits on the sendQueue channel for parts to send. Once its @@ -108,20 +116,27 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, timer = time.NewTimer(pollSleepDuration) select { case <-stop.Quit(): + timer.Stop() + // Close the thread when the stoppable is triggered m.closeSendThread(partList, stop, healthChanID) return case healthy := <-healthChan: + var wasNotHealthy bool // If the network is unhealthy, wait until it becomes healthy if !healthy { jww.TRACE.Print("[FT] Suspending file part sending thread: " + "network is unhealthy.") + wasNotHealthy = true } for !healthy { healthy = <-healthChan } - jww.TRACE.Print("[FT] File part sending thread: network is healthy.") + if wasNotHealthy { + jww.TRACE.Print("[FT] File part sending thread: " + + "network is healthy.") + } case part := <-m.sendQueue: // When a part is received from the queue, add it to the list of // parts to be sent @@ -141,17 +156,15 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, quit := m.handleSend( &partList, &lastSend, delay, stop, healthChanID, sentRounds) if quit { + timer.Stop() return } - } else { - timer = time.NewTimer(pollSleepDuration) } case <-timer.C: // If the timeout is reached, send an incomplete batch // Skip if there are no parts to send if len(partList) == 0 { - timer = time.NewTimer(pollSleepDuration) continue } @@ -196,23 +209,33 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, // the bandwidth is limited to the maximum throughput if netTime.Since(*lastSend) < delay { waitingTime := delay - netTime.Since(*lastSend) - jww.TRACE.Printf("[FT] Suspending file part sending: "+ - "bandwidth limit reached; waiting %s to send.", waitingTime) + jww.TRACE.Printf("[FT] Suspending file part sending (%d parts): "+ + "bandwidth limit reached; waiting %s to send.", + len(*partList), waitingTime) + + waitingTimer := time.NewTimer(waitingTime) select { case <-stop.Quit(): + waitingTimer.Stop() + // Close the thread when the stoppable is triggered m.closeSendThread(*partList, stop, healthChanID) return true - case <-time.NewTimer(delay - netTime.Since(*lastSend)).C: + case <-waitingTimer.C: + jww.TRACE.Printf("[FT] Resuming file part sending (%d parts) "+ + "after waiting %s for bandwidth limiting.", + len(*partList), waitingTime) } } // Send all the messages - err := m.sendParts(*partList, sentRounds) - if err != nil { - jww.ERROR.Print(err) - } + go func(partList []queuedPart, sentRounds *sentRoundTracker) { + err := m.sendParts(partList, sentRounds) + if err != nil { + jww.ERROR.Print(err) + } + }(copyPartList(*partList), sentRounds) // Update the timestamp of the send *lastSend = netTime.Now() @@ -223,6 +246,13 @@ func (m *Manager) handleSend(partList *[]queuedPart, lastSend *time.Time, return false } +// copyPartList makes a copy of the list of queuedPart. +func copyPartList(partList []queuedPart) []queuedPart { + newPartList := make([]queuedPart, len(partList)) + copy(newPartList, partList) + return newPartList +} + // sendParts handles the composing and sending of a cMix message for each part // in the list. All errors returned are fatal errors. func (m *Manager) sendParts(partList []queuedPart, @@ -247,7 +277,10 @@ func (m *Manager) sendParts(partList []queuedPart, p := params.GetDefaultCMIX() p.SendTimeout = m.p.SendTimeout p.ExcludedRounds = sentRounds - p.DebugTag = "ft.Part" + p.DebugTag = cMixDebugTag + + jww.TRACE.Printf("[FT] Sending %d file parts via SendManyCMIX with "+ + "parameters %+v", len(messages), p) // Send parts rid, _, err := m.net.SendManyCMIX(messages, p) @@ -407,6 +440,9 @@ func (m *Manager) makeRoundEventCallback( continue } + // Call progress callback after change in progress + st.CallProgressCB(nil) + // If the transfer is complete, send an E2E message to the // recipient informing them if completed { @@ -424,9 +460,6 @@ func (m *Manager) makeRoundEventCallback( } }(tid, st.GetRecipient()) } - - // Call progress callback after change in progress - st.CallProgressCB(nil) } } else { @@ -478,15 +511,31 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { // Send the message under file transfer preimage e2eParams := params.GetDefaultE2E() e2eParams.IdentityPreimage = partner.GetFileTransferPreimage() - e2eParams.DebugTag = "ft.End" + e2eParams.DebugTag = "FT.End" // Store the message in the critical messages buffer first to ensure it is // present if the send fails m.store.GetCriticalMessages().AddProcessing(sendMsg, e2eParams) + // Register health channel and wait for network to become healthy + healthChan := make(chan bool, networkHealthBuffLen) + healthChanID := m.net.GetHealthTracker().AddChannel(healthChan) + defer m.net.GetHealthTracker().RemoveChannel(healthChanID) + isHealthy := m.net.GetHealthTracker().IsHealthy() + healthCheckTimer := time.NewTimer(sendEndE2eHealthTimeout) + for !isHealthy { + select { + case isHealthy = <-healthChan: + case <-healthCheckTimer.C: + return errors.Errorf(endE2eHealthTimeout, sendEndE2eHealthTimeout) + } + } + healthCheckTimer.Stop() + + // Send E2E message rounds, e2eMsgID, _, err := m.net.SendE2E(sendMsg, e2eParams, nil) if err != nil { - return errors.Errorf(endE2eSendErr, err) + return errors.Errorf(endE2eSendErr, recipient, err) } // Register the event for all rounds diff --git a/fileTransfer/sendNew.go b/fileTransfer/sendNew.go index b28fe263958c3ff5b95455faee9f3623f44267b1..91c979dc806324d5a4077a902605c239239c9926 100644 --- a/fileTransfer/sendNew.go +++ b/fileTransfer/sendNew.go @@ -43,11 +43,11 @@ func (m *Manager) sendNewFileTransfer(recipient *id.ID, fileName, // Sends as a silent message to avoid a notification p := params.GetDefaultE2E() p.CMIX.IdentityPreimage = relationship.GetSilentPreimage() - p.DebugTag = "ft.New" + p.DebugTag = "FT.New" // Send E2E message - rounds, _, _, err := m.net.SendE2E(sendMsg, p, nil) - if err != nil && len(rounds) == 0 { + _, _, _, err = m.net.SendE2E(sendMsg, p, nil) + if err != nil { return errors.Errorf(newFtSendE2eErr, recipient, err) } diff --git a/fileTransfer/send_test.go b/fileTransfer/send_test.go index d35e418b8efb80f7a1107d90370a1e7a23706687..d090d4dca6aca58f5fae3a3b6fbfa10d42cf3b2a 100644 --- a/fileTransfer/send_test.go +++ b/fileTransfer/send_test.go @@ -888,13 +888,11 @@ func TestManager_sendEndE2eMessage(t *testing.T) { p := params.GetDefaultE2ESessionParams() rng := csprng.NewSystemRNG() - _, mySidhPriv := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhA, - rng) - theirSidhPub, _ := util.GenerateSIDHKeyPair( - sidh.KeyVariantSidhB, rng) + _, mySidhPriv := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhA, rng) + theirSidhPub, _ := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhB, rng) - err := m.store.E2e().AddPartner(recipient, pubKey, dhKey, mySidhPriv, - theirSidhPub, p, p) + err := m.store.E2e().AddPartner( + recipient, pubKey, dhKey, mySidhPriv, theirSidhPub, p, p) if err != nil { t.Errorf("Failed to add partner %s: %+v", recipient, err) } diff --git a/fileTransfer/sentRoundTracker.go b/fileTransfer/sentRoundTracker.go index 80fa1da210befa6d3cc7cec698d197259a903ecb..75a06eb13e593f205b470b30f2dadc76a1c2c869 100644 --- a/fileTransfer/sentRoundTracker.go +++ b/fileTransfer/sentRoundTracker.go @@ -59,13 +59,20 @@ func (srt *sentRoundTracker) Has(rid id.Round) bool { return exists } -// Insert adds the round to the tracker with the current time. -func (srt *sentRoundTracker) Insert(rid id.Round) { +// Insert adds the round to the tracker with the current time. Returns true if +// the round was added. +func (srt *sentRoundTracker) Insert(rid id.Round) bool { timeNow := netTime.Now() srt.mux.Lock() defer srt.mux.Unlock() + _, exists := srt.rounds[rid] + if exists { + return false + } + srt.rounds[rid] = timeNow + return true } // Remove deletes a round ID from the tracker. diff --git a/go.mod b/go.mod index 0d5e6d485cfc66aa506a1fefa30f30cbf1c3999c..ccd5fd7bb7b6af98fef44da7876ee92cc42c08d6 100644 --- a/go.mod +++ b/go.mod @@ -12,13 +12,13 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/viper v1.7.1 gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 - gitlab.com/elixxir/comms v0.0.4-0.20220308183624-c2183e687a03 - gitlab.com/elixxir/crypto v0.0.7-0.20220309234716-1ba339865787 + gitlab.com/elixxir/comms v0.0.4-0.20220323190139-9ed75f3a8b2c + gitlab.com/elixxir/crypto v0.0.7-0.20220317172048-3de167bd9406 gitlab.com/elixxir/ekv v0.1.6 - gitlab.com/elixxir/primitives v0.0.3-0.20220222212109-d412a6e46623 - gitlab.com/xx_network/comms v0.0.4-0.20220311192415-d95fe8906580 - gitlab.com/xx_network/crypto v0.0.5-0.20220222212031-750f7e8a01f4 - gitlab.com/xx_network/primitives v0.0.4-0.20220222211843-901fa4a2d72b + gitlab.com/elixxir/primitives v0.0.3-0.20220323183834-b98f255361b8 + gitlab.com/xx_network/comms v0.0.4-0.20220315161313-76acb14429ac + gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71 + gitlab.com/xx_network/primitives v0.0.4-0.20220317172007-4d2a53e6e669 golang.org/x/crypto v0.0.0-20220128200615-198e4374d7ed golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 google.golang.org/grpc v1.42.0 diff --git a/go.sum b/go.sum index d5504c7a5d85bda9142d40c5dd364953fd617a98..4cafe55501ef1123d66d1b53d3671f8706cc3b2a 100644 --- a/go.sum +++ b/go.sum @@ -272,34 +272,35 @@ github.com/zeebo/pcg v1.0.0 h1:dt+dx+HvX8g7Un32rY9XWoYnd0NmKmrIzpHF7qiTDj0= github.com/zeebo/pcg v1.0.0/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 h1:Gi6rj4mAlK0BJIk1HIzBVMjWNjIUfstrsXC2VqLYPcA= gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228/go.mod h1:H6jztdm0k+wEV2QGK/KYA+MY9nj9Zzatux/qIvDDv3k= -gitlab.com/elixxir/comms v0.0.4-0.20220308183624-c2183e687a03 h1:4eNjO3wCyHgxpGeq2zgDb5SsdTcQaG5IZjBOuEL6KgM= -gitlab.com/elixxir/comms v0.0.4-0.20220308183624-c2183e687a03/go.mod h1:4yMdU+Jee5W9lqkZGHJAuipEhW7FloT0eyVEFUJza+E= +gitlab.com/elixxir/comms v0.0.4-0.20220323190139-9ed75f3a8b2c h1:ajjTw08YjRjl3HvtBNGtoCWhOg8k8upqmTweH18wkC4= +gitlab.com/elixxir/comms v0.0.4-0.20220323190139-9ed75f3a8b2c/go.mod h1:tlHSrtSliKWUxsck8z/Ql/VJkMdSONV2BeWaUAAXzgk= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= -gitlab.com/elixxir/crypto v0.0.7-0.20220222221347-95c7ae58da6b/go.mod h1:tD6XjtQh87T2nKZL5I/pYPck5M2wLpkZ1Oz7H/LqO10= -gitlab.com/elixxir/crypto v0.0.7-0.20220309234716-1ba339865787 h1:+qmsWov412+Yn7AKUhTbOcDgAydNXlNLPmFpO2W5LwY= -gitlab.com/elixxir/crypto v0.0.7-0.20220309234716-1ba339865787/go.mod h1:tD6XjtQh87T2nKZL5I/pYPck5M2wLpkZ1Oz7H/LqO10= +gitlab.com/elixxir/crypto v0.0.7-0.20220317172048-3de167bd9406 h1:PRA8OJMXuy9JZmUuZ442AIE/tWY7HisqezyLNhpZZ9w= +gitlab.com/elixxir/crypto v0.0.7-0.20220317172048-3de167bd9406/go.mod h1:tD6XjtQh87T2nKZL5I/pYPck5M2wLpkZ1Oz7H/LqO10= gitlab.com/elixxir/ekv v0.1.6 h1:M2hUSNhH/ChxDd+s8xBqSEKgoPtmE6hOEBqQ73KbN6A= gitlab.com/elixxir/ekv v0.1.6/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4= gitlab.com/elixxir/primitives v0.0.0-20200731184040-494269b53b4d/go.mod h1:OQgUZq7SjnE0b+8+iIAT2eqQF+2IFHn73tOo+aV11mg= gitlab.com/elixxir/primitives v0.0.0-20200804170709-a1896d262cd9/go.mod h1:p0VelQda72OzoUckr1O+vPW0AiFe0nyKQ6gYcmFSuF8= gitlab.com/elixxir/primitives v0.0.0-20200804182913-788f47bded40/go.mod h1:tzdFFvb1ESmuTCOl1z6+yf6oAICDxH2NPUemVgoNLxc= gitlab.com/elixxir/primitives v0.0.1/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE= -gitlab.com/elixxir/primitives v0.0.3-0.20220222212109-d412a6e46623 h1:NzJ06KdJd3fVJee0QvGhNr3CO+Ki8Ea1PeakZsm+rZM= gitlab.com/elixxir/primitives v0.0.3-0.20220222212109-d412a6e46623/go.mod h1:MtFIyJUQn9P7djzVlBpEYkPNnnWFTjZvw89swoXY+QM= +gitlab.com/elixxir/primitives v0.0.3-0.20220323183834-b98f255361b8 h1:U3Ahbg2N6QL5uwPyccWWN4ZUBFBLgCsuq5sQxOI2VCw= +gitlab.com/elixxir/primitives v0.0.3-0.20220323183834-b98f255361b8/go.mod h1:MtFIyJUQn9P7djzVlBpEYkPNnnWFTjZvw89swoXY+QM= gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTRl7gsoM8c3RQ5KAm5GstxrJp5tn+6JfQ4z5Hw= -gitlab.com/xx_network/comms v0.0.4-0.20220223205228-7c4974139569/go.mod h1:isHnwem0v4rTcwwHP455FhVlFyPcHkHiVz+N3s/uCSI= -gitlab.com/xx_network/comms v0.0.4-0.20220311192415-d95fe8906580 h1:IV0gDwdTxtCpc9Vkx7IeSStSqvG+0ZpF57X+OhTQDIM= -gitlab.com/xx_network/comms v0.0.4-0.20220311192415-d95fe8906580/go.mod h1:isHnwem0v4rTcwwHP455FhVlFyPcHkHiVz+N3s/uCSI= +gitlab.com/xx_network/comms v0.0.4-0.20220315161313-76acb14429ac h1:+ykw0JqLH/qMprPEKazGHNH8gUoHGA78EIr4ienxnw4= +gitlab.com/xx_network/comms v0.0.4-0.20220315161313-76acb14429ac/go.mod h1:isHnwem0v4rTcwwHP455FhVlFyPcHkHiVz+N3s/uCSI= gitlab.com/xx_network/crypto v0.0.3/go.mod h1:DF2HYvvCw9wkBybXcXAgQMzX+MiGbFPjwt3t17VRqRE= gitlab.com/xx_network/crypto v0.0.4/go.mod h1:+lcQEy+Th4eswFgQDwT0EXKp4AXrlubxalwQFH5O0Mk= -gitlab.com/xx_network/crypto v0.0.5-0.20220222212031-750f7e8a01f4 h1:95dZDMn/hpLNwsgZO9eyQgGKaSDyh6F6+WygqZIciww= gitlab.com/xx_network/crypto v0.0.5-0.20220222212031-750f7e8a01f4/go.mod h1:6apvsoHCQJDjO0J4E3uhR3yO9tTz/Mq5be5rjB3tQPU= +gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71 h1:N2+Jja4xNg66entu6rGvzRcf3Vc785xgiaHeDPYnBvg= +gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71/go.mod h1:/SJf+R75E+QepdTLh0H1/udsovxx2Q5ru34q1v0umKk= gitlab.com/xx_network/primitives v0.0.0-20200803231956-9b192c57ea7c/go.mod h1:wtdCMr7DPePz9qwctNoAUzZtbOSHSedcK++3Df3psjA= gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug= gitlab.com/xx_network/primitives v0.0.2/go.mod h1:cs0QlFpdMDI6lAo61lDRH2JZz+3aVkHy+QogOB6F/qc= -gitlab.com/xx_network/primitives v0.0.4-0.20220222211843-901fa4a2d72b h1:shZZ3xZNNKYVpEp4Mqu/G9+ZR+J8QA8mmPk/4Cit8+Y= gitlab.com/xx_network/primitives v0.0.4-0.20220222211843-901fa4a2d72b/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= +gitlab.com/xx_network/primitives v0.0.4-0.20220317172007-4d2a53e6e669 h1:ECBYK/VtBfpPorO7hI2Rhwex2b5pZU2cBIOU92lMOYk= +gitlab.com/xx_network/primitives v0.0.4-0.20220317172007-4d2a53e6e669/go.mod h1:AXVVFt7dDAeIUpOGPiStCcUIKsBXLWbmV/BgZ4T+tOo= gitlab.com/xx_network/ring v0.0.3-0.20220222211904-da613960ad93 h1:eJZrXqHsMmmejEPWw8gNAt0I8CGAMNO/7C339Zco3TM= gitlab.com/xx_network/ring v0.0.3-0.20220222211904-da613960ad93/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index dd4616cc4583d54621cf2fb42dc2611ab96cee95..a093152ce656a77a798fee3180deaa5fb737dde0 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -33,10 +33,18 @@ import ( "time" ) -// List of errors that initiate a Host replacement -var errorsList = []string{context.DeadlineExceeded.Error(), "connection refused", "host disconnected", - "transport is closing", balancer.ErrTransientFailure.Error(), "Last try to connect", - ndf.NO_NDF, "Host is in cool down", grpc.ErrClientConnClosing.Error()} +// List of errors that initiate a Host replacement. +var errorsList = []string{ + context.DeadlineExceeded.Error(), + "connection refused", + "host disconnected", + "transport is closing", + balancer.ErrTransientFailure.Error(), + "Last try to connect", + ndf.NO_NDF, + "Host is in cool down", + grpc.ErrClientConnClosing.Error(), +} // HostManager Interface allowing storage and retrieval of Host objects type HostManager interface { @@ -46,7 +54,7 @@ type HostManager interface { } // Filter filters out IDs from the provided map based on criteria in the NDF. -// The passed in map is a map of the NDF for easier acesss. The map is ID -> index in the NDF +// The passed in map is a map of the NDF for easier access. The map is ID -> index in the NDF // There is no multithreading, the filter function can either edit the passed map or make a new one // and return it. The general pattern is to loop through the map, then look up data about the node // in the ndf to make a filtering decision, then add them to a new map if they are accepted. @@ -98,8 +106,8 @@ func DefaultPoolParams() PoolParams { p.HostParams.EnableCoolOff = false p.HostParams.NumSendsBeforeCoolOff = 1 p.HostParams.CoolOffTimeout = 5 * time.Minute - p.HostParams.SendTimeout = 1 * time.Second - p.HostParams.PingTimeout = 1 * time.Second + p.HostParams.SendTimeout = 1000 * time.Millisecond + p.HostParams.PingTimeout = 1000 * time.Millisecond return p } @@ -188,8 +196,16 @@ func (h *HostPool) initialize(startIdx uint32) error { } // Randomly shuffle gateways in NDF - randomGateways := make([]ndf.Gateway, len(h.ndf.Gateways)) - copy(randomGateways, h.ndf.Gateways) + randomGateways := make([]ndf.Gateway, 0, len(h.ndf.Gateways)) + + // Filter out not active gateways + for i := 0; i < len(h.ndf.Gateways); i++ { + if h.ndf.Nodes[i].Status == ndf.Active { + randomGateways = append(randomGateways, h.ndf.Gateways[i]) + } + } + + // Randomize the gateway order var rndBytes [32]byte stream := h.rng.GetStream() _, err := stream.Read(rndBytes[:]) @@ -247,14 +263,15 @@ func (h *HostPool) initialize(startIdx uint32) error { } // Ping the Host latency and send the result - jww.DEBUG.Printf("Testing host %s...", gwId.String()) + jww.DEBUG.Printf("Testing host %s...", gwId) latency, _ := newHost.IsOnline() c <- gatewayDuration{gwId, latency} }() } // Collect ping results - timer := time.NewTimer(2 * h.poolParams.HostParams.PingTimeout) + pingTimeout := 2 * h.poolParams.HostParams.PingTimeout + timer := time.NewTimer(pingTimeout) innerLoop: for { select { @@ -263,16 +280,18 @@ func (h *HostPool) initialize(startIdx uint32) error { if gw.latency > 0 { resultList = append(resultList, gw) jww.DEBUG.Printf("Adding HostPool result %d/%d: %s: %d", - len(resultList), numGatewaysToTry, gw.id.String(), gw.latency) + len(resultList), numGatewaysToTry, gw.id, gw.latency) } // Break if we have all needed slots if uint32(len(resultList)) == numGatewaysToTry { exit = true + timer.Stop() break innerLoop } case <-timer.C: - jww.INFO.Printf("HostPool initialization timed out!") + jww.INFO.Printf("HostPool initialization timed out after %s.", + pingTimeout) break innerLoop } } @@ -489,9 +508,9 @@ func (h *HostPool) selectGateway() *id.ID { nodeId := gwId.DeepCopy() nodeId.SetType(id.Node) nodeNdfIdx := h.ndfMap[*nodeId] - isNodeStale := h.ndf.Nodes[nodeNdfIdx].Status == ndf.Stale - if isNodeStale { - jww.DEBUG.Printf("Ignoring stale node: %s", nodeId.String()) + isNodeIsNotActive := h.ndf.Nodes[nodeNdfIdx].Status != ndf.Active + if isNodeIsNotActive { + jww.DEBUG.Printf("Ignoring stale node: %s", nodeId) continue } @@ -554,13 +573,14 @@ func (h *HostPool) replaceHostNoStore(newId *id.ID, oldPoolIndex uint32) error { go func() { err := newHost.Connect() if err != nil { - jww.WARN.Printf("Unable to initialize Host connection: %+v", err) + jww.WARN.Printf("Unable to initialize Host connection to %s: "+ + "%+v", newId, err) } }() } - jww.DEBUG.Printf("Replaced Host at %d [%s] with new Host %s", oldPoolIndex, oldHostIDStr, - newId.String()) + jww.DEBUG.Printf("Replaced Host at %d [%s] with new Host %s", + oldPoolIndex, oldHostIDStr, newId) return nil } @@ -686,7 +706,8 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { select { case h.addGatewayChan <- ng: default: - jww.WARN.Printf("Unable to send AddGateway event for id %s", gwId.String()) + jww.WARN.Printf( + "Unable to send AddGateway event for id %s", gwId) } } @@ -723,9 +744,9 @@ func readUint32(rng io.Reader) uint32 { // readRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end func readRangeUint32(start, end uint32, rng io.Reader) uint32 { size := end - start - // note we could just do the part inside the () here, but then extra - // can == size which means a little range is wasted, either - // choice seems negligible, so we went with the "more correct" + // Note that we could just do the part inside the () here, but then extra + // can == size which means a little range is wasted; either choice seems + // negligible, so we went with the "more correct" extra := (math.MaxUint32%size + 1) % size limit := math.MaxUint32 - extra // Loop until we read something inside the limit diff --git a/network/manager.go b/network/manager.go index 1cec837978ffc0aaa120ea061b4d993df8110e07..7e35db63d6ddb15ece6521893daf75441bf73e64 100644 --- a/network/manager.go +++ b/network/manager.go @@ -185,7 +185,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab multi.Add(trackNetworkStopper) // Message reception - multi.Add(m.message.StartProcessies()) + multi.Add(m.message.StartProcesses()) // Round processing multi.Add(m.round.StartProcessors()) diff --git a/network/message/manager.go b/network/message/manager.go index 0ac5eeceb0f892ef6ef7d4b0e0f578904e1901a3..ae51c15520e5cbdd56b494578a9bff60180fc405 100644 --- a/network/message/manager.go +++ b/network/message/manager.go @@ -39,17 +39,19 @@ func NewManager(internal internal.Internal, param params.Network, m := Manager{ param: param, partitioner: parse.NewPartitioner(dummyMessage.ContentsSize(), internal.Session), + Internal: internal, + sender: sender, + blacklistedNodes: make(map[string]interface{}, len(param.BlacklistedNodes)), messageReception: make(chan Bundle, param.MessageReceptionBuffLen), + nodeRegistration: nodeRegistration, networkIsHealthy: make(chan bool, 1), triggerGarbled: make(chan struct{}, 100), - nodeRegistration: nodeRegistration, - sender: sender, - Internal: internal, } for _, nodeId := range param.BlacklistedNodes { decodedId, err := base64.StdEncoding.DecodeString(nodeId) if err != nil { - jww.ERROR.Printf("Unable to decode blacklisted Node ID %s: %+v", decodedId, err) + jww.ERROR.Printf("Unable to decode blacklisted Node ID %s: %+v", + decodedId, err) continue } m.blacklistedNodes[string(decodedId)] = nil @@ -57,29 +59,29 @@ func NewManager(internal internal.Internal, param params.Network, return &m } -//Gets the channel to send received messages on +// GetMessageReceptionChannel gets the channel to send received messages on. func (m *Manager) GetMessageReceptionChannel() chan<- Bundle { return m.messageReception } -//Starts all worker pool -func (m *Manager) StartProcessies() stoppable.Stoppable { +// StartProcesses starts all worker pool. +func (m *Manager) StartProcesses() stoppable.Stoppable { multi := stoppable.NewMulti("MessageReception") - //create the message handler workers + // Create the message handler workers for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ { stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i)) go m.handleMessages(stop) multi.Add(stop) } - //create the critical messages thread + // Create the critical messages thread critStop := stoppable.NewSingle("CriticalMessages") go m.processCriticalMessages(critStop) m.Health.AddChannel(m.networkIsHealthy) multi.Add(critStop) - //create the garbled messages thread + // Create the garbled messages thread garbledStop := stoppable.NewSingle("GarbledMessages") go m.processGarbledMessages(garbledStop) multi.Add(garbledStop) diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index a7db6d51f80e38bad648ccff338dc68409d6cb05..82b4d21e56bbb86605fcfcd56458d592a21585ee 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -45,11 +45,9 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, } func calculateSendTimeout(best *pb.RoundInfo, max time.Duration) time.Duration { - RoundStartTime := time.Unix(0, - int64(best.Timestamps[states.QUEUED])) + RoundStartTime := time.Unix(0, int64(best.Timestamps[states.QUEUED])) // 250ms AFTER the round starts to hear the response. - timeout := RoundStartTime.Sub( - netTime.Now().Add(250 * time.Millisecond)) + timeout := RoundStartTime.Sub(netTime.Now().Add(250 * time.Millisecond)) if timeout > max { timeout = max } @@ -128,9 +126,6 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, } jww.TRACE.Printf("[SendCMIX-%s] bestRound: %v", cmixParams.DebugTag, bestRound) - // add the round on to the list of attempted, so it is not tried again - attempted.Insert(bestRound.GetRoundId()) - // Determine whether the selected round contains any Nodes // that are blacklisted by the params.Network object containsBlacklisted := false @@ -184,7 +179,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, timeout = calculatedTimeout } - //send the message + // send the message result, err := comms.SendPutMessage(host, wrappedMsg, timeout) jww.TRACE.Printf("[SendCMIX-%s]sendFunc %s putmsg", cmixParams.DebugTag, host) diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index 29e766edd90738a3e0f89afa8832c48f109b2f68..2e2b67fc716c50907a3dbfb9a14761291491c22d 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -102,10 +102,6 @@ func sendManyCmixHelper(sender *gateway.Sender, continue } - // Add the round on to the list of attempted rounds so that it is not - // tried again - attempted.Insert(bestRound.GetRoundId()) - // Determine whether the selected round contains any nodes that are // blacklisted by the params.Network object containsBlacklisted := false @@ -203,8 +199,9 @@ func sendManyCmixHelper(sender *gateway.Sender, param.DebugTag, ephemeralIDsString, recipientString, bestRound.ID, err) jww.INFO.Printf("[SendManyCMIX-%s]error received, continuing: %v", param.DebugTag, err) continue + } else { + jww.INFO.Printf("[SendManyCMIX-%s]Error received: %v", param.DebugTag, err) } - jww.INFO.Printf("error received: %v", err) return 0, []ephemeral.Id{}, err } @@ -212,14 +209,14 @@ func sendManyCmixHelper(sender *gateway.Sender, gwSlotResp := result.(*pb.GatewaySlotResponse) if gwSlotResp.Accepted { m := fmt.Sprintf("[SendManyCMIX-%s]Successfully sent to EphIDs %s (sources: [%s]) "+ - "in round %d", param.DebugTag, ephemeralIDsString, recipientString, bestRound.ID) + "in round %d (msgDigest: %s)", param.DebugTag, ephemeralIDsString, recipientString, bestRound.ID, msgDigests) jww.INFO.Print(m) events.Report(1, "MessageSendMany", "Metric", m) onSend(uint32(len(msgs)), session) return id.Round(bestRound.ID), ephemeralIDs, nil } else { - jww.FATAL.Panicf("Gateway %s returned no error, but failed to "+ - "accept message when sending to EphIDs [%s] (%s) on round %d", + jww.FATAL.Panicf("[SendManyCMIX-%s]Gateway %s returned no error, but failed to "+ + "accept message when sending to EphIDs [%s] (%s) on round %d", param.DebugTag, firstGateway, ephemeralIDsString, recipientString, bestRound.ID) } }