diff --git a/api/client.go b/api/client.go index 7145b1f802768a86e376aa9090175754322265b8..97241cf2919891313a25849335efde607e4272b0 100644 --- a/api/client.go +++ b/api/client.go @@ -9,10 +9,11 @@ package api import ( "encoding/json" - "gitlab.com/xx_network/primitives/netTime" "math" "time" + "gitlab.com/xx_network/primitives/netTime" + "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/auth" @@ -230,6 +231,26 @@ func OpenClient(storageDir string, password []byte, userState: userState, } + err = c.initComms() + if err != nil { + return nil, err + } + + c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage, + c.rng, c.events) + if err != nil { + return nil, err + } + + user := c.userState.PortableUserInfo() + + c.e2e, err = e2e.Load(c.storage.GetKV(), c.network, + user.ReceptionID, c.storage.GetE2EGroup(), + c.rng, c.events) + if err != nil { + return nil, err + } + return c, nil } @@ -292,14 +313,9 @@ func Login(storageDir string, password []byte, } u := c.userState.PortableUserInfo() - jww.INFO.Printf("Client Logged in: \n\tTransmisstionID: %s "+ + jww.INFO.Printf("Client Logged in: \n\tTransmissionID: %s "+ "\n\tReceptionID: %s", u.TransmissionID, u.ReceptionID) - err = c.initComms() - if err != nil { - return nil, err - } - def := c.storage.GetNDF() //initialize registration @@ -329,17 +345,7 @@ func Login(storageDir string, password []byte, } } - c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage, - c.storage.GetNDF(), c.rng, c.events) - if err != nil { - return nil, err - } - - user := c.userState.PortableUserInfo() - - c.e2e, err = e2e.Load(c.storage.GetKV(), c.network, - user.ReceptionID, c.storage.GetE2EGroup(), - c.rng, c.events) + err = c.network.Connect(def) if err != nil { return nil, err } @@ -380,11 +386,6 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, return nil, err } - err = c.initComms() - if err != nil { - return nil, err - } - //store the updated base NDF c.storage.SetNDF(def) @@ -399,15 +400,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, "able to register or track network.") } - c.network, err = cmix.NewClient(params.CMix, c.comms, c.storage, - c.storage.GetNDF(), c.rng, c.events) - if err != nil { - return nil, err - } - - c.e2e, err = e2e.Load(c.storage.GetKV(), c.network, - c.GetUser().ReceptionID, c.storage.GetE2EGroup(), - c.rng, c.events) + err = c.network.Connect(def) if err != nil { return nil, err } @@ -453,11 +446,6 @@ func LoginWithProtoClient(storageDir string, password []byte, return nil, err } - err = c.initComms() - if err != nil { - return nil, err - } - c.storage.SetNDF(def) err = c.initPermissioning(def) @@ -465,15 +453,7 @@ func LoginWithProtoClient(storageDir string, password []byte, return nil, err } - c.network, err = cmix.NewClient(params.CMix, c.comms, c.storage, - c.storage.GetNDF(), c.rng, c.events) - if err != nil { - return nil, err - } - - c.e2e, err = e2e.Load(c.storage.GetKV(), c.network, - c.GetUser().ReceptionID, c.storage.GetE2EGroup(), - c.rng, c.events) + err = c.network.Connect(def) if err != nil { return nil, err } diff --git a/api/precan.go b/api/precan.go index 8b084f9124a934153a0d62e208384ef1c06fd048..f85210df17ac5f37845201cedb1b68430f6b12e3 100644 --- a/api/precan.go +++ b/api/precan.go @@ -16,6 +16,7 @@ import ( "gitlab.com/elixxir/client/e2e/ratchet/partner/session" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/user" + util "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/crypto/contact" "gitlab.com/elixxir/crypto/cyclic" "gitlab.com/elixxir/crypto/fastRNG" diff --git a/cmd/single.go b/cmd/single.go index c2cd177f2c01987aa03d308f2fa96dce32946e06..55f33b26b8328c9381a1c2a88fc37cd454c585e1 100644 --- a/cmd/single.go +++ b/cmd/single.go @@ -99,7 +99,7 @@ var singleCmd = &cobra.Command{ // If the reply flag is set, then start waiting for a // message and reply when it is received if viper.GetBool("reply") { - replySingleUse(client, timeout, receiver) + replySingleUse(timeout, receiver) } listener.Stop() }, @@ -172,6 +172,7 @@ func sendSingleUse(m *api.Client, partner contact.Contact, payload []byte, jww.DEBUG.Printf("Sending single-use transmission to %s: %s", partner.ID, payload) params := single.GetDefaultRequestParams() + params.MaxResponseMessages = maxMessages rng := m.GetRng().GetStream() defer rng.Close() @@ -202,7 +203,7 @@ func sendSingleUse(m *api.Client, partner contact.Contact, payload []byte, // replySingleUse responds to any single-use message it receives by replying\ // with the same payload. -func replySingleUse(m *api.Client, timeout time.Duration, receiver *Receiver) { +func replySingleUse(timeout time.Duration, receiver *Receiver) { // Wait to receive a message or stop after timeout occurs fmt.Println("Waiting for single-use message.") timer := time.NewTimer(timeout) @@ -220,7 +221,7 @@ func replySingleUse(m *api.Client, timeout time.Duration, receiver *Receiver) { // Create new payload from repeated received payloads so that each // message part contains the same payload resPayload := makeResponsePayload(payload, results.request.GetMaxParts(), - results.request.GetMaxResponseLength()) + results.request.GetMaxContentsSize()) fmt.Printf("Sending single-use response message: %s\n", payload) jww.DEBUG.Printf("Sending single-use response to %s: %s", @@ -264,11 +265,11 @@ func (r *Receiver) Callback(req *single.Request, ephID receptionID.EphemeralIden // makeResponsePayload generates a new payload that will span the max number of // message parts in the contact. Each resulting message payload will contain a // copy of the supplied payload with spaces taking up any remaining data. -func makeResponsePayload(payload []byte, maxParts uint8, maxSize int) []byte { +func makeResponsePayload(payload []byte, maxParts uint8, maxSizePerPart int) []byte { payloads := make([][]byte, maxParts) - payloadPart := makeResponsePayloadPart(payload, maxSize) + payloadPart := makeResponsePayloadPart(payload, maxSizePerPart) for i := range payloads { - payloads[i] = make([]byte, maxSize) + payloads[i] = make([]byte, maxSizePerPart) copy(payloads[i], payloadPart) } return bytes.Join(payloads, []byte{}) diff --git a/cmix/client.go b/cmix/client.go index 402a5660bd452f8ac19f6532e5414f9d33cb8981..15318e5f7cd4cdad302547f09bef96a3d09f8655 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -54,7 +54,7 @@ type client struct { // Generic RNG for client rng *fastRNG.StreamGenerator // Comms pointer to send/receive messages - comms clientCommsInterface + comms *commClient.Comms // Contains the network instance instance *commNetwork.Instance @@ -90,35 +90,22 @@ type client struct { // NewClient builds a new reception client object using inputted key fields. func NewClient(params Params, comms *commClient.Comms, session storage.Session, - ndf *ndf.NetworkDefinition, rng *fastRNG.StreamGenerator, - events event.Reporter) (Client, error) { - - // Start network instance - instance, err := commNetwork.NewInstance( - comms.ProtoComms, ndf, nil, nil, commNetwork.None, - params.FastPolling) - if err != nil { - return nil, errors.WithMessage( - err, "failed to create network client") - } + rng *fastRNG.StreamGenerator, events event.Reporter) (Client, error) { tmpMsg := format.NewMessage(session.GetCmixGroup().GetP().ByteLen()) tracker := uint64(0) earliest := uint64(0) - addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size // Create client object c := &client{ param: params, tracker: &tracker, - Space: address.NewAddressSpace(addrSize), events: events, earliestRound: &earliest, session: session, rng: rng, comms: comms, - instance: instance, maxMsgLen: tmpMsg.ContentsSize(), } @@ -126,6 +113,27 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, c.verboseRounds = NewRoundTracker() } + // Set up Message Handler + c.Handler = message.NewHandler(c.param.Message, c.session.GetKV(), + c.events, c.session.GetReceptionID()) + + return c, nil +} + +func (c *client) Connect(ndf *ndf.NetworkDefinition) error { + // Start network instance + instance, err := commNetwork.NewInstance( + c.comms.ProtoComms, ndf, nil, nil, commNetwork.None, + c.param.FastPolling) + if err != nil { + return errors.WithMessage( + err, "failed to create network client") + } + c.instance = instance + + addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size + c.Space = address.NewAddressSpace(addrSize) + /* Set up modules */ nodeChan := make(chan commNetwork.NodeGateway, nodes.InputChanLen) @@ -138,30 +146,27 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // Enable optimized HostPool initialization poolParams.MaxPings = 50 poolParams.ForceConnection = true - c.Sender, err = gateway.NewSender( - poolParams, rng, ndf, comms, session, nodeChan) + sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms, + c.session, nodeChan) if err != nil { - return nil, err + return err } + c.Sender = sender // Set up the node registrar c.Registrar, err = nodes.LoadRegistrar( - session, c.Sender, c.comms, c.rng, nodeChan) + c.session, c.Sender, c.comms, c.rng, nodeChan) if err != nil { - return nil, err + return err } // Set up the historical rounds handler c.Retriever = rounds.NewRetriever( - params.Historical, comms, c.Sender, events) - - // Set up Message Handler - c.Handler = message.NewHandler(params.Message, c.session.GetKV(), c.events, - c.session.GetReceptionID()) + c.param.Historical, c.comms, c.Sender, c.events) // Set up round handler c.Pickup = pickup.NewPickup( - params.Pickup, c.Handler.GetMessageReceptionChannel(), c.Sender, + c.param.Pickup, c.Handler.GetMessageReceptionChannel(), c.Sender, c.Retriever, c.comms, c.rng, c.instance, c.session) // Add the identity system @@ -169,9 +174,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // Set up the ability to register with new nodes when they appear c.instance.SetAddGatewayChan(nodeChan) - // Set up the health monitor - c.Monitor = health.Init(instance, params.NetworkHealthTimeout) + c.Monitor = health.Init(c.instance, c.param.NetworkHealthTimeout) // Set up critical message tracking (sendCmix only) critSender := func(msg format.Message, recipient *id.ID, params CMIXParams, @@ -181,15 +185,15 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, c.session.GetTransmissionID(), c.comms) } - c.crit = newCritical( - session.GetKV(), c.Monitor, c.instance.GetRoundEvents(), critSender) + c.crit = newCritical(c.session.GetKV(), c.Monitor, + c.instance.GetRoundEvents(), critSender) // Report health events c.AddHealthCallback(func(isHealthy bool) { c.events.Report(5, "health", "IsHealthy", strconv.FormatBool(isHealthy)) }) - return c, nil + return nil } // Follow StartRunners kicks off all network reception goroutines ("threads"). diff --git a/cmix/identity/tracker.go b/cmix/identity/tracker.go index a6162ff7418437f307e16d9983fd6e9ec8f151bf..fc1fb2f2c78b2051c51543ffeace35f082e8a249 100644 --- a/cmix/identity/tracker.go +++ b/cmix/identity/tracker.go @@ -180,16 +180,19 @@ func (t *manager) track(stop *stoppable.Single) { for { // Process new and old identities nextEvent := t.processIdentities(addressSize) - - // Trigger events early. This will cause generations to happen early as - // well as message pickup. As a result, if there are time sync issues - // between clients, and they begin sending to ephemeral IDs early, then - // messages will still be picked up. - nextUpdate := nextEvent.Add(-validityGracePeriod) + waitPeriod := nextEvent.Sub(netTime.Now()) + + if waitPeriod > validityGracePeriod { + // Trigger events early. This will cause generations to happen early as + // well as message pickup. As a result, if there are time sync issues + // between clients, and they begin sending to ephemeral IDs early, then + // messages will still be picked up. + waitPeriod = waitPeriod - validityGracePeriod + } // Sleep until the last ID has expired select { - case <-time.After(nextUpdate.Sub(nextUpdate)): + case <-time.After(waitPeriod): case newIdentity := <-t.newIdentity: jww.DEBUG.Printf("Receiving new identity %s :%+v", newIdentity.Source, newIdentity) @@ -240,7 +243,9 @@ func (t *manager) track(stop *stoppable.Single) { func (t *manager) processIdentities(addressSize uint8) time.Time { edits := false toRemove := make(map[int]struct{}) - nextEvent := t.tracked[0].ValidUntil + // Identities are rotated on a 24-hour time period. Set the event + // to the latest possible time so that any sooner times will overwrite this + nextEvent := netTime.Now().Add(time.Duration(ephemeral.Period)) // Loop through every tracked ID and see if any operations are needed for i, inQuestion := range t.tracked { @@ -263,13 +268,15 @@ func (t *manager) processIdentities(addressSize uint8) time.Time { if inQuestion.NextGeneration.Before(nextEvent) { nextEvent = inQuestion.NextGeneration } - if inQuestion.ValidUntil.Before(nextEvent) { + if !inQuestion.ValidUntil.IsZero() && inQuestion.ValidUntil.Before(nextEvent) { nextEvent = inQuestion.ValidUntil } } } + jww.DEBUG.Printf("[TrackedIDS] NextEvent: %s", nextEvent) + // Process any deletions if len(toRemove) > 0 { newTracked := make([]TrackedID, 0, len(t.tracked)) diff --git a/cmix/interface.go b/cmix/interface.go index a5e540c23b7e1d54265dcc1bb03bc578be79bd9b..c03dc1296249a8174916e87f7de2836bd1c7d39f 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -14,9 +14,15 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/ndf" ) type Client interface { + // Connect turns on network handlers, initializing a host pool and + // network health monitors. This should be called before + // network Follow command is called. + Connect(ndf *ndf.NetworkDefinition) error + // Follow starts the tracking of the network in a new thread. // Errors that occur are reported on the ClientErrorReport function if // passed. The returned stoppable can be used to stop the follower. diff --git a/connect/connect.go b/connect/connect.go index 68914c95022995192293ad41211f94327b30590d..6c9fb120e91d25695ad3f3f5ac0dd3578f07c9f7 100644 --- a/connect/connect.go +++ b/connect/connect.go @@ -17,6 +17,7 @@ import ( clientE2e "gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/e2e/ratchet/partner" "gitlab.com/elixxir/client/e2e/receive" + "gitlab.com/elixxir/client/e2e/rekey" "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/contact" @@ -64,29 +65,35 @@ type handler struct { // Params for managing Connection objects type Params struct { - auth auth.Param - event event.Reporter + Auth auth.Param + Rekey rekey.Params + Event event.Reporter } // GetDefaultParams returns a usable set of default Connection parameters func GetDefaultParams() Params { return Params{ - auth: auth.GetDefaultParams(), - event: nil, + Auth: auth.GetDefaultParams(), + Rekey: rekey.GetDefaultParams(), + Event: event.NewEventManager(), } } // Connect performs auth key negotiation with the given recipient, // and returns a Connection object for the newly-created partner.Manager // This function is to be used sender-side and will block until the partner.Manager is confirmed -func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerator, +func Connect(recipient contact.Contact, myId *id.ID, privKey *cyclic.Int, rng *fastRNG.StreamGenerator, grp *cyclic.Group, net cmix.Client, p Params) (Connection, error) { // Build an ephemeral KV kv := versioned.NewKV(ekv.MakeMemstore()) // Build E2e handler - e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.event) + err := clientE2e.Init(kv, myId, privKey, grp, p.Rekey) + if err != nil { + return nil, err + } + e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.Event) if err != nil { return nil, err } @@ -100,7 +107,7 @@ func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerato // Build auth object for E2E negotiation authState, err := auth.NewState(kv, net, e2eHandler, - rng, p.event, p.auth, callback, nil) + rng, p.Event, p.Auth, callback, nil) if err != nil { return nil, err } @@ -127,14 +134,18 @@ func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerato // RegisterConnectionCallback assembles a Connection object on the reception-side // and feeds it into the given Callback whenever an incoming request // for an E2E partnership with a partner.Manager is confirmed. -func RegisterConnectionCallback(cb Callback, myId *id.ID, rng *fastRNG.StreamGenerator, +func RegisterConnectionCallback(cb Callback, myId *id.ID, privKey *cyclic.Int, rng *fastRNG.StreamGenerator, grp *cyclic.Group, net cmix.Client, p Params) error { // Build an ephemeral KV kv := versioned.NewKV(ekv.MakeMemstore()) // Build E2e handler - e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.event) + err := clientE2e.Init(kv, myId, privKey, grp, p.Rekey) + if err != nil { + return err + } + e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.Event) if err != nil { return err } @@ -144,7 +155,7 @@ func RegisterConnectionCallback(cb Callback, myId *id.ID, rng *fastRNG.StreamGen // Build auth object for E2E negotiation _, err = auth.NewState(kv, net, e2eHandler, - rng, p.event, p.auth, callback, nil) + rng, p.Event, p.Auth, callback, nil) return err } diff --git a/e2e/critical.go b/e2e/critical.go index f6ef8912426a589aa0f06b57a77b0ecc0a7da50c..a605775419c8e2181e3d22eb3f984fed7a05bf3e 100644 --- a/e2e/critical.go +++ b/e2e/critical.go @@ -38,10 +38,11 @@ type critical struct { roundEvents roundEventRegistrar trigger chan bool send criticalSender + healthcb func(f func(bool)) uint64 } func newCritical(kv *versioned.KV, hm func(f func(bool)) uint64, - roundEvents roundEventRegistrar, send criticalSender) *critical { + send criticalSender) *critical { cm, err := NewOrLoadE2eMessageBuffer(kv, e2eCriticalMessagesKey) if err != nil { jww.FATAL.Panicf("cannot load the critical messages buffer: "+ @@ -50,17 +51,19 @@ func newCritical(kv *versioned.KV, hm func(f func(bool)) uint64, c := &critical{ E2eMessageBuffer: cm, - roundEvents: roundEvents, trigger: make(chan bool, 100), send: send, } - hm(func(healthy bool) { c.trigger <- healthy }) - return c } -func (c *critical) runCriticalMessages(stop *stoppable.Single) { +func (c *critical) runCriticalMessages(stop *stoppable.Single, + roundEvents roundEventRegistrar) { + if c.roundEvents == nil { + c.roundEvents = roundEvents + c.healthcb(func(healthy bool) { c.trigger <- healthy }) + } for { select { case <-stop.Quit(): diff --git a/e2e/manager.go b/e2e/manager.go index de6f09d7812f87447ffea943d9982da08dd767d8..73eeb111abdcc7db9b360e7e06065e5f7bce4a49 100644 --- a/e2e/manager.go +++ b/e2e/manager.go @@ -2,10 +2,11 @@ package e2e import ( "encoding/json" - "gitlab.com/xx_network/primitives/netTime" "strings" "time" + "gitlab.com/xx_network/primitives/netTime" + "github.com/pkg/errors" "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/cmix" @@ -144,8 +145,7 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID, "Failed to unmarshal rekeyParams data") } - m.crit = newCritical(kv, net.AddHealthCallback, - net.GetInstance().GetRoundEvents(), m.SendE2E) + m.crit = newCritical(kv, net.AddHealthCallback, m.SendE2E) return m, nil } @@ -155,7 +155,8 @@ func (m *manager) StartProcesses() (stoppable.Stoppable, error) { critcalNetworkStopper := stoppable.NewSingle( "e2eCriticalMessagesStopper") - m.crit.runCriticalMessages(critcalNetworkStopper) + m.crit.runCriticalMessages(critcalNetworkStopper, + m.net.GetInstance().GetRoundEvents()) multi.Add(critcalNetworkStopper) rekeySendFunc := func(mt catalog.MessageType, diff --git a/single/receivedRequest.go b/single/receivedRequest.go index 534ac46376b503cc2d1b5c89d2b5f12caeaffc35..5026f353065f0405e497e18be194b3cde82477cc 100644 --- a/single/receivedRequest.go +++ b/single/receivedRequest.go @@ -47,12 +47,16 @@ func (r Request) GetMaxParts() uint8 { return r.maxParts } +// GetMaxResponseLength returns the maximum total payload size, which is the +// maximum size of each individual part multiplied by the maximum number of parts func (r Request) GetMaxResponseLength() int { - responseMsg := message.NewResponsePart(r.net.GetMaxMessageLength()) + return r.GetMaxContentsSize() * int(r.GetMaxParts()) +} - // Maximum payload size is the maximum amount of room in each message - // multiplied by the number of messages - return responseMsg.GetMaxContentsSize() * int(r.GetMaxParts()) +// GetMaxContentsSize returns maximum payload size for an individual part +func (r Request) GetMaxContentsSize() int { + responseMsg := message.NewResponsePart(r.net.GetMaxMessageLength()) + return responseMsg.GetMaxContentsSize() } // GetPartner returns a copy of the sender ID. diff --git a/single/responseProcessor.go b/single/responseProcessor.go index 99b4be25a286f0fc501eb8999bd4006ecb8c7920..942a8443cb32a3b9ea40d357f5e9b3277c59123a 100644 --- a/single/responseProcessor.go +++ b/single/responseProcessor.go @@ -11,8 +11,6 @@ import ( "gitlab.com/elixxir/primitives/format" ) -const responseProcessorName = "responseProcessorName" - type callbackWrapper func(payload []byte, receptionID receptionID.EphemeralIdentity, rounds []rounds.Round, err error)