diff --git a/api/client.go b/api/client.go index 7145b1f802768a86e376aa9090175754322265b8..97241cf2919891313a25849335efde607e4272b0 100644 --- a/api/client.go +++ b/api/client.go @@ -9,10 +9,11 @@ package api import ( "encoding/json" - "gitlab.com/xx_network/primitives/netTime" "math" "time" + "gitlab.com/xx_network/primitives/netTime" + "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/auth" @@ -230,6 +231,26 @@ func OpenClient(storageDir string, password []byte, userState: userState, } + err = c.initComms() + if err != nil { + return nil, err + } + + c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage, + c.rng, c.events) + if err != nil { + return nil, err + } + + user := c.userState.PortableUserInfo() + + c.e2e, err = e2e.Load(c.storage.GetKV(), c.network, + user.ReceptionID, c.storage.GetE2EGroup(), + c.rng, c.events) + if err != nil { + return nil, err + } + return c, nil } @@ -292,14 +313,9 @@ func Login(storageDir string, password []byte, } u := c.userState.PortableUserInfo() - jww.INFO.Printf("Client Logged in: \n\tTransmisstionID: %s "+ + jww.INFO.Printf("Client Logged in: \n\tTransmissionID: %s "+ "\n\tReceptionID: %s", u.TransmissionID, u.ReceptionID) - err = c.initComms() - if err != nil { - return nil, err - } - def := c.storage.GetNDF() //initialize registration @@ -329,17 +345,7 @@ func Login(storageDir string, password []byte, } } - c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage, - c.storage.GetNDF(), c.rng, c.events) - if err != nil { - return nil, err - } - - user := c.userState.PortableUserInfo() - - c.e2e, err = e2e.Load(c.storage.GetKV(), c.network, - user.ReceptionID, c.storage.GetE2EGroup(), - c.rng, c.events) + err = c.network.Connect(def) if err != nil { return nil, err } @@ -380,11 +386,6 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, return nil, err } - err = c.initComms() - if err != nil { - return nil, err - } - //store the updated base NDF c.storage.SetNDF(def) @@ -399,15 +400,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, "able to register or track network.") } - c.network, err = cmix.NewClient(params.CMix, c.comms, c.storage, - c.storage.GetNDF(), c.rng, c.events) - if err != nil { - return nil, err - } - - c.e2e, err = e2e.Load(c.storage.GetKV(), c.network, - c.GetUser().ReceptionID, c.storage.GetE2EGroup(), - c.rng, c.events) + err = c.network.Connect(def) if err != nil { return nil, err } @@ -453,11 +446,6 @@ func LoginWithProtoClient(storageDir string, password []byte, return nil, err } - err = c.initComms() - if err != nil { - return nil, err - } - c.storage.SetNDF(def) err = c.initPermissioning(def) @@ -465,15 +453,7 @@ func LoginWithProtoClient(storageDir string, password []byte, return nil, err } - c.network, err = cmix.NewClient(params.CMix, c.comms, c.storage, - c.storage.GetNDF(), c.rng, c.events) - if err != nil { - return nil, err - } - - c.e2e, err = e2e.Load(c.storage.GetKV(), c.network, - c.GetUser().ReceptionID, c.storage.GetE2EGroup(), - c.rng, c.events) + err = c.network.Connect(def) if err != nil { return nil, err } diff --git a/cmix/client.go b/cmix/client.go index 402a5660bd452f8ac19f6532e5414f9d33cb8981..d26045a1786a5f900b676f2e8a7aa593f72445eb 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -54,7 +54,7 @@ type client struct { // Generic RNG for client rng *fastRNG.StreamGenerator // Comms pointer to send/receive messages - comms clientCommsInterface + comms *commClient.Comms // Contains the network instance instance *commNetwork.Instance @@ -90,35 +90,22 @@ type client struct { // NewClient builds a new reception client object using inputted key fields. func NewClient(params Params, comms *commClient.Comms, session storage.Session, - ndf *ndf.NetworkDefinition, rng *fastRNG.StreamGenerator, - events event.Reporter) (Client, error) { - - // Start network instance - instance, err := commNetwork.NewInstance( - comms.ProtoComms, ndf, nil, nil, commNetwork.None, - params.FastPolling) - if err != nil { - return nil, errors.WithMessage( - err, "failed to create network client") - } + rng *fastRNG.StreamGenerator, events event.Reporter) (Client, error) { tmpMsg := format.NewMessage(session.GetCmixGroup().GetP().ByteLen()) tracker := uint64(0) earliest := uint64(0) - addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size // Create client object c := &client{ param: params, tracker: &tracker, - Space: address.NewAddressSpace(addrSize), events: events, earliestRound: &earliest, session: session, rng: rng, comms: comms, - instance: instance, maxMsgLen: tmpMsg.ContentsSize(), } @@ -126,6 +113,23 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, c.verboseRounds = NewRoundTracker() } + return c, nil +} + +func (c *client) Connect(ndf *ndf.NetworkDefinition) error { + // Start network instance + instance, err := commNetwork.NewInstance( + c.comms.ProtoComms, ndf, nil, nil, commNetwork.None, + c.param.FastPolling) + if err != nil { + return errors.WithMessage( + err, "failed to create network client") + } + c.instance = instance + + addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size + c.Space = address.NewAddressSpace(addrSize) + /* Set up modules */ nodeChan := make(chan commNetwork.NodeGateway, nodes.InputChanLen) @@ -138,30 +142,31 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // Enable optimized HostPool initialization poolParams.MaxPings = 50 poolParams.ForceConnection = true - c.Sender, err = gateway.NewSender( - poolParams, rng, ndf, comms, session, nodeChan) + sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms, + c.session, nodeChan) if err != nil { - return nil, err + return err } + c.Sender = sender // Set up the node registrar c.Registrar, err = nodes.LoadRegistrar( - session, c.Sender, c.comms, c.rng, nodeChan) + c.session, c.Sender, c.comms, c.rng, nodeChan) if err != nil { - return nil, err + return err } // Set up the historical rounds handler c.Retriever = rounds.NewRetriever( - params.Historical, comms, c.Sender, events) + c.param.Historical, c.comms, c.Sender, c.events) // Set up Message Handler - c.Handler = message.NewHandler(params.Message, c.session.GetKV(), c.events, - c.session.GetReceptionID()) + c.Handler = message.NewHandler(c.param.Message, c.session.GetKV(), + c.events, c.session.GetReceptionID()) // Set up round handler c.Pickup = pickup.NewPickup( - params.Pickup, c.Handler.GetMessageReceptionChannel(), c.Sender, + c.param.Pickup, c.Handler.GetMessageReceptionChannel(), c.Sender, c.Retriever, c.comms, c.rng, c.instance, c.session) // Add the identity system @@ -169,9 +174,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // Set up the ability to register with new nodes when they appear c.instance.SetAddGatewayChan(nodeChan) - // Set up the health monitor - c.Monitor = health.Init(instance, params.NetworkHealthTimeout) + c.Monitor = health.Init(c.instance, c.param.NetworkHealthTimeout) // Set up critical message tracking (sendCmix only) critSender := func(msg format.Message, recipient *id.ID, params CMIXParams, @@ -181,15 +185,15 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, c.session.GetTransmissionID(), c.comms) } - c.crit = newCritical( - session.GetKV(), c.Monitor, c.instance.GetRoundEvents(), critSender) + c.crit = newCritical(c.session.GetKV(), c.Monitor, + c.instance.GetRoundEvents(), critSender) // Report health events c.AddHealthCallback(func(isHealthy bool) { c.events.Report(5, "health", "IsHealthy", strconv.FormatBool(isHealthy)) }) - return c, nil + return nil } // Follow StartRunners kicks off all network reception goroutines ("threads"). diff --git a/cmix/interface.go b/cmix/interface.go index a5e540c23b7e1d54265dcc1bb03bc578be79bd9b..c03dc1296249a8174916e87f7de2836bd1c7d39f 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -14,9 +14,15 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/ndf" ) type Client interface { + // Connect turns on network handlers, initializing a host pool and + // network health monitors. This should be called before + // network Follow command is called. + Connect(ndf *ndf.NetworkDefinition) error + // Follow starts the tracking of the network in a new thread. // Errors that occur are reported on the ClientErrorReport function if // passed. The returned stoppable can be used to stop the follower. diff --git a/e2e/critical.go b/e2e/critical.go index f6ef8912426a589aa0f06b57a77b0ecc0a7da50c..a605775419c8e2181e3d22eb3f984fed7a05bf3e 100644 --- a/e2e/critical.go +++ b/e2e/critical.go @@ -38,10 +38,11 @@ type critical struct { roundEvents roundEventRegistrar trigger chan bool send criticalSender + healthcb func(f func(bool)) uint64 } func newCritical(kv *versioned.KV, hm func(f func(bool)) uint64, - roundEvents roundEventRegistrar, send criticalSender) *critical { + send criticalSender) *critical { cm, err := NewOrLoadE2eMessageBuffer(kv, e2eCriticalMessagesKey) if err != nil { jww.FATAL.Panicf("cannot load the critical messages buffer: "+ @@ -50,17 +51,19 @@ func newCritical(kv *versioned.KV, hm func(f func(bool)) uint64, c := &critical{ E2eMessageBuffer: cm, - roundEvents: roundEvents, trigger: make(chan bool, 100), send: send, } - hm(func(healthy bool) { c.trigger <- healthy }) - return c } -func (c *critical) runCriticalMessages(stop *stoppable.Single) { +func (c *critical) runCriticalMessages(stop *stoppable.Single, + roundEvents roundEventRegistrar) { + if c.roundEvents == nil { + c.roundEvents = roundEvents + c.healthcb(func(healthy bool) { c.trigger <- healthy }) + } for { select { case <-stop.Quit(): diff --git a/e2e/manager.go b/e2e/manager.go index de6f09d7812f87447ffea943d9982da08dd767d8..73eeb111abdcc7db9b360e7e06065e5f7bce4a49 100644 --- a/e2e/manager.go +++ b/e2e/manager.go @@ -2,10 +2,11 @@ package e2e import ( "encoding/json" - "gitlab.com/xx_network/primitives/netTime" "strings" "time" + "gitlab.com/xx_network/primitives/netTime" + "github.com/pkg/errors" "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/cmix" @@ -144,8 +145,7 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID, "Failed to unmarshal rekeyParams data") } - m.crit = newCritical(kv, net.AddHealthCallback, - net.GetInstance().GetRoundEvents(), m.SendE2E) + m.crit = newCritical(kv, net.AddHealthCallback, m.SendE2E) return m, nil } @@ -155,7 +155,8 @@ func (m *manager) StartProcesses() (stoppable.Stoppable, error) { critcalNetworkStopper := stoppable.NewSingle( "e2eCriticalMessagesStopper") - m.crit.runCriticalMessages(critcalNetworkStopper) + m.crit.runCriticalMessages(critcalNetworkStopper, + m.net.GetInstance().GetRoundEvents()) multi.Add(critcalNetworkStopper) rekeySendFunc := func(mt catalog.MessageType,