diff --git a/api/utils_test.go b/api/utils_test.go index 3abfd68616da5da0da44fbee18123f4338ed951f..9b9fccac0225438387907c405efbcc023fa0e5a6 100644 --- a/api/utils_test.go +++ b/api/utils_test.go @@ -66,7 +66,7 @@ func newTestingClient(face interface{}) (*Client, error) { } p := gateway.DefaultPoolParams() - p.PoolSize = 1 + p.MaxPoolSize = 1 sender, _ := gateway.NewSender(p, c.rng, def, commsManager, c.storage, nil) c.network = &testNetworkManagerGeneric{instance: thisInstance, sender: sender} diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index c2f7b4af1bd16aeb41af9699527a0a30368b8933..cd88556be1543ceecd4f97f0681b40964f43b02c 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -58,7 +58,8 @@ type HostPool struct { // PoolParams Allows configuration of HostPool parameters type PoolParams struct { - PoolSize uint32 // Quantity of Hosts in the HostPool + MaxPoolSize uint32 // Maximum number of Hosts in the HostPool + PoolSize uint32 // Allows override of HostPool size. Set to zero for dynamic size calculation // TODO: Move up a layer ProxyAttempts uint32 // How many proxies will be used in event of send failure HostParams connect.HostParams // Parameters for the creation of new Host objects @@ -67,8 +68,9 @@ type PoolParams struct { // DefaultPoolParams Returns a default set of PoolParams func DefaultPoolParams() PoolParams { p := PoolParams{ - PoolSize: 30, + MaxPoolSize: 30, ProxyAttempts: 5, + PoolSize: 0, HostParams: connect.GetDefaultHostParams(), } p.HostParams.MaxRetries = 1 @@ -82,6 +84,16 @@ func DefaultPoolParams() PoolParams { // Build and return new HostPool object func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.NetworkDefinition, getter HostManager, storage *storage.Session, addGateway chan network.NodeGateway) (*HostPool, error) { + var err error + + // Determine size of HostPool + if poolParams.PoolSize == 0 { + poolParams.PoolSize, err = getPoolSize(uint32(len(ndf.Gateways)), poolParams.MaxPoolSize) + if err != nil { + return nil, err + } + } + result := &HostPool{ manager: getter, hostMap: make(map[id.ID]uint32), @@ -93,15 +105,8 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.N addGatewayChan: addGateway, } - // Verify the NDF has at least as many Gateways as needed for the HostPool - ndfLen := uint32(len(result.ndf.Gateways)) - if ndfLen == 0 || ndfLen < result.poolParams.PoolSize { - return nil, errors.Errorf("Unable to create HostPool: %d/%d gateways available", - len(result.ndf.Gateways), result.poolParams.PoolSize) - } - // Propagate the NDF - err := result.updateConns() + err = result.updateConns() if err != nil { return nil, err } @@ -113,6 +118,8 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.N return nil, err } } + + jww.INFO.Printf("Initialized HostPool with size: %d/%d", poolParams.PoolSize, len(ndf.Gateways)) return result, nil } @@ -406,6 +413,21 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { } } +// getPoolSize determines the size of the HostPool based on the size of the NDF +func getPoolSize(ndfLen, maxSize uint32) (uint32, error) { + // Verify the NDF has at least one Gateway for the HostPool + if ndfLen == 0 { + return 0, errors.Errorf("Unable to create HostPool: no gateways available") + } + + // PoolSize = ceil(sqrt(len(ndf,Gateways))) + poolSize := uint32(math.Ceil(math.Sqrt(float64(ndfLen)))) + if poolSize > maxSize { + return maxSize, nil + } + return poolSize, nil +} + // readUint32 reads an integer from an io.Reader (which should be a CSPRNG) func readUint32(rng io.Reader) uint32 { var rndBytes [4]byte diff --git a/network/gateway/hostpool_test.go b/network/gateway/hostpool_test.go index 6c6feff093a5c29d4475f6417466ffb1b26f433a..59530e69ee34ede156e9b9b8ad694b90bf2ac5b4 100644 --- a/network/gateway/hostpool_test.go +++ b/network/gateway/hostpool_test.go @@ -28,7 +28,7 @@ func TestNewHostPool(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -64,7 +64,7 @@ func TestHostPool_ManageHostPool(t *testing.T) { // Construct custom params params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -330,7 +330,7 @@ func TestHostPool_CheckReplace(t *testing.T) { // Construct custom params params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) - 5 + params.MaxPoolSize = uint32(len(testNdf.Gateways)) - 5 // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -509,7 +509,7 @@ func TestHostPool_GetAny(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -625,7 +625,7 @@ func TestHostPool_UpdateConns_AddGateways(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -699,7 +699,7 @@ func TestHostPool_UpdateConns_RemoveGateways(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -770,7 +770,7 @@ func TestHostPool_AddGateway(t *testing.T) { testNdf := getTestNdf(t) newIndex := uint32(20) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) // Construct a manager (bypass business logic in constructor) hostPool := &HostPool{ @@ -803,7 +803,7 @@ func TestHostPool_RemoveGateway(t *testing.T) { testNdf := getTestNdf(t) newIndex := uint32(20) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) // Construct a manager (bypass business logic in constructor) hostPool := &HostPool{ @@ -824,8 +824,7 @@ func TestHostPool_RemoveGateway(t *testing.T) { } // Manually add host information - hostPool.addGateway(gwId, ndfIndex) - + hostPool.addGateway(gwId, ndfIndex) // Call the removal hostPool.removeGateway(gwId) diff --git a/network/gateway/sender_test.go b/network/gateway/sender_test.go index 9ce8155b9f3af7e70ebe4c7054859cae34b1ea5a..73d36ef312a40c9324fdbe1294f504fe366cab78 100644 --- a/network/gateway/sender_test.go +++ b/network/gateway/sender_test.go @@ -26,7 +26,7 @@ func TestNewSender(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) _, err := NewSender(params, rng, testNdf, manager, testStorage, addGwChan) if err != nil { @@ -42,7 +42,7 @@ func TestSender_SendToAny(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) + params.MaxPoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -109,7 +109,7 @@ func TestSender_SendToPreferred(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) - 5 + params.MaxPoolSize = uint32(len(testNdf.Gateways)) - 5 // Do not test proxy attempts code in this test // (self contain to code specific in sendPreferred) @@ -195,7 +195,7 @@ func TestSender_SendToSpecific(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) - 5 + params.MaxPoolSize = uint32(len(testNdf.Gateways)) - 5 // Do not test proxy attempts code in this test // (self contain to code specific in sendPreferred) diff --git a/network/manager.go b/network/manager.go index e97a304d38ea561604efb1d9314a6b39e369b2de..89cf3a3a83b7a94eb0d5a6b27458634876801548 100644 --- a/network/manager.go +++ b/network/manager.go @@ -93,7 +93,6 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, // Set up gateway.Sender poolParams := gateway.DefaultPoolParams() - poolParams.PoolSize = 10 m.sender, err = gateway.NewSender(poolParams, rng, ndf, comms, session, m.NodeRegistration) if err != nil { diff --git a/network/message/garbled_test.go b/network/message/garbled_test.go index d82448cdda7440f77352d5d9ad4cf06eeaa3ded7..9d021ae488ed26717aade2d76d87ba688c7f9001 100644 --- a/network/message/garbled_test.go +++ b/network/message/garbled_test.go @@ -59,7 +59,7 @@ func TestManager_CheckGarbledMessages(t *testing.T) { NodeRegistration: nil, } p := gateway.DefaultPoolParams() - p.PoolSize = 1 + p.MaxPoolSize = 1 sender, err := gateway.NewSender(p, i.Rng, getNDF(), &MockSendCMIXComms{t: t}, i.Session, nil) if err != nil { t.Errorf(err.Error()) diff --git a/network/message/sendCmix_test.go b/network/message/sendCmix_test.go index 3d038538add21ce7d86913d4c83277324d3ce787..04da108426dda151bec4777338fe4a0036366832 100644 --- a/network/message/sendCmix_test.go +++ b/network/message/sendCmix_test.go @@ -128,7 +128,7 @@ func Test_attemptSendCmix(t *testing.T) { NodeRegistration: nil, } p := gateway.DefaultPoolParams() - p.PoolSize = 1 + p.MaxPoolSize = 1 sender, err := gateway.NewSender(p, i.Rng, getNDF(), &MockSendCMIXComms{t: t}, i.Session, nil) if err != nil { t.Errorf("%+v", errors.New(err.Error())) diff --git a/network/rounds/retrieve_test.go b/network/rounds/retrieve_test.go index f7529f3eafe5dd47da3e1c66a190dd017b6e5a00..abd79533dd9189cdef93f8f37aba4b8ad810b8d2 100644 --- a/network/rounds/retrieve_test.go +++ b/network/rounds/retrieve_test.go @@ -36,7 +36,7 @@ func TestManager_ProcessMessageRetrieval(t *testing.T) { testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} p := gateway.DefaultPoolParams() - p.PoolSize = 1 + p.MaxPoolSize = 1 testManager.sender, _ = gateway.NewSender(p, fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), testNdf, mockComms, testManager.Session, nil) @@ -119,7 +119,7 @@ func TestManager_ProcessMessageRetrieval_NoRound(t *testing.T) { // General initializations testManager := newManager(t) p := gateway.DefaultPoolParams() - p.PoolSize = 1 + p.MaxPoolSize = 1 roundId := id.Round(5) mockComms := &mockMessageRetrievalComms{testingSignature: t} testNdf := getNDF() @@ -205,7 +205,7 @@ func TestManager_ProcessMessageRetrieval_FalsePositive(t *testing.T) { testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} p := gateway.DefaultPoolParams() - p.PoolSize = 1 + p.MaxPoolSize = 1 testManager.sender, _ = gateway.NewSender(p, fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), testNdf, mockComms, testManager.Session, nil) @@ -350,7 +350,7 @@ func TestManager_ProcessMessageRetrieval_MultipleGateways(t *testing.T) { testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} p := gateway.DefaultPoolParams() - p.PoolSize = 1 + p.MaxPoolSize = 1 testManager.sender, _ = gateway.NewSender(p, fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), testNdf, mockComms, testManager.Session, nil)