Skip to content
Snippets Groups Projects
Commit d537913a authored by Jake Taylor's avatar Jake Taylor
Browse files

restructure waitforconnections into a callback registration

parent ec73bc5b
No related branches found
No related tags found
4 merge requests!510Release,!207WIP: Client Restructure,!206Built the connection interface which wraps e2e and auth objects and manages a...,!203Symmetric broadcast
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
package connect package connect
import ( import (
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/auth" "gitlab.com/elixxir/client/auth"
"gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/catalog"
...@@ -88,7 +89,11 @@ func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerato ...@@ -88,7 +89,11 @@ func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerato
} }
// Build callback for E2E negotiation // Build callback for E2E negotiation
callback := getAuthCallback() signalChannel := make(chan Connection, 1)
cb := func(connection Connection) {
signalChannel <- connection
}
callback := getAuthCallback(cb, e2eHandler, p)
// Build auth object for E2E negotiation // Build auth object for E2E negotiation
authState, err := auth.NewState(kv, net, e2eHandler, authState, err := auth.NewState(kv, net, e2eHandler,
...@@ -105,22 +110,22 @@ func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerato ...@@ -105,22 +110,22 @@ func Connect(recipient contact.Contact, myId *id.ID, rng *fastRNG.StreamGenerato
// Block waiting for auth to confirm // Block waiting for auth to confirm
jww.DEBUG.Printf("Connection waiting for auth request for %s to be confirmed...", recipient.ID.String()) jww.DEBUG.Printf("Connection waiting for auth request for %s to be confirmed...", recipient.ID.String())
<-callback.confirmPartner newConnection := <-signalChannel
jww.DEBUG.Printf("Connection auth request for %s confirmed", recipient.ID.String())
// After confirmation, get the new partner // Verify the Connection is complete
newPartner, err := e2eHandler.GetPartner(recipient.ID) if newConnection == nil {
if err != nil { return nil, errors.Errorf("Unable to complete connection with partner %s", recipient.ID.String())
return nil, err
} }
jww.DEBUG.Printf("Connection auth request for %s confirmed", recipient.ID.String())
return BuildConnection(newPartner, e2eHandler, p), nil return newConnection, nil
} }
// WaitForConnections assembles a Connection object on the reception-side // RegisterConnectionCallback assembles a Connection object on the reception-side
// when an incoming request for an E2E partnership with a partner.Manager is confirmed. // and feeds it into the given Callback whenever an incoming request
func WaitForConnections(myId *id.ID, rng *fastRNG.StreamGenerator, // for an E2E partnership with a partner.Manager is confirmed.
grp *cyclic.Group, net cmix.Client, p Params) (Connection, error) { func RegisterConnectionCallback(cb Callback, myId *id.ID, rng *fastRNG.StreamGenerator,
grp *cyclic.Group, net cmix.Client, p Params) error {
// Build an ephemeral KV // Build an ephemeral KV
kv := versioned.NewKV(ekv.MakeMemstore()) kv := versioned.NewKV(ekv.MakeMemstore())
...@@ -128,32 +133,16 @@ func WaitForConnections(myId *id.ID, rng *fastRNG.StreamGenerator, ...@@ -128,32 +133,16 @@ func WaitForConnections(myId *id.ID, rng *fastRNG.StreamGenerator,
// Build E2e handler // Build E2e handler
e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.event) e2eHandler, err := clientE2e.Load(kv, net, myId, grp, rng, p.event)
if err != nil { if err != nil {
return nil, err return err
} }
// Build callback for E2E negotiation // Build callback for E2E negotiation
callback := getAuthCallback() callback := getAuthCallback(cb, e2eHandler, p)
// Build auth object for E2E negotiation // Build auth object for E2E negotiation
_, err = auth.NewState(kv, net, e2eHandler, _, err = auth.NewState(kv, net, e2eHandler,
rng, p.event, p.auth, callback, nil) rng, p.event, p.auth, callback, nil)
if err != nil { return err
return nil, err
}
// Block waiting for incoming auth request
jww.DEBUG.Printf("Connection waiting for auth request to be received...")
newPartnerId := <-callback.confirmPartner
jww.DEBUG.Printf("Connection auth request for %s confirmed", newPartnerId.String())
// After confirmation, get the new partner
newPartner, err := e2eHandler.GetPartner(newPartnerId)
if err != nil {
return nil, err
}
// Return the new Connection object
return BuildConnection(newPartner, e2eHandler, p), nil
} }
// BuildConnection assembles a Connection object // BuildConnection assembles a Connection object
...@@ -198,26 +187,46 @@ func (h *handler) Unregister(listenerID receive.ListenerID) { ...@@ -198,26 +187,46 @@ func (h *handler) Unregister(listenerID receive.ListenerID) {
// and for dynamically building new Connection objects when an auth Request is received. // and for dynamically building new Connection objects when an auth Request is received.
type authCallback struct { type authCallback struct {
// Used for signaling confirmation of E2E partnership // Used for signaling confirmation of E2E partnership
confirmPartner chan *id.ID connectionCallback Callback
// Used for building new Connection objects
connectionE2e clientE2e.Handler
connectionParams Params
} }
//
type Callback func(connection Connection)
// getAuthCallback returns a callback interface to be passed into the creation of an auth.State object. // getAuthCallback returns a callback interface to be passed into the creation of an auth.State object.
func getAuthCallback() authCallback { func getAuthCallback(cb Callback, e2e clientE2e.Handler, params Params) authCallback {
return authCallback{ return authCallback{
confirmPartner: make(chan *id.ID, 10), connectionCallback: cb,
connectionE2e: e2e,
connectionParams: params,
} }
} }
// Confirm will be called when an auth Confirm message is processed // Confirm will be called when an auth Confirm message is processed
func (c authCallback) Confirm(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { func (a authCallback) Confirm(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) {
// Signal to a listening thread that the partnership is confirmed jww.DEBUG.Printf("Connection auth request for %s confirmed", requestor.ID.String())
c.confirmPartner <- requestor.ID
// After confirmation, get the new partner
newPartner, err := a.connectionE2e.GetPartner(requestor.ID)
if err != nil {
jww.ERROR.Printf("Unable to build connection with partner %s: %+v", requestor.ID, err)
// Send a nil connection to avoid hold-ups down the line
a.connectionCallback(nil)
return
}
// Return the new Connection object
a.connectionCallback(BuildConnection(newPartner, a.connectionE2e, a.connectionParams))
} }
// Request will be called when an auth Request message is processed // Request will be called when an auth Request message is processed
func (c authCallback) Request(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { func (a authCallback) Request(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) {
} }
// Reset will be called when an auth Reset operation occurs // Reset will be called when an auth Reset operation occurs
func (c authCallback) Reset(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) { func (a authCallback) Reset(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, round rounds.Round) {
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment