From 1a6e67d007a3fe35245557506793016a36d88bb9 Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Thu, 30 Dec 2021 12:31:57 -0800 Subject: [PATCH] Add more tagging to file transfer debug prints --- bindings/send.go | 10 +++++----- cmd/fileTransfer.go | 2 +- cmd/root.go | 20 ++++++++++---------- fileTransfer/manager.go | 8 ++++---- fileTransfer/params.go | 7 ------- fileTransfer/params_test.go | 18 +++--------------- fileTransfer/send.go | 18 +++++++++--------- fileTransfer/utils_test.go | 4 +++- interfaces/params/CMIX.go | 4 ++-- network/gateway/utils_test.go | 11 ++++++----- network/message/sendCmix.go | 6 +++--- network/rounds/retrieve.go | 4 ++-- storage/fileTransfer/receiveTransfer.go | 2 +- storage/fileTransfer/sentFileTransfers.go | 2 +- storage/fileTransfer/sentPartTracker.go | 8 +++++++- storage/fileTransfer/sentTransfer.go | 2 +- storage/rounds/uncheckedRounds.go | 6 +++--- 17 files changed, 61 insertions(+), 71 deletions(-) diff --git a/bindings/send.go b/bindings/send.go index 6b139711b..be9366974 100644 --- a/bindings/send.go +++ b/bindings/send.go @@ -71,7 +71,7 @@ func (c *Client) SendCmix(recipient, contents []byte, parameters string) (int, e // on failure a round id of -1 is returned // fixme: cannot use a slice of slices over bindings. Will need to modify this function once // a proper input format has been specified -//func (c *Client) SendManyCMIX(recipients, contents [][]byte, parameters string) (int, error) { +// func (c *Client) SendManyCMIX(recipients, contents [][]byte, parameters string) (int, error) { // // p, err := params.GetCMIXParameters(parameters) // if err != nil { @@ -103,7 +103,7 @@ func (c *Client) SendCmix(recipient, contents []byte, parameters string) (int, e // err)) // } // return int(rid), nil -//} +// } // SendUnsafe sends an unencrypted payload to the provided recipient // with the provided msgType. Returns the list of rounds in which parts @@ -188,7 +188,7 @@ type SendReport struct { type SendReportDisk struct { List []id.Round Mid []byte - Ts int64 + Ts int64 } func (sr *SendReport) GetRoundList() *RoundList { @@ -215,7 +215,7 @@ func (sr *SendReport) Marshal() ([]byte, error) { srd := SendReportDisk{ List: sr.rl.list, Mid: sr.mid[:], - Ts: sr.ts.UnixNano(), + Ts: sr.ts.UnixNano(), } return json.Marshal(&srd) } @@ -229,6 +229,6 @@ func (sr *SendReport) Unmarshal(b []byte) error { copy(sr.mid[:], srd.Mid) sr.rl = &RoundList{list: srd.List} - sr.ts = time.Unix(0,srd.Ts) + sr.ts = time.Unix(0, srd.Ts) return nil } diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index 04214377d..ca0e7da68 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -145,7 +145,7 @@ func initFileTransferManager(client *api.Client, maxThroughput int) ( // Create new parameters p := ft.DefaultParams() if maxThroughput != 0 { - p = ft.NewParams(maxThroughput) + p.MaxThroughput = maxThroughput } // Create new manager diff --git a/cmd/root.go b/cmd/root.go index 010baff8a..bbc043f7a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -227,10 +227,10 @@ var rootCmd = &cobra.Command{ client.GetHealth().AddChannel(connected) waitUntilConnected(connected) - //err = client.RegisterForNotifications("dJwuGGX3KUyKldWK5PgQH8:APA91bFjuvimRc4LqOyMDiy124aLedifA8DhldtaB_b76ggphnFYQWJc_fq0hzQ-Jk4iYp2wPpkwlpE1fsOjs7XWBexWcNZoU-zgMiM0Mso9vTN53RhbXUferCbAiEylucEOacy9pniN") - //if err != nil { + // err = client.RegisterForNotifications("dJwuGGX3KUyKldWK5PgQH8:APA91bFjuvimRc4LqOyMDiy124aLedifA8DhldtaB_b76ggphnFYQWJc_fq0hzQ-Jk4iYp2wPpkwlpE1fsOjs7XWBexWcNZoU-zgMiM0Mso9vTN53RhbXUferCbAiEylucEOacy9pniN") + // if err != nil { // jww.FATAL.Panicf("Failed to register for notifications: %+v", err) - //} + // } // After connection, make sure we have registered with at least // 85% of the nodes @@ -399,7 +399,7 @@ var rootCmd = &cobra.Command{ case m := <-recvCh: fmt.Printf("Message received: %s\n", string( m.Payload)) - //fmt.Printf("%s", m.Timestamp) + // fmt.Printf("%s", m.Timestamp) receiveCnt++ if receiveCnt == expectedCnt { done = true @@ -408,7 +408,7 @@ var rootCmd = &cobra.Command{ } } - //wait an extra 5 seconds to make sure no messages were missed + // wait an extra 5 seconds to make sure no messages were missed done = false timer := time.NewTimer(5 * time.Second) for !done { @@ -419,7 +419,7 @@ var rootCmd = &cobra.Command{ case m := <-recvCh: fmt.Printf("Message received: %s\n", string( m.Payload)) - //fmt.Printf("%s", m.Timestamp) + // fmt.Printf("%s", m.Timestamp) receiveCnt++ } } @@ -533,7 +533,7 @@ func createClient() *api.Client { userIDprefix := viper.GetString("userid-prefix") protoUserPath := viper.GetString("protoUserPath") - //create a new client if none exist + // create a new client if none exist if _, err := os.Stat(storeDir); os.IsNotExist(err) { // Load NDF ndfJSON, err := ioutil.ReadFile(viper.GetString("ndf")) @@ -602,7 +602,7 @@ func initClient() *api.Client { } netParams.VerboseRoundTracking = viper.GetBool("verboseRoundTracking") - //load the client + // load the client client, err := api.Login(storeDir, []byte(pass), netParams) if err != nil { jww.FATAL.Panicf("%+v", err) @@ -680,7 +680,7 @@ func printChanRequest(requestor contact.Contact, message string) { fmt.Printf(msg) msg = fmt.Sprintf("Authentication channel request message: %s\n", message) jww.INFO.Printf(msg) - //fmt.Printf(msg) + // fmt.Printf(msg) } func addPrecanAuthenticatedChannel(client *api.Client, recipientID *id.ID, @@ -743,7 +743,7 @@ func waitUntilConnected(connected chan bool) { waitTimeout := time.Duration(viper.GetUint("waitTimeout")) timeoutTimer := time.NewTimer(waitTimeout * time.Second) isConnected := false - //Wait until we connect or panic if we can't by a timeout + // Wait until we connect or panic if we can't by a timeout for !isConnected { select { case isConnected = <-connected: diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 56e2a21fc..6191cd96c 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -100,13 +100,13 @@ type Manager struct { // Queue of parts to send sendQueue chan queuedPart - // Maximum data transfer speed in bytes per second - maxThroughput int - // Indicates if old transfers saved to storage have been recovered after // file transfer is closed and reopened oldTransfersRecovered bool + // File transfer parameters + p Params + // Client interfaces client *api.Client store *storage.Session @@ -166,8 +166,8 @@ func newManager(client *api.Client, store *storage.Session, sent: sent, received: received, sendQueue: make(chan queuedPart, sendQueueBuffLen), - maxThroughput: p.MaxThroughput, oldTransfersRecovered: false, + p: p, client: client, store: store, swb: swb, diff --git a/fileTransfer/params.go b/fileTransfer/params.go index 76fa1cf2d..ec2d3b53a 100644 --- a/fileTransfer/params.go +++ b/fileTransfer/params.go @@ -18,13 +18,6 @@ type Params struct { MaxThroughput int } -// NewParams generates a new Params object filled with the given parameters. -func NewParams(maxThroughput int) Params { - return Params{ - MaxThroughput: maxThroughput, - } -} - // DefaultParams returns a Params object filled with the default values. func DefaultParams() Params { return Params{ diff --git a/fileTransfer/params_test.go b/fileTransfer/params_test.go index 8a41f1fc8..4cf19e52c 100644 --- a/fileTransfer/params_test.go +++ b/fileTransfer/params_test.go @@ -12,23 +12,11 @@ import ( "testing" ) -// Tests that NewParams returns the expected Params object. -func TestNewParams(t *testing.T) { - expected := Params{ - MaxThroughput: 42, - } - - received := NewParams(expected.MaxThroughput) - - if !reflect.DeepEqual(expected, received) { - t.Errorf("Received Params does not match expected."+ - "\nexpected: %+v\nreceived: %+v", expected, received) - } -} - // Tests that DefaultParams returns a Params object with the expected defaults. func TestDefaultParams(t *testing.T) { - expected := Params{MaxThroughput: defaultMaxThroughput} + expected := Params{ + MaxThroughput: defaultMaxThroughput, + } received := DefaultParams() if !reflect.DeepEqual(expected, received) { diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 24d7f0064..6b742a7e6 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -78,7 +78,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, avgSendSize := avgNumMessages * (8192 / 8) // Calculate the delay needed to reach max throughput - delay := time.Duration((int(time.Second) * avgSendSize) / m.maxThroughput) + delay := time.Duration((int(time.Second) * avgSendSize) / m.p.MaxThroughput) // Batch of parts read from the queue to be sent var partList []queuedPart @@ -299,24 +299,24 @@ func (m *Manager) buildMessages(partList []queuedPart) ( for i, part := range partList { // Lookup the transfer by the ID; if the transfer does not exist, then // print a warning and skip this message - transfer, err := m.sent.GetTransfer(part.tid) + st, err := m.sent.GetTransfer(part.tid) if err != nil { jww.WARN.Printf(noSentTransferWarn, part.tid, part.partNum, err) continue } // Generate new cMix message with encrypted file part - cmixMsg, err := m.newCmixMessage(transfer, part.partNum, rng) + cmixMsg, err := m.newCmixMessage(st, part.partNum, rng) if err == ftStorage.MaxRetriesErr { jww.DEBUG.Printf("[FT] File transfer %s sent to %s ran out of "+ - "retries {parts: %d, fingerprints: %d}", - part.tid, transfer.GetRecipient(), transfer.GetNumParts(), - transfer.GetNumFps()) + "retries {parts: %d, numFps: %d/%d}", + part.tid, st.GetRecipient(), st.GetNumParts(), + st.GetNumFps()-st.GetNumAvailableFps(), st.GetNumFps()) // If the max number of retries has been reached, then report the // error on the callback, delete the transfer, and skip to the next // message - go transfer.CallProgressCB(errors.Errorf(maxRetriesErr, err)) + go st.CallProgressCB(errors.Errorf(maxRetriesErr, err)) continue } else if err != nil { // For all other errors, return an error @@ -326,13 +326,13 @@ func (m *Manager) buildMessages(partList []queuedPart) ( // Construct TargetedCmixMessage msg := message.TargetedCmixMessage{ - Recipient: transfer.GetRecipient(), + Recipient: st.GetRecipient(), Message: cmixMsg, } // Add to list of messages to send messages = append(messages, msg) - transfers[part.tid] = transfer + transfers[part.tid] = st groupedParts[part.tid] = append(groupedParts[part.tid], part.partNum) partsToResend = append(partsToResend, i) } diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 85bd6f9e0..7738b4529 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -186,15 +186,17 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive, return nil } + p := DefaultParams() avgNumMessages := (minPartsSendPerRound + maxPartsSendPerRound) / 2 avgSendSize := avgNumMessages * (8192 / 8) + p.MaxThroughput = int(time.Second) * avgSendSize m := &Manager{ receiveCB: receiveCB, sent: sent, received: received, sendQueue: make(chan queuedPart, sendQueueBuffLen), - maxThroughput: int(time.Second) * avgSendSize, + p: p, store: storage.InitTestingSession(t), swb: switchboard.New(), net: net, diff --git a/interfaces/params/CMIX.go b/interfaces/params/CMIX.go index ee735d1d9..694553f97 100644 --- a/interfaces/params/CMIX.go +++ b/interfaces/params/CMIX.go @@ -13,7 +13,7 @@ import ( ) type CMIX struct { - //maximum number of rounds to try and send on + // maximum number of rounds to try and send on RoundTries uint Timeout time.Duration RetryDelay time.Duration @@ -34,7 +34,7 @@ func (c CMIX) Marshal() ([]byte, error) { return json.Marshal(c) } -// Obtain default CMIX parameters, or override with given parameters if set +// GetCMIXParameters func obtains default CMIX parameters, or overrides with given parameters if set func GetCMIXParameters(params string) (CMIX, error) { p := GetDefaultCMIX() if len(params) > 0 { diff --git a/network/gateway/utils_test.go b/network/gateway/utils_test.go index 09081da17..2a90528ff 100644 --- a/network/gateway/utils_test.go +++ b/network/gateway/utils_test.go @@ -9,6 +9,7 @@ package gateway import ( "fmt" + "github.com/pkg/errors" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" @@ -141,16 +142,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition { const happyPathReturn = "happyPathReturn" -func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, error) { +func SendToPreferred_HappyPath(*connect.Host, *id.ID) (interface{}, error) { return happyPathReturn, nil } -func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, error) { - return nil, fmt.Errorf(errorsList[0]) +func SendToPreferred_KnownError(*connect.Host, *id.ID) (interface{}, error) { + return nil, errors.Errorf(errorsList[0]) } -func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, error) { - return nil, fmt.Errorf("Unexpected error: Oopsie") +func SendToPreferred_UnknownError(*connect.Host, *id.ID) (interface{}, error) { + return nil, errors.Errorf("Unexpected error: Oopsie") } func SendToAny_HappyPath(host *connect.Host) (interface{}, error) { diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 99010f157..43bafc5da 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -105,8 +105,8 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, msg.Digest()) } + // find the best round to send to, excluding attempted rounds remainingTime := cmixParams.Timeout - elapsed - //find the best round to send to, excluding attempted rounds bestRound, err := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) if err != nil { jww.WARN.Printf("Failed to GetUpcomingRealtime (msgDigest: %s): %+v", msg.Digest(), err) @@ -116,7 +116,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, continue } - //add the round on to the list of attempted, so it is not tried again + // add the round on to the list of attempted, so it is not tried again attempted.Insert(bestRound) // Determine whether the selected round contains any Nodes @@ -175,7 +175,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, return 0, ephemeral.Id{}, err } - //if the comm errors or the message fails to send, continue retrying. + // if the comm errors or the message fails to send, continue retrying. if err != nil { jww.ERROR.Printf("SendCmix failed to send to EphID %d (%s) on "+ "round %d, trying a new round: %+v", ephID.Int64(), recipient, diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 6f0f5e15a..3682de1a2 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -163,7 +163,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, msgResp, err := comms.RequestMessages(host, msgReq) if err != nil { - //you need to default to a retryable errors because otherwise we cannot enumerate all errors + // you need to default to a retryable errors because otherwise we cannot enumerate all errors return nil, errors.WithMessage(err, gateway.RetryableError) } @@ -202,7 +202,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, jww.INFO.Printf("Received %d messages in Round %v for %d (%s) in %s", len(msgs), roundID, identity.EphId.Int64(), identity.Source, time.Now().Sub(start)) - //build the bundle of messages to send to the message processor + // build the bundle of messages to send to the message processor bundle := message.Bundle{ Round: roundID, Messages: make([]format.Message, len(msgs)), diff --git a/storage/fileTransfer/receiveTransfer.go b/storage/fileTransfer/receiveTransfer.go index 1f5d0756c..df4bad661 100644 --- a/storage/fileTransfer/receiveTransfer.go +++ b/storage/fileTransfer/receiveTransfer.go @@ -244,7 +244,7 @@ func (rt *ReceivedTransfer) stopScheduledProgressCB() error { err := cb.stopThread() if err != nil { failedCallbacks = append(failedCallbacks, i) - jww.WARN.Print(err.Error()) + jww.WARN.Printf("[FT] %s", err) } } diff --git a/storage/fileTransfer/sentFileTransfers.go b/storage/fileTransfer/sentFileTransfers.go index b68c265b1..485eb67d0 100644 --- a/storage/fileTransfer/sentFileTransfers.go +++ b/storage/fileTransfer/sentFileTransfers.go @@ -36,7 +36,7 @@ const ( newSentTransferErr = "failed to create new sent transfer: %+v" getSentTransferErr = "sent file transfer not found" - cancelCallbackErr = "Transfer with ID %s: %+v" + cancelCallbackErr = "[FT] Transfer with ID %s: %+v" deleteSentTransferErr = "failed to delete sent transfer with ID %s from store: %+v" ) diff --git a/storage/fileTransfer/sentPartTracker.go b/storage/fileTransfer/sentPartTracker.go index b65a6aa11..b5d30cf6e 100644 --- a/storage/fileTransfer/sentPartTracker.go +++ b/storage/fileTransfer/sentPartTracker.go @@ -13,6 +13,12 @@ import ( "gitlab.com/elixxir/client/storage/utility" ) +// Error messages. +const ( + // sentPartTracker.GetPartStatus + getInvalidPartErr = "[FT] Failed to get status for part %d: %+v" +) + // sentPartTracker tracks the status of individual sent file parts. type sentPartTracker struct { // The number of file parts in the file @@ -39,7 +45,7 @@ func newSentPartTracker(partStats *utility.MultiStateVector) sentPartTracker { func (spt sentPartTracker) GetPartStatus(partNum uint16) interfaces.FpStatus { status, err := spt.partStats.Get(partNum) if err != nil { - jww.FATAL.Fatalf("failed to get status for part %d: %+v", partNum, err) + jww.FATAL.Fatalf(getInvalidPartErr, partNum, err) } return interfaces.FpStatus(status) } diff --git a/storage/fileTransfer/sentTransfer.go b/storage/fileTransfer/sentTransfer.go index 351c50a78..a91a59847 100644 --- a/storage/fileTransfer/sentTransfer.go +++ b/storage/fileTransfer/sentTransfer.go @@ -388,7 +388,7 @@ func (st *SentTransfer) stopScheduledProgressCB() error { err := cb.stopThread() if err != nil { failedCallbacks = append(failedCallbacks, i) - jww.WARN.Print(err.Error()) + jww.WARN.Printf("[FT] %s", err) } } diff --git a/storage/rounds/uncheckedRounds.go b/storage/rounds/uncheckedRounds.go index d1e71114f..58140e461 100644 --- a/storage/rounds/uncheckedRounds.go +++ b/storage/rounds/uncheckedRounds.go @@ -12,6 +12,7 @@ import ( "encoding/binary" "github.com/golang/protobuf/proto" "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage/versioned" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/primitives/id" @@ -20,7 +21,6 @@ import ( "sync" "testing" "time" - jww "github.com/spf13/jwalterweatherman" ) const ( @@ -185,7 +185,7 @@ func (s *UncheckedRoundStore) AddRound(rid id.Round, ri *pb.RoundInfo, if !exists || stored.Info == nil { newUncheckedRound := UncheckedRound{ - Id: rid, + Id: rid, Info: ri, Identity: Identity{ EpdId: ephID, @@ -226,7 +226,7 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round, for _, rnd := range s.list { jww.DEBUG.Printf("rnd for lookup: %d, %+v\n", rnd.Id, rnd) go func(localRid id.Round, - localRnd UncheckedRound){ + localRnd UncheckedRound) { iterator(localRid, localRnd) }(rnd.Id, rnd) } -- GitLab