diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index f165ce5a9424a37e08e2442bdafc884959a2472b..aa8a47c45d7673debae46937afcdf6f34df506a2 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -223,7 +223,9 @@ func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host { } // Replaces the given hostId in the HostPool if the given hostErr is in errorList -func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error { +// Returns whether the host was replaced +func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) { + var err error // Check if Host should be replaced doReplace := false if hostErr != nil { @@ -237,19 +239,17 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error { } if doReplace { - h.hostMux.Lock() - defer h.hostMux.Unlock() - // If the Host is still in the pool + h.hostMux.Lock() if oldPoolIndex, ok := h.hostMap[*hostId]; ok { // Replace it h.ndfMux.RLock() - err := h.forceReplace(oldPoolIndex) + err = h.forceReplace(oldPoolIndex) h.ndfMux.RUnlock() - return err } + h.hostMux.Unlock() } - return nil + return doReplace, err } // Replace given Host index with a new, randomly-selected Host from the NDF diff --git a/network/gateway/hostpool_test.go b/network/gateway/hostpool_test.go index 8181e6d49f0ea5522e8ae33eb6ac143516c45a26..49dcacf6b8231647b8c0cbc0ab6e6f84b0c7d126 100644 --- a/network/gateway/hostpool_test.go +++ b/network/gateway/hostpool_test.go @@ -359,10 +359,13 @@ func TestHostPool_CheckReplace(t *testing.T) { oldGatewayIndex := 0 oldHost := testPool.hostList[oldGatewayIndex] expectedError := fmt.Errorf(errorsList[0]) - err = testPool.checkReplace(oldHost.GetId(), expectedError) + wasReplaced, err := testPool.checkReplace(oldHost.GetId(), expectedError) if err != nil { t.Errorf("Failed to check replace: %v", err) } + if !wasReplaced { + t.Errorf("Expected to replace") + } // Ensure that old gateway has been removed from the map if _, ok := testPool.hostMap[*oldHost.GetId()]; ok { @@ -378,10 +381,13 @@ func TestHostPool_CheckReplace(t *testing.T) { goodGatewayIndex := 0 goodGateway := testPool.hostList[goodGatewayIndex] unexpectedErr := fmt.Errorf("not in global error list") - err = testPool.checkReplace(oldHost.GetId(), unexpectedErr) + wasReplaced, err = testPool.checkReplace(oldHost.GetId(), unexpectedErr) if err != nil { t.Errorf("Failed to check replace: %v", err) } + if wasReplaced { + t.Errorf("Expected not to replace") + } // Ensure that gateway with an unexpected error was not modified if _, ok := testPool.hostMap[*goodGateway.GetId()]; !ok { diff --git a/network/gateway/sender.go b/network/gateway/sender.go index dcfcdbb5b88a0be02b67f4f61dfcaa01c1169489..f05f4a7e5d3a80500a42a18e415c5b1cb2d8d982 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -63,7 +63,7 @@ func (s *Sender) SendToSpecific(target *id.ID, host.GetId().String()) } jww.WARN.Printf("Unable to SendToSpecific proxy %s: %s", proxies[i].GetId().String(), err) - err = s.checkReplace(proxies[i].GetId(), err) + _, err = s.checkReplace(proxies[i].GetId(), err) if err != nil { jww.ERROR.Printf("Unable to checkReplace: %+v", err) } @@ -83,7 +83,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error return result, nil } else { jww.WARN.Printf("Unable to SendToAny %s: %s", proxies[i].GetId().String(), err) - err = s.checkReplace(proxies[i].GetId(), err) + _, err = s.checkReplace(proxies[i].GetId(), err) if err != nil { jww.ERROR.Printf("Unable to checkReplace: %+v", err) } @@ -97,6 +97,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error func (s *Sender) SendToPreferred(targets []*id.ID, sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) { + // Attempt to send directly to targets if they are in the HostPool targetHosts := s.getPreferred(targets) for i := range targetHosts { result, err := sendFunc(targetHosts[i], targets[i]) @@ -105,25 +106,48 @@ func (s *Sender) SendToPreferred(targets []*id.ID, } else { jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s", targets[i], targetHosts[i].GetId(), err) - err = s.checkReplace(targetHosts[i].GetId(), err) + _, err = s.checkReplace(targetHosts[i].GetId(), err) if err != nil { jww.ERROR.Printf("Unable to checkReplace: %+v", err) } } } - proxies := s.getAny(s.poolParams.ProxyAttempts, targets) - for i := range proxies { - target := targets[i%len(targets)].DeepCopy() - result, err := sendFunc(proxies[i], target) - if err == nil { - return result, nil - } else { - jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+ - "%s: %s", target, proxies[i].GetId(), err) - err = s.checkReplace(proxies[i].GetId(), err) - if err != nil { - jww.ERROR.Printf("Unable to checkReplace: %+v", err) + // Build a list of proxies for every target + proxies := make([][]*connect.Host, len(targets)) + for i := 0; i < len(targets); i++ { + proxies[i] = s.getAny(s.poolParams.ProxyAttempts, targets) + } + + // Build a map of bad proxies + badProxies := make(map[string]interface{}) + + // Iterate between each target's list of proxies, using the next target for each proxy + for proxyIdx := uint32(0); proxyIdx < s.poolParams.ProxyAttempts; proxyIdx++ { + for targetIdx := range proxies { + target := targets[targetIdx] + targetProxies := proxies[targetIdx] + proxy := targetProxies[proxyIdx] + + // Skip bad proxies + if _, ok := badProxies[proxy.String()]; ok { + continue + } + + result, err := sendFunc(targetProxies[proxyIdx], target) + if err == nil { + return result, nil + } else { + jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+ + "%s: %s", target, proxy.GetId(), err) + wasReplaced, err := s.checkReplace(proxy.GetId(), err) + if err != nil { + jww.ERROR.Printf("Unable to checkReplace: %+v", err) + } + // If the proxy was replaced, add as a bad proxy + if wasReplaced { + badProxies[proxy.String()] = nil + } } } }