diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index 28ad56cd20fd00a8c0369f0bbf78d50ae47817c8..46b6e93539aad0a372fb31dad619c38f4a5a05ff 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -18,6 +18,7 @@ import ( "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/crypto/shuffle" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" @@ -26,6 +27,7 @@ import ( "google.golang.org/grpc/balancer" "io" "math" + "sort" "strings" "sync" "time" @@ -155,69 +157,111 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, } // Build the initial HostPool and return - //for i := numHostsAdded; i < len(result.hostList); i++ { - // err := result.forceReplace(uint32(i)) - // if err != nil { - // return nil, err - // } - //} + err = result.initialize(uint32(numHostsAdded)) + if err != nil { + return nil, err + } jww.INFO.Printf("Initialized HostPool with size: %d/%d", poolParams.PoolSize, len(netDef.Gateways)) return result, nil } // Initialize the HostPool with a performant set of Hosts -func (h *HostPool) initialize(startIdx uint32) { - roundTripDurations := make(map[id.ID]time.Duration) - numGatewaysToTry := h.poolParams.PingMultiplier * (h.poolParams.PoolSize - startIdx) - - // Begin checking roundTripDurations for numGatewaysToTry - for i := uint32(0); i < numGatewaysToTry; i++ { - // Select a gateway not yet selected - gwId := h.selectGateway() - if _, ok := roundTripDurations[*gwId]; ok { - continue - } - - // Initialize in the durations map - roundTripDurations[*gwId] = 0 +func (h *HostPool) initialize(startIdx uint32) error { + // Randomly shuffle gateways in NDF + randomGateways := make([]ndf.Gateway, len(h.ndf.Gateways)) + copy(randomGateways, h.ndf.Gateways) + var rndBytes [32]byte + stream := h.rng.GetStream() + _, err := stream.Read(rndBytes[:]) + stream.Close() + if err != nil { + return errors.Errorf("Failed to randomize shuffle for HostPool initialization: %+v", err) + } + shuffle.ShuffleSwap(rndBytes[:], len(randomGateways), func(i, j int) { + randomGateways[i], randomGateways[j] = randomGateways[j], randomGateways[i] + }) + + // Set constants + type gatewayDuration struct { + id *id.ID + latency time.Duration + } + gatewaysNeeded := h.poolParams.PoolSize - startIdx + numGatewaysToTry := h.poolParams.PingMultiplier * gatewaysNeeded + resultList := make([]gatewayDuration, 0, numGatewaysToTry) + + // Begin trying gateways + c := make(chan gatewayDuration, numGatewaysToTry) + exit := false + i := uint32(0) + for !exit { + for ; i < numGatewaysToTry; i++ { + // Ran out of Hosts to try + if i > uint32(len(randomGateways)) { + return errors.Errorf("Unable to initialize enough viable hosts for HostPool") + } - go func() { - // Obtain that GwId's Host object - newHost, ok := h.manager.GetHost(gwId) - if !ok { - jww.WARN.Printf("host for gateway %s could not be "+ - "retrieved", gwId) - return + // Select a gateway not yet selected + gwId, err := randomGateways[i].GetGatewayId() + if err != nil { + jww.WARN.Printf("ID for gateway %s could not be retrieved", gwId) } - pingDuration, _ := newHost.IsOnline() - roundTripDurations[*gwId] = pingDuration - }() - } + go func() { + // Obtain that GwId's Host object + newHost, ok := h.manager.GetHost(gwId) + if !ok { + jww.WARN.Printf("Host for gateway %s could not be "+ + "retrieved", gwId) + return + } + + // Ping the Host latency and send the result + latency, _ := newHost.IsOnline() + c <- gatewayDuration{gwId, latency} + }() + } - // TODO ? - time.Sleep(2 * h.poolParams.HostParams.PingTimeout) + // Collect ping results + timer := time.NewTimer(2 * h.poolParams.HostParams.PingTimeout) + for { + select { + case gw := <-c: + // Only add successful pings + if gw.latency > 0 { + resultList = append(resultList, gw) + } - // Add sufficiently fast gateways - for gwId, roundTripDuration := range roundTripDurations { - // Ignore failed connectivity checks - if roundTripDuration == 0 { - continue + // Break if we have all needed slots + if uint32(len(resultList)) == numGatewaysToTry { + exit = true + break + } + case <-timer.C: + break + } } + } - // Attempt to add to HostPool - err := h.replaceHost(&gwId, startIdx) + // Sort the resultList by lowest latency + sort.Slice(resultList, func(i, j int) bool { + return resultList[i].latency < resultList[j].latency + }) + jww.DEBUG.Printf("Gateway pool results: %+v", resultList) + + // Process ping results + for _, result := range resultList { + err = h.replaceHost(result.id, startIdx) if err != nil { - jww.WARN.Printf("Unable to initialize host in HostPool: %+v", err) + return err } startIdx++ - - // Once HostPool is full, break if startIdx >= h.poolParams.PoolSize { break } } + return nil } // UpdateNdf Mutates internal ndf to the given ndf diff --git a/single/transmission.go b/single/transmission.go index b5375a2e1272d032de531662466322601b6c3b56..1b8bfddc05864b1ae790ba5af0193158f882bc8b 100644 --- a/single/transmission.go +++ b/single/transmission.go @@ -289,7 +289,7 @@ func makeIDs(msg *transmitMessagePayload, publicKey *cyclic.Int, jww.DEBUG.Printf("ephemeral.GetId(%s, %d, %d) = %d", rid, addressSize, timeNow.UnixNano(), ephID.Int64()) } - jww.INFO.Printf("generated by singe use sender reception id for single use: %s, " + + jww.INFO.Printf("generated by singe use sender reception id for single use: %s, "+ "ephId: %d, pubkey: %x, msg: %s", rid, ephID.Int64(), publicKey.Bytes(), msg) return rid, ephID, nil diff --git a/single/transmitMessage.go b/single/transmitMessage.go index da7f88d9458363ed5e5bc4ea4a89e84a4874af12..b3915fbe8ae43a87c3e0b7ce5fcff32e9c7adf12 100644 --- a/single/transmitMessage.go +++ b/single/transmitMessage.go @@ -248,10 +248,9 @@ func (mp transmitMessagePayload) SetContents(contents []byte) { copy(mp.contents, contents) } - // String returns the contents for printing adhering to the stringer interface. -func (mp transmitMessagePayload) String()string { - return fmt.Sprintf("Data: %x [tagFP: %x, nonce: %x, " + +func (mp transmitMessagePayload) String() string { + return fmt.Sprintf("Data: %x [tagFP: %x, nonce: %x, "+ "maxParts: %x, size: %x, content: %x]", mp.data, mp.tagFP, mp.nonce, mp.maxParts, mp.size, mp.contents)