diff --git a/cmd/fileTransfer.go b/cmd/fileTransfer.go index 78442b9afd575e44a111ea3976ecb8f8ddfef3ef..0d7b26e6c6485cc2134d85f91645cb3a40fc729b 100644 --- a/cmd/fileTransfer.go +++ b/cmd/fileTransfer.go @@ -16,7 +16,8 @@ import ( jww "github.com/spf13/jwalterweatherman" "github.com/spf13/viper" "gitlab.com/elixxir/client/api" - ft "gitlab.com/elixxir/client/fileTransfer" + ft "gitlab.com/elixxir/client/fileTransfer2" + ftE2e "gitlab.com/elixxir/client/fileTransfer2/e2e" "gitlab.com/elixxir/crypto/contact" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" "gitlab.com/xx_network/primitives/id" @@ -55,8 +56,8 @@ var ftCmd = &cobra.Command{ // Wait until connected or crash on timeout connected := make(chan bool, 10) client.GetNetworkInterface().AddHealthCallback( - func(isconnected bool) { - connected <- isconnected + func(isConnected bool) { + connected <- isConnected }) waitUntilConnected(connected) @@ -66,8 +67,8 @@ var ftCmd = &cobra.Command{ numReg, total, err = client.GetNodeRegistrationStatus() if err != nil { - jww.FATAL.Panicf("Failed to get node registration status: %+v", - err) + jww.FATAL.Panicf( + "Failed to get node registration status: %+v", err) } jww.INFO.Printf("Registering with nodes (%d/%d)...", numReg, total) @@ -132,7 +133,7 @@ type receivedFtResults struct { // reception callback. Returns the file transfer manager and the channel that // will be triggered when the callback is called. func initFileTransferManager(client *api.Client, maxThroughput int) ( - ft.FileTransfer, chan receivedFtResults) { + *ftE2e.Wrapper, chan receivedFtResults) { // Create interfaces.ReceiveCallback that returns the results on a channel receiveChan := make(chan receivedFtResults, 100) @@ -150,8 +151,11 @@ func initFileTransferManager(client *api.Client, maxThroughput int) ( } // Create new manager - // TODO: Fix NewManager parameters - manager, err := ft.NewManager(receiveCB, p, nil, nil, nil, nil, nil) + manager, err := ft.NewManager(p, + client.GetUser().ReceptionID, + client.GetNetworkInterface(), + client.GetStorage().GetKV(), + client.GetRng()) if err != nil { jww.FATAL.Panicf( "[FT] Failed to create new file transfer manager: %+v", err) @@ -163,12 +167,21 @@ func initFileTransferManager(client *api.Client, maxThroughput int) ( jww.FATAL.Panicf("[FT] Failed to start file transfer threads: %+v", err) } - return manager, receiveChan + e2eParams := ftE2e.DefaultParams() + e2eFt, err := ftE2e.NewWrapper(receiveCB, e2eParams, manager, + client.GetUser().ReceptionID, client.GetE2EHandler(), + client.GetNetworkInterface()) + if err != nil { + jww.FATAL.Panicf( + "[FT] Failed to create new e2e file transfer wrapper: %+v", err) + } + + return e2eFt, receiveChan } // sendFile sends the file to the recipient and prints the progress. func sendFile(filePath, fileType, filePreviewPath, filePreviewString, - recipientContactPath string, retry float32, m ft.FileTransfer, + recipientContactPath string, retry float32, m *ftE2e.Wrapper, done chan struct{}) { // Get file from path @@ -205,7 +218,7 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, // Create sent progress callback that prints the results progressCB := func(completed bool, arrived, total uint16, - t ft.FilePartTracker, err error) { + st ft.SentTransfer, _ ft.FilePartTracker, err error) { jww.INFO.Printf("[FT] Sent progress callback for %q "+ "{completed: %t, arrived: %d, total: %d, err: %v}", fileName, completed, arrived, total, err) @@ -236,7 +249,7 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, sendStart = netTime.Now() // Send the file - tid, err := m.Send(fileName, fileType, fileData, recipient.ID, retry, + tid, err := m.Send(recipient.ID, fileName, fileType, fileData, retry, filePreviewData, progressCB, callbackPeriod) if err != nil { jww.FATAL.Panicf("[FT] Failed to send file %q to %s: %+v", @@ -251,7 +264,7 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString, // receiveNewFileTransfers waits to receive new file transfers and prints its // information to the log. func receiveNewFileTransfers(receive chan receivedFtResults, done, - quit chan struct{}, m ft.FileTransfer) { + quit chan struct{}, m *ftE2e.Wrapper) { jww.INFO.Print("[FT] Starting thread waiting to receive NewFileTransfer " + "E2E message.") for { @@ -282,9 +295,9 @@ func receiveNewFileTransfers(receive chan receivedFtResults, done, // the results to the log. func newReceiveProgressCB(tid *ftCrypto.TransferID, fileName string, done chan struct{}, receiveStart time.Time, - m ft.FileTransfer) ft.ReceivedProgressCallback { + m *ftE2e.Wrapper) ft.ReceivedProgressCallback { return func(completed bool, received, total uint16, - t ft.FilePartTracker, err error) { + rt ft.ReceivedTransfer, t ft.FilePartTracker, err error) { jww.INFO.Printf("[FT] Receive progress callback for transfer %s "+ "{completed: %t, received: %d, total: %d, err: %v}", tid, completed, received, total, err) diff --git a/cmd/group.go b/cmd/group.go index 1f7e1140887961f1d8e29c1524ad1e216a17016e..cdb43c51e2416102dc724531eac04a3e1af00f12 100644 --- a/cmd/group.go +++ b/cmd/group.go @@ -12,6 +12,9 @@ package cmd import ( "bufio" "fmt" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/primitives/format" "os" "time" @@ -48,8 +51,8 @@ var groupCmd = &cobra.Command{ // Wait until connected or crash on timeout connected := make(chan bool, 10) client.GetNetworkInterface().AddHealthCallback( - func(isconnected bool) { - connected <- isconnected + func(isConnected bool) { + connected <- isConnected }) waitUntilConnected(connected) @@ -115,9 +118,6 @@ var groupCmd = &cobra.Command{ func initGroupManager(client *api.Client) (groupChat.GroupChat, chan groupChat.MessageReceive, chan groupStore.Group) { recChan := make(chan groupChat.MessageReceive, 10) - receiveCb := func(msg groupChat.MessageReceive) { - recChan <- msg - } reqChan := make(chan groupStore.Group, 10) requestCb := func(g groupStore.Group) { @@ -128,7 +128,7 @@ func initGroupManager(client *api.Client) (groupChat.GroupChat, manager, err := groupChat.NewManager(client.GetNetworkInterface(), client.GetE2EHandler(), client.GetStorage().GetReceptionID(), client.GetRng(), client.GetStorage().GetE2EGroup(), - client.GetStorage().GetKV(), requestCb, receiveCb) + client.GetStorage().GetKV(), requestCb, &receiveProcessor{recChan}) if err != nil { jww.FATAL.Panicf("Failed to initialize group chat manager: %+v", err) } @@ -136,6 +136,19 @@ func initGroupManager(client *api.Client) (groupChat.GroupChat, return manager, recChan, reqChan } +type receiveProcessor struct { + recChan chan groupChat.MessageReceive +} + +func (r *receiveProcessor) Process(decryptedMsg groupChat.MessageReceive, + _ format.Message, _ receptionID.EphemeralIdentity, _ rounds.Round) { + r.recChan <- decryptedMsg +} + +func (r *receiveProcessor) String() string { + return "groupChatReceiveProcessor" +} + // createGroup creates a new group with the provided name and sends out requests // to the list of user IDs found at the given file path. func createGroup(name, msg []byte, filePath string, gm groupChat.GroupChat) { @@ -217,7 +230,7 @@ func sendGroup(groupIdString string, msg []byte, gm groupChat.GroupChat) { jww.INFO.Printf("Sending to group %s message %q", groupID, msg) - rid, timestamp, _, err := gm.Send(groupID, msg) + rid, timestamp, _, err := gm.Send(groupID, "groupChatTest", msg) if err != nil { jww.FATAL.Panicf("Sending message to group %s: %+v", groupID, err) }