From 677def42374d88f9278252802094070171473443 Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Tue, 29 Mar 2022 15:28:51 -0700 Subject: [PATCH] More minor fixes to code formatting in network package --- network/gateway/hostPool.go | 165 +++++++++++------- network/gateway/hostpool_test.go | 12 +- network/gateway/sender.go | 19 +- network/gateway/sender_test.go | 12 +- network/gateway/storeHostList.go | 2 +- network/gateway/storeHostList_test.go | 2 +- network/gateway/utils_test.go | 23 +-- network/health/tracker.go | 24 +-- network/health/tracker_test.go | 2 +- network/historical/historical.go | 172 +++++++++---------- network/historical/historical_test.go | 107 ++++++------ network/historical/params.go | 20 ++- network/identity/tracker.go | 6 +- network/manager.go | 6 +- network/message/inProgress_test.go | 2 +- network/nodes/mixCypher.go | 27 +-- network/nodes/mixCypher_test.go | 145 ++++++++-------- network/nodes/register.go | 40 +++-- network/nodes/registrar.go | 48 +++--- network/nodes/store.go | 7 +- network/rounds/retrieve_test.go | 16 +- network/rounds/store/uncheckedRounds_test.go | 4 +- network/rounds/unchecked_test.go | 4 +- network/trackResults.go | 4 +- 24 files changed, 456 insertions(+), 413 deletions(-) diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index 2f7ade53b..d6e56a778 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -5,9 +5,10 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -// Package gateway Handles functionality related to providing Gateway connect.Host objects -// for message sending to the rest of the client repo -// Used to minimize # of open connections on mobile clients +// Package gateway Handles functionality related to providing Gateway +// connect.Host objects for message sending to the rest of the client +// repository. +// Used to minimize the number of open connections on mobile clients. package gateway @@ -50,24 +51,27 @@ var errorsList = []string{ // HostManager Interface allowing storage and retrieval of Host objects type HostManager interface { GetHost(hostId *id.ID) (*connect.Host, bool) - AddHost(hid *id.ID, address string, cert []byte, params connect.HostParams) (host *connect.Host, err error) + AddHost(hid *id.ID, address string, cert []byte, params connect.HostParams) ( + host *connect.Host, err error) RemoveHost(hid *id.ID) } // Filter filters out IDs from the provided map based on criteria in the NDF. -// The passed in map is a map of the NDF for easier access. The map is ID -> index in the NDF -// There is no multithreading, the filter function can either edit the passed map or make a new one -// and return it. The general pattern is to loop through the map, then look up data about the nodes -// in the ndf to make a filtering decision, then add them to a new map if they are accepted. +// The passed in map is a map of the NDF for easier access. The map is +// ID -> index in the NDF. There is no multithreading; the filter function can +// either edit the passed map or make a new one and return it. The general +// pattern is to loop through the map, then look up data about the nodes in the +// NDF to make a filtering decision, then add them to a new map if they are +// accepted. type Filter func(map[id.ID]int, *ndf.NetworkDefinition) map[id.ID]int // HostPool Handles providing hosts to the Client type HostPool struct { - hostMap map[id.ID]uint32 // map key to its index in the slice - hostList []*connect.Host // each index in the slice contains the value + hostMap map[id.ID]uint32 // Map key to its index in the slice + hostList []*connect.Host // Each index in the slice contains the value hostMux sync.RWMutex // Mutex for the above map/list combination - ndfMap map[id.ID]int // map gateway ID to its index in the ndf + ndfMap map[id.ID]int // Map gateway ID to its index in the NDF ndf *ndf.NetworkDefinition ndfMux sync.RWMutex @@ -82,17 +86,32 @@ type HostPool struct { filter Filter } -// PoolParams Allows configuration of HostPool parameters +// PoolParams allows configuration of HostPool parameters. type PoolParams struct { - MaxPoolSize uint32 // Maximum number of Hosts in the HostPool - PoolSize uint32 // Allows override of HostPool size. Set to zero for dynamic size calculation - ProxyAttempts uint32 // How many proxies will be used in event of send failure - MaxPings uint32 // How many gateways to concurrently test when initializing HostPool. Disabled if zero. - ForceConnection bool // Flag determining whether Host connections are initialized when added to HostPool - HostParams connect.HostParams // Parameters for the creation of new Host objects + // MaxPoolSize is the maximum number of Hosts in the HostPool. + MaxPoolSize uint32 + + // PoolSize allows override of HostPool size. Set to zero for dynamic size + // calculation. + PoolSize uint32 + + // ProxyAttempts dictates how many proxies will be used in event of send + // failure. + ProxyAttempts uint32 + + // MaxPings is the number of gateways to concurrently test when initializing + // HostPool. Disabled if zero. + MaxPings uint32 + + // ForceConnection determines whether host connections are initialized when + // added to HostPool. + ForceConnection bool + + // HostParams is the parameters for the creation of new Host objects. + HostParams connect.HostParams } -// DefaultPoolParams Returns a default set of PoolParams +// DefaultPoolParams returns a default set of PoolParams. func DefaultPoolParams() PoolParams { p := PoolParams{ MaxPoolSize: 30, @@ -113,7 +132,7 @@ func DefaultPoolParams() PoolParams { return p } -// Build and return new HostPool object +// newHostPool builds and returns a new HostPool object. func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, netDef *ndf.NetworkDefinition, getter HostManager, storage storage.Session, addGateway chan<- network.NodeGateway) (*HostPool, error) { @@ -121,8 +140,8 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, // Determine size of HostPool if poolParams.PoolSize == 0 { - poolParams.PoolSize, err = getPoolSize(uint32(len(netDef.Gateways)), - poolParams.MaxPoolSize) + poolParams.PoolSize, err = getPoolSize( + uint32(len(netDef.Gateways)), poolParams.MaxPoolSize) if err != nil { return nil, err } @@ -150,14 +169,14 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, return nil, err } - // get the last used list of hosts and use it to seed the host pool list + // Get the last used list of hosts and use it to seed the host pool list hostList, err := getHostList(result.kv) numHostsAdded := 0 if err == nil { for _, hid := range hostList { - err := result.replaceHostNoStore(hid, uint32(numHostsAdded)) + err = result.replaceHostNoStore(hid, uint32(numHostsAdded)) if err != nil { - jww.WARN.Printf("Unable to add stored host %s: %s", hid, err.Error()) + jww.WARN.Printf("Unable to add stored host %s: %s", hid, err) } else { numHostsAdded++ if numHostsAdded >= len(result.hostList) { @@ -166,7 +185,8 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, } } } else { - jww.WARN.Printf("Building new HostPool because no HostList stored: %+v", err) + jww.WARN.Printf( + "Building new HostPool because no HostList stored: %+v", err) } // Build the initial HostPool and return @@ -177,22 +197,23 @@ func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, return nil, err } } else { - // Else, select random Hosts + // Otherwise, select random Hosts for i := numHostsAdded; i < len(result.hostList); i++ { - err := result.replaceHost(result.selectGateway(), uint32(i)) + err = result.replaceHost(result.selectGateway(), uint32(i)) if err != nil { return nil, err } } } - jww.INFO.Printf("Initialized HostPool with size: %d/%d", poolParams.PoolSize, len(netDef.Gateways)) + jww.INFO.Printf("Initialized HostPool with size: %d/%d", + poolParams.PoolSize, len(netDef.Gateways)) return result, nil } -// Initialize the HostPool with a performant set of Hosts +// initialize initializes the HostPool with a performant set of Hosts. func (h *HostPool) initialize(startIdx uint32) error { - // If HostPool is full, don't need to initialize + // If HostPool is full, there is no need to initialize if startIdx == h.poolParams.PoolSize { return nil } @@ -213,7 +234,8 @@ func (h *HostPool) initialize(startIdx uint32) error { _, err := stream.Read(rndBytes[:]) stream.Close() if err != nil { - return errors.Errorf("Failed to randomize shuffle for HostPool initialization: %+v", err) + return errors.Errorf( + "Failed to randomize shuffle for HostPool initialization: %+v", err) } shuffle.ShuffleSwap(rndBytes[:], len(randomGateways), func(i, j int) { randomGateways[i], randomGateways[j] = randomGateways[j], randomGateways[i] @@ -248,6 +270,7 @@ func (h *HostPool) initialize(startIdx uint32) error { if err != nil { jww.WARN.Printf("ID for gateway %s could not be retrieved", gwId) } + // Skip if already in HostPool if _, ok := h.hostMap[*gwId]; ok { // Try another Host instead @@ -259,8 +282,8 @@ func (h *HostPool) initialize(startIdx uint32) error { // Obtain that GwId's Host object newHost, ok := h.manager.GetHost(gwId) if !ok { - jww.WARN.Printf("Host for gateway %s could not be "+ - "retrieved", gwId) + jww.WARN.Printf( + "Host for gateway %s could not be retrieved", gwId) return } @@ -291,6 +314,7 @@ func (h *HostPool) initialize(startIdx uint32) error { timer.Stop() break innerLoop } + case <-timer.C: jww.INFO.Printf("HostPool initialization timed out after %s.", pingTimeout) @@ -320,13 +344,14 @@ func (h *HostPool) initialize(startIdx uint32) error { // Ran out of Hosts to try if startIdx < h.poolParams.PoolSize { - return errors.Errorf("Unable to initialize enough viable hosts for HostPool") + return errors.Errorf( + "Unable to initialize enough viable hosts for HostPool") } return nil } -// UpdateNdf Mutates internal ndf to the given ndf +// UpdateNdf mutates internal NDF to the given NDF func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) { if len(ndf.Gateways) == 0 { jww.WARN.Printf("Unable to UpdateNdf: no gateways available") @@ -345,7 +370,7 @@ func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) { h.ndfMux.Unlock() } -// SetPoolFilter sets the filter used to filter gateways from the ID map. +// SetGatewayFilter sets the filter used to filter gateways from the ID map. func (h *HostPool) SetGatewayFilter(f Filter) { h.filterMux.Lock() defer h.filterMux.Unlock() @@ -353,7 +378,7 @@ func (h *HostPool) SetGatewayFilter(f Filter) { h.filter = f } -// GetHostParams returns a copy of the host parameters struct +// GetHostParams returns a copy of the connect.HostParams struct. func (h *HostPool) GetHostParams() connect.HostParams { hp := h.poolParams.HostParams hpCopy := connect.HostParams{ @@ -381,13 +406,15 @@ func (h *HostPool) getFilter() Filter { return h.filter } -// Obtain a random, unique list of Hosts of the given length from the HostPool +// getAny obtains a random and unique list of hosts of the given length from the +// HostPool. func (h *HostPool) getAny(length uint32, excluded []*id.ID) []*connect.Host { if length > h.poolParams.PoolSize { length = h.poolParams.PoolSize } - checked := make(map[uint32]interface{}) // Keep track of Hosts already selected to avoid duplicates + // Keep track of Hosts already selected to avoid duplicates + checked := make(map[uint32]interface{}) if excluded != nil { // Add excluded Hosts to already-checked list for i := range excluded { @@ -415,21 +442,24 @@ func (h *HostPool) getAny(length uint32, excluded []*id.ID) []*connect.Host { i++ } } + h.hostMux.RUnlock() rng.Close() return result } -// Obtain a specific connect.Host from the manager, irrespective of the HostPool +// getSpecific obtains a specific connect.Host from the manager, irrespective of +// the HostPool. func (h *HostPool) getSpecific(target *id.ID) (*connect.Host, bool) { return h.manager.GetHost(target) } -// Try to obtain the given targets from the HostPool -// If each is not present, obtain a random replacement from the HostPool +// getPreferred tries to obtain the given targets from the HostPool. If each is +// not present, then obtains a random replacement from the HostPool. func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host { - checked := make(map[uint32]interface{}) // Keep track of Hosts already selected to avoid duplicates + // Keep track of Hosts already selected to avoid duplicates + checked := make(map[uint32]interface{}) length := len(targets) if length > int(h.poolParams.PoolSize) { length = int(h.poolParams.PoolSize) @@ -459,11 +489,12 @@ func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host { return result } -// Replaces the given hostId in the HostPool if the given hostErr is in errorList -// Returns whether the host was replaced +// checkReplace replaces the given hostId in the HostPool if the given hostErr +// is in errorList. Returns whether the host was replaced. func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) { var err error - // Check if Host should be replaced + + // Check if host should be replaced doReplace := false if hostErr != nil { for _, errString := range errorsList { @@ -486,10 +517,11 @@ func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) { } h.hostMux.Unlock() } + return doReplace && err == nil, err } -// Select a viable HostPool candidate from the NDF +// selectGateway selects a viable HostPool candidate from the NDF. func (h *HostPool) selectGateway() *id.ID { rng := h.rng.GetStream() defer rng.Close() @@ -544,13 +576,12 @@ func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error { } // replaceHostNoStore replaces the given slot in the HostPool with a new Gateway -// with the specified ID without saving changes to storage +// with the specified ID without saving changes to storage. func (h *HostPool) replaceHostNoStore(newId *id.ID, oldPoolIndex uint32) error { // Obtain that GwId's Host object newHost, ok := h.manager.GetHost(newId) if !ok { - return errors.Errorf("host for gateway %s could not be "+ - "retrieved", newId) + return errors.Errorf("host for gateway %s could not be retrieved", newId) } // Keep track of oldHost for cleanup @@ -559,6 +590,7 @@ func (h *HostPool) replaceHostNoStore(newId *id.ID, oldPoolIndex uint32) error { // Use the poolIdx to overwrite the random Host in the corresponding index // in the hostList h.hostList[oldPoolIndex] = newHost + // Use the GwId to keep track of the new random Host's index in the hostList h.hostMap[*newId] = oldPoolIndex @@ -586,14 +618,14 @@ func (h *HostPool) replaceHostNoStore(newId *id.ID, oldPoolIndex uint32) error { return nil } -// Force-add the Gateways to the HostPool, each replacing a random Gateway +// forceAdd adds the Gateways to the HostPool, each replacing a random Gateway. func (h *HostPool) forceAdd(gwId *id.ID) error { rng := h.rng.GetStream() h.hostMux.Lock() defer h.hostMux.Unlock() defer rng.Close() - // Verify the GwId is not already in the hostMap + // Verify the gateway ID is not already in the hostMap if _, ok := h.hostMap[*gwId]; ok { // If it is, skip return nil @@ -604,7 +636,7 @@ func (h *HostPool) forceAdd(gwId *id.ID) error { return h.replaceHost(gwId, poolIdx) } -// Updates the internal HostPool with any changes to the NDF +// updateConns updates the internal HostPool with any changes to the NDF. func (h *HostPool) updateConns() error { // Prepare NDFs for comparison newMap, err := convertNdfToMap(h.ndf) @@ -639,14 +671,14 @@ func (h *HostPool) updateConns() error { return nil } -// Takes ndf.Gateways and puts their IDs into a map object +// convertNdfToMap takes ndf.Gateways and puts their IDs into a map object. func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[id.ID]int, error) { result := make(map[id.ID]int) if ndf == nil { return result, nil } - // Process Node and Gateway Ids into set + // Process node and gateway IDs into set // NOTE: We expect len(ndf.Gateways) == len(ndf.Nodes) for i := range ndf.Gateways { gw := ndf.Gateways[i] @@ -667,19 +699,19 @@ func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[id.ID]int, error) { return result, nil } -// updateConns helper for removing old Gateways +// removeGateway removes old gateways. func (h *HostPool) removeGateway(gwId *id.ID) { h.manager.RemoveHost(gwId) // If needed, replace the removed Gateway in the HostPool with a new one if poolIndex, ok := h.hostMap[*gwId]; ok { err := h.replaceHost(h.selectGateway(), poolIndex) if err != nil { - jww.ERROR.Printf("Unable to removeGateway: %+v", err) + jww.ERROR.Printf("Unable to remove gateway: %+v", err) } } } -// updateConns helper for adding new Gateways +// addGateway adds a new Gateway. func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { gw := h.ndf.Gateways[ndfIndex] @@ -689,11 +721,12 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { // Check if gateway ID collides with an existing hard coded ID if id.CollidesWithHardCodedID(gwId) { jww.ERROR.Printf("Gateway ID invalid, collides with a "+ - "hard coded ID. Invalid ID: %v", gwId.Marshal()) + "hard coded ID. Invalid ID: %s", gwId) } // Add the new gateway host - _, err := h.manager.AddHost(gwId, gw.Address, []byte(gw.TlsCertificate), h.poolParams.HostParams) + _, err := h.manager.AddHost( + gwId, gw.Address, []byte(gw.TlsCertificate), h.poolParams.HostParams) if err != nil { jww.ERROR.Printf("Could not add gateway host %s: %+v", gwId, err) } @@ -717,14 +750,14 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { } } -// getPoolSize determines the size of the HostPool based on the size of the NDF +// getPoolSize determines the size of the HostPool based on the size of the NDF. func getPoolSize(ndfLen, maxSize uint32) (uint32, error) { // Verify the NDF has at least one Gateway for the HostPool if ndfLen == 0 { - return 0, errors.Errorf("Unable to create HostPool: no gateways available") + return 0, errors.Errorf( + "Unable to create HostPool: no gateways available") } - // PoolSize = ceil(sqrt(len(ndf,Gateways))) poolSize := uint32(math.Ceil(math.Sqrt(float64(ndfLen)))) if poolSize > maxSize { return maxSize, nil @@ -732,7 +765,7 @@ func getPoolSize(ndfLen, maxSize uint32) (uint32, error) { return poolSize, nil } -// readUint32 reads an integer from an io.Reader (which should be a CSPRNG) +// readUint32 reads an integer from an io.Reader (which should be a CSPRNG). func readUint32(rng io.Reader) uint32 { var rndBytes [4]byte i, err := rng.Read(rndBytes[:]) @@ -742,7 +775,7 @@ func readUint32(rng io.Reader) uint32 { return binary.BigEndian.Uint32(rndBytes[:]) } -// readRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end +// readRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end. func readRangeUint32(start, end uint32, rng io.Reader) uint32 { size := end - start // Note that we could just do the part inside the () here, but then extra diff --git a/network/gateway/hostpool_test.go b/network/gateway/hostpool_test.go index 91885a6cb..d1c1f8db0 100644 --- a/network/gateway/hostpool_test.go +++ b/network/gateway/hostpool_test.go @@ -222,12 +222,11 @@ func TestHostPool_ReplaceHost(t *testing.T) { "\n\tReceived: %d", newIndex, retrievedIndex) } - // Check that the state of the list list been correctly updated + // Check that the state of the list been correctly updated retrievedHost := hostPool.hostList[newIndex] if !gwIdOne.Cmp(retrievedHost.GetId()) { - t.Errorf("Id pulled from list is not expected."+ - "\n\tExpected: %s"+ - "\n\tReceived: %s", gwIdOne, retrievedHost.GetId()) + t.Errorf("ID pulled from list is not expected."+ + "\nexpected: %s\nreceived: %s", gwIdOne, retrievedHost.GetId()) } /* Replace the initial host with a new host */ @@ -253,9 +252,8 @@ func TestHostPool_ReplaceHost(t *testing.T) { // Check that the state of the list been correctly updated for new host retrievedHost = hostPool.hostList[newIndex] if !gwIdTwo.Cmp(retrievedHost.GetId()) { - t.Errorf("Id pulled from list is not expected."+ - "\n\tExpected: %s"+ - "\n\tReceived: %s", gwIdTwo, retrievedHost.GetId()) + t.Errorf("ID pulled from list is not expected."+ + "\nexpected: %s\nreceived: %s", gwIdTwo, retrievedHost.GetId()) } // Check the state of the map has been correctly removed for the old gateway diff --git a/network/gateway/sender.go b/network/gateway/sender.go index aa0ad9bae..3725aa873 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -25,7 +25,8 @@ import ( // Sender Object used for sending that wraps the HostPool for providing destinations type Sender interface { - SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) + SendToAny(sendFunc func(host *connect.Host) (interface{}, error), + stop *stoppable.Single) (interface{}, error) SendToPreferred(targets []*id.ID, sendFunc SendToPreferredFunc, stop *stoppable.Single, timeout time.Duration) (interface{}, error) UpdateNdf(ndf *ndf.NetworkDefinition) @@ -40,18 +41,22 @@ type sender struct { const RetryableError = "Nonfatal error occurred, please retry" // NewSender Create a new Sender object wrapping a HostPool object -func NewSender(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.NetworkDefinition, getter HostManager, - storage storage.Session, addGateway chan network.NodeGateway) (Sender, error) { +func NewSender(poolParams PoolParams, rng *fastRNG.StreamGenerator, + ndf *ndf.NetworkDefinition, getter HostManager, storage storage.Session, + addGateway chan network.NodeGateway) (Sender, error) { - hostPool, err := newHostPool(poolParams, rng, ndf, getter, storage, addGateway) + hostPool, err := newHostPool( + poolParams, rng, ndf, getter, storage, addGateway) if err != nil { return nil, err } return &sender{hostPool}, nil } -// SendToAny Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations -func (s *sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) { +// SendToAny call given sendFunc to any Host in the HostPool, attempting with up +// to numProxies destinations. +func (s *sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error), + stop *stoppable.Single) (interface{}, error) { proxies := s.getAny(s.poolParams.ProxyAttempts, nil) for proxy := range proxies { @@ -106,7 +111,7 @@ func (s *sender) SendToPreferred(targets []*id.ID, sendFunc SendToPreferredFunc, // Return an error if the timeout duration is reached if netTime.Since(startTime) > timeout { return nil, errors.Errorf( - "sending to targets in HostPool timed out after %s", timeout) + "sending to target in HostPool timed out after %s", timeout) } remainingTimeout := timeout - netTime.Since(startTime) diff --git a/network/gateway/sender_test.go b/network/gateway/sender_test.go index 3e8bdc21e..5b0a96d64 100644 --- a/network/gateway/sender_test.go +++ b/network/gateway/sender_test.go @@ -80,7 +80,7 @@ func TestSender_SendToAny(t *testing.T) { } // Test sendToAny with test interfaces - result, err := s.SendToAny(SendToAny_HappyPath, nil) + result, err := s.SendToAny(SendToAnyHappyPath, nil) if err != nil { t.Errorf("Should not error in SendToAny happy path: %v", err) } @@ -91,12 +91,12 @@ func TestSender_SendToAny(t *testing.T) { "\n\tReceived: %v", happyPathReturn, result) } - _, err = s.SendToAny(SendToAny_KnownError, nil) + _, err = s.SendToAny(SendToAnyKnownError, nil) if err == nil { t.Fatalf("Expected error path did not receive error") } - _, err = s.SendToAny(SendToAny_UnknownError, nil) + _, err = s.SendToAny(SendToAnyUnknownError, nil) if err == nil { t.Fatalf("Expected error path did not receive error") } @@ -143,7 +143,7 @@ func TestSender_SendToPreferred(t *testing.T) { // Happy path result, err := s.SendToPreferred([]*id.ID{preferredHost.GetId()}, - SendToPreferred_HappyPath, nil, 250*time.Millisecond) + SendToPreferredHappyPath, nil, 250*time.Millisecond) if err != nil { t.Errorf("Should not error in SendToPreferred happy path: %v", err) } @@ -156,7 +156,7 @@ func TestSender_SendToPreferred(t *testing.T) { // Call a send which returns an error which triggers replacement _, err = s.SendToPreferred([]*id.ID{preferredHost.GetId()}, - SendToPreferred_KnownError, nil, 250*time.Millisecond) + SendToPreferredKnownError, nil, 250*time.Millisecond) if err == nil { t.Fatalf("Expected error path did not receive error") } @@ -177,7 +177,7 @@ func TestSender_SendToPreferred(t *testing.T) { // Unknown error return will not trigger replacement _, err = s.SendToPreferred([]*id.ID{preferredHost.GetId()}, - SendToPreferred_UnknownError, nil, 250*time.Millisecond) + SendToPreferredUnknownError, nil, 250*time.Millisecond) if err == nil { t.Fatalf("Expected error path did not receive error") } diff --git a/network/gateway/storeHostList.go b/network/gateway/storeHostList.go index bb52690b8..9d635005e 100644 --- a/network/gateway/storeHostList.go +++ b/network/gateway/storeHostList.go @@ -46,7 +46,7 @@ func saveHostList(kv *versioned.KV, list []*id.ID) error { return kv.Set(hostListKey, hostListVersion, obj) } -// Get returns the host list from storage. +// getHostList returns the host list from storage. func getHostList(kv *versioned.KV) ([]*id.ID, error) { obj, err := kv.Get(hostListKey, hostListVersion) if err != nil { diff --git a/network/gateway/storeHostList_test.go b/network/gateway/storeHostList_test.go index e8ac8088d..3e0a90522 100644 --- a/network/gateway/storeHostList_test.go +++ b/network/gateway/storeHostList_test.go @@ -93,7 +93,7 @@ func Test_marshalHostList_unmarshalHostList(t *testing.T) { // Marshal list data := marshalHostList(list) - // Unmarshal marsalled data into new object + // Unmarshal marshalled data into new object newList, err := unmarshalHostList(data) if err != nil { t.Errorf("unmarshalHostList produced an error: %+v", err) diff --git a/network/gateway/utils_test.go b/network/gateway/utils_test.go index 7bd89f123..f1b47786a 100644 --- a/network/gateway/utils_test.go +++ b/network/gateway/utils_test.go @@ -16,12 +16,13 @@ import ( "time" ) -// Mock structure adhering to HostManager to be used for happy path +// mockManager is a mock structure adhering to HostManager to be used for happy +// path. type mockManager struct { hosts map[string]*connect.Host } -// Constructor for mockManager +// newMockManager is the constructor for mockManager. func newMockManager() *mockManager { return &mockManager{ hosts: make(map[string]*connect.Host), @@ -143,34 +144,34 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition { const happyPathReturn = "happyPathReturn" -func SendToPreferred_HappyPath(*connect.Host, *id.ID, time.Duration) (interface{}, error) { +func SendToPreferredHappyPath(*connect.Host, *id.ID, time.Duration) (interface{}, error) { return happyPathReturn, nil } -func SendToPreferred_KnownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) { +func SendToPreferredKnownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) { return nil, errors.Errorf(errorsList[0]) } -func SendToPreferred_UnknownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) { +func SendToPreferredUnknownError(*connect.Host, *id.ID, time.Duration) (interface{}, error) { return nil, errors.Errorf("Unexpected error: Oopsie") } -func SendToAny_HappyPath(host *connect.Host) (interface{}, error) { +func SendToAnyHappyPath(*connect.Host) (interface{}, error) { return happyPathReturn, nil } -func SendToAny_KnownError(host *connect.Host) (interface{}, error) { +func SendToAnyKnownError(*connect.Host) (interface{}, error) { return nil, fmt.Errorf(errorsList[0]) } -func SendToAny_UnknownError(host *connect.Host) (interface{}, error) { - return nil, fmt.Errorf("Unexpected error: Oopsie") +func SendToAnyUnknownError(*connect.Host) (interface{}, error) { + return nil, errors.Errorf("Unexpected error: Oopsie") } -func SendToSpecific_HappyPath(host *connect.Host, target *id.ID) (interface{}, bool, error) { +func SendToSpecificHappyPath(_ *connect.Host, _ *id.ID) (interface{}, bool, error) { return happyPathReturn, false, nil } -func SendToSpecific_Abort(host *connect.Host, target *id.ID) (interface{}, bool, error) { +func SendToSpecificAbort(_ *connect.Host, _ *id.ID) (interface{}, bool, error) { return nil, true, fmt.Errorf(errorsList[0]) } diff --git a/network/health/tracker.go b/network/health/tracker.go index 5873e9d64..20566459b 100644 --- a/network/health/tracker.go +++ b/network/health/tracker.go @@ -24,7 +24,7 @@ type Monitor interface { RemoveHealthCallback(uint64) IsHealthy() bool WasHealthy() bool - StartProcessies() (stoppable.Stoppable, error) + StartProcesses() (stoppable.Stoppable, error) } type tracker struct { @@ -67,8 +67,8 @@ func newTracker(timeout time.Duration) *tracker { } } -// AddFunc adds a function to the list of tracker functions such that each -// function can be run after network changes. Returns a unique ID for the +// AddHealthCallback adds a function to the list of tracker functions such that +// each function can be run after network changes. Returns a unique ID for the // function. func (t *tracker) AddHealthCallback(f func(isHealthy bool)) uint64 { var currentID uint64 @@ -84,8 +84,8 @@ func (t *tracker) AddHealthCallback(f func(isHealthy bool)) uint64 { return currentID } -// RemoveFunc removes the function with the given ID from the list of tracker -// functions so that it will not longer be run. +// RemoveHealthCallback removes the function with the given ID from the list of +// tracker functions so that it will no longer be run. func (t *tracker) RemoveHealthCallback(chanID uint64) { t.mux.Lock() delete(t.funcs, chanID) @@ -119,12 +119,12 @@ func (t *tracker) setHealth(h bool) { t.transmit(h) } -func (t *tracker) StartProcessies() (stoppable.Stoppable, error) { +func (t *tracker) StartProcesses() (stoppable.Stoppable, error) { t.mux.Lock() if t.running { t.mux.Unlock() - return nil, errors.New("cannot start health tracker threads, " + - "they are already running") + return nil, errors.New( + "cannot start health tracker threads, they are already running") } t.running = true @@ -155,15 +155,15 @@ func (t *tracker) start(stop *stoppable.Single) { return case heartbeat = <-t.heartbeat: - // FIXME: There's no transition to unhealthy here - // and there needs to be after some number of bad - // polls + // FIXME: There's no transition to unhealthy here and there needs to + // be after some number of bad polls if healthy(heartbeat) { t.setHealth(true) } case <-time.After(t.timeout): if !t.isHealthy { - jww.WARN.Printf("Network health tracker timed out, network is no longer healthy...") + jww.WARN.Printf("Network health tracker timed out, network " + + "is no longer healthy...") } t.setHealth(false) } diff --git a/network/health/tracker_test.go b/network/health/tracker_test.go index 925eaeca1..7d3684b9a 100644 --- a/network/health/tracker_test.go +++ b/network/health/tracker_test.go @@ -46,7 +46,7 @@ func TestNewTracker(t *testing.T) { }() // Begin the health tracker - _, err := trkr.StartProcessies() + _, err := trkr.StartProcesses() if err != nil { t.Fatalf("Unable to start tracker: %+v", err) } diff --git a/network/historical/historical.go b/network/historical/historical.go index 884b33778..37f4fdc5e 100644 --- a/network/historical/historical.go +++ b/network/historical/historical.go @@ -21,20 +21,20 @@ import ( "gitlab.com/xx_network/primitives/id" ) -// Historical Rounds looks up the round history via random gateways. -// It batches these quests but never waits longer than -// params.HistoricalRoundsPeriod to do a lookup. +// Historical Rounds looks up the round history via random gateways. It batches +// these quests but never waits longer than params.HistoricalRoundsPeriod to d +// a lookup. // Historical rounds receives input from: // - Network Follower (/network/follow.go) // Historical Rounds sends the output to: // - Message Retrieval Workers (/network/round/retrieve.go) type Retriever interface { - StartProcessies() *stoppable.Single + StartProcesses() *stoppable.Single LookupHistoricalRound(rid id.Round, callback RoundResultCallback) error } -// manager is the controlling structure +// manager is the controlling structure. type manager struct { params Params @@ -45,25 +45,25 @@ type manager struct { c chan roundRequest } -//RoundsComms interface to increase east of testing of historical rounds +// RoundsComms interface to increase east of testing of historical rounds. type RoundsComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) - RequestHistoricalRounds(host *connect.Host, - message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) + RequestHistoricalRounds(host *connect.Host, message *pb.HistoricalRounds) ( + *pb.HistoricalRoundsResponse, error) } -//RoundResultCallback is the used callback when a round is found +// RoundResultCallback is the used callback when a round is found. type RoundResultCallback func(info *pb.RoundInfo, success bool) -//roundRequest is an internal structure which tracks a request +// roundRequest is an internal structure that tracks a request. type roundRequest struct { rid id.Round RoundResultCallback numAttempts uint } -func NewRetriever(param Params, comms RoundsComms, - sender gateway.Sender, events event.Manager) Retriever { +func NewRetriever(param Params, comms RoundsComms, sender gateway.Sender, + events event.Manager) Retriever { return &manager{ params: param, comms: comms, @@ -73,81 +73,79 @@ func NewRetriever(param Params, comms RoundsComms, } } -// LookupHistoricalRound sends the lookup request to the internal handler -// and will return the result on the callback when it returns -func (m *manager) LookupHistoricalRound(rid id.Round, callback RoundResultCallback) error { +// LookupHistoricalRound sends the lookup request to the internal handler and +// returns the result on the callback. +func (m *manager) LookupHistoricalRound( + rid id.Round, callback RoundResultCallback) error { if rid == 0 { - return errors.Errorf("Cannot lookup round 0, rounds start at 1") + return errors.New("Cannot look up round 0, rounds start at 1") } + select { - case m.c <- roundRequest{ - rid: rid, - RoundResultCallback: callback, - numAttempts: 0, - }: + case m.c <- roundRequest{rid, callback, 0}: return nil default: - return errors.Errorf("Cannot lookup round %d, "+ - "channel is full", rid) + return errors.Errorf("Cannot look up round %d, channel is full", rid) } } -// StartProcessies starts the Long running thread which -// process historical rounds. Can be killed by sending a -// signal to the quit channel -func (m *manager) StartProcessies() *stoppable.Single { +// StartProcesses starts the Long running thread that process historical rounds. +// The thread can be killed by sending a signal to the returned stoppable. +func (m *manager) StartProcesses() *stoppable.Single { stop := stoppable.NewSingle("TrackNetwork") go m.processHistoricalRounds(m.comms, stop) return stop } -// processHistoricalRounds is a long running thread which -// process historical rounds. Can be killed by sending -// a signal to the quit channel takes a comms interface to aid in testing -func (m *manager) processHistoricalRounds(comm RoundsComms, - stop *stoppable.Single) { - +// processHistoricalRounds is a long-running thread that process historical +// rounds. The thread can be killed by triggering the stoppable. It takes a +// comms interface to aid in testing. +func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Single) { timerCh := make(<-chan time.Time) - var roundRequests []roundRequest for { - shouldProcess := false - // wait for a quit or new round to check + + // Wait for a quit or new round to check select { case <-stop.Quit(): - // return all roundRequests in the queue to - // the input channel so they can be checked in - // the future. If the queue is full, disable - // them as processing so they are picked up - // from the beginning + // Return all roundRequests in the queue to the input channel so + // that they can be checked in the future. If the queue is full, + // then disable them as processing so that they are picked up from + // the beginning. for _, r := range roundRequests { select { case m.c <- r: default: } } + stop.ToStopped() return - // if the timer elapses process roundRequests to - // ensure the delay isn't too long + case <-timerCh: + // If the timer elapses, then process roundRequests to ensure that + // the delay is not too long if len(roundRequests) > 0 { shouldProcess = true } - // get new round to lookup and force a lookup if + case r := <-m.c: - jww.DEBUG.Printf("Received and queueing round %d for "+ - "historical rounds lookup", r.rid) + // Get new round to look up and force a lookup if + jww.DEBUG.Printf("Received and queueing round %d for historical "+ + "rounds lookup.", r.rid) + roundRequests = append(roundRequests, r) + if len(roundRequests) > int(m.params.MaxHistoricalRounds) { shouldProcess = true } else if len(roundRequests) != 0 { - //if this is the first round, start the timeout + // If this is the first round, start the timeout timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C } } + if !shouldProcess { continue } @@ -157,86 +155,82 @@ func (m *manager) processHistoricalRounds(comm RoundsComms, rounds[i] = uint64(rr.rid) } - //send the historical roundRequests request - hr := &pb.HistoricalRounds{ - Rounds: rounds, - } + // Send the historical roundRequests request + hr := &pb.HistoricalRounds{Rounds: rounds} var gwHost *connect.Host result, err := m.sender.SendToAny( func(host *connect.Host) (interface{}, error) { - jww.DEBUG.Printf("Requesting Historical "+ - "rounds %v from gateway %s", rounds, - host.GetId()) + jww.DEBUG.Printf("Requesting Historical rounds %v from "+ + "gateway %s", rounds, host.GetId()) + gwHost = host + return comm.RequestHistoricalRounds(host, hr) }, stop) if err != nil { - jww.ERROR.Printf("Failed to request historical "+ - "roundRequests data for rounds %v: %s", - rounds, err) - // if the check fails to resolve, break the - // loop and so they will be checked again + jww.ERROR.Printf("Failed to request historical roundRequests "+ + "data for rounds %v: %s", rounds, err) + + // If the check fails to resolve, then break the loop so that they + // will be checked again timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C continue } + response := result.(*pb.HistoricalRoundsResponse) rids, retries := processHistoricalRoundsResponse(response, - roundRequests, m.params.MaxHistoricalRoundsRetries, - m.events) + roundRequests, m.params.MaxHistoricalRoundsRetries, m.events) m.events.Report(1, "HistoricalRounds", "Metrics", - fmt.Sprintf("Received %d historical rounds from"+ - " gateway %s: %v", len(response.Rounds), gwHost, - rids)) + fmt.Sprintf("Received %d historical rounds from gateway %s: %v", + len(response.Rounds), gwHost, rids)) - // reset the buffer to those left to retry now that all - // have been checked + // Reset the buffer to those left to retry now that all have been + // checked roundRequests = retries - // Now reset the timer, this prevents immediate reprocessing - // of the retries, limiting it to the next historical round - // request when buffer is full OR next timer tick + + // Now reset the timer, this prevents immediate reprocessing of the + // retries, limiting it to the next historical round request when buffer + // is full OR next timer tick timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C } } func processHistoricalRoundsResponse(response *pb.HistoricalRoundsResponse, - roundRequests []roundRequest, maxRetries uint, - events event.Manager) ([]uint64, []roundRequest) { + roundRequests []roundRequest, maxRetries uint, events event.Manager) ( + []uint64, []roundRequest) { retries := make([]roundRequest, 0) rids := make([]uint64, 0) - // process the returned historical roundRequests. + + // Process the returned historical roundRequests for i, roundInfo := range response.Rounds { - // The interface has missing returns returned as nil, - // such roundRequests need be be removes as processing - // so the network follower will pick them up in the - // future. + // The interface has missing returns returned as nil, such roundRequests + // need to be removed as processing so that the network follower will + // pick them up in the future. if roundInfo == nil { var errMsg string roundRequests[i].numAttempts++ + if roundRequests[i].numAttempts == maxRetries { - errMsg = fmt.Sprintf("Failed to retrieve "+ - "historical round %d on last attempt,"+ - " will not try again", - roundRequests[i].rid) - go roundRequests[i].RoundResultCallback(nil, - false) + errMsg = fmt.Sprintf("Failed to retrieve historical round %d "+ + "on last attempt, will not try again", roundRequests[i].rid) + go roundRequests[i].RoundResultCallback(nil, false) } else { retries = append(retries, roundRequests[i]) - errMsg = fmt.Sprintf("Failed to retrieve "+ - "historical round %d, will try up to "+ - "%d more times", roundRequests[i].rid, + errMsg = fmt.Sprintf("Failed to retrieve historical round "+ + "%d, will try up to %d more times", roundRequests[i].rid, maxRetries-roundRequests[i].numAttempts) } + jww.WARN.Printf(errMsg) - events.Report(5, "HistoricalRounds", - "Error", errMsg) + events.Report(5, "HistoricalRounds", "Error", errMsg) continue } - // Successfully retrieved roundRequests are returned - // on the callback + + // Successfully retrieved roundRequests are returned on the callback go roundRequests[i].RoundResultCallback(roundInfo, true) rids = append(rids, roundInfo.ID) diff --git a/network/historical/historical_test.go b/network/historical/historical_test.go index 22fef0d99..676fb1c75 100644 --- a/network/historical/historical_test.go +++ b/network/historical/historical_test.go @@ -19,8 +19,8 @@ import ( "gitlab.com/xx_network/primitives/ndf" ) -// TestHistoricalRounds provides a smoke test to run through most of the code -// paths for historical round lookup. +// Provides a smoke test to run through most of the code paths for historical +// round lookup. func TestHistoricalRounds(t *testing.T) { params := GetDefaultParams() params.HistoricalRoundsPeriod = 500 * time.Millisecond @@ -29,89 +29,91 @@ func TestHistoricalRounds(t *testing.T) { sender := &testGWSender{sendCnt: 0} events := &testEventMgr{} hMgr := NewRetriever(params, comms, sender, events) - stopper := hMgr.StartProcessies() + stopper := hMgr.StartProcesses() - // case 1: Send a round request and wait for timeout for - // processing - hMgr.LookupHistoricalRound(42, func(info *pb.RoundInfo, success bool) { - t.Errorf("first called when it shouldn't") + // Case 1: Send a round request and wait for timeout for processing + err := hMgr.LookupHistoricalRound(42, func(*pb.RoundInfo, bool) { + t.Error("Called when it should not have been.") }) + if err != nil { + t.Errorf("Failed to look up historical round: %+v", err) + } time.Sleep(501 * time.Millisecond) if sender.sendCnt != 1 { - t.Errorf("did not send as expected") + t.Errorf("Did not send as expected") } - // case 2: make round requests up to m.params.MaxHistoricalRounds - for i := 0; i < 3; i++ { - hMgr.LookupHistoricalRound(id.Round(40+i), - func(info *pb.RoundInfo, success bool) { - t.Errorf("i called when it shouldn't") - }) + // Case 2: make round requests up to m.params.MaxHistoricalRounds + for i := id.Round(0); i < 3; i++ { + err = hMgr.LookupHistoricalRound(40+i, func(*pb.RoundInfo, bool) { + t.Errorf("%d called when it should not have been.", i) + }) + if err != nil { + t.Errorf("Failed to look up historical round (%d): %+v", i, err) + } } time.Sleep(10 * time.Millisecond) if sender.sendCnt != 2 { - t.Errorf("unexpected send count: %d != 2", sender.sendCnt) + t.Errorf("Unexpected send count.\nexpected: %d\nreceived: %d", + 2, sender.sendCnt) } - err := stopper.Close() + err = stopper.Close() if err != nil { - t.Errorf("%+v", err) + t.Error(err) } if stopper.IsRunning() { - t.Errorf("historical rounds routine failed to close") + t.Errorf("Historical rounds routine failed to close.") } } -// TestHistoricalRoundsProcessing exercises the func TestProcessHistoricalRoundsResponse(t *testing.T) { params := GetDefaultParams() - bad_rr := roundRequest{ + badRR := roundRequest{ rid: id.Round(41), - RoundResultCallback: func(info *pb.RoundInfo, success bool) { - t.Errorf("bad called when it shouldn't") + RoundResultCallback: func(*pb.RoundInfo, bool) { + t.Error("Called when it should not have been.") }, numAttempts: params.MaxHistoricalRoundsRetries - 2, } - expired_rr := roundRequest{ + expiredRR := roundRequest{ rid: id.Round(42), RoundResultCallback: func(info *pb.RoundInfo, success bool) { if info == nil && !success { return } - t.Errorf("expired called with bad params") + t.Errorf("Expired called with bad params.") }, numAttempts: params.MaxHistoricalRoundsRetries - 1, } x := false callbackCalled := &x - good_rr := roundRequest{ + goodRR := roundRequest{ rid: id.Round(43), RoundResultCallback: func(info *pb.RoundInfo, success bool) { *callbackCalled = true }, numAttempts: 0, } - rrs := []roundRequest{bad_rr, expired_rr, good_rr} - rifs := make([]*pb.RoundInfo, 3) - rifs[0] = nil - rifs[1] = nil - rifs[2] = &pb.RoundInfo{ID: 43} - response := &pb.HistoricalRoundsResponse{ - Rounds: rifs, - } + rrs := []roundRequest{badRR, expiredRR, goodRR} + infos := make([]*pb.RoundInfo, 3) + infos[0] = nil + infos[1] = nil + infos[2] = &pb.RoundInfo{ID: 43} + response := &pb.HistoricalRoundsResponse{Rounds: infos} events := &testEventMgr{} rids, retries := processHistoricalRoundsResponse(response, rrs, params.MaxHistoricalRoundsRetries, events) if len(rids) != 1 || rids[0] != 43 { - t.Errorf("bad return: %v, expected [43]", rids) + t.Errorf("Bad return: %v, expected [43]", rids) } - // Note: 1 of the entries was expired, thats why this is not 2. + // Note: one of the entries was expired, that is why this is not 2. if len(retries) != 1 { t.Errorf("retries not right length: %d != 1", len(retries)) } @@ -123,14 +125,14 @@ func TestProcessHistoricalRoundsResponse(t *testing.T) { } } -// Test structure implementations +// Test structure implementations. type testRoundsComms struct{} -func (t *testRoundsComms) GetHost(hostId *id.ID) (*connect.Host, bool) { +func (t *testRoundsComms) GetHost(*id.ID) (*connect.Host, bool) { return nil, false } -func (t *testRoundsComms) RequestHistoricalRounds(host *connect.Host, - message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) { +func (t *testRoundsComms) RequestHistoricalRounds(*connect.Host, + *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) { return nil, nil } @@ -138,27 +140,28 @@ type testGWSender struct { sendCnt int } -func (t *testGWSender) SendToAny(sendFunc func(host *connect.Host) (interface{}, - error), stop *stoppable.Single) (interface{}, error) { - // this is always called with at least 1 round info set - rifs := make([]*pb.RoundInfo, 1) - rifs[0] = nil - m := &pb.HistoricalRoundsResponse{Rounds: rifs} +func (t *testGWSender) SendToAny(func(host *connect.Host) (interface{}, error), + *stoppable.Single) (interface{}, error) { + // This is always called with at least one round info set + infos := make([]*pb.RoundInfo, 1) + infos[0] = nil + m := &pb.HistoricalRoundsResponse{Rounds: infos} t.sendCnt += 1 + return m, nil } -func (t *testGWSender) SendToPreferred(targets []*id.ID, sendFunc gateway.SendToPreferredFunc, - stop *stoppable.Single, timeout time.Duration) (interface{}, error) { + +func (t *testGWSender) SendToPreferred([]*id.ID, gateway.SendToPreferredFunc, + *stoppable.Single, time.Duration) (interface{}, error) { return t, nil } -func (t *testGWSender) UpdateNdf(ndf *ndf.NetworkDefinition) { -} -func (t *testGWSender) SetGatewayFilter(f gateway.Filter) {} + +func (t *testGWSender) UpdateNdf(*ndf.NetworkDefinition) {} +func (t *testGWSender) SetGatewayFilter(gateway.Filter) {} func (t *testGWSender) GetHostParams() connect.HostParams { return connect.GetDefaultHostParams() } type testEventMgr struct{} -func (t *testEventMgr) Report(priority int, category, evtType, details string) { -} +func (t *testEventMgr) Report(int, string, string, string) {} diff --git a/network/historical/params.go b/network/historical/params.go index b9965394e..ae97e1921 100644 --- a/network/historical/params.go +++ b/network/historical/params.go @@ -3,25 +3,27 @@ package historical import "time" type Params struct { - // Number of historical rounds required to automatically send a historical - // rounds query + // MaxHistoricalRounds is the number of historical rounds required to + // automatically send a historical rounds query. MaxHistoricalRounds uint - // Maximum period of time a pending historical round query will wait before - // it is transmitted + + // HistoricalRoundsPeriod is the maximum period of time a pending historical + // round query will wait before it is transmitted. HistoricalRoundsPeriod time.Duration - // Length of historical rounds channel buffer + // HistoricalRoundsBufferLen is the length of historical rounds channel + // buffer. HistoricalRoundsBufferLen uint - // Maximum number of times a historical round lookup will be attempted + // MaxHistoricalRoundsRetries is the maximum number of times a historical + // round lookup will be attempted. MaxHistoricalRoundsRetries uint } func GetDefaultParams() Params { return Params{ - MaxHistoricalRounds: 100, - HistoricalRoundsPeriod: 100 * time.Millisecond, - + MaxHistoricalRounds: 100, + HistoricalRoundsPeriod: 100 * time.Millisecond, HistoricalRoundsBufferLen: 1000, MaxHistoricalRoundsRetries: 3, } diff --git a/network/identity/tracker.go b/network/identity/tracker.go index 3d1a34c19..5dd895176 100644 --- a/network/identity/tracker.go +++ b/network/identity/tracker.go @@ -45,7 +45,7 @@ const ( const DefaultExtraChecks = 10 type Tracker interface { - StartProcessies() stoppable.Stoppable + StartProcesses() stoppable.Stoppable AddIdentity(id *id.ID, validUntil time.Time, persistent bool) RemoveIdentity(id *id.ID) GetEphemeralIdentity(rng io.Reader, addressSize uint8) (receptionID.IdentityUse, error) @@ -125,9 +125,9 @@ func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager return t } -// StartProcessies track runs a thread which checks for past and present address +// StartProcesses track runs a thread which checks for past and present address // ID. -func (t manager) StartProcessies() stoppable.Stoppable { +func (t manager) StartProcesses() stoppable.Stoppable { stop := stoppable.NewSingle(ephemeralStoppable) go t.track(stop) diff --git a/network/manager.go b/network/manager.go index 441d90390..219619987 100644 --- a/network/manager.go +++ b/network/manager.go @@ -203,7 +203,7 @@ func (m *manager) Follow(report ClientErrorReport) (stoppable.Stoppable, error) multi := stoppable.NewMulti("networkManager") // health tracker - healthStop, err := m.Monitor.StartProcessies() + healthStop, err := m.Monitor.StartProcesses() if err != nil { return nil, errors.Errorf("failed to follow") } @@ -225,10 +225,10 @@ func (m *manager) Follow(report ClientErrorReport) (stoppable.Stoppable, error) multi.Add(m.Pickup.StartProcessors()) // Historical rounds processing - multi.Add(m.Retriever.StartProcessies()) + multi.Add(m.Retriever.StartProcesses()) // Start the processes for the identity handler - multi.Add(m.Tracker.StartProcessies()) + multi.Add(m.Tracker.StartProcesses()) return multi, nil } diff --git a/network/message/inProgress_test.go b/network/message/inProgress_test.go index 0220c26d8..70d710732 100644 --- a/network/message/inProgress_test.go +++ b/network/message/inProgress_test.go @@ -28,7 +28,7 @@ func Test_pickup_CheckInProgressMessages(t *testing.T) { MessageReceptionWorkerPoolSize: 20, MaxChecksInProcessMessage: 20, InProcessMessageWait: time.Hour, - }, kv, nil).(*handler) + }, kv, nil, nil).(*handler) msg := makeTestFormatMessages(1)[0] cid := id.NewIdFromString("clientID", id.User, t) diff --git a/network/nodes/mixCypher.go b/network/nodes/mixCypher.go index 730212def..98b60efb0 100644 --- a/network/nodes/mixCypher.go +++ b/network/nodes/mixCypher.go @@ -18,7 +18,8 @@ import ( ) type MixCypher interface { - Encrypt(msg format.Message, salt []byte, roundID id.Round) (format.Message, [][]byte) + Encrypt(msg format.Message, salt []byte, roundID id.Round) ( + format.Message, [][]byte) MakeClientGatewayAuthMAC(salt, digest []byte) []byte } @@ -27,14 +28,15 @@ type mixCypher struct { g *cyclic.Group } -// Encrypt encrypts the given message for CMIX. -// Panics if the passed message is not sized correctly for the group. -func (mc *mixCypher) Encrypt(msg format.Message, - salt []byte, roundID id.Round) (format.Message, [][]byte) { +// Encrypt encrypts the given message for CMIX. Panics if the passed message is +// not sized correctly for the group. +func (mc *mixCypher) Encrypt(msg format.Message, salt []byte, roundID id.Round) ( + format.Message, [][]byte) { if msg.GetPrimeByteLen() != mc.g.GetP().ByteLen() { - jww.FATAL.Panicf("Cannot encrypt message whose size does not " + - "align with the size of the prime") + jww.FATAL.Panicf("Cannot encrypt message when its size (%d) is not "+ + "the same as the size of the prime (%d)", + msg.GetPrimeByteLen(), mc.g.GetP().ByteLen()) } keys := make([]*cyclic.Int, len(mc.keys)) @@ -48,7 +50,7 @@ func (mc *mixCypher) Encrypt(msg format.Message, h, err := hash.NewCMixHash() if err != nil { - jww.FATAL.Panicf("Could not get hash for KMAC generation: %+v", h) + jww.FATAL.Panicf("Could not get hash for KMAC generation: %+v", err) } KMAC := cmix.GenerateKMACs(salt, keys, roundID, h) @@ -62,6 +64,7 @@ func (mc *mixCypher) MakeClientGatewayAuthMAC(salt, digest []byte) []byte { h.Write(clientGatewayKey) h.Write(salt) hashed := h.Sum(nil) + h.Reset() h.Write(hashed) h.Write(digest) @@ -71,20 +74,20 @@ func (mc *mixCypher) MakeClientGatewayAuthMAC(salt, digest []byte) []byte { func clientEncrypt(grp *cyclic.Group, msg format.Message, salt []byte, roundID id.Round, baseKeys []*cyclic.Int) format.Message { - // get the salt for associated data + // Get the salt for associated data h, err := blake2b.New256(nil) if err != nil { - panic("E2E Client Encrypt could not get blake2b Hash") + jww.FATAL.Panicf("Failed to get blake2b hash: %+v", err) } h.Reset() h.Write(salt) salt2 := h.Sum(nil) - // get encryption keys + // Get encryption keys keyEcrA := cmix.ClientKeyGen(grp, salt, roundID, baseKeys) keyEcrB := cmix.ClientKeyGen(grp, salt2, roundID, baseKeys) - // get message payloads as cyclic integers + // Get message payloads as cyclic integers payloadA := grp.NewIntFromBytes(msg.GetPayloadA()) payloadB := grp.NewIntFromBytes(msg.GetPayloadB()) diff --git a/network/nodes/mixCypher_test.go b/network/nodes/mixCypher_test.go index 12b65210b..14727c138 100644 --- a/network/nodes/mixCypher_test.go +++ b/network/nodes/mixCypher_test.go @@ -23,73 +23,70 @@ import ( func TestRoundKeys_Encrypt_Consistency(t *testing.T) { const numKeys = 5 - expectedPayload := []byte{220, 95, 160, 88, 229, 136, 42, 254, 239, 32, - 57, 120, 7, 187, 69, 66, 199, 95, 136, 118, 130, 192, 167, 143, - 94, 80, 250, 22, 85, 47, 200, 208, 68, 179, 143, 31, 21, 215, - 17, 117, 179, 170, 67, 59, 14, 158, 116, 249, 10, 116, 166, 127, - 168, 26, 11, 41, 129, 166, 133, 135, 93, 217, 61, 99, 29, 198, - 86, 34, 83, 72, 158, 44, 178, 57, 158, 168, 107, 43, 54, 107, - 183, 16, 149, 133, 109, 166, 154, 248, 185, 218, 32, 11, 200, - 191, 240, 197, 27, 21, 82, 198, 42, 109, 79, 28, 116, 64, 34, - 44, 178, 75, 142, 79, 17, 31, 17, 196, 104, 20, 44, 125, 80, 72, - 205, 76, 23, 69, 132, 176, 180, 211, 193, 200, 175, 149, 133, 2, - 153, 114, 21, 239, 107, 46, 237, 41, 48, 188, 241, 97, 89, 65, - 213, 218, 73, 38, 213, 194, 113, 142, 203, 176, 124, 222, 172, - 128, 152, 228, 18, 128, 26, 122, 199, 192, 255, 84, 222, 165, - 77, 199, 57, 56, 7, 72, 20, 158, 133, 90, 63, 68, 145, 54, 34, - 223, 152, 157, 105, 217, 30, 111, 83, 4, 200, 125, 120, 189, - 232, 146, 130, 84, 119, 240, 144, 166, 111, 6, 56, 26, 93, 95, - 69, 225, 103, 174, 211, 204, 66, 181, 33, 198, 65, 140, 53, 255, - 37, 120, 204, 59, 128, 70, 54, 228, 26, 197, 107, 186, 22, 93, - 189, 234, 89, 217, 90, 133, 153, 189, 114, 73, 75, 55, 77, 209, - 136, 102, 193, 60, 241, 25, 101, 238, 162, 49, 94, 219, 46, 152, - 100, 120, 152, 131, 78, 128, 226, 47, 21, 253, 171, 40, 122, 161, - 69, 56, 102, 63, 89, 160, 209, 219, 142, 51, 179, 165, 243, 70, - 137, 24, 221, 105, 39, 0, 214, 201, 221, 184, 104, 165, 44, 82, - 13, 239, 197, 80, 252, 200, 115, 146, 200, 51, 63, 173, 88, 163, - 3, 214, 135, 89, 118, 99, 197, 98, 80, 176, 150, 139, 71, 6, 7, - 37, 252, 82, 225, 187, 212, 65, 4, 154, 28, 170, 224, 242, 17, - 68, 245, 73, 234, 216, 255, 2, 168, 235, 116, 147, 252, 217, 85, - 157, 38, 243, 43, 213, 250, 219, 124, 86, 155, 129, 99, 195, - 217, 163, 9, 133, 217, 6, 77, 127, 88, 168, 217, 84, 66, 224, - 90, 11, 210, 218, 215, 143, 239, 221, 138, 231, 57, 149, 175, - 221, 188, 128, 169, 28, 215, 39, 147, 36, 52, 146, 75, 20, 228, - 230, 197, 1, 80, 38, 208, 139, 4, 240, 163, 104, 158, 49, 29, - 248, 206, 79, 52, 203, 219, 178, 46, 81, 170, 100, 14, 253, 150, - 240, 191, 92, 18, 23, 94, 73, 110, 212, 237, 84, 86, 102, 32, - 78, 209, 207, 213, 117, 141, 148, 218, 209, 253, 220, 108, 135, - 163, 159, 134, 125, 6, 225, 163, 35, 115, 146, 103, 169, 152, - 251, 188, 125, 159, 185, 119, 67, 80, 92, 232, 208, 1, 32, 144, - 250, 32, 187} - - expectedKmacs := [][]byte{{110, 235, 79, 128, 16, 94, 181, 95, 101, - 152, 187, 204, 87, 236, 211, 102, 88, 130, 191, 103, 23, 229, - 48, 142, 155, 167, 200, 108, 66, 172, 178, 209}, - {48, 74, 148, 205, 235, 46, 172, 128, 28, 42, 116, 27, 64, 83, 122, - 5, 51, 162, 200, 198, 234, 92, 77, 131, 136, 108, 57, 97, 193, - 208, 148, 217}, - {202, 163, 19, 179, 175, 100, 71, 176, 241, 80, 85, 174, 120, 45, - 152, 117, 82, 193, 203, 188, 158, 60, 111, 217, 64, 47, 219, - 169, 100, 177, 42, 159}, - {66, 121, 20, 21, 206, 142, 3, 75, 229, 94, 197, 4, 117, 223, 245, - 117, 14, 17, 158, 138, 176, 106, 93, 55, 247, 155, 250, 232, - 41, 169, 197, 150}, + expectedPayload := []byte{220, 95, 160, 88, 229, 136, 42, 254, 239, 32, 57, + 120, 7, 187, 69, 66, 199, 95, 136, 118, 130, 192, 167, 143, 94, 80, 250, + 22, 85, 47, 200, 208, 68, 179, 143, 31, 21, 215, 17, 117, 179, 170, 67, + 59, 14, 158, 116, 249, 10, 116, 166, 127, 168, 26, 11, 41, 129, 166, + 133, 135, 93, 217, 61, 99, 29, 198, 86, 34, 83, 72, 158, 44, 178, 57, + 158, 168, 107, 43, 54, 107, 183, 16, 149, 133, 109, 166, 154, 248, 185, + 218, 32, 11, 200, 191, 240, 197, 27, 21, 82, 198, 42, 109, 79, 28, 116, + 64, 34, 44, 178, 75, 142, 79, 17, 31, 17, 196, 104, 20, 44, 125, 80, 72, + 205, 76, 23, 69, 132, 176, 180, 211, 193, 200, 175, 149, 133, 2, 153, + 114, 21, 239, 107, 46, 237, 41, 48, 188, 241, 97, 89, 65, 213, 218, 73, + 38, 213, 194, 113, 142, 203, 176, 124, 222, 172, 128, 152, 228, 18, 128, + 26, 122, 199, 192, 255, 84, 222, 165, 77, 199, 57, 56, 7, 72, 20, 158, + 133, 90, 63, 68, 145, 54, 34, 223, 152, 157, 105, 217, 30, 111, 83, 4, + 200, 125, 120, 189, 232, 146, 130, 84, 119, 240, 144, 166, 111, 6, 56, + 26, 93, 95, 69, 225, 103, 174, 211, 204, 66, 181, 33, 198, 65, 140, 53, + 255, 37, 120, 204, 59, 128, 70, 54, 228, 26, 197, 107, 186, 22, 93, 189, + 234, 89, 217, 90, 133, 153, 189, 114, 73, 75, 55, 77, 209, 136, 102, + 193, 60, 241, 25, 101, 238, 162, 49, 94, 219, 46, 152, 100, 120, 152, + 131, 78, 128, 226, 47, 21, 253, 171, 40, 122, 161, 69, 56, 102, 63, 89, + 160, 209, 219, 142, 51, 179, 165, 243, 70, 137, 24, 221, 105, 39, 0, + 214, 201, 221, 184, 104, 165, 44, 82, 13, 239, 197, 80, 252, 200, 115, + 146, 200, 51, 63, 173, 88, 163, 3, 214, 135, 89, 118, 99, 197, 98, 80, + 176, 150, 139, 71, 6, 7, 37, 252, 82, 225, 187, 212, 65, 4, 154, 28, + 170, 224, 242, 17, 68, 245, 73, 234, 216, 255, 2, 168, 235, 116, 147, + 252, 217, 85, 157, 38, 243, 43, 213, 250, 219, 124, 86, 155, 129, 99, + 195, 217, 163, 9, 133, 217, 6, 77, 127, 88, 168, 217, 84, 66, 224, 90, + 11, 210, 218, 215, 143, 239, 221, 138, 231, 57, 149, 175, 221, 188, 128, + 169, 28, 215, 39, 147, 36, 52, 146, 75, 20, 228, 230, 197, 1, 80, 38, + 208, 139, 4, 240, 163, 104, 158, 49, 29, 248, 206, 79, 52, 203, 219, + 178, 46, 81, 170, 100, 14, 253, 150, 240, 191, 92, 18, 23, 94, 73, 110, + 212, 237, 84, 86, 102, 32, 78, 209, 207, 213, 117, 141, 148, 218, 209, + 253, 220, 108, 135, 163, 159, 134, 125, 6, 225, 163, 35, 115, 146, 103, + 169, 152, 251, 188, 125, 159, 185, 119, 67, 80, 92, 232, 208, 1, 32, + 144, 250, 32, 187} + + expectedKmacs := [][]byte{ + {110, 235, 79, 128, 16, 94, 181, 95, 101, 152, 187, 204, 87, 236, 211, + 102, 88, 130, 191, 103, 23, 229, 48, 142, 155, 167, 200, 108, 66, + 172, 178, 209}, + {48, 74, 148, 205, 235, 46, 172, 128, 28, 42, 116, 27, 64, 83, 122, 5, + 51, 162, 200, 198, 234, 92, 77, 131, 136, 108, 57, 97, 193, 208, + 148, 217}, + {202, 163, 19, 179, 175, 100, 71, 176, 241, 80, 85, 174, 120, 45, 152, + 117, 82, 193, 203, 188, 158, 60, 111, 217, 64, 47, 219, 169, 100, + 177, 42, 159}, + {66, 121, 20, 21, 206, 142, 3, 75, 229, 94, 197, 4, 117, 223, 245, 117, + 14, 17, 158, 138, 176, 106, 93, 55, 247, 155, 250, 232, 41, 169, + 197, 150}, {65, 74, 222, 172, 217, 13, 56, 208, 111, 98, 199, 205, 74, 141, 30, - 109, 51, 20, 186, 9, 234, 197, 6, 200, 139, 86, 139, 130, 8, 15, - 32, 209}} + 109, 51, 20, 186, 9, 234, 197, 6, 200, 139, 86, 139, 130, 8, 15, 32, + 209}, + } cmixGrp := cyclic.NewGroup( - large.NewIntFromString("FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1"+ - "29024E088A67CC74020BBEA63B139B22514A08798E3404DD"+ - "EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245"+ - "E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED"+ - "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D"+ - "C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F"+ - "83655D23DCA3AD961C62F356208552BB9ED529077096966D"+ - "670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B"+ - "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9"+ - "DE2BCBF6955817183995497CEA956AE515D2261898FA0510"+ - "15728E5A8AACAA68FFFFFFFFFFFFFFFF", 16), + large.NewIntFromString( + "FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74"+ + "020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F"+ + "14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6"+ + "F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC200"+ + "7CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD96"+ + "1C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18"+ + "217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF0"+ + "6F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA05101572"+ + "8E5A8AACAA68FFFFFFFFFFFFFFFF", 16), large.NewIntFromString("2", 16)) prng := rand.New(rand.NewSource(42)) @@ -97,10 +94,9 @@ func TestRoundKeys_Encrypt_Consistency(t *testing.T) { keys := make([]*key, numKeys) for i := 0; i < numKeys; i++ { - keyBytes, _ := csprng.GenerateInGroup(cmixGrp.GetPBytes(), cmixGrp.GetP().ByteLen(), prng) - keys[i] = &key{ - k: cmixGrp.NewIntFromBytes(keyBytes), - } + keyBytes, _ := csprng.GenerateInGroup( + cmixGrp.GetPBytes(), cmixGrp.GetP().ByteLen(), prng) + keys[i] = &key{k: cmixGrp.NewIntFromBytes(keyBytes)} } salt := make([]byte, 32) @@ -111,22 +107,19 @@ func TestRoundKeys_Encrypt_Consistency(t *testing.T) { prng.Read(contents) msg.SetContents(contents) - rk := mixCypher{ - keys: keys, - g: cmixGrp, - } + rk := mixCypher{keys, cmixGrp} rid := id.Round(42) encMsg, kmacs := rk.Encrypt(msg, salt, rid) if !bytes.Equal(encMsg.Marshal(), expectedPayload) { - t.Errorf("Encrypted messages do not match\n "+ - "expected: %v\n received: %v", expectedPayload, encMsg.Marshal()) + t.Errorf("Encrypted messages do not match.\nexpected: %v\nreceived: %v", + expectedPayload, encMsg.Marshal()) } if !reflect.DeepEqual(kmacs, expectedKmacs) { - t.Errorf("kmacs do not match\n "+ - "expected: %v\n received: %v", expectedKmacs, kmacs) + t.Errorf("KMACs do not match.\nexpected: %v\nreceived: %v", + expectedKmacs, kmacs) } } diff --git a/network/nodes/register.go b/network/nodes/register.go index 0a4b1d670..2ab22c112 100644 --- a/network/nodes/register.go +++ b/network/nodes/register.go @@ -9,7 +9,7 @@ package nodes import ( "crypto/sha256" - "fmt" + "encoding/hex" "github.com/golang/protobuf/proto" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" @@ -38,6 +38,7 @@ import ( func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single, inProgress, attempts *sync.Map) { + interval := time.Duration(500) * time.Millisecond t := time.NewTicker(interval) for { @@ -46,9 +47,10 @@ func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single, t.Stop() stop.ToStopped() return + case gw := <-r.c: rng := r.rng.GetStream() - nidStr := fmt.Sprintf("%x", gw.Node.ID) + nidStr := hex.EncodeToString(gw.Node.ID) nid, err := gw.Node.GetNodeId() if err != nil { jww.WARN.Printf( @@ -74,7 +76,8 @@ func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single, // No need to register with stale nodes if isStale := gw.Node.Status == ndf.Stale; isStale { - jww.DEBUG.Printf("Skipping registration with stale nodes %s", nidStr) + jww.DEBUG.Printf( + "Skipping registration with stale nodes %s", nidStr) continue } err = registerWithNode(r.sender, r.comms, gw, s, r, rng, stop) @@ -97,15 +100,15 @@ func registerNodes(r *registrar, s storage.Session, stop *stoppable.Single, } } -// registerWithNode serves as a helper for RegisterWithNodes. -// It registers a user with a specific in the client's NDF. +// registerWithNode serves as a helper for registerNodes. It registers a user +// with a specific in the client's NDF. func registerWithNode(sender gateway.Sender, comms RegisterNodeCommsInterface, ngw network.NodeGateway, s storage.Session, r *registrar, rng csprng.Source, stop *stoppable.Single) error { nodeID, err := ngw.Node.GetNodeId() if err != nil { - jww.ERROR.Println("registerWithNode() failed to decode nodeId") + jww.ERROR.Print("registerWithNode failed to decode node ID: %v", err) return err } @@ -113,12 +116,12 @@ func registerWithNode(sender gateway.Sender, comms RegisterNodeCommsInterface, return nil } - jww.INFO.Printf("registerWithNode() begin registration with nodes: %s", nodeID) + jww.INFO.Printf("registerWithNode begin registration with node: %s", nodeID) var transmissionKey *cyclic.Int var validUntil uint64 var keyId []byte - // TODO: should move this to a precanned user initialization + // TODO: should move this to a pre-canned user initialization if s.IsPrecanned() { userNum := int(s.GetTransmissionID().Bytes()[7]) h := sha256.New() @@ -140,7 +143,7 @@ func registerWithNode(sender gateway.Sender, comms RegisterNodeCommsInterface, r.add(nodeID, transmissionKey, validUntil, keyId) - jww.INFO.Printf("Completed registration with nodes %s", nodeID) + jww.INFO.Printf("Completed registration with node %s", nodeID) return nil } @@ -198,8 +201,7 @@ func requestKey(sender gateway.Sender, comms RegisterNodeCommsInterface, data := h.Sum(nil) // Sign DH pubkey - clientSig, err := rsa.Sign(rng, s.GetTransmissionRSA(), opts.Hash, - data, opts) + clientSig, err := rsa.Sign(rng, s.GetTransmissionRSA(), opts.Hash, data, opts) if err != nil { return nil, nil, 0, err } @@ -207,7 +209,7 @@ func requestKey(sender gateway.Sender, comms RegisterNodeCommsInterface, gwId := ngw.Gateway.ID gatewayID, err := id.Unmarshal(gwId) if err != nil { - jww.ERROR.Println("registerWithNode() failed to decode gatewayID") + jww.ERROR.Printf("registerWithNode failed to decode gateway ID: %v", err) return nil, nil, 0, err } @@ -215,20 +217,21 @@ func requestKey(sender gateway.Sender, comms RegisterNodeCommsInterface, jww.INFO.Printf("Register: Requesting client key from gateway %s", gatewayID) result, err := sender.SendToAny(func(host *connect.Host) (interface{}, error) { - keyResponse, err := comms.SendRequestClientKeyMessage(host, + keyResponse, err2 := comms.SendRequestClientKeyMessage(host, &pb.SignedClientKeyRequest{ ClientKeyRequest: serializedMessage, ClientKeyRequestSignature: &messages.RSASignature{Signature: clientSig}, Target: gatewayID.Bytes(), }) - if err != nil { - return nil, errors.WithMessage(err, + if err2 != nil { + return nil, errors.WithMessage(err2, "Register: Failed requesting client key from gateway") } if keyResponse.Error != "" { - return nil, errors.WithMessage(err, + return nil, errors.WithMessage(err2, "requestKey: clientKeyResponse error") } + return keyResponse, nil }, stop) @@ -287,8 +290,9 @@ func requestKey(sender gateway.Sender, comms RegisterNodeCommsInterface, // Verify the HMAC h.Reset() - if !registration.VerifyClientHMAC(sessionKey.Bytes(), keyResponse.EncryptedClientKey, - opts.Hash.New, keyResponse.EncryptedClientKeyHMAC) { + if !registration.VerifyClientHMAC(sessionKey.Bytes(), + keyResponse.EncryptedClientKey, opts.Hash.New, + keyResponse.EncryptedClientKeyHMAC) { return nil, nil, 0, errors.New("Failed to verify client HMAC") } diff --git a/network/nodes/registrar.go b/network/nodes/registrar.go index 9c3b8c726..6af74986c 100644 --- a/network/nodes/registrar.go +++ b/network/nodes/registrar.go @@ -1,7 +1,6 @@ package nodes import ( - "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/network/gateway" @@ -14,6 +13,7 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "strconv" "sync" "time" ) @@ -57,11 +57,12 @@ type registrar struct { c chan network.NodeGateway } -// LoadRegistrar loads a registrar from disk, and creates a new one if it does -// not exist. -func LoadRegistrar(session storage.Session, - sender gateway.Sender, comms RegisterNodeCommsInterface, - rngGen *fastRNG.StreamGenerator, c chan network.NodeGateway) (Registrar, error) { +// LoadRegistrar loads a Registrar from disk or creates a new one if it does not +// exist. +func LoadRegistrar(session storage.Session, sender gateway.Sender, + comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator, + c chan network.NodeGateway) (Registrar, error) { + kv := session.GetKV().Prefix(prefix) r := ®istrar{ nodes: make(map[id.ID]*key), @@ -69,9 +70,9 @@ func LoadRegistrar(session storage.Session, } obj, err := kv.Get(storeKey, currentKeyVersion) - // If there is no stored data, make a new node handler if err != nil { - jww.WARN.Printf("Failed to load Node Registrar, creating a new object") + // If there is no stored data, make a new node handler + jww.WARN.Printf("Failed to load Node Registrar, creating a new object.") err = r.save() if err != nil { return nil, errors.WithMessagef(err, "Failed to make a new registrar") @@ -97,12 +98,13 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { multi := stoppable.NewMulti("NodeRegistrations") inProgress := &sync.Map{} + // We are relying on the in progress check to ensure there is only a single // operator at a time, as a result this is a map of ID -> int attempts := &sync.Map{} for i := uint(0); i < numParallel; i++ { - stop := stoppable.NewSingle(fmt.Sprintf("NodeRegistration %d", i)) + stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i))) go registerNodes(r, r.session, stop, inProgress, attempts) multi.Add(stop) @@ -114,24 +116,26 @@ func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable { func (r *registrar) GetInputChannel() chan<- network.NodeGateway { return r.c } + func (r *registrar) TriggerNodeRegistration(nid *id.ID) { r.c <- network.NodeGateway{ - Node: ndf.Node{ID: nid.Marshal(), - //status must be active because it is in a round - Status: ndf.Active}, + Node: ndf.Node{ + ID: nid.Marshal(), + Status: ndf.Active, // Must be active because it is in a round + }, } } -// GetKeys returns a MixCypher for the topology and a list of nodes it did -// not have a key for. If there are missing keys, then returns nil MixCypher. +// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did +// not have a key for. If there are missing keys, then returns nil. func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) { r.mux.RLock() defer r.mux.RUnlock() keys := make([]*key, topology.Len()) - // Get keys for every node. If it cannot be found, thn add it to the missing - // nodes list so that it can be + // Get keys for every node. If it cannot be found, then add it to the + // missing nodes list so that it can be. for i := 0; i < topology.Len(); i++ { nid := topology.GetNodeAtIndex(i) k, ok := r.nodes[*nid] @@ -139,9 +143,10 @@ func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) { r.c <- network.NodeGateway{ Node: ndf.Node{ ID: nid.Marshal(), - Status: ndf.Active, // Status must be active because it is in a round + Status: ndf.Active, // Must be active because it is in a round }, } + return nil, errors.Errorf( "cannot get key for %s, triggered registration", nid) } else { @@ -157,19 +162,22 @@ func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) { return rk, nil } -// Has returns if the store has the nodes. +// HasNode returns true if the registrar has the node. func (r *registrar) HasNode(nid *id.ID) bool { r.mux.RLock() + defer r.mux.RUnlock() + _, exists := r.nodes[*nid] - r.mux.RUnlock() + return exists } +// RemoveNode removes the node from the registrar. func (r *registrar) RemoveNode(nid *id.ID) { r.remove(nid) } -// NumRegistered returns the number of registered nodes. +// NumRegisteredNodes returns the number of registered nodes. func (r *registrar) NumRegisteredNodes() int { r.mux.RLock() defer r.mux.RUnlock() diff --git a/network/nodes/store.go b/network/nodes/store.go index 8a576abbd..ff4511934 100644 --- a/network/nodes/store.go +++ b/network/nodes/store.go @@ -17,11 +17,10 @@ import ( "gitlab.com/xx_network/primitives/netTime" ) -const prefix = "cmix" -const currentStoreVersion = 0 const ( - storeKey = "KeyStore" - grpKey = "GroupKey" + prefix = "cmix" + currentStoreVersion = 0 + storeKey = "KeyStore" ) // Add adds the key for a round to the cMix storage object. Saves the updated diff --git a/network/rounds/retrieve_test.go b/network/rounds/retrieve_test.go index 81f2a3871..1d5a5225e 100644 --- a/network/rounds/retrieve_test.go +++ b/network/rounds/retrieve_test.go @@ -48,8 +48,8 @@ func TestManager_ProcessMessageRetrieval(t *testing.T) { t.Errorf(err.Error()) } - // Create a local channel so reception is possible (testManager.messageBundles is - // send only via newManager call above) + // Create a local channel so reception is possible + // (testManager.messageBundles is sent only via newManager call above) messageBundleChan := make(chan message.Bundle) testManager.messageBundles = messageBundleChan @@ -218,8 +218,8 @@ func TestManager_ProcessMessageRetrieval_FalsePositive(t *testing.T) { testManager.rng, testNdf, mockComms, testManager.session, nil) - // Create a local channel so reception is possible (testManager.messageBundles is - // send only via newManager call above) + // Create a local channel so reception is possible + // (testManager.messageBundles is sent only via newManager call above) messageBundleChan := make(chan message.Bundle) testManager.messageBundles = messageBundleChan @@ -285,8 +285,8 @@ func TestManager_ProcessMessageRetrieval_Quit(t *testing.T) { mockComms := &mockMessageRetrievalComms{testingSignature: t} stop := stoppable.NewSingle("singleStoppable") - // Create a local channel so reception is possible (testManager.messageBundles is - // send only via newManager call above) + // Create a local channel so reception is possible + // (testManager.messageBundles is sent only via newManager call above) messageBundleChan := make(chan message.Bundle) testManager.messageBundles = messageBundleChan @@ -367,8 +367,8 @@ func TestManager_ProcessMessageRetrieval_MultipleGateways(t *testing.T) { testManager.rng, testNdf, mockComms, testManager.session, nil) - // Create a local channel so reception is possible (testManager.messageBundles is - // send only via newManager call above) + // Create a local channel so reception is possible + // (testManager.messageBundles is sent only via newManager call above) messageBundleChan := make(chan message.Bundle) testManager.messageBundles = messageBundleChan diff --git a/network/rounds/store/uncheckedRounds_test.go b/network/rounds/store/uncheckedRounds_test.go index f38ad89a0..f8ae0becc 100644 --- a/network/rounds/store/uncheckedRounds_test.go +++ b/network/rounds/store/uncheckedRounds_test.go @@ -382,7 +382,7 @@ func TestUncheckedRoundStore_Remove(t *testing.T) { source := id.NewIdFromUInt(uint64(removedRound), id.User, t) err = testStore.Remove(removedRound, source, ephId) if err != nil { - t.Errorf("Could not removed round %d from storage: %v", removedRound, err) + t.Errorf("Could not have removed round %d from storage: %v", removedRound, err) } // Check that round was removed @@ -395,6 +395,6 @@ func TestUncheckedRoundStore_Remove(t *testing.T) { unknownRound := id.Round(numRounds + 5) err = testStore.Remove(unknownRound, source, ephId) if err == nil { - t.Errorf("Should not removed round %d which is not in storage", unknownRound) + t.Errorf("Should not have removed round %d which is not in storage", unknownRound) } } diff --git a/network/rounds/unchecked_test.go b/network/rounds/unchecked_test.go index 66d475bcf..4dbb174f3 100644 --- a/network/rounds/unchecked_test.go +++ b/network/rounds/unchecked_test.go @@ -42,8 +42,8 @@ func TestUncheckedRoundScheduler(t *testing.T) { testManager.sender, _ = gateway.NewSender( p, rngGen, testNdf, mockComms, testManager.session, nil) - // Create a local channel so reception is possible (testManager.messageBundles is - // send only via newManager call above) + // Create a local channel so reception is possible + // (testManager.messageBundles is sent only via newManager call above) messageBundleChan := make(chan message.Bundle) testManager.messageBundles = messageBundleChan diff --git a/network/trackResults.go b/network/trackResults.go index 4d7bbb1d6..1f17b2cfe 100644 --- a/network/trackResults.go +++ b/network/trackResults.go @@ -13,8 +13,8 @@ import ( "gitlab.com/elixxir/primitives/states" ) -// Function to follow the results of events. It returns true if the collection of -// events resolved well, and then a count of how many rounds failed and how +// TrackResults follows the results of events. It returns true if the collection +// of events resolved well, and then a count of how many rounds failed and how // many roundEvents timed out. func TrackResults(resultsCh chan ds.EventReturn, numResults int) (bool, int, int) { numTimeOut, numRoundFail := 0, 0 -- GitLab