Skip to content
Snippets Groups Projects
Commit eb934962 authored by Jono Wenger's avatar Jono Wenger
Browse files

XX-3344 / Storage host pool on change

parent 34f01d77
No related branches found
No related tags found
1 merge request!23Release
...@@ -219,7 +219,7 @@ func Login(storageDir string, password []byte, parameters params.Network) (*Clie ...@@ -219,7 +219,7 @@ func Login(storageDir string, password []byte, parameters params.Network) (*Clie
} }
//get the NDF to pass into permissioning and the network manager //get the NDF to pass into permissioning and the network manager
def := c.storage.GetBaseNDF() def := c.storage.GetNDF()
//initialize permissioning //initialize permissioning
if def.Registration.Address != "" { if def.Registration.Address != "" {
...@@ -286,7 +286,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, ...@@ -286,7 +286,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte,
} }
//store the updated base NDF //store the updated base NDF
c.storage.SetBaseNDF(def) c.storage.SetNDF(def)
//initialize permissioning //initialize permissioning
if def.Registration.Address != "" { if def.Registration.Address != "" {
...@@ -622,7 +622,7 @@ func checkVersionAndSetupStorage(def *ndf.NetworkDefinition, storageDir string, ...@@ -622,7 +622,7 @@ func checkVersionAndSetupStorage(def *ndf.NetworkDefinition, storageDir string,
} }
// Save NDF to be used in the future // Save NDF to be used in the future
storageSess.SetBaseNDF(def) storageSess.SetNDF(def)
if !isPrecanned { if !isPrecanned {
//store the registration code for later use //store the registration code for later use
......
...@@ -159,6 +159,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, ...@@ -159,6 +159,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
// update gateway connections // update gateway connections
m.GetSender().UpdateNdf(m.GetInstance().GetPartialNdf().Get()) m.GetSender().UpdateNdf(m.GetInstance().GetPartialNdf().Get())
m.Session.SetNDF(m.GetInstance().GetPartialNdf().Get())
} }
// Update the address space size // Update the address space size
......
...@@ -13,7 +13,6 @@ package gateway ...@@ -13,7 +13,6 @@ package gateway
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
...@@ -115,8 +114,24 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.N ...@@ -115,8 +114,24 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.N
return nil, err return nil, err
} }
// Get the last used list of hosts and use it to seed the host pool list
hostList, err := storage.HostList().Get()
numHostsAdded := 0
if err == nil {
for _, hid := range hostList {
err := result.replaceHostNoStore(hid, uint32(numHostsAdded))
if err != nil {
jww.WARN.Printf("Unable to add stored host %s: %s", hid, err.Error())
} else {
numHostsAdded++
}
}
} else {
jww.WARN.Printf("Building new HostPool because no HostList stored: %+v", err)
}
// Build the initial HostPool and return // Build the initial HostPool and return
for i := 0; i < len(result.hostList); i++ { for i := numHostsAdded; i < len(result.hostList); i++ {
err := result.forceReplace(uint32(i)) err := result.forceReplace(uint32(i))
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -279,8 +294,29 @@ func (h *HostPool) forceReplace(oldPoolIndex uint32) error { ...@@ -279,8 +294,29 @@ func (h *HostPool) forceReplace(oldPoolIndex uint32) error {
} }
} }
// Replace the given slot in the HostPool with a new Gateway with the specified ID // replaceHost replaces the given slot in the HostPool with a new Gateway with
// the specified ID. The resulting host list is saved to storage.
func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error { func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error {
err := h.replaceHostNoStore(newId, oldPoolIndex)
if err != nil {
return err
}
// Convert list of of non-nil and non-zero hosts to ID list
idList := make([]*id.ID, 0, len(h.hostList))
for _, host := range h.hostList {
if host.GetId() != nil && !host.GetId().Cmp(&id.ID{}) {
idList = append(idList, host.GetId())
}
}
// Save the list to storage
return h.storage.HostList().Store(idList)
}
// replaceHostNoStore replaces the given slot in the HostPool with a new Gateway
// with the specified ID.
func (h *HostPool) replaceHostNoStore(newId *id.ID, oldPoolIndex uint32) error {
// Obtain that GwId's Host object // Obtain that GwId's Host object
newHost, ok := h.manager.GetHost(newId) newHost, ok := h.manager.GetHost(newId)
if !ok { if !ok {
...@@ -291,7 +327,8 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error { ...@@ -291,7 +327,8 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error {
// Keep track of oldHost for cleanup // Keep track of oldHost for cleanup
oldHost := h.hostList[oldPoolIndex] oldHost := h.hostList[oldPoolIndex]
// Use the poolIdx to overwrite the random Host in the corresponding index in the hostList // Use the poolIdx to overwrite the random Host in the corresponding index
// in the hostList
h.hostList[oldPoolIndex] = newHost h.hostList[oldPoolIndex] = newHost
// Use the GwId to keep track of the new random Host's index in the hostList // Use the GwId to keep track of the new random Host's index in the hostList
h.hostMap[*newId] = oldPoolIndex h.hostMap[*newId] = oldPoolIndex
...@@ -301,7 +338,9 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error { ...@@ -301,7 +338,9 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error {
delete(h.hostMap, *oldHost.GetId()) delete(h.hostMap, *oldHost.GetId())
go oldHost.Disconnect() go oldHost.Disconnect()
} }
jww.DEBUG.Printf("Replaced Host at %d with new Host %s", oldPoolIndex, newId.String()) jww.DEBUG.Printf("Replaced Host at %d with new Host %s", oldPoolIndex,
newId.String())
return nil return nil
} }
...@@ -388,7 +427,7 @@ func (h *HostPool) removeGateway(gwId *id.ID) { ...@@ -388,7 +427,7 @@ func (h *HostPool) removeGateway(gwId *id.ID) {
func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) {
gw := h.ndf.Gateways[ndfIndex] gw := h.ndf.Gateways[ndfIndex]
//check if the host exists // Check if the host exists
host, ok := h.manager.GetHost(gwId) host, ok := h.manager.GetHost(gwId)
if !ok { if !ok {
...@@ -443,7 +482,7 @@ func readUint32(rng io.Reader) uint32 { ...@@ -443,7 +482,7 @@ func readUint32(rng io.Reader) uint32 {
var rndBytes [4]byte var rndBytes [4]byte
i, err := rng.Read(rndBytes[:]) i, err := rng.Read(rndBytes[:])
if i != 4 || err != nil { if i != 4 || err != nil {
panic(fmt.Sprintf("cannot read from rng: %+v", err)) jww.FATAL.Panicf("cannot read from rng: %+v", err)
} }
return binary.BigEndian.Uint32(rndBytes[:]) return binary.BigEndian.Uint32(rndBytes[:])
} }
......
...@@ -54,6 +54,49 @@ func TestNewHostPool(t *testing.T) { ...@@ -54,6 +54,49 @@ func TestNewHostPool(t *testing.T) {
} }
} }
// Tests that the hosts are loaded from storage, if they exist.
func TestNewHostPool_HostListStore(t *testing.T) {
manager := newMockManager()
rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
testNdf := getTestNdf(t)
testStorage := storage.InitTestingSession(t)
addGwChan := make(chan network.NodeGateway)
params := DefaultPoolParams()
params.MaxPoolSize = uint32(len(testNdf.Gateways))
addedIDs := []*id.ID{
id.NewIdFromString("testID0", id.Gateway, t),
id.NewIdFromString("testID1", id.Gateway, t),
id.NewIdFromString("testID2", id.Gateway, t),
id.NewIdFromString("testID3", id.Gateway, t),
}
err := testStorage.HostList().Store(addedIDs)
if err != nil {
t.Fatalf("Failed to store host list: %+v", err)
}
for i, hid := range addedIDs {
testNdf.Gateways[i].ID = hid.Marshal()
}
// Call the constructor
hp, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan)
if err != nil {
t.Fatalf("Failed to create mock host pool: %v", err)
}
// Check that the host list was saved to storage
hostList, err := hp.storage.HostList().Get()
if err != nil {
t.Errorf("Failed to get host list: %+v", err)
}
if !reflect.DeepEqual(addedIDs, hostList) {
t.Errorf("Failed to save expected host list to storage."+
"\nexpected: %+v\nreceived: %+v", addedIDs, hostList)
}
}
// Unit test // Unit test
func TestHostPool_ManageHostPool(t *testing.T) { func TestHostPool_ManageHostPool(t *testing.T) {
manager := newMockManager() manager := newMockManager()
...@@ -115,7 +158,7 @@ func TestHostPool_ManageHostPool(t *testing.T) { ...@@ -115,7 +158,7 @@ func TestHostPool_ManageHostPool(t *testing.T) {
for _, ndfGw := range testNdf.Gateways { for _, ndfGw := range testNdf.Gateways {
gwId, err := id.Unmarshal(ndfGw.ID) gwId, err := id.Unmarshal(ndfGw.ID)
if err != nil { if err != nil {
t.Errorf("Failed to marshal gateway id for %v", ndfGw) t.Fatalf("Failed to marshal gateway id for %v", ndfGw)
} }
if _, ok := testPool.hostMap[*gwId]; ok { if _, ok := testPool.hostMap[*gwId]; ok {
t.Errorf("Expected gateway %v to be removed from pool", gwId) t.Errorf("Expected gateway %v to be removed from pool", gwId)
...@@ -135,6 +178,7 @@ func TestHostPool_ReplaceHost(t *testing.T) { ...@@ -135,6 +178,7 @@ func TestHostPool_ReplaceHost(t *testing.T) {
hostList: make([]*connect.Host, newIndex+1), hostList: make([]*connect.Host, newIndex+1),
hostMap: make(map[id.ID]uint32), hostMap: make(map[id.ID]uint32),
ndf: testNdf, ndf: testNdf,
storage: storage.InitTestingSession(t),
} }
/* "Replace" a host with no entry */ /* "Replace" a host with no entry */
...@@ -228,6 +272,18 @@ func TestHostPool_ReplaceHost(t *testing.T) { ...@@ -228,6 +272,18 @@ func TestHostPool_ReplaceHost(t *testing.T) {
"\n\tReceived: %d", newIndex, retrievedIndex) "\n\tReceived: %d", newIndex, retrievedIndex)
} }
// Check that the host list was saved to storage
hostList, err := hostPool.storage.HostList().Get()
if err != nil {
t.Errorf("Failed to get host list: %+v", err)
}
expectedList := []*id.ID{gwIdTwo}
if !reflect.DeepEqual(expectedList, hostList) {
t.Errorf("Failed to save expected host list to storage."+
"\nexpected: %+v\nreceived: %+v", expectedList, hostList)
}
} }
// Error path, could not get host // Error path, could not get host
...@@ -754,7 +810,7 @@ func TestHostPool_UpdateConns_RemoveGateways(t *testing.T) { ...@@ -754,7 +810,7 @@ func TestHostPool_UpdateConns_RemoveGateways(t *testing.T) {
for _, ndfGw := range testNdf.Gateways { for _, ndfGw := range testNdf.Gateways {
gwId, err := id.Unmarshal(ndfGw.ID) gwId, err := id.Unmarshal(ndfGw.ID)
if err != nil { if err != nil {
t.Errorf("Failed to marshal gateway id for %v", ndfGw) t.Fatalf("Failed to marshal gateway id for %v", ndfGw)
} }
if _, ok := testPool.hostMap[*gwId]; ok { if _, ok := testPool.hostMap[*gwId]; ok {
t.Errorf("Expected gateway %v to be removed from pool", gwId) t.Errorf("Expected gateway %v to be removed from pool", gwId)
......
...@@ -13,25 +13,25 @@ import ( ...@@ -13,25 +13,25 @@ import (
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
) )
const baseNdfKey = "baseNdf" const ndfKey = "ndf"
func (s *Session) SetBaseNDF(def *ndf.NetworkDefinition) { func (s *Session) SetNDF(def *ndf.NetworkDefinition) {
err := utility.SaveNDF(s.kv, baseNdfKey, def) err := utility.SaveNDF(s.kv, ndfKey, def)
if err != nil { if err != nil {
jww.FATAL.Printf("Failed to dave the base NDF: %s", err) jww.FATAL.Printf("Failed to dave the NDF: %+v", err)
} }
s.baseNdf = def s.ndf = def
} }
func (s *Session) GetBaseNDF() *ndf.NetworkDefinition { func (s *Session) GetNDF() *ndf.NetworkDefinition {
if s.baseNdf != nil { if s.ndf != nil {
return s.baseNdf return s.ndf
} }
def, err := utility.LoadNDF(s.kv, baseNdfKey) def, err := utility.LoadNDF(s.kv, ndfKey)
if err != nil { if err != nil {
jww.FATAL.Printf("Could not load the base NDF: %s", err) jww.FATAL.Printf("Could not load the NDF: %+v", err)
} }
s.baseNdf = def s.ndf = def
return def return def
} }
...@@ -51,7 +51,7 @@ type Session struct { ...@@ -51,7 +51,7 @@ type Session struct {
//memoized data //memoized data
regStatus RegistrationStatus regStatus RegistrationStatus
baseNdf *ndf.NetworkDefinition ndf *ndf.NetworkDefinition
//sub-stores //sub-stores
e2e *e2e.Store e2e *e2e.Store
...@@ -414,5 +414,7 @@ func InitTestingSession(i interface{}) *Session { ...@@ -414,5 +414,7 @@ func InitTestingSession(i interface{}) *Session {
jww.FATAL.Panicf("Failed to create uncheckRound store: %v", err) jww.FATAL.Panicf("Failed to create uncheckRound store: %v", err)
} }
s.hostList = hostList.NewStore(s.kv)
return s return s
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment