Skip to content
Snippets Groups Projects
Commit c9dfa2a4 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Merge remote-tracking branch 'origin/release' into quantumSecure

parents 4ee8e9a2 955f1839
No related branches found
No related tags found
2 merge requests!117Release,!73Quantum secure xx messenger key negotiation
Showing
with 2516 additions and 18 deletions
...@@ -500,7 +500,7 @@ func (c *Client) GetErrorsChannel() <-chan interfaces.ClientError { ...@@ -500,7 +500,7 @@ func (c *Client) GetErrorsChannel() <-chan interfaces.ClientError {
// Handles both auth confirm and requests // Handles both auth confirm and requests
func (c *Client) StartNetworkFollower(timeout time.Duration) error { func (c *Client) StartNetworkFollower(timeout time.Duration) error {
u := c.GetUser() u := c.GetUser()
jww.INFO.Printf("StartNetworkFollower() \n\tTransmisstionID: %s "+ jww.INFO.Printf("StartNetworkFollower() \n\tTransmissionID: %s "+
"\n\tReceptionID: %s", u.TransmissionID, u.ReceptionID) "\n\tReceptionID: %s", u.TransmissionID, u.ReceptionID)
return c.followerServices.start(timeout) return c.followerServices.start(timeout)
...@@ -580,9 +580,9 @@ func (c *Client) GetNetworkInterface() interfaces.NetworkManager { ...@@ -580,9 +580,9 @@ func (c *Client) GetNetworkInterface() interfaces.NetworkManager {
} }
// GetNodeRegistrationStatus gets the current state of node registration. It // GetNodeRegistrationStatus gets the current state of node registration. It
// returns the the total number of nodes in the NDF and the number of those // returns the total number of nodes in the NDF and the number of those which
// which are currently registers with. An error is returned if the network is // are currently registers with. An error is returned if the network is not
// not healthy. // healthy.
func (c *Client) GetNodeRegistrationStatus() (int, int, error) { func (c *Client) GetNodeRegistrationStatus() (int, int, error) {
// Return an error if the network is not healthy // Return an error if the network is not healthy
if !c.GetHealth().IsHealthy() { if !c.GetHealth().IsHealthy() {
...@@ -595,19 +595,24 @@ func (c *Client) GetNodeRegistrationStatus() (int, int, error) { ...@@ -595,19 +595,24 @@ func (c *Client) GetNodeRegistrationStatus() (int, int, error) {
cmixStore := c.storage.Cmix() cmixStore := c.storage.Cmix()
var numRegistered int var numRegistered int
var numStale = 0
for i, n := range nodes { for i, n := range nodes {
nid, err := id.Unmarshal(n.ID) nid, err := id.Unmarshal(n.ID)
if err != nil { if err != nil {
return 0, 0, errors.Errorf("Failed to unmarshal node ID %v "+ return 0, 0, errors.Errorf("Failed to unmarshal node ID %v "+
"(#%d): %s", n.ID, i, err.Error()) "(#%d): %s", n.ID, i, err.Error())
} }
if n.Status == ndf.Stale {
numStale += 1
continue
}
if cmixStore.Has(nid) { if cmixStore.Has(nid) {
numRegistered++ numRegistered++
} }
} }
// Get the number of in progress node registrations // Get the number of in progress node registrations
return numRegistered, len(nodes), nil return numRegistered, len(nodes) - numStale, nil
} }
// DeleteContact is a function which removes a partner from Client's storage // DeleteContact is a function which removes a partner from Client's storage
......
...@@ -56,7 +56,7 @@ func (c *Client) SendCMIX(msg format.Message, recipientID *id.ID, ...@@ -56,7 +56,7 @@ func (c *Client) SendCMIX(msg format.Message, recipientID *id.ID,
// SendManyCMIX sends many "raw" CMIX message payloads to each of the // SendManyCMIX sends many "raw" CMIX message payloads to each of the
// provided recipients. Used for group chat functionality. Returns the // provided recipients. Used for group chat functionality. Returns the
// round ID of the round the payload was sent or an error if it fails. // round ID of the round the payload was sent or an error if it fails.
func (c *Client) SendManyCMIX(messages map[id.ID]format.Message, func (c *Client) SendManyCMIX(messages []message.TargetedCmixMessage,
params params.CMIX) (id.Round, []ephemeral.Id, error) { params params.CMIX) (id.Round, []ephemeral.Id, error) {
return c.network.SendManyCMIX(messages, params) return c.network.SendManyCMIX(messages, params)
} }
......
...@@ -100,16 +100,13 @@ func (t *testNetworkManagerGeneric) Follow(report interfaces.ClientErrorReport) ...@@ -100,16 +100,13 @@ func (t *testNetworkManagerGeneric) Follow(report interfaces.ClientErrorReport)
func (t *testNetworkManagerGeneric) CheckGarbledMessages() { func (t *testNetworkManagerGeneric) CheckGarbledMessages() {
return return
} }
func (t *testNetworkManagerGeneric) GetVerboseRounds() string { func (t *testNetworkManagerGeneric) GetVerboseRounds() string {
return "" return ""
} }
func (t *testNetworkManagerGeneric) SendE2E(message.Send, params.E2E, *stoppable.Single) ( func (t *testNetworkManagerGeneric) SendE2E(message.Send, params.E2E, *stoppable.Single) (
[]id.Round, cE2e.MessageID, time.Time, error) { []id.Round, cE2e.MessageID, time.Time, error) {
rounds := []id.Round{id.Round(0), id.Round(1), id.Round(2)} rounds := []id.Round{id.Round(0), id.Round(1), id.Round(2)}
return rounds, cE2e.MessageID{}, time.Time{}, nil return rounds, cE2e.MessageID{}, time.Time{}, nil
} }
func (t *testNetworkManagerGeneric) SendUnsafe(m message.Send, p params.Unsafe) ([]id.Round, error) { func (t *testNetworkManagerGeneric) SendUnsafe(m message.Send, p params.Unsafe) ([]id.Round, error) {
return nil, nil return nil, nil
...@@ -117,7 +114,7 @@ func (t *testNetworkManagerGeneric) SendUnsafe(m message.Send, p params.Unsafe) ...@@ -117,7 +114,7 @@ func (t *testNetworkManagerGeneric) SendUnsafe(m message.Send, p params.Unsafe)
func (t *testNetworkManagerGeneric) SendCMIX(message format.Message, rid *id.ID, p params.CMIX) (id.Round, ephemeral.Id, error) { func (t *testNetworkManagerGeneric) SendCMIX(message format.Message, rid *id.ID, p params.CMIX) (id.Round, ephemeral.Id, error) {
return id.Round(0), ephemeral.Id{}, nil return id.Round(0), ephemeral.Id{}, nil
} }
func (t *testNetworkManagerGeneric) SendManyCMIX(messages map[id.ID]format.Message, p params.CMIX) (id.Round, []ephemeral.Id, error) { func (t *testNetworkManagerGeneric) SendManyCMIX(messages []message.TargetedCmixMessage, p params.CMIX) (id.Round, []ephemeral.Id, error) {
return 0, []ephemeral.Id{}, nil return 0, []ephemeral.Id{}, nil
} }
func (t *testNetworkManagerGeneric) GetInstance() *network.Instance { func (t *testNetworkManagerGeneric) GetInstance() *network.Instance {
......
// Code generated by go generate; DO NOT EDIT. // Code generated by go generate; DO NOT EDIT.
// This file was generated by robots at // This file was generated by robots at
// 2021-10-25 17:01:53.308095 -0500 CDT m=+0.028238587 // 2021-11-11 13:19:35.619263 -0600 CST m=+0.049820030
package api package api
const GITVERSION = `391bf128 Merge branch 'Josh/ProtoClient' into 'release'` const GITVERSION = `e595b772 Merge branch 'hotfix/stale-registration' into 'release'`
const SEMVER = "3.1.0" const SEMVER = "3.2.0"
const DEPENDENCIES = `module gitlab.com/elixxir/client const DEPENDENCIES = `module gitlab.com/elixxir/client
go 1.13 go 1.13
...@@ -24,10 +24,10 @@ require ( ...@@ -24,10 +24,10 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/jwalterweatherman v1.1.0
github.com/spf13/viper v1.7.1 github.com/spf13/viper v1.7.1
gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228
gitlab.com/elixxir/comms v0.0.4-0.20211022203907-77e054b70ec6 gitlab.com/elixxir/comms v0.0.4-0.20211101174956-590ba1b47887
gitlab.com/elixxir/crypto v0.0.7-0.20211022013957-3a7899285c4c gitlab.com/elixxir/crypto v0.0.7-0.20211022013957-3a7899285c4c
gitlab.com/elixxir/ekv v0.1.5 gitlab.com/elixxir/ekv v0.1.5
gitlab.com/elixxir/primitives v0.0.3-0.20211014164029-06022665b576 gitlab.com/elixxir/primitives v0.0.3-0.20211102233208-a716d5c670b6
gitlab.com/xx_network/comms v0.0.4-0.20211014163953-e774276b83ae gitlab.com/xx_network/comms v0.0.4-0.20211014163953-e774276b83ae
gitlab.com/xx_network/crypto v0.0.5-0.20211014163843-57b345890686 gitlab.com/xx_network/crypto v0.0.5-0.20211014163843-57b345890686
gitlab.com/xx_network/primitives v0.0.4-0.20211014163031-53405cf191fb gitlab.com/xx_network/primitives v0.0.4-0.20211014163031-53405cf191fb
...@@ -35,7 +35,7 @@ require ( ...@@ -35,7 +35,7 @@ require (
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 golang.org/x/net v0.0.0-20210525063256-abc453219eb5
google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect
google.golang.org/grpc v1.38.0 google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.26.0 google.golang.org/protobuf v1.27.1
gopkg.in/ini.v1 v1.62.0 // indirect gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
) )
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package bindings
import (
"encoding/json"
ft "gitlab.com/elixxir/client/fileTransfer"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/xx_network/primitives/id"
"time"
)
// FileTransfer contains the file transfer manager.
type FileTransfer struct {
m *ft.Manager
}
// FileTransferSentProgressFunc contains a function callback that tracks the
// progress of sending a file. It is called when a file part is sent, a file
// part arrives, the transfer completes, or on error.
type FileTransferSentProgressFunc interface {
SentProgressCallback(completed bool, sent, arrived, total int, err error)
}
// FileTransferReceivedProgressFunc contains a function callback that tracks the
// progress of receiving a file. It is called when a file part is received, the
// transfer completes, or on error.
type FileTransferReceivedProgressFunc interface {
ReceivedProgressCallback(completed bool, received, total int, err error)
}
// FileTransferReceiveFunc contains a function callback that notifies the
// receiver of an incoming file transfer. It is called on the reception of the
// initial file transfer message.
type FileTransferReceiveFunc interface {
ReceiveCallback(tid []byte, fileName string, sender []byte, size int,
preview []byte)
}
// NewFileTransferManager creates a new file transfer manager and starts the
// sending and receiving threads. The receiveFunc is called everytime a new file
// transfer is received. The parameters string is a JSON formatted string of the
// fileTransfer.Params object. If it is left empty, then defaults are used. It
// must match the following format: {"MaxThroughput":150000}
func NewFileTransferManager(client *Client, receiveFunc FileTransferReceiveFunc,
parameters string) (FileTransfer, error) {
receiveCB := func(tid ftCrypto.TransferID, fileName string, sender *id.ID,
size uint32, preview []byte) {
receiveFunc.ReceiveCallback(
tid.Bytes(), fileName, sender.Bytes(), int(size), preview)
}
// JSON unmarshal parameters string
p := ft.DefaultParams()
if parameters != "" {
err := json.Unmarshal([]byte(parameters), &p)
if err != nil {
return FileTransfer{}, err
}
}
// Create new file transfer manager
m, err := ft.NewManager(&client.api, receiveCB, p)
if err != nil {
return FileTransfer{}, err
}
// Start sending and receiving threads
err = client.api.AddService(m.StartProcesses)
if err != nil {
return FileTransfer{}, err
}
return FileTransfer{m}, nil
}
// Send sends a file to the recipient. The sender must have an E2E relationship
// with the recipient.
// The file name is the name of the file to show a user. It has a max length of
// 32 bytes.
// The file data cannot be larger than 4 mB
// The retry float is the total amount of data to send relative to the data
// size. Data will be resent on error and will resend up to [(1 + retry) *
// fileSize].
// The preview stores a preview of the data (such as a thumbnail) and is
// capped at 4 kB in size.
// Returns a unique transfer ID used to identify the transfer.
// PeriodMS is the duration, in milliseconds, to wait between progress callback
// calls. Set this large enough to prevent spamming.
func (f FileTransfer) Send(fileName string, fileData []byte, recipientID []byte,
retry float32, preview []byte, progressFunc FileTransferSentProgressFunc,
periodMS int) ([]byte, error) {
// Create SentProgressCallback
progressCB := func(completed bool, sent, arrived, total uint16, err error) {
progressFunc.SentProgressCallback(
completed, int(sent), int(arrived), int(total), err)
}
// Convert recipient ID bytes to id.ID
recipient, err := id.Unmarshal(recipientID)
if err != nil {
return []byte{}, err
}
// Convert period to time.Duration
period := time.Duration(periodMS) * time.Millisecond
// Send file
tid, err := f.m.Send(
fileName, fileData, recipient, retry, preview, progressCB, period)
if err != nil {
return nil, err
}
// Return transfer ID as bytes and error
return tid.Bytes(), nil
}
// RegisterSendProgressCallback allows for the registration of a callback to
// track the progress of an individual sent file transfer. The callback will be
// called immediately when added to report the current status of the transfer.
// It will then call every time a file part is sent, a file part arrives, the
// transfer completes, or an error occurs. It is called at most once every
// period, which means if events occur faster than the period, then they will
// not be reported and instead the progress will be reported once at the end of
// the period.
// The period is specified in milliseconds.
func (f FileTransfer) RegisterSendProgressCallback(transferID []byte,
progressFunc FileTransferSentProgressFunc, periodMS int) error {
// Unmarshal transfer ID
tid := ftCrypto.UnmarshalTransferID(transferID)
// Create SentProgressCallback
progressCB := func(completed bool, sent, arrived, total uint16, err error) {
progressFunc.SentProgressCallback(
completed, int(sent), int(arrived), int(total), err)
}
// Convert period to time.Duration
period := time.Duration(periodMS) * time.Millisecond
return f.m.RegisterSendProgressCallback(tid, progressCB, period)
}
// Resend resends a file if Send fails.
func (f FileTransfer) Resend(transferID []byte) error {
// Unmarshal transfer ID
tid := ftCrypto.UnmarshalTransferID(transferID)
return f.m.Resend(tid)
}
// CloseSend deletes a sent file transfer from the sent transfer map and from
// storage once a transfer has completed or reached the retry limit. Returns an
// error if the transfer has not run out of retries.
func (f FileTransfer) CloseSend(transferID []byte) error {
// Unmarshal transfer ID
tid := ftCrypto.UnmarshalTransferID(transferID)
return f.m.CloseSend(tid)
}
// RegisterReceiveProgressCallback allows for the registration of a callback to
// track the progress of an individual received file transfer. The callback will
// be called immediately when added to report the current status of the
// transfer. It will then call every time a file part is received, the transfer
// completes, or an error occurs. It is called at most once ever period, which
// means if events occur faster than the period, then they will not be reported
// and instead the progress will be reported once at the end of the period.
// Once the callback reports that the transfer has completed, the recipient
// can get the full file by calling Receive.
// The period is specified in milliseconds.
func (f FileTransfer) RegisterReceiveProgressCallback(transferID []byte,
progressFunc FileTransferReceivedProgressFunc, periodMS int) error {
// Unmarshal transfer ID
tid := ftCrypto.UnmarshalTransferID(transferID)
// Create ReceivedProgressCallback
progressCB := func(completed bool, received, total uint16, err error) {
progressFunc.ReceivedProgressCallback(
completed, int(received), int(total), err)
}
// Convert period to time.Duration
period := time.Duration(periodMS) * time.Millisecond
return f.m.RegisterReceiveProgressCallback(tid, progressCB, period)
}
// Receive returns the fully assembled file on the completion of the transfer.
// It deletes the transfer from the received transfer map and from storage.
// Returns an error if the transfer is not complete, the full file cannot be
// verified, or if the transfer cannot be found.
func (f FileTransfer) Receive(transferID []byte) ([]byte, error) {
// Unmarshal transfer ID
tid := ftCrypto.UnmarshalTransferID(transferID)
return f.m.Receive(tid)
}
////////////////////////////////////////////////////////////////////////////////
// Utility Functions //
////////////////////////////////////////////////////////////////////////////////
// GetMaxFilePreviewSize returns the maximum file preview size, in bytes.
func (f FileTransfer) GetMaxFilePreviewSize() int {
return ft.PreviewMaxSize
}
// GetMaxFileNameByteLength returns the maximum length, in bytes, allowed for a
// file name.
func (f FileTransfer) GetMaxFileNameByteLength() int {
return ft.FileNameMaxLen
}
// GetMaxFileSize returns the maximum file size, in bytes, allowed to be
// transferred.
func (f FileTransfer) GetMaxFileSize() int {
return ft.FileMaxSize
}
...@@ -8,6 +8,8 @@ ...@@ -8,6 +8,8 @@
package bindings package bindings
import ( import (
"encoding/json"
"fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
gc "gitlab.com/elixxir/client/groupChat" gc "gitlab.com/elixxir/client/groupChat"
gs "gitlab.com/elixxir/client/groupChat/groupStore" gs "gitlab.com/elixxir/client/groupChat/groupStore"
...@@ -167,6 +169,12 @@ type NewGroupReport struct { ...@@ -167,6 +169,12 @@ type NewGroupReport struct {
err string err string
} }
type GroupReportDisk struct {
List []id.Round
GrpId []byte
Status int
}
// GetGroup returns the Group. // GetGroup returns the Group.
func (ngr *NewGroupReport) GetGroup() *Group { func (ngr *NewGroupReport) GetGroup() *Group {
return ngr.group return ngr.group
...@@ -193,6 +201,36 @@ func (ngr *NewGroupReport) GetError() string { ...@@ -193,6 +201,36 @@ func (ngr *NewGroupReport) GetError() string {
return ngr.err return ngr.err
} }
func (ngr *NewGroupReport) Marshal() ([]byte, error) {
grpReportDisk := GroupReportDisk{
List: ngr.rounds,
GrpId: ngr.group.GetID()[:],
Status: ngr.GetStatus(),
}
return json.Marshal(&grpReportDisk)
}
func (ngr *NewGroupReport) Unmarshal(b []byte) error {
grpReportDisk := GroupReportDisk{}
if err := json.Unmarshal(b, &grpReportDisk); err != nil {
return errors.New(fmt.Sprintf("Failed to unmarshal group "+
"report: %s", err.Error()))
}
grpId, err := id.Unmarshal(grpReportDisk.GrpId)
if err != nil {
return errors.New(fmt.Sprintf("Failed to unmarshal group "+
"id: %s", err.Error()))
}
ngr.group.g.ID = grpId
ngr.rounds = grpReportDisk.List
ngr.status = gc.RequestStatus(grpReportDisk.Status)
return nil
}
//// ////
// NewGroupReport Structure // NewGroupReport Structure
//// ////
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package cmd
import (
"fmt"
"github.com/spf13/cobra"
jww "github.com/spf13/jwalterweatherman"
"github.com/spf13/viper"
"gitlab.com/elixxir/client/api"
ft "gitlab.com/elixxir/client/fileTransfer"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/crypto/contact"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime"
"gitlab.com/xx_network/primitives/utils"
"io/ioutil"
"strconv"
"time"
)
const callbackPeriod = 25 * time.Millisecond
// ftCmd starts the file transfer manager and allows the sending and receiving
// of files.
var ftCmd = &cobra.Command{
Use: "fileTransfer",
Short: "Send and receive file for cMix client",
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
// Initialise a new client
client := initClient()
// Print user's reception ID and save contact file
user := client.GetUser()
jww.INFO.Printf("User: %s", user.ReceptionID)
writeContact(user.GetContact())
// Start the network follower
err := client.StartNetworkFollower(5 * time.Second)
if err != nil {
jww.FATAL.Panicf("Failed to start the network follower: %+v", err)
}
// Initialize the file transfer manager
maxThroughput := viper.GetInt("maxThroughput")
m, receiveChan := initFileTransferManager(client, maxThroughput)
// Wait until connected or crash on timeout
connected := make(chan bool, 10)
client.GetHealth().AddChannel(connected)
waitUntilConnected(connected)
// After connection, wait until registered with at least 85% of nodes
for numReg, total := 1, 100; numReg < (total*3)/4; {
time.Sleep(1 * time.Second)
numReg, total, err = client.GetNodeRegistrationStatus()
if err != nil {
jww.FATAL.Panicf("Failed to get node registration status: %+v",
err)
}
jww.INFO.Printf("Registering with nodes (%d/%d)...", numReg, total)
}
// Start thread that receives new file transfers and prints them to log
receiveQuit := make(chan struct{})
receiveDone := make(chan struct{})
go receiveNewFileTransfers(receiveChan, receiveDone, receiveQuit, m)
// If set, send the file to the recipient
sendDone := make(chan struct{})
if viper.IsSet("sendFile") {
recipientContactPath := viper.GetString("sendFile")
filePath := viper.GetString("filePath")
filePreviewPath := viper.GetString("filePreviewPath")
filePreviewString := viper.GetString("filePreviewString")
retry := float32(viper.GetFloat64("retry"))
sendFile(filePath, filePreviewPath, filePreviewString,
recipientContactPath, retry, m, sendDone)
}
// Wait until either the file finishes sending or the file finishes
// being received, stop the receiving thread, and exit
for done := false; !done; {
select {
case <-sendDone:
jww.DEBUG.Printf("Finished sending message. Stopping threads " +
"and network follower.")
done = true
case <-receiveDone:
jww.DEBUG.Printf("Finished receiving message. Stopping " +
"threads and network follower.")
done = true
}
}
// Stop reception thread
receiveQuit <- struct{}{}
// Stop network follower
err = client.StopNetworkFollower()
if err != nil {
jww.WARN.Printf("Failed to stop network follower: %+v", err)
}
jww.DEBUG.Print("File transfer finished stopping threads and network " +
"follower.")
},
}
// receivedFtResults is used to return received new file transfer results on a
// channel from a callback.
type receivedFtResults struct {
tid ftCrypto.TransferID
fileName string
sender *id.ID
size uint32
preview []byte
}
// initFileTransferManager creates a new file transfer manager with a new
// reception callback. Returns the file transfer manager and the channel that
// will be triggered when the callback is called.
func initFileTransferManager(client *api.Client, maxThroughput int) (
*ft.Manager, chan receivedFtResults) {
// Create interfaces.ReceiveCallback that returns the results on a channel
receiveChan := make(chan receivedFtResults, 100)
receiveCB := func(tid ftCrypto.TransferID, fileName string, sender *id.ID,
size uint32, preview []byte) {
receiveChan <- receivedFtResults{tid, fileName, sender, size, preview}
}
// Create new parameters
p := ft.DefaultParams()
if maxThroughput != 0 {
p = ft.NewParams(maxThroughput)
}
// Create new manager
manager, err := ft.NewManager(client, receiveCB, p)
if err != nil {
jww.FATAL.Panicf("Failed to create new file transfer manager: %+v", err)
}
// Start the file transfer sending and receiving threads
err = client.AddService(manager.StartProcesses)
if err != nil {
jww.FATAL.Panicf("Failed to start file transfer threads: %+v", err)
}
return manager, receiveChan
}
// sendFile sends the file to the recipient and prints the progress.
func sendFile(filePath, filePreviewPath, filePreviewString,
recipientContactPath string, retry float32, m *ft.Manager,
done chan struct{}) {
// Get file from path
fileData, err := utils.ReadFile(filePath)
if err != nil {
jww.FATAL.Panicf("Failed to read file %q: %+v", filePath, err)
}
// Get file preview from path
filePreviewData := []byte(filePreviewString)
if filePreviewPath != "" {
filePreviewData, err = utils.ReadFile(filePreviewPath)
if err != nil {
jww.FATAL.Panicf("Failed to read file preview %q: %+v",
filePreviewPath, err)
}
}
// Get recipient contact from file
recipient := getContactFromFile(recipientContactPath)
jww.DEBUG.Printf("Sending file %q of size %d to recipient %s.",
filePath, len(fileData), recipient.ID)
// Create sent progress callback that prints the results
progressCB := func(completed bool, sent, arrived, total uint16, err error) {
jww.DEBUG.Printf("Sent progress callback for %q "+
"{completed: %t, sent: %d, arrived: %d, total: %d, err: %v}\n",
filePath, completed, sent, arrived, total, err)
if (sent == 0 && arrived == 0) || (arrived == total) || completed ||
err != nil {
fmt.Printf("Sent progress callback for %q "+
"{completed: %t, sent: %d, arrived: %d, total: %d, err: %v}\n",
filePath, completed, sent, arrived, total, err)
}
if completed {
fmt.Printf("Completed sending file.\n")
done <- struct{}{}
} else if err != nil {
fmt.Printf("Failed sending file: %+v\n", err)
done <- struct{}{}
}
}
// Send the file
_, err = m.Send(filePath, fileData, recipient.ID, retry, filePreviewData,
progressCB, callbackPeriod)
if err != nil {
jww.FATAL.Panicf("Failed to send file %q to %s: %+v",
filePath, recipient.ID, err)
}
}
// receiveNewFileTransfers waits to receive new file transfers and prints its
// information to the log.
func receiveNewFileTransfers(receive chan receivedFtResults, done,
quit chan struct{}, m *ft.Manager) {
jww.DEBUG.Print("Starting thread waiting to receive NewFileTransfer " +
"E2E message.")
select {
case <-quit:
jww.DEBUG.Print("Quitting thread waiting for NewFileTransfer E2E " +
"message.")
return
case r := <-receive:
jww.DEBUG.Printf("Received new file %q transfer %s from %s of size %d "+
"bytes with preview: %q",
r.fileName, r.tid, r.sender, r.size, r.preview)
fmt.Printf("Received new file transfer %q of size %d "+
"bytes with preview: %q\n", r.fileName, r.size, r.preview)
cb := newReceiveProgressCB(r.tid, done, m)
err := m.RegisterReceiveProgressCallback(r.tid, cb, callbackPeriod)
if err != nil {
jww.FATAL.Panicf("Failed to register new receive progress "+
"callback for transfer %s: %+v", r.tid, err)
}
}
}
// newReceiveProgressCB creates a new reception progress callback that prints
// the results to the log.
func newReceiveProgressCB(tid ftCrypto.TransferID, done chan struct{},
m *ft.Manager) interfaces.ReceivedProgressCallback {
return func(completed bool, received, total uint16, err error) {
jww.DEBUG.Printf("Receive progress callback for transfer %s "+
"{completed: %t, received: %d, total: %d, err: %v}",
tid, completed, received, total, err)
if received == total || completed || err != nil {
fmt.Printf("Received progress callback "+
"{completed: %t, received: %d, total: %d, err: %v}\n",
completed, received, total, err)
}
if completed {
receivedFile, err2 := m.Receive(tid)
if err2 != nil {
jww.FATAL.Panicf("Failed to receive file %s: %+v", tid, err)
}
fmt.Printf("Completed receiving file:\n%s\n", receivedFile)
done <- struct{}{}
} else if err != nil {
fmt.Printf("Failed sending file: %+v\n", err)
done <- struct{}{}
}
}
}
// getContactFromFile loads the contact from the given file path.
func getContactFromFile(path string) contact.Contact {
data, err := ioutil.ReadFile(path)
jww.INFO.Printf("Read in contact file of size %d bytes", len(data))
if err != nil {
jww.FATAL.Panicf("Failed to read contact file: %+v", err)
}
c, err := contact.Unmarshal(data)
if err != nil {
jww.FATAL.Panicf("Failed to unmarshal contact: %+v", err)
}
return c
}
////////////////////////////////////////////////////////////////////////////////
// Command Line Flags //
////////////////////////////////////////////////////////////////////////////////
// init initializes commands and flags for Cobra.
func init() {
ftCmd.Flags().String("sendFile", "",
"Sends a file to a recipient with with the contact at this path.")
bindPFlagCheckErr("sendFile")
ftCmd.Flags().String("filePath", "testFile-"+timeNanoString()+".txt",
"The path to the file to send. Also used as the file name.")
bindPFlagCheckErr("filePath")
ftCmd.Flags().String("filePreviewPath", "",
"The path to the file preview to send. Set either this flag or "+
"filePreviewString.")
bindPFlagCheckErr("filePreviewPath")
ftCmd.Flags().String("filePreviewString", "",
"File preview data. Set either this flag or filePreviewPath.")
bindPFlagCheckErr("filePreviewString")
ftCmd.Flags().Int("maxThroughput", 0,
"Maximum data transfer speed to send file parts (in bytes per second)")
bindPFlagCheckErr("maxThroughput")
ftCmd.Flags().Float64("retry", 0.5,
"Retry rate.")
bindPFlagCheckErr("retry")
rootCmd.AddCommand(ftCmd)
}
// timeNanoString returns the current UNIX time in nanoseconds as a string.
func timeNanoString() string {
return strconv.Itoa(int(netTime.Now().UnixNano()))
}
// bindPFlagCheckErr binds the key to a pflag.Flag used by Cobra and prints an
// error if one occurs.
func bindPFlagCheckErr(key string) {
err := viper.BindPFlag(key, ftCmd.Flags().Lookup(key))
if err != nil {
jww.ERROR.Printf("viper.BindPFlag failed for %q: %+v", key, err)
}
}
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
) )
// Change this value to set the version for this build // Change this value to set the version for this build
const currentVersion = "3.1.0" const currentVersion = "3.2.0"
func Version() string { func Version() string {
out := fmt.Sprintf("Elixxir Client v%s -- %s\n\n", api.SEMVER, out := fmt.Sprintf("Elixxir Client v%s -- %s\n\n", api.SEMVER,
......
...@@ -133,7 +133,7 @@ func (tnm *testNetworkManager) SendCMIX(message format.Message, ...@@ -133,7 +133,7 @@ func (tnm *testNetworkManager) SendCMIX(message format.Message,
return 0, ephemeral.Id{}, nil return 0, ephemeral.Id{}, nil
} }
func (tnm *testNetworkManager) SendManyCMIX(map[id.ID]format.Message, params.CMIX) ( func (tnm *testNetworkManager) SendManyCMIX([]message.TargetedCmixMessage, params.CMIX) (
id.Round, []ephemeral.Id, error) { id.Round, []ephemeral.Id, error) {
return 0, nil, nil return 0, nil, nil
} }
......
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package fileTransfer
import (
"encoding/binary"
"github.com/pkg/errors"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
)
// Size constants.
const (
paddingLen = ftCrypto.NonceSize // The length of the padding in bytes
partNumLen = 2 // The length of the part number in bytes
fmMinSize = partNumLen + paddingLen // Minimum size for the partMessage
)
// Error messages.
const (
newFmSizeErr = "size of external payload (%d) must be greater than %d"
unmarshalFmSizeErr = "size of passed in bytes (%d) must be greater than %d"
setFileFmErr = "length of part bytes (%d) must be smaller than maximum payload size %d"
)
/*
+-----------------------------------------+
| CMIX Message Contents |
+---------+-------------+-----------------+
| Padding | Part Number | File Data |
| 8 bytes | 2 bytes | remaining space |
+---------+-------------+-----------------+
*/
// partMessage contains part of the data being transferred and 256-bit padding
// that is used as a nonce.
type partMessage struct {
data []byte // Serial of all contents
padding []byte // Random padding bytes
partNum []byte // The part number of the file
part []byte // File part data
}
// newPartMessage generates a new part message that fits into the specified
// external payload size. An error is returned if the external payload size is
// too small to fit the part message.
func newPartMessage(externalPayloadSize int) (partMessage, error) {
if externalPayloadSize < fmMinSize {
return partMessage{},
errors.Errorf(newFmSizeErr, externalPayloadSize, fmMinSize)
}
return mapPartMessage(make([]byte, externalPayloadSize)), nil
}
// mapPartMessage maps the data to the components of a partMessage. It is mapped
// by reference; a copy is not made.
func mapPartMessage(data []byte) partMessage {
return partMessage{
data: data,
padding: data[:paddingLen],
partNum: data[paddingLen : paddingLen+partNumLen],
part: data[paddingLen+partNumLen:],
}
}
// unmarshalPartMessage converts the bytes into a partMessage. An error is
// returned if the size of the data is too small for a partMessage.
func unmarshalPartMessage(b []byte) (partMessage, error) {
if len(b) < fmMinSize {
return partMessage{},
errors.Errorf(unmarshalFmSizeErr, len(b), fmMinSize)
}
return mapPartMessage(b), nil
}
// marshal returns the byte representation of the partMessage.
func (m partMessage) marshal() []byte {
return m.data
}
// getPadding returns the padding in the message.
func (m partMessage) getPadding() []byte {
return m.padding
}
// setPadding sets the partMessage padding to the given bytes. Note that this
// padding should be random bytes generated via the appropriate crypto function.
func (m partMessage) setPadding(b []byte) {
copy(m.padding, b)
}
// getPartNum returns the file part number.
func (m partMessage) getPartNum() uint16 {
return binary.LittleEndian.Uint16(m.partNum)
}
// setPartNum sets the file part number.
func (m partMessage) setPartNum(num uint16) {
b := make([]byte, partNumLen)
binary.LittleEndian.PutUint16(b, num)
copy(m.partNum, b)
}
// getPart returns the file part data from the message.
func (m partMessage) getPart() []byte {
return m.part
}
// setPart sets the partMessage part to the given bytes. An error is returned if
// the size of the provided part data is too large to store.
func (m partMessage) setPart(b []byte) error {
if len(b) > len(m.part) {
return errors.Errorf(setFileFmErr, len(b), len(m.part))
}
copy(m.part, b)
return nil
}
// getPartSize returns the number of bytes available to store part data.
func (m partMessage) getPartSize() int {
return len(m.part)
}
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package fileTransfer
import (
"bytes"
"encoding/binary"
"fmt"
"math/rand"
"testing"
)
// Tests that newPartMessage returns a partMessage of the expected size.
func Test_newPartMessage(t *testing.T) {
externalPayloadSize := 256
fm, err := newPartMessage(externalPayloadSize)
if err != nil {
t.Errorf("newPartMessage returned an error: %+v", err)
}
if len(fm.data) != externalPayloadSize {
t.Errorf("Size of partMessage data does not match payload size."+
"\nexpected: %d\nreceived: %d", externalPayloadSize, len(fm.data))
}
}
// Error path: tests that newPartMessage returns the expected error when the
// external payload size is too small.
func Test_newPartMessage_SmallPayloadSizeError(t *testing.T) {
externalPayloadSize := fmMinSize - 1
expectedErr := fmt.Sprintf(newFmSizeErr, externalPayloadSize, fmMinSize)
_, err := newPartMessage(externalPayloadSize)
if err == nil || err.Error() != expectedErr {
t.Errorf("newPartMessage did not return the expected error when the "+
"given external payload size is too small."+
"\nexpected: %s\nreceived: %+v", expectedErr, err)
}
}
// Tests that mapPartMessage maps the data to the correct parts of the
// partMessage.
func Test_mapPartMessage(t *testing.T) {
// Generate expected values
_, expectedData, expectedPadding, expectedPartNum, expectedFile := newRandomFileMessage()
fm := mapPartMessage(expectedData)
if !bytes.Equal(expectedData, fm.data) {
t.Errorf("Incorrect data.\nexpected: %q\nreceived: %q",
expectedData, fm.data)
}
if !bytes.Equal(expectedPadding, fm.padding) {
t.Errorf("Incorrect padding data.\nexpected: %q\nreceived: %q",
expectedPadding, fm.padding)
}
if !bytes.Equal(expectedPartNum, fm.partNum) {
t.Errorf("Incorrect part number.\nexpected: %q\nreceived: %q",
expectedPartNum, fm.partNum)
}
if !bytes.Equal(expectedFile, fm.part) {
t.Errorf("Incorrect part data.\nexpected: %q\nreceived: %q",
expectedFile, fm.part)
}
}
// Tests that unmarshalPartMessage returns a partMessage with the expected
// values.
func Test_unmarshalPartMessage(t *testing.T) {
// Generate expected values
_, expectedData, expectedPadding, expectedPartNumb, expectedFile := newRandomFileMessage()
fm, err := unmarshalPartMessage(expectedData)
if err != nil {
t.Errorf("unmarshalPartMessage return an error: %+v", err)
}
if !bytes.Equal(expectedData, fm.data) {
t.Errorf("Incorrect data.\nexpected: %q\nreceived: %q",
expectedData, fm.data)
}
if !bytes.Equal(expectedPadding, fm.padding) {
t.Errorf("Incorrect padding data.\nexpected: %q\nreceived: %q",
expectedPadding, fm.padding)
}
if !bytes.Equal(expectedPartNumb, fm.partNum) {
t.Errorf("Incorrect part number.\nexpected: %q\nreceived: %q",
expectedPartNumb, fm.partNum)
}
if !bytes.Equal(expectedFile, fm.part) {
t.Errorf("Incorrect part data.\nexpected: %q\nreceived: %q",
expectedFile, fm.part)
}
}
// Error path: tests that unmarshalPartMessage returns the expected error when
// the provided data is too small to be unmarshalled into a partMessage.
func Test_unmarshalPartMessage_SizeError(t *testing.T) {
data := make([]byte, fmMinSize-1)
expectedErr := fmt.Sprintf(unmarshalFmSizeErr, len(data), fmMinSize)
_, err := unmarshalPartMessage(data)
if err == nil || err.Error() != expectedErr {
t.Errorf("unmarshalPartMessage did not return the expected error when "+
"the given bytes are too small to be a partMessage."+
"\nexpected: %s\nreceived: %+v", expectedErr, err)
}
}
// Tests that partMessage.marshal returns the correct data.
func Test_fileMessage_marshal(t *testing.T) {
fm, expectedData, _, _, _ := newRandomFileMessage()
data := fm.marshal()
if !bytes.Equal(expectedData, data) {
t.Errorf("Marshalled data does not match expected."+
"\nexpected: %q\nreceived: %q", expectedData, data)
}
}
// Tests that partMessage.getPadding returns the correct padding data.
func Test_fileMessage_getPadding(t *testing.T) {
fm, _, expectedPadding, _, _ := newRandomFileMessage()
padding := fm.getPadding()
if !bytes.Equal(expectedPadding, padding) {
t.Errorf("Padding data does not match expected."+
"\nexpected: %q\nreceived: %q", expectedPadding, padding)
}
}
// Tests that partMessage.setPadding sets the correct data.
func Test_fileMessage_setPadding(t *testing.T) {
fm, err := newPartMessage(256)
if err != nil {
t.Errorf("Failed to create new partMessage: %+v", err)
}
expectedPadding := make([]byte, paddingLen)
rand.New(rand.NewSource(42)).Read(expectedPadding)
fm.setPadding(expectedPadding)
if !bytes.Equal(expectedPadding, fm.getPadding()) {
t.Errorf("Failed to set correct padding.\nexpected: %q\nreceived: %q",
expectedPadding, fm.getPadding())
}
}
// Tests that partMessage.getPartNum returns the correct part number.
func Test_fileMessage_getPartNum(t *testing.T) {
fm, _, _, expectedPartNum, _ := newRandomFileMessage()
partNum := fm.getPartNum()
expected := binary.LittleEndian.Uint16(expectedPartNum)
if expected != partNum {
t.Errorf("Part number does not match expected."+
"\nexpected: %d\nreceived: %d", expected, partNum)
}
}
// Tests that partMessage.setPartNum sets the correct part number.
func Test_fileMessage_setPartNum(t *testing.T) {
fm, err := newPartMessage(256)
if err != nil {
t.Errorf("Failed to create new partMessage: %+v", err)
}
expectedPartNum := make([]byte, partNumLen)
rand.New(rand.NewSource(42)).Read(expectedPartNum)
expected := binary.LittleEndian.Uint16(expectedPartNum)
fm.setPartNum(expected)
if expected != fm.getPartNum() {
t.Errorf("Failed to set correct part number.\nexpected: %d\nreceived: %d",
expected, fm.getPartNum())
}
}
// Tests that partMessage.getPart returns the correct part data.
func Test_fileMessage_getFile(t *testing.T) {
fm, _, _, _, expectedFile := newRandomFileMessage()
file := fm.getPart()
if !bytes.Equal(expectedFile, file) {
t.Errorf("File data does not match expected."+
"\nexpected: %q\nreceived: %q", expectedFile, file)
}
}
// Tests that partMessage.setPart sets the correct part data.
func Test_fileMessage_setFile(t *testing.T) {
fm, err := newPartMessage(256)
if err != nil {
t.Errorf("Failed to create new partMessage: %+v", err)
}
fileData := make([]byte, 64)
rand.New(rand.NewSource(42)).Read(fileData)
expectedFile := make([]byte, fm.getPartSize())
copy(expectedFile, fileData)
err = fm.setPart(expectedFile)
if err != nil {
t.Errorf("setPart returned an error: %+v", err)
}
if !bytes.Equal(expectedFile, fm.getPart()) {
t.Errorf("Failed to set correct part data.\nexpected: %q\nreceived: %q",
expectedFile, fm.getPart())
}
}
// Error path: tests that partMessage.setPart returns the expected error when
// the provided part data is too large for the message.
func Test_fileMessage_setFile_FileTooLargeError(t *testing.T) {
fm, err := newPartMessage(fmMinSize + 1)
if err != nil {
t.Errorf("Failed to create new partMessage: %+v", err)
}
expectedErr := fmt.Sprintf(setFileFmErr, fm.getPartSize()+1, fm.getPartSize())
err = fm.setPart(make([]byte, fm.getPartSize()+1))
if err == nil || err.Error() != expectedErr {
t.Errorf("setPart did not return the expected error when the given "+
"part data is too large to fit in the partMessage."+
"\nexpected: %s\nreceived: %+v", expectedErr, err)
}
}
// Tests that partMessage.getPartSize returns the expected available space for
// the part data.
func Test_fileMessage_getFileSize(t *testing.T) {
expectedSize := 256
fm, err := newPartMessage(fmMinSize + expectedSize)
if err != nil {
t.Errorf("Failed to create new partMessage: %+v", err)
}
if expectedSize != fm.getPartSize() {
t.Errorf("File size incorrect.\nexpected: %d\nreceived: %d",
expectedSize, fm.getPartSize())
}
}
// newRandomFileMessage generates a new partMessage filled with random data and
// return the partMessage and its individual parts.
func newRandomFileMessage() (partMessage, []byte, []byte, []byte, []byte) {
prng := rand.New(rand.NewSource(42))
padding := make([]byte, paddingLen)
prng.Read(padding)
partNum := make([]byte, partNumLen)
prng.Read(partNum)
part := make([]byte, 64)
prng.Read(part)
data := append(append(padding, partNum...), part...)
fm := mapPartMessage(data)
return fm, data, padding, partNum, part
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: fileTransfer/ftMessages.proto
package fileTransfer
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type NewFileTransfer struct {
FileName string `protobuf:"bytes,1,opt,name=fileName,proto3" json:"fileName,omitempty"`
TransferKey []byte `protobuf:"bytes,2,opt,name=transferKey,proto3" json:"transferKey,omitempty"`
TransferMac []byte `protobuf:"bytes,3,opt,name=transferMac,proto3" json:"transferMac,omitempty"`
NumParts uint32 `protobuf:"varint,4,opt,name=numParts,proto3" json:"numParts,omitempty"`
Size uint32 `protobuf:"varint,5,opt,name=size,proto3" json:"size,omitempty"`
Retry float32 `protobuf:"fixed32,6,opt,name=retry,proto3" json:"retry,omitempty"`
Preview []byte `protobuf:"bytes,7,opt,name=preview,proto3" json:"preview,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *NewFileTransfer) Reset() { *m = NewFileTransfer{} }
func (m *NewFileTransfer) String() string { return proto.CompactTextString(m) }
func (*NewFileTransfer) ProtoMessage() {}
func (*NewFileTransfer) Descriptor() ([]byte, []int) {
return fileDescriptor_9d574f363dd34365, []int{0}
}
func (m *NewFileTransfer) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_NewFileTransfer.Unmarshal(m, b)
}
func (m *NewFileTransfer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_NewFileTransfer.Marshal(b, m, deterministic)
}
func (m *NewFileTransfer) XXX_Merge(src proto.Message) {
xxx_messageInfo_NewFileTransfer.Merge(m, src)
}
func (m *NewFileTransfer) XXX_Size() int {
return xxx_messageInfo_NewFileTransfer.Size(m)
}
func (m *NewFileTransfer) XXX_DiscardUnknown() {
xxx_messageInfo_NewFileTransfer.DiscardUnknown(m)
}
var xxx_messageInfo_NewFileTransfer proto.InternalMessageInfo
func (m *NewFileTransfer) GetFileName() string {
if m != nil {
return m.FileName
}
return ""
}
func (m *NewFileTransfer) GetTransferKey() []byte {
if m != nil {
return m.TransferKey
}
return nil
}
func (m *NewFileTransfer) GetTransferMac() []byte {
if m != nil {
return m.TransferMac
}
return nil
}
func (m *NewFileTransfer) GetNumParts() uint32 {
if m != nil {
return m.NumParts
}
return 0
}
func (m *NewFileTransfer) GetSize() uint32 {
if m != nil {
return m.Size
}
return 0
}
func (m *NewFileTransfer) GetRetry() float32 {
if m != nil {
return m.Retry
}
return 0
}
func (m *NewFileTransfer) GetPreview() []byte {
if m != nil {
return m.Preview
}
return nil
}
func init() {
proto.RegisterType((*NewFileTransfer)(nil), "parse.NewFileTransfer")
}
func init() { proto.RegisterFile("fileTransfer/ftMessages.proto", fileDescriptor_9d574f363dd34365) }
var fileDescriptor_9d574f363dd34365 = []byte{
// 202 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x4d, 0xcb, 0xcc, 0x49,
0x0d, 0x29, 0x4a, 0xcc, 0x2b, 0x4e, 0x4b, 0x2d, 0xd2, 0x4f, 0x2b, 0xf1, 0x4d, 0x2d, 0x2e, 0x4e,
0x4c, 0x4f, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2d, 0x48, 0x2c, 0x2a, 0x4e,
0x55, 0xba, 0xc8, 0xc8, 0xc5, 0xef, 0x97, 0x5a, 0xee, 0x86, 0xa4, 0x56, 0x48, 0x8a, 0x8b, 0x03,
0xa4, 0xd7, 0x2f, 0x31, 0x37, 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x33, 0x08, 0xce, 0x17, 0x52,
0xe0, 0xe2, 0x2e, 0x81, 0xaa, 0xf3, 0x4e, 0xad, 0x94, 0x60, 0x52, 0x60, 0xd4, 0xe0, 0x09, 0x42,
0x16, 0x42, 0x56, 0xe1, 0x9b, 0x98, 0x2c, 0xc1, 0x8c, 0xaa, 0xc2, 0x37, 0x31, 0x19, 0x64, 0x7e,
0x5e, 0x69, 0x6e, 0x40, 0x62, 0x51, 0x49, 0xb1, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x6f, 0x10, 0x9c,
0x2f, 0x24, 0xc4, 0xc5, 0x52, 0x9c, 0x59, 0x95, 0x2a, 0xc1, 0x0a, 0x16, 0x07, 0xb3, 0x85, 0x44,
0xb8, 0x58, 0x8b, 0x52, 0x4b, 0x8a, 0x2a, 0x25, 0xd8, 0x14, 0x18, 0x35, 0x98, 0x82, 0x20, 0x1c,
0x21, 0x09, 0x2e, 0xf6, 0x82, 0xa2, 0xd4, 0xb2, 0xcc, 0xd4, 0x72, 0x09, 0x76, 0xb0, 0x1d, 0x30,
0xae, 0x13, 0x5f, 0x14, 0x0f, 0xb2, 0xdf, 0x93, 0xd8, 0xc0, 0x3e, 0x36, 0x06, 0x04, 0x00, 0x00,
0xff, 0xff, 0x3e, 0x0f, 0x1c, 0x27, 0x12, 0x01, 0x00, 0x00,
}
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
syntax = "proto3";
package parse;
option go_package = "fileTransfer";
message NewFileTransfer {
string fileName = 1; // Name of the file; max 32 characters
bytes transferKey = 2; // 256 bit encryption key to identify the transfer
bytes transferMac = 3; // 256 bit MAC of the entire file
uint32 numParts = 4; // Number of file parts
uint32 size = 5; // The size of the file; max of 4 mB
float retry = 6; // Used to determine how many times to retry sending
bytes preview = 7; // A preview of the file; max of 4 kB
}
\ No newline at end of file
#!/bin/bash
protoc --go_out=paths=source_relative:. fileTransfer/ftMessages.proto
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package fileTransfer
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
ftStorage "gitlab.com/elixxir/client/storage/fileTransfer"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/fastRNG"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/xx_network/primitives/id"
"time"
)
const (
// PreviewMaxSize is the maximum size, in bytes, for a file preview.
// Currently, it is set to 4 kB.
PreviewMaxSize = 4_000
// FileNameMaxLen is the maximum size, in bytes, for a file name. Currently,
// it is set to 32 bytes.
FileNameMaxLen = 32
// FileMaxSize is the maximum file size that can be transferred. Currently,
// it is set to 4 mB.
FileMaxSize = 4_000_000
// minPartsSendPerRound is the minimum number of file parts sent each round.
minPartsSendPerRound = 1
// maxPartsSendPerRound is the maximum number of file parts sent each round.
maxPartsSendPerRound = 11
// Size of the buffered channel that queues file parts to send
sendQueueBuffLen = 10_000
// Size of the buffered channel that reports if the network is healthy
networkHealthBuffLen = 10_000
)
// Error messages.
const (
// newManager
newManagerSentErr = "failed to load or create new list of sent file transfers: %+v"
newManagerReceivedErr = "failed to load or create new list of received file transfers: %+v"
// Manager.Send
fileNameSizeErr = "length of filename (%d) greater than max allowed length (%d)"
fileSizeErr = "size of file (%d bytes) greater than max allowed size (%d bytes)"
previewSizeErr = "size of preview (%d bytes) greater than max allowed size (%d bytes)"
getPartSizeErr = "failed to get file part size: %+v"
sendInitMsgErr = "failed to send initial file transfer message: %+v"
// Manager.Resend
transferNotFailedErr = "transfer %s has not failed"
// Manager.CloseSend
transferInProgressErr = "transfer %s has not completed or failed"
)
// Stoppable and listener values.
const (
rawMessageBuffSize = 10_000
sendStoppableName = "FileTransferSend"
newFtStoppableName = "FileTransferNew"
newFtListenerName = "FileTransferNewListener"
filePartStoppableName = "FilePart"
filePartListenerName = "FilePartListener"
fileTransferStoppableName = "FileTransfer"
)
// Manager is used to manage the sending and receiving of all file transfers.
type Manager struct {
// Callback that is called every time a new file transfer is received
receiveCB interfaces.ReceiveCallback
// Storage-backed structure for tracking sent file transfers
sent *ftStorage.SentFileTransfers
// Storage-backed structure for tracking received file transfers
received *ftStorage.ReceivedFileTransfers
// Queue of parts to send
sendQueue chan queuedPart
// Maximum data transfer speed in bytes per second
maxThroughput int
// Client interfaces
client *api.Client
store *storage.Session
swb interfaces.Switchboard
net interfaces.NetworkManager
healthy chan bool
rng *fastRNG.StreamGenerator
getRoundResults getRoundResultsFunc
}
// getRoundResultsFunc is a function that matches client.GetRoundResults. It is
// used to pass in an alternative function for testing.
type getRoundResultsFunc func(roundList []id.Round, timeout time.Duration,
roundCallback api.RoundEventCallback) error
// queuedPart contains the unique information identifying a file part.
type queuedPart struct {
tid ftCrypto.TransferID
partNum uint16
}
// NewManager produces a new empty file transfer Manager. Does not start sending
// and receiving services.
func NewManager(client *api.Client, receiveCB interfaces.ReceiveCallback,
p Params) (*Manager, error) {
return newManager(client, client.GetStorage(), client.GetSwitchboard(),
client.GetNetworkInterface(), client.GetRng(), client.GetRoundResults,
client.GetStorage().GetKV(), receiveCB, p)
}
// newManager builds the manager from fields explicitly passed in. This function
// is a helper function for NewManager to make it easier to test.
func newManager(client *api.Client, store *storage.Session,
swb interfaces.Switchboard, net interfaces.NetworkManager,
rng *fastRNG.StreamGenerator, getRoundResults getRoundResultsFunc,
kv *versioned.KV, receiveCB interfaces.ReceiveCallback, p Params) (
*Manager, error) {
// Create a new list of sent file transfers or load one if it exists in
// storage
sent, err := ftStorage.NewOrLoadSentFileTransfers(kv)
if err != nil {
return nil, errors.Errorf(newManagerSentErr, err)
}
// Create a new list of received file transfers or load one if it exists in
// storage
received, err := ftStorage.NewOrLoadReceivedFileTransfers(kv)
if err != nil {
return nil, errors.Errorf(newManagerReceivedErr, err)
}
return &Manager{
receiveCB: receiveCB,
sent: sent,
received: received,
sendQueue: make(chan queuedPart, sendQueueBuffLen),
maxThroughput: p.MaxThroughput,
client: client,
store: store,
swb: swb,
net: net,
healthy: make(chan bool, networkHealthBuffLen),
rng: rng,
getRoundResults: getRoundResults,
}, nil
}
// StartProcesses starts the processes needed to send and receive file parts. It
// starts three threads that (1) receives the initial NewFileTransfer E2E
// message; (2) receives each file part; and (3) sends file parts. It also
// registers the network health channel.
func (m *Manager) StartProcesses() (stoppable.Stoppable, error) {
// Create the two reception channels
newFtChan := make(chan message.Receive, rawMessageBuffSize)
filePartChan := make(chan message.Receive, rawMessageBuffSize)
return m.startProcesses(newFtChan, filePartChan)
}
// startProcesses starts the sending and receiving processes with the provided
// channels.
func (m *Manager) startProcesses(newFtChan, filePartChan chan message.Receive) (
stoppable.Stoppable, error) {
// Register network health channel that is used by the sending thread to
// ensure the network is healthy before sending
m.net.GetHealthTracker().AddChannel(m.healthy)
// Start the new file transfer message reception thread
newFtStop := stoppable.NewSingle(newFtStoppableName)
m.swb.RegisterChannel(newFtListenerName, &id.ID{},
message.NewFileTransfer, newFtChan)
go m.receiveNewFileTransfer(newFtChan, newFtStop)
// Start the file part message reception thread
filePartStop := stoppable.NewSingle(filePartStoppableName)
m.swb.RegisterChannel(filePartListenerName, &id.ID{}, message.Raw,
filePartChan)
go m.receive(filePartChan, filePartStop)
// Start the file part sending thread
sendStop := stoppable.NewSingle(sendStoppableName)
go m.sendThread(sendStop, getRandomNumParts)
// Create a multi stoppable
multiStoppable := stoppable.NewMulti(fileTransferStoppableName)
multiStoppable.Add(newFtStop)
multiStoppable.Add(filePartStop)
multiStoppable.Add(sendStop)
return multiStoppable, nil
}
// Send starts the sending of a file transfer to the recipient. It sends the
// initial NewFileTransfer E2E message to the recipient to inform them of the
// incoming file parts. It partitions the file, puts it into storage, and queues
// each file for sending. Returns a unique ID identifying the file transfer.
func (m Manager) Send(fileName string, fileData []byte, recipient *id.ID,
retry float32, preview []byte, progressCB interfaces.SentProgressCallback,
period time.Duration) (ftCrypto.TransferID, error) {
// Return an error if the file name is too long
if len(fileName) > FileNameMaxLen {
return ftCrypto.TransferID{}, errors.Errorf(
fileNameSizeErr, len(fileName), FileNameMaxLen)
}
// Return an error if the file is too large
if len(fileData) > FileMaxSize {
return ftCrypto.TransferID{}, errors.Errorf(
fileSizeErr, len(fileData), FileMaxSize)
}
// Return an error if the preview is too large
if len(preview) > PreviewMaxSize {
return ftCrypto.TransferID{}, errors.Errorf(
previewSizeErr, len(preview), PreviewMaxSize)
}
// Generate new transfer key
rng := m.rng.GetStream()
transferKey, err := ftCrypto.NewTransferKey(rng)
if err != nil {
rng.Close()
return ftCrypto.TransferID{}, err
}
rng.Close()
// Get the size of each file part
partSize, err := m.getPartSize()
if err != nil {
return ftCrypto.TransferID{}, errors.Errorf(getPartSizeErr, err)
}
// Generate transfer MAC
mac := ftCrypto.CreateTransferMAC(fileData, transferKey)
// Partition the file into parts
parts := partitionFile(fileData, partSize)
numParts := uint16(len(parts))
fileSize := uint32(len(fileData))
// Send the initial file transfer message over E2E
err = m.sendNewFileTransfer(recipient, fileName, transferKey, mac, numParts,
fileSize, retry, preview)
if err != nil {
return ftCrypto.TransferID{}, errors.Errorf(sendInitMsgErr, err)
}
// Calculate the number of fingerprints to generate
numFps := calcNumberOfFingerprints(numParts, retry)
// Add the transfer to storage
rng = m.rng.GetStream()
transferID, err := m.sent.AddTransfer(
recipient, transferKey, parts, numFps, progressCB, period, rng)
if err != nil {
return ftCrypto.TransferID{}, err
}
rng.Close()
m.queueParts(transferID, numParts)
return transferID, nil
}
// RegisterSendProgressCallback adds the sent progress callback to the sent
// transfer so that it will be called when updates for the transfer occur. The
// progress callback is called when initially added and on transfer updates, at
// most once per period.
func (m Manager) RegisterSendProgressCallback(tid ftCrypto.TransferID,
progressCB interfaces.SentProgressCallback, period time.Duration) error {
// Get the transfer for the given ID
transfer, err := m.sent.GetTransfer(tid)
if err != nil {
return err
}
// Add the progress callback
transfer.AddProgressCB(progressCB, period)
return nil
}
// Resend resends a file if Send fails. Returns an error if CloseSend
// was already called or if the transfer did not run out of retries.
// TODO: add test
// TODO: write test
// TODO: can you resend? Can you reuse fingerprints?
// TODO: what to do if sendE2E fails?
func (m Manager) Resend(tid ftCrypto.TransferID) error {
// Get the transfer for the given ID
transfer, err := m.sent.GetTransfer(tid)
if err != nil {
return err
}
// Check if the transfer has run out of fingerprints, which occurs when the
// retry limit is reached
if transfer.GetNumAvailableFps() > 0 {
return errors.Errorf(transferNotFailedErr, tid)
}
return nil
}
// CloseSend deletes a sent file transfer from the sent transfer map and from
// storage once a transfer has completed or reached the retry limit. Returns an
// error if the transfer has not run out of retries.
func (m Manager) CloseSend(tid ftCrypto.TransferID) error {
// Get the transfer for the given ID
transfer, err := m.sent.GetTransfer(tid)
if err != nil {
return err
}
// Check if the transfer has completed or run out of fingerprints, which
// occurs when the retry limit is reached
completed, _, _, _ := transfer.GetProgress()
if transfer.GetNumAvailableFps() > 0 && !completed {
return errors.Errorf(transferInProgressErr, tid)
}
// Delete the transfer from storage
return m.sent.DeleteTransfer(tid)
}
// RegisterReceiveProgressCallback adds the reception progress callback to the
// received transfer so that it will be called when updates for the transfer
// occur. The progress callback is called when initially added and on transfer
// updates, at most once per period.
func (m Manager) RegisterReceiveProgressCallback(tid ftCrypto.TransferID,
progressCB interfaces.ReceivedProgressCallback, period time.Duration) error {
// Get the transfer for the given ID
transfer, err := m.received.GetTransfer(tid)
if err != nil {
return err
}
// Add the progress callback
transfer.AddProgressCB(progressCB, period)
return nil
}
// Receive returns the fully assembled file on the completion of the transfer.
// It deletes the transfer from the received transfer map and from storage.
// Returns an error if the transfer is not complete, the full file cannot be
// verified, or if the transfer cannot be found.
func (m Manager) Receive(tid ftCrypto.TransferID) ([]byte, error) {
// Get the transfer for the given ID
transfer, err := m.received.GetTransfer(tid)
if err != nil {
return nil, err
}
// Get the file from the transfer
file, err := transfer.GetFile()
if err != nil {
return nil, err
}
// Return the file and delete the transfer from storage
return file, m.received.DeleteTransfer(tid)
}
// calcNumberOfFingerprints is the formula used to calculate the number of
// fingerprints to generate, which is based off the number of file parts and the
// retry float.
func calcNumberOfFingerprints(numParts uint16, retry float32) uint16 {
return uint16(float32(numParts) * (1 + retry))
}
This diff is collapsed.
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package fileTransfer
const (
defaultMaxThroughput = 150_000 // 150 kB per second
)
// Params contains parameters used for file transfer.
type Params struct {
// MaxThroughput is the maximum data transfer speed to send file parts (in
// bytes per second)
MaxThroughput int
}
// NewParams generates a new Params object filled with the given parameters.
func NewParams(maxThroughput int) Params {
return Params{
MaxThroughput: maxThroughput,
}
}
// DefaultParams returns a Params object filled with the default values.
func DefaultParams() Params {
return Params{
MaxThroughput: defaultMaxThroughput,
}
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package fileTransfer
import (
"reflect"
"testing"
)
// Tests that NewParams returns the expected Params object.
func TestNewParams(t *testing.T) {
expected := Params{
MaxThroughput: 42,
}
received := NewParams(expected.MaxThroughput)
if !reflect.DeepEqual(expected, received) {
t.Errorf("Received Params does not match expected."+
"\nexpected: %+v\nreceived: %+v", expected, received)
}
}
// Tests that DefaultParams returns a Params object with the expected defaults.
func TestDefaultParams(t *testing.T) {
expected := Params{MaxThroughput: defaultMaxThroughput}
received := DefaultParams()
if !reflect.DeepEqual(expected, received) {
t.Errorf("Received Params does not match expected."+
"\nexpected: %+v\nreceived: %+v", expected, received)
}
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package fileTransfer
import (
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/primitives/format"
"strings"
)
// Error messages.
const (
// Manager.readMessage
unmarshalPartMessageErr = "failed to unmarshal cMix message contents into file part message: %+v"
)
// receive runs a loop that receives file message parts and stores them in their
// appropriate transfer.
func (m *Manager) receive(rawMsgs chan message.Receive, stop *stoppable.Single) {
jww.DEBUG.Print("Starting file part reception thread.")
for {
select {
case <-stop.Quit():
jww.DEBUG.Print("Stopping file part reception thread: stoppable " +
"triggered")
stop.ToStopped()
return
case receiveMsg := <-rawMsgs:
cMixMsg, err := m.readMessage(receiveMsg)
if err != nil {
// Print error as warning unless the fingerprint does not match,
// which means this message is not of the correct type and will
// be ignored
if strings.Contains(err.Error(), "fingerprint") {
jww.INFO.Print(err)
} else {
jww.WARN.Print(err)
}
continue
}
// Denote that the message is a file part
m.store.GetGarbledMessages().Remove(cMixMsg)
}
}
}
// readMessage unmarshal the payload in the message.Receive and stores it with
// the appropriate received transfer. The cMix message is returned so that, on
// error, it can be either marked as used not used.
func (m *Manager) readMessage(msg message.Receive) (format.Message, error) {
// Unmarshal payload into cMix message
cMixMsg := format.Unmarshal(msg.Payload)
// Unmarshal cMix message contents into a file part message
partMsg, err := unmarshalPartMessage(cMixMsg.GetContents())
if err != nil {
return cMixMsg, errors.Errorf(unmarshalPartMessageErr, err)
}
// Add part to received transfer
transfer, _, err := m.received.AddPart(partMsg.getPart(),
partMsg.getPadding(), cMixMsg.GetMac(), partMsg.getPartNum(),
cMixMsg.GetKeyFP())
if err != nil {
return cMixMsg, err
}
// Call callback with updates
transfer.CallProgressCB(nil)
return cMixMsg, nil
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package fileTransfer
import (
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/stoppable"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/xx_network/primitives/id"
)
// Error messages.
const (
receiveMessageTypeErr = "received message is not of type NewFileTransfer"
protoUnmarshalErr = "failed to unmarshal request: %+v"
)
// receiveNewFileTransfer starts a thread that waits for new file transfer
// messages.
func (m *Manager) receiveNewFileTransfer(rawMsgs chan message.Receive,
stop *stoppable.Single) {
jww.DEBUG.Print("Starting new file transfer message reception thread.")
for {
select {
case <-stop.Quit():
jww.DEBUG.Print("Stopping new file transfer message reception " +
"thread: stoppable triggered")
stop.ToStopped()
return
case receivedMsg := <-rawMsgs:
jww.DEBUG.Print("New file transfer message thread received message.")
tid, fileName, sender, size, preview, err :=
m.readNewFileTransferMessage(receivedMsg)
if err != nil {
if err.Error() == receiveMessageTypeErr {
jww.INFO.Printf("Failed to read message as new file "+
"transfer message: %+v", err)
} else {
jww.WARN.Printf("Failed to read message as new file "+
"transfer message: %+v", err)
}
continue
}
// Call the reception callback
go m.receiveCB(tid, fileName, sender, size, preview)
// Trigger a resend of all garbled messages
m.net.CheckGarbledMessages()
}
}
}
// readNewFileTransferMessage reads the received message and adds it to the
// received transfer list. Returns the transfer ID, sender ID, file size, and
// file preview.
func (m *Manager) readNewFileTransferMessage(msg message.Receive) (
ftCrypto.TransferID, string, *id.ID, uint32, []byte, error) {
// Return an error if the message is not a NewFileTransfer
if msg.MessageType != message.NewFileTransfer {
return ftCrypto.TransferID{}, "", nil, 0, nil,
errors.New(receiveMessageTypeErr)
}
// Unmarshal the request message
newFT := &NewFileTransfer{}
err := proto.Unmarshal(msg.Payload, newFT)
if err != nil {
return ftCrypto.TransferID{}, "", nil, 0, nil,
errors.Errorf(protoUnmarshalErr, err)
}
// Get RNG from stream
rng := m.rng.GetStream()
defer rng.Close()
// Add the transfer to the list of receiving transfers
key := ftCrypto.UnmarshalTransferKey(newFT.TransferKey)
numParts := uint16(newFT.NumParts)
numFps := calcNumberOfFingerprints(numParts, newFT.Retry)
tid, err := m.received.AddTransfer(
key, newFT.TransferMac, newFT.Size, numParts, numFps, rng)
if err != nil {
return ftCrypto.TransferID{}, "", nil, 0, nil, err
}
return tid, newFT.FileName, msg.Sender, newFT.Size, newFT.Preview, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment