diff --git a/interfaces/params/message.go b/interfaces/params/message.go index acecde4ef9861ab722ff05c7e645272f20d21694..fbf9779829b939145cf7bc1277fa79b5617b826a 100644 --- a/interfaces/params/message.go +++ b/interfaces/params/message.go @@ -16,8 +16,6 @@ type Messages struct { MessageReceptionWorkerPoolSize uint MaxChecksGarbledMessage uint GarbledMessageWait time.Duration - // Use proxied (rather than direct) message sending - ProxySending bool } func GetDefaultMessage() Messages { @@ -26,6 +24,5 @@ func GetDefaultMessage() Messages { MessageReceptionWorkerPoolSize: 4, MaxChecksGarbledMessage: 10, GarbledMessageWait: 15 * time.Minute, - ProxySending: false, } } diff --git a/network/gateway/sender.go b/network/gateway/sender.go index 619581cd7c9d4289ef3a88faeff48eff2a67bb6d..7e07b247353156960992382eb93bbe718ab9851a 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -14,7 +14,6 @@ import ( "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" - "gitlab.com/elixxir/crypto/shuffle" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" @@ -36,44 +35,6 @@ func NewSender(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.Net return &Sender{hostPool}, nil } -// SendToSpecific Call given sendFunc to a specific Host in the HostPool, -// attempting with up to numProxies destinations in case of failure -func (s *Sender) SendToSpecific(target *id.ID, - sendFunc func(host *connect.Host, target *id.ID) (interface{}, bool, error)) (interface{}, error) { - host, ok := s.getSpecific(target) - if ok { - result, didAbort, err := sendFunc(host, target) - if err == nil { - return result, s.forceAdd(target) - } else { - if didAbort { - return nil, errors.WithMessagef(err, "Aborted SendToSpecific gateway %s", host.GetId().String()) - } - jww.WARN.Printf("Unable to SendToSpecific %s: %s", host.GetId().String(), err) - } - } - - proxies := s.getAny(s.poolParams.ProxyAttempts, []*id.ID{target}) - for i := range proxies { - result, didAbort, err := sendFunc(proxies[i], target) - if err == nil { - return result, nil - } else { - if didAbort { - return nil, errors.WithMessagef(err, "Aborted SendToSpecific gateway proxy %s", - host.GetId().String()) - } - jww.WARN.Printf("Unable to SendToSpecific proxy %s: %s", proxies[i].GetId().String(), err) - _, err = s.checkReplace(proxies[i].GetId(), err) - if err != nil { - jww.ERROR.Printf("Unable to checkReplace: %+v", err) - } - } - } - - return nil, errors.Errorf("Unable to send to specific with proxies") -} - // SendToAny Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) { @@ -96,27 +57,21 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error // SendToPreferred Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations func (s *Sender) SendToPreferred(targets []*id.ID, - sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) { + sendFunc func(host *connect.Host, target *id.ID) (interface{}, bool, error)) (interface{}, error) { // Get the hosts and shuffle randomly targetHosts := s.getPreferred(targets) - var rndBytes [32]byte - stream := s.rng.GetStream() - _, err := stream.Read(rndBytes[:]) - stream.Close() - if err != nil { - return nil, err - } - shuffle.ShuffleSwap(rndBytes[:], len(targetHosts), func(i, j int) { - targetHosts[i], targetHosts[j] = targetHosts[j], targetHosts[i] - }) // Attempt to send directly to targets if they are in the HostPool for i := range targetHosts { - result, err := sendFunc(targetHosts[i], targets[i]) + result, didAbort, err := sendFunc(targetHosts[i], targets[i]) if err == nil { return result, nil } else { + if didAbort { + return nil, errors.WithMessagef(err, "Aborted SendToPreferred gateway %s", + targetHosts[i].GetId().String()) + } jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s", targets[i], targetHosts[i].GetId(), err) _, err = s.checkReplace(targetHosts[i].GetId(), err) @@ -147,10 +102,14 @@ func (s *Sender) SendToPreferred(targets []*id.ID, continue } - result, err := sendFunc(targetProxies[proxyIdx], target) + result, didAbort, err := sendFunc(targetProxies[proxyIdx], target) if err == nil { return result, nil } else { + if didAbort { + return nil, errors.WithMessagef(err, "Aborted SendToPreferred gateway proxy %s", + proxy.GetId().String()) + } jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+ "%s: %s", target, proxy.GetId(), err) wasReplaced, err := s.checkReplace(proxy.GetId(), err) diff --git a/network/gateway/sender_test.go b/network/gateway/sender_test.go index 4dd5d49c02ea525e547164f8c2c6392b526b30de..35868f1d6c32f83d28cfb3c534d1877cc94b53a4 100644 --- a/network/gateway/sender_test.go +++ b/network/gateway/sender_test.go @@ -187,63 +187,3 @@ func TestSender_SendToPreferred(t *testing.T) { } } - -func TestSender_SendToSpecific(t *testing.T) { - manager := newMockManager() - rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) - testNdf := getTestNdf(t) - testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() - params.MaxPoolSize = uint32(len(testNdf.Gateways)) - 5 - - // Do not test proxy attempts code in this test - // (self contain to code specific in sendPreferred) - params.ProxyAttempts = 0 - - // Pull all gateways from ndf into host manager - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Fatalf("Failed to unmarshal ID in mock ndf: %v", err) - } - // Add mock gateway to manager - _, err = manager.AddHost(gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %v", err) - } - - } - - sender, err := NewSender(params, rng, testNdf, manager, testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock sender: %v", err) - } - - preferredIndex := 0 - preferredHost := sender.hostList[preferredIndex] - - // Happy path - result, err := sender.SendToSpecific(preferredHost.GetId(), SendToSpecific_HappyPath) - if err != nil { - t.Errorf("Should not error in SendToSpecific happy path: %v", err) - } - - if !reflect.DeepEqual(result, happyPathReturn) { - t.Errorf("Expected result not returnev via SendToSpecific interface."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", happyPathReturn, result) - } - - // Ensure host is now in map - if _, ok := sender.hostMap[*preferredHost.GetId()]; !ok { - t.Errorf("Failed to forcefully add new gateway ID: %v", preferredHost.GetId()) - } - - _, err = sender.SendToSpecific(preferredHost.GetId(), SendToSpecific_Abort) - if err == nil { - t.Errorf("Expected sendSpecific to return an abort") - } - -} diff --git a/network/gateway/utils_test.go b/network/gateway/utils_test.go index 0ec7dc11f8edde72a82affd3b718bcec121bf60c..9f75ace1429e947ef7c8a399f3f088e2583777de 100644 --- a/network/gateway/utils_test.go +++ b/network/gateway/utils_test.go @@ -129,16 +129,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition { const happyPathReturn = "happyPathReturn" -func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, error) { - return happyPathReturn, nil +func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, bool, error) { + return happyPathReturn, false, nil } -func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, error) { - return nil, fmt.Errorf(errorsList[0]) +func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, bool, error) { + return nil, false, fmt.Errorf(errorsList[0]) } -func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, error) { - return nil, fmt.Errorf("Unexpected error: Oopsie") +func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, bool, error) { + return nil, false, fmt.Errorf("Unexpected error: Oopsie") } func SendToAny_HappyPath(host *connect.Host) (interface{}, error) { diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 2cef107c87f4a35a6534137ba2ce4d19b69f63a7..2263b903f2d05491ede47501802d7e3e3fab5135 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -204,12 +204,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID } return result, false, err } - var result interface{} - if messageParams.ProxySending { - result, err = sender.SendToSpecific(firstGateway, sendFunc) - } else { - result, err = sender.SendToSpecific(firstGateway, sendFunc) - } + result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc) //if the comm errors or the message fails to send, continue retrying. //return if it sends properly diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 0d90355c64bb2adaff003442737e09468a5b05f8..d076c5da80f31d6a6eaea019116cd40e96085d6f 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -13,6 +13,7 @@ import ( "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/storage/reception" pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/crypto/shuffle" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" @@ -82,7 +83,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id comms messageRetrievalComms, gwIds []*id.ID) (message.Bundle, error) { // Send to the gateways using backup proxies - result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, error) { + result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, bool, error) { jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+ "via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId()) @@ -96,10 +97,10 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id // If the gateway doesnt have the round, return an error msgResp, err := comms.RequestMessages(host, msgReq) if err == nil && !msgResp.GetHasRound() { - return message.Bundle{}, errors.Errorf(noRoundError) + return message.Bundle{}, false, errors.Errorf(noRoundError) } - return msgResp, err + return msgResp, false, err }) // Fail the round if an error occurs so it can be tried again later diff --git a/network/rounds/retrieve_test.go b/network/rounds/retrieve_test.go index abd79533dd9189cdef93f8f37aba4b8ad810b8d2..2ca1e98c264f7b8ddd58696c70156bdc4fa40748 100644 --- a/network/rounds/retrieve_test.go +++ b/network/rounds/retrieve_test.go @@ -37,9 +37,12 @@ func TestManager_ProcessMessageRetrieval(t *testing.T) { p := gateway.DefaultPoolParams() p.MaxPoolSize = 1 - testManager.sender, _ = gateway.NewSender(p, - fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + var err error + testManager.sender, err = gateway.NewSender(p, testManager.Rng, testNdf, mockComms, testManager.Session, nil) + if err != nil { + t.Errorf(err.Error()) + } // Create a local channel so reception is possible (testManager.messageBundles is // send only via newManager call above) @@ -127,9 +130,10 @@ func TestManager_ProcessMessageRetrieval_NoRound(t *testing.T) { gwId := nodeId.DeepCopy() gwId.SetType(id.Gateway) testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} + testManager.Rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) testManager.sender, _ = gateway.NewSender(p, - fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + testManager.Rng, testNdf, mockComms, testManager.Session, nil) quitChan := make(chan struct{}) @@ -203,11 +207,12 @@ func TestManager_ProcessMessageRetrieval_FalsePositive(t *testing.T) { gwId := nodeId.DeepCopy() gwId.SetType(id.Gateway) testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} + testManager.Rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) p := gateway.DefaultPoolParams() p.MaxPoolSize = 1 testManager.sender, _ = gateway.NewSender(p, - fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + testManager.Rng, testNdf, mockComms, testManager.Session, nil) // Create a local channel so reception is possible (testManager.messageBundles is @@ -348,11 +353,12 @@ func TestManager_ProcessMessageRetrieval_MultipleGateways(t *testing.T) { gwId := nodeId.DeepCopy() gwId.SetType(id.Gateway) testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} + testManager.Rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) p := gateway.DefaultPoolParams() p.MaxPoolSize = 1 testManager.sender, _ = gateway.NewSender(p, - fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + testManager.Rng, testNdf, mockComms, testManager.Session, nil) // Create a local channel so reception is possible (testManager.messageBundles is