Skip to content
Snippets Groups Projects
Commit 65988c4a authored by Josh Brooks's avatar Josh Brooks
Browse files

Improve documentation for cmix/nodes

parent b7470ca3
No related branches found
No related tags found
2 merge requests!510Release,!207WIP: Client Restructure
......@@ -8,16 +8,74 @@
package nodes
import (
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/versioned"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id"
"time"
)
// Registrar is an interface for managing the registrations
// for cMix nodes.
type Registrar interface {
// StartProcesses initiates numParallel amount of threads
// to register with nodes.
StartProcesses(numParallel uint) stoppable.Stoppable
// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did
// not have a key for. If there are missing keys, then returns nil.
GetNodeKeys(topology *connect.Circuit) (MixCypher, error)
// HasNode returns whether Registrar has registered with this cMix node
HasNode(nid *id.ID) bool
// RemoveNode removes the node from the registrar
RemoveNode(nid *id.ID)
// NumRegisteredNodes returns the number of registered nodes.
NumRegisteredNodes() int
// GetInputChannel returns the send-only channel for registering with
// a cMix node.
GetInputChannel() chan<- network.NodeGateway
// TriggerNodeRegistration initiates a registration with the given
// cMix node by sending on the registrar's registration channel.
TriggerNodeRegistration(nid *id.ID)
}
// MixCypher is an interface for the cryptographic operations done in order
// to encrypt a cMix message to a node.
type MixCypher interface {
// Encrypt encrypts the given message for cMix. Panics if the passed
// message is not sized correctly for the group.
Encrypt(msg format.Message, salt []byte, roundID id.Round) (
format.Message, [][]byte)
// MakeClientGatewayAuthMAC generates the MAC the gateway will
// check when receiving a cMix message.
MakeClientGatewayAuthMAC(salt, digest []byte) []byte
}
// RegisterNodeCommsInterface is a sub-interface of client.Comms containing
// the send function for registering with a cMix node.
type RegisterNodeCommsInterface interface {
SendRequestClientKeyMessage(host *connect.Host,
message *pb.SignedClientKeyRequest) (*pb.SignedKeyResponse, error)
}
// Session is a sub-interface of the storage.Session interface relevant to
// the methods used in this package.
type Session interface {
GetTransmissionID() *id.ID
IsPrecanned() bool
GetCmixGroup() *cyclic.Group
GetKV() *versioned.KV
GetTransmissionRSA() *rsa.PrivateKey
GetRegistrationTimestamp() time.Time
GetTransmissionSalt() []byte
......
......@@ -17,12 +17,7 @@ import (
"golang.org/x/crypto/blake2b"
)
type MixCypher interface {
Encrypt(msg format.Message, salt []byte, roundID id.Round) (
format.Message, [][]byte)
MakeClientGatewayAuthMAC(salt, digest []byte) []byte
}
// mixCypher is an implementation of the MixCypher interface.
type mixCypher struct {
keys []*key
g *cyclic.Group
......
......@@ -19,13 +19,18 @@ import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/cmix/gateway"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/xx_network/primitives/ndf"
)
func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single,
// registerNodes is a manager thread which waits on a channel for nodes
// to register with. On reception, it tries to register with that node.
// This thread is interrupted by the stoppable.Single passed in.
// The sync.Map's keep track of the node(s) that were in progress
// before an interruption and how many registration attempts have
// been attempted.
func registerNodes(r *registrar, s Session, stop *stoppable.Single,
inProgress, attempts *sync.Map) {
interval := time.Duration(500) * time.Millisecond
......@@ -33,31 +38,40 @@ func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single,
for {
select {
case <-stop.Quit():
// On a stop signal, close the thread
t.Stop()
stop.ToStopped()
return
case gw := <-r.c:
rng := r.rng.GetStream()
// Pull node information from channel
nidStr := hex.EncodeToString(gw.Node.ID)
nid, err := gw.Node.GetNodeId()
if err != nil {
jww.WARN.Printf(
"Could not process node ID for registration: %s", err)
rng.Close()
continue
}
// Check if the registrar has this node already
if r.HasNode(nid) {
jww.INFO.Printf(
"Not registering node %s, already registered", nid)
}
// Check if the client is already attempting to register with this
// node in another thread
if _, operating := inProgress.LoadOrStore(nidStr,
struct{}{}); operating {
rng.Close()
continue
}
// Keep track of how many times this has been attempted
// Keep track of how many times registering with this node
// has been attempted
numAttempts := uint(1)
if nunAttemptsInterface, hasValue := attempts.LoadOrStore(
nidStr, numAttempts); hasValue {
......@@ -69,17 +83,24 @@ func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single,
if isStale := gw.Node.Status == ndf.Stale; isStale {
jww.DEBUG.Printf(
"Skipping registration with stale nodes %s", nidStr)
rng.Close()
continue
}
// Register with this node
err = registerWithNode(r.sender, r.comms, gw, s, r, rng, stop)
// Remove from in progress immediately (success or failure)
inProgress.Delete(nidStr)
// Process the result
if err != nil {
jww.ERROR.Printf("Failed to register nodes: %+v", err)
// If we have not reached the attempt limit for this gateway,
// then send it back into the channel to retry
if numAttempts < maxAttempts {
go func() {
// Delay the send for a backoff
// Delay the send operation for a backoff
time.Sleep(delayTable[numAttempts-1])
r.c <- gw
}()
......
......@@ -5,9 +5,7 @@ import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/cmix/gateway"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/versioned"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/comms/connect"
......@@ -21,6 +19,7 @@ import (
const InputChanLen = 1000
const maxAttempts = 5
// Backoff for attempting to register with a cMix node.
var delayTable = [5]time.Duration{
0,
5 * time.Second,
......@@ -29,27 +28,13 @@ var delayTable = [5]time.Duration{
120 * time.Second,
}
type Registrar interface {
StartProcesses(numParallel uint) stoppable.Stoppable
HasNode(nid *id.ID) bool
RemoveNode(nid *id.ID)
GetNodeKeys(topology *connect.Circuit) (MixCypher, error)
NumRegisteredNodes() int
GetInputChannel() chan<- network.NodeGateway
TriggerNodeRegistration(nid *id.ID)
}
type RegisterNodeCommsInterface interface {
SendRequestClientKeyMessage(host *connect.Host,
message *pb.SignedClientKeyRequest) (*pb.SignedKeyResponse, error)
}
// registrar is an implementation of the Registrar interface.
type registrar struct {
nodes map[id.ID]*key
kv *versioned.KV
mux sync.RWMutex
session storage.Session
session Session
sender gateway.Sender
comms RegisterNodeCommsInterface
rng *fastRNG.StreamGenerator
......@@ -59,7 +44,7 @@ type registrar struct {
// LoadRegistrar loads a Registrar from disk or creates a new one if it does not
// exist.
func LoadRegistrar(session storage.Session, sender gateway.Sender,
func LoadRegistrar(session Session, sender gateway.Sender,
comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator,
c chan network.NodeGateway) (Registrar, error) {
......@@ -94,6 +79,8 @@ func LoadRegistrar(session storage.Session, sender gateway.Sender,
return r, nil
}
// StartProcesses initiates numParallel amount of threads
// to register with nodes.
func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
multi := stoppable.NewMulti("NodeRegistrations")
......@@ -113,19 +100,6 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
return multi
}
func (r *registrar) GetInputChannel() chan<- network.NodeGateway {
return r.c
}
func (r *registrar) TriggerNodeRegistration(nid *id.ID) {
r.c <- network.NodeGateway{
Node: ndf.Node{
ID: nid.Marshal(),
Status: ndf.Active, // Must be active because it is in a round
},
}
}
// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did
// not have a key for. If there are missing keys, then returns nil.
func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) {
......@@ -183,3 +157,20 @@ func (r *registrar) NumRegisteredNodes() int {
defer r.mux.RUnlock()
return len(r.nodes)
}
// GetInputChannel returns the send-only channel for registering with
// a cMix node.
func (r *registrar) GetInputChannel() chan<- network.NodeGateway {
return r.c
}
// TriggerNodeRegistration initiates a registration with the given
// cMix node by sending on the registrar's registration channel.
func (r *registrar) TriggerNodeRegistration(nid *id.ID) {
r.c <- network.NodeGateway{
Node: ndf.Node{
ID: nid.Marshal(),
Status: ndf.Active, // Must be active because it is in a round
},
}
}
......@@ -12,6 +12,7 @@ import (
"gitlab.com/elixxir/client/cmix/gateway"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/versioned"
pb "gitlab.com/elixxir/comms/mixmessages"
commNetwork "gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/cyclic"
......@@ -155,9 +156,16 @@ type mockSession struct {
transmissionSig []byte
}
func (m mockSession) GetCmixGroup() *cyclic.Group {
return nil
}
func (m mockSession) GetKV() *versioned.KV {
return nil
}
func (m mockSession) GetTransmissionID() *id.ID {
//TODO implement me
panic("implement me")
return nil
}
func (m mockSession) IsPrecanned() bool {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment