Skip to content
Snippets Groups Projects
Commit c2e08173 authored by Jono Wenger's avatar Jono Wenger
Browse files

Fix file transfer integration test

parent 62150e9e
No related branches found
No related tags found
4 merge requests!510Release,!226WIP: Api2.0,!210XX-3880 / Generic File Transfer,!207WIP: Client Restructure
......@@ -16,7 +16,8 @@ import (
jww "github.com/spf13/jwalterweatherman"
"github.com/spf13/viper"
"gitlab.com/elixxir/client/api"
ft "gitlab.com/elixxir/client/fileTransfer"
ft "gitlab.com/elixxir/client/fileTransfer2"
ftE2e "gitlab.com/elixxir/client/fileTransfer2/e2e"
"gitlab.com/elixxir/crypto/contact"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/xx_network/primitives/id"
......@@ -55,8 +56,8 @@ var ftCmd = &cobra.Command{
// Wait until connected or crash on timeout
connected := make(chan bool, 10)
client.GetNetworkInterface().AddHealthCallback(
func(isconnected bool) {
connected <- isconnected
func(isConnected bool) {
connected <- isConnected
})
waitUntilConnected(connected)
......@@ -66,8 +67,8 @@ var ftCmd = &cobra.Command{
numReg, total, err = client.GetNodeRegistrationStatus()
if err != nil {
jww.FATAL.Panicf("Failed to get node registration status: %+v",
err)
jww.FATAL.Panicf(
"Failed to get node registration status: %+v", err)
}
jww.INFO.Printf("Registering with nodes (%d/%d)...", numReg, total)
......@@ -132,7 +133,7 @@ type receivedFtResults struct {
// reception callback. Returns the file transfer manager and the channel that
// will be triggered when the callback is called.
func initFileTransferManager(client *api.Client, maxThroughput int) (
ft.FileTransfer, chan receivedFtResults) {
*ftE2e.Wrapper, chan receivedFtResults) {
// Create interfaces.ReceiveCallback that returns the results on a channel
receiveChan := make(chan receivedFtResults, 100)
......@@ -150,8 +151,11 @@ func initFileTransferManager(client *api.Client, maxThroughput int) (
}
// Create new manager
// TODO: Fix NewManager parameters
manager, err := ft.NewManager(receiveCB, p, nil, nil, nil, nil, nil)
manager, err := ft.NewManager(p,
client.GetUser().ReceptionID,
client.GetNetworkInterface(),
client.GetStorage().GetKV(),
client.GetRng())
if err != nil {
jww.FATAL.Panicf(
"[FT] Failed to create new file transfer manager: %+v", err)
......@@ -163,12 +167,21 @@ func initFileTransferManager(client *api.Client, maxThroughput int) (
jww.FATAL.Panicf("[FT] Failed to start file transfer threads: %+v", err)
}
return manager, receiveChan
e2eParams := ftE2e.DefaultParams()
e2eFt, err := ftE2e.NewWrapper(receiveCB, e2eParams, manager,
client.GetUser().ReceptionID, client.GetE2EHandler(),
client.GetNetworkInterface())
if err != nil {
jww.FATAL.Panicf(
"[FT] Failed to create new e2e file transfer wrapper: %+v", err)
}
return e2eFt, receiveChan
}
// sendFile sends the file to the recipient and prints the progress.
func sendFile(filePath, fileType, filePreviewPath, filePreviewString,
recipientContactPath string, retry float32, m ft.FileTransfer,
recipientContactPath string, retry float32, m *ftE2e.Wrapper,
done chan struct{}) {
// Get file from path
......@@ -205,7 +218,7 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString,
// Create sent progress callback that prints the results
progressCB := func(completed bool, arrived, total uint16,
t ft.FilePartTracker, err error) {
st ft.SentTransfer, _ ft.FilePartTracker, err error) {
jww.INFO.Printf("[FT] Sent progress callback for %q "+
"{completed: %t, arrived: %d, total: %d, err: %v}",
fileName, completed, arrived, total, err)
......@@ -236,7 +249,7 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString,
sendStart = netTime.Now()
// Send the file
tid, err := m.Send(fileName, fileType, fileData, recipient.ID, retry,
tid, err := m.Send(recipient.ID, fileName, fileType, fileData, retry,
filePreviewData, progressCB, callbackPeriod)
if err != nil {
jww.FATAL.Panicf("[FT] Failed to send file %q to %s: %+v",
......@@ -251,7 +264,7 @@ func sendFile(filePath, fileType, filePreviewPath, filePreviewString,
// receiveNewFileTransfers waits to receive new file transfers and prints its
// information to the log.
func receiveNewFileTransfers(receive chan receivedFtResults, done,
quit chan struct{}, m ft.FileTransfer) {
quit chan struct{}, m *ftE2e.Wrapper) {
jww.INFO.Print("[FT] Starting thread waiting to receive NewFileTransfer " +
"E2E message.")
for {
......@@ -282,9 +295,9 @@ func receiveNewFileTransfers(receive chan receivedFtResults, done,
// the results to the log.
func newReceiveProgressCB(tid *ftCrypto.TransferID, fileName string,
done chan struct{}, receiveStart time.Time,
m ft.FileTransfer) ft.ReceivedProgressCallback {
m *ftE2e.Wrapper) ft.ReceivedProgressCallback {
return func(completed bool, received, total uint16,
t ft.FilePartTracker, err error) {
rt ft.ReceivedTransfer, t ft.FilePartTracker, err error) {
jww.INFO.Printf("[FT] Receive progress callback for transfer %s "+
"{completed: %t, received: %d, total: %d, err: %v}",
tid, completed, received, total, err)
......
......@@ -12,6 +12,9 @@ package cmd
import (
"bufio"
"fmt"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/primitives/format"
"os"
"time"
......@@ -48,8 +51,8 @@ var groupCmd = &cobra.Command{
// Wait until connected or crash on timeout
connected := make(chan bool, 10)
client.GetNetworkInterface().AddHealthCallback(
func(isconnected bool) {
connected <- isconnected
func(isConnected bool) {
connected <- isConnected
})
waitUntilConnected(connected)
......@@ -115,9 +118,6 @@ var groupCmd = &cobra.Command{
func initGroupManager(client *api.Client) (groupChat.GroupChat,
chan groupChat.MessageReceive, chan groupStore.Group) {
recChan := make(chan groupChat.MessageReceive, 10)
receiveCb := func(msg groupChat.MessageReceive) {
recChan <- msg
}
reqChan := make(chan groupStore.Group, 10)
requestCb := func(g groupStore.Group) {
......@@ -128,7 +128,7 @@ func initGroupManager(client *api.Client) (groupChat.GroupChat,
manager, err := groupChat.NewManager(client.GetNetworkInterface(),
client.GetE2EHandler(), client.GetStorage().GetReceptionID(),
client.GetRng(), client.GetStorage().GetE2EGroup(),
client.GetStorage().GetKV(), requestCb, receiveCb)
client.GetStorage().GetKV(), requestCb, &receiveProcessor{recChan})
if err != nil {
jww.FATAL.Panicf("Failed to initialize group chat manager: %+v", err)
}
......@@ -136,6 +136,19 @@ func initGroupManager(client *api.Client) (groupChat.GroupChat,
return manager, recChan, reqChan
}
type receiveProcessor struct {
recChan chan groupChat.MessageReceive
}
func (r *receiveProcessor) Process(decryptedMsg groupChat.MessageReceive,
_ format.Message, _ receptionID.EphemeralIdentity, _ rounds.Round) {
r.recChan <- decryptedMsg
}
func (r *receiveProcessor) String() string {
return "groupChatReceiveProcessor"
}
// createGroup creates a new group with the provided name and sends out requests
// to the list of user IDs found at the given file path.
func createGroup(name, msg []byte, filePath string, gm groupChat.GroupChat) {
......@@ -217,7 +230,7 @@ func sendGroup(groupIdString string, msg []byte, gm groupChat.GroupChat) {
jww.INFO.Printf("Sending to group %s message %q", groupID, msg)
rid, timestamp, _, err := gm.Send(groupID, msg)
rid, timestamp, _, err := gm.Send(groupID, "groupChatTest", msg)
if err != nil {
jww.FATAL.Panicf("Sending message to group %s: %+v", groupID, err)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment