Skip to content
Snippets Groups Projects
Commit b24060a7 authored by Jake Taylor's avatar Jake Taylor
Browse files

more interface improvesments

parent 54928a47
Branches
Tags
No related merge requests found
...@@ -35,11 +35,11 @@ type HostManager interface { ...@@ -35,11 +35,11 @@ type HostManager interface {
// Handles providing hosts to the Client // Handles providing hosts to the Client
type HostPool struct { type HostPool struct {
hostMap map[*id.ID]uint32 // map key to its index in the slice hostMap map[id.ID]uint32 // map key to its index in the slice
hostList []*connect.Host // each index in the slice contains the value hostList []*connect.Host // each index in the slice contains the value
hostMux sync.RWMutex // Mutex for the above map/list combination hostMux sync.RWMutex // Mutex for the above map/list combination
ndfMap map[*id.ID]int // map gateway ID to its index in the ndf ndfMap map[id.ID]int // map gateway ID to its index in the ndf
ndf *ndf.NetworkDefinition ndf *ndf.NetworkDefinition
isNdfUpdated bool // indicates the NDF has been updated and needs processed isNdfUpdated bool // indicates the NDF has been updated and needs processed
ndfMux sync.RWMutex ndfMux sync.RWMutex
...@@ -74,7 +74,7 @@ func NewHostPool(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinitio ...@@ -74,7 +74,7 @@ func NewHostPool(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinitio
storage *storage.Session, addGateway chan network.NodeGateway) (*HostPool, error) { storage *storage.Session, addGateway chan network.NodeGateway) (*HostPool, error) {
result := &HostPool{ result := &HostPool{
manager: getter, manager: getter,
hostMap: make(map[*id.ID]uint32), hostMap: make(map[id.ID]uint32),
hostList: make([]*connect.Host, poolParams.poolSize), hostList: make([]*connect.Host, poolParams.poolSize),
poolParams: poolParams, poolParams: poolParams,
ndf: ndf, ndf: ndf,
...@@ -105,29 +105,63 @@ func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) { ...@@ -105,29 +105,63 @@ func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) {
h.ndfMux.Unlock() h.ndfMux.Unlock()
} }
// Return a random GwHost from the HostPool // Obtain a random, unique list of Hosts of the given length from the HostPool
func (h *HostPool) GetAny() *connect.Host { func (h *HostPool) GetAnyList(length int) []*connect.Host {
gwIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) checked := make(map[uint32]interface{})
result := make([]*connect.Host, length)
h.hostMux.RLock() h.hostMux.RLock()
defer h.hostMux.RUnlock() for i := 0; i < length; i++ {
return h.hostList[gwIdx] gwIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng)
if _, ok := checked[gwIdx]; !ok {
result[i] = h.hostList[gwIdx]
checked[gwIdx] = nil
i++
}
}
h.hostMux.RUnlock()
return result
}
// Obtain a specific list of Hosts from the manager, irrespective of the HostPool
func (h *HostPool) GetSpecific(targets []*id.ID) []*connect.Host {
result := make([]*connect.Host, len(targets))
for i := 0; i < len(targets); i++ {
result[i], _ = h.manager.GetHost(targets[i])
} }
return result
}
// Try to obtain the given targets from the HostPool
// If each is not present, obtain a random replacement from the HostPool
func (h *HostPool) GetPreferred(targets []*id.ID) []*connect.Host {
checked := make(map[uint32]interface{}) // Keep track of Hosts already selected to avoid duplicates
result := make([]*connect.Host, len(targets))
// Try to obtain a specific host from the HostPool.
// If that GwId is not in the HostPool, return a random GwHost from the HostPool
func (h *HostPool) GetSpecific(gwId *id.ID) *connect.Host {
h.hostMux.RLock() h.hostMux.RLock()
if hostIdx, ok := h.hostMap[gwId]; ok { for i := 0; i < len(targets); i++ {
defer h.hostMux.RUnlock() if hostIdx, ok := h.hostMap[*targets[i]]; ok {
return h.hostList[hostIdx] result[i] = h.hostList[hostIdx]
i++
}
gwIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng)
if _, ok := checked[gwIdx]; !ok {
result[i] = h.hostList[gwIdx]
checked[gwIdx] = nil
i++
}
} }
h.hostMux.RUnlock() h.hostMux.RUnlock()
return h.GetAny()
return result
} }
// Start long-running thread and return the thread controller to the caller // Start long-running thread and return the thread controller to the caller
func (h *HostPool) StartHostPool() stoppable.Stoppable { func (h *HostPool) StartHostPool() stoppable.Stoppable {
stopper := stoppable.NewSingle("HostPool") stopper := stoppable.NewSingle("HostPool")
jww.INFO.Printf("Starting Host Pool...")
go h.manageHostPool(stopper) go h.manageHostPool(stopper)
return stopper return stopper
} }
...@@ -185,7 +219,7 @@ func (h *HostPool) pruneHostPool() error { ...@@ -185,7 +219,7 @@ func (h *HostPool) pruneHostPool() error {
} }
// Verify the GwId is not already in the hostMap // Verify the GwId is not already in the hostMap
if _, ok := h.hostMap[gwId]; !ok { if _, ok := h.hostMap[*gwId]; !ok {
// If it is a new GwId, replace the old Host with the new Host // If it is a new GwId, replace the old Host with the new Host
err = h.replaceHost(gwId, poolIdx) err = h.replaceHost(gwId, poolIdx)
if err != nil { if err != nil {
...@@ -213,30 +247,41 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error { ...@@ -213,30 +247,41 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error {
// Use the poolIdx to overwrite the random Host in the corresponding index in the hostList // Use the poolIdx to overwrite the random Host in the corresponding index in the hostList
h.hostList[oldPoolIndex] = newHost h.hostList[oldPoolIndex] = newHost
// Use the GwId to keep track of the new random Host's index in the hostList // Use the GwId to keep track of the new random Host's index in the hostList
h.hostMap[newId] = oldPoolIndex h.hostMap[*newId] = oldPoolIndex
// Clean up and move onto next Host // Clean up and move onto next Host
if oldHost != nil { if oldHost != nil {
delete(h.hostMap, oldHost.GetId()) delete(h.hostMap, *oldHost.GetId())
oldHost.Disconnect() oldHost.Disconnect()
} }
jww.DEBUG.Printf("Replaced Host at %d with new Host %s", oldPoolIndex, newId.String())
return nil return nil
} }
// Force-add the specified Gateway to the HostPool, replacing a random Gateway // Force-add the Gateways to the HostPool, each replacing a random Gateway
// TODO: Possibly needs to take a list? func (h *HostPool) ForceAdd(gwIds []*id.ID) error {
func (h *HostPool) ForceAdd(gwId *id.ID) error {
h.hostMux.Lock() h.hostMux.Lock()
defer h.hostMux.Unlock() defer h.hostMux.Unlock()
checked := make(map[int]interface{}) // Keep track of Hosts already replaced
for i := 0; i < len(gwIds); {
// Verify the GwId is not already in the hostMap // Verify the GwId is not already in the hostMap
if _, ok := h.hostMap[gwId]; ok { if _, ok := h.hostMap[*gwIds[i]]; ok {
return nil continue
} }
// Randomly select another Gateway in the HostPool for replacement // Randomly select another Gateway in the HostPool for replacement
poolIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) poolIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng)
return h.replaceHost(gwId, poolIdx) if _, ok := checked[i]; !ok {
err := h.replaceHost(gwIds[i], poolIdx)
if err != nil {
return err
}
checked[i] = nil
i++
}
}
return nil
} }
// Updates the internal HostPool with any changes to the NDF // Updates the internal HostPool with any changes to the NDF
...@@ -251,7 +296,7 @@ func (h *HostPool) updateConns() error { ...@@ -251,7 +296,7 @@ func (h *HostPool) updateConns() error {
for gwId := range h.ndfMap { for gwId := range h.ndfMap {
if _, ok := newMap[gwId]; !ok { if _, ok := newMap[gwId]; !ok {
// If GwId in ndfMap is not in newMap, remove the Gateway // If GwId in ndfMap is not in newMap, remove the Gateway
h.removeGateway(gwId) h.removeGateway(&gwId)
} }
} }
...@@ -259,7 +304,7 @@ func (h *HostPool) updateConns() error { ...@@ -259,7 +304,7 @@ func (h *HostPool) updateConns() error {
for gwId, ndfIdx := range newMap { for gwId, ndfIdx := range newMap {
if _, ok := h.ndfMap[gwId]; !ok { if _, ok := h.ndfMap[gwId]; !ok {
// If GwId in newMap is not in ndfMap, add the Gateway // If GwId in newMap is not in ndfMap, add the Gateway
h.addGateway(gwId, ndfIdx) h.addGateway(&gwId, ndfIdx)
} }
} }
...@@ -269,15 +314,15 @@ func (h *HostPool) updateConns() error { ...@@ -269,15 +314,15 @@ func (h *HostPool) updateConns() error {
} }
// Takes ndf.Gateways and puts their IDs into a map object // Takes ndf.Gateways and puts their IDs into a map object
func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[*id.ID]int, error) { func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[id.ID]int, error) {
result := make(map[*id.ID]int) result := make(map[id.ID]int)
// Process gateway Id's into set // Process gateway Id's into set
for i, gw := range ndf.Gateways { for i, gw := range ndf.Gateways {
gwId, err := id.Unmarshal(gw.ID) gwId, err := id.Unmarshal(gw.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result[gwId] = i result[*gwId] = i
} }
return result, nil return result, nil
...@@ -287,9 +332,9 @@ func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[*id.ID]int, error) { ...@@ -287,9 +332,9 @@ func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[*id.ID]int, error) {
func (h *HostPool) removeGateway(gwId *id.ID) { func (h *HostPool) removeGateway(gwId *id.ID) {
h.manager.RemoveHost(gwId) h.manager.RemoveHost(gwId)
// If needed, flag the Host for deletion from the HostPool // If needed, flag the Host for deletion from the HostPool
if poolIndex, ok := h.hostMap[gwId]; ok { if poolIndex, ok := h.hostMap[*gwId]; ok {
h.hostList[poolIndex] = nil h.hostList[poolIndex] = nil
delete(h.hostMap, gwId) delete(h.hostMap, *gwId)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment