Skip to content
Snippets Groups Projects
Commit 83c7c554 authored by Jake Taylor's avatar Jake Taylor
Browse files

send to preferred now will try more proxies than before

parent b6f474ae
No related branches found
No related tags found
1 merge request!23Release
...@@ -223,7 +223,9 @@ func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host { ...@@ -223,7 +223,9 @@ func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host {
} }
// Replaces the given hostId in the HostPool if the given hostErr is in errorList // Replaces the given hostId in the HostPool if the given hostErr is in errorList
func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error { // Returns whether the host was replaced
func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) {
var err error
// Check if Host should be replaced // Check if Host should be replaced
doReplace := false doReplace := false
if hostErr != nil { if hostErr != nil {
...@@ -237,19 +239,17 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error { ...@@ -237,19 +239,17 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error {
} }
if doReplace { if doReplace {
h.hostMux.Lock()
defer h.hostMux.Unlock()
// If the Host is still in the pool // If the Host is still in the pool
h.hostMux.Lock()
if oldPoolIndex, ok := h.hostMap[*hostId]; ok { if oldPoolIndex, ok := h.hostMap[*hostId]; ok {
// Replace it // Replace it
h.ndfMux.RLock() h.ndfMux.RLock()
err := h.forceReplace(oldPoolIndex) err = h.forceReplace(oldPoolIndex)
h.ndfMux.RUnlock() h.ndfMux.RUnlock()
return err
} }
h.hostMux.Unlock()
} }
return nil return doReplace, err
} }
// Replace given Host index with a new, randomly-selected Host from the NDF // Replace given Host index with a new, randomly-selected Host from the NDF
......
...@@ -359,10 +359,13 @@ func TestHostPool_CheckReplace(t *testing.T) { ...@@ -359,10 +359,13 @@ func TestHostPool_CheckReplace(t *testing.T) {
oldGatewayIndex := 0 oldGatewayIndex := 0
oldHost := testPool.hostList[oldGatewayIndex] oldHost := testPool.hostList[oldGatewayIndex]
expectedError := fmt.Errorf(errorsList[0]) expectedError := fmt.Errorf(errorsList[0])
err = testPool.checkReplace(oldHost.GetId(), expectedError) wasReplaced, err := testPool.checkReplace(oldHost.GetId(), expectedError)
if err != nil { if err != nil {
t.Errorf("Failed to check replace: %v", err) t.Errorf("Failed to check replace: %v", err)
} }
if !wasReplaced {
t.Errorf("Expected to replace")
}
// Ensure that old gateway has been removed from the map // Ensure that old gateway has been removed from the map
if _, ok := testPool.hostMap[*oldHost.GetId()]; ok { if _, ok := testPool.hostMap[*oldHost.GetId()]; ok {
...@@ -378,10 +381,13 @@ func TestHostPool_CheckReplace(t *testing.T) { ...@@ -378,10 +381,13 @@ func TestHostPool_CheckReplace(t *testing.T) {
goodGatewayIndex := 0 goodGatewayIndex := 0
goodGateway := testPool.hostList[goodGatewayIndex] goodGateway := testPool.hostList[goodGatewayIndex]
unexpectedErr := fmt.Errorf("not in global error list") unexpectedErr := fmt.Errorf("not in global error list")
err = testPool.checkReplace(oldHost.GetId(), unexpectedErr) wasReplaced, err = testPool.checkReplace(oldHost.GetId(), unexpectedErr)
if err != nil { if err != nil {
t.Errorf("Failed to check replace: %v", err) t.Errorf("Failed to check replace: %v", err)
} }
if wasReplaced {
t.Errorf("Expected not to replace")
}
// Ensure that gateway with an unexpected error was not modified // Ensure that gateway with an unexpected error was not modified
if _, ok := testPool.hostMap[*goodGateway.GetId()]; !ok { if _, ok := testPool.hostMap[*goodGateway.GetId()]; !ok {
......
...@@ -63,7 +63,7 @@ func (s *Sender) SendToSpecific(target *id.ID, ...@@ -63,7 +63,7 @@ func (s *Sender) SendToSpecific(target *id.ID,
host.GetId().String()) host.GetId().String())
} }
jww.WARN.Printf("Unable to SendToSpecific proxy %s: %s", proxies[i].GetId().String(), err) jww.WARN.Printf("Unable to SendToSpecific proxy %s: %s", proxies[i].GetId().String(), err)
err = s.checkReplace(proxies[i].GetId(), err) _, err = s.checkReplace(proxies[i].GetId(), err)
if err != nil { if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err) jww.ERROR.Printf("Unable to checkReplace: %+v", err)
} }
...@@ -83,7 +83,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error ...@@ -83,7 +83,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
return result, nil return result, nil
} else { } else {
jww.WARN.Printf("Unable to SendToAny %s: %s", proxies[i].GetId().String(), err) jww.WARN.Printf("Unable to SendToAny %s: %s", proxies[i].GetId().String(), err)
err = s.checkReplace(proxies[i].GetId(), err) _, err = s.checkReplace(proxies[i].GetId(), err)
if err != nil { if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err) jww.ERROR.Printf("Unable to checkReplace: %+v", err)
} }
...@@ -97,6 +97,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error ...@@ -97,6 +97,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
func (s *Sender) SendToPreferred(targets []*id.ID, func (s *Sender) SendToPreferred(targets []*id.ID,
sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) { sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) {
// Attempt to send directly to targets if they are in the HostPool
targetHosts := s.getPreferred(targets) targetHosts := s.getPreferred(targets)
for i := range targetHosts { for i := range targetHosts {
result, err := sendFunc(targetHosts[i], targets[i]) result, err := sendFunc(targetHosts[i], targets[i])
...@@ -105,26 +106,49 @@ func (s *Sender) SendToPreferred(targets []*id.ID, ...@@ -105,26 +106,49 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
} else { } else {
jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s", jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s",
targets[i], targetHosts[i].GetId(), err) targets[i], targetHosts[i].GetId(), err)
err = s.checkReplace(targetHosts[i].GetId(), err) _, err = s.checkReplace(targetHosts[i].GetId(), err)
if err != nil { if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err) jww.ERROR.Printf("Unable to checkReplace: %+v", err)
} }
} }
} }
proxies := s.getAny(s.poolParams.ProxyAttempts, targets) // Build a list of proxies for every target
for i := range proxies { proxies := make([][]*connect.Host, len(targets))
target := targets[i%len(targets)].DeepCopy() for i := 0; i < len(targets); i++ {
result, err := sendFunc(proxies[i], target) proxies[i] = s.getAny(s.poolParams.ProxyAttempts, targets)
}
// Build a map of bad proxies
badProxies := make(map[string]interface{})
// Iterate between each target's list of proxies, using the next target for each proxy
for proxyIdx := uint32(0); proxyIdx < s.poolParams.ProxyAttempts; proxyIdx++ {
for targetIdx := range proxies {
target := targets[targetIdx]
targetProxies := proxies[targetIdx]
proxy := targetProxies[proxyIdx]
// Skip bad proxies
if _, ok := badProxies[proxy.String()]; ok {
continue
}
result, err := sendFunc(targetProxies[proxyIdx], target)
if err == nil { if err == nil {
return result, nil return result, nil
} else { } else {
jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+ jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+
"%s: %s", target, proxies[i].GetId(), err) "%s: %s", target, proxy.GetId(), err)
err = s.checkReplace(proxies[i].GetId(), err) wasReplaced, err := s.checkReplace(proxy.GetId(), err)
if err != nil { if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err) jww.ERROR.Printf("Unable to checkReplace: %+v", err)
} }
// If the proxy was replaced, add as a bad proxy
if wasReplaced {
badProxies[proxy.String()] = nil
}
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment