Skip to content
Snippets Groups Projects
Commit 19c02b66 authored by Jake Taylor's avatar Jake Taylor
Browse files

second pass at adding new initialize functionality

parent bcd7c4bb
No related branches found
No related tags found
2 merge requests!53Release,!33Hotfix/ping
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/crypto/shuffle"
"gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
...@@ -26,6 +27,7 @@ import ( ...@@ -26,6 +27,7 @@ import (
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"io" "io"
"math" "math"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
...@@ -155,69 +157,111 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, ...@@ -155,69 +157,111 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator,
} }
// Build the initial HostPool and return // Build the initial HostPool and return
//for i := numHostsAdded; i < len(result.hostList); i++ { err = result.initialize(uint32(numHostsAdded))
// err := result.forceReplace(uint32(i)) if err != nil {
// if err != nil { return nil, err
// return nil, err }
// }
//}
jww.INFO.Printf("Initialized HostPool with size: %d/%d", poolParams.PoolSize, len(netDef.Gateways)) jww.INFO.Printf("Initialized HostPool with size: %d/%d", poolParams.PoolSize, len(netDef.Gateways))
return result, nil return result, nil
} }
// Initialize the HostPool with a performant set of Hosts // Initialize the HostPool with a performant set of Hosts
func (h *HostPool) initialize(startIdx uint32) { func (h *HostPool) initialize(startIdx uint32) error {
roundTripDurations := make(map[id.ID]time.Duration) // Randomly shuffle gateways in NDF
numGatewaysToTry := h.poolParams.PingMultiplier * (h.poolParams.PoolSize - startIdx) randomGateways := make([]ndf.Gateway, len(h.ndf.Gateways))
copy(randomGateways, h.ndf.Gateways)
var rndBytes [32]byte
stream := h.rng.GetStream()
_, err := stream.Read(rndBytes[:])
stream.Close()
if err != nil {
return errors.Errorf("Failed to randomize shuffle for HostPool initialization: %+v", err)
}
shuffle.ShuffleSwap(rndBytes[:], len(randomGateways), func(i, j int) {
randomGateways[i], randomGateways[j] = randomGateways[j], randomGateways[i]
})
// Begin checking roundTripDurations for numGatewaysToTry // Set constants
for i := uint32(0); i < numGatewaysToTry; i++ { type gatewayDuration struct {
// Select a gateway not yet selected id *id.ID
gwId := h.selectGateway() latency time.Duration
if _, ok := roundTripDurations[*gwId]; ok {
continue
} }
gatewaysNeeded := h.poolParams.PoolSize - startIdx
numGatewaysToTry := h.poolParams.PingMultiplier * gatewaysNeeded
resultList := make([]gatewayDuration, 0, numGatewaysToTry)
// Initialize in the durations map // Begin trying gateways
roundTripDurations[*gwId] = 0 c := make(chan gatewayDuration, numGatewaysToTry)
exit := false
i := uint32(0)
for !exit {
for ; i < numGatewaysToTry; i++ {
// Ran out of Hosts to try
if i > uint32(len(randomGateways)) {
return errors.Errorf("Unable to initialize enough viable hosts for HostPool")
}
// Select a gateway not yet selected
gwId, err := randomGateways[i].GetGatewayId()
if err != nil {
jww.WARN.Printf("ID for gateway %s could not be retrieved", gwId)
}
go func() { go func() {
// Obtain that GwId's Host object // Obtain that GwId's Host object
newHost, ok := h.manager.GetHost(gwId) newHost, ok := h.manager.GetHost(gwId)
if !ok { if !ok {
jww.WARN.Printf("host for gateway %s could not be "+ jww.WARN.Printf("Host for gateway %s could not be "+
"retrieved", gwId) "retrieved", gwId)
return return
} }
pingDuration, _ := newHost.IsOnline() // Ping the Host latency and send the result
roundTripDurations[*gwId] = pingDuration latency, _ := newHost.IsOnline()
c <- gatewayDuration{gwId, latency}
}() }()
} }
// TODO ? // Collect ping results
time.Sleep(2 * h.poolParams.HostParams.PingTimeout) timer := time.NewTimer(2 * h.poolParams.HostParams.PingTimeout)
for {
select {
case gw := <-c:
// Only add successful pings
if gw.latency > 0 {
resultList = append(resultList, gw)
}
// Add sufficiently fast gateways // Break if we have all needed slots
for gwId, roundTripDuration := range roundTripDurations { if uint32(len(resultList)) == numGatewaysToTry {
// Ignore failed connectivity checks exit = true
if roundTripDuration == 0 { break
continue }
case <-timer.C:
break
} }
}
}
// Sort the resultList by lowest latency
sort.Slice(resultList, func(i, j int) bool {
return resultList[i].latency < resultList[j].latency
})
jww.DEBUG.Printf("Gateway pool results: %+v", resultList)
// Attempt to add to HostPool // Process ping results
err := h.replaceHost(&gwId, startIdx) for _, result := range resultList {
err = h.replaceHost(result.id, startIdx)
if err != nil { if err != nil {
jww.WARN.Printf("Unable to initialize host in HostPool: %+v", err) return err
} }
startIdx++ startIdx++
// Once HostPool is full, break
if startIdx >= h.poolParams.PoolSize { if startIdx >= h.poolParams.PoolSize {
break break
} }
} }
return nil
} }
// UpdateNdf Mutates internal ndf to the given ndf // UpdateNdf Mutates internal ndf to the given ndf
......
...@@ -248,7 +248,6 @@ func (mp transmitMessagePayload) SetContents(contents []byte) { ...@@ -248,7 +248,6 @@ func (mp transmitMessagePayload) SetContents(contents []byte) {
copy(mp.contents, contents) copy(mp.contents, contents)
} }
// String returns the contents for printing adhering to the stringer interface. // String returns the contents for printing adhering to the stringer interface.
func (mp transmitMessagePayload) String() string { func (mp transmitMessagePayload) String() string {
return fmt.Sprintf("Data: %x [tagFP: %x, nonce: %x, "+ return fmt.Sprintf("Data: %x [tagFP: %x, nonce: %x, "+
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment