Skip to content
Snippets Groups Projects
Commit 6b9e4099 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

move storage for the sender structure and improved edge checking on node registrar

parent 735f7dd2
No related branches found
No related tags found
4 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast,!187Xx 3829/triggers
......@@ -16,6 +16,7 @@ import (
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/crypto/shuffle"
......@@ -66,7 +67,9 @@ type HostPool struct {
rng *fastRNG.StreamGenerator
storage *storage.Session
manager HostManager
addGatewayChan chan network.NodeGateway
addGatewayChan chan<- network.NodeGateway
kv *versioned.KV
filterMux sync.Mutex
filter Filter
......@@ -106,7 +109,7 @@ func DefaultPoolParams() PoolParams {
// Build and return new HostPool object
func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator,
netDef *ndf.NetworkDefinition, getter HostManager, storage *storage.Session,
addGateway chan network.NodeGateway) (*HostPool, error) {
addGateway chan<- network.NodeGateway) (*HostPool, error) {
var err error
// Determine size of HostPool
......@@ -127,6 +130,7 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator,
rng: rng,
storage: storage,
addGatewayChan: addGateway,
kv: storage.GetKV().Prefix(hostListPrefix),
// Initialise the filter so it does not filter any IDs
filter: func(m map[id.ID]int, _ *ndf.NetworkDefinition) map[id.ID]int {
......@@ -141,7 +145,7 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator,
}
// get the last used list of hosts and use it to seed the host pool list
hostList, err := storage.HostList().Get()
hostList, err := getHostList(result.kv)
numHostsAdded := 0
if err == nil {
for _, hid := range hostList {
......@@ -519,7 +523,7 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error {
}
// Save the list to storage
return h.storage.HostList().Store(idList)
return saveHostList(h.kv, idList)
}
// replaceHostNoStore replaces the given slot in the HostPool with a new Gateway
......@@ -677,17 +681,16 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) {
}
// Send AddGateway event if we do not already possess keys for the GW
if !h.storage.Cmix().Has(gwId) {
ng := network.NodeGateway{
Node: h.ndf.Nodes[ndfIndex],
Gateway: gw,
}
// the recipient of the channel checks if it should process
ng := network.NodeGateway{
Node: h.ndf.Nodes[ndfIndex],
Gateway: gw,
}
select {
case h.addGatewayChan <- ng:
default:
jww.WARN.Printf("Unable to send AddGateway event for id %s", gwId.String())
}
select {
case h.addGatewayChan <- ng:
default:
jww.WARN.Printf("Unable to send AddGateway event for id %s", gwId.String())
}
} else if host.GetAddress() != gw.Address {
......
......@@ -12,7 +12,7 @@
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package hostList
package gateway
import (
"bytes"
......@@ -36,31 +36,19 @@ const (
unmarshallLenErr = "malformed data: length of data %d incorrect"
)
type Store struct {
kv *versioned.KV
}
// NewStore creates a new Store with a prefixed KV.
func NewStore(kv *versioned.KV) *Store {
return &Store{
kv: kv.Prefix(hostListPrefix),
}
}
// Store saves the list of host IDs to storage.
func (s *Store) Store(list []*id.ID) error {
func saveHostList(kv *versioned.KV, list []*id.ID) error {
obj := &versioned.Object{
Version: hostListVersion,
Data: marshalHostList(list),
Timestamp: netTime.Now(),
}
return s.kv.Set(hostListKey, hostListVersion, obj)
return kv.Set(hostListKey, hostListVersion, obj)
}
// Get returns the host list from storage.
func (s *Store) Get() ([]*id.ID, error) {
obj, err := s.kv.Get(hostListKey, hostListVersion)
func getHostList(kv *versioned.KV) ([]*id.ID, error) {
obj, err := kv.Get(hostListKey, hostListVersion)
if err != nil {
return nil, errors.Errorf(getStorageErr, err)
}
......
......@@ -12,7 +12,7 @@
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package hostList
package gateway
import (
"fmt"
......
......@@ -54,6 +54,16 @@ func registerNodes(r *registrar, stop *stoppable.Single, inProgress, attempts *s
case gw := <-r.c:
rng := r.rng.GetStream()
nidStr := fmt.Sprintf("%x", gw.Node.ID)
nid, err := gw.Node.GetNodeId()
if err != nil {
jww.WARN.Printf("Could not process node ID for registration: %s", err)
continue
}
if r.Has(nid) {
jww.INFO.Printf("not registering node %s, already registered", nid)
}
if _, operating := inProgress.LoadOrStore(nidStr, struct{}{}); operating {
continue
}
......@@ -70,7 +80,7 @@ func registerNodes(r *registrar, stop *stoppable.Single, inProgress, attempts *s
jww.DEBUG.Printf("Skipping registration with stale nodes %s", nidStr)
continue
}
err := registerWithNode(r.sender, r.comms, gw, regSignature,
err = registerWithNode(r.sender, r.comms, gw, regSignature,
regTimestamp, uci, r, rng, stop)
inProgress.Delete(nidStr)
if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment