Skip to content
Snippets Groups Projects
Commit c7ad9796 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Modify stack to not require networking when initializing a client

parent e45a4b96
No related branches found
No related tags found
3 merge requests!510Release,!209Modify network stack to not require network,!207WIP: Client Restructure
......@@ -9,10 +9,11 @@ package api
import (
"encoding/json"
"gitlab.com/xx_network/primitives/netTime"
"math"
"time"
"gitlab.com/xx_network/primitives/netTime"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/auth"
......@@ -230,6 +231,26 @@ func OpenClient(storageDir string, password []byte,
userState: userState,
}
err = c.initComms()
if err != nil {
return nil, err
}
c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage,
c.rng, c.events)
if err != nil {
return nil, err
}
user := c.userState.PortableUserInfo()
c.e2e, err = e2e.Load(c.storage.GetKV(), c.network,
user.ReceptionID, c.storage.GetE2EGroup(),
c.rng, c.events)
if err != nil {
return nil, err
}
return c, nil
}
......@@ -292,14 +313,9 @@ func Login(storageDir string, password []byte,
}
u := c.userState.PortableUserInfo()
jww.INFO.Printf("Client Logged in: \n\tTransmisstionID: %s "+
jww.INFO.Printf("Client Logged in: \n\tTransmissionID: %s "+
"\n\tReceptionID: %s", u.TransmissionID, u.ReceptionID)
err = c.initComms()
if err != nil {
return nil, err
}
def := c.storage.GetNDF()
//initialize registration
......@@ -329,17 +345,7 @@ func Login(storageDir string, password []byte,
}
}
c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage,
c.storage.GetNDF(), c.rng, c.events)
if err != nil {
return nil, err
}
user := c.userState.PortableUserInfo()
c.e2e, err = e2e.Load(c.storage.GetKV(), c.network,
user.ReceptionID, c.storage.GetE2EGroup(),
c.rng, c.events)
err = c.network.Connect(def)
if err != nil {
return nil, err
}
......@@ -380,11 +386,6 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte,
return nil, err
}
err = c.initComms()
if err != nil {
return nil, err
}
//store the updated base NDF
c.storage.SetNDF(def)
......@@ -399,15 +400,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte,
"able to register or track network.")
}
c.network, err = cmix.NewClient(params.CMix, c.comms, c.storage,
c.storage.GetNDF(), c.rng, c.events)
if err != nil {
return nil, err
}
c.e2e, err = e2e.Load(c.storage.GetKV(), c.network,
c.GetUser().ReceptionID, c.storage.GetE2EGroup(),
c.rng, c.events)
err = c.network.Connect(def)
if err != nil {
return nil, err
}
......@@ -453,11 +446,6 @@ func LoginWithProtoClient(storageDir string, password []byte,
return nil, err
}
err = c.initComms()
if err != nil {
return nil, err
}
c.storage.SetNDF(def)
err = c.initPermissioning(def)
......@@ -465,15 +453,7 @@ func LoginWithProtoClient(storageDir string, password []byte,
return nil, err
}
c.network, err = cmix.NewClient(params.CMix, c.comms, c.storage,
c.storage.GetNDF(), c.rng, c.events)
if err != nil {
return nil, err
}
c.e2e, err = e2e.Load(c.storage.GetKV(), c.network,
c.GetUser().ReceptionID, c.storage.GetE2EGroup(),
c.rng, c.events)
err = c.network.Connect(def)
if err != nil {
return nil, err
}
......
......@@ -54,7 +54,7 @@ type client struct {
// Generic RNG for client
rng *fastRNG.StreamGenerator
// Comms pointer to send/receive messages
comms clientCommsInterface
comms *commClient.Comms
// Contains the network instance
instance *commNetwork.Instance
......@@ -90,35 +90,22 @@ type client struct {
// NewClient builds a new reception client object using inputted key fields.
func NewClient(params Params, comms *commClient.Comms, session storage.Session,
ndf *ndf.NetworkDefinition, rng *fastRNG.StreamGenerator,
events event.Reporter) (Client, error) {
// Start network instance
instance, err := commNetwork.NewInstance(
comms.ProtoComms, ndf, nil, nil, commNetwork.None,
params.FastPolling)
if err != nil {
return nil, errors.WithMessage(
err, "failed to create network client")
}
rng *fastRNG.StreamGenerator, events event.Reporter) (Client, error) {
tmpMsg := format.NewMessage(session.GetCmixGroup().GetP().ByteLen())
tracker := uint64(0)
earliest := uint64(0)
addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size
// Create client object
c := &client{
param: params,
tracker: &tracker,
Space: address.NewAddressSpace(addrSize),
events: events,
earliestRound: &earliest,
session: session,
rng: rng,
comms: comms,
instance: instance,
maxMsgLen: tmpMsg.ContentsSize(),
}
......@@ -126,6 +113,23 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
c.verboseRounds = NewRoundTracker()
}
return c, nil
}
func (c *client) Connect(ndf *ndf.NetworkDefinition) error {
// Start network instance
instance, err := commNetwork.NewInstance(
c.comms.ProtoComms, ndf, nil, nil, commNetwork.None,
c.param.FastPolling)
if err != nil {
return errors.WithMessage(
err, "failed to create network client")
}
c.instance = instance
addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size
c.Space = address.NewAddressSpace(addrSize)
/* Set up modules */
nodeChan := make(chan commNetwork.NodeGateway, nodes.InputChanLen)
......@@ -138,30 +142,31 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
// Enable optimized HostPool initialization
poolParams.MaxPings = 50
poolParams.ForceConnection = true
c.Sender, err = gateway.NewSender(
poolParams, rng, ndf, comms, session, nodeChan)
sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms,
c.session, nodeChan)
if err != nil {
return nil, err
return err
}
c.Sender = sender
// Set up the node registrar
c.Registrar, err = nodes.LoadRegistrar(
session, c.Sender, c.comms, c.rng, nodeChan)
c.session, c.Sender, c.comms, c.rng, nodeChan)
if err != nil {
return nil, err
return err
}
// Set up the historical rounds handler
c.Retriever = rounds.NewRetriever(
params.Historical, comms, c.Sender, events)
c.param.Historical, c.comms, c.Sender, c.events)
// Set up Message Handler
c.Handler = message.NewHandler(params.Message, c.session.GetKV(), c.events,
c.session.GetReceptionID())
c.Handler = message.NewHandler(c.param.Message, c.session.GetKV(),
c.events, c.session.GetReceptionID())
// Set up round handler
c.Pickup = pickup.NewPickup(
params.Pickup, c.Handler.GetMessageReceptionChannel(), c.Sender,
c.param.Pickup, c.Handler.GetMessageReceptionChannel(), c.Sender,
c.Retriever, c.comms, c.rng, c.instance, c.session)
// Add the identity system
......@@ -169,9 +174,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
// Set up the ability to register with new nodes when they appear
c.instance.SetAddGatewayChan(nodeChan)
// Set up the health monitor
c.Monitor = health.Init(instance, params.NetworkHealthTimeout)
c.Monitor = health.Init(c.instance, c.param.NetworkHealthTimeout)
// Set up critical message tracking (sendCmix only)
critSender := func(msg format.Message, recipient *id.ID, params CMIXParams,
......@@ -181,15 +185,15 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
c.session.GetTransmissionID(), c.comms)
}
c.crit = newCritical(
session.GetKV(), c.Monitor, c.instance.GetRoundEvents(), critSender)
c.crit = newCritical(c.session.GetKV(), c.Monitor,
c.instance.GetRoundEvents(), critSender)
// Report health events
c.AddHealthCallback(func(isHealthy bool) {
c.events.Report(5, "health", "IsHealthy", strconv.FormatBool(isHealthy))
})
return c, nil
return nil
}
// Follow StartRunners kicks off all network reception goroutines ("threads").
......
......@@ -14,9 +14,15 @@ import (
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf"
)
type Client interface {
// Connect turns on network handlers, initializing a host pool and
// network health monitors. This should be called before
// network Follow command is called.
Connect(ndf *ndf.NetworkDefinition) error
// Follow starts the tracking of the network in a new thread.
// Errors that occur are reported on the ClientErrorReport function if
// passed. The returned stoppable can be used to stop the follower.
......
......@@ -38,10 +38,11 @@ type critical struct {
roundEvents roundEventRegistrar
trigger chan bool
send criticalSender
healthcb func(f func(bool)) uint64
}
func newCritical(kv *versioned.KV, hm func(f func(bool)) uint64,
roundEvents roundEventRegistrar, send criticalSender) *critical {
send criticalSender) *critical {
cm, err := NewOrLoadE2eMessageBuffer(kv, e2eCriticalMessagesKey)
if err != nil {
jww.FATAL.Panicf("cannot load the critical messages buffer: "+
......@@ -50,17 +51,19 @@ func newCritical(kv *versioned.KV, hm func(f func(bool)) uint64,
c := &critical{
E2eMessageBuffer: cm,
roundEvents: roundEvents,
trigger: make(chan bool, 100),
send: send,
}
hm(func(healthy bool) { c.trigger <- healthy })
return c
}
func (c *critical) runCriticalMessages(stop *stoppable.Single) {
func (c *critical) runCriticalMessages(stop *stoppable.Single,
roundEvents roundEventRegistrar) {
if c.roundEvents == nil {
c.roundEvents = roundEvents
c.healthcb(func(healthy bool) { c.trigger <- healthy })
}
for {
select {
case <-stop.Quit():
......
......@@ -2,10 +2,11 @@ package e2e
import (
"encoding/json"
"gitlab.com/xx_network/primitives/netTime"
"strings"
"time"
"gitlab.com/xx_network/primitives/netTime"
"github.com/pkg/errors"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/cmix"
......@@ -144,8 +145,7 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID,
"Failed to unmarshal rekeyParams data")
}
m.crit = newCritical(kv, net.AddHealthCallback,
net.GetInstance().GetRoundEvents(), m.SendE2E)
m.crit = newCritical(kv, net.AddHealthCallback, m.SendE2E)
return m, nil
}
......@@ -155,7 +155,8 @@ func (m *manager) StartProcesses() (stoppable.Stoppable, error) {
critcalNetworkStopper := stoppable.NewSingle(
"e2eCriticalMessagesStopper")
m.crit.runCriticalMessages(critcalNetworkStopper)
m.crit.runCriticalMessages(critcalNetworkStopper,
m.net.GetInstance().GetRoundEvents())
multi.Add(critcalNetworkStopper)
rekeySendFunc := func(mt catalog.MessageType,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment