diff --git a/fileTransfer2/e2e/listener.go b/fileTransfer2/e2e/listener.go index 33e8aefca35c26861ecbcaed3024f3304e4d1f27..79aeed2963e4bdbd678871b8c0d126593138d3bd 100644 --- a/fileTransfer2/e2e/listener.go +++ b/fileTransfer2/e2e/listener.go @@ -8,11 +8,9 @@ package e2e import ( - "github.com/golang/protobuf/proto" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/e2e/receive" - "gitlab.com/elixxir/client/fileTransfer2" - ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" + ft "gitlab.com/elixxir/client/fileTransfer2" ) // Error messages. @@ -36,27 +34,23 @@ type listener struct { // messages. func (l *listener) Hear(msg receive.Message) { // Unmarshal the request message - newFT := &fileTransfer2.NewFileTransfer{} - err := proto.Unmarshal(msg.Payload, newFT) + info, err := ft.UnmarshalTransferInfo(msg.Payload) if err != nil { jww.ERROR.Printf(errProtoUnmarshal, err) return } - transferKey := ftCrypto.UnmarshalTransferKey(newFT.GetTransferKey()) - // Add new transfer to start receiving parts - tid, err := l.m.HandleIncomingTransfer(newFT.FileName, &transferKey, - newFT.TransferMac, uint16(newFT.NumParts), newFT.Size, newFT.Retry, - nil, 0) + tid, err := l.m.HandleIncomingTransfer(info.FileName, &info.Key, info.Mac, + info.NumParts, info.Size, info.Retry, nil, 0) if err != nil { - jww.ERROR.Printf(errNewReceivedTransfer, newFT.FileName, err) + jww.ERROR.Printf(errNewReceivedTransfer, info.FileName, err) return } // Call the reception callback - go l.m.receiveCB(tid, newFT.FileName, newFT.FileType, msg.Sender, - newFT.Size, newFT.Preview) + go l.m.receiveCB( + tid, info.FileName, info.FileType, msg.Sender, info.Size, info.Preview) } // Name returns a name used for debugging. diff --git a/fileTransfer2/e2e/manager.go b/fileTransfer2/e2e/manager.go index d7955bcbdd90281e29feace52b2e8e6b4eecce51..e7bdc16435f1d1f3cdaf81e984882a99cb9e7dac 100644 --- a/fileTransfer2/e2e/manager.go +++ b/fileTransfer2/e2e/manager.go @@ -102,25 +102,31 @@ func (m *Manager) Send(fileName, fileType string, fileData []byte, return sendNewFileTransferMessage(recipient, info, m.e2e) } + modifiedProgressCB := m.addEndMessageToCallback(progressCB) + return m.ft.Send(fileName, fileType, fileData, recipient, retry, preview, - progressCB, period, sendNew) + modifiedProgressCB, period, sendNew) } func (m *Manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, progressCB ft.SentProgressCallback, period time.Duration) error { - progressCB2 := func(completed bool, arrived, total uint16, - t ft.FilePartTracker, err error) { + modifiedProgressCB := m.addEndMessageToCallback(progressCB) + + return m.ft.RegisterSentProgressCallback(tid, modifiedProgressCB, period) +} + +func (m *Manager) addEndMessageToCallback(progressCB ft.SentProgressCallback) ft.SentProgressCallback { + return func(completed bool, arrived, total uint16, + st ft.SentTransfer, t ft.FilePartTracker, err error) { // If the transfer is completed, send last message informing recipient if completed { - sendEndFileTransferMessage(nil, m.cmix, m.e2e) + sendEndFileTransferMessage(st.Recipient(), m.cmix, m.e2e) } - progressCB(completed, arrived, total, t, err) + progressCB(completed, arrived, total, st, t, err) } - - return m.ft.RegisterSentProgressCallback(tid, progressCB, period) } func (m *Manager) CloseSend(tid *ftCrypto.TransferID) error { diff --git a/fileTransfer2/e2e/manager_test.go b/fileTransfer2/e2e/manager_test.go index bfe61b09f460f57f7bea3defb3a12d08d95e3138..07071609c7066ba205e373078b517481c9c12b65 100644 --- a/fileTransfer2/e2e/manager_test.go +++ b/fileTransfer2/e2e/manager_test.go @@ -26,9 +26,6 @@ import ( "time" ) -// Tests that Manager adheres to the fileTransfer2.FileTransfer interface. -var _ ft.FileTransfer = (*Manager)(nil) - // Tests that E2e adheres to the e2e.Handler interface. var _ E2e = (e2e.Handler)(nil) @@ -64,12 +61,11 @@ func Test_FileTransfer_Smoke(t *testing.T) { e2e1 := newMockE2e(myID1, e2eHandler) e2e1.RegisterListener( myID1, catalog.EndFileTransfer, newMockListener(endE2eChan1)) - ftm1, err := NewManager(receiveCB1, params, myID1, e2e1, + m1, err := NewManager(receiveCB1, params, myID1, e2e1, newMockCmix(myID1, cMixHandler), kv1, rngGen) if err != nil { t.Errorf("Failed to create new file transfer manager 1: %+v", err) } - m1 := ftm1.(*Manager) stop1, err := m1.StartProcesses() if err != nil { @@ -89,12 +85,11 @@ func Test_FileTransfer_Smoke(t *testing.T) { e2e2 := newMockE2e(myID2, e2eHandler) e2e2.RegisterListener( myID2, catalog.EndFileTransfer, newMockListener(endE2eChan2)) - ftm2, err := NewManager(receiveCB2, params, myID2, e2e2, + m2, err := NewManager(receiveCB2, params, myID2, e2e2, newMockCmix(myID2, cMixHandler), kv2, rngGen) if err != nil { t.Errorf("Failed to create new file transfer manager 2: %+v", err) } - m2 := ftm2.(*Manager) stop2, err := m2.StartProcesses() if err != nil { @@ -121,7 +116,7 @@ func Test_FileTransfer_Smoke(t *testing.T) { select { case r := <-receiveCbChan2: receiveProgressCB := func(completed bool, received, total uint16, - fpt ft.FilePartTracker, err error) { + rt ft.ReceivedTransfer, fpt ft.FilePartTracker, err error) { if completed && !called { timeReceived <- netTime.Now() receivedFile, err2 := m2.Receive(r.tid) @@ -152,7 +147,7 @@ func Test_FileTransfer_Smoke(t *testing.T) { // Define sent progress callback wg.Add(1) sentProgressCb1 := func(completed bool, arrived, total uint16, - fpt ft.FilePartTracker, err error) { + st ft.SentTransfer, fpt ft.FilePartTracker, err error) { if completed { wg.Done() } @@ -181,6 +176,12 @@ func Test_FileTransfer_Smoke(t *testing.T) { // Wait for file to be sent and received wg.Wait() + select { + case <-endE2eChan2: + case <-time.After(15 * time.Millisecond): + t.Errorf("Timed out waiting for end file transfer message.") + } + err = m1.CloseSend(tid1) if err != nil { t.Errorf("Failed to close transfer: %+v", err) diff --git a/fileTransfer2/e2e/send.go b/fileTransfer2/e2e/send.go index 55720bb100b7ae3a923409675fa820248fe53471..8529ecf4e79367d4f2b192f96589894d28969749 100644 --- a/fileTransfer2/e2e/send.go +++ b/fileTransfer2/e2e/send.go @@ -8,7 +8,6 @@ package e2e import ( - "github.com/golang/protobuf/proto" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/catalog" @@ -20,7 +19,7 @@ import ( // Error messages. const ( // sendNewFileTransferMessage - errProtoMarshal = "failed to proto marshal NewFileTransfer: %+v" + errMarshalInfo = "failed to marshal new transfer info: %+v" errNewFtSendE2e = "failed to send initial file transfer message via E2E: %+v" // sendEndFileTransferMessage @@ -42,22 +41,10 @@ const ( func sendNewFileTransferMessage( recipient *id.ID, info *ft.TransferInfo, e2eHandler E2e) error { - // Construct NewFileTransfer message - protoMsg := &ft.NewFileTransfer{ - FileName: info.FileName, - FileType: info.FileType, - TransferKey: info.Key.Bytes(), - TransferMac: info.Mac, - NumParts: uint32(info.NumParts), - Size: info.Size, - Retry: info.Retry, - Preview: info.Preview, - } - // Marshal the message - payload, err := proto.Marshal(protoMsg) + payload, err := info.Marshal() if err != nil { - return errors.Errorf(errProtoMarshal, err) + return errors.Errorf(errMarshalInfo, err) } // Get E2E parameters diff --git a/fileTransfer2/groupChat/manager.go b/fileTransfer2/groupChat/manager.go index 75a5e366bfbe6e3dd503717fce9ead335d3fb64b..47cd7c434f665a91aa6037ab421796965e06f106 100644 --- a/fileTransfer2/groupChat/manager.go +++ b/fileTransfer2/groupChat/manager.go @@ -15,6 +15,7 @@ import ( "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/fastRNG" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" + "gitlab.com/elixxir/crypto/group" "gitlab.com/xx_network/primitives/id" "time" ) @@ -24,7 +25,7 @@ const ( // NewManager errNewFtManager = "cannot create new group chat file transfer manager: %+v" - // manager.StartProcesses + // Manager.StartProcesses errAddNewService = "failed to add service to receive new group file transfers: %+v" ) @@ -36,32 +37,40 @@ const ( endFileTransferTag = "EndGroupFileTransfer" ) -// manager handles the sending and receiving of file transfers for group chats. -type manager struct { +// Manager handles the sending and receiving of file transfers for group chats. +type Manager struct { // Callback that is called every time a new file transfer is received receiveCB ft.ReceiveCallback - // File transfer manager + // File transfer Manager ft ft.FileTransfer - // Group chat manager - gc groupChat.GroupChat + // Group chat Manager + gc GroupChat myID *id.ID cmix ft.Cmix } -// NewManager generates a new file transfer manager for group chat. +// GroupChat interface matches a subset of the groupChat.GroupChat methods used +// by the Manager for easier testing. +type GroupChat interface { + Send(groupID *id.ID, tag string, message []byte) ( + id.Round, time.Time, group.MessageID, error) + AddService(tag string, p groupChat.Processor) error +} + +// NewManager generates a new file transfer Manager for group chat. func NewManager(receiveCB ft.ReceiveCallback, params ft.Params, myID *id.ID, - gc groupChat.GroupChat, cmix ft.Cmix, kv *versioned.KV, - rng *fastRNG.StreamGenerator) (ft.FileTransfer, error) { + gc GroupChat, cmix ft.Cmix, kv *versioned.KV, rng *fastRNG.StreamGenerator) ( + *Manager, error) { ftManager, err := ft.NewManager(params, myID, cmix, kv, rng) if err != nil { return nil, errors.Errorf(errNewFtManager, err) } - return &manager{ + return &Manager{ receiveCB: receiveCB, ft: ftManager, gc: gc, @@ -70,49 +79,52 @@ func NewManager(receiveCB ft.ReceiveCallback, params ft.Params, myID *id.ID, }, nil } -func (m *manager) StartProcesses() (stoppable.Stoppable, error) { +func (m *Manager) StartProcesses() (stoppable.Stoppable, error) { err := m.gc.AddService(newFileTransferTag, &processor{m}) if err != nil { return nil, errors.Errorf(errAddNewService, err) } - return m.StartProcesses() + return m.ft.StartProcesses() } -func (m *manager) MaxFileNameLen() int { - return m.MaxFileNameLen() +func (m *Manager) MaxFileNameLen() int { + return m.ft.MaxFileNameLen() } -func (m *manager) MaxFileTypeLen() int { - return m.MaxFileTypeLen() +func (m *Manager) MaxFileTypeLen() int { + return m.ft.MaxFileTypeLen() } -func (m *manager) MaxFileSize() int { - return m.MaxFileSize() +func (m *Manager) MaxFileSize() int { + return m.ft.MaxFileSize() } -func (m *manager) MaxPreviewSize() int { - return m.MaxPreviewSize() +func (m *Manager) MaxPreviewSize() int { + return m.ft.MaxPreviewSize() } -func (m *manager) Send(fileName, fileType string, fileData []byte, +func (m *Manager) Send(fileName, fileType string, fileData []byte, recipient *id.ID, retry float32, preview []byte, - progressCB ft.SentProgressCallback, period time.Duration, - sendNew ft.SendNew) (*ftCrypto.TransferID, error) { + progressCB ft.SentProgressCallback, period time.Duration) (*ftCrypto.TransferID, error) { + sendNew := func(info *ft.TransferInfo) error { + return sendNewFileTransferMessage(recipient, info, m.gc) + } + return m.ft.Send(fileName, fileType, fileData, recipient, retry, preview, progressCB, period, sendNew) } -func (m *manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, +func (m *Manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, progressCB ft.SentProgressCallback, period time.Duration) error { return m.ft.RegisterSentProgressCallback(tid, progressCB, period) } -func (m *manager) CloseSend(tid *ftCrypto.TransferID) error { - return m.CloseSend(tid) +func (m *Manager) CloseSend(tid *ftCrypto.TransferID) error { + return m.ft.CloseSend(tid) } -func (m *manager) HandleIncomingTransfer(fileName string, +func (m *Manager) HandleIncomingTransfer(fileName string, key *ftCrypto.TransferKey, transferMAC []byte, numParts uint16, size uint32, retry float32, progressCB ft.ReceivedProgressCallback, period time.Duration) (*ftCrypto.TransferID, error) { @@ -120,11 +132,11 @@ func (m *manager) HandleIncomingTransfer(fileName string, fileName, key, transferMAC, numParts, size, retry, progressCB, period) } -func (m *manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, +func (m *Manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, progressCB ft.ReceivedProgressCallback, period time.Duration) error { return m.ft.RegisterReceivedProgressCallback(tid, progressCB, period) } -func (m *manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) { - return m.Receive(tid) +func (m *Manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) { + return m.ft.Receive(tid) } diff --git a/fileTransfer2/groupChat/manager_test.go b/fileTransfer2/groupChat/manager_test.go new file mode 100644 index 0000000000000000000000000000000000000000..925fe764ca1d17547cf7ad7e7829bf8637e304c6 --- /dev/null +++ b/fileTransfer2/groupChat/manager_test.go @@ -0,0 +1,203 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package groupChat + +import ( + "bytes" + "gitlab.com/elixxir/client/e2e/receive" + ft "gitlab.com/elixxir/client/fileTransfer2" + "gitlab.com/elixxir/client/groupChat" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/crypto/fastRNG" + ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" + "gitlab.com/elixxir/ekv" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" + "math" + "sync" + "testing" + "time" +) + +// Tests that E2e adheres to the e2e.Handler interface. +var _ GroupChat = (groupChat.GroupChat)(nil) + +// Smoke test of the entire file transfer system. +func Test_FileTransfer_Smoke(t *testing.T) { + // jww.SetStdoutThreshold(jww.LevelDebug) + // Set up cMix and E2E message handlers + cMixHandler := newMockCmixHandler() + gcHandler := newMockGcHandler() + rngGen := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG) + params := ft.DefaultParams() + params.MaxThroughput = math.MaxInt + + type receiveCbValues struct { + tid *ftCrypto.TransferID + fileName string + fileType string + sender *id.ID + size uint32 + preview []byte + } + + // Set up the first client + receiveCbChan1 := make(chan receiveCbValues, 10) + receiveCB1 := func(tid *ftCrypto.TransferID, fileName, fileType string, + sender *id.ID, size uint32, preview []byte) { + receiveCbChan1 <- receiveCbValues{ + tid, fileName, fileType, sender, size, preview} + } + myID1 := id.NewIdFromString("myID1", id.User, t) + kv1 := versioned.NewKV(ekv.MakeMemstore()) + gc1 := newMockGC(gcHandler) + m1, err := NewManager(receiveCB1, params, myID1, gc1, + newMockCmix(myID1, cMixHandler), kv1, rngGen) + if err != nil { + t.Errorf("Failed to create new file transfer manager 1: %+v", err) + } + + stop1, err := m1.StartProcesses() + if err != nil { + t.Errorf("Failed to start processes for manager 1: %+v", err) + } + + // Set up the second client + receiveCbChan2 := make(chan receiveCbValues, 10) + receiveCB2 := func(tid *ftCrypto.TransferID, fileName, fileType string, + sender *id.ID, size uint32, preview []byte) { + receiveCbChan2 <- receiveCbValues{ + tid, fileName, fileType, sender, size, preview} + } + myID2 := id.NewIdFromString("myID2", id.User, t) + kv2 := versioned.NewKV(ekv.MakeMemstore()) + endE2eChan2 := make(chan receive.Message, 3) + gc2 := newMockGC(gcHandler) /**/ + m2, err := NewManager(receiveCB2, params, myID2, gc2, + newMockCmix(myID2, cMixHandler), kv2, rngGen) + if err != nil { + t.Errorf("Failed to create new file transfer manager 2: %+v", err) + } + + stop2, err := m2.StartProcesses() + if err != nil { + t.Errorf("Failed to start processes for manager 2: %+v", err) + } + + // Wait group prevents the test from quiting before the file has completed + // sending and receiving + var wg sync.WaitGroup + + // Define details of file to send + fileName, fileType := "myFile", "txt" + fileData := []byte(loremIpsum) + preview := []byte("Lorem ipsum dolor sit amet") + retry := float32(2.0) + + // Create go func that waits for file transfer to be received to register + // a progress callback that then checks that the file received is correct + // when done + wg.Add(1) + var called bool + timeReceived := make(chan time.Time) + go func() { + select { + case r := <-receiveCbChan2: + receiveProgressCB := func(completed bool, received, total uint16, + rt ft.ReceivedTransfer, fpt ft.FilePartTracker, err error) { + if completed && !called { + timeReceived <- netTime.Now() + receivedFile, err2 := m2.Receive(r.tid) + if err2 != nil { + t.Errorf("Failed to receive file: %+v", err2) + } + + if !bytes.Equal(fileData, receivedFile) { + t.Errorf("Received file does not match sent."+ + "\nsent: %q\nreceived: %q", + fileData, receivedFile) + } + wg.Done() + } + } + err3 := m2.RegisterReceivedProgressCallback( + r.tid, receiveProgressCB, 0) + if err3 != nil { + t.Errorf( + "Failed to Rregister received progress callback: %+v", err3) + } + case <-time.After(2100 * time.Millisecond): + t.Errorf("Timed out waiting to receive new file transfer.") + wg.Done() + } + }() + + // Define sent progress callback + wg.Add(1) + sentProgressCb1 := func(completed bool, arrived, total uint16, + st ft.SentTransfer, fpt ft.FilePartTracker, err error) { + if completed { + wg.Done() + } + } + + // Send file + sendStart := netTime.Now() + tid1, err := m1.Send( + fileName, fileType, fileData, myID2, retry, preview, sentProgressCb1, 0) + if err != nil { + t.Errorf("Failed to send file: %+v", err) + } + + go func() { + select { + case tr := <-timeReceived: + fileSize := len(fileData) + sendTime := tr.Sub(sendStart) + fileSizeKb := float32(fileSize) * .001 + speed := fileSizeKb * float32(time.Second) / (float32(sendTime)) + t.Logf("Completed receiving file %q in %s (%.2f kb @ %.2f kb/s).", + fileName, sendTime, fileSizeKb, speed) + } + }() + + // Wait for file to be sent and received + wg.Wait() + + select { + case <-endE2eChan2: + case <-time.After(15 * time.Millisecond): + t.Errorf("Timed out waiting for end file transfer message.") + } + + err = m1.CloseSend(tid1) + if err != nil { + t.Errorf("Failed to close transfer: %+v", err) + } + + err = stop1.Close() + if err != nil { + t.Errorf("Failed to close processes for manager 1: %+v", err) + } + + err = stop2.Close() + if err != nil { + t.Errorf("Failed to close processes for manager 2: %+v", err) + } +} + +const loremIpsum = `Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed sit amet urna venenatis, rutrum magna maximus, tempor orci. Cras sit amet nulla id dolor blandit commodo. Suspendisse potenti. Praesent gravida porttitor metus vel aliquam. Maecenas rutrum velit at lobortis auctor. Mauris porta blandit tempor. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Morbi volutpat posuere maximus. Nunc in augue molestie ante mattis tempor. + +Phasellus placerat elit eu fringilla pharetra. Vestibulum consectetur pulvinar nunc, vestibulum tincidunt felis rhoncus sit amet. Duis non dolor eleifend nibh luctus eleifend. Nunc urna odio, euismod sit amet feugiat ut, dapibus vel elit. Nulla est mauris, posuere eget enim cursus, vehicula viverra est. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque mattis, nisi quis consectetur semper, neque enim rhoncus dolor, ut aliquam leo orci sed dolor. Integer ullamcorper pulvinar turpis, a sollicitudin nunc posuere et. Nullam orci nibh, facilisis ac massa eu, bibendum bibendum sapien. Sed tincidunt nunc mauris, nec ullamcorper enim lacinia nec. Nulla dapibus sapien ut odio bibendum, tempus ornare sapien lacinia. + +Duis ac hendrerit augue. Nullam porttitor feugiat finibus. Nam enim urna, maximus et ligula eu, aliquet convallis turpis. Vestibulum luctus quam in dictum efficitur. Vestibulum ac pulvinar ipsum. Vivamus consectetur augue nec tellus mollis, at iaculis magna efficitur. Nunc dictum convallis sem, at vehicula nulla accumsan non. Nullam blandit orci vel turpis convallis, mollis porttitor felis accumsan. Sed non posuere leo. Proin ultricies varius nulla at ultricies. Phasellus et pharetra justo. Quisque eu orci odio. Pellentesque pharetra tempor tempor. Aliquam ac nulla lorem. Sed dignissim ligula sit amet nibh fermentum facilisis. + +Donec facilisis rhoncus ante. Duis nec nisi et dolor congue semper vel id ligula. Mauris non eleifend libero, et sodales urna. Nullam pharetra gravida velit non mollis. Integer vel ultrices libero, at ultrices magna. Duis semper risus a leo vulputate consectetur. Cras sit amet convallis sapien. Sed blandit, felis et porttitor fringilla, urna tellus commodo metus, at pharetra nibh urna sed sem. Nam ex dui, posuere id mi et, egestas tincidunt est. Nullam elementum pulvinar diam in maximus. Maecenas vel augue vitae nunc consectetur vestibulum in aliquet lacus. Nullam nec lectus dapibus, dictum nisi nec, congue quam. Suspendisse mollis vel diam nec dapibus. Mauris neque justo, scelerisque et suscipit non, imperdiet eget leo. Vestibulum leo turpis, dapibus ac lorem a, mollis pulvinar quam. + +Sed sed mauris a neque dignissim aliquet. Aliquam congue gravida velit in efficitur. Integer elementum feugiat est, ac lacinia libero bibendum sed. Sed vestibulum suscipit dignissim. Nunc scelerisque, turpis quis varius tristique, enim lacus vehicula lacus, id vestibulum velit erat eu odio. Donec tincidunt nunc sit amet sapien varius ornare. Phasellus semper venenatis ligula eget euismod. Mauris sodales massa tempor, cursus velit a, feugiat neque. Sed odio justo, rhoncus eu fermentum non, tristique a quam. In vehicula in tortor nec iaculis. Cras ligula sem, sollicitudin at nulla eget, placerat lacinia massa. Mauris tempus quam sit amet leo efficitur egestas. Proin iaculis, velit in blandit egestas, felis odio sollicitudin ipsum, eget interdum leo odio tempor nisi. Curabitur sed mauris id turpis tempor finibus ut mollis lectus. Curabitur neque libero, aliquam facilisis lobortis eget, posuere in augue. In sodales urna sit amet elit euismod rhoncus.` diff --git a/fileTransfer2/groupChat/processor.go b/fileTransfer2/groupChat/processor.go index 33ea4d0e0ca3047efe642fe869fa59cc46e460ae..d72dcdd9143aa1c7fc816fd696667882b9521a92 100644 --- a/fileTransfer2/groupChat/processor.go +++ b/fileTransfer2/groupChat/processor.go @@ -8,13 +8,11 @@ package groupChat import ( - "github.com/golang/protobuf/proto" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/rounds" ft "gitlab.com/elixxir/client/fileTransfer2" "gitlab.com/elixxir/client/groupChat" - ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" "gitlab.com/elixxir/primitives/format" ) @@ -26,33 +24,29 @@ const ( ) type processor struct { - *manager + *Manager } func (p *processor) Process(decryptedMsg groupChat.MessageReceive, _ format.Message, _ receptionID.EphemeralIdentity, _ rounds.Round) { // Unmarshal the request message - var newFT ft.NewFileTransfer - err := proto.Unmarshal(decryptedMsg.Payload, &newFT) + info, err := ft.UnmarshalTransferInfo(decryptedMsg.Payload) if err != nil { jww.ERROR.Printf(errProtoUnmarshal, err) return } - transferKey := ftCrypto.UnmarshalTransferKey(newFT.GetTransferKey()) - // Add new transfer to start receiving parts - tid, err := p.HandleIncomingTransfer(newFT.FileName, &transferKey, - newFT.TransferMac, uint16(newFT.NumParts), newFT.Size, newFT.Retry, - nil, 0) + tid, err := p.HandleIncomingTransfer(info.FileName, &info.Key, info.Mac, + info.NumParts, info.Size, info.Retry, nil, 0) if err != nil { - jww.ERROR.Printf(errNewReceivedTransfer, newFT.FileName, err) + jww.ERROR.Printf(errNewReceivedTransfer, info.FileName, err) return } // Call the reception callback - go p.receiveCB(tid, newFT.FileName, newFT.FileType, decryptedMsg.SenderID, - newFT.Size, newFT.Preview) + go p.receiveCB(tid, info.FileName, info.FileType, decryptedMsg.SenderID, + info.Size, info.Preview) } func (p *processor) String() string { diff --git a/fileTransfer2/groupChat/send.go b/fileTransfer2/groupChat/send.go index a15fa68bb7fdf5dc1b4e8050e55b94797610d76f..aabc560030669012e33f8b12009524328c4af223 100644 --- a/fileTransfer2/groupChat/send.go +++ b/fileTransfer2/groupChat/send.go @@ -8,18 +8,16 @@ package groupChat import ( - "github.com/golang/protobuf/proto" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" ft "gitlab.com/elixxir/client/fileTransfer2" - "gitlab.com/elixxir/client/groupChat" "gitlab.com/xx_network/primitives/id" ) // Error messages. const ( // sendNewFileTransferMessage - errProtoMarshal = "failed to proto marshal NewFileTransfer: %+v" + errMarshalInfo = "failed to marshal new transfer info: %+v" errNewFtSendGroupChat = "failed to send initial file transfer message via group chat: %+v" // sendEndFileTransferMessage @@ -29,24 +27,12 @@ const ( // sendNewFileTransferMessage sends a group chat message to the group ID // informing them of the incoming file transfer. func sendNewFileTransferMessage( - groupID *id.ID, info *ft.TransferInfo, gc groupChat.GroupChat) error { - - // Construct NewFileTransfer message - protoMsg := &ft.NewFileTransfer{ - FileName: info.FileName, - FileType: info.FileType, - TransferKey: info.Key.Bytes(), - TransferMac: info.Mac, - NumParts: uint32(info.NumParts), - Size: info.Size, - Retry: info.Retry, - Preview: info.Preview, - } + groupID *id.ID, info *ft.TransferInfo, gc GroupChat) error { // Marshal the message - payload, err := proto.Marshal(protoMsg) + payload, err := info.Marshal() if err != nil { - return errors.Errorf(errProtoMarshal, err) + return errors.Errorf(errMarshalInfo, err) } // Send the message via group chat @@ -60,7 +46,7 @@ func sendNewFileTransferMessage( // sendEndFileTransferMessage sends a group chat message to the group ID // informing them that all file parts have arrived once the network is healthy. -func sendEndFileTransferMessage(groupID *id.ID, gc groupChat.GroupChat) { +func sendEndFileTransferMessage(groupID *id.ID, gc GroupChat) { _, _, _, err := gc.Send(groupID, endFileTransferTag, nil) if err != nil { jww.ERROR.Printf(errEndFtSendGroupChat, err) diff --git a/fileTransfer2/groupChat/utils_test.go b/fileTransfer2/groupChat/utils_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c5f1799afea975f6848d5bfe0da885f62a81ef25 --- /dev/null +++ b/fileTransfer2/groupChat/utils_test.go @@ -0,0 +1,166 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package groupChat + +import ( + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/message" + "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/client/groupChat" + "gitlab.com/elixxir/crypto/group" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "sync" + "time" +) + +//////////////////////////////////////////////////////////////////////////////// +// Mock cMix Client // +//////////////////////////////////////////////////////////////////////////////// + +type mockCmixHandler struct { + sync.Mutex + processorMap map[format.Fingerprint]message.Processor +} + +func newMockCmixHandler() *mockCmixHandler { + return &mockCmixHandler{ + processorMap: make(map[format.Fingerprint]message.Processor), + } +} + +type mockCmix struct { + myID *id.ID + numPrimeBytes int + health bool + handler *mockCmixHandler + healthCBs map[uint64]func(b bool) + healthIndex uint64 + sync.Mutex +} + +func newMockCmix(myID *id.ID, handler *mockCmixHandler) *mockCmix { + return &mockCmix{ + myID: myID, + numPrimeBytes: 97, + // numPrimeBytes: 4096, + health: true, + handler: handler, + healthCBs: make(map[uint64]func(b bool)), + healthIndex: 0, + } +} + +func (m *mockCmix) GetMaxMessageLength() int { + msg := format.NewMessage(m.numPrimeBytes) + return msg.ContentsSize() +} + +func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage, + _ cmix.CMIXParams) (id.Round, []ephemeral.Id, error) { + m.handler.Lock() + for _, targetedMsg := range messages { + msg := format.NewMessage(m.numPrimeBytes) + msg.SetContents(targetedMsg.Payload) + msg.SetMac(targetedMsg.Mac) + msg.SetKeyFP(targetedMsg.Fingerprint) + m.handler.processorMap[targetedMsg.Fingerprint].Process(msg, + receptionID.EphemeralIdentity{Source: targetedMsg.Recipient}, + rounds.Round{ID: 42}) + } + m.handler.Unlock() + return 42, []ephemeral.Id{}, nil +} + +func (m *mockCmix) AddFingerprint(_ *id.ID, fp format.Fingerprint, mp message.Processor) error { + m.Lock() + defer m.Unlock() + m.handler.processorMap[fp] = mp + return nil +} + +func (m *mockCmix) DeleteFingerprint(_ *id.ID, fp format.Fingerprint) { + m.handler.Lock() + delete(m.handler.processorMap, fp) + m.handler.Unlock() +} + +func (m *mockCmix) IsHealthy() bool { + return m.health +} + +func (m *mockCmix) WasHealthy() bool { return true } + +func (m *mockCmix) AddHealthCallback(f func(bool)) uint64 { + m.Lock() + defer m.Unlock() + m.healthIndex++ + m.healthCBs[m.healthIndex] = f + go f(true) + return m.healthIndex +} + +func (m *mockCmix) RemoveHealthCallback(healthID uint64) { + m.Lock() + defer m.Unlock() + if _, exists := m.healthCBs[healthID]; !exists { + jww.FATAL.Panicf("No health callback with ID %d exists.", healthID) + } + delete(m.healthCBs, healthID) +} + +func (m *mockCmix) GetRoundResults(_ time.Duration, + roundCallback cmix.RoundEventCallback, _ ...id.Round) error { + go roundCallback(true, false, map[id.Round]cmix.RoundResult{42: {}}) + return nil +} + +//////////////////////////////////////////////////////////////////////////////// +// Mock Group Chat Manager // +//////////////////////////////////////////////////////////////////////////////// +type mockGcHandler struct { + services map[string]groupChat.Processor + sync.Mutex +} + +func newMockGcHandler() *mockGcHandler { + return &mockGcHandler{ + services: make(map[string]groupChat.Processor), + } +} + +type mockGC struct { + handler *mockGcHandler +} + +func newMockGC(handler *mockGcHandler) *mockGC { + return &mockGC{ + handler: handler, + } +} + +func (m *mockGC) Send(groupID *id.ID, tag string, message []byte) ( + id.Round, time.Time, group.MessageID, error) { + m.handler.Lock() + defer m.handler.Unlock() + m.handler.services[tag].Process(groupChat.MessageReceive{ + GroupID: groupID, + Payload: message, + }, format.Message{}, receptionID.EphemeralIdentity{}, rounds.Round{}) + return 0, time.Time{}, group.MessageID{}, nil +} + +func (m *mockGC) AddService(tag string, p groupChat.Processor) error { + m.handler.Lock() + defer m.handler.Unlock() + m.handler.services[tag] = p + return nil +} diff --git a/fileTransfer2/interface.go b/fileTransfer2/interface.go index 9c2100c4aebe2058373821b506157fb91b9d7064..671399e8bc9bb6ed4bf6f91e985407fb245db834 100644 --- a/fileTransfer2/interface.go +++ b/fileTransfer2/interface.go @@ -18,12 +18,12 @@ import ( // SentProgressCallback is a callback function that tracks the progress of // sending a file. type SentProgressCallback func(completed bool, arrived, total uint16, - t FilePartTracker, err error) + st SentTransfer, t FilePartTracker, err error) // ReceivedProgressCallback is a callback function that tracks the progress of // receiving a file. type ReceivedProgressCallback func(completed bool, received, total uint16, - t FilePartTracker, err error) + rt ReceivedTransfer, t FilePartTracker, err error) // ReceiveCallback is a callback function that notifies the receiver of an // incoming file transfer. @@ -191,6 +191,22 @@ type FileTransfer interface { Receive(tid *ftCrypto.TransferID) ([]byte, error) } +type SentTransfer interface { + Recipient() *id.ID + Transfer +} + +type ReceivedTransfer interface { + Transfer +} + +type Transfer interface { + TransferID() *ftCrypto.TransferID + FileName() string + FileSize() uint32 + NumParts() uint16 +} + // FilePartTracker tracks the status of each file part in a sent or received // file transfer. type FilePartTracker interface { diff --git a/fileTransfer2/manager.go b/fileTransfer2/manager.go index 90d47bca18547f4bffa553fc4a7dc59dd2c78c6a..52283f9575d665a256a68d364e461819670de37e 100644 --- a/fileTransfer2/manager.go +++ b/fileTransfer2/manager.go @@ -295,7 +295,8 @@ func (m *manager) Send(fileName, fileType string, fileData []byte, numFps := calcNumberOfFingerprints(len(parts), retry) // Create new sent transfer - st, err := m.sent.AddTransfer(recipient, &key, &tid, fileName, parts, numFps) + st, err := m.sent.AddTransfer( + recipient, &key, &tid, fileName, fileSize, parts, numFps) if err != nil { return nil, errors.Errorf(errAddSentTransfer, err) } @@ -343,7 +344,7 @@ func (m *manager) registerSentProgressCallback(st *store.SentTransfer, tracker := &sentFilePartTracker{st.CopyPartStatusVector()} // Call the progress callback - progressCB(completed, arrived, total, tracker, err) + progressCB(completed, arrived, total, st, tracker, err) } // Add the callback to the callback tracker @@ -407,7 +408,7 @@ func (m *manager) HandleIncomingTransfer(fileName string, // Store the transfer rt, err := m.received.AddTransfer( - key, &tid, fileName, transferMAC, numParts, numFps, size) + key, &tid, fileName, transferMAC, size, numParts, numFps) if err != nil { return nil, errors.Errorf(errAddNewRt, tid, fileName, err) } @@ -496,7 +497,7 @@ func (m *manager) registerReceivedProgressCallback(rt *store.ReceivedTransfer, tracker := &receivedFilePartTracker{rt.CopyPartStatusVector()} // Call the progress callback - progressCB(completed, received, total, tracker, err) + progressCB(completed, received, total, rt, tracker, err) } // Add the callback to the callback tracker diff --git a/fileTransfer2/manager_test.go b/fileTransfer2/manager_test.go index 8ae9574b06e86cc7fc6fef53085267a6bf659d15..b2220ffec9dbcf6c5862fe7af45a4a2e4bc45baf 100644 --- a/fileTransfer2/manager_test.go +++ b/fileTransfer2/manager_test.go @@ -145,7 +145,7 @@ func Test_FileTransfer_Smoke(t *testing.T) { t.Errorf("Failed to add transfer: %+v", err) } receiveProgressCB := func(completed bool, received, total uint16, - fpt FilePartTracker, err error) { + rt ReceivedTransfer, fpt FilePartTracker, err error) { if completed && !called { timeReceived <- netTime.Now() receivedFile, err2 := m2.Receive(tid) @@ -176,7 +176,7 @@ func Test_FileTransfer_Smoke(t *testing.T) { // Define sent progress callback wg.Add(1) sentProgressCb1 := func(completed bool, arrived, total uint16, - fpt FilePartTracker, err error) { + st SentTransfer, fpt FilePartTracker, err error) { if completed { wg.Done() } diff --git a/fileTransfer2/store/received.go b/fileTransfer2/store/received.go index 81c9439217d06679fe17d9e41968575526780adf..921ced2f624959cdbab4d345e92719f0d76c9ad5 100644 --- a/fileTransfer2/store/received.go +++ b/fileTransfer2/store/received.go @@ -100,8 +100,8 @@ func NewOrLoadReceived(kv *versioned.KV) (*Received, []*ReceivedTransfer, error) // AddTransfer adds the ReceivedTransfer to the map keyed on its transfer ID. func (r *Received) AddTransfer(key *ftCrypto.TransferKey, - tid *ftCrypto.TransferID, fileName string, transferMAC []byte, numParts, - numFps uint16, fileSize uint32) (*ReceivedTransfer, error) { + tid *ftCrypto.TransferID, fileName string, transferMAC []byte, + fileSize uint32, numParts, numFps uint16) (*ReceivedTransfer, error) { r.mux.Lock() defer r.mux.Unlock() @@ -111,8 +111,8 @@ func (r *Received) AddTransfer(key *ftCrypto.TransferKey, return nil, errors.Errorf(errAddExistingReceivedTransfer, tid) } - rt, err := newReceivedTransfer(key, tid, fileName, transferMAC, numParts, - numFps, fileSize, r.kv) + rt, err := newReceivedTransfer( + key, tid, fileName, transferMAC, fileSize, numParts, numFps, r.kv) if err != nil { return nil, err } diff --git a/fileTransfer2/store/receivedTransfer.go b/fileTransfer2/store/receivedTransfer.go index e2ce440a0456542ae5455166130a2d239ed5d44a..d067b86635c3c0d77bda84e08afd19904b0ccb00 100644 --- a/fileTransfer2/store/receivedTransfer.go +++ b/fileTransfer2/store/receivedTransfer.go @@ -80,12 +80,12 @@ type ReceivedTransfer struct { // The MAC for the entire file; used to verify the integrity of all parts transferMAC []byte - // The number of file parts in the file - numParts uint16 - // Size of the entire file in bytes fileSize uint32 + // The number of file parts in the file + numParts uint16 + // Saves each part in order (has its own storage backend) parts [][]byte @@ -99,8 +99,8 @@ type ReceivedTransfer struct { // newReceivedTransfer generates a ReceivedTransfer with the specified transfer // key, transfer ID, and a number of parts. func newReceivedTransfer(key *ftCrypto.TransferKey, tid *ftCrypto.TransferID, - fileName string, transferMAC []byte, numParts, numFps uint16, - fileSize uint32, kv *versioned.KV) (*ReceivedTransfer, error) { + fileName string, transferMAC []byte, fileSize uint32, numParts, + numFps uint16, kv *versioned.KV) (*ReceivedTransfer, error) { kv = kv.Prefix(makeReceivedTransferPrefix(tid)) // Create new cypher manager @@ -121,8 +121,8 @@ func newReceivedTransfer(key *ftCrypto.TransferKey, tid *ftCrypto.TransferID, tid: tid, fileName: fileName, transferMAC: transferMAC, - numParts: numParts, fileSize: fileSize, + numParts: numParts, parts: make([][]byte, numParts), partStatus: partStatus, kv: kv, @@ -175,11 +175,6 @@ func (rt *ReceivedTransfer) GetUnusedCyphers() []cypher.Cypher { return rt.cypherManager.GetUnusedCyphers() } -// NumParts returns the total number of file parts in the transfer. -func (rt *ReceivedTransfer) NumParts() uint16 { - return rt.numParts -} - // TransferID returns the transfer's ID. func (rt *ReceivedTransfer) TransferID() *ftCrypto.TransferID { return rt.tid @@ -190,6 +185,16 @@ func (rt *ReceivedTransfer) FileName() string { return rt.fileName } +// FileSize returns the size of the entire file transfer. +func (rt *ReceivedTransfer) FileSize() uint32 { + return rt.fileSize +} + +// NumParts returns the total number of file parts in the transfer. +func (rt *ReceivedTransfer) NumParts() uint16 { + return rt.numParts +} + // NumReceived returns the number of parts that have been received. func (rt *ReceivedTransfer) NumReceived() uint16 { rt.mux.RLock() @@ -254,8 +259,8 @@ func loadReceivedTransfer(tid *ftCrypto.TransferID, kv *versioned.KV) ( tid: tid, fileName: fileName, transferMAC: transferMAC, - numParts: numParts, fileSize: fileSize, + numParts: numParts, parts: parts, partStatus: partStatus, kv: kv, diff --git a/fileTransfer2/store/receivedTransfer_test.go b/fileTransfer2/store/receivedTransfer_test.go index 2fb0b4b9ab616e4e0f7745e3a5aa41063d0313be..09da21e307c60f7dd277e675afa51e1894a404b2 100644 --- a/fileTransfer2/store/receivedTransfer_test.go +++ b/fileTransfer2/store/receivedTransfer_test.go @@ -34,7 +34,7 @@ func Test_newReceivedTransfer(t *testing.T) { key, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) numFps := uint16(24) - parts := generateTestParts(16) + parts, _ := generateTestParts(16) fileSize := uint32(len(parts) * len(parts[0])) numParts := uint16(len(parts)) rtKv := kv.Prefix(makeReceivedTransferPrefix(&tid)) @@ -54,15 +54,15 @@ func Test_newReceivedTransfer(t *testing.T) { tid: &tid, fileName: "fileName", transferMAC: []byte("transferMAC"), - numParts: numParts, fileSize: fileSize, + numParts: numParts, parts: make([][]byte, numParts), partStatus: partStatus, kv: rtKv, } rt, err := newReceivedTransfer(&key, &tid, expected.fileName, - expected.transferMAC, numParts, numFps, fileSize, kv) + expected.transferMAC, fileSize, numParts, numFps, kv) if err != nil { t.Errorf("newReceivedTransfer returned an error: %+v", err) } @@ -76,7 +76,7 @@ func Test_newReceivedTransfer(t *testing.T) { // Tests that ReceivedTransfer.AddPart adds the part to the part list and marks // it as received func TestReceivedTransfer_AddPart(t *testing.T) { - rt, _, _, _ := newTestReceivedTransfer(16, t) + rt, _, _, _, _ := newTestReceivedTransfer(16, t) part := []byte("Part") partNum := 6 @@ -99,7 +99,7 @@ func TestReceivedTransfer_AddPart(t *testing.T) { // Tests that ReceivedTransfer.AddPart returns an error if the part number is // not within the range of part numbers func TestReceivedTransfer_AddPart_PartOutOfRangeError(t *testing.T) { - rt, _, _, _ := newTestReceivedTransfer(16, t) + rt, _, _, _, _ := newTestReceivedTransfer(16, t) expectedErr := fmt.Sprintf(errPartOutOfRange, rt.partStatus.GetNumKeys(), rt.partStatus.GetNumKeys()-1) @@ -115,9 +115,9 @@ func TestReceivedTransfer_AddPart_PartOutOfRangeError(t *testing.T) { // parts are added to the transfer. func TestReceivedTransfer_GetFile(t *testing.T) { // Generate parts and make last file part smaller than the rest - parts := generateTestParts(16) + parts, _ := generateTestParts(16) lastPartLen := 6 - rt, _, _, _ := newTestReceivedTransfer(uint16(len(parts)), t) + rt, _, _, _, _ := newTestReceivedTransfer(uint16(len(parts)), t) rt.fileSize = uint32((len(parts)-1)*len(parts[0]) + lastPartLen) for i, p := range parts { @@ -143,7 +143,7 @@ func TestReceivedTransfer_GetFile(t *testing.T) { // unused cyphers. func TestReceivedTransfer_GetUnusedCyphers(t *testing.T) { numParts := uint16(10) - rt, _, numFps, _ := newTestReceivedTransfer(numParts, t) + rt, _, _, numFps, _ := newTestReceivedTransfer(numParts, t) // Check that all cyphers are returned after initialisation unsentCyphers := rt.GetUnusedCyphers() @@ -182,20 +182,9 @@ func TestReceivedTransfer_GetUnusedCyphers(t *testing.T) { } } -// Tests that ReceivedTransfer.NumParts returns the correct number of parts. -func TestReceivedTransfer_NumParts(t *testing.T) { - numParts := uint16(16) - rt, _, _, _ := newTestReceivedTransfer(numParts, t) - - if rt.NumParts() != numParts { - t.Errorf("Incorrect number of parts.\nexpected: %d\nreceived: %d", - numParts, rt.NumParts()) - } -} - // Tests that ReceivedTransfer.TransferID returns the correct transfer ID. func TestReceivedTransfer_TransferID(t *testing.T) { - rt, _, _, _ := newTestReceivedTransfer(16, t) + rt, _, _, _, _ := newTestReceivedTransfer(16, t) if rt.TransferID() != rt.tid { t.Errorf("Incorrect transfer ID.\nexpected: %s\nreceived: %s", @@ -205,7 +194,7 @@ func TestReceivedTransfer_TransferID(t *testing.T) { // Tests that ReceivedTransfer.FileName returns the correct file name. func TestReceivedTransfer_FileName(t *testing.T) { - rt, _, _, _ := newTestReceivedTransfer(16, t) + rt, _, _, _, _ := newTestReceivedTransfer(16, t) if rt.FileName() != rt.fileName { t.Errorf("Incorrect transfer ID.\nexpected: %s\nreceived: %s", @@ -213,10 +202,32 @@ func TestReceivedTransfer_FileName(t *testing.T) { } } +// Tests that ReceivedTransfer.FileSize returns the correct file size. +func TestReceivedTransfer_FileSize(t *testing.T) { + rt, file, _, _, _ := newTestReceivedTransfer(16, t) + fileSize := uint32(len(file)) + + if rt.FileSize() != fileSize { + t.Errorf("Incorrect file size.\nexpected: %d\nreceived: %d", + fileSize, rt.FileSize()) + } +} + +// Tests that ReceivedTransfer.NumParts returns the correct number of parts. +func TestReceivedTransfer_NumParts(t *testing.T) { + numParts := uint16(16) + rt, _, _, _, _ := newTestReceivedTransfer(numParts, t) + + if rt.NumParts() != numParts { + t.Errorf("Incorrect number of parts.\nexpected: %d\nreceived: %d", + numParts, rt.NumParts()) + } +} + // Tests that ReceivedTransfer.NumReceived returns the correct number of // received parts. func TestReceivedTransfer_NumReceived(t *testing.T) { - rt, _, _, _ := newTestReceivedTransfer(16, t) + rt, _, _, _, _ := newTestReceivedTransfer(16, t) if rt.NumReceived() != 0 { t.Errorf("Incorrect number of received parts."+ @@ -238,7 +249,7 @@ func TestReceivedTransfer_NumReceived(t *testing.T) { // Tests that the state vector returned by ReceivedTransfer.CopyPartStatusVector // has the same values as the original but is a copy. func TestReceivedTransfer_CopyPartStatusVector(t *testing.T) { - rt, _, _, _ := newTestReceivedTransfer(64, t) + rt, _, _, _, _ := newTestReceivedTransfer(64, t) // Check that the vectors have the same unused parts partStatus := rt.CopyPartStatusVector() @@ -268,8 +279,8 @@ func TestReceivedTransfer_CopyPartStatusVector(t *testing.T) { // Tests that a ReceivedTransfer loaded via loadReceivedTransfer matches the // original. func Test_loadReceivedTransfer(t *testing.T) { - parts := generateTestParts(16) - rt, _, _, kv := newTestReceivedTransfer(uint16(len(parts)), t) + parts, _ := generateTestParts(16) + rt, _, _, _, kv := newTestReceivedTransfer(uint16(len(parts)), t) for i, p := range parts { if i%2 == 0 { @@ -295,7 +306,7 @@ func Test_loadReceivedTransfer(t *testing.T) { // Tests that ReceivedTransfer.Delete deletes the storage backend of the // ReceivedTransfer and that it cannot be loaded again. func TestReceivedTransfer_Delete(t *testing.T) { - rt, _, _, kv := newTestReceivedTransfer(64, t) + rt, _, _, _, kv := newTestReceivedTransfer(64, t) err := rt.Delete() if err != nil { @@ -311,7 +322,7 @@ func TestReceivedTransfer_Delete(t *testing.T) { // Tests that the fields saved by ReceivedTransfer.save can be loaded from // storage. func TestReceivedTransfer_save(t *testing.T) { - rt, _, _, _ := newTestReceivedTransfer(64, t) + rt, _, _, _, _ := newTestReceivedTransfer(64, t) err := rt.save() if err != nil { @@ -326,23 +337,24 @@ func TestReceivedTransfer_save(t *testing.T) { // newTestReceivedTransfer creates a new ReceivedTransfer for testing. func newTestReceivedTransfer(numParts uint16, t *testing.T) ( - *ReceivedTransfer, *ftCrypto.TransferKey, uint16, *versioned.KV) { - kv := versioned.NewKV(ekv.MakeMemstore()) - key, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) + rt *ReceivedTransfer, file []byte, key *ftCrypto.TransferKey, + numFps uint16, kv *versioned.KV) { + kv = versioned.NewKV(ekv.MakeMemstore()) + keyTmp, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) transferMAC := []byte("I am a transfer MAC") - numFps := 2 * numParts + numFps = 2 * numParts fileName := "helloFile" - parts := generateTestParts(numParts) - fileSize := uint32(len(parts) * len(parts[0])) + _, file = generateTestParts(numParts) + fileSize := uint32(len(file)) st, err := newReceivedTransfer( - &key, &tid, fileName, transferMAC, numParts, numFps, fileSize, kv) + &keyTmp, &tid, fileName, transferMAC, fileSize, numParts, numFps, kv) if err != nil { t.Errorf("Failed to make new SentTransfer: %+v", err) } - return st, &key, numFps, kv + return st, file, &keyTmp, numFps, kv } // Tests that a ReceivedTransfer marshalled via ReceivedTransfer.marshal and @@ -351,8 +363,8 @@ func TestReceivedTransfer_marshal_unmarshalReceivedTransfer(t *testing.T) { rt := &ReceivedTransfer{ fileName: "transferName", transferMAC: []byte("I am a transfer MAC"), - numParts: 153, fileSize: 735, + numParts: 153, } data, err := rt.marshal() diff --git a/fileTransfer2/store/received_test.go b/fileTransfer2/store/received_test.go index 6ce990a2523b87c9726e082f0314d4f4aad77823..be7652ec9abb9784a667d6b6e7ba715d76fc1135 100644 --- a/fileTransfer2/store/received_test.go +++ b/fileTransfer2/store/received_test.go @@ -67,7 +67,7 @@ func TestNewOrLoadReceived_Load(t *testing.T) { key, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) rt, err2 := r.AddTransfer(&key, &tid, "file"+strconv.Itoa(i), - []byte("transferMAC"+strconv.Itoa(i)), 10, 20, 128) + []byte("transferMAC"+strconv.Itoa(i)), 128, 10, 20) if err2 != nil { t.Errorf("Failed to add transfer #%d: %+v", i, err2) } @@ -115,7 +115,7 @@ func TestReceived_AddTransfer(t *testing.T) { tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) rt, err := r.AddTransfer( - &key, &tid, "file", []byte("transferMAC"), 10, 20, 128) + &key, &tid, "file", []byte("transferMAC"), 128, 10, 20) if err != nil { t.Errorf("Failed to add new transfer: %+v", err) } @@ -151,7 +151,7 @@ func TestReceived_GetTransfer(t *testing.T) { tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) rt, err := r.AddTransfer( - &key, &tid, "file", []byte("transferMAC"), 10, 20, 128) + &key, &tid, "file", []byte("transferMAC"), 128, 10, 20) if err != nil { t.Errorf("Failed to add new transfer: %+v", err) } @@ -177,7 +177,7 @@ func TestReceived_RemoveTransfer(t *testing.T) { tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) rt, err := r.AddTransfer( - &key, &tid, "file", []byte("transferMAC"), 10, 20, 128) + &key, &tid, "file", []byte("transferMAC"), 128, 10, 20) if err != nil { t.Errorf("Failed to add new transfer: %+v", err) } diff --git a/fileTransfer2/store/sent.go b/fileTransfer2/store/sent.go index 9ca7f482e53540adff504c932ad39219225e39fa..7873d5b469bafc8666b0951d55d7852aff0c1ae6 100644 --- a/fileTransfer2/store/sent.go +++ b/fileTransfer2/store/sent.go @@ -102,8 +102,8 @@ func NewOrLoadSent(kv *versioned.KV) (*Sent, []Part, error) { // AddTransfer creates a SentTransfer and adds it to the map keyed on its // transfer ID. func (s *Sent) AddTransfer(recipient *id.ID, key *ftCrypto.TransferKey, - tid *ftCrypto.TransferID, fileName string, parts [][]byte, numFps uint16) ( - *SentTransfer, error) { + tid *ftCrypto.TransferID, fileName string, fileSize uint32, parts [][]byte, + numFps uint16) (*SentTransfer, error) { s.mux.Lock() defer s.mux.Unlock() @@ -112,7 +112,8 @@ func (s *Sent) AddTransfer(recipient *id.ID, key *ftCrypto.TransferKey, return nil, errors.Errorf(errAddExistingSentTransfer, tid) } - st, err := newSentTransfer(recipient, key, tid, fileName, parts, numFps, s.kv) + st, err := newSentTransfer( + recipient, key, tid, fileName, fileSize, parts, numFps, s.kv) if err != nil { return nil, errors.Errorf(errNewSentTransfer, tid) } diff --git a/fileTransfer2/store/sentTransfer.go b/fileTransfer2/store/sentTransfer.go index c421c86d4d7c68fca4128777fdfc4f677d022756..9891087a1d07193eabb6da92749de0c448d8a68f 100644 --- a/fileTransfer2/store/sentTransfer.go +++ b/fileTransfer2/store/sentTransfer.go @@ -75,6 +75,9 @@ type SentTransfer struct { // ID of the recipient of the file transfer recipient *id.ID + // The size of the entire file + fileSize uint32 + // The number of file parts in the file numParts uint16 @@ -94,8 +97,8 @@ type SentTransfer struct { // newSentTransfer generates a new SentTransfer with the specified transfer key, // transfer ID, and parts. func newSentTransfer(recipient *id.ID, key *ftCrypto.TransferKey, - tid *ftCrypto.TransferID, fileName string, parts [][]byte, numFps uint16, - kv *versioned.KV) (*SentTransfer, error) { + tid *ftCrypto.TransferID, fileName string, fileSize uint32, parts [][]byte, + numFps uint16, kv *versioned.KV) (*SentTransfer, error) { kv = kv.Prefix(makeSentTransferPrefix(tid)) // Create new cypher manager @@ -116,6 +119,7 @@ func newSentTransfer(recipient *id.ID, key *ftCrypto.TransferKey, tid: tid, fileName: fileName, recipient: recipient, + fileSize: fileSize, numParts: uint16(len(parts)), status: Running, parts: parts, @@ -180,11 +184,6 @@ func (st *SentTransfer) Status() TransferStatus { return st.status } -// NumParts returns the total number of file parts in the transfer. -func (st *SentTransfer) NumParts() uint16 { - return st.numParts -} - // TransferID returns the transfer's ID. func (st *SentTransfer) TransferID() *ftCrypto.TransferID { return st.tid @@ -200,6 +199,16 @@ func (st *SentTransfer) Recipient() *id.ID { return st.recipient } +// FileSize returns the size of the entire file transfer. +func (st *SentTransfer) FileSize() uint32 { + return st.fileSize +} + +// NumParts returns the total number of file parts in the transfer. +func (st *SentTransfer) NumParts() uint16 { + return st.numParts +} + // NumArrived returns the number of parts that have arrived. func (st *SentTransfer) NumArrived() uint16 { return uint16(st.partStatus.GetNumUsed()) @@ -250,6 +259,7 @@ func loadSentTransfer(tid *ftCrypto.TransferID, kv *versioned.KV) ( tid: tid, fileName: fileName, recipient: recipient, + fileSize: calcFileSize(parts), numParts: uint16(len(parts)), status: status, parts: parts, @@ -260,6 +270,14 @@ func loadSentTransfer(tid *ftCrypto.TransferID, kv *versioned.KV) ( return st, nil } +// calcFileSize calculates the size of the entire file from a list of parts. All +// parts, except the last, are assumed to have the same length. +func calcFileSize(parts [][]byte) uint32 { + lastPartSize := len(parts[len(parts)-1]) + otherPartsSize := len(parts[0]) * (len(parts) - 1) + return uint32(lastPartSize + otherPartsSize) +} + // Delete deletes all data in the SentTransfer from storage. func (st *SentTransfer) Delete() error { st.mux.Lock() diff --git a/fileTransfer2/store/sentTransfer_test.go b/fileTransfer2/store/sentTransfer_test.go index 254ba213a122bbf8b047494432dca0e13380be6e..96d881c1f1f6ca15545be5f36c21779f1d730970 100644 --- a/fileTransfer2/store/sentTransfer_test.go +++ b/fileTransfer2/store/sentTransfer_test.go @@ -56,6 +56,7 @@ func Test_newSentTransfer(t *testing.T) { tid: &tid, fileName: "file", recipient: id.NewIdFromString("user", id.User, t), + fileSize: calcFileSize(parts), numParts: uint16(len(parts)), status: Running, parts: parts, @@ -63,8 +64,8 @@ func Test_newSentTransfer(t *testing.T) { kv: stKv, } - st, err := newSentTransfer( - expected.recipient, &key, &tid, expected.fileName, parts, numFps, kv) + st, err := newSentTransfer(expected.recipient, &key, &tid, + expected.fileName, expected.fileSize, parts, numFps, kv) if err != nil { t.Errorf("newSentTransfer returned an error: %+v", err) } @@ -235,17 +236,6 @@ func TestSentTransfer_Status(t *testing.T) { } } -// Tests that SentTransfer.NumParts returns the correct number of parts. -func TestSentTransfer_NumParts(t *testing.T) { - numParts := uint16(16) - st, _, _, _, _ := newTestSentTransfer(numParts, t) - - if st.NumParts() != numParts { - t.Errorf("Incorrect number of parts.\nexpected: %d\nreceived: %d", - numParts, st.NumParts()) - } -} - // Tests that SentTransfer.TransferID returns the correct transfer ID. func TestSentTransfer_TransferID(t *testing.T) { st, _, _, _, _ := newTestSentTransfer(16, t) @@ -276,6 +266,28 @@ func TestSentTransfer_Recipient(t *testing.T) { } } +// Tests that SentTransfer.FileSize returns the correct file size. +func TestSentTransfer_FileSize(t *testing.T) { + st, parts, _, _, _ := newTestSentTransfer(16, t) + fileSize := calcFileSize(parts) + + if st.FileSize() != fileSize { + t.Errorf("Incorrect file size.\nexpected: %d\nreceived: %d", + fileSize, st.FileSize()) + } +} + +// Tests that SentTransfer.NumParts returns the correct number of parts. +func TestSentTransfer_NumParts(t *testing.T) { + numParts := uint16(16) + st, _, _, _, _ := newTestSentTransfer(numParts, t) + + if st.NumParts() != numParts { + t.Errorf("Incorrect number of parts.\nexpected: %d\nreceived: %d", + numParts, st.NumParts()) + } +} + // Tests that SentTransfer.NumArrived returns the correct number of arrived // parts. func TestSentTransfer_NumArrived(t *testing.T) { @@ -444,37 +456,41 @@ func Test_makeSentTransferPrefix_Consistency(t *testing.T) { const numPrimeBytes = 512 // newTestSentTransfer creates a new SentTransfer for testing. -func newTestSentTransfer(numParts uint16, t *testing.T) ( - *SentTransfer, [][]byte, *ftCrypto.TransferKey, uint16, *versioned.KV) { - kv := versioned.NewKV(ekv.MakeMemstore()) +func newTestSentTransfer(numParts uint16, t *testing.T) (st *SentTransfer, + parts [][]byte, key *ftCrypto.TransferKey, numFps uint16, kv *versioned.KV) { + kv = versioned.NewKV(ekv.MakeMemstore()) recipient := id.NewIdFromString("recipient", id.User, t) - key, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) + keyTmp, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) - numFps := 2 * numParts + numFps = 2 * numParts fileName := "helloFile" - parts := generateTestParts(numParts) + parts, file := generateTestParts(numParts) - st, err := newSentTransfer(recipient, &key, &tid, fileName, parts, numFps, kv) + st, err := newSentTransfer( + recipient, &keyTmp, &tid, fileName, uint32(len(file)), parts, numFps, kv) if err != nil { t.Errorf("Failed to make new SentTransfer: %+v", err) } - return st, parts, &key, numFps, kv + return st, parts, &keyTmp, numFps, kv } // generateTestParts generates a list of file parts of the correct size to be // encrypted/decrypted. -func generateTestParts(numParts uint16) [][]byte { +func generateTestParts(numParts uint16) (parts [][]byte, file []byte) { // Calculate part size partSize := fileMessage.NewPartMessage( format.NewMessage(numPrimeBytes).ContentsSize()).GetPartSize() // Create list of parts and fill - parts := make([][]byte, numParts) + parts = make([][]byte, numParts) + var buff bytes.Buffer + buff.Grow(int(numParts) * partSize) for i := range parts { parts[i] = make([]byte, partSize) copy(parts[i], "Hello "+strconv.Itoa(i)) + buff.Write(parts[i]) } - return parts + return parts, buff.Bytes() } diff --git a/fileTransfer2/store/sent_test.go b/fileTransfer2/store/sent_test.go index e07357f2fd97191cced221a3e44209864293c23f..e1b7556b6c62d59f17df86c6a8b74632094b7636 100644 --- a/fileTransfer2/store/sent_test.go +++ b/fileTransfer2/store/sent_test.go @@ -67,10 +67,11 @@ func TestNewOrLoadSent_Load(t *testing.T) { for i := 0; i < 10; i++ { key, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) + parts, file := generateTestParts(uint16(10 + i)) st, err2 := s.AddTransfer( id.NewIdFromString("recipient"+strconv.Itoa(i), id.User, t), - &key, &tid, "file"+strconv.Itoa(i), - generateTestParts(uint16(10+i)), uint16(2*(10+i))) + &key, &tid, "file"+strconv.Itoa(i), uint32(len(file)), parts, + uint16(2*(10+i))) if err2 != nil { t.Errorf("Failed to add transfer #%d: %+v", i, err2) } @@ -130,9 +131,10 @@ func TestSent_AddTransfer(t *testing.T) { key, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) + parts, file := generateTestParts(10) st, err := s.AddTransfer(id.NewIdFromString("recipient", id.User, t), - &key, &tid, "file", generateTestParts(10), 20) + &key, &tid, "file", uint32(len(file)), parts, 20) if err != nil { t.Errorf("Failed to add new transfer: %+v", err) } @@ -152,7 +154,7 @@ func TestSent_AddTransfer_TransferAlreadyExists(t *testing.T) { } expectedErr := fmt.Sprintf(errAddExistingSentTransfer, tid) - _, err := s.AddTransfer(nil, nil, &tid, "", nil, 0) + _, err := s.AddTransfer(nil, nil, &tid, "", 0, nil, 0) if err == nil || err.Error() != expectedErr { t.Errorf("Received unexpected error when adding transfer that already "+ "exists.\nexpected: %s\nreceived: %+v", expectedErr, err) @@ -166,9 +168,10 @@ func TestSent_GetTransfer(t *testing.T) { key, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) + parts, file := generateTestParts(10) st, err := s.AddTransfer(id.NewIdFromString("recipient", id.User, t), - &key, &tid, "file", generateTestParts(10), 20) + &key, &tid, "file", uint32(len(file)), parts, 20) if err != nil { t.Errorf("Failed to add new transfer: %+v", err) } @@ -192,9 +195,10 @@ func TestSent_RemoveTransfer(t *testing.T) { key, _ := ftCrypto.NewTransferKey(csprng.NewSystemRNG()) tid, _ := ftCrypto.NewTransferID(csprng.NewSystemRNG()) + parts, file := generateTestParts(10) st, err := s.AddTransfer(id.NewIdFromString("recipient", id.User, t), - &key, &tid, "file", generateTestParts(10), 20) + &key, &tid, "file", uint32(len(file)), parts, 20) if err != nil { t.Errorf("Failed to add new transfer: %+v", err) } diff --git a/fileTransfer2/utils_test.go b/fileTransfer2/utils_test.go index 8f1c448923884d8cec9aeff4f48f3abb061516d2..cead9800a1ca549807ebf0e45fe92a60f277e921 100644 --- a/fileTransfer2/utils_test.go +++ b/fileTransfer2/utils_test.go @@ -119,6 +119,7 @@ func (m *mockCmix) GetMaxMessageLength() int { func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage, _ cmix.CMIXParams) (id.Round, []ephemeral.Id, error) { m.handler.Lock() + defer m.handler.Unlock() for _, targetedMsg := range messages { msg := format.NewMessage(m.numPrimeBytes) msg.SetContents(targetedMsg.Payload) @@ -128,21 +129,20 @@ func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage, receptionID.EphemeralIdentity{Source: targetedMsg.Recipient}, rounds.Round{ID: 42}) } - m.handler.Unlock() return 42, []ephemeral.Id{}, nil } func (m *mockCmix) AddFingerprint(_ *id.ID, fp format.Fingerprint, mp message.Processor) error { - m.Lock() - defer m.Unlock() + m.handler.Lock() + defer m.handler.Unlock() m.handler.processorMap[fp] = mp return nil } func (m *mockCmix) DeleteFingerprint(_ *id.ID, fp format.Fingerprint) { m.handler.Lock() + defer m.handler.Unlock() delete(m.handler.processorMap, fp) - m.handler.Unlock() } func (m *mockCmix) IsHealthy() bool {