diff --git a/api/results.go b/api/results.go index 20d4871c5fa6610bfe379c81a6e006281e46d396..4a31b66b525e60037479ac4042017b21cfece313 100644 --- a/api/results.go +++ b/api/results.go @@ -93,6 +93,13 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, oldestRound := networkInstance.GetOldestRoundID() + // Set a lower timeout so there is room for retries, + // while ensuring it does not go too low and cause too many timeouts + roundEventTimeout := 5 * time.Second + if timeout < roundEventTimeout { + roundEventTimeout = timeout + } + // Parse and adjudicate every round for _, rnd := range roundList { // Every round is timed out by default, until proven to have finished @@ -109,20 +116,20 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, } else { // If in progress, add a channel monitoring its state roundEvents.AddRoundEventChan(rnd, sendResults, - timeout-time.Millisecond, states.COMPLETED, states.FAILED) + roundEventTimeout, states.COMPLETED, states.FAILED) numResults++ } } else { - // Update oldest round (buffer may have updated externally) + // Update the oldest round (buffer may have updated externally) if rnd < oldestRound { // If round is older that oldest round in our buffer // Add it to the historical round request (performed later) historicalRequest.Rounds = append(historicalRequest.Rounds, uint64(rnd)) numResults++ } else { - // Otherwise, monitor it's progress + // Otherwise, monitor its progress roundEvents.AddRoundEventChan(rnd, sendResults, - timeout-time.Millisecond, states.COMPLETED, states.FAILED) + roundEventTimeout, states.COMPLETED, states.FAILED) numResults++ } } @@ -151,13 +158,19 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, roundCallback(false, true, roundsResults) return case roundReport := <-sendResults: - - numResults-- - // Skip if the round is nil (unknown from historical rounds) // they default to timed out, so correct behavior is preserved - if roundReport.RoundInfo == nil || roundReport.TimedOut { + if roundReport.RoundInfo == nil { allRoundsSucceeded = false + numResults-- + } else if roundReport.TimedOut { + // Generate a message to track the timed out round + timeoutRequest := &pb.HistoricalRounds{ + Rounds: []uint64{roundReport.RoundInfo.ID}, + } + // Request that round's information, feeding back into sendResults + jww.DEBUG.Printf("Sending HistoricalRounds retry for Round %d", roundReport.RoundInfo.ID) + go c.getHistoricalRounds(timeoutRequest, sendResults, commsInterface) } else { // If available, denote the result roundId := id.Round(roundReport.RoundInfo.ID) @@ -167,6 +180,7 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, roundsResults[roundId] = Failed allRoundsSucceeded = false } + numResults-- } } } @@ -176,7 +190,7 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, } // Helper function which asynchronously pings a random gateway until -// it gets information on it's requested historical rounds +// it gets information on its requested historical rounds func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds, sendResults chan ds.EventReturn, comms historicalRoundsComm) { diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index e1a82dab762f9f156172c1445b45463d24f16207..4ac4c1ab75b820edfc7e390a3a14bc69f1fc6036 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -9,6 +9,7 @@ package fileTransfer import ( "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" @@ -307,6 +308,10 @@ func (m Manager) Send(fileName, fileType string, fileData []byte, } rng.Close() + jww.DEBUG.Printf("[FT] Sending new file transfer %q to %s {parts: %d, "+ + "size: %d, type: %q, ID: %s}", + fileName, recipient, numParts, fileSize, fileType, transferID) + // Add all parts to queue m.queueParts(transferID, makeListOfPartNums(numParts)) @@ -372,6 +377,10 @@ func (m Manager) CloseSend(tid ftCrypto.TransferID) error { return errors.Errorf(transferInProgressErr, tid) } + jww.DEBUG.Printf("[FT] Closing file transfer %s sent to %s "+ + "{completed: %t, parts: %d}", + tid, transfer.GetRecipient(), completed, transfer.GetNumParts()) + // Delete the transfer from storage return m.sent.DeleteTransfer(tid) } diff --git a/fileTransfer/receiveNew.go b/fileTransfer/receiveNew.go index 45528f9ba270a94fa71ea1652ae1eb8c304375e9..4aadd9d3341f1b5056c45bb01124a3941eb5574b 100644 --- a/fileTransfer/receiveNew.go +++ b/fileTransfer/receiveNew.go @@ -83,6 +83,10 @@ func (m *Manager) readNewFileTransferMessage(msg message.Receive) ( return } + jww.DEBUG.Printf("[FT] Received new file transfer %q from %s {parts: %d, "+ + "size: %d, type: %q}", + newFT.FileName, msg.Sender, newFT.NumParts, newFT.Size, newFT.FileType) + // Get RNG from stream rng := m.rng.GetStream() defer rng.Close() diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 3494f1de083dd46469682a9f65dc85284dc9a997..17bd403d213aaa92654b70a512dd49f30dd3c09d 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -328,6 +328,11 @@ func (m *Manager) buildMessages(partList []queuedPart) ( // Generate new cMix message with encrypted file part cmixMsg, err := m.newCmixMessage(transfer, part.partNum, rng) if err == ftStorage.MaxRetriesErr { + jww.DEBUG.Printf("[FT] File transfer %s sent to %s ran out of "+ + "retries {parts: %d, fingerprints: %d}", + part.tid, transfer.GetRecipient(), transfer.GetNumParts(), + transfer.GetNumFps()) + // If the max number of retries has been reached, then report the // error on the callback, delete the transfer, and skip to the next // message diff --git a/network/message/handler.go b/network/message/handler.go index ecced449eb9f1077311dd26aac3c90851b32c4ac..b7574f4cf8636b67aaa37ef7330058dd037de2f6 100644 --- a/network/message/handler.go +++ b/network/message/handler.go @@ -126,17 +126,19 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle, edge *edge return } - im := fmt.Sprintf("Received message of type %s from %s in round %d,"+ - " msgDigest: %s, keyFP: %v", encTy, sender, bundle.Round, - msgDigest, msg.GetKeyFP()) - jww.INFO.Print(im) - m.Internal.Events.Report(2, "MessageReception", "MessagePart", im) + // Process the decrypted/unencrypted message partition, to see if // we get a full message xxMsg, ok := m.partitioner.HandlePartition(sender, encTy, msg.GetContents(), relationshipFingerprint) + im := fmt.Sprintf("Received message of ecr type %s and msg type " + + "%d from %s in round %d,msgDigest: %s, keyFP: %v", encTy, + xxMsg.MessageType, sender, bundle.Round, msgDigest, msg.GetKeyFP()) + jww.INFO.Print(im) + m.Internal.Events.Report(2, "MessageReception", "MessagePart", im) + // If the reception completed a message, hear it on the switchboard if ok { //Set the identities diff --git a/network/node/register.go b/network/node/register.go index a4424eb18b16b98f2b1766f68101fcf7c8fe11b4..3e19eb6750aa7bd040e12625ef1cf3bb0852ed0a 100644 --- a/network/node/register.go +++ b/network/node/register.go @@ -31,6 +31,7 @@ import ( "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/crypto/tls" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/netTime" "strconv" "sync" @@ -83,6 +84,11 @@ func registerNodes(sender *gateway.Sender, session *storage.Session, if _, operating := inProgress.LoadOrStore(nidStr, struct{}{}); operating { continue } + // No need to register with stale nodes + if isStale := gw.Node.Status == ndf.Stale; isStale { + jww.DEBUG.Printf("Skipping registration with stale node %s", nidStr) + continue + } err := registerWithNode(sender, comms, gw, regSignature, regTimestamp, uci, cmix, rng, stop) inProgress.Delete(nidStr) diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index e1d7351b52c6fa8aa5d74779862baa085a11a5cf..d9c7ad749a3b1011c976fe97387dac281aec7086 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -270,6 +270,7 @@ func (mb *MessageBuffer) Next() (interface{}, bool) { // Retrieve the message for storage m, err = mb.handler.LoadMessage(mb.kv, makeStoredMessageKey(mb.key, h)) if err != nil { + m=nil jww.ERROR.Printf("Failed to load message %s from store, "+ "this may happen on occasion due to replays to increase "+ "reliability: %v", h, err) diff --git a/storage/utility/meteredCmixMessageBuffer.go b/storage/utility/meteredCmixMessageBuffer.go index dd5ade5a31a51469587c4dd8a32d67f07b9c515e..8060b4781fc25af49b97251abcb87981e9929cff 100644 --- a/storage/utility/meteredCmixMessageBuffer.go +++ b/storage/utility/meteredCmixMessageBuffer.go @@ -56,7 +56,7 @@ func (*meteredCmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (int // Load the versioned object vo, err := kv.Get(key, currentMeteredCmixMessageVersion) if err != nil { - return format.Message{}, err + return nil, err } msg := meteredCmixMessage{}