Select Git revision
manager.go 4.96 KiB
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 Privategrity Corporation /
// /
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package network
// manager.go controls access to network resources. Interprocess communications
// and intraclient state are accessible through the context object.
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network/health"
"gitlab.com/elixxir/client/network/internal"
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/network/node"
"gitlab.com/elixxir/client/network/rounds"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/switchboard"
"gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/primitives/ndf"
"sync/atomic"
"time"
)
// manager implements the interfaces.NetworkManager interface (it is the
// concrete type returned by NewManager). It controls access to network
// resources and implements all of the communications functions used by the
// client.
type manager struct {
// param holds the network parameters this manager was constructed with; the
// Messages and Rounds sub-parameters are handed to the sub-managers.
param params.Network
// Internal is the shared data handed to every sub-manager. It is embedded,
// so its fields (Health, Instance, Session, ...) are accessed directly on
// the manager.
internal.Internal
// sub-managers, built in NewManager
round *rounds.Manager
message *message.Manager
// running is accessed atomically and denotes if the network routines are
// running: Follow swaps it from 0 to 1, and the stoppable returned by
// Follow swaps it back from 1 to 0.
running *uint32
}
// NewManager assembles the client's network manager from the passed session,
// switchboard, RNG stream generator, comms object, network parameters and NDF.
// It creates the network instance, the shared internal state and the message
// and round sub-managers. No goroutines are started here; call Follow on the
// returned manager for that.
//
// An error is returned only if the network instance cannot be created.
func NewManager(session *storage.Session, switchboard *switchboard.Switchboard,
	rng *fastRNG.StreamGenerator, comms *client.Comms,
	params params.Network, ndf *ndf.NetworkDefinition) (interfaces.NetworkManager, error) {

	// The network instance is required by everything built below
	inst, err := network.NewInstance(comms.ProtoComms, ndf, nil, nil)
	if err != nil {
		return nil, errors.WithMessage(err,
			"failed to create client network manager")
	}

	// State shared between the manager and all of its sub-managers
	shared := internal.Internal{
		Session:     session,
		Switchboard: switchboard,
		Rng:         rng,
		Comms:       comms,
		// NOTE(review): the 5 second argument is presumably the health
		// tracker's timeout - confirm against health.Init
		Health:           health.Init(inst, 5*time.Second),
		NodeRegistration: make(chan network.NodeGateway, params.RegNodesBufferLen),
		Instance:         inst,
		Uid:              session.User().GetCryptographicIdentity().GetUserID(),
	}

	// The running flag starts at 0, i.e. the network routines are stopped
	mgr := &manager{
		param:    params,
		Internal: shared,
		running:  new(uint32),
	}

	// The round manager depends on the message manager's reception channel,
	// so the message manager has to be built first
	mgr.message = message.NewManager(mgr.Internal, mgr.param.Messages,
		mgr.NodeRegistration)
	mgr.round = rounds.NewManager(mgr.Internal, mgr.param.Rounds,
		mgr.message.GetMessageReceptionChannel())

	return mgr, nil
}
// Follow kicks off all network reception goroutines ("threads") and returns a
// single stoppable which controls all of them. Closing that stoppable also
// resets the running status so that Follow can be called again.
//
// An error is returned if the network routines are already running or if the
// health tracker fails to start. In the latter case the running status is
// released so that a later call to Follow can retry.
//
// Started Threads are:
//   - Health Tracker (/network/health)
//   - Node Registration (/network/node)
//   - Network Follower (/network/follow.go)
//   - Historical Round Retrieval (/network/rounds/historical.go)
//   - Message Retrieval Worker Group (/network/rounds/retrieve.go)
//   - Message Handling Worker Group (/network/message/handle.go)
//   - Garbled Messages (/network/message/garbled.go)
//   - Critical Messages (/network/message/critical.go)
func (m *manager) Follow() (stoppable.Stoppable, error) {
	// Only one set of network routines may run at a time
	if !atomic.CompareAndSwapUint32(m.running, 0, 1) {
		return nil, errors.Errorf("network routines are already running")
	}

	multi := stoppable.NewMulti("networkManager")

	// health tracker
	healthStop, err := m.Health.Start()
	if err != nil {
		// Nothing else has been started at this point, so release the running
		// flag; leaving it set would make every later call to Follow fail
		// with "already running" even though nothing is running.
		atomic.StoreUint32(m.running, 0)
		// Keep the underlying cause instead of discarding it
		return nil, errors.WithMessage(err, "failed to follow")
	}
	multi.Add(healthStop)

	// Node Updates
	multi.Add(node.StartRegistration(m.Instance, m.Session, m.Rng,
		m.Comms, m.NodeRegistration)) // Adding/Keys
	//TODO-remover
	//m.runners.Add(StartNodeRemover(m.Context)) // Removing

	// Start the Network Tracker
	trackNetworkStopper := stoppable.NewSingle("TrackNetwork")
	go m.followNetwork(trackNetworkStopper.Quit())
	multi.Add(trackNetworkStopper)

	// Message reception
	multi.Add(m.message.StartProcessies())

	// Round processing
	multi.Add(m.round.StartProcessors())

	// set the running status back to 0 so it can be started again
	closer := stoppable.NewCleanup(multi, func(time.Duration) error {
		if !atomic.CompareAndSwapUint32(m.running, 1, 0) {
			return errors.Errorf("network routines are already stopped")
		}
		return nil
	})

	return closer, nil
}
// GetHealthTracker returns the health tracker held in the manager's shared
// internal state, i.e. the one created by health.Init in NewManager and
// started by Follow.
func (m *manager) GetHealthTracker() interfaces.HealthTracker {
return m.Health
}
// GetInstance returns the network instance object (ndf state) that was
// created in NewManager and is shared with the sub-managers.
func (m *manager) GetInstance() *network.Instance {
return m.Instance
}
// CheckGarbledMessages triggers a check on garbled messages to see if they
// can be decrypted. This should be done when a new e2e client is added in case
// messages were received early or arrived out of order. The call is forwarded
// to the message sub-manager.
func (m *manager) CheckGarbledMessages() {
m.message.CheckGarbledMessages()
}