diff --git a/interfaces/params/message.go b/interfaces/params/message.go index fbf9779829b939145cf7bc1277fa79b5617b826a..acecde4ef9861ab722ff05c7e645272f20d21694 100644 --- a/interfaces/params/message.go +++ b/interfaces/params/message.go @@ -16,6 +16,8 @@ type Messages struct { MessageReceptionWorkerPoolSize uint MaxChecksGarbledMessage uint GarbledMessageWait time.Duration + // Use proxied (rather than direct) message sending + ProxySending bool } func GetDefaultMessage() Messages { @@ -24,5 +26,6 @@ func GetDefaultMessage() Messages { MessageReceptionWorkerPoolSize: 4, MaxChecksGarbledMessage: 10, GarbledMessageWait: 15 * time.Minute, + ProxySending: false, } } diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index 34b381c67a928f0bce6c9e6d14e3fa9c87458644..ba8198da839c4e9d2693d866be165f6afaf08d47 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -225,7 +225,9 @@ func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host { } // Replaces the given hostId in the HostPool if the given hostErr is in errorList -func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error { +// Returns whether the host was replaced +func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) { + var err error // Check if Host should be replaced doReplace := false if hostErr != nil { @@ -239,19 +241,17 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error { } if doReplace { - h.hostMux.Lock() - defer h.hostMux.Unlock() - // If the Host is still in the pool + h.hostMux.Lock() if oldPoolIndex, ok := h.hostMap[*hostId]; ok { // Replace it h.ndfMux.RLock() - err := h.forceReplace(oldPoolIndex) + err = h.forceReplace(oldPoolIndex) h.ndfMux.RUnlock() - return err } + h.hostMux.Unlock() } - return nil + return doReplace, err } // Replace given Host index with a new, randomly-selected Host from the NDF diff --git a/network/gateway/hostpool_test.go b/network/gateway/hostpool_test.go index 8181e6d49f0ea5522e8ae33eb6ac143516c45a26..49dcacf6b8231647b8c0cbc0ab6e6f84b0c7d126 100644 --- a/network/gateway/hostpool_test.go +++ b/network/gateway/hostpool_test.go @@ -359,10 +359,13 @@ func TestHostPool_CheckReplace(t *testing.T) { oldGatewayIndex := 0 oldHost := testPool.hostList[oldGatewayIndex] expectedError := fmt.Errorf(errorsList[0]) - err = testPool.checkReplace(oldHost.GetId(), expectedError) + wasReplaced, err := testPool.checkReplace(oldHost.GetId(), expectedError) if err != nil { t.Errorf("Failed to check replace: %v", err) } + if !wasReplaced { + t.Errorf("Expected to replace") + } // Ensure that old gateway has been removed from the map if _, ok := testPool.hostMap[*oldHost.GetId()]; ok { @@ -378,10 +381,13 @@ func TestHostPool_CheckReplace(t *testing.T) { goodGatewayIndex := 0 goodGateway := testPool.hostList[goodGatewayIndex] unexpectedErr := fmt.Errorf("not in global error list") - err = testPool.checkReplace(oldHost.GetId(), unexpectedErr) + wasReplaced, err = testPool.checkReplace(oldHost.GetId(), unexpectedErr) if err != nil { t.Errorf("Failed to check replace: %v", err) } + if wasReplaced { + t.Errorf("Expected not to replace") + } // Ensure that gateway with an unexpected error was not modified if _, ok := testPool.hostMap[*goodGateway.GetId()]; !ok { diff --git a/network/gateway/sender.go b/network/gateway/sender.go index dcfcdbb5b88a0be02b67f4f61dfcaa01c1169489..619581cd7c9d4289ef3a88faeff48eff2a67bb6d 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -14,6 +14,7 @@ import ( "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/crypto/shuffle" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" @@ -63,7 +64,7 @@ func (s *Sender) SendToSpecific(target *id.ID, host.GetId().String()) } jww.WARN.Printf("Unable to SendToSpecific proxy %s: %s", proxies[i].GetId().String(), err) - err = s.checkReplace(proxies[i].GetId(), err) + _, err = s.checkReplace(proxies[i].GetId(), err) if err != nil { jww.ERROR.Printf("Unable to checkReplace: %+v", err) } @@ -83,7 +84,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error return result, nil } else { jww.WARN.Printf("Unable to SendToAny %s: %s", proxies[i].GetId().String(), err) - err = s.checkReplace(proxies[i].GetId(), err) + _, err = s.checkReplace(proxies[i].GetId(), err) if err != nil { jww.ERROR.Printf("Unable to checkReplace: %+v", err) } @@ -97,7 +98,20 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error func (s *Sender) SendToPreferred(targets []*id.ID, sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) { + // Get the hosts and shuffle randomly targetHosts := s.getPreferred(targets) + var rndBytes [32]byte + stream := s.rng.GetStream() + _, err := stream.Read(rndBytes[:]) + stream.Close() + if err != nil { + return nil, err + } + shuffle.ShuffleSwap(rndBytes[:], len(targetHosts), func(i, j int) { + targetHosts[i], targetHosts[j] = targetHosts[j], targetHosts[i] + }) + + // Attempt to send directly to targets if they are in the HostPool for i := range targetHosts { result, err := sendFunc(targetHosts[i], targets[i]) if err == nil { @@ -105,25 +119,48 @@ func (s *Sender) SendToPreferred(targets []*id.ID, } else { jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s", targets[i], targetHosts[i].GetId(), err) - err = s.checkReplace(targetHosts[i].GetId(), err) + _, err = s.checkReplace(targetHosts[i].GetId(), err) if err != nil { jww.ERROR.Printf("Unable to checkReplace: %+v", err) } } } - proxies := s.getAny(s.poolParams.ProxyAttempts, targets) - for i := range proxies { - target := targets[i%len(targets)].DeepCopy() - result, err := sendFunc(proxies[i], target) - if err == nil { - return result, nil - } else { - jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+ - "%s: %s", target, proxies[i].GetId(), err) - err = s.checkReplace(proxies[i].GetId(), err) - if err != nil { - jww.ERROR.Printf("Unable to checkReplace: %+v", err) + // Build a list of proxies for every target + proxies := make([][]*connect.Host, len(targets)) + for i := 0; i < len(targets); i++ { + proxies[i] = s.getAny(s.poolParams.ProxyAttempts, targets) + } + + // Build a map of bad proxies + badProxies := make(map[string]interface{}) + + // Iterate between each target's list of proxies, using the next target for each proxy + for proxyIdx := uint32(0); proxyIdx < s.poolParams.ProxyAttempts; proxyIdx++ { + for targetIdx := range proxies { + target := targets[targetIdx] + targetProxies := proxies[targetIdx] + proxy := targetProxies[proxyIdx] + + // Skip bad proxies + if _, ok := badProxies[proxy.String()]; ok { + continue + } + + result, err := sendFunc(targetProxies[proxyIdx], target) + if err == nil { + return result, nil + } else { + jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+ + "%s: %s", target, proxy.GetId(), err) + wasReplaced, err := s.checkReplace(proxy.GetId(), err) + if err != nil { + jww.ERROR.Printf("Unable to checkReplace: %+v", err) + } + // If the proxy was replaced, add as a bad proxy + if wasReplaced { + badProxies[proxy.String()] = nil + } } } } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 2a7095843eb57c6f9af2af16ea9725fab0aac627..b608e8c6e2c62ace780a8ce8391f943ac1712bb1 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -40,7 +40,7 @@ const sendTimeBuffer = 2500 * time.Millisecond // Public manager function to send a message over CMIX func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) { msgCopy := msg.Copy() - return sendCmixHelper(sender, msgCopy, recipient, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms) + return sendCmixHelper(sender, msgCopy, recipient, m.param, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms) } // Payloads send are not End to End encrypted, MetaData is NOT protected with @@ -51,7 +51,7 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient // If the message is successfully sent, the id of the round sent it is returned, // which can be registered with the network instance to get a callback on // its status -func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX, instance *network.Instance, +func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, messageParams params.Messages, cmixParams params.CMIX, instance *network.Instance, session *storage.Session, nodeRegistration chan network.NodeGateway, rng *fastRNG.StreamGenerator, senderId *id.ID, comms sendCmixCommsInterface) (id.Round, ephemeral.Id, error) { @@ -61,13 +61,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID jww.INFO.Printf("Looking for round to send cMix message to %s "+ "(msgDigest: %s)", recipient, msg.Digest()) - for numRoundTries := uint(0); numRoundTries < param.RoundTries; numRoundTries++ { + for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ { elapsed := netTime.Now().Sub(timeStart) - if elapsed > param.Timeout { + if elapsed > cmixParams.Timeout { jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+ "were found before timeout %s", recipient, msg.Digest(), - param.Timeout) + cmixParams.Timeout) return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out") } if numRoundTries > 0 { @@ -76,7 +76,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID msg.Digest()) } - remainingTime := param.Timeout - elapsed + remainingTime := cmixParams.Timeout - elapsed //find the best round to send to, excluding attempted rounds bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) if bestRound == nil { @@ -128,7 +128,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID "(msgDigest: %s) due to missing relationships with nodes: %s", bestRound.ID, recipient, msg.Digest(), missingKeys) go handleMissingNodeKeys(instance, nodeRegistration, missingKeys) - time.Sleep(param.RetryDelay) + time.Sleep(cmixParams.RetryDelay) continue } @@ -175,7 +175,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID encMsg.Digest(), firstGateway.String()) // Send the payload - result, err := sender.SendToSpecific(firstGateway, func(host *connect.Host, target *id.ID) (interface{}, bool, error) { + sendFunc := func(host *connect.Host, target *id.ID) (interface{}, bool, error) { wrappedMsg.Target = target.Marshal() result, err := comms.SendPutMessage(host, wrappedMsg) if err != nil { @@ -203,7 +203,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID } } return result, false, err - }) + } + var result interface{} + if messageParams.ProxySending { + result, err = sender.SendToSpecific(firstGateway, sendFunc) + } else { + result, err = sender.SendToSpecific(firstGateway, sendFunc) + } //if the comm errors or the message fails to send, continue retrying. //return if it sends properly diff --git a/network/message/sendCmix_test.go b/network/message/sendCmix_test.go index 04da108426dda151bec4777338fe4a0036366832..28a2cdbab74a68f12ce6e45814bde8ff521adcf1 100644 --- a/network/message/sendCmix_test.go +++ b/network/message/sendCmix_test.go @@ -143,7 +143,7 @@ func Test_attemptSendCmix(t *testing.T) { msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen()) msgCmix.SetContents([]byte("test")) e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID()) - _, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultCMIX(), + _, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultMessage(), params.GetDefaultCMIX(), m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, &MockSendCMIXComms{t: t}) if err != nil {