From 9b263796e4bc14ce30a38c125f84b24709ff8221 Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Tue, 3 May 2022 15:41:53 -0700 Subject: [PATCH] Work in progress --- fileTransfer2/e2e/ftMessages.pb.go | 220 ----------------------- fileTransfer2/e2e/generateProto.sh | 10 -- fileTransfer2/e2e/listener.go | 10 +- fileTransfer2/e2e/manager.go | 74 ++++---- fileTransfer2/e2e/manager_test.go | 10 +- fileTransfer2/e2e/send.go | 6 +- fileTransfer2/ftMessages.pb.go | 142 +++++++++++++++ fileTransfer2/{e2e => }/ftMessages.proto | 2 +- fileTransfer2/generateProto.sh | 10 ++ fileTransfer2/groupChat/manager.go | 86 ++++----- fileTransfer2/groupChat/processor.go | 60 +++++++ fileTransfer2/groupChat/send.go | 68 +++++++ fileTransfer2/info.go | 36 +++- fileTransfer2/interface.go | 23 ++- fileTransfer2/manager.go | 32 ++-- fileTransfer2/manager_test.go | 33 ++-- fileTransfer2/params.go | 14 +- fileTransfer2/params_test.go | 5 +- groupChat/groupStore/store.go | 14 ++ groupChat/{group.go => interface.go} | 4 +- groupChat/manager.go | 81 ++++++--- groupChat/send.go | 34 ++-- groupChat/send_test.go | 39 ++-- groupChat/service.go | 79 ++++++-- groupChat/utils_test.go | 15 +- 25 files changed, 646 insertions(+), 461 deletions(-) delete mode 100644 fileTransfer2/e2e/ftMessages.pb.go delete mode 100644 fileTransfer2/e2e/generateProto.sh create mode 100644 fileTransfer2/ftMessages.pb.go rename fileTransfer2/{e2e => }/ftMessages.proto (96%) create mode 100644 fileTransfer2/generateProto.sh create mode 100644 fileTransfer2/groupChat/processor.go create mode 100644 fileTransfer2/groupChat/send.go rename groupChat/{group.go => interface.go} (97%) diff --git a/fileTransfer2/e2e/ftMessages.pb.go b/fileTransfer2/e2e/ftMessages.pb.go deleted file mode 100644 index 252cedb3e..000000000 --- a/fileTransfer2/e2e/ftMessages.pb.go +++ /dev/null @@ -1,220 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -//////////////////////////////////////////////////////////////////////////////// - -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.26.0 -// protoc v3.17.3 -// source: fileTransfer/ftMessages.proto - -package e2e - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// NewFileTransfer is transmitted first on the initialization of a file transfer -// to inform the receiver about the incoming file. -type NewFileTransfer struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - FileName string `protobuf:"bytes,1,opt,name=fileName,proto3" json:"fileName,omitempty"` // Name of the file; max 48 characters - FileType string `protobuf:"bytes,2,opt,name=fileType,proto3" json:"fileType,omitempty"` // Tag of file; max 8 characters - TransferKey []byte `protobuf:"bytes,3,opt,name=transferKey,proto3" json:"transferKey,omitempty"` // 256 bit encryption key to identify the transfer - TransferMac []byte `protobuf:"bytes,4,opt,name=transferMac,proto3" json:"transferMac,omitempty"` // 256 bit MAC of the entire file - NumParts uint32 `protobuf:"varint,5,opt,name=numParts,proto3" json:"numParts,omitempty"` // Number of file parts - Size uint32 `protobuf:"varint,6,opt,name=size,proto3" json:"size,omitempty"` // The size of the file; max of 4 mB - Retry float32 `protobuf:"fixed32,7,opt,name=retry,proto3" json:"retry,omitempty"` // Used to determine how many times to retry sending - Preview []byte `protobuf:"bytes,8,opt,name=preview,proto3" json:"preview,omitempty"` // A preview of the file; max of 4 kB -} - -func (x *NewFileTransfer) Reset() { - *x = NewFileTransfer{} - if protoimpl.UnsafeEnabled { - mi := &file_fileTransfer_ftMessages_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *NewFileTransfer) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*NewFileTransfer) ProtoMessage() {} - -func (x *NewFileTransfer) ProtoReflect() protoreflect.Message { - mi := &file_fileTransfer_ftMessages_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use NewFileTransfer.ProtoReflect.Descriptor instead. -func (*NewFileTransfer) Descriptor() ([]byte, []int) { - return file_fileTransfer_ftMessages_proto_rawDescGZIP(), []int{0} -} - -func (x *NewFileTransfer) GetFileName() string { - if x != nil { - return x.FileName - } - return "" -} - -func (x *NewFileTransfer) GetFileType() string { - if x != nil { - return x.FileType - } - return "" -} - -func (x *NewFileTransfer) GetTransferKey() []byte { - if x != nil { - return x.TransferKey - } - return nil -} - -func (x *NewFileTransfer) GetTransferMac() []byte { - if x != nil { - return x.TransferMac - } - return nil -} - -func (x *NewFileTransfer) GetNumParts() uint32 { - if x != nil { - return x.NumParts - } - return 0 -} - -func (x *NewFileTransfer) GetSize() uint32 { - if x != nil { - return x.Size - } - return 0 -} - -func (x *NewFileTransfer) GetRetry() float32 { - if x != nil { - return x.Retry - } - return 0 -} - -func (x *NewFileTransfer) GetPreview() []byte { - if x != nil { - return x.Preview - } - return nil -} - -var File_fileTransfer_ftMessages_proto protoreflect.FileDescriptor - -var file_fileTransfer_ftMessages_proto_rawDesc = []byte{ - 0x0a, 0x1d, 0x66, 0x69, 0x6c, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x2f, 0x66, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x05, 0x70, 0x61, 0x72, 0x73, 0x65, 0x22, 0xed, 0x01, 0x0a, 0x0f, 0x4e, 0x65, 0x77, 0x46, 0x69, - 0x6c, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x66, 0x69, - 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x69, - 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x54, 0x79, - 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x4b, 0x65, - 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, - 0x72, 0x4b, 0x65, 0x79, 0x12, 0x20, 0x0a, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, - 0x4d, 0x61, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x66, 0x65, 0x72, 0x4d, 0x61, 0x63, 0x12, 0x1a, 0x0a, 0x08, 0x6e, 0x75, 0x6d, 0x50, 0x61, 0x72, - 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x6e, 0x75, 0x6d, 0x50, 0x61, 0x72, - 0x74, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, - 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x18, - 0x07, 0x20, 0x01, 0x28, 0x02, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, - 0x70, 0x72, 0x65, 0x76, 0x69, 0x65, 0x77, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, - 0x72, 0x65, 0x76, 0x69, 0x65, 0x77, 0x42, 0x0f, 0x5a, 0x0d, 0x66, 0x69, 0x6c, 0x65, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_fileTransfer_ftMessages_proto_rawDescOnce sync.Once - file_fileTransfer_ftMessages_proto_rawDescData = file_fileTransfer_ftMessages_proto_rawDesc -) - -func file_fileTransfer_ftMessages_proto_rawDescGZIP() []byte { - file_fileTransfer_ftMessages_proto_rawDescOnce.Do(func() { - file_fileTransfer_ftMessages_proto_rawDescData = protoimpl.X.CompressGZIP(file_fileTransfer_ftMessages_proto_rawDescData) - }) - return file_fileTransfer_ftMessages_proto_rawDescData -} - -var file_fileTransfer_ftMessages_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_fileTransfer_ftMessages_proto_goTypes = []interface{}{ - (*NewFileTransfer)(nil), // 0: parse.NewFileTransfer -} -var file_fileTransfer_ftMessages_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_fileTransfer_ftMessages_proto_init() } -func file_fileTransfer_ftMessages_proto_init() { - if File_fileTransfer_ftMessages_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_fileTransfer_ftMessages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NewFileTransfer); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_fileTransfer_ftMessages_proto_rawDesc, - NumEnums: 0, - NumMessages: 1, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_fileTransfer_ftMessages_proto_goTypes, - DependencyIndexes: file_fileTransfer_ftMessages_proto_depIdxs, - MessageInfos: file_fileTransfer_ftMessages_proto_msgTypes, - }.Build() - File_fileTransfer_ftMessages_proto = out.File - file_fileTransfer_ftMessages_proto_rawDesc = nil - file_fileTransfer_ftMessages_proto_goTypes = nil - file_fileTransfer_ftMessages_proto_depIdxs = nil -} diff --git a/fileTransfer2/e2e/generateProto.sh b/fileTransfer2/e2e/generateProto.sh deleted file mode 100644 index 7dacae81b..000000000 --- a/fileTransfer2/e2e/generateProto.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -# -# Copyright © 2020 xx network SEZC /// -# /// -# Use of this source code is governed by a license that can be found in the /// -# LICENSE file /// -# - -protoc --go_out=paths=source_relative:. fileTransfer2/e2e/ftMessages.proto diff --git a/fileTransfer2/e2e/listener.go b/fileTransfer2/e2e/listener.go index a8e054640..33e8aefca 100644 --- a/fileTransfer2/e2e/listener.go +++ b/fileTransfer2/e2e/listener.go @@ -11,6 +11,7 @@ import ( "github.com/golang/protobuf/proto" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/e2e/receive" + "gitlab.com/elixxir/client/fileTransfer2" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ) @@ -27,7 +28,7 @@ const listenerName = "NewFileTransferListener-E2E" // listener waits for a message indicating a new file transfer is starting. // Adheres to the receive.Listener interface. type listener struct { - m *manager + m *Manager } // Hear is called when a new file transfer is received. It creates a new @@ -35,7 +36,7 @@ type listener struct { // messages. func (l *listener) Hear(msg receive.Message) { // Unmarshal the request message - newFT := &NewFileTransfer{} + newFT := &fileTransfer2.NewFileTransfer{} err := proto.Unmarshal(msg.Payload, newFT) if err != nil { jww.ERROR.Printf(errProtoUnmarshal, err) @@ -45,8 +46,9 @@ func (l *listener) Hear(msg receive.Message) { transferKey := ftCrypto.UnmarshalTransferKey(newFT.GetTransferKey()) // Add new transfer to start receiving parts - tid, err := l.m.AddNew(newFT.FileName, &transferKey, newFT.TransferMac, - uint16(newFT.NumParts), newFT.Size, newFT.Retry, nil, 0) + tid, err := l.m.HandleIncomingTransfer(newFT.FileName, &transferKey, + newFT.TransferMac, uint16(newFT.NumParts), newFT.Size, newFT.Retry, + nil, 0) if err != nil { jww.ERROR.Printf(errNewReceivedTransfer, newFT.FileName, err) return diff --git a/fileTransfer2/e2e/manager.go b/fileTransfer2/e2e/manager.go index 2d5e4bcbb..d7955bcbd 100644 --- a/fileTransfer2/e2e/manager.go +++ b/fileTransfer2/e2e/manager.go @@ -28,13 +28,13 @@ const ( errNewFtManager = "cannot create new E2E file transfer manager: %+v" ) -// manager handles the sending and receiving of file transfers using E2E +// Manager handles the sending and receiving of file transfers using E2E // messages to inform the recipient of incoming file transfers. -type manager struct { +type Manager struct { // Callback that is called every time a new file transfer is received receiveCB ft.ReceiveCallback - // File transfer manager + // File transfer Manager ft ft.FileTransfer myID *id.ID @@ -42,7 +42,7 @@ type manager struct { e2e E2e } -// E2e interface matches a subset of the e2e.Handler methods used by the manager +// E2e interface matches a subset of the e2e.Handler methods used by the Manager // for easier testing. type E2e interface { SendE2E(mt catalog.MessageType, recipient *id.ID, payload []byte, @@ -54,23 +54,14 @@ type E2e interface { // NewManager generates a new file transfer manager using E2E. func NewManager(receiveCB ft.ReceiveCallback, params ft.Params, myID *id.ID, e2e E2e, cmix ft.Cmix, kv *versioned.KV, rng *fastRNG.StreamGenerator) ( - ft.FileTransfer, error) { + *Manager, error) { - sendNewCb := func(recipient *id.ID, info *ft.TransferInfo) error { - return sendNewFileTransferMessage(recipient, info, e2e) - } - - sendEndCb := func(recipient *id.ID) { - sendEndFileTransferMessage(recipient, cmix, e2e) - } - - ftManager, err := ft.NewManager( - sendNewCb, sendEndCb, params, myID, cmix, kv, rng) + ftManager, err := ft.NewManager(params, myID, cmix, kv, rng) if err != nil { return nil, errors.Errorf(errNewFtManager, err) } - return &manager{ + return &Manager{ receiveCB: receiveCB, ft: ftManager, myID: myID, @@ -79,60 +70,77 @@ func NewManager(receiveCB ft.ReceiveCallback, params ft.Params, myID *id.ID, }, nil } -func (m *manager) StartProcesses() (stoppable.Stoppable, error) { +func (m *Manager) StartProcesses() (stoppable.Stoppable, error) { // Register listener to receive new file transfers m.e2e.RegisterListener(m.myID, catalog.NewFileTransfer, &listener{m}) return m.ft.StartProcesses() } -func (m *manager) MaxFileNameLen() int { +func (m *Manager) MaxFileNameLen() int { return m.ft.MaxFileNameLen() } -func (m *manager) MaxFileTypeLen() int { +func (m *Manager) MaxFileTypeLen() int { return m.ft.MaxFileTypeLen() } -func (m *manager) MaxFileSize() int { +func (m *Manager) MaxFileSize() int { return m.ft.MaxFileSize() } -func (m *manager) MaxPreviewSize() int { +func (m *Manager) MaxPreviewSize() int { return m.ft.MaxPreviewSize() } -func (m *manager) Send(fileName, fileType string, fileData []byte, +func (m *Manager) Send(fileName, fileType string, fileData []byte, recipient *id.ID, retry float32, preview []byte, progressCB ft.SentProgressCallback, period time.Duration) ( *ftCrypto.TransferID, error) { + + sendNew := func(info *ft.TransferInfo) error { + return sendNewFileTransferMessage(recipient, info, m.e2e) + } + return m.ft.Send(fileName, fileType, fileData, recipient, retry, preview, - progressCB, period) + progressCB, period, sendNew) } -func (m *manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, +func (m *Manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, progressCB ft.SentProgressCallback, period time.Duration) error { + + progressCB2 := func(completed bool, arrived, total uint16, + t ft.FilePartTracker, err error) { + + // If the transfer is completed, send last message informing recipient + if completed { + sendEndFileTransferMessage(nil, m.cmix, m.e2e) + } + + progressCB(completed, arrived, total, t, err) + } + return m.ft.RegisterSentProgressCallback(tid, progressCB, period) } -func (m *manager) CloseSend(tid *ftCrypto.TransferID) error { +func (m *Manager) CloseSend(tid *ftCrypto.TransferID) error { return m.ft.CloseSend(tid) } -func (m *manager) AddNew(fileName string, key *ftCrypto.TransferKey, - transferMAC []byte, numParts uint16, size uint32, retry float32, - progressCB ft.ReceivedProgressCallback, period time.Duration) ( - *ftCrypto.TransferID, error) { +func (m *Manager) HandleIncomingTransfer(fileName string, + key *ftCrypto.TransferKey, transferMAC []byte, numParts uint16, size uint32, + retry float32, progressCB ft.ReceivedProgressCallback, + period time.Duration) (*ftCrypto.TransferID, error) { - return m.ft.AddNew(fileName, key, transferMAC, numParts, size, retry, - progressCB, period) + return m.ft.HandleIncomingTransfer( + fileName, key, transferMAC, numParts, size, retry, progressCB, period) } -func (m *manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, +func (m *Manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, progressCB ft.ReceivedProgressCallback, period time.Duration) error { return m.ft.RegisterReceivedProgressCallback(tid, progressCB, period) } -func (m *manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) { +func (m *Manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) { return m.ft.Receive(tid) } diff --git a/fileTransfer2/e2e/manager_test.go b/fileTransfer2/e2e/manager_test.go index 9fd92e532..bfe61b09f 100644 --- a/fileTransfer2/e2e/manager_test.go +++ b/fileTransfer2/e2e/manager_test.go @@ -26,8 +26,8 @@ import ( "time" ) -// Tests that manager adheres to the fileTransfer2.FileTransfer interface. -var _ ft.FileTransfer = (*manager)(nil) +// Tests that Manager adheres to the fileTransfer2.FileTransfer interface. +var _ ft.FileTransfer = (*Manager)(nil) // Tests that E2e adheres to the e2e.Handler interface. var _ E2e = (e2e.Handler)(nil) @@ -69,7 +69,7 @@ func Test_FileTransfer_Smoke(t *testing.T) { if err != nil { t.Errorf("Failed to create new file transfer manager 1: %+v", err) } - m1 := ftm1.(*manager) + m1 := ftm1.(*Manager) stop1, err := m1.StartProcesses() if err != nil { @@ -94,7 +94,7 @@ func Test_FileTransfer_Smoke(t *testing.T) { if err != nil { t.Errorf("Failed to create new file transfer manager 2: %+v", err) } - m2 := ftm2.(*manager) + m2 := ftm2.(*Manager) stop2, err := m2.StartProcesses() if err != nil { @@ -158,7 +158,7 @@ func Test_FileTransfer_Smoke(t *testing.T) { } } - // Send file. + // Send file sendStart := netTime.Now() tid1, err := m1.Send( fileName, fileType, fileData, myID2, retry, preview, sentProgressCb1, 0) diff --git a/fileTransfer2/e2e/send.go b/fileTransfer2/e2e/send.go index e3d8d799d..55720bb10 100644 --- a/fileTransfer2/e2e/send.go +++ b/fileTransfer2/e2e/send.go @@ -19,11 +19,11 @@ import ( // Error messages. const ( - // manager.sendNewFileTransferMessage + // sendNewFileTransferMessage errProtoMarshal = "failed to proto marshal NewFileTransfer: %+v" errNewFtSendE2e = "failed to send initial file transfer message via E2E: %+v" - // manager.sendEndFileTransferMessage + // sendEndFileTransferMessage errEndFtSendE2e = "[FT] Failed to send ending file transfer message via E2E: %+v" ) @@ -43,7 +43,7 @@ func sendNewFileTransferMessage( recipient *id.ID, info *ft.TransferInfo, e2eHandler E2e) error { // Construct NewFileTransfer message - protoMsg := &NewFileTransfer{ + protoMsg := &ft.NewFileTransfer{ FileName: info.FileName, FileType: info.FileType, TransferKey: info.Key.Bytes(), diff --git a/fileTransfer2/ftMessages.pb.go b/fileTransfer2/ftMessages.pb.go new file mode 100644 index 000000000..d4e473733 --- /dev/null +++ b/fileTransfer2/ftMessages.pb.go @@ -0,0 +1,142 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: fileTransfer2/ftMessages.proto + +package fileTransfer2 + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// NewFileTransfer is transmitted first on the initialization of a file transfer +// to inform the receiver about the incoming file. +type NewFileTransfer struct { + FileName string `protobuf:"bytes,1,opt,name=fileName,proto3" json:"fileName,omitempty"` + FileType string `protobuf:"bytes,2,opt,name=fileType,proto3" json:"fileType,omitempty"` + TransferKey []byte `protobuf:"bytes,3,opt,name=transferKey,proto3" json:"transferKey,omitempty"` + TransferMac []byte `protobuf:"bytes,4,opt,name=transferMac,proto3" json:"transferMac,omitempty"` + NumParts uint32 `protobuf:"varint,5,opt,name=numParts,proto3" json:"numParts,omitempty"` + Size uint32 `protobuf:"varint,6,opt,name=size,proto3" json:"size,omitempty"` + Retry float32 `protobuf:"fixed32,7,opt,name=retry,proto3" json:"retry,omitempty"` + Preview []byte `protobuf:"bytes,8,opt,name=preview,proto3" json:"preview,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *NewFileTransfer) Reset() { *m = NewFileTransfer{} } +func (m *NewFileTransfer) String() string { return proto.CompactTextString(m) } +func (*NewFileTransfer) ProtoMessage() {} +func (*NewFileTransfer) Descriptor() ([]byte, []int) { + return fileDescriptor_7ae154b96911f608, []int{0} +} + +func (m *NewFileTransfer) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_NewFileTransfer.Unmarshal(m, b) +} +func (m *NewFileTransfer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_NewFileTransfer.Marshal(b, m, deterministic) +} +func (m *NewFileTransfer) XXX_Merge(src proto.Message) { + xxx_messageInfo_NewFileTransfer.Merge(m, src) +} +func (m *NewFileTransfer) XXX_Size() int { + return xxx_messageInfo_NewFileTransfer.Size(m) +} +func (m *NewFileTransfer) XXX_DiscardUnknown() { + xxx_messageInfo_NewFileTransfer.DiscardUnknown(m) +} + +var xxx_messageInfo_NewFileTransfer proto.InternalMessageInfo + +func (m *NewFileTransfer) GetFileName() string { + if m != nil { + return m.FileName + } + return "" +} + +func (m *NewFileTransfer) GetFileType() string { + if m != nil { + return m.FileType + } + return "" +} + +func (m *NewFileTransfer) GetTransferKey() []byte { + if m != nil { + return m.TransferKey + } + return nil +} + +func (m *NewFileTransfer) GetTransferMac() []byte { + if m != nil { + return m.TransferMac + } + return nil +} + +func (m *NewFileTransfer) GetNumParts() uint32 { + if m != nil { + return m.NumParts + } + return 0 +} + +func (m *NewFileTransfer) GetSize() uint32 { + if m != nil { + return m.Size + } + return 0 +} + +func (m *NewFileTransfer) GetRetry() float32 { + if m != nil { + return m.Retry + } + return 0 +} + +func (m *NewFileTransfer) GetPreview() []byte { + if m != nil { + return m.Preview + } + return nil +} + +func init() { + proto.RegisterType((*NewFileTransfer)(nil), "parse.NewFileTransfer") +} + +func init() { proto.RegisterFile("fileTransfer2/ftMessages.proto", fileDescriptor_7ae154b96911f608) } + +var fileDescriptor_7ae154b96911f608 = []byte{ + // 215 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xbf, 0x4a, 0xc6, 0x30, + 0x14, 0xc5, 0xc9, 0xe7, 0xd7, 0x3f, 0x46, 0x4b, 0x21, 0x38, 0x5c, 0x1c, 0x24, 0x38, 0x65, 0x52, + 0xd0, 0x37, 0x70, 0x70, 0x91, 0x16, 0x09, 0x9d, 0xdc, 0x62, 0xb9, 0x95, 0x82, 0x6d, 0xc3, 0x4d, + 0xb4, 0xd4, 0x77, 0xf6, 0x1d, 0xa4, 0xd1, 0xd6, 0x76, 0xcb, 0xef, 0x9c, 0x93, 0x73, 0xe1, 0xf0, + 0xab, 0xa6, 0x7d, 0xc7, 0x8a, 0x4c, 0xef, 0x1a, 0xa4, 0xbb, 0xdb, 0xc6, 0x17, 0xe8, 0x9c, 0x79, + 0x43, 0x77, 0x63, 0x69, 0xf0, 0x83, 0x88, 0xac, 0x21, 0x87, 0xd7, 0xdf, 0x8c, 0xe7, 0x25, 0x8e, + 0x8f, 0x9b, 0xb0, 0xb8, 0xe4, 0xe9, 0xfc, 0xb9, 0x34, 0x1d, 0x02, 0x93, 0x4c, 0x9d, 0xea, 0x95, + 0x17, 0xaf, 0x9a, 0x2c, 0xc2, 0xe1, 0xdf, 0x9b, 0x59, 0x48, 0x7e, 0xe6, 0xff, 0x3a, 0x9e, 0x70, + 0x82, 0x13, 0xc9, 0xd4, 0xb9, 0xde, 0x4a, 0xdb, 0x44, 0x61, 0x6a, 0x38, 0xee, 0x13, 0x85, 0xa9, + 0xe7, 0xfe, 0xfe, 0xa3, 0x7b, 0x36, 0xe4, 0x1d, 0x44, 0x92, 0xa9, 0x4c, 0xaf, 0x2c, 0x04, 0x3f, + 0xba, 0xf6, 0x0b, 0x21, 0x0e, 0x7a, 0x78, 0x8b, 0x0b, 0x1e, 0x11, 0x7a, 0x9a, 0x20, 0x91, 0x4c, + 0x1d, 0xf4, 0x2f, 0x08, 0xe0, 0x89, 0x25, 0xfc, 0x6c, 0x71, 0x84, 0x34, 0xdc, 0x58, 0xf0, 0x21, + 0x7f, 0xc9, 0x76, 0xc3, 0xbc, 0xc6, 0x61, 0x8e, 0xfb, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x99, + 0x46, 0x73, 0x93, 0x30, 0x01, 0x00, 0x00, +} diff --git a/fileTransfer2/e2e/ftMessages.proto b/fileTransfer2/ftMessages.proto similarity index 96% rename from fileTransfer2/e2e/ftMessages.proto rename to fileTransfer2/ftMessages.proto index ede709ac4..b846e5b1f 100644 --- a/fileTransfer2/e2e/ftMessages.proto +++ b/fileTransfer2/ftMessages.proto @@ -8,7 +8,7 @@ syntax = "proto3"; package parse; -option go_package = "fileTransfer2/e2e"; +option go_package = "fileTransfer2"; // NewFileTransfer is transmitted first on the initialization of a file transfer // to inform the receiver about the incoming file. diff --git a/fileTransfer2/generateProto.sh b/fileTransfer2/generateProto.sh new file mode 100644 index 000000000..7f435f2df --- /dev/null +++ b/fileTransfer2/generateProto.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +#/////////////////////////////////////////////////////////////////////////////// +#/ Copyright © 2020 xx network SEZC // +#/ // +#/ Use of this source code is governed by a license that can be found in the // +#/ LICENSE file // +#/////////////////////////////////////////////////////////////////////////////// + +protoc --go_out=paths=source_relative:. fileTransfer2/ftMessages.proto diff --git a/fileTransfer2/groupChat/manager.go b/fileTransfer2/groupChat/manager.go index b61314024..75a5e366b 100644 --- a/fileTransfer2/groupChat/manager.go +++ b/fileTransfer2/groupChat/manager.go @@ -23,6 +23,17 @@ import ( const ( // NewManager errNewFtManager = "cannot create new group chat file transfer manager: %+v" + + // manager.StartProcesses + errAddNewService = "failed to add service to receive new group file transfers: %+v" +) + +const ( + // Tag used when sending/receiving new group chat file transfers message + newFileTransferTag = "NewGroupFileTransfer" + + // Tag used when sending/receiving end group chat file transfers message + endFileTransferTag = "EndGroupFileTransfer" ) // manager handles the sending and receiving of file transfers for group chats. @@ -34,7 +45,7 @@ type manager struct { ft ft.FileTransfer // Group chat manager - gc groupChat.Manager + gc groupChat.GroupChat myID *id.ID cmix ft.Cmix @@ -42,19 +53,10 @@ type manager struct { // NewManager generates a new file transfer manager for group chat. func NewManager(receiveCB ft.ReceiveCallback, params ft.Params, myID *id.ID, - gc groupChat.Manager, cmix ft.Cmix, kv *versioned.KV, + gc groupChat.GroupChat, cmix ft.Cmix, kv *versioned.KV, rng *fastRNG.StreamGenerator) (ft.FileTransfer, error) { - sendNewCb := func(recipient *id.ID, info *ft.TransferInfo) error { - return nil - } - - sendEndCb := func(recipient *id.ID) { - - } - - ftManager, err := ft.NewManager( - sendNewCb, sendEndCb, params, myID, cmix, kv, rng) + ftManager, err := ft.NewManager(params, myID, cmix, kv, rng) if err != nil { return nil, errors.Errorf(errNewFtManager, err) } @@ -68,61 +70,61 @@ func NewManager(receiveCB ft.ReceiveCallback, params ft.Params, myID *id.ID, }, nil } -func (m manager) StartProcesses() (stoppable.Stoppable, error) { - // TODO implement me - panic("implement me") +func (m *manager) StartProcesses() (stoppable.Stoppable, error) { + err := m.gc.AddService(newFileTransferTag, &processor{m}) + if err != nil { + return nil, errors.Errorf(errAddNewService, err) + } + + return m.StartProcesses() } -func (m manager) MaxFileNameLen() int { +func (m *manager) MaxFileNameLen() int { return m.MaxFileNameLen() } -func (m manager) MaxFileTypeLen() int { +func (m *manager) MaxFileTypeLen() int { return m.MaxFileTypeLen() } -func (m manager) MaxFileSize() int { +func (m *manager) MaxFileSize() int { return m.MaxFileSize() } -func (m manager) MaxPreviewSize() int { +func (m *manager) MaxPreviewSize() int { return m.MaxPreviewSize() } -func (m manager) Send(fileName, fileType string, fileData []byte, +func (m *manager) Send(fileName, fileType string, fileData []byte, recipient *id.ID, retry float32, preview []byte, - progressCB ft.SentProgressCallback, period time.Duration) ( - *ftCrypto.TransferID, error) { - // TODO implement me - panic("implement me") + progressCB ft.SentProgressCallback, period time.Duration, + sendNew ft.SendNew) (*ftCrypto.TransferID, error) { + return m.ft.Send(fileName, fileType, fileData, recipient, retry, preview, + progressCB, period, sendNew) } -func (m manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, +func (m *manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID, progressCB ft.SentProgressCallback, period time.Duration) error { - // TODO implement me - panic("implement me") + return m.ft.RegisterSentProgressCallback(tid, progressCB, period) } -func (m manager) CloseSend(tid *ftCrypto.TransferID) error { - // TODO implement me - panic("implement me") +func (m *manager) CloseSend(tid *ftCrypto.TransferID) error { + return m.CloseSend(tid) } -func (m manager) AddNew(fileName string, key *ftCrypto.TransferKey, - transferMAC []byte, numParts uint16, size uint32, retry float32, - progressCB ft.ReceivedProgressCallback, period time.Duration) ( - *ftCrypto.TransferID, error) { - // TODO implement me - panic("implement me") +func (m *manager) HandleIncomingTransfer(fileName string, + key *ftCrypto.TransferKey, transferMAC []byte, numParts uint16, size uint32, + retry float32, progressCB ft.ReceivedProgressCallback, + period time.Duration) (*ftCrypto.TransferID, error) { + return m.ft.HandleIncomingTransfer( + fileName, key, transferMAC, numParts, size, retry, progressCB, period) } -func (m manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, +func (m *manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID, progressCB ft.ReceivedProgressCallback, period time.Duration) error { - // TODO implement me - panic("implement me") + return m.ft.RegisterReceivedProgressCallback(tid, progressCB, period) } -func (m manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) { - // TODO implement me - panic("implement me") +func (m *manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) { + return m.Receive(tid) } diff --git a/fileTransfer2/groupChat/processor.go b/fileTransfer2/groupChat/processor.go new file mode 100644 index 000000000..33ea4d0e0 --- /dev/null +++ b/fileTransfer2/groupChat/processor.go @@ -0,0 +1,60 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package groupChat + +import ( + "github.com/golang/protobuf/proto" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + ft "gitlab.com/elixxir/client/fileTransfer2" + "gitlab.com/elixxir/client/groupChat" + ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" + "gitlab.com/elixxir/primitives/format" +) + +// Error messages. +const ( + // processor.Process + errProtoUnmarshal = "[FT] Failed to proto unmarshal new file transfer request: %+v" + errNewReceivedTransfer = "[FT] Failed to add new received transfer for %q: %+v" +) + +type processor struct { + *manager +} + +func (p *processor) Process(decryptedMsg groupChat.MessageReceive, + _ format.Message, _ receptionID.EphemeralIdentity, _ rounds.Round) { + // Unmarshal the request message + var newFT ft.NewFileTransfer + err := proto.Unmarshal(decryptedMsg.Payload, &newFT) + if err != nil { + jww.ERROR.Printf(errProtoUnmarshal, err) + return + } + + transferKey := ftCrypto.UnmarshalTransferKey(newFT.GetTransferKey()) + + // Add new transfer to start receiving parts + tid, err := p.HandleIncomingTransfer(newFT.FileName, &transferKey, + newFT.TransferMac, uint16(newFT.NumParts), newFT.Size, newFT.Retry, + nil, 0) + if err != nil { + jww.ERROR.Printf(errNewReceivedTransfer, newFT.FileName, err) + return + } + + // Call the reception callback + go p.receiveCB(tid, newFT.FileName, newFT.FileType, decryptedMsg.SenderID, + newFT.Size, newFT.Preview) +} + +func (p *processor) String() string { + return "GroupFileTransfer" +} diff --git a/fileTransfer2/groupChat/send.go b/fileTransfer2/groupChat/send.go new file mode 100644 index 000000000..a15fa68bb --- /dev/null +++ b/fileTransfer2/groupChat/send.go @@ -0,0 +1,68 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package groupChat + +import ( + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + ft "gitlab.com/elixxir/client/fileTransfer2" + "gitlab.com/elixxir/client/groupChat" + "gitlab.com/xx_network/primitives/id" +) + +// Error messages. +const ( + // sendNewFileTransferMessage + errProtoMarshal = "failed to proto marshal NewFileTransfer: %+v" + errNewFtSendGroupChat = "failed to send initial file transfer message via group chat: %+v" + + // sendEndFileTransferMessage + errEndFtSendGroupChat = "[FT] Failed to send ending file transfer message via group chat: %+v" +) + +// sendNewFileTransferMessage sends a group chat message to the group ID +// informing them of the incoming file transfer. +func sendNewFileTransferMessage( + groupID *id.ID, info *ft.TransferInfo, gc groupChat.GroupChat) error { + + // Construct NewFileTransfer message + protoMsg := &ft.NewFileTransfer{ + FileName: info.FileName, + FileType: info.FileType, + TransferKey: info.Key.Bytes(), + TransferMac: info.Mac, + NumParts: uint32(info.NumParts), + Size: info.Size, + Retry: info.Retry, + Preview: info.Preview, + } + + // Marshal the message + payload, err := proto.Marshal(protoMsg) + if err != nil { + return errors.Errorf(errProtoMarshal, err) + } + + // Send the message via group chat + _, _, _, err = gc.Send(groupID, newFileTransferTag, payload) + if err != nil { + return errors.Errorf(errNewFtSendGroupChat, err) + } + + return nil +} + +// sendEndFileTransferMessage sends a group chat message to the group ID +// informing them that all file parts have arrived once the network is healthy. +func sendEndFileTransferMessage(groupID *id.ID, gc groupChat.GroupChat) { + _, _, _, err := gc.Send(groupID, endFileTransferTag, nil) + if err != nil { + jww.ERROR.Printf(errEndFtSendGroupChat, err) + } +} diff --git a/fileTransfer2/info.go b/fileTransfer2/info.go index 66bd12e63..8a5da9672 100644 --- a/fileTransfer2/info.go +++ b/fileTransfer2/info.go @@ -8,7 +8,7 @@ package fileTransfer2 import ( - "encoding/json" + "github.com/golang/protobuf/proto" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ) @@ -24,10 +24,38 @@ type TransferInfo struct { } func (ti *TransferInfo) Marshal() ([]byte, error) { - return json.Marshal(ti) + // Construct NewFileTransfer message + protoMsg := &NewFileTransfer{ + FileName: ti.FileName, + FileType: ti.FileType, + TransferKey: ti.Key.Bytes(), + TransferMac: ti.Mac, + NumParts: uint32(ti.NumParts), + Size: ti.Size, + Retry: ti.Retry, + Preview: ti.Preview, + } + + return proto.Marshal(protoMsg) } func UnmarshalTransferInfo(data []byte) (*TransferInfo, error) { - var ti TransferInfo - return &ti, json.Unmarshal(data, &ti) + // Unmarshal the request message + var newFT NewFileTransfer + err := proto.Unmarshal(data, &newFT) + if err != nil { + return nil, err + } + transferKey := ftCrypto.UnmarshalTransferKey(newFT.GetTransferKey()) + + return &TransferInfo{ + FileName: newFT.FileName, + FileType: newFT.FileType, + Key: transferKey, + Mac: newFT.TransferMac, + NumParts: uint16(newFT.NumParts), + Size: newFT.Size, + Retry: newFT.Retry, + Preview: newFT.Preview, + }, nil } diff --git a/fileTransfer2/interface.go b/fileTransfer2/interface.go index 45344e8d2..9c2100c4a 100644 --- a/fileTransfer2/interface.go +++ b/fileTransfer2/interface.go @@ -30,15 +30,10 @@ type ReceivedProgressCallback func(completed bool, received, total uint16, type ReceiveCallback func(tid *ftCrypto.TransferID, fileName, fileType string, sender *id.ID, size uint32, preview []byte) -// SendNew handles the sending of the initial message to the recipient informing -// them of the incoming file transfer parts. SendNew should block until the send +// SendNew handles the sending of the initial message informing the recipient +// of the incoming file transfer parts. SendNew should block until the send // completes and return an error only on failed sends. -type SendNew func(recipient *id.ID, info *TransferInfo) error - -// SendEnd handles the sending of the last message to the recipient informing -// them that the file transfer has completed. The function will be run in its -// own thread. -type SendEnd func(recipient *id.ID) +type SendNew func(info *TransferInfo) error // FileTransfer facilities the sending and receiving of large file transfers. // It allows for progress tracking of both inbound and outbound transfers. @@ -103,9 +98,11 @@ type FileTransfer interface { // update (or less if restricted by the period), or on fatal error. // period - A progress callback will be limited from triggering only once // per period. + // sendNew - Function that sends the file transfer information to the + // recipient. Send(fileName, fileType string, fileData []byte, recipient *id.ID, retry float32, preview []byte, progressCB SentProgressCallback, - period time.Duration) (*ftCrypto.TransferID, error) + period time.Duration, sendNew SendNew) (*ftCrypto.TransferID, error) // RegisterSentProgressCallback allows for the registration of a callback to // track the progress of an individual sent file transfer. @@ -146,9 +143,9 @@ type FileTransfer interface { full file can be retrieved using Receive. */ - // AddNew starts tracking the received file parts for the given file - // information and returns a transfer ID that uniquely identifies this file - // transfer. + // HandleIncomingTransfer starts tracking the received file parts for the + // given file information and returns a transfer ID that uniquely identifies + // this file transfer. // // This function should be called once for every new file received on the // registered SendNew callback. @@ -161,7 +158,7 @@ type FileTransfer interface { // update (or less if restricted by the period), or on fatal error. // period - A progress callback will be limited from triggering only once // per period. - AddNew(fileName string, key *ftCrypto.TransferKey, transferMAC []byte, + HandleIncomingTransfer(fileName string, key *ftCrypto.TransferKey, transferMAC []byte, numParts uint16, size uint32, retry float32, progressCB ReceivedProgressCallback, period time.Duration) ( *ftCrypto.TransferID, error) diff --git a/fileTransfer2/manager.go b/fileTransfer2/manager.go index 763b9a87d..90d47bca1 100644 --- a/fileTransfer2/manager.go +++ b/fileTransfer2/manager.go @@ -88,7 +88,7 @@ const ( errDeleteSentTransfer = "could not delete sent transfer %s: %+v" errRemoveSentTransfer = "could not remove transfer %s from list: %+v" - // manager.AddNew + // manager.HandleIncomingTransfer errNewRtTransferID = "failed to generate transfer ID for new received file transfer %q: %+v" errAddNewRt = "failed to add new file transfer %s (%q): %+v" @@ -116,12 +116,6 @@ type manager struct { // Queue of batches of parts to send sendQueue chan []store.Part - // Function to call to send new file transfer information to recipient - sendNewCb SendNew - - // Function to call to send notification that file transfer has completed - sendEndCb SendEnd - // File transfer parameters params Params @@ -150,7 +144,7 @@ type Cmix interface { // NewManager creates a new file transfer manager object. If sent or received // transfers already existed, they are loaded from storage and queued to resume // once manager.startProcesses is called. -func NewManager(sendNewCb SendNew, sendEndCb SendEnd, params Params, +func NewManager(params Params, myID *id.ID, cmix Cmix, kv *versioned.KV, rng *fastRNG.StreamGenerator) (FileTransfer, error) { @@ -173,8 +167,6 @@ func NewManager(sendNewCb SendNew, sendEndCb SendEnd, params Params, callbacks: callbackTracker.NewManager(), batchQueue: make(chan store.Part, batchQueueBuffLen), sendQueue: make(chan []store.Part, sendQueueBuffLen), - sendNewCb: sendNewCb, - sendEndCb: sendEndCb, params: params, myID: myID, cmix: cmix, @@ -240,7 +232,7 @@ func (m *manager) MaxPreviewSize() int { // via cmix.SendMany. func (m *manager) Send(fileName, fileType string, fileData []byte, recipient *id.ID, retry float32, preview []byte, - progressCB SentProgressCallback, period time.Duration) ( + progressCB SentProgressCallback, period time.Duration, sendNew SendNew) ( *ftCrypto.TransferID, error) { // Return an error if the file name is too long @@ -294,7 +286,7 @@ func (m *manager) Send(fileName, fileType string, fileData []byte, // Send the initial file transfer message over E2E info := &TransferInfo{ fileName, fileType, key, mac, numParts, fileSize, retry, preview} - err = m.sendNewCb(recipient, info) + err = sendNew(info) if err != nil { return nil, errors.Errorf(errSendNewMsg, err) } @@ -347,11 +339,6 @@ func (m *manager) registerSentProgressCallback(st *store.SentTransfer, arrived, total := st.NumArrived(), st.NumParts() completed := arrived == total - // If the transfer is completed, send last message informing recipient - if completed && m.params.NotifyUponCompletion { - go m.sendEndCb(st.Recipient()) - } - // Build part tracker from copy of part statuses vector tracker := &sentFilePartTracker{st.CopyPartStatusVector()} @@ -398,11 +385,12 @@ func (m *manager) CloseSend(tid *ftCrypto.TransferID) error { /* === Receiving ============================================================ */ -// AddNew starts tracking the received file parts for the given file information -// and returns a transfer ID that uniquely identifies this file transfer. -func (m *manager) AddNew(fileName string, key *ftCrypto.TransferKey, - transferMAC []byte, numParts uint16, size uint32, retry float32, - progressCB ReceivedProgressCallback, period time.Duration) ( +// HandleIncomingTransfer starts tracking the received file parts for the given +// file information and returns a transfer ID that uniquely identifies this file +// transfer. +func (m *manager) HandleIncomingTransfer(fileName string, + key *ftCrypto.TransferKey, transferMAC []byte, numParts uint16, size uint32, + retry float32, progressCB ReceivedProgressCallback, period time.Duration) ( *ftCrypto.TransferID, error) { // Generate new transfer ID diff --git a/fileTransfer2/manager_test.go b/fileTransfer2/manager_test.go index 58c89f0b6..8ae9574b0 100644 --- a/fileTransfer2/manager_test.go +++ b/fileTransfer2/manager_test.go @@ -87,15 +87,8 @@ func Test_FileTransfer_Smoke(t *testing.T) { // Set up the first client myID1 := id.NewIdFromString("myID1", id.User, t) kv1 := versioned.NewKV(ekv.MakeMemstore()) - sendNewCbChan1 := make(chan *TransferInfo) - sendNewCb1 := func(recipient *id.ID, info *TransferInfo) error { - sendNewCbChan1 <- info - return nil - } - sendEndCbChan1 := make(chan *id.ID) - sendEndCb1 := func(recipient *id.ID) { sendEndCbChan1 <- recipient } - ftm1, err := NewManager(sendNewCb1, sendEndCb1, params, myID1, - newMockCmix(myID1, cMixHandler), kv1, rngGen) + ftm1, err := NewManager( + params, myID1, newMockCmix(myID1, cMixHandler), kv1, rngGen) if err != nil { t.Errorf("Failed to create new file transfer manager 1: %+v", err) } @@ -109,8 +102,8 @@ func Test_FileTransfer_Smoke(t *testing.T) { // Set up the second client myID2 := id.NewIdFromString("myID2", id.User, t) kv2 := versioned.NewKV(ekv.MakeMemstore()) - ftm2, err := NewManager(nil, nil, params, myID2, - newMockCmix(myID2, cMixHandler), kv2, rngGen) + ftm2, err := NewManager( + params, myID2, newMockCmix(myID2, cMixHandler), kv2, rngGen) if err != nil { t.Errorf("Failed to create new file transfer manager 2: %+v", err) } @@ -121,6 +114,12 @@ func Test_FileTransfer_Smoke(t *testing.T) { t.Errorf("Failed to start processes for manager 2: %+v", err) } + sendNewCbChan1 := make(chan *TransferInfo) + sendNewCb1 := func(info *TransferInfo) error { + sendNewCbChan1 <- info + return nil + } + // Wait group prevents the test from quiting before the file has completed // sending and receiving var wg sync.WaitGroup @@ -140,7 +139,7 @@ func Test_FileTransfer_Smoke(t *testing.T) { go func() { select { case r := <-sendNewCbChan1: - tid, err := m2.AddNew( + tid, err := m2.HandleIncomingTransfer( r.FileName, &r.Key, r.Mac, r.NumParts, r.Size, r.Retry, nil, 0) if err != nil { t.Errorf("Failed to add transfer: %+v", err) @@ -185,8 +184,8 @@ func Test_FileTransfer_Smoke(t *testing.T) { // Send file. sendStart := netTime.Now() - tid1, err := m1.Send( - fileName, fileType, fileData, myID2, retry, preview, sentProgressCb1, 0) + tid1, err := m1.Send(fileName, fileType, fileData, myID2, retry, preview, + sentProgressCb1, 0, sendNewCb1) if err != nil { t.Errorf("Failed to send file: %+v", err) } @@ -206,12 +205,6 @@ func Test_FileTransfer_Smoke(t *testing.T) { // Wait for file to be sent and received wg.Wait() - select { - case <-sendEndCbChan1: - case <-time.After(15 * time.Millisecond): - t.Error("Timed out waiting for end callback to be called.") - } - err = m1.CloseSend(tid1) if err != nil { t.Errorf("Failed to close transfer: %+v", err) diff --git a/fileTransfer2/params.go b/fileTransfer2/params.go index 5b7292d0c..f573bcddb 100644 --- a/fileTransfer2/params.go +++ b/fileTransfer2/params.go @@ -10,9 +10,8 @@ package fileTransfer2 import "time" const ( - defaultMaxThroughput = 150_000 // 150 kB per second - defaultSendTimeout = 500 * time.Millisecond - defaultNotifyUponCompletion = true + defaultMaxThroughput = 150_000 // 150 kB per second + defaultSendTimeout = 500 * time.Millisecond ) // Params contains parameters used for file transfer. @@ -25,17 +24,12 @@ type Params struct { // times out. It is recommended that SendTimeout is not changed from its // default. SendTimeout time.Duration - - // NotifyUponCompletion indicates if a final notification message is sent - // to the recipient on completion of file transfer. If true, the ping is - NotifyUponCompletion bool } // DefaultParams returns a Params object filled with the default values. func DefaultParams() Params { return Params{ - MaxThroughput: defaultMaxThroughput, - SendTimeout: defaultSendTimeout, - NotifyUponCompletion: defaultNotifyUponCompletion, + MaxThroughput: defaultMaxThroughput, + SendTimeout: defaultSendTimeout, } } diff --git a/fileTransfer2/params_test.go b/fileTransfer2/params_test.go index 48d91b47c..9a304ec9f 100644 --- a/fileTransfer2/params_test.go +++ b/fileTransfer2/params_test.go @@ -15,9 +15,8 @@ import ( // Tests that DefaultParams returns a Params object with the expected defaults. func TestDefaultParams(t *testing.T) { expected := Params{ - MaxThroughput: defaultMaxThroughput, - SendTimeout: defaultSendTimeout, - NotifyUponCompletion: defaultNotifyUponCompletion, + MaxThroughput: defaultMaxThroughput, + SendTimeout: defaultSendTimeout, } received := DefaultParams() diff --git a/groupChat/groupStore/store.go b/groupChat/groupStore/store.go index f180249e7..a49f763ab 100644 --- a/groupChat/groupStore/store.go +++ b/groupChat/groupStore/store.go @@ -250,6 +250,20 @@ func (s *Store) GroupIDs() []*id.ID { return idList } +// Groups returns a list of all groups. +// TODO: add test +func (s *Store) Groups() []Group { + s.mux.RLock() + defer s.mux.RUnlock() + + groupList := make([]Group, 0, len(s.list)) + for _, g := range s.list { + groupList = append(groupList, g) + } + + return groupList +} + // Get returns the Group for the given group ID. Returns false if no Group is // found. func (s *Store) Get(groupID *id.ID) (Group, bool) { diff --git a/groupChat/group.go b/groupChat/interface.go similarity index 97% rename from groupChat/group.go rename to groupChat/interface.go index 80f98edc0..e9c71c51b 100644 --- a/groupChat/group.go +++ b/groupChat/interface.go @@ -70,10 +70,10 @@ type GroupChat interface { // AddService adds a service for all group chat partners of the given tag, // which will call back on the given processor. - AddService(g gs.Group, tag string, p Processor) + AddService(tag string, p Processor) error // RemoveService removes all services for the given tag. - RemoveService(g gs.Group, tag string, p Processor) + RemoveService(tag string) error } // RequestCallback is called when a GroupChat request is received. diff --git a/groupChat/manager.go b/groupChat/manager.go index 9b018013e..ec194a1c5 100644 --- a/groupChat/manager.go +++ b/groupChat/manager.go @@ -13,7 +13,9 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/e2e/ratchet/partner" "gitlab.com/elixxir/client/e2e/ratchet/partner/session" @@ -24,18 +26,28 @@ import ( crypto "gitlab.com/elixxir/crypto/e2e" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/group" + "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" + "sync" "time" ) // Error messages. const ( - newGroupStoreErr = "failed to create new group store: %+v" - joinGroupErr = "failed to join new group %s: %+v" - leaveGroupErr = "failed to leave group %s: %+v" + // NewManager + newGroupStoreErr = "failed to create new group store: %+v" + errAddDefaultService = "could not add default service: %+v" + + // manager.JoinGroup + joinGroupErr = "failed to join new group %s: %+v" + + // manager.LeaveGroup + leaveGroupErr = "failed to leave group %s: %+v" ) +const defaultServiceTag = "default" + // GroupCmix is a subset of the cmix.Client interface containing only the // methods needed by GroupChat type GroupCmix interface { @@ -66,16 +78,24 @@ type GroupE2e interface { // manager handles the list of groups a user is a part of. type manager struct { - e2e GroupE2e + // Group storage + gs *gs.Store - receptionId *id.ID - rng *fastRNG.StreamGenerator - grp *cyclic.Group - gs *gs.Store - services GroupCmix + // List of registered processors + services map[string]Processor + servicesMux sync.Mutex + // Callback that is called when a new group request is received requestFunc RequestCallback + + // Callback that is called when a new group message is received receiveFunc ReceiveCallback + + receptionId *id.ID + net GroupCmix + e2e GroupE2e + grp *cyclic.Group + rng *fastRNG.StreamGenerator } // NewManager creates a new group chat manager @@ -92,14 +112,15 @@ func NewManager(services GroupCmix, e2e GroupE2e, receptionId *id.ID, // Define the manager object m := &manager{ - e2e: e2e, - rng: rng, - receptionId: receptionId, - grp: grp, gs: gStore, - services: services, + services: make(map[string]Processor), requestFunc: requestFunc, receiveFunc: receiveFunc, + receptionId: receptionId, + net: services, + e2e: e2e, + grp: grp, + rng: rng, } // Register listener for incoming e2e group chat requests @@ -112,6 +133,11 @@ func NewManager(services GroupCmix, e2e GroupE2e, receptionId *id.ID, return nil, err } + err = m.AddService(defaultServiceTag, defaultService{m.receiveFunc}) + if err != nil { + return nil, errors.Errorf(errAddDefaultService, err) + } + // Register all groups for _, gId := range m.GetGroups() { g, exists := m.GetGroup(gId) @@ -120,7 +146,8 @@ func NewManager(services GroupCmix, e2e GroupE2e, receptionId *id.ID, continue } - m.AddService(g, "", nil) + // Add all services for this group + m.addAllServices(g) } return m, nil @@ -134,7 +161,9 @@ func (m *manager) JoinGroup(g gs.Group) error { return errors.Errorf(joinGroupErr, g.ID, err) } - m.AddService(g, "", nil) + // Add all services for this group + m.addAllServices(g) + jww.INFO.Printf("[GC] Joined group %q with ID %s.", g.Name, g.ID) return nil } @@ -145,11 +174,7 @@ func (m *manager) LeaveGroup(groupID *id.ID) error { return errors.Errorf(leaveGroupErr, groupID, err) } - delService := message.Service{ - Identifier: groupID.Bytes(), - Tag: catalog.Group, - } - m.services.DeleteService(m.receptionId, delService, nil) + m.deleteAllServices(groupID) jww.INFO.Printf("[GC] Left group with ID %s.", groupID) return nil @@ -172,3 +197,17 @@ func (m *manager) GetGroup(groupID *id.ID) (gs.Group, bool) { func (m *manager) NumGroups() int { return m.gs.Len() } + +type defaultService struct { + // Callback that is called when a new group message is received + receiveFunc ReceiveCallback +} + +func (d defaultService) Process(decryptedMsg MessageReceive, _ format.Message, + _ receptionID.EphemeralIdentity, _ rounds.Round) { + go d.receiveFunc(decryptedMsg) +} + +func (d defaultService) String() string { + return defaultServiceTag +} diff --git a/groupChat/send.go b/groupChat/send.go index 817eba9cc..350ac03d0 100644 --- a/groupChat/send.go +++ b/groupChat/send.go @@ -24,13 +24,21 @@ import ( // Error messages. const ( - newCmixMsgErr = "failed to generate cMix messages for group chat: %+v" - sendManyCmixErr = "failed to send group chat message from member %s to group %s: %+v" - messageLenErr = "message length %d is greater than maximum message space %d" - newNoGroupErr = "failed to create message for group %s that cannot be found" - newKeyErr = "failed to generate key for encrypting group payload" + + // manager.Send + newNoGroupErr = "no group found with ID %s" + newCmixMsgErr = "failed to generate cMix messages for group chat %q (%s): %+v" + sendManyCmixErr = "failed to send group chat message from member %s to group %q (%s): %+v" + + // newCmixMsg + messageLenErr = "message length %d is greater than maximum payload size %d" + newKeyErr = "failed to generate key for encrypting group payload" + + // newMessageParts newPublicMsgErr = "failed to create new public group message for cMix message: %+v" newInternalMsgErr = "failed to create new internal group message for cMix message: %+v" + + // newSalt saltReadErr = "failed to generate salt for group message: %+v" saltReadLengthErr = "length of generated salt %d != %d required" ) @@ -54,7 +62,7 @@ func (m *manager) Send(groupID *id.ID, tag string, message []byte) ( groupMessages, err := m.newMessages(g, tag, message, timeNow) if err != nil { return 0, time.Time{}, group.MessageID{}, - errors.Errorf(newCmixMsgErr, err) + errors.Errorf(newCmixMsgErr, g.Name, g.ID, err) } // Obtain message ID @@ -67,10 +75,10 @@ func (m *manager) Send(groupID *id.ID, tag string, message []byte) ( // Send all the groupMessages param := cmix.GetDefaultCMIXParams() param.DebugTag = "group.Message" - rid, _, err := m.services.SendMany(groupMessages, param) + rid, _, err := m.net.SendMany(groupMessages, param) if err != nil { return 0, time.Time{}, group.MessageID{}, - errors.Errorf(sendManyCmixErr, m.receptionId, groupID, err) + errors.Errorf(sendManyCmixErr, m.receptionId, g.Name, g.ID, err) } jww.DEBUG.Printf("[GC] Sent message to %d members in group %s at %s.", @@ -78,8 +86,7 @@ func (m *manager) Send(groupID *id.ID, tag string, message []byte) ( return rid, timeNow, msgId, nil } -// newMessages quickly builds messages for all group chat members in multiple -// threads. +// newMessages builds a list of messages, one for each group chat member. func (m *manager) newMessages(g gs.Group, tag string, msg []byte, timestamp time.Time) ([]cmix.TargetedCmixMessage, error) { @@ -97,7 +104,7 @@ func (m *manager) newMessages(g gs.Group, tag string, msg []byte, // Add cMix message to list cMixMsg, err := newCmixMsg(g, tag, msg, timestamp, member, rng, - m.receptionId, m.services.GetMaxMessageLength()) + m.receptionId, m.net.GetMaxMessageLength()) if err != nil { return nil, err } @@ -107,7 +114,8 @@ func (m *manager) newMessages(g gs.Group, tag string, msg []byte, return messages, nil } -// newCmixMsg generates a new cMix message to be sent to a group member. +// newCmixMsg generates a new cmix.TargetedCmixMessage for the given group +// member func newCmixMsg(g gs.Group, tag string, msg []byte, timestamp time.Time, mem group.Member, rng io.Reader, senderId *id.ID, maxCmixMessageSize int) ( cmix.TargetedCmixMessage, error) { @@ -164,7 +172,7 @@ func newCmixMsg(g gs.Group, tag string, msg []byte, timestamp time.Time, return cmixMsg, nil } -// Build the group message ID +// getGroupMessageId builds the group message ID. func getGroupMessageId(grp *cyclic.Group, groupId, senderId *id.ID, timestamp time.Time, msg []byte) (group.MessageID, error) { cmixMsg := format.NewMessage(grp.GetP().ByteLen()) diff --git a/groupChat/send_test.go b/groupChat/send_test.go index 85eeeb245..c4551c314 100644 --- a/groupChat/send_test.go +++ b/groupChat/send_test.go @@ -41,15 +41,15 @@ func Test_manager_Send(t *testing.T) { p: &testProcessor{msgChan}, } - roundId, _, msgId, err := m.Send(g.ID, messageBytes) + roundId, _, msgId, err := m.Send(g.ID, "", messageBytes) if err != nil { - t.Errorf("Send() returned an error: %+v", err) + t.Errorf("Send returned an error: %+v", err) } - // get messages sent with or return an error if no messages were sent + // Get messages sent with or return an error if no messages were sent var messages []format.Message - if len(m.services.(*testNetworkManager).receptionMessages) > 0 { - messages = m.services.(*testNetworkManager).receptionMessages[0] + if len(m.net.(*testNetworkManager).receptionMessages) > 0 { + messages = m.net.(*testNetworkManager).receptionMessages[0] } else { t.Error("No group cMix messages received.") } @@ -84,10 +84,12 @@ func TestGroup_newCmixMsg_SaltReaderError(t *testing.T) { expectedErr := strings.SplitN(saltReadErr, "%", 2)[0] m, _ := newTestManager(rand.New(rand.NewSource(42)), t) - _, err := newCmixMsg(gs.Group{ID: id.NewIdFromString("test", id.User, t)}, - []byte{}, time.Time{}, group.Member{}, strings.NewReader(""), m.receptionId, m.services.GetMaxMessageLength()) + _, err := newCmixMsg( + gs.Group{ID: id.NewIdFromString("test", id.User, t)}, "", + []byte{}, time.Time{}, group.Member{}, strings.NewReader(""), + m.receptionId, m.net.GetMaxMessageLength()) if err == nil || !strings.Contains(err.Error(), expectedErr) { - t.Errorf("newCmixMsg() failed to return the expected error"+ + t.Errorf("newCmixMsg failed to return the expected error"+ "\nexpected: %s\nreceived: %+v", expectedErr, err) } } @@ -106,9 +108,10 @@ func TestGroup_newCmixMsg_InternalMsgSizeError(t *testing.T) { // Create cMix message prng = rand.New(rand.NewSource(42)) - _, err := newCmixMsg(g, testMsg, netTime.Now(), mem, prng, m.receptionId, m.services.GetMaxMessageLength()) + _, err := newCmixMsg(g, "", testMsg, netTime.Now(), mem, prng, + m.receptionId, m.net.GetMaxMessageLength()) if err == nil || !strings.Contains(err.Error(), expectedErr) { - t.Errorf("newCmixMsg() failed to return the expected error"+ + t.Errorf("newCmixMsg failed to return the expected error"+ "\nexpected: %s\nreceived: %+v", expectedErr, err) } } @@ -119,7 +122,7 @@ func Test_newMessageParts_PublicMsgSizeErr(t *testing.T) { _, _, err := newMessageParts(publicMinLen - 1) if err == nil || !strings.Contains(err.Error(), expectedErr) { - t.Errorf("newMessageParts() did not return the expected error."+ + t.Errorf("newMessageParts did not return the expected error."+ "\nexpected: %s\nreceived: %+v", expectedErr, err) } } @@ -130,7 +133,7 @@ func Test_newMessageParts_InternalMsgSizeErr(t *testing.T) { _, _, err := newMessageParts(publicMinLen) if err == nil || !strings.Contains(err.Error(), expectedErr) { - t.Errorf("newMessageParts() did not return the expected error."+ + t.Errorf("newMessageParts did not return the expected error."+ "\nexpected: %s\nreceived: %+v", expectedErr, err) } } @@ -153,13 +156,13 @@ func Test_newSalt_Consistency(t *testing.T) { for i, expected := range expectedSalts { salt, err := newSalt(prng) if err != nil { - t.Errorf("newSalt() returned an error (%d): %+v", i, err) + t.Errorf("newSalt returned an error (%d): %+v", i, err) } saltString := base64.StdEncoding.EncodeToString(salt[:]) if expected != saltString { - t.Errorf("newSalt() did not return the expected salt (%d)."+ + t.Errorf("newSalt did not return the expected salt (%d)."+ "\nexpected: %s\nreceived: %s", i, expected, saltString) } @@ -173,7 +176,7 @@ func Test_newSalt_ReadError(t *testing.T) { _, err := newSalt(strings.NewReader("")) if err == nil || !strings.Contains(err.Error(), expectedErr) { - t.Errorf("newSalt() failed to return the expected error"+ + t.Errorf("newSalt failed to return the expected error"+ "\nexpected: %s\nreceived: %+v", expectedErr, err) } } @@ -184,7 +187,7 @@ func Test_newSalt_ReadLengthError(t *testing.T) { _, err := newSalt(strings.NewReader("A")) if err == nil || !strings.Contains(err.Error(), expectedErr) { - t.Errorf("newSalt() failed to return the expected error"+ + t.Errorf("newSalt failed to return the expected error"+ "\nexpected: %s\nreceived: %+v", expectedErr, err) } } @@ -203,7 +206,7 @@ func Test_setInternalPayload(t *testing.T) { payload := setInternalPayload(internalMessage, timestamp, sender, testMsg) if err != nil { - t.Errorf("setInternalPayload() returned an error: %+v", err) + t.Errorf("setInternalPayload returned an error: %+v", err) } // Attempt to unmarshal and check all values @@ -248,7 +251,7 @@ func Test_setPublicPayload(t *testing.T) { payload := setPublicPayload(publicMessage, salt, encryptedPayload) if err != nil { - t.Errorf("setPublicPayload() returned an error: %+v", err) + t.Errorf("setPublicPayload returned an error: %+v", err) } // Attempt to unmarshal and check all values diff --git a/groupChat/service.go b/groupChat/service.go index 63fb600e4..fd38102a3 100644 --- a/groupChat/service.go +++ b/groupChat/service.go @@ -8,26 +8,85 @@ package groupChat import ( + "github.com/pkg/errors" "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/cmix/message" gs "gitlab.com/elixxir/client/groupChat/groupStore" + "gitlab.com/xx_network/primitives/id" ) -func (m *manager) AddService(g gs.Group, tag string, p Processor) { - newService := message.Service{ - Identifier: g.ID[:], - Tag: makeServiceTag(tag), - Metadata: g.ID[:], +// Error messages. +const ( + // manager.AddService + errServiceExists = "service with tag %q already exists" + + // manager.RemoveService + errServiceNotExists = "service with tag %q does not exist" +) + +func (m *manager) AddService(tag string, p Processor) error { + m.servicesMux.Lock() + defer m.servicesMux.Unlock() + + // Add the service to the list + if _, exists := m.services[tag]; exists { + return errors.Errorf(errServiceExists, tag) + } else { + m.services[tag] = p + } + + // Add a service for every group + for _, g := range m.gs.Groups() { + newService := makeService(g.ID, tag) + m.net.AddService(g.ID, newService, &receptionProcessor{m, g, p}) + } + + return nil +} + +func (m *manager) RemoveService(tag string) error { + m.servicesMux.Lock() + defer m.servicesMux.Unlock() + + // Delete the service from the list + oldProcess, exists := m.services[tag] + if exists { + return errors.Errorf(errServiceNotExists, tag) + } else { + delete(m.services, tag) + } + + // Delete service for every group + for _, g := range m.gs.Groups() { + toDelete := makeService(g.ID, tag) + m.net.DeleteService(g.ID, toDelete, &receptionProcessor{m, g, oldProcess}) + } + + return nil +} + +// addAllServices adds every service for the given group. +func (m *manager) addAllServices(g gs.Group) { + for tag, p := range m.services { + newService := makeService(g.ID, tag) + m.net.AddService(g.ID, newService, &receptionProcessor{m, g, p}) + } +} + +// deleteAllServices deletes every service for the given group. +func (m *manager) deleteAllServices(groupID *id.ID) { + for tag := range m.services { + toDelete := makeService(groupID, tag) + m.net.DeleteService(groupID, toDelete, nil) } - m.services.AddService(m.receptionId, newService, &receptionProcessor{m, g, p}) } -func (m *manager) RemoveService(g gs.Group, tag string, p Processor) { - toDelete := message.Service{ - Identifier: g.ID[:], +func makeService(groupID *id.ID, tag string) message.Service { + return message.Service{ + Identifier: groupID[:], Tag: makeServiceTag(tag), + Metadata: groupID[:], } - m.services.DeleteService(g.ID, toDelete, &receptionProcessor{m, g, p}) } func makeServiceTag(tag string) string { diff --git a/groupChat/utils_test.go b/groupChat/utils_test.go index 34b3ed5b4..94d909dcf 100644 --- a/groupChat/utils_test.go +++ b/groupChat/utils_test.go @@ -45,10 +45,10 @@ import ( func newTestManager(rng *rand.Rand, t *testing.T) (*manager, gs.Group) { m := &manager{ receptionId: id.NewIdFromString("test", id.User, t), - rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), - grp: getGroup(), - services: newTestNetworkManager(0, t), + net: newTestNetworkManager(0, t), e2e: newTestE2eManager(randCycInt(rng)), + grp: getGroup(), + rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), } user := group.Member{ ID: m.receptionId, @@ -73,12 +73,11 @@ func newTestManagerWithStore(rng *rand.Rand, numGroups int, sendErr int, t *testing.T) (*manager, gs.Group) { m := &manager{ - receptionId: id.NewIdFromString("test", id.User, t), - rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), - grp: getGroup(), + services: make(map[string]Processor), requestFunc: requestFunc, receiveFunc: receiveFunc, - services: newTestNetworkManager(sendErr, t), + receptionId: id.NewIdFromString("test", id.User, t), + net: newTestNetworkManager(sendErr, t), e2e: &testE2eManager{ e2eMessages: []testE2eMessage{}, sendErr: sendErr, @@ -86,6 +85,8 @@ func newTestManagerWithStore(rng *rand.Rand, numGroups int, sendErr int, dhPubKey: randCycInt(rng), partners: make(map[id.ID]partner.Manager), }, + grp: getGroup(), + rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG), } user := group.Member{ ID: m.receptionId, -- GitLab