Skip to content
Snippets Groups Projects
Commit 992c6f14 authored by Jono Wenger's avatar Jono Wenger
Browse files

Fix file transfer rate limiter to allow rates slower than 1 batch per second

parent eda76494
No related branches found
No related tags found
1 merge request!510Release
...@@ -154,7 +154,7 @@ func initFileTransferManager(client *messenger.Client, maxThroughput int) ( ...@@ -154,7 +154,7 @@ func initFileTransferManager(client *messenger.Client, maxThroughput int) (
manager, err := ft.NewManager(p, manager, err := ft.NewManager(p,
client.GetUser().ReceptionID, client.GetUser().ReceptionID,
client.GetCmix(), client.GetCmix(),
client.GetStorage().GetKV(), client.GetStorage(),
client.GetRng()) client.GetRng())
if err != nil { if err != nil {
jww.FATAL.Panicf( jww.FATAL.Panicf(
......
...@@ -12,7 +12,9 @@ import ( ...@@ -12,7 +12,9 @@ import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/fileTransfer2/store" "gitlab.com/elixxir/client/fileTransfer2/store"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"go.uber.org/ratelimit" "go.uber.org/ratelimit"
"time" "time"
...@@ -27,28 +29,10 @@ const ( ...@@ -27,28 +29,10 @@ const (
// batchBuilderThread creates batches of file parts as they become available and // batchBuilderThread creates batches of file parts as they become available and
// buffer them to send. Also rate limits adding to the buffer. // buffer them to send. Also rate limits adding to the buffer.
func (m *manager) batchBuilderThread(stop *stoppable.Single) { func (m *manager) batchBuilderThread(stop *stoppable.Single) {
// Calculate the average amount of data sent via SendManyCMIX // Calculate rate and make rate limiter
avgNumMessages := (minPartsSendPerRound + maxPartsSendPerRound) / 2 rl := newRateLimiter(m.params.MaxThroughput, m.cmixGroup)
avgSendSize := avgNumMessages * 8192
// Calculate the rate (sends per second) to achieve the desired bandwidth
rate := 1
if m.params.MaxThroughput > avgSendSize {
rate = m.params.MaxThroughput / avgSendSize
}
// Calculate rate and make rate limiter if max throughput is set
rl := ratelimit.NewUnlimited()
if m.params.MaxThroughput > 0 {
jww.INFO.Printf("[FT] Max throughput is %d. "+
"File transfer will be rate limited to %d parts per second.",
m.params.MaxThroughput, rate)
rl = ratelimit.New(rate, ratelimit.WithoutSlack)
} else {
jww.WARN.Printf("[FT] Max throughput is %d. "+
"File transfer will not be rate limited.", m.params.MaxThroughput)
}
// Build each batch and add to the queue
for { for {
numParts := generateRandomPacketSize(m.rng) numParts := generateRandomPacketSize(m.rng)
packet := make([]store.Part, 0, numParts) packet := make([]store.Part, 0, numParts)
...@@ -76,6 +60,53 @@ func (m *manager) batchBuilderThread(stop *stoppable.Single) { ...@@ -76,6 +60,53 @@ func (m *manager) batchBuilderThread(stop *stoppable.Single) {
} }
} }
// newRateLimiter generates a new ratelimit.Limiter that limits the bandwidth to
// the given max throughput (in bytes per second).
func newRateLimiter(
maxThroughput int, cmixGroup *cyclic.Group) ratelimit.Limiter {
// Calculate rate and make rate limiter if max throughput is set
if maxThroughput > 0 {
// Calculate the average amount of data sent in each batch
messageSize := format.NewMessage(cmixGroup.GetP().ByteLen()).ContentsSize()
avgNumMessages := (minPartsSendPerRound + maxPartsSendPerRound) / 2
avgSendSize := avgNumMessages * messageSize
jww.DEBUG.Printf("[FT] Rate limiting parameters: message size: %d, "+
"average number of messages per send: %d, average size of send: %d",
messageSize, avgNumMessages, avgSendSize)
// Calculate the time window needed to achieve the desired bandwidth
per := time.Second
switch {
case avgSendSize < maxThroughput:
per = time.Second
case avgSendSize < maxThroughput*60:
per = time.Minute
case avgSendSize < maxThroughput*60*60:
per = time.Hour
case avgSendSize < maxThroughput*60*60*24:
per = time.Hour * 24
case avgSendSize < maxThroughput*60*60*24*7:
per = time.Hour * 24 * 7
}
// Calculate the rate of messages per time window
rate := int((float64(maxThroughput) / float64(avgSendSize)) *
float64(per/time.Second))
jww.INFO.Printf("[FT] Max throughput is %d bytes/second. "+
"File transfer will be rate limited to %d per %s.",
maxThroughput, rate, per)
return ratelimit.New(rate, ratelimit.WithoutSlack, ratelimit.Per(per))
}
// If the max throughput is zero, then create an unlimited rate limiter
jww.WARN.Printf("[FT] Max throughput is %d bytes/second. "+
"File transfer will not be rate limited.", maxThroughput)
return ratelimit.NewUnlimited()
}
// generateRandomPacketSize returns a random number between minPartsSendPerRound // generateRandomPacketSize returns a random number between minPartsSendPerRound
// and maxPartsSendPerRound, inclusive. // and maxPartsSendPerRound, inclusive.
func generateRandomPacketSize(rngGen *fastRNG.StreamGenerator) int { func generateRandomPacketSize(rngGen *fastRNG.StreamGenerator) int {
......
...@@ -16,8 +16,14 @@ import ( ...@@ -16,8 +16,14 @@ import (
"gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/e2e"
"gitlab.com/elixxir/client/e2e/receive" "gitlab.com/elixxir/client/e2e/receive"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/cyclic"
e2eCrypto "gitlab.com/elixxir/crypto/e2e" e2eCrypto "gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/crypto/large"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -50,11 +56,11 @@ type mockCmix struct { ...@@ -50,11 +56,11 @@ type mockCmix struct {
sync.Mutex sync.Mutex
} }
func newMockCmix(myID *id.ID, handler *mockCmixHandler) *mockCmix { func newMockCmix(
myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix {
return &mockCmix{ return &mockCmix{
myID: myID, myID: myID,
numPrimeBytes: 97, numPrimeBytes: storage.GetCmixGroup().GetP().ByteLen(),
// numPrimeBytes: 4096,
health: true, health: true,
handler: handler, handler: handler,
healthCBs: make(map[uint64]func(b bool)), healthCBs: make(map[uint64]func(b bool)),
...@@ -192,3 +198,27 @@ func (m *mockConnection) RegisterListener( ...@@ -192,3 +198,27 @@ func (m *mockConnection) RegisterListener(
m.handler.listeners[mt] = listener m.handler.listeners[mt] = listener
return receive.ListenerID{} return receive.ListenerID{}
} }
////////////////////////////////////////////////////////////////////////////////
// Mock Storage Session //
////////////////////////////////////////////////////////////////////////////////
type mockStorage struct {
kv *versioned.KV
cmixGroup *cyclic.Group
}
func newMockStorage() *mockStorage {
b := make([]byte, 768)
rng := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG).GetStream()
_, _ = rng.Read(b)
rng.Close()
return &mockStorage{
kv: versioned.NewKV(ekv.MakeMemstore()),
cmixGroup: cyclic.NewGroup(large.NewIntFromBytes(b), large.NewInt(2)),
}
}
func (m *mockStorage) GetKV() *versioned.KV { return m.kv }
func (m *mockStorage) GetCmixGroup() *cyclic.Group { return m.cmixGroup }
...@@ -13,10 +13,8 @@ import ( ...@@ -13,10 +13,8 @@ import (
"gitlab.com/elixxir/client/connect" "gitlab.com/elixxir/client/connect"
"gitlab.com/elixxir/client/e2e/receive" "gitlab.com/elixxir/client/e2e/receive"
ft "gitlab.com/elixxir/client/fileTransfer2" ft "gitlab.com/elixxir/client/fileTransfer2"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/elixxir/ekv"
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -57,12 +55,12 @@ func Test_FileTransfer_Smoke(t *testing.T) { ...@@ -57,12 +55,12 @@ func Test_FileTransfer_Smoke(t *testing.T) {
tid, fileName, fileType, sender, size, preview} tid, fileName, fileType, sender, size, preview}
} }
myID1 := id.NewIdFromString("myID1", id.User, t) myID1 := id.NewIdFromString("myID1", id.User, t)
kv1 := versioned.NewKV(ekv.MakeMemstore()) storage1 := newMockStorage()
endE2eChan1 := make(chan receive.Message, 3) endE2eChan1 := make(chan receive.Message, 3)
conn1 := newMockConnection(myID1, e2eHandler) conn1 := newMockConnection(myID1, e2eHandler)
conn1.RegisterListener(catalog.EndFileTransfer, newMockListener(endE2eChan1)) conn1.RegisterListener(catalog.EndFileTransfer, newMockListener(endE2eChan1))
cmix1 := newMockCmix(myID1, cMixHandler) cmix1 := newMockCmix(myID1, cMixHandler, storage1)
ftManager1, err := ft.NewManager(ftParams, myID1, cmix1, kv1, rngGen) ftManager1, err := ft.NewManager(ftParams, myID1, cmix1, storage1, rngGen)
if err != nil { if err != nil {
t.Errorf("Failed to make new file transfer manager: %+v", err) t.Errorf("Failed to make new file transfer manager: %+v", err)
} }
...@@ -83,12 +81,12 @@ func Test_FileTransfer_Smoke(t *testing.T) { ...@@ -83,12 +81,12 @@ func Test_FileTransfer_Smoke(t *testing.T) {
tid, fileName, fileType, sender, size, preview} tid, fileName, fileType, sender, size, preview}
} }
myID2 := id.NewIdFromString("myID2", id.User, t) myID2 := id.NewIdFromString("myID2", id.User, t)
kv2 := versioned.NewKV(ekv.MakeMemstore()) storage2 := newMockStorage()
endE2eChan2 := make(chan receive.Message, 3) endE2eChan2 := make(chan receive.Message, 3)
conn2 := newMockConnection(myID2, e2eHandler) conn2 := newMockConnection(myID2, e2eHandler)
conn2.RegisterListener(catalog.EndFileTransfer, newMockListener(endE2eChan2)) conn2.RegisterListener(catalog.EndFileTransfer, newMockListener(endE2eChan2))
cmix2 := newMockCmix(myID1, cMixHandler) cmix2 := newMockCmix(myID1, cMixHandler, storage2)
ftManager2, err := ft.NewManager(ftParams, myID2, cmix2, kv2, rngGen) ftManager2, err := ft.NewManager(ftParams, myID2, cmix2, storage2, rngGen)
if err != nil { if err != nil {
t.Errorf("Failed to make new file transfer manager: %+v", err) t.Errorf("Failed to make new file transfer manager: %+v", err)
} }
......
...@@ -16,8 +16,14 @@ import ( ...@@ -16,8 +16,14 @@ import (
"gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/e2e"
"gitlab.com/elixxir/client/e2e/receive" "gitlab.com/elixxir/client/e2e/receive"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/cyclic"
e2eCrypto "gitlab.com/elixxir/crypto/e2e" e2eCrypto "gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/crypto/large"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -50,11 +56,10 @@ type mockCmix struct { ...@@ -50,11 +56,10 @@ type mockCmix struct {
sync.Mutex sync.Mutex
} }
func newMockCmix(myID *id.ID, handler *mockCmixHandler) *mockCmix { func newMockCmix(myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix {
return &mockCmix{ return &mockCmix{
myID: myID, myID: myID,
numPrimeBytes: 97, numPrimeBytes: storage.GetCmixGroup().GetP().ByteLen(),
// numPrimeBytes: 4096,
health: true, health: true,
handler: handler, handler: handler,
healthCBs: make(map[uint64]func(b bool)), healthCBs: make(map[uint64]func(b bool)),
...@@ -193,3 +198,27 @@ func (m *mockE2e) RegisterListener(senderID *id.ID, mt catalog.MessageType, ...@@ -193,3 +198,27 @@ func (m *mockE2e) RegisterListener(senderID *id.ID, mt catalog.MessageType,
} }
return receive.ListenerID{} return receive.ListenerID{}
} }
////////////////////////////////////////////////////////////////////////////////
// Mock Storage Session //
////////////////////////////////////////////////////////////////////////////////
type mockStorage struct {
kv *versioned.KV
cmixGroup *cyclic.Group
}
func newMockStorage() *mockStorage {
b := make([]byte, 768)
rng := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG).GetStream()
_, _ = rng.Read(b)
rng.Close()
return &mockStorage{
kv: versioned.NewKV(ekv.MakeMemstore()),
cmixGroup: cyclic.NewGroup(large.NewIntFromBytes(b), large.NewInt(2)),
}
}
func (m *mockStorage) GetKV() *versioned.KV { return m.kv }
func (m *mockStorage) GetCmixGroup() *cyclic.Group { return m.cmixGroup }
...@@ -13,10 +13,8 @@ import ( ...@@ -13,10 +13,8 @@ import (
"gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/e2e"
"gitlab.com/elixxir/client/e2e/receive" "gitlab.com/elixxir/client/e2e/receive"
ft "gitlab.com/elixxir/client/fileTransfer2" ft "gitlab.com/elixxir/client/fileTransfer2"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/elixxir/ekv"
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -57,13 +55,13 @@ func Test_FileTransfer_Smoke(t *testing.T) { ...@@ -57,13 +55,13 @@ func Test_FileTransfer_Smoke(t *testing.T) {
tid, fileName, fileType, sender, size, preview} tid, fileName, fileType, sender, size, preview}
} }
myID1 := id.NewIdFromString("myID1", id.User, t) myID1 := id.NewIdFromString("myID1", id.User, t)
kv1 := versioned.NewKV(ekv.MakeMemstore()) storage1 := newMockStorage()
endE2eChan1 := make(chan receive.Message, 3) endE2eChan1 := make(chan receive.Message, 3)
e2e1 := newMockE2e(myID1, e2eHandler) e2e1 := newMockE2e(myID1, e2eHandler)
e2e1.RegisterListener( e2e1.RegisterListener(
myID1, catalog.EndFileTransfer, newMockListener(endE2eChan1)) myID1, catalog.EndFileTransfer, newMockListener(endE2eChan1))
cmix1 := newMockCmix(myID1, cMixHandler) cmix1 := newMockCmix(myID1, cMixHandler, storage1)
ftManager1, err := ft.NewManager(ftParams, myID1, cmix1, kv1, rngGen) ftManager1, err := ft.NewManager(ftParams, myID1, cmix1, storage1, rngGen)
if err != nil { if err != nil {
t.Errorf("Failed to make new file transfer manager: %+v", err) t.Errorf("Failed to make new file transfer manager: %+v", err)
} }
...@@ -84,13 +82,13 @@ func Test_FileTransfer_Smoke(t *testing.T) { ...@@ -84,13 +82,13 @@ func Test_FileTransfer_Smoke(t *testing.T) {
tid, fileName, fileType, sender, size, preview} tid, fileName, fileType, sender, size, preview}
} }
myID2 := id.NewIdFromString("myID2", id.User, t) myID2 := id.NewIdFromString("myID2", id.User, t)
kv2 := versioned.NewKV(ekv.MakeMemstore()) storage2 := newMockStorage()
endE2eChan2 := make(chan receive.Message, 3) endE2eChan2 := make(chan receive.Message, 3)
e2e2 := newMockE2e(myID2, e2eHandler) e2e2 := newMockE2e(myID2, e2eHandler)
e2e2.RegisterListener( e2e2.RegisterListener(
myID2, catalog.EndFileTransfer, newMockListener(endE2eChan2)) myID2, catalog.EndFileTransfer, newMockListener(endE2eChan2))
cmix2 := newMockCmix(myID1, cMixHandler) cmix2 := newMockCmix(myID1, cMixHandler, storage2)
ftManager2, err := ft.NewManager(ftParams, myID2, cmix2, kv2, rngGen) ftManager2, err := ft.NewManager(ftParams, myID2, cmix2, storage2, rngGen)
if err != nil { if err != nil {
t.Errorf("Failed to make new file transfer manager: %+v", err) t.Errorf("Failed to make new file transfer manager: %+v", err)
} }
......
...@@ -14,8 +14,14 @@ import ( ...@@ -14,8 +14,14 @@ import (
"gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/groupChat" "gitlab.com/elixxir/client/groupChat"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/crypto/group" "gitlab.com/elixxir/crypto/group"
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/crypto/large"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"sync" "sync"
...@@ -47,11 +53,11 @@ type mockCmix struct { ...@@ -47,11 +53,11 @@ type mockCmix struct {
sync.Mutex sync.Mutex
} }
func newMockCmix(myID *id.ID, handler *mockCmixHandler) *mockCmix { func newMockCmix(
myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix {
return &mockCmix{ return &mockCmix{
myID: myID, myID: myID,
numPrimeBytes: 97, numPrimeBytes: storage.GetCmixGroup().GetP().ByteLen(),
// numPrimeBytes: 4096,
health: true, health: true,
handler: handler, handler: handler,
healthCBs: make(map[uint64]func(b bool)), healthCBs: make(map[uint64]func(b bool)),
...@@ -164,3 +170,27 @@ func (m *mockGC) AddService(tag string, p groupChat.Processor) error { ...@@ -164,3 +170,27 @@ func (m *mockGC) AddService(tag string, p groupChat.Processor) error {
m.handler.services[tag] = p m.handler.services[tag] = p
return nil return nil
} }
////////////////////////////////////////////////////////////////////////////////
// Mock Storage Session //
////////////////////////////////////////////////////////////////////////////////
type mockStorage struct {
kv *versioned.KV
cmixGroup *cyclic.Group
}
func newMockStorage() *mockStorage {
b := make([]byte, 768)
rng := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG).GetStream()
_, _ = rng.Read(b)
rng.Close()
return &mockStorage{
kv: versioned.NewKV(ekv.MakeMemstore()),
cmixGroup: cyclic.NewGroup(large.NewIntFromBytes(b), large.NewInt(2)),
}
}
func (m *mockStorage) GetKV() *versioned.KV { return m.kv }
func (m *mockStorage) GetCmixGroup() *cyclic.Group { return m.cmixGroup }
...@@ -11,10 +11,8 @@ import ( ...@@ -11,10 +11,8 @@ import (
"bytes" "bytes"
ft "gitlab.com/elixxir/client/fileTransfer2" ft "gitlab.com/elixxir/client/fileTransfer2"
"gitlab.com/elixxir/client/groupChat" "gitlab.com/elixxir/client/groupChat"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/elixxir/ekv"
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -48,10 +46,10 @@ func Test_FileTransfer_Smoke(t *testing.T) { ...@@ -48,10 +46,10 @@ func Test_FileTransfer_Smoke(t *testing.T) {
// Set up the first client // Set up the first client
myID1 := id.NewIdFromString("myID1", id.User, t) myID1 := id.NewIdFromString("myID1", id.User, t)
kv1 := versioned.NewKV(ekv.MakeMemstore()) storage1 := newMockStorage()
gc1 := newMockGC(gcHandler) gc1 := newMockGC(gcHandler)
ftManager1, err := ft.NewManager( ftManager1, err := ft.NewManager(params, myID1,
params, myID1, newMockCmix(myID1, cMixHandler), kv1, rngGen) newMockCmix(myID1, cMixHandler, storage1), storage1, rngGen)
if err != nil { if err != nil {
t.Errorf("Failed to create file transfer manager 2: %+v", err) t.Errorf("Failed to create file transfer manager 2: %+v", err)
} }
...@@ -72,10 +70,10 @@ func Test_FileTransfer_Smoke(t *testing.T) { ...@@ -72,10 +70,10 @@ func Test_FileTransfer_Smoke(t *testing.T) {
tid, fileName, fileType, sender, size, preview} tid, fileName, fileType, sender, size, preview}
} }
myID2 := id.NewIdFromString("myID2", id.User, t) myID2 := id.NewIdFromString("myID2", id.User, t)
kv2 := versioned.NewKV(ekv.MakeMemstore()) storage2 := newMockStorage()
gc2 := newMockGC(gcHandler) gc2 := newMockGC(gcHandler)
ftManager2, err := ft.NewManager( ftManager2, err := ft.NewManager(params, myID2,
params, myID2, newMockCmix(myID2, cMixHandler), kv2, rngGen) newMockCmix(myID2, cMixHandler, storage2), storage2, rngGen)
if err != nil { if err != nil {
t.Errorf("Failed to create file transfer manager 2: %+v", err) t.Errorf("Failed to create file transfer manager 2: %+v", err)
} }
......
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"gitlab.com/elixxir/client/fileTransfer2/store/fileMessage" "gitlab.com/elixxir/client/fileTransfer2/store/fileMessage"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
...@@ -122,6 +123,7 @@ type manager struct { ...@@ -122,6 +123,7 @@ type manager struct {
myID *id.ID myID *id.ID
cmix Cmix cmix Cmix
cmixGroup *cyclic.Group
kv *versioned.KV kv *versioned.KV
rng *fastRNG.StreamGenerator rng *fastRNG.StreamGenerator
} }
...@@ -142,12 +144,19 @@ type Cmix interface { ...@@ -142,12 +144,19 @@ type Cmix interface {
roundCallback cmix.RoundEventCallback, roundList ...id.Round) error roundCallback cmix.RoundEventCallback, roundList ...id.Round) error
} }
// Storage interface matches a subset of the storage.Session methods used by the
// manager for easier testing.
type Storage interface {
GetKV() *versioned.KV
GetCmixGroup() *cyclic.Group
}
// NewManager creates a new file transfer manager object. If sent or received // NewManager creates a new file transfer manager object. If sent or received
// transfers already existed, they are loaded from storage and queued to resume // transfers already existed, they are loaded from storage and queued to resume
// once manager.startProcesses is called. // once manager.startProcesses is called.
func NewManager(params Params, func NewManager(params Params, myID *id.ID, cmix Cmix, storage Storage,
myID *id.ID, cmix Cmix, kv *versioned.KV,
rng *fastRNG.StreamGenerator) (FileTransfer, error) { rng *fastRNG.StreamGenerator) (FileTransfer, error) {
kv := storage.GetKV()
// Create a new list of sent file transfers or load one if it exists // Create a new list of sent file transfers or load one if it exists
sent, unsentParts, err := store.NewOrLoadSent(kv) sent, unsentParts, err := store.NewOrLoadSent(kv)
...@@ -171,6 +180,7 @@ func NewManager(params Params, ...@@ -171,6 +180,7 @@ func NewManager(params Params,
params: params, params: params,
myID: myID, myID: myID,
cmix: cmix, cmix: cmix,
cmixGroup: storage.GetCmixGroup(),
kv: kv, kv: kv,
rng: rng, rng: rng,
} }
......
This diff is collapsed.
...@@ -15,7 +15,13 @@ import ( ...@@ -15,7 +15,13 @@ import (
"gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/crypto/large"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"io" "io"
...@@ -96,17 +102,19 @@ type mockCmix struct { ...@@ -96,17 +102,19 @@ type mockCmix struct {
handler *mockCmixHandler handler *mockCmixHandler
healthCBs map[uint64]func(b bool) healthCBs map[uint64]func(b bool)
healthIndex uint64 healthIndex uint64
round id.Round
sync.Mutex sync.Mutex
} }
func newMockCmix(myID *id.ID, handler *mockCmixHandler) *mockCmix { func newMockCmix(
myID *id.ID, handler *mockCmixHandler, storage *mockStorage) *mockCmix {
return &mockCmix{ return &mockCmix{
myID: myID, myID: myID,
numPrimeBytes: 97, numPrimeBytes: storage.GetCmixGroup().GetP().ByteLen(),
// numPrimeBytes: 4096,
health: true, health: true,
handler: handler, handler: handler,
healthCBs: make(map[uint64]func(b bool)), healthCBs: make(map[uint64]func(b bool)),
round: 0,
healthIndex: 0, healthIndex: 0,
} }
} }
...@@ -120,6 +128,8 @@ func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage, ...@@ -120,6 +128,8 @@ func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage,
_ cmix.CMIXParams) (id.Round, []ephemeral.Id, error) { _ cmix.CMIXParams) (id.Round, []ephemeral.Id, error) {
m.handler.Lock() m.handler.Lock()
defer m.handler.Unlock() defer m.handler.Unlock()
round := m.round
m.round++
for _, targetedMsg := range messages { for _, targetedMsg := range messages {
msg := format.NewMessage(m.numPrimeBytes) msg := format.NewMessage(m.numPrimeBytes)
msg.SetContents(targetedMsg.Payload) msg.SetContents(targetedMsg.Payload)
...@@ -127,9 +137,9 @@ func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage, ...@@ -127,9 +137,9 @@ func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage,
msg.SetKeyFP(targetedMsg.Fingerprint) msg.SetKeyFP(targetedMsg.Fingerprint)
m.handler.processorMap[targetedMsg.Fingerprint].Process(msg, m.handler.processorMap[targetedMsg.Fingerprint].Process(msg,
receptionID.EphemeralIdentity{Source: targetedMsg.Recipient}, receptionID.EphemeralIdentity{Source: targetedMsg.Recipient},
rounds.Round{ID: 42}) rounds.Round{ID: round})
} }
return 42, []ephemeral.Id{}, nil return round, []ephemeral.Id{}, nil
} }
func (m *mockCmix) AddFingerprint(_ *id.ID, fp format.Fingerprint, mp message.Processor) error { func (m *mockCmix) AddFingerprint(_ *id.ID, fp format.Fingerprint, mp message.Processor) error {
...@@ -170,7 +180,31 @@ func (m *mockCmix) RemoveHealthCallback(healthID uint64) { ...@@ -170,7 +180,31 @@ func (m *mockCmix) RemoveHealthCallback(healthID uint64) {
} }
func (m *mockCmix) GetRoundResults(_ time.Duration, func (m *mockCmix) GetRoundResults(_ time.Duration,
roundCallback cmix.RoundEventCallback, _ ...id.Round) error { roundCallback cmix.RoundEventCallback, rids ...id.Round) error {
go roundCallback(true, false, map[id.Round]cmix.RoundResult{42: {}}) go roundCallback(true, false, map[id.Round]cmix.RoundResult{rids[0]: {}})
return nil return nil
} }
////////////////////////////////////////////////////////////////////////////////
// Mock Storage Session //
////////////////////////////////////////////////////////////////////////////////
type mockStorage struct {
kv *versioned.KV
cmixGroup *cyclic.Group
}
func newMockStorage() *mockStorage {
b := make([]byte, 768)
rng := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG).GetStream()
_, _ = rng.Read(b)
rng.Close()
return &mockStorage{
kv: versioned.NewKV(ekv.MakeMemstore()),
cmixGroup: cyclic.NewGroup(large.NewIntFromBytes(b), large.NewInt(2)),
}
}
func (m *mockStorage) GetKV() *versioned.KV { return m.kv }
func (m *mockStorage) GetCmixGroup() *cyclic.Group { return m.cmixGroup }
...@@ -51,7 +51,6 @@ require ( ...@@ -51,7 +51,6 @@ require (
gitlab.com/xx_network/ring v0.0.3-0.20220222211904-da613960ad93 // indirect gitlab.com/xx_network/ring v0.0.3-0.20220222211904-da613960ad93 // indirect
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
golang.org/x/text v0.3.6 // indirect golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment