diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index 8e452207811a9f6b41474085e8786bc23aa2c480..93e033d8695037da6ae05c1a47ee9da56d9f5023 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -16,6 +16,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/shuffle" @@ -66,7 +67,9 @@ type HostPool struct { rng *fastRNG.StreamGenerator storage *storage.Session manager HostManager - addGatewayChan chan network.NodeGateway + addGatewayChan chan<- network.NodeGateway + + kv *versioned.KV filterMux sync.Mutex filter Filter @@ -106,7 +109,7 @@ func DefaultPoolParams() PoolParams { // Build and return new HostPool object func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, netDef *ndf.NetworkDefinition, getter HostManager, storage *storage.Session, - addGateway chan network.NodeGateway) (*HostPool, error) { + addGateway chan<- network.NodeGateway) (*HostPool, error) { var err error // Determine size of HostPool @@ -127,6 +130,7 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, rng: rng, storage: storage, addGatewayChan: addGateway, + kv: storage.GetKV().Prefix(hostListPrefix), // Initialise the filter so it does not filter any IDs filter: func(m map[id.ID]int, _ *ndf.NetworkDefinition) map[id.ID]int { @@ -141,7 +145,7 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, } // get the last used list of hosts and use it to seed the host pool list - hostList, err := storage.HostList().Get() + hostList, err := getHostList(result.kv) numHostsAdded := 0 if err == nil { for _, hid := range hostList { @@ -519,7 +523,7 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error { } // Save the list to storage - return h.storage.HostList().Store(idList) + return saveHostList(h.kv, idList) } // replaceHostNoStore replaces the given slot in the HostPool with a new Gateway @@ -677,17 +681,16 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { } // Send AddGateway event if we do not already possess keys for the GW - if !h.storage.Cmix().Has(gwId) { - ng := network.NodeGateway{ - Node: h.ndf.Nodes[ndfIndex], - Gateway: gw, - } + // the recipient of the channel checks if it should process + ng := network.NodeGateway{ + Node: h.ndf.Nodes[ndfIndex], + Gateway: gw, + } - select { - case h.addGatewayChan <- ng: - default: - jww.WARN.Printf("Unable to send AddGateway event for id %s", gwId.String()) - } + select { + case h.addGatewayChan <- ng: + default: + jww.WARN.Printf("Unable to send AddGateway event for id %s", gwId.String()) } } else if host.GetAddress() != gw.Address { diff --git a/storage/hostList/hostList.go b/network/gateway/storeHostList.go similarity index 86% rename from storage/hostList/hostList.go rename to network/gateway/storeHostList.go index 5242e7b8483ace3820111aa6c22455c7ba5a0257..bb52690b8cf1502b83df8b9367ead7820392d21c 100644 --- a/storage/hostList/hostList.go +++ b/network/gateway/storeHostList.go @@ -12,7 +12,7 @@ // LICENSE file // //////////////////////////////////////////////////////////////////////////////// -package hostList +package gateway import ( "bytes" @@ -36,31 +36,19 @@ const ( unmarshallLenErr = "malformed data: length of data %d incorrect" ) -type Store struct { - kv *versioned.KV -} - -// NewStore creates a new Store with a prefixed KV. -func NewStore(kv *versioned.KV) *Store { - return &Store{ - kv: kv.Prefix(hostListPrefix), - } -} - -// Store saves the list of host IDs to storage. -func (s *Store) Store(list []*id.ID) error { +func saveHostList(kv *versioned.KV, list []*id.ID) error { obj := &versioned.Object{ Version: hostListVersion, Data: marshalHostList(list), Timestamp: netTime.Now(), } - return s.kv.Set(hostListKey, hostListVersion, obj) + return kv.Set(hostListKey, hostListVersion, obj) } // Get returns the host list from storage. -func (s *Store) Get() ([]*id.ID, error) { - obj, err := s.kv.Get(hostListKey, hostListVersion) +func getHostList(kv *versioned.KV) ([]*id.ID, error) { + obj, err := kv.Get(hostListKey, hostListVersion) if err != nil { return nil, errors.Errorf(getStorageErr, err) } diff --git a/storage/hostList/hostList_test.go b/network/gateway/storeHostList_test.go similarity index 99% rename from storage/hostList/hostList_test.go rename to network/gateway/storeHostList_test.go index 1371370cc226dfa01eb52b963d36122c2ce2a902..d56f60bbe5d2c204857d38e8900466e5b8a4f935 100644 --- a/storage/hostList/hostList_test.go +++ b/network/gateway/storeHostList_test.go @@ -12,7 +12,7 @@ // LICENSE file // //////////////////////////////////////////////////////////////////////////////// -package hostList +package gateway import ( "fmt" diff --git a/network/nodes/register.go b/network/nodes/register.go index 23f18f7a7c8516578b8d94cf8ad970394eec5eff..7025e7bb831bae17463d1ea683c60070391551e3 100644 --- a/network/nodes/register.go +++ b/network/nodes/register.go @@ -54,6 +54,16 @@ func registerNodes(r *registrar, stop *stoppable.Single, inProgress, attempts *s case gw := <-r.c: rng := r.rng.GetStream() nidStr := fmt.Sprintf("%x", gw.Node.ID) + nid, err := gw.Node.GetNodeId() + if err != nil { + jww.WARN.Printf("Could not process node ID for registration: %s", err) + continue + } + + if r.Has(nid) { + jww.INFO.Printf("not registering node %s, already registered", nid) + } + if _, operating := inProgress.LoadOrStore(nidStr, struct{}{}); operating { continue } @@ -70,7 +80,7 @@ func registerNodes(r *registrar, stop *stoppable.Single, inProgress, attempts *s jww.DEBUG.Printf("Skipping registration with stale nodes %s", nidStr) continue } - err := registerWithNode(r.sender, r.comms, gw, regSignature, + err = registerWithNode(r.sender, r.comms, gw, regSignature, regTimestamp, uci, r, rng, stop) inProgress.Delete(nidStr) if err != nil {