Skip to content
Snippets Groups Projects
Commit c17ac9c6 authored by Jake Taylor's avatar Jake Taylor
Browse files

first pass at adding new initialize functionality

parent d235fc15
No related branches found
No related tags found
2 merge requests!53Release,!33Hotfix/ping
......@@ -21,7 +21,7 @@ require (
gitlab.com/elixxir/crypto v0.0.7-0.20210920180151-6c9b84bae372
gitlab.com/elixxir/ekv v0.1.5
gitlab.com/elixxir/primitives v0.0.3-0.20210920180121-b85bca5212f4
gitlab.com/xx_network/comms v0.0.4-0.20211005155924-3659069a608a
gitlab.com/xx_network/comms v0.0.4-0.20211005180518-f27edc052505
gitlab.com/xx_network/crypto v0.0.5-0.20210920180047-4dd4aed4a942
gitlab.com/xx_network/primitives v0.0.4-0.20210915220237-70cb4551d6f3
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
......
......@@ -271,6 +271,8 @@ gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTR
gitlab.com/xx_network/comms v0.0.4-0.20210921011654-3b73a40ed3d6/go.mod h1:5K8m1XBzIMWodR4BqGydP+9RyyVJ9e/yjmfyEyeju14=
gitlab.com/xx_network/comms v0.0.4-0.20211005155924-3659069a608a h1:tArQsPqN5GZuQSTiDjBSIANWvN75IN2gTgR0qa76TTg=
gitlab.com/xx_network/comms v0.0.4-0.20211005155924-3659069a608a/go.mod h1:5K8m1XBzIMWodR4BqGydP+9RyyVJ9e/yjmfyEyeju14=
gitlab.com/xx_network/comms v0.0.4-0.20211005180518-f27edc052505 h1:GziWfJ5jHLBWQqQ2ObVnojmZ1R7ytRRho0cXp/9Hj78=
gitlab.com/xx_network/comms v0.0.4-0.20211005180518-f27edc052505/go.mod h1:5K8m1XBzIMWodR4BqGydP+9RyyVJ9e/yjmfyEyeju14=
gitlab.com/xx_network/crypto v0.0.3/go.mod h1:DF2HYvvCw9wkBybXcXAgQMzX+MiGbFPjwt3t17VRqRE=
gitlab.com/xx_network/crypto v0.0.4/go.mod h1:+lcQEy+Th4eswFgQDwT0EXKp4AXrlubxalwQFH5O0Mk=
gitlab.com/xx_network/crypto v0.0.5-0.20210920180047-4dd4aed4a942 h1:pOFwTWCdaFhwve2aWoqicqQIECuZ1mIUeLtVMUAauEg=
......
......@@ -74,8 +74,8 @@ type HostPool struct {
type PoolParams struct {
MaxPoolSize uint32 // Maximum number of Hosts in the HostPool
PoolSize uint32 // Allows override of HostPool size. Set to zero for dynamic size calculation
// TODO: Move up a layer
ProxyAttempts uint32 // How many proxies will be used in event of send failure
PingMultiplier uint32 // How many gateways to concurrently test when initializing HostPool
HostParams connect.HostParams // Parameters for the creation of new Host objects
}
......@@ -85,6 +85,7 @@ func DefaultPoolParams() PoolParams {
MaxPoolSize: 30,
ProxyAttempts: 5,
PoolSize: 0,
PingMultiplier: 2,
HostParams: connect.GetDefaultHostParams(),
}
p.HostParams.MaxRetries = 1
......@@ -93,6 +94,7 @@ func DefaultPoolParams() PoolParams {
p.HostParams.NumSendsBeforeCoolOff = 1
p.HostParams.CoolOffTimeout = 5 * time.Minute
p.HostParams.SendTimeout = 2000 * time.Millisecond
p.HostParams.PingTimeout = 1 * time.Second
return p
}
......@@ -153,17 +155,71 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator,
}
// Build the initial HostPool and return
for i := numHostsAdded; i < len(result.hostList); i++ {
err := result.forceReplace(uint32(i))
if err != nil {
return nil, err
}
}
//for i := numHostsAdded; i < len(result.hostList); i++ {
// err := result.forceReplace(uint32(i))
// if err != nil {
// return nil, err
// }
//}
jww.INFO.Printf("Initialized HostPool with size: %d/%d", poolParams.PoolSize, len(netDef.Gateways))
return result, nil
}
// Initialize the HostPool with a performant set of Hosts
func (h *HostPool) initialize(startIdx uint32) {
roundTripDurations := make(map[id.ID]time.Duration)
numGatewaysToTry := h.poolParams.PingMultiplier * (h.poolParams.PoolSize - startIdx)
// Begin checking roundTripDurations for numGatewaysToTry
for i := uint32(0); i < numGatewaysToTry; i++ {
// Select a gateway not yet selected
gwId := h.selectGateway()
if _, ok := roundTripDurations[*gwId]; ok {
continue
}
// Initialize in the durations map
roundTripDurations[*gwId] = 0
go func() {
// Obtain that GwId's Host object
newHost, ok := h.manager.GetHost(gwId)
if !ok {
jww.WARN.Printf("host for gateway %s could not be "+
"retrieved", gwId)
return
}
pingDuration, _ := newHost.IsOnline()
roundTripDurations[*gwId] = pingDuration
}()
}
// TODO ?
time.Sleep(2 * h.poolParams.HostParams.PingTimeout)
// Add sufficiently fast gateways
for gwId, roundTripDuration := range roundTripDurations {
// Ignore failed connectivity checks
if roundTripDuration == 0 {
continue
}
// Attempt to add to HostPool
err := h.replaceHost(&gwId, startIdx)
if err != nil {
jww.WARN.Printf("Unable to initialize host in HostPool: %+v", err)
}
startIdx++
// Once HostPool is full, break
if startIdx >= h.poolParams.PoolSize {
break
}
}
}
// UpdateNdf Mutates internal ndf to the given ndf
func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) {
if len(ndf.Gateways) == 0 {
......@@ -299,7 +355,7 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) {
if oldPoolIndex, ok := h.hostMap[*hostId]; ok {
// Replace it
h.ndfMux.RLock()
err = h.forceReplace(oldPoolIndex)
err = h.replaceHost(h.selectGateway(), oldPoolIndex)
h.ndfMux.RUnlock()
}
h.hostMux.Unlock()
......@@ -307,8 +363,8 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) {
return doReplace, err
}
// Replace given Host index with a new, randomly-selected Host from the NDF
func (h *HostPool) forceReplace(oldPoolIndex uint32) error {
// Select a viable HostPool candidate from the NDF
func (h *HostPool) selectGateway() *id.ID {
rng := h.rng.GetStream()
defer rng.Close()
......@@ -316,12 +372,12 @@ func (h *HostPool) forceReplace(oldPoolIndex uint32) error {
for {
// Randomly select a new Gw by index in the NDF
ndfIdx := readRangeUint32(0, uint32(len(h.ndf.Gateways)), rng)
jww.DEBUG.Printf("Attempting to replace Host at HostPool %d with Host at NDF %d...", oldPoolIndex, ndfIdx)
// Use the random ndfIdx to obtain a GwId from the NDF
gwId, err := id.Unmarshal(h.ndf.Gateways[ndfIdx].ID)
if err != nil {
return errors.WithMessage(err, "failed to get Gateway for pruning")
jww.WARN.Printf("Unable to unmarshal gateway: %+v", err)
continue
}
// Verify the Gateway's Node is not Stale before adding to HostPool
......@@ -336,8 +392,7 @@ func (h *HostPool) forceReplace(oldPoolIndex uint32) error {
// Verify the new GwId is not already in the hostMap
if _, ok := h.hostMap[*gwId]; !ok {
// If it is a new GwId, replace the old Host with the new Host
return h.replaceHost(gwId, oldPoolIndex)
return gwId
}
}
}
......@@ -478,7 +533,7 @@ func (h *HostPool) removeGateway(gwId *id.ID) {
h.manager.RemoveHost(gwId)
// If needed, replace the removed Gateway in the HostPool with a new one
if poolIndex, ok := h.hostMap[*gwId]; ok {
err := h.forceReplace(poolIndex)
err := h.replaceHost(h.selectGateway(), poolIndex)
if err != nil {
jww.ERROR.Printf("Unable to removeGateway: %+v", err)
}
......
......@@ -54,7 +54,11 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
jww.INFO.Printf("Unable to SendToAny via %s: non-fatal error received, retrying: %s",
proxies[proxy].GetId().String(), err)
} else if strings.Contains(err.Error(), "unable to connect to target host") {
// Retry of the proxy could not communicate
jww.WARN.Printf("Unable to SendToAny via %s: %s,"+
" proxy could not contact requested host",
proxy, err)
continue
} else if replaced, checkReplaceErr := s.checkReplace(proxies[proxy].GetId(), err); replaced {
if checkReplaceErr != nil {
jww.WARN.Printf("Unable to SendToAny, replaced a proxy %s with error %s",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment