Skip to content
Snippets Groups Projects
Commit aae122b6 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

improved comments and documentation, moved keyexchange, and exposed garbled message checking

parent cc007869
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,7 @@ import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/keyExchange"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/permissioning"
"gitlab.com/elixxir/client/stoppable"
......@@ -50,13 +51,13 @@ type Client struct {
// with the network. Note that this does not register a username/identity, but
// merely creates a new cryptographic identity for adding such information
// at a later date.
func NewClient(defJSON, storageDir string, password []byte) (*Client, error) {
func NewClient(ndfJSON, storageDir string, password []byte, registrationCode string) (*Client, error) {
// Use fastRNG for RNG ops (AES fortuna based RNG using system RNG)
rngStreamGen := fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG)
rngStream := rngStreamGen.GetStream()
// Parse the NDF
def, err := parseNDF(defJSON)
def, err := parseNDF(ndfJSON)
if err != nil {
return nil, err
}
......@@ -76,6 +77,9 @@ func NewClient(defJSON, storageDir string, password []byte) (*Client, error) {
// Save NDF to be used in the future
storageSess.SetBaseNDF(def)
//store the registration code for later use
storageSess.SetRegCode(registrationCode)
//execute the rest of the loading as normal
return loadClient(storageSess, rngStreamGen)
}
......@@ -187,19 +191,39 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator)
return c, nil
}
// ----- Client Functions -----
// RegisterListener registers a listener callback function that is called
// every time a new message matches the specified parameters.
func (c *Client) RegisterListenerCb(uid id.ID, msgType int, username string,
listenerCb func(msg Message)) {
jww.INFO.Printf("RegisterListener(%s, %d, %s, func())", uid, msgType,
username)
// StartNetworkFollower kicks off the tracking of the network. It starts
// long running network client threads and returns an object for checking
// state and stopping those threads.
// Call this when returning from sleep and close when going back to
// sleep.
// Threads Started:
// - Network Follower (/network/follow.go)
// tracks the network events and hands them off to workers for handling
// - Historical Round Retrieval (/network/rounds/historical.go)
// Retrieves data about rounds which are too old to be stored by the client
// - Message Retrieval Worker Group (/network/rounds/retreive.go)
// Requests all messages in a given round from the gateway of the last node
// - Message Handling Worker Group (/network/message/reception.go)
// Decrypts and partitions messages when signals via the Switchboard
// - Health Tracker (/network/health)
func (c *Client) StartNetworkFollower() (stoppable.Stoppable, error) {
jww.INFO.Printf("StartNetworkFollower()")
multi := stoppable.NewMulti("client")
stopFollow, err := c.network.Follow()
if err != nil {
return nil, errors.WithMessage(err, "Failed to start following "+
"the network")
}
multi.Add(stopFollow)
// Key exchange
multi.Add(keyExchange.Start(c.switchboard, c.storage, c.network))
return multi, nil
}
// SendE2E sends an end-to-end payload to the provided recipient with
// the provided msgType. Returns the list of rounds in which parts of
// the message were sent or an error if it fails.
......@@ -231,6 +255,14 @@ func (c *Client) SendCMIX(payload []byte, recipient id.ID) (int, error) {
return 0, nil
}
// RegisterListener registers a listener callback function that is called
// every time a new message matches the specified parameters.
func (c *Client) RegisterListenerCb(uid id.ID, msgType int, username string,
listenerCb func(msg Message)) {
jww.INFO.Printf("RegisterListener(%s, %d, %s, func())", uid, msgType,
username)
}
// RegisterForNotifications allows a client to register for push
// notifications.
// Note that clients are not required to register for push notifications
......@@ -395,15 +427,6 @@ func (c *Client) RegisterAuthRequestCb(cb func(contact Contact,
jww.INFO.Printf("RegisterAuthRequestCb(...)")
}
// StartNetworkRunner kicks off the longrunning network client threads
// and returns an object for checking state and stopping those threads.
// Call this when returning from sleep and close when going back to
// sleep.
func (c *Client) StartNetworkRunner() stoppable.Stoppable {
jww.INFO.Printf("StartNetworkRunner()")
return nil
}
// RegisterRoundEventsCb registers a callback for round
// events.
func (c *Client) RegisterRoundEventsCb(
......
......@@ -15,7 +15,8 @@ type NetworkManager interface {
SendCMIX(message format.Message, p params.CMIX) (id.Round, error)
GetInstance() *network.Instance
GetHealthTracker() HealthTracker
GetStoppable() stoppable.Stoppable
Follow() (stoppable.Stoppable, error)
CheckGarbledMessages()
}
//for use in key exchange which needs to be callable inside of network
......
package network
package utility
import (
ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/primitives/states"
)
// Function to track the results of events. It returns true if the collection of
// Function to follow the results of events. It returns true if the collection of
// events resolved well, and then a count of how many rounds failed and how
// many roundEvents timed out.
func TrackResults(resultsCh chan ds.EventReturn, numResults int) (bool, int, int) {
......
......@@ -15,7 +15,7 @@ const keyExchangeConfirmName = "KeyExchangeConfirm"
const keyExchangeMulti = "KeyExchange"
func Start(switchboard *switchboard.Switchboard, sess *storage.Session,
net interfaces.NetworkManager, garbledMessageTrigger chan<- struct{}) stoppable.Stoppable {
net interfaces.NetworkManager) stoppable.Stoppable {
// register the rekey trigger thread
triggerCh := make(chan message.Receive, 100)
......@@ -31,7 +31,7 @@ func Start(switchboard *switchboard.Switchboard, sess *storage.Session,
})
// start the trigger thread
go startTrigger(sess, net, triggerCh, triggerStop, garbledMessageTrigger)
go startTrigger(sess, net, triggerCh, triggerStop.Quit())
//register the rekey confirm thread
confirmCh := make(chan message.Receive, 100)
......
......@@ -13,7 +13,7 @@ import (
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params"
network2 "gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/interfaces/utility"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/e2e"
"gitlab.com/elixxir/comms/network"
......@@ -116,7 +116,7 @@ func negotiate(instance *network.Instance, sendE2E interfaces.SendE2E,
}
//Wait until the result tracking responds
success, numTimeOut, numRoundFail := network2.TrackResults(sendResults, len(rounds))
success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds))
// If a single partition of the Key Negotiation request does not
// transmit, the partner cannot read the result. Log the error and set
......
......@@ -8,8 +8,7 @@ import (
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/interfaces/utility"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/e2e"
ds "gitlab.com/elixxir/comms/network/dataStructures"
......@@ -23,14 +22,14 @@ const (
errUnknown = "unknown trigger from partner %s"
)
func startTrigger(sess *storage.Session, net interfaces.NetworkManager, c chan message.Receive,
stop *stoppable.Single, garbledMessageTrigger chan<- struct{}) {
func startTrigger(sess *storage.Session, net interfaces.NetworkManager,
c chan message.Receive, quitCh <-chan struct{}) {
for true {
select {
case <-stop.Quit():
case <-quitCh:
return
case request := <-c:
err := handleTrigger(sess, net, request, garbledMessageTrigger)
err := handleTrigger(sess, net, request)
if err != nil {
jww.ERROR.Printf("Failed to handle rekey trigger: %s",
err)
......@@ -39,8 +38,8 @@ func startTrigger(sess *storage.Session, net interfaces.NetworkManager, c chan m
}
}
func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, request message.Receive,
garbledMessageTrigger chan<- struct{}) error {
func handleTrigger(sess *storage.Session, net interfaces.NetworkManager,
request message.Receive) error {
//ensure the message was encrypted properly
if request.Encryption != message.E2E {
errMsg := fmt.Sprintf(errBadTrigger, request.Sender)
......@@ -85,13 +84,9 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, request
"create session %s for partner %s is a duplicate, request ignored",
session.GetID(), request.Sender)
} else {
//if the session is new, attempt to trigger garbled message processing
//if there is contention, skip
select {
case garbledMessageTrigger <- struct{}{}:
default:
jww.WARN.Println("Failed to trigger garbled messages")
}
// if the session is new, attempt to trigger garbled message processing
// automatically skips if there is contention
net.CheckGarbledMessages()
}
//Send the Confirmation Message
......@@ -131,7 +126,7 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, request
}
//Wait until the result tracking responds
success, numTimeOut, numRoundFail := network.TrackResults(sendResults, len(rounds))
success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds))
// If a single partition of the Key Negotiation request does not
// transmit, the partner will not be able to read the confirmation. If
......
......@@ -6,16 +6,20 @@
package network
// updates.go tracks the network for:
// 1. Node addition and removal
// follow.go tracks the network for:
// 1. The status of the network and its accessibility
// 2. New/Active/Complete rounds and their contact gateways
// 3. Node addition and removal
// This information is tracked by polling a gateway for the network definition
// file (NDF). Once it detects an event it sends it off to the proper channel
// for a worker to update the client state (add/remove a node, check for
// messages at a gateway, etc). See:
// - nodes.go for add/remove node events
// - rounds.go for round event handling & processing
// - receive.go for message handling
// - /node/register.go for add/remove node events
// - /rounds/historical.go for old round retrieval
// - /rounds/retrieve.go for message retrieval
// - /message/reception.go decryption, partitioning, and signaling of messages
// - /health/tracker.go - tracks the state of the network through the network
// instance
import (
"gitlab.com/elixxir/client/network/gateway"
......@@ -29,14 +33,14 @@ import (
"time"
)
type trackNetworkComms interface {
type followNetworkComms interface {
GetHost(hostId *id.ID) (*connect.Host, bool)
SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error)
}
// TrackNetwork polls the network to get updated on the state of nodes, the
// followNetwork polls the network to get updated on the state of nodes, the
// round status, and informs the client when messages can be retrieved.
func (m *manager) trackNetwork(quitCh <-chan struct{}) {
func (m *manager) followNetwork(quitCh <-chan struct{}) {
ticker := time.NewTicker(m.param.TrackNetworkPeriod)
rng := m.Rng.GetStream()
......@@ -46,20 +50,22 @@ func (m *manager) trackNetwork(quitCh <-chan struct{}) {
rng.Close()
break
case <-ticker.C:
m.track(rng, m.Comms)
m.follow(rng, m.Comms)
}
}
}
func (m *manager) track(rng csprng.Source, comms trackNetworkComms) {
func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
//randomly select a gateway to poll
//TODO: make this more intelligent
gwHost, err := gateway.Get(m.Instance.GetPartialNdf().Get(), comms, rng)
if err != nil {
jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+
jww.FATAL.Panicf("Failed to follow network, NDF has corrupt "+
"data: %s", err)
}
// Poll for the new NDF
// Poll network updates
pollReq := pb.GatewayPoll{
Partial: &pb.NDFHash{
Hash: m.Instance.GetPartialNdf().GetHash(),
......@@ -72,10 +78,8 @@ func (m *manager) track(rng csprng.Source, comms trackNetworkComms) {
return
}
//handle updates
newNDF := pollResp.PartialNDF
// ---- Process Update Data ----
lastTrackedRound := id.Round(pollResp.LastTrackedRound)
roundUpdates := pollResp.Updates
gwRoundsState := &knownRounds.KnownRounds{}
err = gwRoundsState.Unmarshal(pollResp.KnownRounds)
if err != nil {
......@@ -83,18 +87,25 @@ func (m *manager) track(rng csprng.Source, comms trackNetworkComms) {
return
}
// ---- NODE EVENTS ----
// NOTE: this updates the structure AND sends events over the node
// update channels
err = m.Instance.UpdatePartialNdf(newNDF)
if err != nil {
jww.ERROR.Printf(err.Error())
return
// ---- Node Events ----
// NOTE: this updates the structure, AND sends events over the node
// update channels about new and removed nodes
if pollResp.PartialNDF != nil {
err = m.Instance.UpdatePartialNdf(pollResp.PartialNDF)
if err != nil {
jww.ERROR.Printf(err.Error())
return
}
}
err = m.Instance.RoundUpdates(roundUpdates)
if err != nil {
jww.ERROR.Printf(err.Error())
return
// NOTE: this updates rounds and updates the tracking of the health of the
// network
if pollResp.Updates != nil {
err = m.Instance.RoundUpdates(pollResp.Updates)
if err != nil {
jww.ERROR.Printf(err.Error())
return
}
}
// ---- Round Processing -----
......
File moved
......@@ -13,7 +13,6 @@ import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/keyExchange"
"gitlab.com/elixxir/client/network/health"
"gitlab.com/elixxir/client/network/internal"
"gitlab.com/elixxir/client/network/message"
......@@ -26,6 +25,7 @@ import (
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/primitives/ndf"
"sync/atomic"
"time"
)
......@@ -40,12 +40,12 @@ type manager struct {
//Shared data with all sub managers
internal.Internal
// runners are the Network goroutines that handle reception
runners *stoppable.Multi
//sub-managers
round *rounds.Manager
message *message.Manager
//atomic denotes if the network is running
running *uint32
}
// NewManager builds a new reception manager object using inputted key fields
......@@ -60,10 +60,12 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard,
" client network manager")
}
running := uint32(0)
//create manager object
m := manager{
param: params,
runners: stoppable.NewMulti("network.Manager"),
running: &running,
}
m.Internal = internal.Internal{
......@@ -85,43 +87,46 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard,
}
// StartRunners kicks off all network reception goroutines ("threads").
func (m *manager) StartRunners() error {
if m.runners.IsRunning() {
return errors.Errorf("network routines are already running")
func (m *manager) Follow() (stoppable.Stoppable, error) {
if !atomic.CompareAndSwapUint32(m.running, 0, 1) {
return nil, errors.Errorf("network routines are already running")
}
multi := stoppable.NewMulti("networkManager")
// health tracker
m.Health.Start()
m.runners.Add(m.Health)
multi.Add(m.Health)
// Node Updates
m.runners.Add(node.StartRegistration(m.Instance, m.Session, m.Rng,
multi.Add(node.StartRegistration(m.Instance, m.Session, m.Rng,
m.Comms, m.NodeRegistration)) // Adding/Keys
//TODO-remover
//m.runners.Add(StartNodeRemover(m.Context)) // Removing
// Start the Network Tracker
trackNetworkStopper := stoppable.NewSingle("TrackNetwork")
go m.trackNetwork(trackNetworkStopper.Quit())
m.runners.Add(trackNetworkStopper)
go m.followNetwork(trackNetworkStopper.Quit())
multi.Add(trackNetworkStopper)
// Message reception
m.runners.Add(m.message.StartProcessies())
multi.Add(m.message.StartProcessies())
// Round processing
m.runners.Add(m.round.StartProcessors())
multi.Add(m.round.StartProcessors())
// Key exchange
m.runners.Add(keyExchange.Start(m.Switchboard, m.Session, m,
m.message.GetTriggerGarbledCheckChannel()))
//set the running status back to 0 so it can be started again
closer := stoppable.NewCleanup(multi, func(time.Duration) error {
if !atomic.CompareAndSwapUint32(m.running, 1, 0) {
return errors.Errorf("network routines are already stopped")
}
return nil
})
return nil
return closer, nil
}
// StopRunners stops all the reception goroutines
func (m *manager) GetStoppable() stoppable.Stoppable {
return m.runners
}
// GetHealthTracker returns the health tracker
func (m *manager) GetHealthTracker() interfaces.HealthTracker {
......@@ -133,3 +138,10 @@ func (m *manager) GetInstance() *network.Instance {
return m.Instance
}
// triggers a check on garbled messages to see if they can be decrypted
// this should be done when a new e2e client is added in case messages were
// received early or arrived out of order
func (m *manager) CheckGarbledMessages() {
m.message.CheckGarbledMessages()
}
......@@ -4,7 +4,7 @@ import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/interfaces/utility"
ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/primitives/states"
"time"
......@@ -46,7 +46,7 @@ func (m *Manager) criticalMessages() {
roundEvents.AddRoundEventChan(r, sendResults, 1*time.Minute,
states.COMPLETED, states.FAILED)
}
success, numTimeOut, numRoundFail := network.TrackResults(sendResults, len(rounds))
success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds))
if !success {
jww.ERROR.Printf("critical message send failed to transmit "+
"transmit %v/%v paritions: %v round failures, %v timeouts",
......
......@@ -42,8 +42,11 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle {
}
//Gets the channel to send received messages on
func (m *Manager) GetTriggerGarbledCheckChannel() chan<- struct{} {
return m.triggerGarbled
func (m *Manager) CheckGarbledMessages() {
select {
case m.triggerGarbled <- struct{}{}:
default:
}
}
//Starts all worker pool
......
File moved
......@@ -2,21 +2,23 @@ package storage
import (
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/client/vendor/github.com/pkg/errors"
"github.com/pkg/errors"
"time"
jww "github.com/spf13/jwalterweatherman"
)
const regCodeKey = "regCode"
const regCodeVersion = 0
// SetNDF stores a network definition json file
func (s *Session) SetRegCode(regCode string) error {
return s.Set(regCodeKey,
func (s *Session) SetRegCode(regCode string) {
err := s.Set(regCodeKey,
&versioned.Object{
Version: regCodeVersion,
Data: []byte(regCode),
Timestamp: time.Now(),
})
jww.FATAL.Printf("Failed to set the registration code: %s", err)
}
// Returns the stored network definition json file
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment