Skip to content
Snippets Groups Projects
Commit a7abeffd authored by Jono Wenger's avatar Jono Wenger
Browse files

Create wrapper for e2e file transfer

parent 7e0d65a2
No related branches found
No related tags found
4 merge requests!510Release,!226WIP: Api2.0,!210XX-3880 / Generic File Transfer,!207WIP: Client Restructure
......@@ -5,20 +5,13 @@
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.17.3
// source: fileTransfer/ftMessages.proto
package fileTransfer2
package e2e
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
......
......@@ -8,7 +8,7 @@
syntax = "proto3";
package parse;
option go_package = "fileTransfer/";
option go_package = "fileTransfer2/e2e";
// NewFileTransfer is transmitted first on the initialization of a file transfer
// to inform the receiver about the incoming file.
......
File moved
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package e2e
import (
"github.com/golang/protobuf/proto"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/e2e/receive"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
)
// Error messages.
const (
// listener.Hear
errProtoUnmarshal = "[FT] Failed to proto unmarshal new file transfer request: %+v"
errNewReceivedTransfer = "[FT] Failed to add new received transfer for %q: %+v"
)
// Name of listener (used for debugging)
const listenerName = "NewFileTransferListener-E2E"
// listener waits for a message indicating a new file transfer is starting.
// Adheres to the receive.Listener interface.
type listener struct {
m *manager
}
// Hear is called when a new file transfer is received. It creates a new
// internal received file transfer and starts waiting to receive file part
// messages.
func (l *listener) Hear(msg receive.Message) {
// Unmarshal the request message
newFT := &NewFileTransfer{}
err := proto.Unmarshal(msg.Payload, newFT)
if err != nil {
jww.ERROR.Printf(errProtoUnmarshal, err)
return
}
transferKey := ftCrypto.UnmarshalTransferKey(newFT.GetTransferKey())
// Add new transfer to start receiving parts
tid, err := l.m.AddNew(newFT.FileName, &transferKey, newFT.TransferMac,
uint16(newFT.NumParts), newFT.Size, newFT.Retry, nil, 0)
if err != nil {
jww.ERROR.Printf(errNewReceivedTransfer, newFT.FileName, err)
return
}
// Call the reception callback
go l.m.receiveCB(tid, newFT.FileName, newFT.FileType, msg.Sender,
newFT.Size, newFT.Preview)
}
// Name returns a name used for debugging.
func (l *listener) Name() string {
return listenerName
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package e2e
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e"
"gitlab.com/elixxir/client/e2e/receive"
ft "gitlab.com/elixxir/client/fileTransfer2"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/versioned"
e2eCrypto "gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/crypto/fastRNG"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/xx_network/primitives/id"
"time"
)
// Error messages.
const (
// NewManager
errNewFtManager = "cannot create new E2E file transfer manager: %+v"
)
type manager struct {
// Callback that is called every time a new file transfer is received
receiveCB ft.ReceiveCallback
// File transfer manager
ft ft.FileTransfer
myID *id.ID
cmix ft.Cmix
e2e E2e
}
// E2e interface matches a subset of the e2e.Handler methods used by the manager
// for easier testing.
type E2e interface {
SendE2E(mt catalog.MessageType, recipient *id.ID, payload []byte,
params e2e.Params) ([]id.Round, e2eCrypto.MessageID, time.Time, error)
RegisterListener(senderID *id.ID, messageType catalog.MessageType,
newListener receive.Listener) receive.ListenerID
}
func NewManager(receiveCB ft.ReceiveCallback, params ft.Params, myID *id.ID,
e2e E2e, cmix ft.Cmix, kv *versioned.KV, rng *fastRNG.StreamGenerator) (
ft.FileTransfer, error) {
sendNewCb := func(recipient *id.ID, info *ft.TransferInfo) error {
return sendNewFileTransferMessage(recipient, info, e2e)
}
sendEndCb := func(recipient *id.ID) {
sendEndFileTransferMessage(recipient, cmix, e2e)
}
ftManager, err := ft.NewManager(
sendNewCb, sendEndCb, params, myID, cmix, kv, rng)
if err != nil {
return nil, errors.Errorf(errNewFtManager, err)
}
return &manager{
receiveCB: receiveCB,
ft: ftManager,
myID: myID,
cmix: cmix,
e2e: e2e,
}, nil
}
func (m *manager) StartProcesses() (stoppable.Stoppable, error) {
// Register listener to receive new file transfers
m.e2e.RegisterListener(m.myID, catalog.NewFileTransfer, &listener{m})
return m.ft.StartProcesses()
}
func (m *manager) MaxFileNameLen() int {
return m.ft.MaxFileNameLen()
}
func (m *manager) MaxFileTypeLen() int {
return m.ft.MaxFileTypeLen()
}
func (m *manager) MaxFileSize() int {
return m.ft.MaxFileSize()
}
func (m *manager) MaxPreviewSize() int {
return m.ft.MaxPreviewSize()
}
func (m *manager) Send(fileName, fileType string, fileData []byte,
recipient *id.ID, retry float32, preview []byte,
progressCB ft.SentProgressCallback, period time.Duration) (
*ftCrypto.TransferID, error) {
return m.ft.Send(fileName, fileType, fileData, recipient, retry, preview,
progressCB, period)
}
func (m *manager) RegisterSentProgressCallback(tid *ftCrypto.TransferID,
progressCB ft.SentProgressCallback, period time.Duration) error {
return m.ft.RegisterSentProgressCallback(tid, progressCB, period)
}
func (m *manager) CloseSend(tid *ftCrypto.TransferID) error {
return m.ft.CloseSend(tid)
}
func (m *manager) AddNew(fileName string, key *ftCrypto.TransferKey,
transferMAC []byte, numParts uint16, size uint32, retry float32,
progressCB ft.ReceivedProgressCallback, period time.Duration) (
*ftCrypto.TransferID, error) {
return m.ft.AddNew(fileName, key, transferMAC, numParts, size, retry,
progressCB, period)
}
func (m *manager) RegisterReceivedProgressCallback(tid *ftCrypto.TransferID,
progressCB ft.ReceivedProgressCallback, period time.Duration) error {
return m.ft.RegisterReceivedProgressCallback(tid, progressCB, period)
}
func (m *manager) Receive(tid *ftCrypto.TransferID) ([]byte, error) {
return m.ft.Receive(tid)
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package e2e
import (
"bytes"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e"
"gitlab.com/elixxir/client/e2e/receive"
ft "gitlab.com/elixxir/client/fileTransfer2"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/fastRNG"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/elixxir/ekv"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime"
"math"
"sync"
"testing"
"time"
)
// Tests that manager adheres to the fileTransfer2.FileTransfer interface.
var _ ft.FileTransfer = (*manager)(nil)
// Tests that E2e adheres to the e2e.Handler interface.
var _ E2e = (e2e.Handler)(nil)
// Smoke test of the entire file transfer system.
func Test_FileTransfer_Smoke(t *testing.T) {
// jww.SetStdoutThreshold(jww.LevelDebug)
// Set up cMix and E2E message handlers
cMixHandler := newMockCmixHandler()
e2eHandler := newMockE2eHandler()
rngGen := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG)
params := ft.DefaultParams()
params.MaxThroughput = math.MaxInt
type receiveCbValues struct {
tid *ftCrypto.TransferID
fileName string
fileType string
sender *id.ID
size uint32
preview []byte
}
// Set up the first client
receiveCbChan1 := make(chan receiveCbValues, 10)
receiveCB1 := func(tid *ftCrypto.TransferID, fileName, fileType string,
sender *id.ID, size uint32, preview []byte) {
receiveCbChan1 <- receiveCbValues{
tid, fileName, fileType, sender, size, preview}
}
myID1 := id.NewIdFromString("myID1", id.User, t)
kv1 := versioned.NewKV(ekv.MakeMemstore())
endE2eChan1 := make(chan receive.Message, 3)
e2e1 := newMockE2e(myID1, e2eHandler)
e2e1.RegisterListener(
myID1, catalog.EndFileTransfer, newMockListener(endE2eChan1))
ftm1, err := NewManager(receiveCB1, params, myID1, e2e1,
newMockCmix(myID1, cMixHandler), kv1, rngGen)
if err != nil {
t.Errorf("Failed to create new file transfer manager 1: %+v", err)
}
m1 := ftm1.(*manager)
stop1, err := m1.StartProcesses()
if err != nil {
t.Errorf("Failed to start processes for manager 1: %+v", err)
}
// Set up the second client
receiveCbChan2 := make(chan receiveCbValues, 10)
receiveCB2 := func(tid *ftCrypto.TransferID, fileName, fileType string,
sender *id.ID, size uint32, preview []byte) {
receiveCbChan2 <- receiveCbValues{
tid, fileName, fileType, sender, size, preview}
}
myID2 := id.NewIdFromString("myID2", id.User, t)
kv2 := versioned.NewKV(ekv.MakeMemstore())
endE2eChan2 := make(chan receive.Message, 3)
e2e2 := newMockE2e(myID2, e2eHandler)
e2e2.RegisterListener(
myID2, catalog.EndFileTransfer, newMockListener(endE2eChan2))
ftm2, err := NewManager(receiveCB2, params, myID2, e2e2,
newMockCmix(myID2, cMixHandler), kv2, rngGen)
if err != nil {
t.Errorf("Failed to create new file transfer manager 2: %+v", err)
}
m2 := ftm2.(*manager)
stop2, err := m2.StartProcesses()
if err != nil {
t.Errorf("Failed to start processes for manager 2: %+v", err)
}
// Wait group prevents the test from quiting before the file has completed
// sending and receiving
var wg sync.WaitGroup
// Define details of file to send
fileName, fileType := "myFile", "txt"
fileData := []byte(loremIpsum)
preview := []byte("Lorem ipsum dolor sit amet")
retry := float32(2.0)
// Create go func that waits for file transfer to be received to register
// a progress callback that then checks that the file received is correct
// when done
wg.Add(1)
var called bool
timeReceived := make(chan time.Time)
go func() {
select {
case r := <-receiveCbChan2:
receiveProgressCB := func(completed bool, received, total uint16,
fpt ft.FilePartTracker, err error) {
if completed && !called {
timeReceived <- netTime.Now()
receivedFile, err2 := m2.Receive(r.tid)
if err2 != nil {
t.Errorf("Failed to receive file: %+v", err2)
}
if !bytes.Equal(fileData, receivedFile) {
t.Errorf("Received file does not match sent."+
"\nsent: %q\nreceived: %q",
fileData, receivedFile)
}
wg.Done()
}
}
err3 := m2.RegisterReceivedProgressCallback(
r.tid, receiveProgressCB, 0)
if err3 != nil {
t.Errorf(
"Failed to Rregister received progress callback: %+v", err3)
}
case <-time.After(2100 * time.Millisecond):
t.Errorf("Timed out waiting to receive new file transfer.")
wg.Done()
}
}()
// Define sent progress callback
wg.Add(1)
sentProgressCb1 := func(completed bool, arrived, total uint16,
fpt ft.FilePartTracker, err error) {
if completed {
wg.Done()
}
}
// Send file.
sendStart := netTime.Now()
tid1, err := m1.Send(
fileName, fileType, fileData, myID2, retry, preview, sentProgressCb1, 0)
if err != nil {
t.Errorf("Failed to send file: %+v", err)
}
go func() {
select {
case tr := <-timeReceived:
fileSize := len(fileData)
sendTime := tr.Sub(sendStart)
fileSizeKb := float32(fileSize) * .001
speed := fileSizeKb * float32(time.Second) / (float32(sendTime))
t.Logf("Completed receiving file %q in %s (%.2f kb @ %.2f kb/s).",
fileName, sendTime, fileSizeKb, speed)
}
}()
// Wait for file to be sent and received
wg.Wait()
err = m1.CloseSend(tid1)
if err != nil {
t.Errorf("Failed to close transfer: %+v", err)
}
err = stop1.Close()
if err != nil {
t.Errorf("Failed to close processes for manager 1: %+v", err)
}
err = stop2.Close()
if err != nil {
t.Errorf("Failed to close processes for manager 2: %+v", err)
}
}
const loremIpsum = `Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed sit amet urna venenatis, rutrum magna maximus, tempor orci. Cras sit amet nulla id dolor blandit commodo. Suspendisse potenti. Praesent gravida porttitor metus vel aliquam. Maecenas rutrum velit at lobortis auctor. Mauris porta blandit tempor. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Morbi volutpat posuere maximus. Nunc in augue molestie ante mattis tempor.
Phasellus placerat elit eu fringilla pharetra. Vestibulum consectetur pulvinar nunc, vestibulum tincidunt felis rhoncus sit amet. Duis non dolor eleifend nibh luctus eleifend. Nunc urna odio, euismod sit amet feugiat ut, dapibus vel elit. Nulla est mauris, posuere eget enim cursus, vehicula viverra est. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque mattis, nisi quis consectetur semper, neque enim rhoncus dolor, ut aliquam leo orci sed dolor. Integer ullamcorper pulvinar turpis, a sollicitudin nunc posuere et. Nullam orci nibh, facilisis ac massa eu, bibendum bibendum sapien. Sed tincidunt nunc mauris, nec ullamcorper enim lacinia nec. Nulla dapibus sapien ut odio bibendum, tempus ornare sapien lacinia.
Duis ac hendrerit augue. Nullam porttitor feugiat finibus. Nam enim urna, maximus et ligula eu, aliquet convallis turpis. Vestibulum luctus quam in dictum efficitur. Vestibulum ac pulvinar ipsum. Vivamus consectetur augue nec tellus mollis, at iaculis magna efficitur. Nunc dictum convallis sem, at vehicula nulla accumsan non. Nullam blandit orci vel turpis convallis, mollis porttitor felis accumsan. Sed non posuere leo. Proin ultricies varius nulla at ultricies. Phasellus et pharetra justo. Quisque eu orci odio. Pellentesque pharetra tempor tempor. Aliquam ac nulla lorem. Sed dignissim ligula sit amet nibh fermentum facilisis.
Donec facilisis rhoncus ante. Duis nec nisi et dolor congue semper vel id ligula. Mauris non eleifend libero, et sodales urna. Nullam pharetra gravida velit non mollis. Integer vel ultrices libero, at ultrices magna. Duis semper risus a leo vulputate consectetur. Cras sit amet convallis sapien. Sed blandit, felis et porttitor fringilla, urna tellus commodo metus, at pharetra nibh urna sed sem. Nam ex dui, posuere id mi et, egestas tincidunt est. Nullam elementum pulvinar diam in maximus. Maecenas vel augue vitae nunc consectetur vestibulum in aliquet lacus. Nullam nec lectus dapibus, dictum nisi nec, congue quam. Suspendisse mollis vel diam nec dapibus. Mauris neque justo, scelerisque et suscipit non, imperdiet eget leo. Vestibulum leo turpis, dapibus ac lorem a, mollis pulvinar quam.
Sed sed mauris a neque dignissim aliquet. Aliquam congue gravida velit in efficitur. Integer elementum feugiat est, ac lacinia libero bibendum sed. Sed vestibulum suscipit dignissim. Nunc scelerisque, turpis quis varius tristique, enim lacus vehicula lacus, id vestibulum velit erat eu odio. Donec tincidunt nunc sit amet sapien varius ornare. Phasellus semper venenatis ligula eget euismod. Mauris sodales massa tempor, cursus velit a, feugiat neque. Sed odio justo, rhoncus eu fermentum non, tristique a quam. In vehicula in tortor nec iaculis. Cras ligula sem, sollicitudin at nulla eget, placerat lacinia massa. Mauris tempus quam sit amet leo efficitur egestas. Proin iaculis, velit in blandit egestas, felis odio sollicitudin ipsum, eget interdum leo odio tempor nisi. Curabitur sed mauris id turpis tempor finibus ut mollis lectus. Curabitur neque libero, aliquam facilisis lobortis eget, posuere in augue. In sodales urna sit amet elit euismod rhoncus.`
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package e2e
import (
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e"
ft "gitlab.com/elixxir/client/fileTransfer2"
"gitlab.com/xx_network/primitives/id"
)
// Error messages.
const (
// manager.sendNewFileTransferMessage
errProtoMarshal = "failed to proto marshal NewFileTransfer: %+v"
errNewFtSendE2e = "failed to send initial file transfer message via E2E: %+v"
// manager.sendEndFileTransferMessage
errEndFtSendE2e = "[FT] Failed to send ending file transfer message via E2E: %+v"
)
const (
// Tag that is used for log printing in SendE2E when sending the initial
// message
initialMessageDebugTag = "FT.New"
// Tag that is used for log printing in SendE2E when sending the ending
// message
lastMessageDebugTag = "FT.End"
)
// sendNewFileTransferMessage sends an E2E message to the recipient informing
// them of the incoming file transfer.
func sendNewFileTransferMessage(
recipient *id.ID, info *ft.TransferInfo, e2eHandler E2e) error {
// Construct NewFileTransfer message
protoMsg := &NewFileTransfer{
FileName: info.FileName,
FileType: info.FileType,
TransferKey: info.Key.Bytes(),
TransferMac: info.Mac,
NumParts: uint32(info.NumParts),
Size: info.Size,
Retry: info.Retry,
Preview: info.Preview,
}
// Marshal the message
payload, err := proto.Marshal(protoMsg)
if err != nil {
return errors.Errorf(errProtoMarshal, err)
}
// Get E2E parameters
params := e2e.GetDefaultParams()
params.ServiceTag = catalog.Silent
params.LastServiceTag = catalog.Silent
params.DebugTag = initialMessageDebugTag
_, _, _, err = e2eHandler.SendE2E(
catalog.NewFileTransfer, recipient, payload, params)
if err != nil {
return errors.Errorf(errNewFtSendE2e, err)
}
return nil
}
// sendEndFileTransferMessage sends an E2E message to the recipient informing
// them that all file parts have arrived once the network is healthy.
func sendEndFileTransferMessage(recipient *id.ID, cmix ft.Cmix, e2eHandler E2e) {
callbackID := make(chan uint64, 1)
callbackID <- cmix.AddHealthCallback(
func(healthy bool) {
if healthy {
params := e2e.GetDefaultParams()
params.LastServiceTag = catalog.EndFT
params.DebugTag = lastMessageDebugTag
_, _, _, err := e2eHandler.SendE2E(
catalog.EndFileTransfer, recipient, nil, params)
if err != nil {
jww.ERROR.Printf(errEndFtSendE2e, err)
}
cbID := <-callbackID
cmix.RemoveHealthCallback(cbID)
}
},
)
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package e2e
import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/e2e"
"gitlab.com/elixxir/client/e2e/receive"
e2eCrypto "gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/netTime"
"sync"
"time"
)
////////////////////////////////////////////////////////////////////////////////
// Mock cMix Client //
////////////////////////////////////////////////////////////////////////////////
type mockCmixHandler struct {
sync.Mutex
processorMap map[format.Fingerprint]message.Processor
}
func newMockCmixHandler() *mockCmixHandler {
return &mockCmixHandler{
processorMap: make(map[format.Fingerprint]message.Processor),
}
}
type mockCmix struct {
myID *id.ID
numPrimeBytes int
health bool
handler *mockCmixHandler
healthCBs map[uint64]func(b bool)
healthIndex uint64
sync.Mutex
}
func newMockCmix(myID *id.ID, handler *mockCmixHandler) *mockCmix {
return &mockCmix{
myID: myID,
numPrimeBytes: 97,
// numPrimeBytes: 4096,
health: true,
handler: handler,
healthCBs: make(map[uint64]func(b bool)),
healthIndex: 0,
}
}
func (m *mockCmix) GetMaxMessageLength() int {
msg := format.NewMessage(m.numPrimeBytes)
return msg.ContentsSize()
}
func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage,
_ cmix.CMIXParams) (id.Round, []ephemeral.Id, error) {
m.handler.Lock()
for _, targetedMsg := range messages {
msg := format.NewMessage(m.numPrimeBytes)
msg.SetContents(targetedMsg.Payload)
msg.SetMac(targetedMsg.Mac)
msg.SetKeyFP(targetedMsg.Fingerprint)
m.handler.processorMap[targetedMsg.Fingerprint].Process(msg,
receptionID.EphemeralIdentity{Source: targetedMsg.Recipient},
rounds.Round{ID: 42})
}
m.handler.Unlock()
return 42, []ephemeral.Id{}, nil
}
func (m *mockCmix) AddFingerprint(_ *id.ID, fp format.Fingerprint, mp message.Processor) error {
m.Lock()
defer m.Unlock()
m.handler.processorMap[fp] = mp
return nil
}
func (m *mockCmix) DeleteFingerprint(_ *id.ID, fp format.Fingerprint) {
m.handler.Lock()
delete(m.handler.processorMap, fp)
m.handler.Unlock()
}
func (m *mockCmix) IsHealthy() bool {
return m.health
}
func (m *mockCmix) WasHealthy() bool { return true }
func (m *mockCmix) AddHealthCallback(f func(bool)) uint64 {
m.Lock()
defer m.Unlock()
m.healthIndex++
m.healthCBs[m.healthIndex] = f
go f(true)
return m.healthIndex
}
func (m *mockCmix) RemoveHealthCallback(healthID uint64) {
m.Lock()
defer m.Unlock()
if _, exists := m.healthCBs[healthID]; !exists {
jww.FATAL.Panicf("No health callback with ID %d exists.", healthID)
}
delete(m.healthCBs, healthID)
}
func (m *mockCmix) GetRoundResults(_ time.Duration,
roundCallback cmix.RoundEventCallback, _ ...id.Round) error {
go roundCallback(true, false, map[id.Round]cmix.RoundResult{42: {}})
return nil
}
////////////////////////////////////////////////////////////////////////////////
// Mock E2E Handler //
////////////////////////////////////////////////////////////////////////////////
func newMockListener(hearChan chan receive.Message) *mockListener {
return &mockListener{hearChan: hearChan}
}
func (l *mockListener) Hear(item receive.Message) {
l.hearChan <- item
}
func (l *mockListener) Name() string {
return "mockListener"
}
type mockE2eHandler struct {
msgMap map[id.ID]map[catalog.MessageType][][]byte
listeners map[id.ID]map[catalog.MessageType]receive.Listener
}
func newMockE2eHandler() *mockE2eHandler {
return &mockE2eHandler{
msgMap: make(map[id.ID]map[catalog.MessageType][][]byte),
listeners: make(map[id.ID]map[catalog.MessageType]receive.Listener),
}
}
type mockE2e struct {
myID *id.ID
handler *mockE2eHandler
}
type mockListener struct {
hearChan chan receive.Message
}
func newMockE2e(myID *id.ID, handler *mockE2eHandler) *mockE2e {
return &mockE2e{
myID: myID,
handler: handler,
}
}
// SendE2E adds the message to the e2e handler map.
func (m *mockE2e) SendE2E(mt catalog.MessageType, recipient *id.ID, payload []byte,
_ e2e.Params) ([]id.Round, e2eCrypto.MessageID, time.Time, error) {
m.handler.listeners[*recipient][mt].Hear(receive.Message{
MessageType: mt,
Payload: payload,
Sender: m.myID,
RecipientID: recipient,
})
return []id.Round{42}, e2eCrypto.MessageID{}, netTime.Now(), nil
}
func (m *mockE2e) RegisterListener(senderID *id.ID, mt catalog.MessageType,
listener receive.Listener) receive.ListenerID {
if _, exists := m.handler.listeners[*senderID]; !exists {
m.handler.listeners[*senderID] = map[catalog.MessageType]receive.Listener{mt: listener}
} else if _, exists = m.handler.listeners[*senderID][mt]; !exists {
m.handler.listeners[*senderID][mt] = listener
}
return receive.ListenerID{}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment