From b24060a7b8697f81ffabb48cc828421a40f8f7e0 Mon Sep 17 00:00:00 2001 From: Jake Taylor <jake@elixxir.io> Date: Mon, 29 Mar 2021 13:05:39 -0500 Subject: [PATCH] more interface improvesments --- network/gateway/gateway.go | 119 +++++++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 37 deletions(-) diff --git a/network/gateway/gateway.go b/network/gateway/gateway.go index 6e57101bb..1dda3483f 100644 --- a/network/gateway/gateway.go +++ b/network/gateway/gateway.go @@ -35,11 +35,11 @@ type HostManager interface { // Handles providing hosts to the Client type HostPool struct { - hostMap map[*id.ID]uint32 // map key to its index in the slice - hostList []*connect.Host // each index in the slice contains the value - hostMux sync.RWMutex // Mutex for the above map/list combination + hostMap map[id.ID]uint32 // map key to its index in the slice + hostList []*connect.Host // each index in the slice contains the value + hostMux sync.RWMutex // Mutex for the above map/list combination - ndfMap map[*id.ID]int // map gateway ID to its index in the ndf + ndfMap map[id.ID]int // map gateway ID to its index in the ndf ndf *ndf.NetworkDefinition isNdfUpdated bool // indicates the NDF has been updated and needs processed ndfMux sync.RWMutex @@ -74,7 +74,7 @@ func NewHostPool(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinitio storage *storage.Session, addGateway chan network.NodeGateway) (*HostPool, error) { result := &HostPool{ manager: getter, - hostMap: make(map[*id.ID]uint32), + hostMap: make(map[id.ID]uint32), hostList: make([]*connect.Host, poolParams.poolSize), poolParams: poolParams, ndf: ndf, @@ -105,29 +105,63 @@ func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) { h.ndfMux.Unlock() } -// Return a random GwHost from the HostPool -func (h *HostPool) GetAny() *connect.Host { - gwIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) +// Obtain a random, unique list of Hosts of the given length from the HostPool +func (h *HostPool) GetAnyList(length int) []*connect.Host { + checked := make(map[uint32]interface{}) + result := make([]*connect.Host, length) + h.hostMux.RLock() - defer h.hostMux.RUnlock() - return h.hostList[gwIdx] + for i := 0; i < length; i++ { + gwIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) + if _, ok := checked[gwIdx]; !ok { + result[i] = h.hostList[gwIdx] + checked[gwIdx] = nil + i++ + } + } + h.hostMux.RUnlock() + + return result } -// Try to obtain a specific host from the HostPool. -// If that GwId is not in the HostPool, return a random GwHost from the HostPool -func (h *HostPool) GetSpecific(gwId *id.ID) *connect.Host { +// Obtain a specific list of Hosts from the manager, irrespective of the HostPool +func (h *HostPool) GetSpecific(targets []*id.ID) []*connect.Host { + result := make([]*connect.Host, len(targets)) + for i := 0; i < len(targets); i++ { + result[i], _ = h.manager.GetHost(targets[i]) + } + return result +} + +// Try to obtain the given targets from the HostPool +// If each is not present, obtain a random replacement from the HostPool +func (h *HostPool) GetPreferred(targets []*id.ID) []*connect.Host { + checked := make(map[uint32]interface{}) // Keep track of Hosts already selected to avoid duplicates + result := make([]*connect.Host, len(targets)) + h.hostMux.RLock() - if hostIdx, ok := h.hostMap[gwId]; ok { - defer h.hostMux.RUnlock() - return h.hostList[hostIdx] + for i := 0; i < len(targets); i++ { + if hostIdx, ok := h.hostMap[*targets[i]]; ok { + result[i] = h.hostList[hostIdx] + i++ + } + + gwIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) + if _, ok := checked[gwIdx]; !ok { + result[i] = h.hostList[gwIdx] + checked[gwIdx] = nil + i++ + } } h.hostMux.RUnlock() - return h.GetAny() + + return result } // Start long-running thread and return the thread controller to the caller func (h *HostPool) StartHostPool() stoppable.Stoppable { stopper := stoppable.NewSingle("HostPool") + jww.INFO.Printf("Starting Host Pool...") go h.manageHostPool(stopper) return stopper } @@ -185,7 +219,7 @@ func (h *HostPool) pruneHostPool() error { } // Verify the GwId is not already in the hostMap - if _, ok := h.hostMap[gwId]; !ok { + if _, ok := h.hostMap[*gwId]; !ok { // If it is a new GwId, replace the old Host with the new Host err = h.replaceHost(gwId, poolIdx) if err != nil { @@ -213,30 +247,41 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error { // Use the poolIdx to overwrite the random Host in the corresponding index in the hostList h.hostList[oldPoolIndex] = newHost // Use the GwId to keep track of the new random Host's index in the hostList - h.hostMap[newId] = oldPoolIndex + h.hostMap[*newId] = oldPoolIndex // Clean up and move onto next Host if oldHost != nil { - delete(h.hostMap, oldHost.GetId()) + delete(h.hostMap, *oldHost.GetId()) oldHost.Disconnect() } + jww.DEBUG.Printf("Replaced Host at %d with new Host %s", oldPoolIndex, newId.String()) return nil } -// Force-add the specified Gateway to the HostPool, replacing a random Gateway -// TODO: Possibly needs to take a list? -func (h *HostPool) ForceAdd(gwId *id.ID) error { +// Force-add the Gateways to the HostPool, each replacing a random Gateway +func (h *HostPool) ForceAdd(gwIds []*id.ID) error { h.hostMux.Lock() defer h.hostMux.Unlock() - // Verify the GwId is not already in the hostMap - if _, ok := h.hostMap[gwId]; ok { - return nil - } + checked := make(map[int]interface{}) // Keep track of Hosts already replaced + for i := 0; i < len(gwIds); { + // Verify the GwId is not already in the hostMap + if _, ok := h.hostMap[*gwIds[i]]; ok { + continue + } - // Randomly select another Gateway in the HostPool for replacement - poolIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) - return h.replaceHost(gwId, poolIdx) + // Randomly select another Gateway in the HostPool for replacement + poolIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) + if _, ok := checked[i]; !ok { + err := h.replaceHost(gwIds[i], poolIdx) + if err != nil { + return err + } + checked[i] = nil + i++ + } + } + return nil } // Updates the internal HostPool with any changes to the NDF @@ -251,7 +296,7 @@ func (h *HostPool) updateConns() error { for gwId := range h.ndfMap { if _, ok := newMap[gwId]; !ok { // If GwId in ndfMap is not in newMap, remove the Gateway - h.removeGateway(gwId) + h.removeGateway(&gwId) } } @@ -259,7 +304,7 @@ func (h *HostPool) updateConns() error { for gwId, ndfIdx := range newMap { if _, ok := h.ndfMap[gwId]; !ok { // If GwId in newMap is not in ndfMap, add the Gateway - h.addGateway(gwId, ndfIdx) + h.addGateway(&gwId, ndfIdx) } } @@ -269,15 +314,15 @@ func (h *HostPool) updateConns() error { } // Takes ndf.Gateways and puts their IDs into a map object -func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[*id.ID]int, error) { - result := make(map[*id.ID]int) +func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[id.ID]int, error) { + result := make(map[id.ID]int) // Process gateway Id's into set for i, gw := range ndf.Gateways { gwId, err := id.Unmarshal(gw.ID) if err != nil { return nil, err } - result[gwId] = i + result[*gwId] = i } return result, nil @@ -287,9 +332,9 @@ func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[*id.ID]int, error) { func (h *HostPool) removeGateway(gwId *id.ID) { h.manager.RemoveHost(gwId) // If needed, flag the Host for deletion from the HostPool - if poolIndex, ok := h.hostMap[gwId]; ok { + if poolIndex, ok := h.hostMap[*gwId]; ok { h.hostList[poolIndex] = nil - delete(h.hostMap, gwId) + delete(h.hostMap, *gwId) } } -- GitLab