Skip to content
Snippets Groups Projects
Commit 8deff0f7 authored by Jake Taylor's avatar Jake Taylor
Browse files

added force host removal, removed long running thread and host metrics

parent 18eb073d
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,6 @@ import (
"fmt"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/network"
"gitlab.com/xx_network/comms/connect"
......@@ -23,10 +22,15 @@ import (
"gitlab.com/xx_network/primitives/ndf"
"io"
"math"
"strings"
"sync"
"time"
)
// List of errors that initiate a Host replacement
var errorsList = []string{"context deadline exceeded", "connection refused", "host disconnected",
"transport is closing", "all SubConns are in TransientFailure", ndf.NO_NDF}
// HostManager Interface allowing storage and retrieval of Host objects
type HostManager interface {
GetHost(hostId *id.ID) (*connect.Host, bool)
......@@ -42,7 +46,6 @@ type HostPool struct {
ndfMap map[id.ID]int // map gateway ID to its index in the ndf
ndf *ndf.NetworkDefinition
isNdfUpdated bool // indicates the NDF has been updated and needs processed
ndfMux sync.RWMutex
poolParams PoolParams
......@@ -55,8 +58,6 @@ type HostPool struct {
// PoolParams Allows configuration of HostPool parameters
type PoolParams struct {
PoolSize uint32 // Quantity of Hosts in the HostPool
ErrThreshold uint64 // How many errors will cause a Host to be ejected from the HostPool
PruneInterval time.Duration // How frequently the HostPool updates the pool
// TODO: Move up a layer
ProxyAttempts uint32 // How many proxies will be used in event of send failure
HostParams connect.HostParams // Parameters for the creation of new Host objects
......@@ -66,12 +67,9 @@ type PoolParams struct {
func DefaultPoolParams() PoolParams {
p := PoolParams{
PoolSize: 30,
ErrThreshold: 1,
PruneInterval: 10 * time.Second,
ProxyAttempts: 5,
HostParams: connect.GetDefaultHostParams(),
}
p.HostParams.EnableMetrics = true
p.HostParams.MaxRetries = 1
p.HostParams.AuthEnabled = false
p.HostParams.EnableCoolOff = true
......@@ -94,6 +92,13 @@ func newHostPool(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinitio
addGatewayChan: addGateway,
}
// Verify the NDF has at least as many Gateways as needed for the HostPool
ndfLen := uint32(len(result.ndf.Gateways))
if ndfLen == 0 || ndfLen < result.poolParams.PoolSize {
return nil, errors.Errorf("Unable to create HostPool: %d/%d gateways available",
len(result.ndf.Gateways), result.poolParams.PoolSize)
}
// Propagate the NDF
err := result.updateConns()
if err != nil {
......@@ -101,15 +106,27 @@ func newHostPool(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinitio
}
// Build the initial HostPool and return
return result, result.pruneHostPool()
for i := 0; i < len(result.hostList); i++ {
err := result.forceReplace(uint32(i))
if err != nil {
return nil, err
}
}
return result, nil
}
// UpdateNdf Mutates internal ndf to the given ndf
func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) {
h.ndfMux.Lock()
h.isNdfUpdated = true
h.ndf = ndf
h.ndfMux.Unlock()
h.ndfMux.RLock()
err := h.updateConns()
if err != nil {
jww.ERROR.Printf("Unable to updateConns: %+v", err)
}
h.ndfMux.RUnlock()
}
// Obtain a random, unique list of Hosts of the given length from the HostPool
......@@ -177,60 +194,33 @@ func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host {
return result
}
// StartHostPool Start long-running thread and return the thread controller to the caller
func (h *HostPool) StartHostPool() stoppable.Stoppable {
stopper := stoppable.NewSingle("HostPool")
jww.INFO.Printf("Starting Host Pool...")
go h.manageHostPool(stopper)
return stopper
}
// Long-running thread that manages the HostPool on a timer
func (h *HostPool) manageHostPool(stopper *stoppable.Single) {
tick := time.Tick(h.poolParams.PruneInterval)
for {
select {
case <-stopper.Quit():
break
case <-tick:
h.ndfMux.RLock()
if h.isNdfUpdated {
err := h.updateConns()
if err != nil {
jww.ERROR.Printf("Unable to updateConns: %+v", err)
}
h.isNdfUpdated = false
}
h.hostMux.Lock()
err := h.pruneHostPool()
h.hostMux.Unlock()
if err != nil {
jww.ERROR.Printf("Unable to pruneHostPool: %+v", err)
}
h.ndfMux.RUnlock()
// Replaces the given hostId in the HostPool if the given hostErr is in errorList
func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) error {
if hostErr != nil {
for _, errString := range errorsList {
if strings.Contains(hostErr.Error(), errString) {
h.hostMux.RLock()
oldPoolIndex := h.hostMap[*hostId]
h.hostMux.RUnlock()
return h.forceReplace(oldPoolIndex)
}
}
}
// Iterate over the hostList, replacing any empty Hosts or Hosts with errors
// with new, randomly-selected Hosts from the NDF
func (h *HostPool) pruneHostPool() error {
// Verify the NDF has at least as many Gateways as needed for the HostPool
ndfLen := uint32(len(h.ndf.Gateways))
if ndfLen == 0 || ndfLen < h.poolParams.PoolSize {
return errors.Errorf("Unable to pruneHostPool: %d/%d gateways available",
len(h.ndf.Gateways), h.poolParams.PoolSize)
return nil
}
jww.FATAL.Printf("CAT: 0")
for poolIdx := uint32(0); poolIdx < h.poolParams.PoolSize; {
host := h.hostList[poolIdx]
// Check the Host for errors
if host == nil || host.GetMetrics().GetErrorCounter() >= h.poolParams.ErrThreshold {
// Replace given Host index with a new, randomly-selected Host from the NDF
func (h *HostPool) forceReplace(oldPoolIndex uint32) error {
h.ndfMux.RLock()
h.hostMux.Lock()
defer h.hostMux.Unlock()
defer h.ndfMux.RUnlock()
// If errors occurred, randomly select a new Gw by index in the NDF
// Loop until a replacement Host is found
for {
// Randomly select a new Gw by index in the NDF
ndfIdx := readRangeUint32(0, uint32(len(h.ndf.Gateways)), h.rng)
jww.DEBUG.Printf("Attempting to replace Host at HostPool %d with Host at NDF %d...", oldPoolIndex, ndfIdx)
// Use the random ndfIdx to obtain a GwId from the NDF
gwId, err := id.Unmarshal(h.ndf.Gateways[ndfIdx].ID)
......@@ -238,22 +228,13 @@ func (h *HostPool) pruneHostPool() error {
return errors.WithMessage(err, "failed to get Gateway for pruning")
}
// Verify the GwId is not already in the hostMap
// Verify the new GwId is not already in the hostMap
if _, ok := h.hostMap[*gwId]; !ok {
// If it is a new GwId, replace the old Host with the new Host
err = h.replaceHost(gwId, poolIdx)
if err != nil {
return err
}
poolIdx++
return h.replaceHost(gwId, oldPoolIndex)
}
} else {
poolIdx++
}
}
return nil
}
// Replace the given slot in the HostPool with a new Gateway with the specified ID
func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error {
......
......@@ -37,13 +37,13 @@ func NewSender(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinition,
// SendToSpecific Call given sendFunc to a specific Host in the HostPool,
// attempting with up to numProxies destinations in case of failure
func (m *Sender) SendToSpecific(target *id.ID,
func (s *Sender) SendToSpecific(target *id.ID,
sendFunc func(host *connect.Host, target *id.ID) (interface{}, bool, error)) (interface{}, error) {
host, ok := m.getSpecific(target)
host, ok := s.getSpecific(target)
if ok {
result, didAbort, err := sendFunc(host, target)
if err == nil {
return result, m.forceAdd([]*id.ID{host.GetId()})
return result, s.forceAdd([]*id.ID{host.GetId()})
} else {
if didAbort {
return nil, errors.WithMessagef(err, "Aborted SendToSpecific gateway %s", host.GetId().String())
......@@ -52,9 +52,9 @@ func (m *Sender) SendToSpecific(target *id.ID,
}
}
proxies := m.getAny(m.poolParams.ProxyAttempts, []*id.ID{target})
for proxyIdx := 0; proxyIdx < len(proxies); proxyIdx++ {
result, didAbort, err := sendFunc(proxies[proxyIdx], target)
proxies := s.getAny(s.poolParams.ProxyAttempts, []*id.ID{target})
for i := range proxies {
result, didAbort, err := sendFunc(proxies[i], target)
if err == nil {
return result, nil
} else {
......@@ -62,7 +62,11 @@ func (m *Sender) SendToSpecific(target *id.ID,
return nil, errors.WithMessagef(err, "Aborted SendToSpecific gateway proxy %s",
host.GetId().String())
}
jww.WARN.Printf("Unable to SendToSpecific proxy %s: %+v", proxies[proxyIdx].GetId().String(), err)
jww.WARN.Printf("Unable to SendToSpecific proxy %s: %+v", proxies[i].GetId().String(), err)
err = s.checkReplace(proxies[i].GetId(), err)
if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err)
}
}
}
......@@ -70,15 +74,19 @@ func (m *Sender) SendToSpecific(target *id.ID,
}
// SendToAny Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations
func (m *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) {
func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) {
proxies := m.getAny(m.poolParams.ProxyAttempts, nil)
for _, proxy := range proxies {
result, err := sendFunc(proxy)
proxies := s.getAny(s.poolParams.ProxyAttempts, nil)
for i := range proxies {
result, err := sendFunc(proxies[i])
if err == nil {
return result, nil
} else {
jww.WARN.Printf("Unable to SendToAny %s: %+v", proxy.GetId().String(), err)
jww.WARN.Printf("Unable to SendToAny %s: %+v", proxies[i].GetId().String(), err)
err = s.checkReplace(proxies[i].GetId(), err)
if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err)
}
}
}
......@@ -86,26 +94,34 @@ func (m *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
}
// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations
func (m *Sender) SendToPreferred(targets []*id.ID,
func (s *Sender) SendToPreferred(targets []*id.ID,
sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) {
targetHosts := m.getPreferred(targets)
for i, host := range targetHosts {
result, err := sendFunc(host, targets[i])
targetHosts := s.getPreferred(targets)
for i := range targetHosts {
result, err := sendFunc(targetHosts[i], targets[i])
if err == nil {
return result, nil
} else {
jww.WARN.Printf("Unable to SendToPreferred %s: %+v", host.GetId().String(), err)
jww.WARN.Printf("Unable to SendToPreferred %s: %+v", targetHosts[i].GetId().String(), err)
err = s.checkReplace(targetHosts[i].GetId(), err)
if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err)
}
}
}
proxies := m.getAny(m.poolParams.ProxyAttempts, targets)
for i, proxy := range proxies {
result, err := sendFunc(proxy, targets[i%len(targets)])
proxies := s.getAny(s.poolParams.ProxyAttempts, targets)
for i := range proxies {
result, err := sendFunc(proxies[i], targets[i%len(targets)])
if err == nil {
return result, nil
} else {
jww.WARN.Printf("Unable to SendToPreferred proxy %s: %+v", proxy.GetId().String(), err)
jww.WARN.Printf("Unable to SendToPreferred proxy %s: %+v", proxies[i].GetId().String(), err)
err = s.checkReplace(proxies[i].GetId(), err)
if err != nil {
jww.ERROR.Printf("Unable to checkReplace: %+v", err)
}
}
}
......
......@@ -143,9 +143,6 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab
// Round processing
multi.Add(m.round.StartProcessors())
// Message sending
multi.Add(m.sender.StartHostPool())
multi.Add(ephemeral.Track(m.Session, m.ReceptionID))
return multi, nil
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment