From aae122b67e61cf61f4a5e60d67d8870f23fb157e Mon Sep 17 00:00:00 2001 From: Benjamin Wenger <ben@elixxir.ioo> Date: Wed, 23 Sep 2020 15:56:51 -0700 Subject: [PATCH] improved comments and documentation, moved keyexchange, and exposed garbled message checking --- api/client.go | 65 +++++++++++++------ interfaces/networkManager.go | 3 +- .../utility}/trackResults.go | 4 +- keyExchange/exchange.go | 4 +- keyExchange/rekey.go | 4 +- keyExchange/trigger.go | 27 ++++---- network/{track.go => follow.go} | 63 ++++++++++-------- .../health/{healthTracker.go => tracker.go} | 0 ...{healthTracker_test.go => tracker_test.go} | 0 network/manager.go | 56 +++++++++------- network/message/critical.go | 4 +- network/message/manager.go | 7 +- network/rounds/{retreive.go => retrieve.go} | 0 storage/regCode.go | 8 ++- 14 files changed, 146 insertions(+), 99 deletions(-) rename {network => interfaces/utility}/trackResults.go (86%) rename network/{track.go => follow.go} (62%) rename network/health/{healthTracker.go => tracker.go} (100%) rename network/health/{healthTracker_test.go => tracker_test.go} (100%) rename network/rounds/{retreive.go => retrieve.go} (100%) diff --git a/api/client.go b/api/client.go index d2735cd86..346025c9b 100644 --- a/api/client.go +++ b/api/client.go @@ -11,6 +11,7 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/keyExchange" "gitlab.com/elixxir/client/network" "gitlab.com/elixxir/client/permissioning" "gitlab.com/elixxir/client/stoppable" @@ -50,13 +51,13 @@ type Client struct { // with the network. Note that this does not register a username/identity, but // merely creates a new cryptographic identity for adding such information // at a later date. -func NewClient(defJSON, storageDir string, password []byte) (*Client, error) { +func NewClient(ndfJSON, storageDir string, password []byte, registrationCode string) (*Client, error) { // Use fastRNG for RNG ops (AES fortuna based RNG using system RNG) rngStreamGen := fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG) rngStream := rngStreamGen.GetStream() // Parse the NDF - def, err := parseNDF(defJSON) + def, err := parseNDF(ndfJSON) if err != nil { return nil, err } @@ -76,6 +77,9 @@ func NewClient(defJSON, storageDir string, password []byte) (*Client, error) { // Save NDF to be used in the future storageSess.SetBaseNDF(def) + //store the registration code for later use + storageSess.SetRegCode(registrationCode) + //execute the rest of the loading as normal return loadClient(storageSess, rngStreamGen) } @@ -187,19 +191,39 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator) return c, nil } - - - // ----- Client Functions ----- - -// RegisterListener registers a listener callback function that is called -// every time a new message matches the specified parameters. -func (c *Client) RegisterListenerCb(uid id.ID, msgType int, username string, - listenerCb func(msg Message)) { - jww.INFO.Printf("RegisterListener(%s, %d, %s, func())", uid, msgType, - username) +// StartNetworkFollower kicks off the tracking of the network. It starts +// long running network client threads and returns an object for checking +// state and stopping those threads. +// Call this when returning from sleep and close when going back to +// sleep. +// Threads Started: +// - Network Follower (/network/follow.go) +// tracks the network events and hands them off to workers for handling +// - Historical Round Retrieval (/network/rounds/historical.go) +// Retrieves data about rounds which are too old to be stored by the client +// - Message Retrieval Worker Group (/network/rounds/retreive.go) +// Requests all messages in a given round from the gateway of the last node +// - Message Handling Worker Group (/network/message/reception.go) +// Decrypts and partitions messages when signals via the Switchboard +// - Health Tracker (/network/health) +func (c *Client) StartNetworkFollower() (stoppable.Stoppable, error) { + jww.INFO.Printf("StartNetworkFollower()") + multi := stoppable.NewMulti("client") + + stopFollow, err := c.network.Follow() + if err != nil { + return nil, errors.WithMessage(err, "Failed to start following "+ + "the network") + } + multi.Add(stopFollow) + // Key exchange + multi.Add(keyExchange.Start(c.switchboard, c.storage, c.network)) + return multi, nil } + + // SendE2E sends an end-to-end payload to the provided recipient with // the provided msgType. Returns the list of rounds in which parts of // the message were sent or an error if it fails. @@ -231,6 +255,14 @@ func (c *Client) SendCMIX(payload []byte, recipient id.ID) (int, error) { return 0, nil } +// RegisterListener registers a listener callback function that is called +// every time a new message matches the specified parameters. +func (c *Client) RegisterListenerCb(uid id.ID, msgType int, username string, + listenerCb func(msg Message)) { + jww.INFO.Printf("RegisterListener(%s, %d, %s, func())", uid, msgType, + username) +} + // RegisterForNotifications allows a client to register for push // notifications. // Note that clients are not required to register for push notifications @@ -395,15 +427,6 @@ func (c *Client) RegisterAuthRequestCb(cb func(contact Contact, jww.INFO.Printf("RegisterAuthRequestCb(...)") } -// StartNetworkRunner kicks off the longrunning network client threads -// and returns an object for checking state and stopping those threads. -// Call this when returning from sleep and close when going back to -// sleep. -func (c *Client) StartNetworkRunner() stoppable.Stoppable { - jww.INFO.Printf("StartNetworkRunner()") - return nil -} - // RegisterRoundEventsCb registers a callback for round // events. func (c *Client) RegisterRoundEventsCb( diff --git a/interfaces/networkManager.go b/interfaces/networkManager.go index d2414bd97..3c173aebf 100644 --- a/interfaces/networkManager.go +++ b/interfaces/networkManager.go @@ -15,7 +15,8 @@ type NetworkManager interface { SendCMIX(message format.Message, p params.CMIX) (id.Round, error) GetInstance() *network.Instance GetHealthTracker() HealthTracker - GetStoppable() stoppable.Stoppable + Follow() (stoppable.Stoppable, error) + CheckGarbledMessages() } //for use in key exchange which needs to be callable inside of network diff --git a/network/trackResults.go b/interfaces/utility/trackResults.go similarity index 86% rename from network/trackResults.go rename to interfaces/utility/trackResults.go index 9b0bd2416..5f1fc6453 100644 --- a/network/trackResults.go +++ b/interfaces/utility/trackResults.go @@ -1,11 +1,11 @@ -package network +package utility import ( ds "gitlab.com/elixxir/comms/network/dataStructures" "gitlab.com/elixxir/primitives/states" ) -// Function to track the results of events. It returns true if the collection of +// Function to follow the results of events. It returns true if the collection of // events resolved well, and then a count of how many rounds failed and how // many roundEvents timed out. func TrackResults(resultsCh chan ds.EventReturn, numResults int) (bool, int, int) { diff --git a/keyExchange/exchange.go b/keyExchange/exchange.go index 604d1b1d2..27ba638d6 100644 --- a/keyExchange/exchange.go +++ b/keyExchange/exchange.go @@ -15,7 +15,7 @@ const keyExchangeConfirmName = "KeyExchangeConfirm" const keyExchangeMulti = "KeyExchange" func Start(switchboard *switchboard.Switchboard, sess *storage.Session, - net interfaces.NetworkManager, garbledMessageTrigger chan<- struct{}) stoppable.Stoppable { + net interfaces.NetworkManager) stoppable.Stoppable { // register the rekey trigger thread triggerCh := make(chan message.Receive, 100) @@ -31,7 +31,7 @@ func Start(switchboard *switchboard.Switchboard, sess *storage.Session, }) // start the trigger thread - go startTrigger(sess, net, triggerCh, triggerStop, garbledMessageTrigger) + go startTrigger(sess, net, triggerCh, triggerStop.Quit()) //register the rekey confirm thread confirmCh := make(chan message.Receive, 100) diff --git a/keyExchange/rekey.go b/keyExchange/rekey.go index a0f3b4163..08c136ab1 100644 --- a/keyExchange/rekey.go +++ b/keyExchange/rekey.go @@ -13,7 +13,7 @@ import ( "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" - network2 "gitlab.com/elixxir/client/network" + "gitlab.com/elixxir/client/interfaces/utility" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/e2e" "gitlab.com/elixxir/comms/network" @@ -116,7 +116,7 @@ func negotiate(instance *network.Instance, sendE2E interfaces.SendE2E, } //Wait until the result tracking responds - success, numTimeOut, numRoundFail := network2.TrackResults(sendResults, len(rounds)) + success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds)) // If a single partition of the Key Negotiation request does not // transmit, the partner cannot read the result. Log the error and set diff --git a/keyExchange/trigger.go b/keyExchange/trigger.go index 19d31d52d..be081c5f8 100644 --- a/keyExchange/trigger.go +++ b/keyExchange/trigger.go @@ -8,8 +8,7 @@ import ( "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/client/network" - "gitlab.com/elixxir/client/stoppable" + "gitlab.com/elixxir/client/interfaces/utility" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/e2e" ds "gitlab.com/elixxir/comms/network/dataStructures" @@ -23,14 +22,14 @@ const ( errUnknown = "unknown trigger from partner %s" ) -func startTrigger(sess *storage.Session, net interfaces.NetworkManager, c chan message.Receive, - stop *stoppable.Single, garbledMessageTrigger chan<- struct{}) { +func startTrigger(sess *storage.Session, net interfaces.NetworkManager, + c chan message.Receive, quitCh <-chan struct{}) { for true { select { - case <-stop.Quit(): + case <-quitCh: return case request := <-c: - err := handleTrigger(sess, net, request, garbledMessageTrigger) + err := handleTrigger(sess, net, request) if err != nil { jww.ERROR.Printf("Failed to handle rekey trigger: %s", err) @@ -39,8 +38,8 @@ func startTrigger(sess *storage.Session, net interfaces.NetworkManager, c chan m } } -func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, request message.Receive, - garbledMessageTrigger chan<- struct{}) error { +func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, + request message.Receive) error { //ensure the message was encrypted properly if request.Encryption != message.E2E { errMsg := fmt.Sprintf(errBadTrigger, request.Sender) @@ -85,13 +84,9 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, request "create session %s for partner %s is a duplicate, request ignored", session.GetID(), request.Sender) } else { - //if the session is new, attempt to trigger garbled message processing - //if there is contention, skip - select { - case garbledMessageTrigger <- struct{}{}: - default: - jww.WARN.Println("Failed to trigger garbled messages") - } + // if the session is new, attempt to trigger garbled message processing + // automatically skips if there is contention + net.CheckGarbledMessages() } //Send the Confirmation Message @@ -131,7 +126,7 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, request } //Wait until the result tracking responds - success, numTimeOut, numRoundFail := network.TrackResults(sendResults, len(rounds)) + success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds)) // If a single partition of the Key Negotiation request does not // transmit, the partner will not be able to read the confirmation. If diff --git a/network/track.go b/network/follow.go similarity index 62% rename from network/track.go rename to network/follow.go index 6da65c4df..49f49b4e5 100644 --- a/network/track.go +++ b/network/follow.go @@ -6,16 +6,20 @@ package network -// updates.go tracks the network for: -// 1. Node addition and removal +// follow.go tracks the network for: +// 1. The status of the network and its accessibility // 2. New/Active/Complete rounds and their contact gateways +// 3. Node addition and removal // This information is tracked by polling a gateway for the network definition // file (NDF). Once it detects an event it sends it off to the proper channel // for a worker to update the client state (add/remove a node, check for // messages at a gateway, etc). See: -// - nodes.go for add/remove node events -// - rounds.go for round event handling & processing -// - receive.go for message handling +// - /node/register.go for add/remove node events +// - /rounds/historical.go for old round retrieval +// - /rounds/retrieve.go for message retrieval +// - /message/reception.go decryption, partitioning, and signaling of messages +// - /health/tracker.go - tracks the state of the network through the network +// instance import ( "gitlab.com/elixxir/client/network/gateway" @@ -29,14 +33,14 @@ import ( "time" ) -type trackNetworkComms interface { +type followNetworkComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error) } -// TrackNetwork polls the network to get updated on the state of nodes, the +// followNetwork polls the network to get updated on the state of nodes, the // round status, and informs the client when messages can be retrieved. -func (m *manager) trackNetwork(quitCh <-chan struct{}) { +func (m *manager) followNetwork(quitCh <-chan struct{}) { ticker := time.NewTicker(m.param.TrackNetworkPeriod) rng := m.Rng.GetStream() @@ -46,20 +50,22 @@ func (m *manager) trackNetwork(quitCh <-chan struct{}) { rng.Close() break case <-ticker.C: - m.track(rng, m.Comms) + m.follow(rng, m.Comms) } } } -func (m *manager) track(rng csprng.Source, comms trackNetworkComms) { +func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { + //randomly select a gateway to poll + //TODO: make this more intelligent gwHost, err := gateway.Get(m.Instance.GetPartialNdf().Get(), comms, rng) if err != nil { - jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+ + jww.FATAL.Panicf("Failed to follow network, NDF has corrupt "+ "data: %s", err) } - // Poll for the new NDF + // Poll network updates pollReq := pb.GatewayPoll{ Partial: &pb.NDFHash{ Hash: m.Instance.GetPartialNdf().GetHash(), @@ -72,10 +78,8 @@ func (m *manager) track(rng csprng.Source, comms trackNetworkComms) { return } - //handle updates - newNDF := pollResp.PartialNDF + // ---- Process Update Data ---- lastTrackedRound := id.Round(pollResp.LastTrackedRound) - roundUpdates := pollResp.Updates gwRoundsState := &knownRounds.KnownRounds{} err = gwRoundsState.Unmarshal(pollResp.KnownRounds) if err != nil { @@ -83,18 +87,25 @@ func (m *manager) track(rng csprng.Source, comms trackNetworkComms) { return } - // ---- NODE EVENTS ---- - // NOTE: this updates the structure AND sends events over the node - // update channels - err = m.Instance.UpdatePartialNdf(newNDF) - if err != nil { - jww.ERROR.Printf(err.Error()) - return + // ---- Node Events ---- + // NOTE: this updates the structure, AND sends events over the node + // update channels about new and removed nodes + if pollResp.PartialNDF != nil { + err = m.Instance.UpdatePartialNdf(pollResp.PartialNDF) + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } } - err = m.Instance.RoundUpdates(roundUpdates) - if err != nil { - jww.ERROR.Printf(err.Error()) - return + + // NOTE: this updates rounds and updates the tracking of the health of the + // network + if pollResp.Updates != nil { + err = m.Instance.RoundUpdates(pollResp.Updates) + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } } // ---- Round Processing ----- diff --git a/network/health/healthTracker.go b/network/health/tracker.go similarity index 100% rename from network/health/healthTracker.go rename to network/health/tracker.go diff --git a/network/health/healthTracker_test.go b/network/health/tracker_test.go similarity index 100% rename from network/health/healthTracker_test.go rename to network/health/tracker_test.go diff --git a/network/manager.go b/network/manager.go index 0231702d2..1482b1dba 100644 --- a/network/manager.go +++ b/network/manager.go @@ -13,7 +13,6 @@ import ( "github.com/pkg/errors" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/client/keyExchange" "gitlab.com/elixxir/client/network/health" "gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/message" @@ -26,6 +25,7 @@ import ( "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/xx_network/primitives/ndf" + "sync/atomic" "time" ) @@ -40,12 +40,12 @@ type manager struct { //Shared data with all sub managers internal.Internal - // runners are the Network goroutines that handle reception - runners *stoppable.Multi - //sub-managers round *rounds.Manager message *message.Manager + + //atomic denotes if the network is running + running *uint32 } // NewManager builds a new reception manager object using inputted key fields @@ -60,10 +60,12 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, " client network manager") } + running := uint32(0) + //create manager object m := manager{ param: params, - runners: stoppable.NewMulti("network.Manager"), + running: &running, } m.Internal = internal.Internal{ @@ -85,43 +87,46 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, } // StartRunners kicks off all network reception goroutines ("threads"). -func (m *manager) StartRunners() error { - if m.runners.IsRunning() { - return errors.Errorf("network routines are already running") +func (m *manager) Follow() (stoppable.Stoppable, error) { + + if !atomic.CompareAndSwapUint32(m.running, 0, 1) { + return nil, errors.Errorf("network routines are already running") } + multi := stoppable.NewMulti("networkManager") + // health tracker m.Health.Start() - m.runners.Add(m.Health) + multi.Add(m.Health) // Node Updates - m.runners.Add(node.StartRegistration(m.Instance, m.Session, m.Rng, + multi.Add(node.StartRegistration(m.Instance, m.Session, m.Rng, m.Comms, m.NodeRegistration)) // Adding/Keys //TODO-remover //m.runners.Add(StartNodeRemover(m.Context)) // Removing // Start the Network Tracker trackNetworkStopper := stoppable.NewSingle("TrackNetwork") - go m.trackNetwork(trackNetworkStopper.Quit()) - m.runners.Add(trackNetworkStopper) + go m.followNetwork(trackNetworkStopper.Quit()) + multi.Add(trackNetworkStopper) // Message reception - m.runners.Add(m.message.StartProcessies()) + multi.Add(m.message.StartProcessies()) // Round processing - m.runners.Add(m.round.StartProcessors()) + multi.Add(m.round.StartProcessors()) - // Key exchange - m.runners.Add(keyExchange.Start(m.Switchboard, m.Session, m, - m.message.GetTriggerGarbledCheckChannel())) + //set the running status back to 0 so it can be started again + closer := stoppable.NewCleanup(multi, func(time.Duration) error { + if !atomic.CompareAndSwapUint32(m.running, 1, 0) { + return errors.Errorf("network routines are already stopped") + } + return nil + }) - return nil + return closer, nil } -// StopRunners stops all the reception goroutines -func (m *manager) GetStoppable() stoppable.Stoppable { - return m.runners -} // GetHealthTracker returns the health tracker func (m *manager) GetHealthTracker() interfaces.HealthTracker { @@ -133,3 +138,10 @@ func (m *manager) GetInstance() *network.Instance { return m.Instance } +// triggers a check on garbled messages to see if they can be decrypted +// this should be done when a new e2e client is added in case messages were +// received early or arrived out of order +func (m *manager) CheckGarbledMessages() { + m.message.CheckGarbledMessages() +} + diff --git a/network/message/critical.go b/network/message/critical.go index 42caa9421..164dcf52d 100644 --- a/network/message/critical.go +++ b/network/message/critical.go @@ -4,7 +4,7 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/client/network" + "gitlab.com/elixxir/client/interfaces/utility" ds "gitlab.com/elixxir/comms/network/dataStructures" "gitlab.com/elixxir/primitives/states" "time" @@ -46,7 +46,7 @@ func (m *Manager) criticalMessages() { roundEvents.AddRoundEventChan(r, sendResults, 1*time.Minute, states.COMPLETED, states.FAILED) } - success, numTimeOut, numRoundFail := network.TrackResults(sendResults, len(rounds)) + success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds)) if !success { jww.ERROR.Printf("critical message send failed to transmit "+ "transmit %v/%v paritions: %v round failures, %v timeouts", diff --git a/network/message/manager.go b/network/message/manager.go index 56a45a49f..3d95ef040 100644 --- a/network/message/manager.go +++ b/network/message/manager.go @@ -42,8 +42,11 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle { } //Gets the channel to send received messages on -func (m *Manager) GetTriggerGarbledCheckChannel() chan<- struct{} { - return m.triggerGarbled +func (m *Manager) CheckGarbledMessages() { + select { + case m.triggerGarbled <- struct{}{}: + default: + } } //Starts all worker pool diff --git a/network/rounds/retreive.go b/network/rounds/retrieve.go similarity index 100% rename from network/rounds/retreive.go rename to network/rounds/retrieve.go diff --git a/storage/regCode.go b/storage/regCode.go index 68e147f12..c2d7c2fad 100644 --- a/storage/regCode.go +++ b/storage/regCode.go @@ -2,21 +2,23 @@ package storage import ( "gitlab.com/elixxir/client/storage/versioned" - "gitlab.com/elixxir/client/vendor/github.com/pkg/errors" + "github.com/pkg/errors" "time" + jww "github.com/spf13/jwalterweatherman" ) const regCodeKey = "regCode" const regCodeVersion = 0 // SetNDF stores a network definition json file -func (s *Session) SetRegCode(regCode string) error { - return s.Set(regCodeKey, +func (s *Session) SetRegCode(regCode string) { + err := s.Set(regCodeKey, &versioned.Object{ Version: regCodeVersion, Data: []byte(regCode), Timestamp: time.Now(), }) + jww.FATAL.Printf("Failed to set the registration code: %s", err) } // Returns the stored network definition json file -- GitLab