Skip to content
Snippets Groups Projects
Select Git revision
  • a6b49087016f578ef8dc53fd741ec3ea5b43e114
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
41 results

trackResults.go

Blame
  • manager.go 7.77 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    package network
    
    // manager.go controls access to network resources. Interprocess communications
    // and intraclient state are accessible through the context object.
    
    import (
    	"fmt"
    	"github.com/pkg/errors"
    	"gitlab.com/elixxir/client/event"
    	"gitlab.com/elixxir/client/network/address"
    	"gitlab.com/elixxir/client/network/gateway"
    	"gitlab.com/elixxir/client/network/health"
    	"gitlab.com/elixxir/client/network/historical"
    	"gitlab.com/elixxir/client/network/identity"
    	"gitlab.com/elixxir/client/network/message"
    	"gitlab.com/elixxir/client/network/nodes"
    	"gitlab.com/elixxir/client/network/rounds"
    	"gitlab.com/elixxir/client/stoppable"
    	"gitlab.com/elixxir/client/storage"
    	"gitlab.com/elixxir/comms/client"
    	commNetwork "gitlab.com/elixxir/comms/network"
    	"gitlab.com/elixxir/crypto/fastRNG"
    	"gitlab.com/elixxir/primitives/format"
    	"gitlab.com/xx_network/primitives/id"
    	"gitlab.com/xx_network/primitives/id/ephemeral"
    	"gitlab.com/xx_network/primitives/ndf"
    	"math"
    	"sync/atomic"
    	"time"
    )
    
    // fakeIdentityRange bounds how far behind the earliest known round a fake
    // identity's earliest round may be placed. When polling with a fake identity,
    // the offset is generated between 0 (most current) and fakeIdentityRange
    // rounds behind the earliest known round.
    const fakeIdentityRange = 800
    
    // manager implements the Manager interface returned by NewManager. It
    // controls access to network resources and implements all the communications
    // functions used by the client.
    //
    // CRITICAL: manager must be private. It embeds sub-modules which export
    // functions for it, but not for public consumption. By keeping the type
    // private and returning it as the public interface, those functions stay
    // private.
    type manager struct {
    	// User identity storage
    	session storage.Session
    	// Generic RNG for the client
    	rng *fastRNG.StreamGenerator
    	// Comms pointer used to send/receive messages
    	comms *client.Comms
    	// Contains the network instance
    	instance *commNetwork.Instance
    
    	// Parameters of the network
    	param Params
    
    	// Sub-managers. These are embedded, so their exported methods are promoted
    	// onto manager; all except address.Space are assigned after the struct
    	// literal is built in NewManager.
    	gateway.Sender
    	message.Handler
    	nodes.Registrar
    	historical.Retriever
    	rounds.Pickup
    	address.Space
    	identity.Tracker
    	health.Monitor
    	// Critical message tracking; not embedded, so nothing is promoted from it.
    	crit *critical
    
    	// Earliest tracked round; written with atomic.StoreUint64 in
    	// SetFakeEarliestRound.
    	earliestRound *uint64
    
    	// Number of polls done in a period of time
    	tracker       *uint64
    	// NOTE(review): latencySum and numLatencies are not used in this part of
    	// the file; presumably they accumulate poll latency statistics - confirm.
    	latencySum    uint64
    	numLatencies  uint64
    	// Verbose round tracker; nil unless Params.VerboseRoundTracking is set.
    	verboseRounds *RoundTracker
    
    	// Event reporting API
    	events event.Manager
    
    	// Storage of the max message length
    	maxMsgLen int
    }
    
    // NewManager builds a new reception manager object using inputted key fields.
    //
    // It creates the network instance from the supplied NDF, constructs the
    // manager, and then wires up each sub-module in order: gateway sender, node
    // registrar, historical round retriever, message handler, round pickup,
    // identity tracker, health monitor, and critical message tracking. The order
    // matters because later modules are built from earlier ones (e.g. the
    // registrar takes the sender, and the pickup takes the handler's reception
    // channel).
    //
    // Returns the manager as the Manager interface, or an error if the network
    // instance, the gateway sender, or the node registrar cannot be created.
    func NewManager(params Params, comms *client.Comms, session storage.Session,
    	ndf *ndf.NetworkDefinition, rng *fastRNG.StreamGenerator, events event.Manager,
    ) (Manager, error) {
    
    	// Start network instance
    	instance, err := commNetwork.NewInstance(comms.ProtoComms, ndf, nil, nil, commNetwork.None, params.FastPolling)
    	if err != nil {
    		return nil, errors.WithMessage(err, "failed to create"+
    			" client network manager")
    	}
    
    	// Build a throwaway message sized to the cMix group's prime; it is only
    	// used below to read the contents size that becomes maxMsgLen.
    	tmpMsg := format.NewMessage(session.GetCmixGroup().GetP().ByteLen())
    
    	// Backing storage for the pointer-typed counters held by the manager
    	tracker := uint64(0)
    	earliest := uint64(0)
    	// Create manager object
    	m := &manager{
    		param:         params,
    		tracker:       &tracker,
    		Space:         address.NewAddressSpace(),
    		events:        events,
    		earliestRound: &earliest,
    		session:       session,
    		rng:           rng,
    		comms:         comms,
    		instance:      instance,
    		maxMsgLen:     tmpMsg.ContentsSize(),
    	}
    	// NOTE(review): 18 looks like an initial address space size; confirm the
    	// meaning and origin of this constant.
    	m.UpdateAddressSpace(18)
    
    	// Verbose round tracking is optional; verboseRounds stays nil otherwise
    	if params.VerboseRoundTracking {
    		m.verboseRounds = NewRoundTracker()
    	}
    
    	/* set up modules */
    	// Channel shared by the gateway sender, the node registrar, and the
    	// network instance (see SetAddGatewayChan below)
    	nodechan := make(chan commNetwork.NodeGateway, nodes.InputChanLen)
    
    	// Set up gateway.Sender
    	poolParams := gateway.DefaultPoolParams()
    	// Client will not send KeepAlive packets
    	poolParams.HostParams.KaClientOpts.Time = time.Duration(math.MaxInt64)
    	// Enable optimized HostPool initialization
    	poolParams.MaxPings = 50
    	poolParams.ForceConnection = true
    	m.Sender, err = gateway.NewSender(poolParams, rng,
    		ndf, comms, session, nodechan)
    	if err != nil {
    		return nil, err
    	}
    
    	// Set up the node registrar
    	m.Registrar, err = nodes.LoadRegistrar(session, m.Sender, m.comms, m.rng, nodechan)
    	if err != nil {
    		return nil, err
    	}
    
    	// Set up the historical rounds handler
    	m.Retriever = historical.NewRetriever(params.Historical, comms, m.Sender, events)
    
    	// Set up message handler
    	m.Handler = message.NewHandler(params.Message, m.session.GetKV(), m.events)
    
    	// Set up round handler; it takes the handler's message reception channel
    	m.Pickup = rounds.NewPickup(params.Rounds, m.Handler.GetMessageReceptionChannel(),
    		m.Sender, m.Retriever, m.rng, m.instance, m.session)
    
    	// Add the identity system
    	m.Tracker = identity.NewOrLoadTracker(m.session, m.Space)
    
    	// Set up the ability to register with new nodes when they appear
    	m.instance.SetAddGatewayChan(nodechan)
    
    	// Set up the health monitor
    	m.Monitor = health.Init(instance, params.NetworkHealthTimeout)
    
    	// Set up critical message tracking (sendCmix only). The closure captures m
    	// so the critical handler resends through the manager's own modules.
    	critSender := func(msg format.Message, recipient *id.ID,
    		params CMIXParams) (id.Round, ephemeral.Id, error) {
    		return sendCmixHelper(m.Sender, msg, recipient, params, m.instance,
    			m.session.GetCmixGroup(), m.Registrar, m.rng, m.events,
    			m.session.GetTransmissionID(), m.comms)
    	}
    
    	m.crit = newCritical(session.GetKV(), m.Monitor,
    		m.instance.GetRoundEvents(), critSender)
    
    	// Report health events
    	// NOTE(review): the first argument (5) is presumably an event priority;
    	// confirm against event.Manager.
    	m.Monitor.AddHealthCallback(func(isHealthy bool) {
    		m.events.Report(5, "health", "IsHealthy",
    			fmt.Sprintf("%v", isHealthy))
    	})
    
    	return m, nil
    }
    
    // Follow kicks off all network reception goroutines ("threads") and returns a
    // single stoppable that aggregates them.
    //
    // Processes started, in order:
    //   - Health monitor (m.Monitor)
    //   - Node registration (m.Registrar)
    //   - Network follower (m.followNetwork, run in its own goroutine)
    //   - Message handling (m.Handler)
    //   - Round pickup (m.Pickup)
    //   - Historical round retrieval (m.Retriever)
    //   - Identity tracking (m.Tracker)
    //
    // The report callback is handed to the network follower. An error is returned
    // only if the health monitor fails to start; the underlying cause is
    // preserved in the returned error.
    func (m *manager) Follow(report ClientErrorReport) (stoppable.Stoppable, error) {
    	multi := stoppable.NewMulti("networkManager")
    
    	// Health tracker. This is the only start call that can fail, so it runs
    	// first, before any other process has been launched.
    	healthStop, err := m.Monitor.StartProcessies()
    	if err != nil {
    		// Wrap rather than replace so the root cause is not lost
    		return nil, errors.WithMessage(err, "failed to follow")
    	}
    	multi.Add(healthStop)
    
    	// Node updates
    	multi.Add(m.Registrar.StartProcesses(m.param.ParallelNodeRegistrations)) // Adding/MixCypher
    	//TODO-node remover
    
    	// Start the network tracker
    	followNetworkStopper := stoppable.NewSingle("FollowNetwork")
    	go m.followNetwork(report, followNetworkStopper)
    	multi.Add(followNetworkStopper)
    
    	// Message reception
    	multi.Add(m.Handler.StartProcesses())
    
    	// Round processing
    	multi.Add(m.Pickup.StartProcessors())
    
    	// Historical rounds processing
    	multi.Add(m.Retriever.StartProcessies())
    
    	// Start the processes for the identity handler
    	multi.Add(m.Tracker.StartProcessies())
    
    	return multi, nil
    }
    
    // GetInstance exposes the network instance object (NDF state) held by the
    // manager.
    func (m *manager) GetInstance() *commNetwork.Instance {
    	inst := m.instance
    	return inst
    }
    
    // GetVerboseRounds reports verbose round information as a string. When
    // verbose round tracking was not enabled, a fixed notice is returned instead.
    func (m *manager) GetVerboseRounds() string {
    	if rt := m.verboseRounds; rt != nil {
    		return rt.String()
    	}
    	return "Verbose Round tracking not enabled"
    }
    
    // SetFakeEarliestRound atomically overwrites the manager's earliest tracked
    // round with rnd.
    func (m *manager) SetFakeEarliestRound(rnd id.Round) {
    	earliest := uint64(rnd)
    	atomic.StoreUint64(m.earliestRound, earliest)
    }
    
    // GetMaxMessageLength reports the maximum length of a cMix message, as
    // stored on the manager.
    func (m *manager) GetMaxMessageLength() int {
    	maxLen := m.maxMsgLen
    	return maxLen
    }