From ae783ebda2fc63268b22cfcbfeba2d18485abe5a Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Wed, 4 May 2022 15:00:26 -0700 Subject: [PATCH] Add more documentation --- fileTransfer2/e2e/listener.go | 4 +- fileTransfer2/e2e/manager.go | 136 --------------- fileTransfer2/e2e/params.go | 26 +++ fileTransfer2/e2e/utils_test.go | 3 +- fileTransfer2/e2e/wrapper.go | 159 ++++++++++++++++++ .../e2e/{manager_test.go => wrapper_test.go} | 27 ++- fileTransfer2/ftMessages.proto | 12 +- fileTransfer2/groupChat/manager.go | 126 -------------- fileTransfer2/groupChat/processor.go | 12 +- fileTransfer2/groupChat/send.go | 13 -- fileTransfer2/groupChat/wrapper.go | 136 +++++++++++++++ .../{manager_test.go => wrapper_test.go} | 26 +-- fileTransfer2/info.go | 5 + fileTransfer2/interface.go | 31 ++-- fileTransfer2/manager.go | 1 - groupChat/processor.go | 3 +- 16 files changed, 395 insertions(+), 325 deletions(-) delete mode 100644 fileTransfer2/e2e/manager.go create mode 100644 fileTransfer2/e2e/params.go create mode 100644 fileTransfer2/e2e/wrapper.go rename fileTransfer2/e2e/{manager_test.go => wrapper_test.go} (95%) delete mode 100644 fileTransfer2/groupChat/manager.go create mode 100644 fileTransfer2/groupChat/wrapper.go rename fileTransfer2/groupChat/{manager_test.go => wrapper_test.go} (93%) diff --git a/fileTransfer2/e2e/listener.go b/fileTransfer2/e2e/listener.go index 79aeed296..342b39c95 100644 --- a/fileTransfer2/e2e/listener.go +++ b/fileTransfer2/e2e/listener.go @@ -26,7 +26,7 @@ const listenerName = "NewFileTransferListener-E2E" // listener waits for a message indicating a new file transfer is starting. // Adheres to the receive.Listener interface. type listener struct { - m *Manager + m *Wrapper } // Hear is called when a new file transfer is received. It creates a new @@ -41,7 +41,7 @@ func (l *listener) Hear(msg receive.Message) { } // Add new transfer to start receiving parts - tid, err := l.m.HandleIncomingTransfer(info.FileName, &info.Key, info.Mac, + tid, err := l.m.ft.HandleIncomingTransfer(info.FileName, &info.Key, info.Mac, info.NumParts, info.Size, info.Retry, nil, 0) if err != nil { jww.ERROR.Printf(errNewReceivedTransfer, info.FileName, err) diff --git a/fileTransfer2/e2e/manager.go b/fileTransfer2/e2e/manager.go deleted file mode 100644 index 34f886fc5..000000000 --- a/fileTransfer2/e2e/manager.go +++ /dev/null @@ -1,136 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -//////////////////////////////////////////////////////////////////////////////// - -package e2e - -import ( - "gitlab.com/elixxir/client/catalog" - "gitlab.com/elixxir/client/e2e" - "gitlab.com/elixxir/client/e2e/receive" - ft "gitlab.com/elixxir/client/fileTransfer2" - "gitlab.com/elixxir/client/stoppable" - e2eCrypto "gitlab.com/elixxir/crypto/e2e" - ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" - "gitlab.com/xx_network/primitives/id" - "time" -) - -// Manager handles the sending and receiving of file transfers using E2E -// messages to inform the recipient of incoming file transfers. -type Manager struct { - // Callback that is called every time a new file transfer is received - receiveCB ft.ReceiveCallback - - // File transfer Manager - ft ft.FileTransfer - - myID *id.ID - cmix ft.Cmix - e2e E2e -} - -// E2e interface matches a subset of the e2e.Handler methods used by the Manager -// for easier testing. -type E2e interface { - SendE2E(mt catalog.MessageType, recipient *id.ID, payload []byte, - params e2e.Params) ([]id.Round, e2eCrypto.MessageID, time.Time, error) - RegisterListener(senderID *id.ID, messageType catalog.MessageType, - newListener receive.Listener) receive.ListenerID -} - -// NewManager generates a new file transfer manager using E2E. -func NewManager(receiveCB ft.ReceiveCallback, ft ft.FileTransfer, myID *id.ID, - e2e E2e, cmix ft.Cmix) (*Manager, error) { - return &Manager{ - receiveCB: receiveCB, - ft: ft, - myID: myID, - cmix: cmix, - e2e: e2e, - }, nil -} - -func (m *Manager) StartProcesses() (stoppable.Stoppable, error) { - // Register listener to receive new file transfers - m.e2e.RegisterListener(m.myID, catalog.NewFileTransfer, &listener{m}) - - return m.ft.StartProcesses() -} - -func (m *Manager) MaxFileNameLen() int { - return m.ft.MaxFileNameLen() -} - -func (m *Manager) MaxFileTypeLen() int { - return m.ft.MaxFileTypeLen() -} - -func (m *Manager) MaxFileSize() int { - return m.ft.MaxFileSize() -} - -func (m *Manager) MaxPreviewSize() int { - return m.ft.MaxPreviewSize() -} - -func (m *Manager) Send(recipient *id.ID, fileName, fileType string, - fileData []byte, retry float32, preview []byte, - progressCB ft.SentProgressCallback, period time.Duration) ( - *ftCrypto.TransferID, error) { - - sendNew := func(info *ft.TransferInfo) error { - return sendNewFileTransferMessage(recipient, info, m.e2e) - } - - modifiedProgressCB := m.addEndMessageToCallback(progressCB) - - return m.ft.Send(recipient, fileName, fileType, fileData, retry, preview, - modifiedProgressCB, period, sendNew) -} - -func (m *Manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, - progressCB ft.SentProgressCallback, period time.Duration) error { - - modifiedProgressCB := m.addEndMessageToCallback(progressCB) - - return m.ft.RegisterSentProgressCallback(tid, modifiedProgressCB, period) -} - -func (m *Manager) addEndMessageToCallback(progressCB ft.SentProgressCallback) ft.SentProgressCallback { - return func(completed bool, arrived, total uint16, - st ft.SentTransfer, t ft.FilePartTracker, err error) { - - // If the transfer is completed, send last message informing recipient - if completed { - sendEndFileTransferMessage(st.Recipient(), m.cmix, m.e2e) - } - - progressCB(completed, arrived, total, st, t, err) - } -} - -func (m *Manager) CloseSend(tid *ftCrypto.TransferID) error { - return m.ft.CloseSend(tid) -} - -func (m *Manager) HandleIncomingTransfer(fileName string, - key *ftCrypto.TransferKey, transferMAC []byte, numParts uint16, size uint32, - retry float32, progressCB ft.ReceivedProgressCallback, - period time.Duration) (*ftCrypto.TransferID, error) { - - return m.ft.HandleIncomingTransfer( - fileName, key, transferMAC, numParts, size, retry, progressCB, period) -} - -func (m *Manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, - progressCB ft.ReceivedProgressCallback, period time.Duration) error { - return m.ft.RegisterReceivedProgressCallback(tid, progressCB, period) -} - -func (m *Manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) { - return m.ft.Receive(tid) -} diff --git a/fileTransfer2/e2e/params.go b/fileTransfer2/e2e/params.go new file mode 100644 index 000000000..254e7d3a4 --- /dev/null +++ b/fileTransfer2/e2e/params.go @@ -0,0 +1,26 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package e2e + +const ( + defaultNotifyUponCompletion = true +) + +// Params contains parameters used for E2E file transfer. +type Params struct { + // NotifyUponCompletion indicates if a final notification message is sent + // to the recipient on completion of file transfer. If true, the ping is + NotifyUponCompletion bool +} + +// DefaultParams returns a Params object filled with the default values. +func DefaultParams() Params { + return Params{ + NotifyUponCompletion: defaultNotifyUponCompletion, + } +} diff --git a/fileTransfer2/e2e/utils_test.go b/fileTransfer2/e2e/utils_test.go index 18a534d52..ab49a4bc9 100644 --- a/fileTransfer2/e2e/utils_test.go +++ b/fileTransfer2/e2e/utils_test.go @@ -186,7 +186,8 @@ func (m *mockE2e) SendE2E(mt catalog.MessageType, recipient *id.ID, payload []by func (m *mockE2e) RegisterListener(senderID *id.ID, mt catalog.MessageType, listener receive.Listener) receive.ListenerID { if _, exists := m.handler.listeners[*senderID]; !exists { - m.handler.listeners[*senderID] = map[catalog.MessageType]receive.Listener{mt: listener} + m.handler.listeners[*senderID] = + map[catalog.MessageType]receive.Listener{mt: listener} } else if _, exists = m.handler.listeners[*senderID][mt]; !exists { m.handler.listeners[*senderID][mt] = listener } diff --git a/fileTransfer2/e2e/wrapper.go b/fileTransfer2/e2e/wrapper.go new file mode 100644 index 000000000..d361dd052 --- /dev/null +++ b/fileTransfer2/e2e/wrapper.go @@ -0,0 +1,159 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package e2e + +import ( + "gitlab.com/elixxir/client/catalog" + "gitlab.com/elixxir/client/e2e" + "gitlab.com/elixxir/client/e2e/receive" + ft "gitlab.com/elixxir/client/fileTransfer2" + e2eCrypto "gitlab.com/elixxir/crypto/e2e" + ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" + "gitlab.com/xx_network/primitives/id" + "time" +) + +// Wrapper handles the sending and receiving of file transfers using E2E +// messages to inform the recipient of incoming file transfers. +type Wrapper struct { + // Callback that is called every time a new file transfer is received + receiveCB ft.ReceiveCallback + + // File transfer Manager + ft ft.FileTransfer + + // Params for wrapper + p Params + + myID *id.ID + cmix ft.Cmix + e2e E2e +} + +// E2e interface matches a subset of the e2e.Handler methods used by the Wrapper +// for easier testing. +type E2e interface { + SendE2E(mt catalog.MessageType, recipient *id.ID, payload []byte, + params e2e.Params) ([]id.Round, e2eCrypto.MessageID, time.Time, error) + RegisterListener(senderID *id.ID, messageType catalog.MessageType, + newListener receive.Listener) receive.ListenerID +} + +// NewWrapper generates a new file transfer manager using E2E. +func NewWrapper(receiveCB ft.ReceiveCallback, p Params, ft ft.FileTransfer, + myID *id.ID, e2e E2e, cmix ft.Cmix) (*Wrapper, error) { + w := &Wrapper{ + receiveCB: receiveCB, + ft: ft, + p: p, + myID: myID, + cmix: cmix, + e2e: e2e, + } + + // Register listener to receive new file transfers + w.e2e.RegisterListener(w.myID, catalog.NewFileTransfer, &listener{w}) + + return w, nil +} + +// MaxFileNameLen returns the max number of bytes allowed for a file name. +func (w *Wrapper) MaxFileNameLen() int { + return w.ft.MaxFileNameLen() +} + +// MaxFileTypeLen returns the max number of bytes allowed for a file type. +func (w *Wrapper) MaxFileTypeLen() int { + return w.ft.MaxFileTypeLen() +} + +// MaxFileSize returns the max number of bytes allowed for a file. +func (w *Wrapper) MaxFileSize() int { + return w.ft.MaxFileSize() +} + +// MaxPreviewSize returns the max number of bytes allowed for a file preview. +func (w *Wrapper) MaxPreviewSize() int { + return w.ft.MaxPreviewSize() +} + +// Send initiates the sending of a file to a recipient and returns a transfer ID +// that uniquely identifies this file transfer. The initial and final messages +// are sent via E2E. +func (w *Wrapper) Send(recipient *id.ID, fileName, fileType string, + fileData []byte, retry float32, preview []byte, + progressCB ft.SentProgressCallback, period time.Duration) ( + *ftCrypto.TransferID, error) { + + sendNew := func(info *ft.TransferInfo) error { + return sendNewFileTransferMessage(recipient, info, w.e2e) + } + + modifiedProgressCB := w.addEndMessageToCallback(progressCB) + + return w.ft.Send(recipient, fileName, fileType, fileData, retry, preview, + modifiedProgressCB, period, sendNew) +} + +// RegisterSentProgressCallback allows for the registration of a callback to +// track the progress of an individual sent file transfer. +func (w *Wrapper) RegisterSentProgressCallback(tid *ftCrypto.TransferID, + progressCB ft.SentProgressCallback, period time.Duration) error { + + modifiedProgressCB := w.addEndMessageToCallback(progressCB) + + return w.ft.RegisterSentProgressCallback(tid, modifiedProgressCB, period) +} + +// addEndMessageToCallback adds the sending of an E2E message when the transfer +// completed to the callback. If NotifyUponCompletion is not set, then the +// message is not sent. +func (w *Wrapper) addEndMessageToCallback(progressCB ft.SentProgressCallback) ft.SentProgressCallback { + if !w.p.NotifyUponCompletion { + return progressCB + } + return func(completed bool, arrived, total uint16, + st ft.SentTransfer, t ft.FilePartTracker, err error) { + + // If the transfer is completed, send last message informing recipient + if completed { + sendEndFileTransferMessage(st.Recipient(), w.cmix, w.e2e) + } + + progressCB(completed, arrived, total, st, t, err) + } +} + +// CloseSend deletes a file from the internal storage once a transfer has +// completed or reached the retry limit. Returns an error if the transfer +// has not run out of retries. +// +// This function should be called once a transfer completes or errors out +// (as reported by the progress callback). +func (w *Wrapper) CloseSend(tid *ftCrypto.TransferID) error { + return w.ft.CloseSend(tid) +} + +// RegisterReceivedProgressCallback allows for the registration of a callback to +// track the progress of an individual received file transfer. This must be done +// when a new transfer is received on the ReceiveCallback. +func (w *Wrapper) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, + progressCB ft.ReceivedProgressCallback, period time.Duration) error { + return w.ft.RegisterReceivedProgressCallback(tid, progressCB, period) +} + +// Receive returns the full file on the completion of the transfer. +// It deletes internal references to the data and unregisters any attached +// progress callback. Returns an error if the transfer is not complete, the +// full file cannot be verified, or if the transfer cannot be found. +// +// Receive can only be called once the progress callback returns that the +// file transfer is complete. +func (w *Wrapper) Receive(tid *ftCrypto.TransferID) ([]byte, error) { + return w.ft.Receive(tid) +} diff --git a/fileTransfer2/e2e/manager_test.go b/fileTransfer2/e2e/wrapper_test.go similarity index 95% rename from fileTransfer2/e2e/manager_test.go rename to fileTransfer2/e2e/wrapper_test.go index 50c551aa5..ffe08e2fb 100644 --- a/fileTransfer2/e2e/manager_test.go +++ b/fileTransfer2/e2e/wrapper_test.go @@ -36,8 +36,9 @@ func Test_FileTransfer_Smoke(t *testing.T) { cMixHandler := newMockCmixHandler() e2eHandler := newMockE2eHandler() rngGen := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG) - params := ft.DefaultParams() - params.MaxThroughput = math.MaxInt + ftParams := ft.DefaultParams() + ftParams.MaxThroughput = math.MaxInt + params := DefaultParams() type receiveCbValues struct { tid *ftCrypto.TransferID @@ -62,18 +63,17 @@ func Test_FileTransfer_Smoke(t *testing.T) { e2e1.RegisterListener( myID1, catalog.EndFileTransfer, newMockListener(endE2eChan1)) cmix1 := newMockCmix(myID1, cMixHandler) - ftManager1, err := ft.NewManager(params, myID1, cmix1, kv1, rngGen) + ftManager1, err := ft.NewManager(ftParams, myID1, cmix1, kv1, rngGen) if err != nil { t.Errorf("Failed to make new file transfer manager: %+v", err) } - m1, err := NewManager(receiveCB1, ftManager1, myID1, e2e1, cmix1) + stop1, err := ftManager1.StartProcesses() if err != nil { - t.Errorf("Failed to create new file transfer manager 1: %+v", err) + t.Errorf("Failed to start processes for manager 1: %+v", err) } - - stop1, err := m1.StartProcesses() + m1, err := NewWrapper(receiveCB1, params, ftManager1, myID1, e2e1, cmix1) if err != nil { - t.Errorf("Failed to start processes for manager 1: %+v", err) + t.Errorf("Failed to create new file transfer manager 1: %+v", err) } // Set up the second client @@ -90,18 +90,17 @@ func Test_FileTransfer_Smoke(t *testing.T) { e2e2.RegisterListener( myID2, catalog.EndFileTransfer, newMockListener(endE2eChan2)) cmix2 := newMockCmix(myID1, cMixHandler) - ftManager2, err := ft.NewManager(params, myID2, cmix2, kv2, rngGen) + ftManager2, err := ft.NewManager(ftParams, myID2, cmix2, kv2, rngGen) if err != nil { t.Errorf("Failed to make new file transfer manager: %+v", err) } - m2, err := NewManager(receiveCB2, ftManager2, myID2, e2e2, cmix2) + stop2, err := ftManager2.StartProcesses() if err != nil { - t.Errorf("Failed to create new file transfer manager 2: %+v", err) + t.Errorf("Failed to start processes for manager 2: %+v", err) } - - stop2, err := m2.StartProcesses() + m2, err := NewWrapper(receiveCB2, params, ftManager2, myID2, e2e2, cmix2) if err != nil { - t.Errorf("Failed to start processes for manager 2: %+v", err) + t.Errorf("Failed to create new file transfer manager 2: %+v", err) } // Wait group prevents the test from quiting before the file has completed diff --git a/fileTransfer2/ftMessages.proto b/fileTransfer2/ftMessages.proto index b846e5b1f..83b9f82c7 100644 --- a/fileTransfer2/ftMessages.proto +++ b/fileTransfer2/ftMessages.proto @@ -13,12 +13,12 @@ option go_package = "fileTransfer2"; // NewFileTransfer is transmitted first on the initialization of a file transfer // to inform the receiver about the incoming file. message NewFileTransfer { - string fileName = 1; // Name of the file - string fileType = 2; // String that indicates type of file + string fileName = 1; // Name of the file + string fileType = 2; // String that indicates type of file bytes transferKey = 3; // 256-bit encryption key bytes transferMac = 4; // 256-bit MAC of the entire file - uint32 numParts = 5; // Number of file parts - uint32 size = 6; // The size of the file, in bytes - float retry = 7; // Determines how many times to retry sending - bytes preview = 8; // A preview of the file + uint32 numParts = 5; // Number of file parts + uint32 size = 6; // The size of the file, in bytes + float retry = 7; // Determines how many times to retry sending + bytes preview = 8; // A preview of the file } \ No newline at end of file diff --git a/fileTransfer2/groupChat/manager.go b/fileTransfer2/groupChat/manager.go deleted file mode 100644 index 49e2381fe..000000000 --- a/fileTransfer2/groupChat/manager.go +++ /dev/null @@ -1,126 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -//////////////////////////////////////////////////////////////////////////////// - -package groupChat - -import ( - "github.com/pkg/errors" - ft "gitlab.com/elixxir/client/fileTransfer2" - "gitlab.com/elixxir/client/groupChat" - "gitlab.com/elixxir/client/stoppable" - ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" - "gitlab.com/elixxir/crypto/group" - "gitlab.com/xx_network/primitives/id" - "time" -) - -// Error messages. -const ( - // Manager.StartProcesses - errAddNewService = "failed to add service to receive new group file transfers: %+v" -) - -const ( - // Tag used when sending/receiving new group chat file transfers message - newFileTransferTag = "NewGroupFileTransfer" - - // Tag used when sending/receiving end group chat file transfers message - endFileTransferTag = "EndGroupFileTransfer" -) - -// Manager handles the sending and receiving of file transfers for group chats. -type Manager struct { - // Callback that is called every time a new file transfer is received - receiveCB ft.ReceiveCallback - - // File transfer Manager - ft ft.FileTransfer - - // Group chat Manager - gc GroupChat -} - -// GroupChat interface matches a subset of the groupChat.GroupChat methods used -// by the Manager for easier testing. -type GroupChat interface { - Send(groupID *id.ID, tag string, message []byte) ( - id.Round, time.Time, group.MessageID, error) - AddService(tag string, p groupChat.Processor) error -} - -// NewManager generates a new file transfer Manager for group chat. -func NewManager(receiveCB ft.ReceiveCallback, ft ft.FileTransfer, gc GroupChat) ( - *Manager, error) { - return &Manager{ - receiveCB: receiveCB, - ft: ft, - gc: gc, - }, nil -} - -func (m *Manager) StartProcesses() (stoppable.Stoppable, error) { - err := m.gc.AddService(newFileTransferTag, &processor{m}) - if err != nil { - return nil, errors.Errorf(errAddNewService, err) - } - - return m.ft.StartProcesses() -} - -func (m *Manager) MaxFileNameLen() int { - return m.ft.MaxFileNameLen() -} - -func (m *Manager) MaxFileTypeLen() int { - return m.ft.MaxFileTypeLen() -} - -func (m *Manager) MaxFileSize() int { - return m.ft.MaxFileSize() -} - -func (m *Manager) MaxPreviewSize() int { - return m.ft.MaxPreviewSize() -} - -func (m *Manager) Send(groupID *id.ID, fileName, fileType string, - fileData []byte, retry float32, preview []byte, - progressCB ft.SentProgressCallback, period time.Duration) ( - *ftCrypto.TransferID, error) { - sendNew := func(info *ft.TransferInfo) error { - return sendNewFileTransferMessage(groupID, info, m.gc) - } - - return m.ft.Send(groupID, fileName, fileType, fileData, retry, preview, - progressCB, period, sendNew) -} - -func (m *Manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, - progressCB ft.SentProgressCallback, period time.Duration) error { - return m.ft.RegisterSentProgressCallback(tid, progressCB, period) -} - -func (m *Manager) CloseSend(tid *ftCrypto.TransferID) error { - return m.ft.CloseSend(tid) -} - -func (m *Manager) HandleIncomingTransfer(fileName string, - key *ftCrypto.TransferKey, transferMAC []byte, numParts uint16, size uint32, - retry float32, progressCB ft.ReceivedProgressCallback, - period time.Duration) (*ftCrypto.TransferID, error) { - return m.ft.HandleIncomingTransfer( - fileName, key, transferMAC, numParts, size, retry, progressCB, period) -} - -func (m *Manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, - progressCB ft.ReceivedProgressCallback, period time.Duration) error { - return m.ft.RegisterReceivedProgressCallback(tid, progressCB, period) -} - -func (m *Manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) { - return m.ft.Receive(tid) -} diff --git a/fileTransfer2/groupChat/processor.go b/fileTransfer2/groupChat/processor.go index d72dcdd91..0aa7d2b23 100644 --- a/fileTransfer2/groupChat/processor.go +++ b/fileTransfer2/groupChat/processor.go @@ -23,10 +23,16 @@ const ( errNewReceivedTransfer = "[FT] Failed to add new received transfer for %q: %+v" ) +// processor processes the incoming E2E new file transfer messages to start +// receiving a new file transfer. Adheres to the Processor interface. type processor struct { - *Manager + *Wrapper } +// Process receives new file transfer messages and registers it with the file +// transfer manager. Then the caller is notified of the file transfer via the +// reception callback. It is the responsibility of the caller to register a +// progress callback. func (p *processor) Process(decryptedMsg groupChat.MessageReceive, _ format.Message, _ receptionID.EphemeralIdentity, _ rounds.Round) { // Unmarshal the request message @@ -37,7 +43,7 @@ func (p *processor) Process(decryptedMsg groupChat.MessageReceive, } // Add new transfer to start receiving parts - tid, err := p.HandleIncomingTransfer(info.FileName, &info.Key, info.Mac, + tid, err := p.ft.HandleIncomingTransfer(info.FileName, &info.Key, info.Mac, info.NumParts, info.Size, info.Retry, nil, 0) if err != nil { jww.ERROR.Printf(errNewReceivedTransfer, info.FileName, err) @@ -49,6 +55,8 @@ func (p *processor) Process(decryptedMsg groupChat.MessageReceive, info.Size, info.Preview) } +// String returns a human-readable identifier for this processor. Adheres to +// the fmt.Stringer interface. func (p *processor) String() string { return "GroupFileTransfer" } diff --git a/fileTransfer2/groupChat/send.go b/fileTransfer2/groupChat/send.go index aabc56003..980aad7ee 100644 --- a/fileTransfer2/groupChat/send.go +++ b/fileTransfer2/groupChat/send.go @@ -9,7 +9,6 @@ package groupChat import ( "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" ft "gitlab.com/elixxir/client/fileTransfer2" "gitlab.com/xx_network/primitives/id" ) @@ -19,9 +18,6 @@ const ( // sendNewFileTransferMessage errMarshalInfo = "failed to marshal new transfer info: %+v" errNewFtSendGroupChat = "failed to send initial file transfer message via group chat: %+v" - - // sendEndFileTransferMessage - errEndFtSendGroupChat = "[FT] Failed to send ending file transfer message via group chat: %+v" ) // sendNewFileTransferMessage sends a group chat message to the group ID @@ -43,12 +39,3 @@ func sendNewFileTransferMessage( return nil } - -// sendEndFileTransferMessage sends a group chat message to the group ID -// informing them that all file parts have arrived once the network is healthy. -func sendEndFileTransferMessage(groupID *id.ID, gc GroupChat) { - _, _, _, err := gc.Send(groupID, endFileTransferTag, nil) - if err != nil { - jww.ERROR.Printf(errEndFtSendGroupChat, err) - } -} diff --git a/fileTransfer2/groupChat/wrapper.go b/fileTransfer2/groupChat/wrapper.go new file mode 100644 index 000000000..09fff399e --- /dev/null +++ b/fileTransfer2/groupChat/wrapper.go @@ -0,0 +1,136 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package groupChat + +import ( + "github.com/pkg/errors" + ft "gitlab.com/elixxir/client/fileTransfer2" + "gitlab.com/elixxir/client/groupChat" + ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" + "gitlab.com/elixxir/crypto/group" + "gitlab.com/xx_network/primitives/id" + "time" +) + +// Error messages. +const ( + // Wrapper.StartProcesses + errAddNewService = "failed to add service to receive new group file transfers: %+v" +) + +const ( + // Tag used when sending/receiving new group chat file transfers message + newFileTransferTag = "NewGroupFileTransfer" +) + +// Wrapper handles the sending and receiving of file transfers for group chats. +type Wrapper struct { + // Callback that is called every time a new file transfer is received + receiveCB ft.ReceiveCallback + + // File transfer Manager + ft ft.FileTransfer + + // Group chat Manager + gc GroupChat +} + +// GroupChat interface matches a subset of the groupChat.GroupChat methods used +// by the Wrapper for easier testing. +type GroupChat interface { + Send(groupID *id.ID, tag string, message []byte) ( + id.Round, time.Time, group.MessageID, error) + AddService(tag string, p groupChat.Processor) error +} + +// NewWrapper generates a new file transfer Wrapper for group chat. +func NewWrapper(receiveCB ft.ReceiveCallback, ft ft.FileTransfer, gc GroupChat) ( + *Wrapper, error) { + w := &Wrapper{ + receiveCB: receiveCB, + ft: ft, + gc: gc, + } + + err := w.gc.AddService(newFileTransferTag, &processor{w}) + if err != nil { + return nil, errors.Errorf(errAddNewService, err) + } + + return w, nil +} + +// MaxFileNameLen returns the max number of bytes allowed for a file name. +func (w *Wrapper) MaxFileNameLen() int { + return w.ft.MaxFileNameLen() +} + +// MaxFileTypeLen returns the max number of bytes allowed for a file type. +func (w *Wrapper) MaxFileTypeLen() int { + return w.ft.MaxFileTypeLen() +} + +// MaxFileSize returns the max number of bytes allowed for a file. +func (w *Wrapper) MaxFileSize() int { + return w.ft.MaxFileSize() +} + +// MaxPreviewSize returns the max number of bytes allowed for a file preview. +func (w *Wrapper) MaxPreviewSize() int { + return w.ft.MaxPreviewSize() +} + +// Send initiates the sending of a file to a group and returns a transfer ID +// that uniquely identifies this file transfer. +func (w *Wrapper) Send(groupID *id.ID, fileName, fileType string, + fileData []byte, retry float32, preview []byte, + progressCB ft.SentProgressCallback, period time.Duration) ( + *ftCrypto.TransferID, error) { + sendNew := func(info *ft.TransferInfo) error { + return sendNewFileTransferMessage(groupID, info, w.gc) + } + + return w.ft.Send(groupID, fileName, fileType, fileData, retry, preview, + progressCB, period, sendNew) +} + +// RegisterSentProgressCallback allows for the registration of a callback to +// track the progress of an individual sent file transfer. +func (w *Wrapper) RegisterSentProgressCallback(tid *ftCrypto.TransferID, + progressCB ft.SentProgressCallback, period time.Duration) error { + return w.ft.RegisterSentProgressCallback(tid, progressCB, period) +} + +// CloseSend deletes a file from the internal storage once a transfer has +// completed or reached the retry limit. Returns an error if the transfer +// has not run out of retries. +// +// This function should be called once a transfer completes or errors out +// (as reported by the progress callback). +func (w *Wrapper) CloseSend(tid *ftCrypto.TransferID) error { + return w.ft.CloseSend(tid) +} + +// RegisterReceivedProgressCallback allows for the registration of a callback to +// track the progress of an individual received file transfer. This must be done +// when a new transfer is received on the ReceiveCallback. +func (w *Wrapper) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, + progressCB ft.ReceivedProgressCallback, period time.Duration) error { + return w.ft.RegisterReceivedProgressCallback(tid, progressCB, period) +} + +// Receive returns the full file on the completion of the transfer. +// It deletes internal references to the data and unregisters any attached +// progress callback. Returns an error if the transfer is not complete, the +// full file cannot be verified, or if the transfer cannot be found. +// +// Receive can only be called once the progress callback returns that the +// file transfer is complete. +func (w *Wrapper) Receive(tid *ftCrypto.TransferID) ([]byte, error) { + return w.ft.Receive(tid) +} diff --git a/fileTransfer2/groupChat/manager_test.go b/fileTransfer2/groupChat/wrapper_test.go similarity index 93% rename from fileTransfer2/groupChat/manager_test.go rename to fileTransfer2/groupChat/wrapper_test.go index f11cc8dba..d9de19fbc 100644 --- a/fileTransfer2/groupChat/manager_test.go +++ b/fileTransfer2/groupChat/wrapper_test.go @@ -24,7 +24,7 @@ import ( "time" ) -// Tests that E2e adheres to the e2e.Handler interface. +// Tests that GroupChat adheres to the groupChat.GroupChat interface. var _ GroupChat = (groupChat.GroupChat)(nil) // Smoke test of the entire file transfer system. @@ -52,14 +52,16 @@ func Test_FileTransfer_Smoke(t *testing.T) { gc1 := newMockGC(gcHandler) ftManager1, err := ft.NewManager( params, myID1, newMockCmix(myID1, cMixHandler), kv1, rngGen) - m1, err := NewManager(nil, ftManager1, gc1) if err != nil { - t.Errorf("Failed to create new file transfer manager 1: %+v", err) + t.Errorf("Failed to create file transfer manager 2: %+v", err) } - - stop1, err := m1.StartProcesses() + stop1, err := ftManager1.StartProcesses() if err != nil { - t.Errorf("Failed to start processes for manager 1: %+v", err) + t.Errorf("Failed to start file transfer processes for manager 1: %+v", err) + } + m1, err := NewWrapper(nil, ftManager1, gc1) + if err != nil { + t.Errorf("Failed to create new file transfer manager 1: %+v", err) } // Set up the second client @@ -74,14 +76,16 @@ func Test_FileTransfer_Smoke(t *testing.T) { gc2 := newMockGC(gcHandler) ftManager2, err := ft.NewManager( params, myID2, newMockCmix(myID2, cMixHandler), kv2, rngGen) - m2, err := NewManager(receiveCB2, ftManager2, gc2) if err != nil { - t.Errorf("Failed to create new file transfer manager 2: %+v", err) + t.Errorf("Failed to create file transfer manager 2: %+v", err) } - - stop2, err := m2.StartProcesses() + stop2, err := ftManager2.StartProcesses() if err != nil { - t.Errorf("Failed to start processes for manager 2: %+v", err) + t.Errorf("Failed to start file transfer processes for manager 2: %+v", err) + } + m2, err := NewWrapper(receiveCB2, ftManager2, gc2) + if err != nil { + t.Errorf("Failed to create new file transfer manager 2: %+v", err) } // Wait group prevents the test from quiting before the file has completed diff --git a/fileTransfer2/info.go b/fileTransfer2/info.go index 8a5da9672..35068b534 100644 --- a/fileTransfer2/info.go +++ b/fileTransfer2/info.go @@ -12,6 +12,9 @@ import ( ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ) +// TransferInfo contains all the information for a new transfer. This is the +// information sent in the initial file transfer so the recipient can prepare +// for the incoming file transfer parts. type TransferInfo struct { FileName string // Name of the file FileType string // String that indicates type of file @@ -23,6 +26,7 @@ type TransferInfo struct { Preview []byte // A preview of the file } +// Marshal serialises the TransferInfo for sending over the network. func (ti *TransferInfo) Marshal() ([]byte, error) { // Construct NewFileTransfer message protoMsg := &NewFileTransfer{ @@ -39,6 +43,7 @@ func (ti *TransferInfo) Marshal() ([]byte, error) { return proto.Marshal(protoMsg) } +// UnmarshalTransferInfo deserializes the TransferInfo. func UnmarshalTransferInfo(data []byte) (*TransferInfo, error) { // Unmarshal the request message var newFT NewFileTransfer diff --git a/fileTransfer2/interface.go b/fileTransfer2/interface.go index 7953954b4..566e0166a 100644 --- a/fileTransfer2/interface.go +++ b/fileTransfer2/interface.go @@ -37,10 +37,12 @@ type SendNew func(info *TransferInfo) error // FileTransfer facilities the sending and receiving of large file transfers. // It allows for progress tracking of both inbound and outbound transfers. +// FileTransfer handles the sending of the file data; however, the caller is +// responsible for communicating to the recipient of the incoming file transfer. type FileTransfer interface { - // StartProcesses starts the listening for new file transfer messages and - // starts the sending threads that wait for transfers to send. + // StartProcesses starts the sending threads that wait for file transfers to + // send. Adheres to the api.Service type. StartProcesses() (stoppable.Stoppable, error) // MaxFileNameLen returns the max number of bytes allowed for a file name. @@ -57,10 +59,11 @@ type FileTransfer interface { MaxPreviewSize() int /* === Sending ========================================================== */ - /* The processes of sending a file involves three main steps: - 1. Sending the file using Send - 2. Receiving transfer progress - 3. Closing a finished send using CloseSend + /* The processes of sending a file involves four main steps: + 1. Set up a method to send initial file transfer details using SendNew. + 2. Sending the file using Send and register a progress callback. + 3. Receiving transfer progress on the progress callback. + 4. Closing a finished send using CloseSend. Once the file is sent, it is broken into individual, equal-length parts and sent to the recipient. Every time one of these parts arrives, it is @@ -129,10 +132,14 @@ type FileTransfer interface { CloseSend(tid *ftCrypto.TransferID) error /* === Receiving ======================================================== */ - /* The processes of receiving a file involves three main steps: - 1. Receiving a new file transfer on ReceiveCallback - 2. Receiving transfer progress - 3. Receiving the complete file using Receive + /* The processes of receiving a file involves four main steps: + 1. Receiving a new file transfer through a channel set up by the + caller. + 2. Registering the file transfer and a progress callback with + HandleIncomingTransfer. + 3. Receiving transfer progress on the progress callback. + 4. Receiving the complete file using Receive once the callback says + the transfer is complete. Once the file transfer manager has started, it will call the ReceiveCallback for every new file transfer that is received. Once that @@ -158,8 +165,8 @@ type FileTransfer interface { // update (or less if restricted by the period), or on fatal error. // period - A progress callback will be limited from triggering only once // per period. - HandleIncomingTransfer(fileName string, key *ftCrypto.TransferKey, transferMAC []byte, - numParts uint16, size uint32, retry float32, + HandleIncomingTransfer(fileName string, key *ftCrypto.TransferKey, + transferMAC []byte, numParts uint16, size uint32, retry float32, progressCB ReceivedProgressCallback, period time.Duration) ( *ftCrypto.TransferID, error) diff --git a/fileTransfer2/manager.go b/fileTransfer2/manager.go index 1f6c06e19..704ad22a2 100644 --- a/fileTransfer2/manager.go +++ b/fileTransfer2/manager.go @@ -189,7 +189,6 @@ func NewManager(params Params, // StartProcesses starts the sending threads. Adheres to the api.Service type. func (m *manager) StartProcesses() (stoppable.Stoppable, error) { - // Construct stoppables multiStop := stoppable.NewMulti(workerPoolStoppable) batchBuilderStop := stoppable.NewSingle(batchBuilderThreadStoppable) diff --git a/groupChat/processor.go b/groupChat/processor.go index 1047889f6..fa1157557 100644 --- a/groupChat/processor.go +++ b/groupChat/processor.go @@ -14,12 +14,13 @@ import ( "gitlab.com/elixxir/primitives/format" ) +// Processor manages the handling of received group chat messages. type Processor interface { // Process decrypts and hands off the message to its internal down stream // message processing system. Process(decryptedMsg MessageReceive, msg format.Message, receptionID receptionID.EphemeralIdentity, round rounds.Round) - // Stringer interface for debugging + // Stringer interface for debugging. fmt.Stringer } -- GitLab