diff --git a/api/client.go b/api/client.go index a8e641a491dd0fd1438f920ecc08d5bf87c85650..7145b1f802768a86e376aa9090175754322265b8 100644 --- a/api/client.go +++ b/api/client.go @@ -9,6 +9,7 @@ package api import ( "encoding/json" + "gitlab.com/xx_network/primitives/netTime" "math" "time" @@ -94,10 +95,10 @@ func NewClient(ndfJSON, storageDir string, password []byte, } cmixGrp, e2eGrp := decodeGroups(def) - start := time.Now() + start := netTime.Now() protoUser := createNewUser(rngStreamGen, cmixGrp, e2eGrp) jww.DEBUG.Printf("PortableUserInfo generation took: %s", - time.Now().Sub(start)) + netTime.Now().Sub(start)) _, err = checkVersionAndSetupStorage(def, storageDir, password, protoUser, cmixGrp, e2eGrp, rngStreamGen, diff --git a/auth/confirm.go b/auth/confirm.go index bdefe18f0c569a275c0ce3c6df66219856fc82ec..72d327cf12a63ed231dac32c9aa8fcf3bd87330f 100644 --- a/auth/confirm.go +++ b/auth/confirm.go @@ -36,13 +36,6 @@ import ( // If the request must be resent, use ReplayConfirm func (s *state) Confirm(partner contact.Contact) ( id.Round, error) { - - // check that messages can be sent over the network - if !s.net.IsHealthy() { - return 0, errors.New("Cannot confirm authenticated message " + - "when the network is not healthy") - } - return s.confirm(partner, s.params.ConfirmTag) } @@ -133,7 +126,7 @@ func (s *state) confirm(partner contact.Contact, serviceTag string) ( partner.ID, s.e2e.GetReceptionID(), format.DigestContents(baseFmt.Marshal())) - //service used for noticiation only + //service used for notification only /*send message*/ if err = s.store.StoreConfirmation(partner.ID, baseFmt.Marshal(), diff --git a/auth/receivedRequest.go b/auth/receivedRequest.go index 4bd1ed1215c2c599e9127ddaf93cd8201b61a852..7b033cc2ca11290ff7f2b2fa9cf7813e48c5ca57 100644 --- a/auth/receivedRequest.go +++ b/auth/receivedRequest.go @@ -19,7 +19,7 @@ import ( "gitlab.com/xx_network/primitives/id" ) -const dummyerr = "dummy error so we dont delete the request" +const dummyErr = "dummy error so we dont delete the request" type receivedRequestService struct { s *state @@ -28,7 +28,6 @@ type receivedRequestService struct { func (rrs *receivedRequestService) Process(message format.Message, receptionID receptionID.EphemeralIdentity, round rounds.Round) { - state := rrs.s // check if the timestamp is before the id was created and therefore @@ -204,7 +203,7 @@ func (rrs *receivedRequestService) Process(message format.Message, // return an error so the store layer does not delete the request // because the other side will confirm it bail = true - return errors.Errorf(dummyerr) + return errors.Errorf(dummyErr) } jww.INFO.Printf("Received AuthRequest from %s to %s,"+ diff --git a/auth/store/store_test.go b/auth/store/store_test.go index 882ac6ac1c31aaf8c40954a56a6634f460d8eb9b..e6b14e7e08a40724668ce252054cc4d7591c4e85 100644 --- a/auth/store/store_test.go +++ b/auth/store/store_test.go @@ -25,6 +25,7 @@ import ( "gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/large" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" "io" "math/rand" "reflect" @@ -923,7 +924,7 @@ func makeTestRound(t *testing.T) rounds.Round { State: 2, BatchSize: 5, Topology: [][]byte{[]byte("one"), []byte("two")}, - Timestamps: []uint64{uint64(time.Now().UnixNano()), uint64(time.Now().UnixNano())}, + Timestamps: []uint64{uint64(netTime.Now().UnixNano()), uint64(netTime.Now().UnixNano())}, Errors: nil, ClientErrors: nil, ResourceQueueTimeoutMillis: 0, diff --git a/auth/utils_test.go b/auth/utils_test.go index 2a6c16ecca08536668bbebcc35bc4c63bde9aabe..da9f6d7c00c9f8a79dd87aa33b7a32987e8825ac 100644 --- a/auth/utils_test.go +++ b/auth/utils_test.go @@ -9,9 +9,9 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/crypto/large" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" "math/rand" "testing" - "time" ) type mockSentRequestHandler struct{} @@ -75,7 +75,7 @@ func makeTestRound(t *testing.T) rounds.Round { State: 2, BatchSize: 5, Topology: [][]byte{[]byte("test"), []byte("test")}, - Timestamps: []uint64{uint64(time.Now().UnixNano()), uint64(time.Now().UnixNano())}, + Timestamps: []uint64{uint64(netTime.Now().UnixNano()), uint64(netTime.Now().UnixNano())}, Errors: nil, ClientErrors: nil, ResourceQueueTimeoutMillis: 0, diff --git a/cmix/identity/tracker.go b/cmix/identity/tracker.go index 1022940eec62f13f2b2b2358a54029856fe4bee8..51f324b67f88c6a01512391817cb3554b04117d8 100644 --- a/cmix/identity/tracker.go +++ b/cmix/identity/tracker.go @@ -189,13 +189,18 @@ func (t *manager) track(stop *stoppable.Single) { // Sleep until the last ID has expired select { - case <-time.NewTimer(nextUpdate.Sub(nextUpdate)).C: + case <-time.After(nextUpdate.Sub(nextUpdate)): case newIdentity := <-t.newIdentity: + jww.DEBUG.Printf("Receiving new identity %s :%+v", + newIdentity.Source, newIdentity) + // If the identity is old, then update its properties isOld := false for i := range t.tracked { inQuestion := t.tracked[i] if inQuestion.Source.Cmp(newIdentity.Source) { + jww.DEBUG.Printf( + "Updating old identity %s", newIdentity.Source) inQuestion.Persistent = newIdentity.Persistent inQuestion.ValidUntil = newIdentity.ValidUntil isOld = true @@ -203,6 +208,7 @@ func (t *manager) track(stop *stoppable.Single) { } } if !isOld { + jww.DEBUG.Printf("Tracking new identity %s", newIdentity.Source) // Otherwise, add it to the list and run t.tracked = append(t.tracked, newIdentity) } diff --git a/cmix/identity/tracker_test.go b/cmix/identity/tracker_test.go index ef7241c6422d163abc869121b1756aee2b8fff0a..f8b28740d7e41a00bf66eb1b55e3a459a986ff2d 100644 --- a/cmix/identity/tracker_test.go +++ b/cmix/identity/tracker_test.go @@ -36,7 +36,7 @@ func TestManager_processIdentities_expired(t *testing.T) { // Add some expired test IDs for i := uint64(0); i < 10; i++ { testId := id.NewIdFromUInt(i, id.User, t) - validUntil := time.Now() + validUntil := netTime.Now() m.tracked = append(m.tracked, TrackedID{ NextGeneration: netTime.Now().Add(-time.Second), LastGeneration: time.Time{}, @@ -75,7 +75,7 @@ func TestManager_processIdentities(t *testing.T) { // Add some expired test IDs testId := id.NewIdFromUInt(0, id.User, t) - validUntil := time.Now().Add(time.Minute) + validUntil := netTime.Now().Add(time.Minute) m.tracked = append(m.tracked, TrackedID{ NextGeneration: netTime.Now(), LastGeneration: time.Time{}, diff --git a/cmix/message/handler.go b/cmix/message/handler.go index 6a67e983e2ff1980b42f089c412279e6ae278a7b..1ea53e75416125c321d045ceb883132bf0e220cc 100644 --- a/cmix/message/handler.go +++ b/cmix/message/handler.go @@ -9,6 +9,10 @@ package message import ( "fmt" + "gitlab.com/elixxir/client/event" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/id" + "strconv" "sync" jww "github.com/spf13/jwalterweatherman" @@ -17,13 +21,96 @@ import ( "gitlab.com/xx_network/primitives/netTime" ) -func (p *handler) handleMessages(stop *stoppable.Single) { +const ( + inProcessKey = "InProcessMessagesKey" +) + +type Handler interface { + GetMessageReceptionChannel() chan<- Bundle + StartProcesses() stoppable.Stoppable + CheckInProgressMessages() + + // Fingerprints + AddFingerprint(clientID *id.ID, fingerprint format.Fingerprint, mp Processor) error + DeleteFingerprint(clientID *id.ID, fingerprint format.Fingerprint) + DeleteClientFingerprints(clientID *id.ID) + + // Triggers + AddService(clientID *id.ID, newService Service, response Processor) + DeleteService(clientID *id.ID, toDelete Service, response Processor) + DeleteClientService(clientID *id.ID) + TrackServices(triggerTracker ServicesTracker) +} + +type handler struct { + param Params + + messageReception chan Bundle + checkInProgress chan struct{} + + inProcess *MeteredCmixMessageBuffer + + events event.Reporter + + FingerprintsManager + ServicesManager +} + +func NewHandler(param Params, kv *versioned.KV, events event.Reporter, + standardID *id.ID) Handler { + + garbled, err := NewOrLoadMeteredCmixMessageBuffer(kv, inProcessKey) + if err != nil { + jww.FATAL.Panicf( + "Failed to load or new the Garbled Messages system: %v", err) + } + + m := handler{ + param: param, + messageReception: make(chan Bundle, param.MessageReceptionBuffLen), + checkInProgress: make(chan struct{}, 100), + inProcess: garbled, + events: events, + } + + m.FingerprintsManager = *newFingerprints(standardID) + m.ServicesManager = *NewServices() + return &m +} + +// GetMessageReceptionChannel gets the channel to send received messages on. +func (h *handler) GetMessageReceptionChannel() chan<- Bundle { + return h.messageReception +} + +// StartProcesses starts all worker pool. +func (h *handler) StartProcesses() stoppable.Stoppable { + multi := stoppable.NewMulti("MessageReception") + + // Create the message handler workers + for i := uint(0); i < h.param.MessageReceptionWorkerPoolSize; i++ { + stop := stoppable.NewSingle( + "MessageReception Worker " + strconv.Itoa(int(i))) + go h.handleMessages(stop) + multi.Add(stop) + } + + // Create the in progress messages thread + garbledStop := stoppable.NewSingle("GarbledMessages") + go h.recheckInProgressRunner(garbledStop) + multi.Add(garbledStop) + + return multi +} + +// +func (h *handler) handleMessages(stop *stoppable.Single) { for { select { case <-stop.Quit(): stop.ToStopped() return - case bundle := <-p.messageReception: + case bundle := <-h.messageReception: go func() { wg := sync.WaitGroup{} wg.Add(len(bundle.Messages)) @@ -33,24 +120,24 @@ func (p *handler) handleMessages(stop *stoppable.Single) { msg.Digest()) go func() { - count, ts := p.inProcess.Add( + count, ts := h.inProcess.Add( msg, bundle.RoundInfo.Raw, bundle.Identity) wg.Done() - success := p.handleMessage(msg, bundle) + success := h.handleMessage(msg, bundle) if success { - p.inProcess.Remove( + h.inProcess.Remove( msg, bundle.RoundInfo.Raw, bundle.Identity) } else { // Fail the message if any part of the decryption // fails, unless it is the last attempts and has // been in the buffer long enough, in which case // remove it - if count == p.param.MaxChecksInProcessMessage && - netTime.Since(ts) > p.param.InProcessMessageWait { - p.inProcess.Remove( + if count == h.param.MaxChecksInProcessMessage && + netTime.Since(ts) > h.param.InProcessMessageWait { + h.inProcess.Remove( msg, bundle.RoundInfo.Raw, bundle.Identity) } else { - p.inProcess.Failed( + h.inProcess.Failed( msg, bundle.RoundInfo.Raw, bundle.Identity) } @@ -66,7 +153,7 @@ func (p *handler) handleMessages(stop *stoppable.Single) { } -func (p *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool { +func (h *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool { fingerprint := ecrMsg.GetKeyFP() identity := bundle.Identity round := bundle.RoundInfo @@ -74,14 +161,14 @@ func (p *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool { jww.INFO.Printf("handleMessage(%s)", ecrMsg.Digest()) // If we have a fingerprint, process it - if proc, exists := p.pop(identity.Source, fingerprint); exists { + if proc, exists := h.pop(identity.Source, fingerprint); exists { jww.DEBUG.Printf("handleMessage found fingerprint: %s", ecrMsg.Digest()) proc.Process(ecrMsg, identity, round) return true } - services, exists := p.get( + services, exists := h.get( identity.Source, ecrMsg.GetSIH(), ecrMsg.GetContents()) if exists { for _, t := range services { @@ -94,13 +181,6 @@ func (p *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool { ecrMsg.Digest(), ecrMsg.GetSIH()) } return true - } else { - // TODO: Delete this else block because it should not be needed. - jww.INFO.Printf("checking backup %v", identity.Source) - // // If it does not exist, check against the default fingerprint for the - // // identity - // forMe = fingerprint2.CheckIdentityFP(ecrMsg.GetSIH(), - // ecrMsgContents, preimage.MakeDefault(identity.Source)) } im := fmt.Sprintf("Message cannot be identify: keyFP: %v, round: %d "+ @@ -108,7 +188,7 @@ func (p *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool { ecrMsg.GetKeyFP(), bundle.Round, ecrMsg.Digest()) jww.TRACE.Printf(im) - p.events.Report(1, "MessageReception", "Garbled", im) + h.events.Report(1, "MessageReception", "Garbled", im) return false } diff --git a/cmix/message/inProgress.go b/cmix/message/inProgress.go index 147b6401b7e2a292735cbfc0e90bc58f9248a178..e5e9826a9c0dbba61e28401fac041e530146ade0 100644 --- a/cmix/message/inProgress.go +++ b/cmix/message/inProgress.go @@ -26,9 +26,9 @@ import ( // CheckInProgressMessages triggers rechecking all in progress messages if the // queue is not full Exposed on the network handler. -func (p *handler) CheckInProgressMessages() { +func (h *handler) CheckInProgressMessages() { select { - case p.checkInProgress <- struct{}{}: + case h.checkInProgress <- struct{}{}: default: jww.WARN.Print("Failed to check garbled messages due to full channel.") } @@ -36,24 +36,24 @@ func (p *handler) CheckInProgressMessages() { // recheckInProgressRunner is a long-running thread which processes messages // that need to be checked. -func (p *handler) recheckInProgressRunner(stop *stoppable.Single) { +func (h *handler) recheckInProgressRunner(stop *stoppable.Single) { for { select { case <-stop.Quit(): stop.ToStopped() return - case <-p.checkInProgress: + case <-h.checkInProgress: jww.INFO.Printf("[GARBLE] Checking Garbled messages") - p.recheckInProgress() + h.recheckInProgress() } } } // recheckInProgress is the handler for a single run of recheck messages. -func (p *handler) recheckInProgress() { +func (h *handler) recheckInProgress() { // Try to decrypt every garbled message, excising those whose counts are too // high - for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() { + for grbldMsg, ri, identity, has := h.inProcess.Next(); has; grbldMsg, ri, identity, has = h.inProcess.Next() { bundle := Bundle{ Round: id.Round(ri.ID), RoundInfo: rounds.MakeRound(ri), @@ -63,7 +63,7 @@ func (p *handler) recheckInProgress() { } select { - case p.messageReception <- bundle: + case h.messageReception <- bundle: default: jww.WARN.Printf("Failed to send bundle, channel full.") } diff --git a/cmix/message/pickup.go b/cmix/message/pickup.go deleted file mode 100644 index 29fbb95951aea7f4cffbd1c96ba9919ab0d27ee0..0000000000000000000000000000000000000000 --- a/cmix/message/pickup.go +++ /dev/null @@ -1,102 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -/////////////////////////////////////////////////////////////////////////////// - -package message - -import ( - "strconv" - - "gitlab.com/elixxir/client/event" - "gitlab.com/elixxir/client/storage/versioned" - "gitlab.com/elixxir/primitives/format" - "gitlab.com/xx_network/primitives/id" - - jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/stoppable" -) - -const ( - inProcessKey = "InProcessMessagesKey" -) - -type Handler interface { - GetMessageReceptionChannel() chan<- Bundle - StartProcesses() stoppable.Stoppable - CheckInProgressMessages() - - // Fingerprints - AddFingerprint(clientID *id.ID, fingerprint format.Fingerprint, mp Processor) error - DeleteFingerprint(clientID *id.ID, fingerprint format.Fingerprint) - DeleteClientFingerprints(clientID *id.ID) - - // Triggers - AddService(clientID *id.ID, newService Service, response Processor) - DeleteService(clientID *id.ID, toDelete Service, response Processor) - DeleteClientService(clientID *id.ID) - TrackServices(triggerTracker ServicesTracker) -} - -type handler struct { - param Params - - messageReception chan Bundle - checkInProgress chan struct{} - - inProcess *MeteredCmixMessageBuffer - - events event.Reporter - - FingerprintsManager - ServicesManager -} - -func NewHandler(param Params, kv *versioned.KV, events event.Reporter, - standardID *id.ID) Handler { - - garbled, err := NewOrLoadMeteredCmixMessageBuffer(kv, inProcessKey) - if err != nil { - jww.FATAL.Panicf( - "Failed to load or new the Garbled Messages system: %v", err) - } - - m := handler{ - param: param, - messageReception: make(chan Bundle, param.MessageReceptionBuffLen), - checkInProgress: make(chan struct{}, 100), - inProcess: garbled, - events: events, - } - - m.FingerprintsManager = *newFingerprints(standardID) - m.ServicesManager = *NewServices() - return &m -} - -// GetMessageReceptionChannel gets the channel to send received messages on. -func (p *handler) GetMessageReceptionChannel() chan<- Bundle { - return p.messageReception -} - -// StartProcesses starts all worker pool. -func (p *handler) StartProcesses() stoppable.Stoppable { - multi := stoppable.NewMulti("MessageReception") - - // Create the message handler workers - for i := uint(0); i < p.param.MessageReceptionWorkerPoolSize; i++ { - stop := stoppable.NewSingle( - "MessageReception Worker " + strconv.Itoa(int(i))) - go p.handleMessages(stop) - multi.Add(stop) - } - - // Create the in progress messages thread - garbledStop := stoppable.NewSingle("GarbledMessages") - go p.recheckInProgressRunner(garbledStop) - multi.Add(garbledStop) - - return multi -} diff --git a/cmix/message/processor.go b/cmix/message/processor.go index 5d454df5c5bc49aaf0ffe1570512a9dc661e4c32..1601189b65e1c6c8ea74c263e4e77e7531e76b8d 100644 --- a/cmix/message/processor.go +++ b/cmix/message/processor.go @@ -19,6 +19,6 @@ type Processor interface { Process(message format.Message, receptionID receptionID.EphemeralIdentity, round rounds.Round) - // Implement the stringer interface String() string for debugging + // Stringer interface for debugging fmt.Stringer } diff --git a/cmix/polltracker_test.go b/cmix/polltracker_test.go index 9b0812d04784a4c1943f1f8b2c2786284782d312..8fb6d96860d70c94f48f51843a03df44f84694be 100644 --- a/cmix/polltracker_test.go +++ b/cmix/polltracker_test.go @@ -3,6 +3,7 @@ package cmix import ( xxid "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/netTime" "strings" "testing" "time" @@ -14,11 +15,11 @@ func TestPollTracker(t *testing.T) { // Init ID and first EID id := xxid.NewIdFromString("zezima", xxid.User, t) - eid, _, _, err := ephemeral.GetId(id, 16, time.Now().UnixNano()) + eid, _, _, err := ephemeral.GetId(id, 16, netTime.Now().UnixNano()) if err != nil { t.Errorf("Failed to create eid for ID %s", id.String()) } - eid2, _, _, err := ephemeral.GetId(id, 16, time.Now().Add(time.Hour*24).UnixNano()) + eid2, _, _, err := ephemeral.GetId(id, 16, netTime.Now().Add(time.Hour*24).UnixNano()) if err != nil { t.Errorf("Failed to create second eid for ID %s", id.String()) } diff --git a/cmix/utils_test.go b/cmix/utils_test.go index 2d45401c6c7c010b0a61c588f48fc05d277e63f2..6fffb13db3f47dee64d71e89f6e254050d0318c0 100644 --- a/cmix/utils_test.go +++ b/cmix/utils_test.go @@ -227,7 +227,7 @@ func newTestClient(t *testing.T) (*client, error) { } pubKey := pk.GetPublic() - now := time.Now() + now := netTime.Now() timestamps := []uint64{ uint64(now.Add(-30 * time.Second).UnixNano()), //PENDING uint64(now.Add(-25 * time.Second).UnixNano()), //PRECOMPUTING diff --git a/connect/connect.go b/connect/connect.go new file mode 100644 index 0000000000000000000000000000000000000000..68914c95022995192293ad41211f94327b30590d --- /dev/null +++ b/connect/connect.go @@ -0,0 +1,232 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 Privategrity Corporation / +// / +// All rights reserved. / +//////////////////////////////////////////////////////////////////////////////// + +package connect + +import ( + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/auth" + "gitlab.com/elixxir/client/catalog" + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + clientE2e "gitlab.com/elixxir/client/e2e" + "gitlab.com/elixxir/client/e2e/ratchet/partner" + "gitlab.com/elixxir/client/e2e/receive" + "gitlab.com/elixxir/client/event" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/crypto/contact" + "gitlab.com/elixxir/crypto/cyclic" + "gitlab.com/elixxir/crypto/e2e" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/ekv" + "gitlab.com/xx_network/primitives/id" + "io" + "time" +) + +// Connection is a wrapper for the E2E and auth packages. +// It can be used to automatically establish an E2E partnership +// with a partner.Manager, or be built from an existing E2E partnership. +// You can then use this interface to send to and receive from the newly-established partner.Manager. +type Connection interface { + // Closer deletes this Connection's partner.Manager and releases resources + io.Closer + + // GetPartner returns the partner.Manager for this Connection + GetPartner() partner.Manager + + // SendE2E is a wrapper for sending specifically to the Connection's partner.Manager + SendE2E(mt catalog.MessageType, payload []byte, params clientE2e.Params) ( + []id.Round, e2e.MessageID, time.Time, error) + + // RegisterListener is used for E2E reception + // and allows for reading data sent from the partner.Manager + RegisterListener(messageType catalog.MessageType, + newListener receive.Listener) receive.ListenerID + // Unregister listener for E2E reception + Unregister(listenerID receive.ListenerID) +} + +// Callback is the callback format required to retrieve new Connection objects as they are established +type Callback func(connection Connection) + +// handler provides an implementation for the Connection interface +type handler struct { + partner partner.Manager + e2e clientE2e.Handler + params Params +} + +// Params for managing Connection objects +type Params struct { + auth auth.Param + event event.Reporter +} + +// GetDefaultParams returns a usable set of default Connection parameters +func GetDefaultParams() Params { + return Params{ + auth: auth.GetDefaultParams(), + event: nil, + } +} + +// Connect performs auth key negotiation with the given recipient, +// and returns a Connection object for the newly-created partner.Manager +// This function is to be used sender-side and will block until the partner.Manager is confirmed +func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerator, + grp *cyclic.Group, net cmix.Client, p Params) (Connection, error) { + + // Build an ephemeral KV + kv := versioned.NewKV(ekv.MakeMemstore()) + + // Build E2e handler + e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.event) + if err != nil { + return nil, err + } + + // Build callback for E2E negotiation + signalChannel := make(chan Connection, 1) + cb := func(connection Connection) { + signalChannel <- connection + } + callback := getAuthCallback(cb, e2eHandler, p) + + // Build auth object for E2E negotiation + authState, err := auth.NewState(kv, net, e2eHandler, + rng, p.event, p.auth, callback, nil) + if err != nil { + return nil, err + } + + // Perform the auth request + _, err = authState.Request(recipient, nil) + if err != nil { + return nil, err + } + + // Block waiting for auth to confirm + jww.DEBUG.Printf("Connection waiting for auth request for %s to be confirmed...", recipient.ID.String()) + newConnection := <-signalChannel + + // Verify the Connection is complete + if newConnection == nil { + return nil, errors.Errorf("Unable to complete connection with partner %s", recipient.ID.String()) + } + jww.DEBUG.Printf("Connection auth request for %s confirmed", recipient.ID.String()) + + return newConnection, nil +} + +// RegisterConnectionCallback assembles a Connection object on the reception-side +// and feeds it into the given Callback whenever an incoming request +// for an E2E partnership with a partner.Manager is confirmed. +func RegisterConnectionCallback(cb Callback, myId *id.ID, rng *fastRNG.StreamGenerator, + grp *cyclic.Group, net cmix.Client, p Params) error { + + // Build an ephemeral KV + kv := versioned.NewKV(ekv.MakeMemstore()) + + // Build E2e handler + e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.event) + if err != nil { + return err + } + + // Build callback for E2E negotiation + callback := getAuthCallback(cb, e2eHandler, p) + + // Build auth object for E2E negotiation + _, err = auth.NewState(kv, net, e2eHandler, + rng, p.event, p.auth, callback, nil) + return err +} + +// BuildConnection assembles a Connection object +// after an E2E partnership has already been confirmed with the given partner.Manager +func BuildConnection(partner partner.Manager, e2eHandler clientE2e.Handler, p Params) Connection { + return &handler{ + partner: partner, + params: p, + e2e: e2eHandler, + } +} + +// Close deletes this Connection's partner.Manager and releases resources +func (h *handler) Close() error { + return h.e2e.DeletePartner(h.partner.PartnerId()) +} + +// GetPartner returns the partner.Manager for this Connection +func (h *handler) GetPartner() partner.Manager { + return h.partner +} + +// SendE2E is a wrapper for sending specifically to the Connection's partner.Manager +func (h *handler) SendE2E(mt catalog.MessageType, payload []byte, params clientE2e.Params) ( + []id.Round, e2e.MessageID, time.Time, error) { + return h.e2e.SendE2E(mt, h.partner.PartnerId(), payload, params) +} + +// RegisterListener is used for E2E reception +// and allows for reading data sent from the partner.Manager +func (h *handler) RegisterListener(messageType catalog.MessageType, newListener receive.Listener) receive.ListenerID { + return h.e2e.RegisterListener(h.partner.PartnerId(), messageType, newListener) +} + +// Unregister listener for E2E reception +func (h *handler) Unregister(listenerID receive.ListenerID) { + h.e2e.Unregister(listenerID) +} + +// authCallback provides callback functionality for interfacing between auth.State and Connection +// This is used both for blocking creation of a Connection object until the auth Request is confirmed +// and for dynamically building new Connection objects when an auth Request is received. +type authCallback struct { + // Used for signaling confirmation of E2E partnership + connectionCallback Callback + + // Used for building new Connection objects + connectionE2e clientE2e.Handler + connectionParams Params +} + +// getAuthCallback returns a callback interface to be passed into the creation of an auth.State object. +func getAuthCallback(cb Callback, e2e clientE2e.Handler, params Params) authCallback { + return authCallback{ + connectionCallback: cb, + connectionE2e: e2e, + connectionParams: params, + } +} + +// Confirm will be called when an auth Confirm message is processed +func (a authCallback) Confirm(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { + jww.DEBUG.Printf("Connection auth request for %s confirmed", requestor.ID.String()) + + // After confirmation, get the new partner + newPartner, err := a.connectionE2e.GetPartner(requestor.ID) + if err != nil { + jww.ERROR.Printf("Unable to build connection with partner %s: %+v", requestor.ID, err) + // Send a nil connection to avoid hold-ups down the line + a.connectionCallback(nil) + return + } + + // Return the new Connection object + a.connectionCallback(BuildConnection(newPartner, a.connectionE2e, a.connectionParams)) +} + +// Request will be called when an auth Request message is processed +func (a authCallback) Request(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { +} + +// Reset will be called when an auth Reset operation occurs +func (a authCallback) Reset(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { +} diff --git a/e2e/interface.go b/e2e/interface.go index 43644636361a97c310f60ca0cf2ca1651fee1078..a24f285ce3b2046b8d838aa2826775b167f47bf9 100644 --- a/e2e/interface.go +++ b/e2e/interface.go @@ -45,10 +45,10 @@ type Handler interface { // The name is used for debug printing and not checked for // uniqueness // - // user: 0 for all, or any user ID to listen for messages from - // a particular user. 0 can be id.ZeroUser or id.ZeroID - // messageType: 0 for all, or any message type to listen for - // messages of that type. 0 can be Message.AnyType + // user: id.ZeroUser for all, or any user ID to listen for + // messages from a particular user. + // messageType: catalog.NoType for all, or any message type to + // listen for messages of that type. // newListener: something implementing the Listener // interface. Do not pass nil to this. // @@ -66,10 +66,10 @@ type Handler interface { // name is used for debug printing and not checked for // uniqueness // - // user: 0 for all, or any user ID to listen for messages from - // a particular user. 0 can be id.ZeroUser or id.ZeroID - // messageType: 0 for all, or any message type to listen for - // messages of that type. 0 can be Message.AnyType + // user: id.ZeroUser for all, or any user ID to listen for + // messages from a particular user. + // messageType: catalog.NoType for all, or any message type to + // listen for messages of that type. // newListener: a function implementing the ListenerFunc // function type. Do not pass nil to this. // @@ -115,7 +115,6 @@ type Handler interface { receiveParams session.Params) (partner.Manager, error) // GetPartner returns the partner per its ID, if it exists - // myID is your ID in the relationship GetPartner(partnerID *id.ID) (partner.Manager, error) // DeletePartner removes the associated contact from the E2E store @@ -133,8 +132,8 @@ type Handler interface { // be sent to using the tag fields in the Params Object // Passing nil for the processor allows you to create a // service which is never called but will be visible by - // notifications Processes added this way are generally not - // end ot end encrypted messages themselves, but other + // notifications. Processes added this way are generally not + // end-to-end encrypted messages themselves, but other // protocols which piggyback on e2e relationships to start // communication AddService(tag string, processor message.Processor) error diff --git a/e2e/manager.go b/e2e/manager.go index 7565deb402e86dec5a2b61e25ebf28f8553ff555..de6f09d7812f87447ffea943d9982da08dd767d8 100644 --- a/e2e/manager.go +++ b/e2e/manager.go @@ -2,6 +2,7 @@ package e2e import ( "encoding/json" + "gitlab.com/xx_network/primitives/netTime" "strings" "time" @@ -39,7 +40,7 @@ const e2eRekeyParamsKey = "e2eRekeyParams" const e2eRekeyParamsVer = 0 // Init Creates stores. After calling, use load -// Passes a the ID public key which is used for the relationship +// Passes the ID public key which is used for the relationship // uses the passed ID to modify the kv prefix for a unique storage path func Init(kv *versioned.KV, myID *id.ID, privKey *cyclic.Int, grp *cyclic.Group, rekeyParams rekey.Params) error { @@ -55,7 +56,7 @@ func initE2E(kv *versioned.KV, myID *id.ID, privKey *cyclic.Int, } err = kv.Set(e2eRekeyParamsKey, e2eRekeyParamsVer, &versioned.Object{ Version: e2eRekeyParamsVer, - Timestamp: time.Now(), + Timestamp: netTime.Now(), Data: rekeyParamsData, }) if err != nil { @@ -101,7 +102,7 @@ func LoadLegacy(kv *versioned.KV, net cmix.Client, myID *id.ID, // Store the rekey params to disk/memory err = kv.Set(e2eRekeyParamsKey, e2eRekeyParamsVer, &versioned.Object{ Version: e2eRekeyParamsVer, - Timestamp: time.Now(), + Timestamp: netTime.Now(), Data: rekeyParamsData, }) diff --git a/e2e/manager_test.go b/e2e/manager_test.go index 54a04e1ec096c0106cc5711982c46eb0524ed8e3..af4736d0ddb1fee413be96e026dbb4ea05c75109 100644 --- a/e2e/manager_test.go +++ b/e2e/manager_test.go @@ -101,7 +101,7 @@ func TestLoadLegacy(t *testing.T) { // t.Fatalf( // "Failed to create storage session: %+v", err) //} - kv := versioned.NewKV(ekv.Memstore{}) + kv := versioned.NewKV(&ekv.Memstore{}) err := ratchet.New(kv, myId, myPrivKey, grp) if err != nil { diff --git a/storage/utility/bucket.go b/storage/utility/bucket.go index 5548cdfe52321d58ca181130dbca4010d5aa4b16..46b3da9a9d7733b0ce2abb5812a451ea9ae8e419 100644 --- a/storage/utility/bucket.go +++ b/storage/utility/bucket.go @@ -42,7 +42,7 @@ func NewStoredBucket(capacity, leaked uint32, leakDuration time.Duration, kv: kv.Prefix(bucketStorePrefix), } - bs.save(0, time.Now().UnixNano()) + bs.save(0, netTime.Now().UnixNano()) return rateLimiting.CreateBucket(capacity, leaked, leakDuration, bs.save) } diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index 82c8b898e14c9e9b225a5d74141ec6e61925c8bd..8f513bdb948a61d11b7486eea80848a97a9876b0 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -203,11 +203,19 @@ func (mb *MessageBuffer) Add(m interface{}) interface{} { defer mb.mux.Unlock() // Ensure message does not already exist in buffer - if face1, exists1 := mb.messages[h]; exists1 { - return face1 + if _, exists1 := mb.messages[h]; exists1 { + msg, err := mb.handler.LoadMessage(mb.kv, MakeStoredMessageKey(mb.key, h)) + if err != nil { + jww.FATAL.Panicf("Error loading message %s: %v", h, err) + } + return msg } - if face2, exists2 := mb.processingMessages[h]; exists2 { - return face2 + if _, exists2 := mb.processingMessages[h]; exists2 { + msg, err := mb.handler.LoadMessage(mb.kv, MakeStoredMessageKey(mb.key, h)) + if err != nil { + jww.FATAL.Panicf("Error loading processing message %s: %v", h, err) + } + return msg } // Save message as versioned object diff --git a/xxmutils/restoreContacts.go b/xxmutils/restoreContacts.go index b64947418cb401ea797c99cc8c5935a60335bfb9..6b8afad7073c5d9ff7b4ad56e19515bdd596440c 100644 --- a/xxmutils/restoreContacts.go +++ b/xxmutils/restoreContacts.go @@ -11,6 +11,7 @@ import ( "encoding/json" "errors" "fmt" + "gitlab.com/xx_network/primitives/netTime" "math" "strings" "sync" @@ -287,7 +288,7 @@ func (s stateStore) set(user *contact.Contact, state restoreState) error { data = append(data, user.Marshal()...) val := &versioned.Object{ Version: 0, - Timestamp: time.Now(), + Timestamp: netTime.Now(), Data: data, } return s.apiStore.Set(key, val)