diff --git a/bindings/send.go b/bindings/send.go index 6b139711bf81cc3ef264bd4ca67e01796bad8c73..be9366974b339fdd06539394cda02e34d94b785b 100644 --- a/bindings/send.go +++ b/bindings/send.go @@ -71,7 +71,7 @@ func (c *Client) SendCmix(recipient, contents []byte, parameters string) (int, e // on failure a round id of -1 is returned // fixme: cannot use a slice of slices over bindings. Will need to modify this function once // a proper input format has been specified -//func (c *Client) SendManyCMIX(recipients, contents [][]byte, parameters string) (int, error) { +// func (c *Client) SendManyCMIX(recipients, contents [][]byte, parameters string) (int, error) { // // p, err := params.GetCMIXParameters(parameters) // if err != nil { @@ -103,7 +103,7 @@ func (c *Client) SendCmix(recipient, contents []byte, parameters string) (int, e // err)) // } // return int(rid), nil -//} +// } // SendUnsafe sends an unencrypted payload to the provided recipient // with the provided msgType. Returns the list of rounds in which parts @@ -188,7 +188,7 @@ type SendReport struct { type SendReportDisk struct { List []id.Round Mid []byte - Ts int64 + Ts int64 } func (sr *SendReport) GetRoundList() *RoundList { @@ -215,7 +215,7 @@ func (sr *SendReport) Marshal() ([]byte, error) { srd := SendReportDisk{ List: sr.rl.list, Mid: sr.mid[:], - Ts: sr.ts.UnixNano(), + Ts: sr.ts.UnixNano(), } return json.Marshal(&srd) } @@ -229,6 +229,6 @@ func (sr *SendReport) Unmarshal(b []byte) error { copy(sr.mid[:], srd.Mid) sr.rl = &RoundList{list: srd.List} - sr.ts = time.Unix(0,srd.Ts) + sr.ts = time.Unix(0, srd.Ts) return nil } diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index 04214377d23c0fdbf6642159888cb65c933f0e9f..ca0e7da681929d98bad35871bb28d579c9cffa16 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -145,7 +145,7 @@ func initFileTransferManager(client *api.Client, maxThroughput int) ( // Create new parameters p := ft.DefaultParams() if maxThroughput != 0 { - p = ft.NewParams(maxThroughput) + p.MaxThroughput = maxThroughput } // Create new manager diff --git a/cmd/root.go b/cmd/root.go index 010baff8abe638495865438dcc2a69689bd31b0e..bbc043f7a432b07bf55f3204cb6b429c7db2a9a1 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -227,10 +227,10 @@ var rootCmd = &cobra.Command{ client.GetHealth().AddChannel(connected) waitUntilConnected(connected) - //err = client.RegisterForNotifications("dJwuGGX3KUyKldWK5PgQH8:APA91bFjuvimRc4LqOyMDiy124aLedifA8DhldtaB_b76ggphnFYQWJc_fq0hzQ-Jk4iYp2wPpkwlpE1fsOjs7XWBexWcNZoU-zgMiM0Mso9vTN53RhbXUferCbAiEylucEOacy9pniN") - //if err != nil { + // err = client.RegisterForNotifications("dJwuGGX3KUyKldWK5PgQH8:APA91bFjuvimRc4LqOyMDiy124aLedifA8DhldtaB_b76ggphnFYQWJc_fq0hzQ-Jk4iYp2wPpkwlpE1fsOjs7XWBexWcNZoU-zgMiM0Mso9vTN53RhbXUferCbAiEylucEOacy9pniN") + // if err != nil { // jww.FATAL.Panicf("Failed to register for notifications: %+v", err) - //} + // } // After connection, make sure we have registered with at least // 85% of the nodes @@ -399,7 +399,7 @@ var rootCmd = &cobra.Command{ case m := <-recvCh: fmt.Printf("Message received: %s\n", string( m.Payload)) - //fmt.Printf("%s", m.Timestamp) + // fmt.Printf("%s", m.Timestamp) receiveCnt++ if receiveCnt == expectedCnt { done = true @@ -408,7 +408,7 @@ var rootCmd = &cobra.Command{ } } - //wait an extra 5 seconds to make sure no messages were missed + // wait an extra 5 seconds to make sure no messages were missed done = false timer := time.NewTimer(5 * time.Second) for !done { @@ -419,7 +419,7 @@ var rootCmd = &cobra.Command{ case m := <-recvCh: fmt.Printf("Message received: %s\n", string( m.Payload)) - //fmt.Printf("%s", m.Timestamp) + // fmt.Printf("%s", m.Timestamp) receiveCnt++ } } @@ -533,7 +533,7 @@ func createClient() *api.Client { userIDprefix := viper.GetString("userid-prefix") protoUserPath := viper.GetString("protoUserPath") - //create a new client if none exist + // create a new client if none exist if _, err := os.Stat(storeDir); os.IsNotExist(err) { // Load NDF ndfJSON, err := ioutil.ReadFile(viper.GetString("ndf")) @@ -602,7 +602,7 @@ func initClient() *api.Client { } netParams.VerboseRoundTracking = viper.GetBool("verboseRoundTracking") - //load the client + // load the client client, err := api.Login(storeDir, []byte(pass), netParams) if err != nil { jww.FATAL.Panicf("%+v", err) @@ -680,7 +680,7 @@ func printChanRequest(requestor contact.Contact, message string) { fmt.Printf(msg) msg = fmt.Sprintf("Authentication channel request message: %s\n", message) jww.INFO.Printf(msg) - //fmt.Printf(msg) + // fmt.Printf(msg) } func addPrecanAuthenticatedChannel(client *api.Client, recipientID *id.ID, @@ -743,7 +743,7 @@ func waitUntilConnected(connected chan bool) { waitTimeout := time.Duration(viper.GetUint("waitTimeout")) timeoutTimer := time.NewTimer(waitTimeout * time.Second) isConnected := false - //Wait until we connect or panic if we can't by a timeout + // Wait until we connect or panic if we can't by a timeout for !isConnected { select { case isConnected = <-connected: diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 56e2a21fc023e69faf66bdb77d28ce2ad65df931..6191cd96c8adbd1d4f31e6bf7f6f3a73ac32ac6f 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -100,13 +100,13 @@ type Manager struct { // Queue of parts to send sendQueue chan queuedPart - // Maximum data transfer speed in bytes per second - maxThroughput int - // Indicates if old transfers saved to storage have been recovered after // file transfer is closed and reopened oldTransfersRecovered bool + // File transfer parameters + p Params + // Client interfaces client *api.Client store *storage.Session @@ -166,8 +166,8 @@ func newManager(client *api.Client, store *storage.Session, sent: sent, received: received, sendQueue: make(chan queuedPart, sendQueueBuffLen), - maxThroughput: p.MaxThroughput, oldTransfersRecovered: false, + p: p, client: client, store: store, swb: swb, diff --git a/fileTransfer/params.go b/fileTransfer/params.go index 76fa1cf2d871f78aeba37b6b08e8b73e2dcdfb8f..ec2d3b53a0734e8ceed0da9f66553a4b283ca80b 100644 --- a/fileTransfer/params.go +++ b/fileTransfer/params.go @@ -18,13 +18,6 @@ type Params struct { MaxThroughput int } -// NewParams generates a new Params object filled with the given parameters. -func NewParams(maxThroughput int) Params { - return Params{ - MaxThroughput: maxThroughput, - } -} - // DefaultParams returns a Params object filled with the default values. func DefaultParams() Params { return Params{ diff --git a/fileTransfer/params_test.go b/fileTransfer/params_test.go index 8a41f1fc8de56b6c906ccecb6d9761b4096402f0..4cf19e52ca927e098b299229d6918f83c3c6f67a 100644 --- a/fileTransfer/params_test.go +++ b/fileTransfer/params_test.go @@ -12,23 +12,11 @@ import ( "testing" ) -// Tests that NewParams returns the expected Params object. -func TestNewParams(t *testing.T) { - expected := Params{ - MaxThroughput: 42, - } - - received := NewParams(expected.MaxThroughput) - - if !reflect.DeepEqual(expected, received) { - t.Errorf("Received Params does not match expected."+ - "\nexpected: %+v\nreceived: %+v", expected, received) - } -} - // Tests that DefaultParams returns a Params object with the expected defaults. func TestDefaultParams(t *testing.T) { - expected := Params{MaxThroughput: defaultMaxThroughput} + expected := Params{ + MaxThroughput: defaultMaxThroughput, + } received := DefaultParams() if !reflect.DeepEqual(expected, received) { diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 24d7f0064bfbb44f0c83559a406ef57817388be8..6b742a7e618538f2b08c51d2c60849ad5fc86cb2 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -78,7 +78,7 @@ func (m *Manager) sendThread(stop *stoppable.Single, healthChan chan bool, avgSendSize := avgNumMessages * (8192 / 8) // Calculate the delay needed to reach max throughput - delay := time.Duration((int(time.Second) * avgSendSize) / m.maxThroughput) + delay := time.Duration((int(time.Second) * avgSendSize) / m.p.MaxThroughput) // Batch of parts read from the queue to be sent var partList []queuedPart @@ -299,24 +299,24 @@ func (m *Manager) buildMessages(partList []queuedPart) ( for i, part := range partList { // Lookup the transfer by the ID; if the transfer does not exist, then // print a warning and skip this message - transfer, err := m.sent.GetTransfer(part.tid) + st, err := m.sent.GetTransfer(part.tid) if err != nil { jww.WARN.Printf(noSentTransferWarn, part.tid, part.partNum, err) continue } // Generate new cMix message with encrypted file part - cmixMsg, err := m.newCmixMessage(transfer, part.partNum, rng) + cmixMsg, err := m.newCmixMessage(st, part.partNum, rng) if err == ftStorage.MaxRetriesErr { jww.DEBUG.Printf("[FT] File transfer %s sent to %s ran out of "+ - "retries {parts: %d, fingerprints: %d}", - part.tid, transfer.GetRecipient(), transfer.GetNumParts(), - transfer.GetNumFps()) + "retries {parts: %d, numFps: %d/%d}", + part.tid, st.GetRecipient(), st.GetNumParts(), + st.GetNumFps()-st.GetNumAvailableFps(), st.GetNumFps()) // If the max number of retries has been reached, then report the // error on the callback, delete the transfer, and skip to the next // message - go transfer.CallProgressCB(errors.Errorf(maxRetriesErr, err)) + go st.CallProgressCB(errors.Errorf(maxRetriesErr, err)) continue } else if err != nil { // For all other errors, return an error @@ -326,13 +326,13 @@ func (m *Manager) buildMessages(partList []queuedPart) ( // Construct TargetedCmixMessage msg := message.TargetedCmixMessage{ - Recipient: transfer.GetRecipient(), + Recipient: st.GetRecipient(), Message: cmixMsg, } // Add to list of messages to send messages = append(messages, msg) - transfers[part.tid] = transfer + transfers[part.tid] = st groupedParts[part.tid] = append(groupedParts[part.tid], part.partNum) partsToResend = append(partsToResend, i) } diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 85bd6f9e040be6298429ee72a13cf78233192e84..7738b4529052317bf77a952da2d0e5dea311249d 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -186,15 +186,17 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive, return nil } + p := DefaultParams() avgNumMessages := (minPartsSendPerRound + maxPartsSendPerRound) / 2 avgSendSize := avgNumMessages * (8192 / 8) + p.MaxThroughput = int(time.Second) * avgSendSize m := &Manager{ receiveCB: receiveCB, sent: sent, received: received, sendQueue: make(chan queuedPart, sendQueueBuffLen), - maxThroughput: int(time.Second) * avgSendSize, + p: p, store: storage.InitTestingSession(t), swb: switchboard.New(), net: net, diff --git a/interfaces/params/CMIX.go b/interfaces/params/CMIX.go index ee735d1d96026b416c2790c7a7731e5285ca0801..694553f97c8e3cba4ea35b097d2c314f47e0da37 100644 --- a/interfaces/params/CMIX.go +++ b/interfaces/params/CMIX.go @@ -13,7 +13,7 @@ import ( ) type CMIX struct { - //maximum number of rounds to try and send on + // maximum number of rounds to try and send on RoundTries uint Timeout time.Duration RetryDelay time.Duration @@ -34,7 +34,7 @@ func (c CMIX) Marshal() ([]byte, error) { return json.Marshal(c) } -// Obtain default CMIX parameters, or override with given parameters if set +// GetCMIXParameters func obtains default CMIX parameters, or overrides with given parameters if set func GetCMIXParameters(params string) (CMIX, error) { p := GetDefaultCMIX() if len(params) > 0 { diff --git a/network/gateway/utils_test.go b/network/gateway/utils_test.go index 09081da17cc895becdd069ec7b794d53a11bc8e3..2a90528ff10dc53a2375349d87cbcffba3aa77e4 100644 --- a/network/gateway/utils_test.go +++ b/network/gateway/utils_test.go @@ -9,6 +9,7 @@ package gateway import ( "fmt" + "github.com/pkg/errors" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" @@ -141,16 +142,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition { const happyPathReturn = "happyPathReturn" -func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, error) { +func SendToPreferred_HappyPath(*connect.Host, *id.ID) (interface{}, error) { return happyPathReturn, nil } -func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, error) { - return nil, fmt.Errorf(errorsList[0]) +func SendToPreferred_KnownError(*connect.Host, *id.ID) (interface{}, error) { + return nil, errors.Errorf(errorsList[0]) } -func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, error) { - return nil, fmt.Errorf("Unexpected error: Oopsie") +func SendToPreferred_UnknownError(*connect.Host, *id.ID) (interface{}, error) { + return nil, errors.Errorf("Unexpected error: Oopsie") } func SendToAny_HappyPath(host *connect.Host) (interface{}, error) { diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 99010f1577df92aa2bcce967f2e84889a1fcaa7e..43bafc5dae9d2f3d32b27bcd5009ed76731c762c 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -105,8 +105,8 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, msg.Digest()) } + // find the best round to send to, excluding attempted rounds remainingTime := cmixParams.Timeout - elapsed - //find the best round to send to, excluding attempted rounds bestRound, err := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) if err != nil { jww.WARN.Printf("Failed to GetUpcomingRealtime (msgDigest: %s): %+v", msg.Digest(), err) @@ -116,7 +116,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, continue } - //add the round on to the list of attempted, so it is not tried again + // add the round on to the list of attempted, so it is not tried again attempted.Insert(bestRound) // Determine whether the selected round contains any Nodes @@ -175,7 +175,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, return 0, ephemeral.Id{}, err } - //if the comm errors or the message fails to send, continue retrying. + // if the comm errors or the message fails to send, continue retrying. if err != nil { jww.ERROR.Printf("SendCmix failed to send to EphID %d (%s) on "+ "round %d, trying a new round: %+v", ephID.Int64(), recipient, diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 6f0f5e15a994317e9bbd342cf406198d020fbab9..3682de1a2970c30f5f816aaa8cde22cf221faac8 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -163,7 +163,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, msgResp, err := comms.RequestMessages(host, msgReq) if err != nil { - //you need to default to a retryable errors because otherwise we cannot enumerate all errors + // you need to default to a retryable errors because otherwise we cannot enumerate all errors return nil, errors.WithMessage(err, gateway.RetryableError) } @@ -202,7 +202,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, jww.INFO.Printf("Received %d messages in Round %v for %d (%s) in %s", len(msgs), roundID, identity.EphId.Int64(), identity.Source, time.Now().Sub(start)) - //build the bundle of messages to send to the message processor + // build the bundle of messages to send to the message processor bundle := message.Bundle{ Round: roundID, Messages: make([]format.Message, len(msgs)), diff --git a/storage/fileTransfer/receiveTransfer.go b/storage/fileTransfer/receiveTransfer.go index 1f5d0756c33fe1fe66931ed7789886a865ca2c23..df4bad66163245c9593bfbbc949d73ccf04b5bc3 100644 --- a/storage/fileTransfer/receiveTransfer.go +++ b/storage/fileTransfer/receiveTransfer.go @@ -244,7 +244,7 @@ func (rt *ReceivedTransfer) stopScheduledProgressCB() error { err := cb.stopThread() if err != nil { failedCallbacks = append(failedCallbacks, i) - jww.WARN.Print(err.Error()) + jww.WARN.Printf("[FT] %s", err) } } diff --git a/storage/fileTransfer/sentFileTransfers.go b/storage/fileTransfer/sentFileTransfers.go index b68c265b18ee9735b35d330e7342b5cdffef2c9b..485eb67d062eca8ccb75f9b53bda262d64996eb3 100644 --- a/storage/fileTransfer/sentFileTransfers.go +++ b/storage/fileTransfer/sentFileTransfers.go @@ -36,7 +36,7 @@ const ( newSentTransferErr = "failed to create new sent transfer: %+v" getSentTransferErr = "sent file transfer not found" - cancelCallbackErr = "Transfer with ID %s: %+v" + cancelCallbackErr = "[FT] Transfer with ID %s: %+v" deleteSentTransferErr = "failed to delete sent transfer with ID %s from store: %+v" ) diff --git a/storage/fileTransfer/sentPartTracker.go b/storage/fileTransfer/sentPartTracker.go index b65a6aa115ab9d046086cb8e3fdfe68804665e0b..b5d30cf6e8960e404f8171b6ba00ed0fa508d8e6 100644 --- a/storage/fileTransfer/sentPartTracker.go +++ b/storage/fileTransfer/sentPartTracker.go @@ -13,6 +13,12 @@ import ( "gitlab.com/elixxir/client/storage/utility" ) +// Error messages. +const ( + // sentPartTracker.GetPartStatus + getInvalidPartErr = "[FT] Failed to get status for part %d: %+v" +) + // sentPartTracker tracks the status of individual sent file parts. type sentPartTracker struct { // The number of file parts in the file @@ -39,7 +45,7 @@ func newSentPartTracker(partStats *utility.MultiStateVector) sentPartTracker { func (spt sentPartTracker) GetPartStatus(partNum uint16) interfaces.FpStatus { status, err := spt.partStats.Get(partNum) if err != nil { - jww.FATAL.Fatalf("failed to get status for part %d: %+v", partNum, err) + jww.FATAL.Fatalf(getInvalidPartErr, partNum, err) } return interfaces.FpStatus(status) } diff --git a/storage/fileTransfer/sentTransfer.go b/storage/fileTransfer/sentTransfer.go index 351c50a78b607af43edde1b95c0470c4f5ba4f3e..a91a59847fee21466f0bff6a80a49ad01b67c82d 100644 --- a/storage/fileTransfer/sentTransfer.go +++ b/storage/fileTransfer/sentTransfer.go @@ -388,7 +388,7 @@ func (st *SentTransfer) stopScheduledProgressCB() error { err := cb.stopThread() if err != nil { failedCallbacks = append(failedCallbacks, i) - jww.WARN.Print(err.Error()) + jww.WARN.Printf("[FT] %s", err) } } diff --git a/storage/rounds/uncheckedRounds.go b/storage/rounds/uncheckedRounds.go index d1e71114f456a7b5028a34481e2185e83e629b34..58140e461bcc1f9603a8f8eef2e23d598da26ba4 100644 --- a/storage/rounds/uncheckedRounds.go +++ b/storage/rounds/uncheckedRounds.go @@ -12,6 +12,7 @@ import ( "encoding/binary" "github.com/golang/protobuf/proto" "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage/versioned" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/primitives/id" @@ -20,7 +21,6 @@ import ( "sync" "testing" "time" - jww "github.com/spf13/jwalterweatherman" ) const ( @@ -185,7 +185,7 @@ func (s *UncheckedRoundStore) AddRound(rid id.Round, ri *pb.RoundInfo, if !exists || stored.Info == nil { newUncheckedRound := UncheckedRound{ - Id: rid, + Id: rid, Info: ri, Identity: Identity{ EpdId: ephID, @@ -226,7 +226,7 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round, for _, rnd := range s.list { jww.DEBUG.Printf("rnd for lookup: %d, %+v\n", rnd.Id, rnd) go func(localRid id.Round, - localRnd UncheckedRound){ + localRnd UncheckedRound) { iterator(localRid, localRnd) }(rnd.Id, rnd) }