Skip to content
Snippets Groups Projects
Select Git revision
  • d8f444448ea18911594507349d50c1e8c56e226c
  • release default
  • master protected
  • feature/xx-4717/logLevel
  • XX-4626/SingleUsePackage
  • josh/DmPackage
  • xx-4437/no-registration
  • feature/project/DM
  • project/channels
  • feature/ctidh
  • Jakub/rootless-CI
  • jono/wasmDemo
  • feature/XX-4108/updateProtoc
  • feature/hotfix/unsafe_send_to_self
  • Anne/OldSessionTesting
  • hotfix/groupChat
  • josh/groupCreationScript
  • feature/XX-3797/restore
  • feature/XX-3789/DeleteIndividualRequests
  • dev
  • feature/debugSendCMIX
21 results

README.md

Blame
  • manager.go 5.85 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    package network
    
    // tracker.go controls access to network resources. Interprocess communications
    // and intraclient state are accessible through the context object.
    
    import (
    	"github.com/pkg/errors"
    	"gitlab.com/elixxir/client/interfaces"
    	"gitlab.com/elixxir/client/interfaces/params"
    	"gitlab.com/elixxir/client/network/ephemeral"
    	"gitlab.com/elixxir/client/network/health"
    	"gitlab.com/elixxir/client/network/internal"
    	"gitlab.com/elixxir/client/network/message"
    	"gitlab.com/elixxir/client/network/node"
    	"gitlab.com/elixxir/client/network/rounds"
    	"gitlab.com/elixxir/client/stoppable"
    	"gitlab.com/elixxir/client/storage"
    	"gitlab.com/elixxir/client/switchboard"
    	"gitlab.com/elixxir/comms/client"
    	"gitlab.com/elixxir/comms/network"
    	"gitlab.com/elixxir/crypto/fastRNG"
    	"gitlab.com/xx_network/primitives/ndf"
    	"sync/atomic"
    
    	"time"
    )
    
    // Manager implements the NetworkManager interface inside context. It
    // controls access to network resources and implements all of the communications
    // functions used by the client.
    type manager struct {
    	// parameters of the network
    	param params.Network
    
    	//Shared data with all sub managers
    	internal.Internal
    
    	//sub-managers
    	round   *rounds.Manager
    	message *message.Manager
    	//atomic denotes if the network is running
    	running *uint32
    }
    
    // NewManager builds a new reception manager object using inputted key fields
    func NewManager(session *storage.Session, switchboard *switchboard.Switchboard,
    	rng *fastRNG.StreamGenerator, comms *client.Comms,
    	params params.Network, ndf *ndf.NetworkDefinition) (interfaces.NetworkManager, error) {
    
    	//start network instance
    	instance, err := network.NewInstance(comms.ProtoComms, ndf, nil, nil)
    	if err != nil {
    		return nil, errors.WithMessage(err, "failed to create"+
    			" client network manager")
    	}
    
    	running := uint32(0)
    
    	//create manager object
    	m := manager{
    		param:   params,
    		running: &running,
    	}
    
    	m.Internal = internal.Internal{
    		Session:          session,
    		Switchboard:      switchboard,
    		Rng:              rng,
    		Comms:            comms,
    		Health:           health.Init(instance, params.NetworkHealthTimeout),
    		NodeRegistration: make(chan network.NodeGateway, params.RegNodesBufferLen),
    		Instance:         instance,
    		Uid:              session.User().GetCryptographicIdentity().GetTransmissionID(),
    	}
    
    	//create sub managers
    	m.message = message.NewManager(m.Internal, m.param.Messages, m.NodeRegistration)
    	m.round = rounds.NewManager(m.Internal, m.param.Rounds, m.message.GetMessageReceptionChannel())
    
    	return &m, nil
    }
    
    // StartRunners kicks off all network reception goroutines ("threads").
    // Started Threads are:
    //   - Network Follower (/network/follow.go)
    //   - Historical Round Retrieval (/network/rounds/historical.go)
    //	 - Message Retrieval Worker Group (/network/rounds/retrieve.go)
    //	 - Message Handling Worker Group (/network/message/handle.go)
    //	 - Health Tracker (/network/health)
    //	 - Garbled Messages (/network/message/garbled.go)
    //	 - Critical Messages (/network/message/critical.go)
    //   - Ephemeral ID tracking (network/ephemeral/tracker.go)
    func (m *manager) Follow() (stoppable.Stoppable, error) {
    	if !atomic.CompareAndSwapUint32(m.running, 0, 1) {
    		return nil, errors.Errorf("network routines are already running")
    	}
    
    	multi := stoppable.NewMulti("networkManager")
    
    	// health tracker
    	healthStop, err := m.Health.Start()
    	if err != nil {
    		return nil, errors.Errorf("failed to follow")
    	}
    	multi.Add(healthStop)
    
    	// Node Updates
    	multi.Add(node.StartRegistration(m.Instance, m.Session, m.Rng,
    		m.Comms, m.NodeRegistration)) // Adding/Keys
    	//TODO-remover
    	//m.runners.Add(StartNodeRemover(m.Context))        // Removing
    
    	// Start the Network Tracker
    	trackNetworkStopper := stoppable.NewSingle("TrackNetwork")
    	go m.followNetwork(trackNetworkStopper.Quit())
    	multi.Add(trackNetworkStopper)
    
    	// Message reception
    	multi.Add(m.message.StartProcessies())
    
    	// Round processing
    	multi.Add(m.round.StartProcessors())
    
    	// Ephemeral ID tracking
    	err = checkTimestampStore(m.Session)
    	if err != nil {
    		return nil, errors.Errorf("Could not store timestamp " +
    			"for ephemeral ID tracking: %v", err)
    	}
    
    	multi.Add(ephemeral.Track(m.Session, m.Instance, m.Comms.Id))
    
    	//set the running status back to 0 so it can be started again
    	closer := stoppable.NewCleanup(multi, func(time.Duration) error {
    		if !atomic.CompareAndSwapUint32(m.running, 1, 0) {
    			return errors.Errorf("network routines are already stopped")
    		}
    		return nil
    	})
    
    	return closer, nil
    }
    
    // Sanitation check of timestamp store. If a value has not been stored yet
    // then the current time is stored
    func checkTimestampStore(session *storage.Session) error {
    	if _, err := session.Get(ephemeral.TimestampKey); err != nil {
    		now, err := ephemeral.MarshalTimestamp(time.Now())
    		if err != nil {
    			return errors.Errorf("Could not marshal new timestamp for storage: %v", err)
    		}
    		return session.Set(ephemeral.TimestampKey, now)
    	}
    
    	return nil
    }
    
    // GetHealthTracker returns the health tracker
    func (m *manager) GetHealthTracker() interfaces.HealthTracker {
    	return m.Health
    }
    
    // GetInstance returns the network instance object (ndf state)
    func (m *manager) GetInstance() *network.Instance {
    	return m.Instance
    }
    
    // triggers a check on garbled messages to see if they can be decrypted
    // this should be done when a new e2e client is added in case messages were
    // received early or arrived out of order
    func (m *manager) CheckGarbledMessages() {
    	m.message.CheckGarbledMessages()
    }