Skip to content
Snippets Groups Projects
Commit 61ff6612 authored by Jake Taylor's avatar Jake Taylor
Browse files

removed sendspecific,

improved sendpreferred
parent 778923e4
Branches
Tags
1 merge request!23Release
......@@ -16,8 +16,6 @@ type Messages struct {
MessageReceptionWorkerPoolSize uint
MaxChecksGarbledMessage uint
GarbledMessageWait time.Duration
// Use proxied (rather than direct) message sending
ProxySending bool
}
func GetDefaultMessage() Messages {
......@@ -26,6 +24,5 @@ func GetDefaultMessage() Messages {
MessageReceptionWorkerPoolSize: 4,
MaxChecksGarbledMessage: 10,
GarbledMessageWait: 15 * time.Minute,
ProxySending: false,
}
}
......@@ -14,7 +14,6 @@ import (
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/crypto/shuffle"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf"
......@@ -36,44 +35,6 @@ func NewSender(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.Net
return &Sender{hostPool}, nil
}
// SendToSpecific Call given sendFunc to a specific Host in the HostPool,
// attempting with up to numProxies destinations in case of failure
func (s *Sender) SendToSpecific(target *id.ID,
sendFunc func(host *connect.Host, target *id.ID) (interface{}, bool, error)) (interface{}, error) {
host, ok := s.getSpecific(target)
if ok {
result, didAbort, err := sendFunc(host, target)
if err == nil {
return result, s.forceAdd(target)
} else {
if didAbort {
return nil, errors.WithMessagef(err, "Aborted SendToSpecific gateway %s", host.GetId().String())
}
jww.WARN.Printf("Unable to SendToSpecific %s: %s", host.GetId().String(), err)
}
}
proxies := s.getAny(s.poolParams.ProxyAttempts, []*id.ID{target})
for i := range proxies {
result, didAbort, err := sendFunc(proxies[i], target)
if err == nil {
return result, nil
} else {
if didAbort {
return nil, errors.WithMessagef(err, "Aborted SendToSpecific gateway proxy %s",
host.GetId().String())
}
jww.WARN.Printf("Unable to SendToSpecific proxy %s: %s", proxies[i].GetId().String(), err)
_, err = s.checkReplace(proxies[i].GetId(), err)
if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err)
}
}
}
return nil, errors.Errorf("Unable to send to specific with proxies")
}
// SendToAny Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations
func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) {
......@@ -96,27 +57,21 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations
func (s *Sender) SendToPreferred(targets []*id.ID,
sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) {
sendFunc func(host *connect.Host, target *id.ID) (interface{}, bool, error)) (interface{}, error) {
// Get the hosts and shuffle randomly
targetHosts := s.getPreferred(targets)
var rndBytes [32]byte
stream := s.rng.GetStream()
_, err := stream.Read(rndBytes[:])
stream.Close()
if err != nil {
return nil, err
}
shuffle.ShuffleSwap(rndBytes[:], len(targetHosts), func(i, j int) {
targetHosts[i], targetHosts[j] = targetHosts[j], targetHosts[i]
})
// Attempt to send directly to targets if they are in the HostPool
for i := range targetHosts {
result, err := sendFunc(targetHosts[i], targets[i])
result, didAbort, err := sendFunc(targetHosts[i], targets[i])
if err == nil {
return result, nil
} else {
if didAbort {
return nil, errors.WithMessagef(err, "Aborted SendToPreferred gateway %s",
targetHosts[i].GetId().String())
}
jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s",
targets[i], targetHosts[i].GetId(), err)
_, err = s.checkReplace(targetHosts[i].GetId(), err)
......@@ -147,10 +102,14 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
continue
}
result, err := sendFunc(targetProxies[proxyIdx], target)
result, didAbort, err := sendFunc(targetProxies[proxyIdx], target)
if err == nil {
return result, nil
} else {
if didAbort {
return nil, errors.WithMessagef(err, "Aborted SendToPreferred gateway proxy %s",
proxy.GetId().String())
}
jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+
"%s: %s", target, proxy.GetId(), err)
wasReplaced, err := s.checkReplace(proxy.GetId(), err)
......
......@@ -187,63 +187,3 @@ func TestSender_SendToPreferred(t *testing.T) {
}
}
func TestSender_SendToSpecific(t *testing.T) {
manager := newMockManager()
rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
testNdf := getTestNdf(t)
testStorage := storage.InitTestingSession(t)
addGwChan := make(chan network.NodeGateway)
params := DefaultPoolParams()
params.MaxPoolSize = uint32(len(testNdf.Gateways)) - 5
// Do not test proxy attempts code in this test
// (self contain to code specific in sendPreferred)
params.ProxyAttempts = 0
// Pull all gateways from ndf into host manager
for _, gw := range testNdf.Gateways {
gwId, err := id.Unmarshal(gw.ID)
if err != nil {
t.Fatalf("Failed to unmarshal ID in mock ndf: %v", err)
}
// Add mock gateway to manager
_, err = manager.AddHost(gwId, gw.Address, nil, connect.GetDefaultHostParams())
if err != nil {
t.Fatalf("Could not add mock host to manager: %v", err)
}
}
sender, err := NewSender(params, rng, testNdf, manager, testStorage, addGwChan)
if err != nil {
t.Fatalf("Failed to create mock sender: %v", err)
}
preferredIndex := 0
preferredHost := sender.hostList[preferredIndex]
// Happy path
result, err := sender.SendToSpecific(preferredHost.GetId(), SendToSpecific_HappyPath)
if err != nil {
t.Errorf("Should not error in SendToSpecific happy path: %v", err)
}
if !reflect.DeepEqual(result, happyPathReturn) {
t.Errorf("Expected result not returnev via SendToSpecific interface."+
"\n\tExpected: %v"+
"\n\tReceived: %v", happyPathReturn, result)
}
// Ensure host is now in map
if _, ok := sender.hostMap[*preferredHost.GetId()]; !ok {
t.Errorf("Failed to forcefully add new gateway ID: %v", preferredHost.GetId())
}
_, err = sender.SendToSpecific(preferredHost.GetId(), SendToSpecific_Abort)
if err == nil {
t.Errorf("Expected sendSpecific to return an abort")
}
}
......@@ -129,16 +129,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition {
const happyPathReturn = "happyPathReturn"
func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, error) {
return happyPathReturn, nil
func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, bool, error) {
return happyPathReturn, false, nil
}
func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, error) {
return nil, fmt.Errorf(errorsList[0])
func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, bool, error) {
return nil, false, fmt.Errorf(errorsList[0])
}
func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, error) {
return nil, fmt.Errorf("Unexpected error: Oopsie")
func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, bool, error) {
return nil, false, fmt.Errorf("Unexpected error: Oopsie")
}
func SendToAny_HappyPath(host *connect.Host) (interface{}, error) {
......
......@@ -204,12 +204,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
}
return result, false, err
}
var result interface{}
if messageParams.ProxySending {
result, err = sender.SendToSpecific(firstGateway, sendFunc)
} else {
result, err = sender.SendToSpecific(firstGateway, sendFunc)
}
result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc)
//if the comm errors or the message fails to send, continue retrying.
//return if it sends properly
......
......@@ -13,6 +13,7 @@ import (
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/storage/reception"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/crypto/shuffle"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
......@@ -82,7 +83,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id
comms messageRetrievalComms, gwIds []*id.ID) (message.Bundle, error) {
// Send to the gateways using backup proxies
result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, error) {
result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, bool, error) {
jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+
"via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId())
......@@ -96,10 +97,10 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id
// If the gateway doesnt have the round, return an error
msgResp, err := comms.RequestMessages(host, msgReq)
if err == nil && !msgResp.GetHasRound() {
return message.Bundle{}, errors.Errorf(noRoundError)
return message.Bundle{}, false, errors.Errorf(noRoundError)
}
return msgResp, err
return msgResp, false, err
})
// Fail the round if an error occurs so it can be tried again later
......
......@@ -37,9 +37,12 @@ func TestManager_ProcessMessageRetrieval(t *testing.T) {
p := gateway.DefaultPoolParams()
p.MaxPoolSize = 1
testManager.sender, _ = gateway.NewSender(p,
fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
var err error
testManager.sender, err = gateway.NewSender(p, testManager.Rng,
testNdf, mockComms, testManager.Session, nil)
if err != nil {
t.Errorf(err.Error())
}
// Create a local channel so reception is possible (testManager.messageBundles is
// send only via newManager call above)
......@@ -127,9 +130,10 @@ func TestManager_ProcessMessageRetrieval_NoRound(t *testing.T) {
gwId := nodeId.DeepCopy()
gwId.SetType(id.Gateway)
testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
testManager.Rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
testManager.sender, _ = gateway.NewSender(p,
fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
testManager.Rng,
testNdf, mockComms, testManager.Session, nil)
quitChan := make(chan struct{})
......@@ -203,11 +207,12 @@ func TestManager_ProcessMessageRetrieval_FalsePositive(t *testing.T) {
gwId := nodeId.DeepCopy()
gwId.SetType(id.Gateway)
testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
testManager.Rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
p := gateway.DefaultPoolParams()
p.MaxPoolSize = 1
testManager.sender, _ = gateway.NewSender(p,
fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
testManager.Rng,
testNdf, mockComms, testManager.Session, nil)
// Create a local channel so reception is possible (testManager.messageBundles is
......@@ -348,11 +353,12 @@ func TestManager_ProcessMessageRetrieval_MultipleGateways(t *testing.T) {
gwId := nodeId.DeepCopy()
gwId.SetType(id.Gateway)
testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
testManager.Rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
p := gateway.DefaultPoolParams()
p.MaxPoolSize = 1
testManager.sender, _ = gateway.NewSender(p,
fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
testManager.Rng,
testNdf, mockComms, testManager.Session, nil)
// Create a local channel so reception is possible (testManager.messageBundles is
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment