diff --git a/cmix/nodes/interfaces.go b/cmix/nodes/interfaces.go index cced1ecd17772741bf647cdde9c41fe901c8f522..35a16cbe2db8d5a62672b657f8c66369240bc258 100644 --- a/cmix/nodes/interfaces.go +++ b/cmix/nodes/interfaces.go @@ -8,16 +8,74 @@ package nodes import ( + "gitlab.com/elixxir/client/stoppable" + "gitlab.com/elixxir/client/storage/versioned" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/comms/network" + "gitlab.com/elixxir/crypto/cyclic" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" "time" ) +// Registrar is an interface for managing the registrations +// for cMix nodes. +type Registrar interface { + // StartProcesses initiates numParallel amount of threads + // to register with nodes. + StartProcesses(numParallel uint) stoppable.Stoppable + + // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did + // not have a key for. If there are missing keys, then returns nil. + GetNodeKeys(topology *connect.Circuit) (MixCypher, error) + + // HasNode returns whether Registrar has registered with this cMix node + HasNode(nid *id.ID) bool + + // RemoveNode removes the node from the registrar + RemoveNode(nid *id.ID) + + // NumRegisteredNodes returns the number of registered nodes. + NumRegisteredNodes() int + + // GetInputChannel returns the send-only channel for registering with + // a cMix node. + GetInputChannel() chan<- network.NodeGateway + + // TriggerNodeRegistration initiates a registration with the given + // cMix node by sending on the registrar's registration channel. + TriggerNodeRegistration(nid *id.ID) +} + +// MixCypher is an interface for the cryptographic operations done in order +// to encrypt a cMix message to a node. +type MixCypher interface { + // Encrypt encrypts the given message for cMix. Panics if the passed + // message is not sized correctly for the group. + Encrypt(msg format.Message, salt []byte, roundID id.Round) ( + format.Message, [][]byte) + + // MakeClientGatewayAuthMAC generates the MAC the gateway will + // check when receiving a cMix message. + MakeClientGatewayAuthMAC(salt, digest []byte) []byte +} + +// RegisterNodeCommsInterface is a sub-interface of client.Comms containing +// the send function for registering with a cMix node. +type RegisterNodeCommsInterface interface { + SendRequestClientKeyMessage(host *connect.Host, + message *pb.SignedClientKeyRequest) (*pb.SignedKeyResponse, error) +} + // Session is a sub-interface of the storage.Session interface relevant to // the methods used in this package. type Session interface { GetTransmissionID() *id.ID IsPrecanned() bool + GetCmixGroup() *cyclic.Group + GetKV() *versioned.KV GetTransmissionRSA() *rsa.PrivateKey GetRegistrationTimestamp() time.Time GetTransmissionSalt() []byte diff --git a/cmix/nodes/mixCypher.go b/cmix/nodes/mixCypher.go index 98b60efb0f4c50b18458a742d343e995edda3090..7afc8f23de227e170e6ef5d3f0d797b5dbbff5f5 100644 --- a/cmix/nodes/mixCypher.go +++ b/cmix/nodes/mixCypher.go @@ -17,12 +17,7 @@ import ( "golang.org/x/crypto/blake2b" ) -type MixCypher interface { - Encrypt(msg format.Message, salt []byte, roundID id.Round) ( - format.Message, [][]byte) - MakeClientGatewayAuthMAC(salt, digest []byte) []byte -} - +// mixCypher is an implementation of the MixCypher interface. type mixCypher struct { keys []*key g *cyclic.Group diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go index 9a3c3854481e9b48ec200657db88a9672a267fa6..3d03fe4569590f1f6b96af6968e55db6ce359d4e 100644 --- a/cmix/nodes/register.go +++ b/cmix/nodes/register.go @@ -19,13 +19,18 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/gateway" "gitlab.com/elixxir/client/stoppable" - "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/cyclic" "gitlab.com/xx_network/primitives/ndf" ) -func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single, +// registerNodes is a manager thread which waits on a channel for nodes +// to register with. On reception, it tries to register with that node. +// This thread is interrupted by the stoppable.Single passed in. +// The sync.Map's keep track of the node(s) that were in progress +// before an interruption and how many registration attempts have +// been attempted. +func registerNodes(r *registrar, s Session, stop *stoppable.Single, inProgress, attempts *sync.Map) { interval := time.Duration(500) * time.Millisecond @@ -33,31 +38,40 @@ func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single, for { select { case <-stop.Quit(): + // On a stop signal, close the thread t.Stop() stop.ToStopped() return case gw := <-r.c: rng := r.rng.GetStream() + + // Pull node information from channel nidStr := hex.EncodeToString(gw.Node.ID) nid, err := gw.Node.GetNodeId() if err != nil { jww.WARN.Printf( "Could not process node ID for registration: %s", err) + rng.Close() continue } + // Check if the registrar has this node already if r.HasNode(nid) { jww.INFO.Printf( "Not registering node %s, already registered", nid) } + // Check if the client is already attempting to register with this + // node in another thread if _, operating := inProgress.LoadOrStore(nidStr, struct{}{}); operating { + rng.Close() continue } - // Keep track of how many times this has been attempted + // Keep track of how many times registering with this node + // has been attempted numAttempts := uint(1) if nunAttemptsInterface, hasValue := attempts.LoadOrStore( nidStr, numAttempts); hasValue { @@ -69,17 +83,24 @@ func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single, if isStale := gw.Node.Status == ndf.Stale; isStale { jww.DEBUG.Printf( "Skipping registration with stale nodes %s", nidStr) + rng.Close() continue } + + // Register with this node err = registerWithNode(r.sender, r.comms, gw, s, r, rng, stop) + + // Remove from in progress immediately (success or failure) inProgress.Delete(nidStr) + + // Process the result if err != nil { jww.ERROR.Printf("Failed to register nodes: %+v", err) // If we have not reached the attempt limit for this gateway, // then send it back into the channel to retry if numAttempts < maxAttempts { go func() { - // Delay the send for a backoff + // Delay the send operation for a backoff time.Sleep(delayTable[numAttempts-1]) r.c <- gw }() diff --git a/cmix/nodes/registrar.go b/cmix/nodes/registrar.go index 460aaab23cae36c329d5d7d44108b0ced07445fd..23f859915ae356020b885b4339c2871ecde8ec6c 100644 --- a/cmix/nodes/registrar.go +++ b/cmix/nodes/registrar.go @@ -5,9 +5,7 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/gateway" "gitlab.com/elixxir/client/stoppable" - "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/versioned" - pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/xx_network/comms/connect" @@ -21,6 +19,7 @@ import ( const InputChanLen = 1000 const maxAttempts = 5 +// Backoff for attempting to register with a cMix node. var delayTable = [5]time.Duration{ 0, 5 * time.Second, @@ -29,27 +28,13 @@ var delayTable = [5]time.Duration{ 120 * time.Second, } -type Registrar interface { - StartProcesses(numParallel uint) stoppable.Stoppable - HasNode(nid *id.ID) bool - RemoveNode(nid *id.ID) - GetNodeKeys(topology *connect.Circuit) (MixCypher, error) - NumRegisteredNodes() int - GetInputChannel() chan<- network.NodeGateway - TriggerNodeRegistration(nid *id.ID) -} - -type RegisterNodeCommsInterface interface { - SendRequestClientKeyMessage(host *connect.Host, - message *pb.SignedClientKeyRequest) (*pb.SignedKeyResponse, error) -} - +// registrar is an implementation of the Registrar interface. type registrar struct { nodes map[id.ID]*key kv *versioned.KV mux sync.RWMutex - session storage.Session + session Session sender gateway.Sender comms RegisterNodeCommsInterface rng *fastRNG.StreamGenerator @@ -59,7 +44,7 @@ type registrar struct { // LoadRegistrar loads a Registrar from disk or creates a new one if it does not // exist. -func LoadRegistrar(session storage.Session, sender gateway.Sender, +func LoadRegistrar(session Session, sender gateway.Sender, comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator, c chan network.NodeGateway) (Registrar, error) { @@ -94,6 +79,8 @@ func LoadRegistrar(session storage.Session, sender gateway.Sender, return r, nil } +// StartProcesses initiates numParallel amount of threads +// to register with nodes. func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { multi := stoppable.NewMulti("NodeRegistrations") @@ -113,19 +100,6 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { return multi } -func (r *registrar) GetInputChannel() chan<- network.NodeGateway { - return r.c -} - -func (r *registrar) TriggerNodeRegistration(nid *id.ID) { - r.c <- network.NodeGateway{ - Node: ndf.Node{ - ID: nid.Marshal(), - Status: ndf.Active, // Must be active because it is in a round - }, - } -} - // GetNodeKeys returns a MixCypher for the topology and a list of nodes it did // not have a key for. If there are missing keys, then returns nil. func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) { @@ -183,3 +157,20 @@ func (r *registrar) NumRegisteredNodes() int { defer r.mux.RUnlock() return len(r.nodes) } + +// GetInputChannel returns the send-only channel for registering with +// a cMix node. +func (r *registrar) GetInputChannel() chan<- network.NodeGateway { + return r.c +} + +// TriggerNodeRegistration initiates a registration with the given +// cMix node by sending on the registrar's registration channel. +func (r *registrar) TriggerNodeRegistration(nid *id.ID) { + r.c <- network.NodeGateway{ + Node: ndf.Node{ + ID: nid.Marshal(), + Status: ndf.Active, // Must be active because it is in a round + }, + } +} diff --git a/cmix/nodes/utils_test.go b/cmix/nodes/utils_test.go index c2dc206fef4a331afa1a0fd0766a13469a6446e4..2c1169c386582248549258522145b80b2afb4413 100644 --- a/cmix/nodes/utils_test.go +++ b/cmix/nodes/utils_test.go @@ -12,6 +12,7 @@ import ( "gitlab.com/elixxir/client/cmix/gateway" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/client/storage/versioned" pb "gitlab.com/elixxir/comms/mixmessages" commNetwork "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/cyclic" @@ -155,9 +156,16 @@ type mockSession struct { transmissionSig []byte } +func (m mockSession) GetCmixGroup() *cyclic.Group { + return nil +} + +func (m mockSession) GetKV() *versioned.KV { + return nil +} + func (m mockSession) GetTransmissionID() *id.ID { - //TODO implement me - panic("implement me") + return nil } func (m mockSession) IsPrecanned() bool {