Skip to content
Snippets Groups Projects
Commit c52b99fa authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

Merge branch 'XX-3273/SendPreferred' into 'release'

Resolve XX-3273 "/sendpreferred"

Closes XX-3273

See merge request !615
parents ad3714b7 df0fd4c1
No related branches found
No related tags found
1 merge request!23Release
...@@ -16,6 +16,8 @@ type Messages struct { ...@@ -16,6 +16,8 @@ type Messages struct {
MessageReceptionWorkerPoolSize uint MessageReceptionWorkerPoolSize uint
MaxChecksGarbledMessage uint MaxChecksGarbledMessage uint
GarbledMessageWait time.Duration GarbledMessageWait time.Duration
// Use proxied (rather than direct) message sending
ProxySending bool
} }
func GetDefaultMessage() Messages { func GetDefaultMessage() Messages {
...@@ -24,5 +26,6 @@ func GetDefaultMessage() Messages { ...@@ -24,5 +26,6 @@ func GetDefaultMessage() Messages {
MessageReceptionWorkerPoolSize: 4, MessageReceptionWorkerPoolSize: 4,
MaxChecksGarbledMessage: 10, MaxChecksGarbledMessage: 10,
GarbledMessageWait: 15 * time.Minute, GarbledMessageWait: 15 * time.Minute,
ProxySending: false,
} }
} }
...@@ -225,7 +225,9 @@ func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host { ...@@ -225,7 +225,9 @@ func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host {
} }
// Replaces the given hostId in the HostPool if the given hostErr is in errorList // Replaces the given hostId in the HostPool if the given hostErr is in errorList
func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error { // Returns whether the host was replaced
func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) {
var err error
// Check if Host should be replaced // Check if Host should be replaced
doReplace := false doReplace := false
if hostErr != nil { if hostErr != nil {
...@@ -239,19 +241,17 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error { ...@@ -239,19 +241,17 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error {
} }
if doReplace { if doReplace {
h.hostMux.Lock()
defer h.hostMux.Unlock()
// If the Host is still in the pool // If the Host is still in the pool
h.hostMux.Lock()
if oldPoolIndex, ok := h.hostMap[*hostId]; ok { if oldPoolIndex, ok := h.hostMap[*hostId]; ok {
// Replace it // Replace it
h.ndfMux.RLock() h.ndfMux.RLock()
err := h.forceReplace(oldPoolIndex) err = h.forceReplace(oldPoolIndex)
h.ndfMux.RUnlock() h.ndfMux.RUnlock()
return err
} }
h.hostMux.Unlock()
} }
return nil return doReplace, err
} }
// Replace given Host index with a new, randomly-selected Host from the NDF // Replace given Host index with a new, randomly-selected Host from the NDF
......
...@@ -359,10 +359,13 @@ func TestHostPool_CheckReplace(t *testing.T) { ...@@ -359,10 +359,13 @@ func TestHostPool_CheckReplace(t *testing.T) {
oldGatewayIndex := 0 oldGatewayIndex := 0
oldHost := testPool.hostList[oldGatewayIndex] oldHost := testPool.hostList[oldGatewayIndex]
expectedError := fmt.Errorf(errorsList[0]) expectedError := fmt.Errorf(errorsList[0])
err = testPool.checkReplace(oldHost.GetId(), expectedError) wasReplaced, err := testPool.checkReplace(oldHost.GetId(), expectedError)
if err != nil { if err != nil {
t.Errorf("Failed to check replace: %v", err) t.Errorf("Failed to check replace: %v", err)
} }
if !wasReplaced {
t.Errorf("Expected to replace")
}
// Ensure that old gateway has been removed from the map // Ensure that old gateway has been removed from the map
if _, ok := testPool.hostMap[*oldHost.GetId()]; ok { if _, ok := testPool.hostMap[*oldHost.GetId()]; ok {
...@@ -378,10 +381,13 @@ func TestHostPool_CheckReplace(t *testing.T) { ...@@ -378,10 +381,13 @@ func TestHostPool_CheckReplace(t *testing.T) {
goodGatewayIndex := 0 goodGatewayIndex := 0
goodGateway := testPool.hostList[goodGatewayIndex] goodGateway := testPool.hostList[goodGatewayIndex]
unexpectedErr := fmt.Errorf("not in global error list") unexpectedErr := fmt.Errorf("not in global error list")
err = testPool.checkReplace(oldHost.GetId(), unexpectedErr) wasReplaced, err = testPool.checkReplace(oldHost.GetId(), unexpectedErr)
if err != nil { if err != nil {
t.Errorf("Failed to check replace: %v", err) t.Errorf("Failed to check replace: %v", err)
} }
if wasReplaced {
t.Errorf("Expected not to replace")
}
// Ensure that gateway with an unexpected error was not modified // Ensure that gateway with an unexpected error was not modified
if _, ok := testPool.hostMap[*goodGateway.GetId()]; !ok { if _, ok := testPool.hostMap[*goodGateway.GetId()]; !ok {
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/crypto/shuffle"
"gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
...@@ -63,7 +64,7 @@ func (s *Sender) SendToSpecific(target *id.ID, ...@@ -63,7 +64,7 @@ func (s *Sender) SendToSpecific(target *id.ID,
host.GetId().String()) host.GetId().String())
} }
jww.WARN.Printf("Unable to SendToSpecific proxy %s: %s", proxies[i].GetId().String(), err) jww.WARN.Printf("Unable to SendToSpecific proxy %s: %s", proxies[i].GetId().String(), err)
err = s.checkReplace(proxies[i].GetId(), err) _, err = s.checkReplace(proxies[i].GetId(), err)
if err != nil { if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err) jww.ERROR.Printf("Unable to checkReplace: %+v", err)
} }
...@@ -83,7 +84,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error ...@@ -83,7 +84,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
return result, nil return result, nil
} else { } else {
jww.WARN.Printf("Unable to SendToAny %s: %s", proxies[i].GetId().String(), err) jww.WARN.Printf("Unable to SendToAny %s: %s", proxies[i].GetId().String(), err)
err = s.checkReplace(proxies[i].GetId(), err) _, err = s.checkReplace(proxies[i].GetId(), err)
if err != nil { if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err) jww.ERROR.Printf("Unable to checkReplace: %+v", err)
} }
...@@ -97,7 +98,20 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error ...@@ -97,7 +98,20 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
func (s *Sender) SendToPreferred(targets []*id.ID, func (s *Sender) SendToPreferred(targets []*id.ID,
sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) { sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) {
// Get the hosts and shuffle randomly
targetHosts := s.getPreferred(targets) targetHosts := s.getPreferred(targets)
var rndBytes [32]byte
stream := s.rng.GetStream()
_, err := stream.Read(rndBytes[:])
stream.Close()
if err != nil {
return nil, err
}
shuffle.ShuffleSwap(rndBytes[:], len(targetHosts), func(i, j int) {
targetHosts[i], targetHosts[j] = targetHosts[j], targetHosts[i]
})
// Attempt to send directly to targets if they are in the HostPool
for i := range targetHosts { for i := range targetHosts {
result, err := sendFunc(targetHosts[i], targets[i]) result, err := sendFunc(targetHosts[i], targets[i])
if err == nil { if err == nil {
...@@ -105,26 +119,49 @@ func (s *Sender) SendToPreferred(targets []*id.ID, ...@@ -105,26 +119,49 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
} else { } else {
jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s", jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s",
targets[i], targetHosts[i].GetId(), err) targets[i], targetHosts[i].GetId(), err)
err = s.checkReplace(targetHosts[i].GetId(), err) _, err = s.checkReplace(targetHosts[i].GetId(), err)
if err != nil { if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err) jww.ERROR.Printf("Unable to checkReplace: %+v", err)
} }
} }
} }
proxies := s.getAny(s.poolParams.ProxyAttempts, targets) // Build a list of proxies for every target
for i := range proxies { proxies := make([][]*connect.Host, len(targets))
target := targets[i%len(targets)].DeepCopy() for i := 0; i < len(targets); i++ {
result, err := sendFunc(proxies[i], target) proxies[i] = s.getAny(s.poolParams.ProxyAttempts, targets)
}
// Build a map of bad proxies
badProxies := make(map[string]interface{})
// Iterate between each target's list of proxies, using the next target for each proxy
for proxyIdx := uint32(0); proxyIdx < s.poolParams.ProxyAttempts; proxyIdx++ {
for targetIdx := range proxies {
target := targets[targetIdx]
targetProxies := proxies[targetIdx]
proxy := targetProxies[proxyIdx]
// Skip bad proxies
if _, ok := badProxies[proxy.String()]; ok {
continue
}
result, err := sendFunc(targetProxies[proxyIdx], target)
if err == nil { if err == nil {
return result, nil return result, nil
} else { } else {
jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+ jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+
"%s: %s", target, proxies[i].GetId(), err) "%s: %s", target, proxy.GetId(), err)
err = s.checkReplace(proxies[i].GetId(), err) wasReplaced, err := s.checkReplace(proxy.GetId(), err)
if err != nil { if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err) jww.ERROR.Printf("Unable to checkReplace: %+v", err)
} }
// If the proxy was replaced, add as a bad proxy
if wasReplaced {
badProxies[proxy.String()] = nil
}
}
} }
} }
......
...@@ -40,7 +40,7 @@ const sendTimeBuffer = 2500 * time.Millisecond ...@@ -40,7 +40,7 @@ const sendTimeBuffer = 2500 * time.Millisecond
// Public manager function to send a message over CMIX // Public manager function to send a message over CMIX
func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) { func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) {
msgCopy := msg.Copy() msgCopy := msg.Copy()
return sendCmixHelper(sender, msgCopy, recipient, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms) return sendCmixHelper(sender, msgCopy, recipient, m.param, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms)
} }
// Payloads send are not End to End encrypted, MetaData is NOT protected with // Payloads send are not End to End encrypted, MetaData is NOT protected with
...@@ -51,7 +51,7 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient ...@@ -51,7 +51,7 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient
// If the message is successfully sent, the id of the round sent it is returned, // If the message is successfully sent, the id of the round sent it is returned,
// which can be registered with the network instance to get a callback on // which can be registered with the network instance to get a callback on
// its status // its status
func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX, instance *network.Instance, func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, messageParams params.Messages, cmixParams params.CMIX, instance *network.Instance,
session *storage.Session, nodeRegistration chan network.NodeGateway, rng *fastRNG.StreamGenerator, senderId *id.ID, session *storage.Session, nodeRegistration chan network.NodeGateway, rng *fastRNG.StreamGenerator, senderId *id.ID,
comms sendCmixCommsInterface) (id.Round, ephemeral.Id, error) { comms sendCmixCommsInterface) (id.Round, ephemeral.Id, error) {
...@@ -61,13 +61,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -61,13 +61,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
jww.INFO.Printf("Looking for round to send cMix message to %s "+ jww.INFO.Printf("Looking for round to send cMix message to %s "+
"(msgDigest: %s)", recipient, msg.Digest()) "(msgDigest: %s)", recipient, msg.Digest())
for numRoundTries := uint(0); numRoundTries < param.RoundTries; numRoundTries++ { for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ {
elapsed := netTime.Now().Sub(timeStart) elapsed := netTime.Now().Sub(timeStart)
if elapsed > param.Timeout { if elapsed > cmixParams.Timeout {
jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+ jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+
"were found before timeout %s", recipient, msg.Digest(), "were found before timeout %s", recipient, msg.Digest(),
param.Timeout) cmixParams.Timeout)
return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out") return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out")
} }
if numRoundTries > 0 { if numRoundTries > 0 {
...@@ -76,7 +76,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -76,7 +76,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
msg.Digest()) msg.Digest())
} }
remainingTime := param.Timeout - elapsed remainingTime := cmixParams.Timeout - elapsed
//find the best round to send to, excluding attempted rounds //find the best round to send to, excluding attempted rounds
bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer)
if bestRound == nil { if bestRound == nil {
...@@ -128,7 +128,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -128,7 +128,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
"(msgDigest: %s) due to missing relationships with nodes: %s", "(msgDigest: %s) due to missing relationships with nodes: %s",
bestRound.ID, recipient, msg.Digest(), missingKeys) bestRound.ID, recipient, msg.Digest(), missingKeys)
go handleMissingNodeKeys(instance, nodeRegistration, missingKeys) go handleMissingNodeKeys(instance, nodeRegistration, missingKeys)
time.Sleep(param.RetryDelay) time.Sleep(cmixParams.RetryDelay)
continue continue
} }
...@@ -175,7 +175,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -175,7 +175,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
encMsg.Digest(), firstGateway.String()) encMsg.Digest(), firstGateway.String())
// Send the payload // Send the payload
result, err := sender.SendToSpecific(firstGateway, func(host *connect.Host, target *id.ID) (interface{}, bool, error) { sendFunc := func(host *connect.Host, target *id.ID) (interface{}, bool, error) {
wrappedMsg.Target = target.Marshal() wrappedMsg.Target = target.Marshal()
result, err := comms.SendPutMessage(host, wrappedMsg) result, err := comms.SendPutMessage(host, wrappedMsg)
if err != nil { if err != nil {
...@@ -203,7 +203,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -203,7 +203,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
} }
} }
return result, false, err return result, false, err
}) }
var result interface{}
if messageParams.ProxySending {
result, err = sender.SendToSpecific(firstGateway, sendFunc)
} else {
result, err = sender.SendToSpecific(firstGateway, sendFunc)
}
//if the comm errors or the message fails to send, continue retrying. //if the comm errors or the message fails to send, continue retrying.
//return if it sends properly //return if it sends properly
......
...@@ -143,7 +143,7 @@ func Test_attemptSendCmix(t *testing.T) { ...@@ -143,7 +143,7 @@ func Test_attemptSendCmix(t *testing.T) {
msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen()) msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen())
msgCmix.SetContents([]byte("test")) msgCmix.SetContents([]byte("test"))
e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID()) e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID())
_, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultCMIX(), _, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultMessage(), params.GetDefaultCMIX(),
m.Instance, m.Session, m.nodeRegistration, m.Rng, m.Instance, m.Session, m.nodeRegistration, m.Rng,
m.TransmissionID, &MockSendCMIXComms{t: t}) m.TransmissionID, &MockSendCMIXComms{t: t})
if err != nil { if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment