diff --git a/connect/connect.go b/connect/connect.go index 635038130616933c0d2e46d193f25ca905443f41..a6aaac93771e47e2baab112ac4bb312d4323f628 100644 --- a/connect/connect.go +++ b/connect/connect.go @@ -7,6 +7,7 @@ package connect import ( + "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/auth" "gitlab.com/elixxir/client/catalog" @@ -88,7 +89,11 @@ func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerato } // Build callback for E2E negotiation - callback := getAuthCallback() + signalChannel := make(chan Connection, 1) + cb := func(connection Connection) { + signalChannel <- connection + } + callback := getAuthCallback(cb, e2eHandler, p) // Build auth object for E2E negotiation authState, err := auth.NewState(kv, net, e2eHandler, @@ -105,22 +110,22 @@ func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerato // Block waiting for auth to confirm jww.DEBUG.Printf("Connection waiting for auth request for %s to be confirmed...", recipient.ID.String()) - <-callback.confirmPartner - jww.DEBUG.Printf("Connection auth request for %s confirmed", recipient.ID.String()) + newConnection := <-signalChannel - // After confirmation, get the new partner - newPartner, err := e2eHandler.GetPartner(recipient.ID) - if err != nil { - return nil, err + // Verify the Connection is complete + if newConnection == nil { + return nil, errors.Errorf("Unable to complete connection with partner %s", recipient.ID.String()) } + jww.DEBUG.Printf("Connection auth request for %s confirmed", recipient.ID.String()) - return BuildConnection(newPartner, e2eHandler, p), nil + return newConnection, nil } -// WaitForConnections assembles a Connection object on the reception-side -// when an incoming request for an E2E partnership with a partner.Manager is confirmed. -func WaitForConnections(myId *id.ID, rng *fastRNG.StreamGenerator, - grp *cyclic.Group, net cmix.Client, p Params) (Connection, error) { +// RegisterConnectionCallback assembles a Connection object on the reception-side +// and feeds it into the given Callback whenever an incoming request +// for an E2E partnership with a partner.Manager is confirmed. +func RegisterConnectionCallback(cb Callback, myId *id.ID, rng *fastRNG.StreamGenerator, + grp *cyclic.Group, net cmix.Client, p Params) error { // Build an ephemeral KV kv := versioned.NewKV(ekv.MakeMemstore()) @@ -128,32 +133,16 @@ func WaitForConnections(myId *id.ID, rng *fastRNG.StreamGenerator, // Build E2e handler e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.event) if err != nil { - return nil, err + return err } // Build callback for E2E negotiation - callback := getAuthCallback() + callback := getAuthCallback(cb, e2eHandler, p) // Build auth object for E2E negotiation _, err = auth.NewState(kv, net, e2eHandler, rng, p.event, p.auth, callback, nil) - if err != nil { - return nil, err - } - - // Block waiting for incoming auth request - jww.DEBUG.Printf("Connection waiting for auth request to be received...") - newPartnerId := <-callback.confirmPartner - jww.DEBUG.Printf("Connection auth request for %s confirmed", newPartnerId.String()) - - // After confirmation, get the new partner - newPartner, err := e2eHandler.GetPartner(newPartnerId) - if err != nil { - return nil, err - } - - // Return the new Connection object - return BuildConnection(newPartner, e2eHandler, p), nil + return err } // BuildConnection assembles a Connection object @@ -198,26 +187,46 @@ func (h *handler) Unregister(listenerID receive.ListenerID) { // and for dynamically building new Connection objects when an auth Request is received. type authCallback struct { // Used for signaling confirmation of E2E partnership - confirmPartner chan *id.ID + connectionCallback Callback + + // Used for building new Connection objects + connectionE2e clientE2e.Handler + connectionParams Params } +// +type Callback func(connection Connection) + // getAuthCallback returns a callback interface to be passed into the creation of an auth.State object. -func getAuthCallback() authCallback { +func getAuthCallback(cb Callback, e2e clientE2e.Handler, params Params) authCallback { return authCallback{ - confirmPartner: make(chan *id.ID, 10), + connectionCallback: cb, + connectionE2e: e2e, + connectionParams: params, } } // Confirm will be called when an auth Confirm message is processed -func (c authCallback) Confirm(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { - // Signal to a listening thread that the partnership is confirmed - c.confirmPartner <- requestor.ID +func (a authCallback) Confirm(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { + jww.DEBUG.Printf("Connection auth request for %s confirmed", requestor.ID.String()) + + // After confirmation, get the new partner + newPartner, err := a.connectionE2e.GetPartner(requestor.ID) + if err != nil { + jww.ERROR.Printf("Unable to build connection with partner %s: %+v", requestor.ID, err) + // Send a nil connection to avoid hold-ups down the line + a.connectionCallback(nil) + return + } + + // Return the new Connection object + a.connectionCallback(BuildConnection(newPartner, a.connectionE2e, a.connectionParams)) } // Request will be called when an auth Request message is processed -func (c authCallback) Request(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { +func (a authCallback) Request(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { } // Reset will be called when an auth Reset operation occurs -func (c authCallback) Reset(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { +func (a authCallback) Reset(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { }