From 672fd570ecd1cb48de2f82d3bffb471939fe06de Mon Sep 17 00:00:00 2001 From: Bernardo Cardoso <bernardo@elixxir.io> Date: Thu, 11 Apr 2019 08:55:42 -0600 Subject: [PATCH] Refactor whole client architecture Now API or Bindings return a Client object, which should be used to call any of the functions. The Client object will hold the session, the messaging and the switchboard. This will enable us to have multi-client support, hopefully. --- api/client.go | 143 ++++++++++++++++++++++------------------- bindings/client.go | 68 ++++++++++---------- bindings/payment.go | 15 ++--- bots/userDiscovery.go | 23 ++++--- cmd/root.go | 62 ++++++++++-------- cmd/udb.go | 12 ++-- io/collate.go | 3 +- io/interface.go | 7 +- io/messaging.go | 111 ++++++++++++++++---------------- payment/wallet.go | 18 ++++-- payment/wallet_test.go | 2 +- user/session.go | 8 --- 12 files changed, 247 insertions(+), 225 deletions(-) diff --git a/api/client.go b/api/client.go index 6a1e4be6f..6750a8d22 100644 --- a/api/client.go +++ b/api/client.go @@ -29,6 +29,15 @@ import ( "time" ) +type Client struct { + sess user.Session + comm io.Communications + quitReceptionRunner chan bool + listeners *switchboard.Switchboard + // TODO Support more than one wallet per user? Maybe in v2 + wallet *payment.Wallet +} + // Populates a text message and returns its wire representation // TODO support multi-type messages or telling if a message is too long? func FormatTextMessage(message string) []byte { @@ -43,22 +52,27 @@ func FormatTextMessage(message string) []byte { // Initializes the client by registering a storage mechanism. // If none is provided, the system defaults to using OS file access -// returns in error if it fails -func InitClient(s globals.Storage, loc string) error { +// returns a new Client object, and an error if it fails +func InitClient(s globals.Storage, loc string) (*Client, error) { storageErr := globals.InitStorage(s, loc) if storageErr != nil { storageErr = errors.New( "could not init client storage: " + storageErr.Error()) - return storageErr + return nil, storageErr } - return nil + cl := new(Client) + cl.comm = io.NewMessenger() + cl.listeners = switchboard.NewSwitchboard() + // Initialize UDB stuff here + bots.InitUDB(cl.sess, cl.comm, cl.listeners) + return cl, nil } // Registers user and returns the User ID. // Returns an error if registration fails. -func Register(registrationCode string, gwAddr string, +func (cl *Client) Register(registrationCode string, gwAddr string, numNodes uint, mint bool, grp *cyclic.Group) (*id.User, error) { var err error @@ -103,7 +117,7 @@ func Register(registrationCode string, gwAddr string, nus := user.NewSession(u, gwAddr, nk, grp.NewIntFromBytes([]byte("this is not a real public key")), grp) - _, err = payment.CreateWallet(nus, mint) + _, err = payment.CreateWallet(nus, io.NewMessenger(), mint) if err != nil { return id.ZeroID, err } @@ -126,28 +140,26 @@ func Register(registrationCode string, gwAddr string, return UID, err } -var quitReceptionRunner chan bool - -// Logs in user and returns their nickname. -// returns an empty sting if login fails. -func Login(UID *id.User, addr string, tlsCert string) (user.Session, error) { +// Logs in user and sets session on client object +// returns an error if login fails +func (cl *Client) Login(UID *id.User, addr string, tlsCert string) (string, error) { connect.GatewayCertString = tlsCert session, err := user.LoadSession(UID) if session == nil { - return nil, errors.New("Unable to load session: " + err.Error() + + return "", errors.New("Unable to load session: " + err.Error() + fmt.Sprintf("Passed parameters: %q, %s, %q", *UID, addr, tlsCert)) } - theWallet, err = payment.CreateWallet(session, false) + cl.wallet, err = payment.CreateWallet(session, cl.comm,false) if err != nil { err = fmt.Errorf("Login: Couldn't create wallet: %s", err.Error()) globals.Log.ERROR.Printf(err.Error()) - return nil, err + return "", err } - theWallet.RegisterListeners() + cl.wallet.RegisterListeners(cl.listeners) if addr != "" { session.SetGWAddress(addr) @@ -157,65 +169,69 @@ func Login(UID *id.User, addr string, tlsCert string) (user.Session, error) { // TODO: These can be separate, but we set them to the same thing // until registration is completed. - io.SendAddress = addrToUse - io.ReceiveAddress = addrToUse + (cl.comm).(*io.Messaging).SendAddress = addrToUse + (cl.comm).(*io.Messaging).ReceiveAddress = addrToUse if err != nil { err = errors.New(fmt.Sprintf("Login: Could not login: %s", err.Error())) globals.Log.ERROR.Printf(err.Error()) - return nil, err + return "", err } - user.TheSession = session + cl.sess = session pollWaitTimeMillis := 1000 * time.Millisecond - quitReceptionRunner = make(chan bool) + cl.quitReceptionRunner = make(chan bool) // TODO Don't start the message receiver if it's already started. // Should be a pretty rare occurrence except perhaps for mobile. - go io.Messaging.MessageReceiver(pollWaitTimeMillis, quitReceptionRunner) + go cl.comm.MessageReceiver(session, cl.listeners, + pollWaitTimeMillis, cl.quitReceptionRunner) - return session, nil + return session.GetCurrentUser().Nick, nil } // Send prepares and sends a message to the cMix network // FIXME: We need to think through the message interface part. -func Send(message parse.MessageInterface) error { +func (cl *Client) Send(message parse.MessageInterface) error { // FIXME: There should (at least) be a version of this that takes a byte array recipientID := message.GetRecipient() - err := io.Messaging.SendMessage(recipientID, message.Pack()) + err := cl.comm.SendMessage(cl.sess, recipientID, message.Pack()) return err } // DisableBlockingTransmission turns off blocking transmission, for // use with the channel bot and dummy bot -func DisableBlockingTransmission() { - io.BlockTransmissions = false +func (cl *Client) DisableBlockingTransmission() { + (cl.comm).(*io.Messaging).BlockTransmissions = false } // SetRateLimiting sets the minimum amount of time between message // transmissions just for testing, probably to be removed in production -func SetRateLimiting(limit uint32) { - io.TransmitDelay = time.Duration(limit) * time.Millisecond +func (cl *Client) SetRateLimiting(limit uint32) { + (cl.comm).(*io.Messaging).TransmitDelay = time.Duration(limit) * time.Millisecond } -func Listen(user *id.User, outerType format.CryptoType, - messageType int32, newListener switchboard.Listener, callbacks *switchboard. - Switchboard) string { - listenerId := callbacks.Register(user, outerType, messageType, newListener) +func (cl *Client) Listen(user *id.User, outerType format.CryptoType, + messageType int32, newListener switchboard.Listener) string { + listenerId := cl.listeners.Register(user, outerType, messageType, newListener) globals.Log.INFO.Printf("Listening now: user %v, message type %v, id %v", user, messageType, listenerId) return listenerId } -func StopListening(listenerHandle string, callbacks *switchboard.Switchboard) { - callbacks.Unregister(listenerHandle) +func (cl *Client) StopListening(listenerHandle string) { + cl.listeners.Unregister(listenerHandle) +} + +func (cl *Client) GetSwitchboard() *switchboard.Switchboard { + return cl.listeners } type APISender struct{} func (s APISender) Send(messageInterface parse.MessageInterface) { - Send(messageInterface) + //Send(messageInterface) } type Sender interface { @@ -225,23 +241,26 @@ type Sender interface { // Logout closes the connection to the server at this time and does // nothing with the user id. In the future this will release resources // and safely release any sensitive memory. -func Logout() error { - if user.TheSession == nil { +func (cl *Client) Logout() error { + if cl.sess == nil { err := errors.New("Logout: Cannot Logout when you are not logged in") globals.Log.ERROR.Printf(err.Error()) return err } // Stop reception runner goroutine - quitReceptionRunner <- true + cl.quitReceptionRunner <- true // Disconnect from the gateway - io.Disconnect(io.SendAddress) - if io.SendAddress != io.ReceiveAddress { - io.Disconnect(io.ReceiveAddress) + io.Disconnect( + (cl.comm).(*io.Messaging).SendAddress) + if (cl.comm).(*io.Messaging).SendAddress != + (cl.comm).(*io.Messaging).ReceiveAddress { + io.Disconnect( + (cl.comm).(*io.Messaging).ReceiveAddress) } - errStore := user.TheSession.StoreSession() + errStore := cl.sess.StoreSession() // If a client is logging in again, the storage may need to go into a // different location // Currently, none of the storage abstractions need to do anything to @@ -256,7 +275,7 @@ func Logout() error { return err } - errImmolate := user.TheSession.Immolate() + errImmolate := cl.sess.Immolate() if errImmolate != nil { err := errors.New(fmt.Sprintf("Logout: Immolation Failed: %s" + @@ -265,13 +284,10 @@ func Logout() error { return err } - // Reset listener structure - switchboard.Listeners = switchboard.NewSwitchboard() - return nil } -func RegisterForUserDiscovery(emailAddress string) error { +func (cl *Client) RegisterForUserDiscovery(emailAddress string) error { valueType := "EMAIL" userId, _, err := bots.Search(valueType, emailAddress) if userId != nil { @@ -282,22 +298,22 @@ func RegisterForUserDiscovery(emailAddress string) error { return err } - publicKey := user.TheSession.GetPublicKey() + publicKey := cl.sess.GetPublicKey() publicKeyBytes := publicKey.LeftpadBytes(256) return bots.Register(valueType, emailAddress, publicKeyBytes) } -func SearchForUser(emailAddress string) (*id.User, []byte, error) { +func (cl *Client) SearchForUser(emailAddress string) (*id.User, []byte, error) { valueType := "EMAIL" return bots.Search(valueType, emailAddress) } -func registerUserE2E(partnerID *id.User, +func (cl *Client) registerUserE2E(partnerID *id.User, ownPrivKey *cyclic.Int, partnerPubKey *cyclic.Int) { // Get needed variables from session - grp := user.TheSession.GetGroup() - userID := user.TheSession.GetCurrentUser().User + grp := cl.sess.GetGroup() + userID := cl.sess.GetCurrentUser().User // Generate baseKey baseKey, _ := diffieHellman.CreateDHSessionKey( @@ -319,7 +335,7 @@ func registerUserE2E(partnerID *id.User, km.GenerateKeys(grp, userID) // Add Key Manager to session - user.TheSession.AddKeyManager(km) + cl.sess.AddKeyManager(km) } //Message struct adherent to interface in bindings for data return from ParseMessage @@ -361,30 +377,27 @@ func ParseMessage(message []byte)(ParsedMessage,error){ return pm, nil } -// TODO Support more than one wallet per user? Maybe in v2 -var theWallet *payment.Wallet - -func Wallet() *payment.Wallet { - if theWallet == nil { +func (cl *Client) Wallet() *payment.Wallet { + if cl.wallet == nil { // Assume that the correct wallet is already stored in the session // (if necessary, minted during register) // So, if the wallet is nil, registration must have happened for this method to work var err error - theWallet, err = payment.CreateWallet(user.TheSession, false) - theWallet.RegisterListeners() + cl.wallet, err = payment.CreateWallet(cl.sess, cl.comm, false) + cl.wallet.RegisterListeners(cl.listeners) if err != nil { globals.Log.ERROR.Println("Wallet("+ "): Got an error creating the wallet.", err.Error()) } } - return theWallet + return cl.wallet +} + +func (cl *Client) GetSessionData() ([]byte, error) { + return cl.sess.GetSessionData() } // Set the output of the func SetLogOutput(w goio.Writer) { globals.Log.SetLogOutput(w) } - -func GetSessionData() ([]byte, error) { - return user.TheSession.GetSessionData() -} diff --git a/bindings/client.go b/bindings/client.go index 5c0b3c8a6..7a253223e 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -11,7 +11,6 @@ import ( "gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/globals" "gitlab.com/elixxir/client/parse" - "gitlab.com/elixxir/client/user" "gitlab.com/elixxir/crypto/certs" "gitlab.com/elixxir/crypto/cyclic" "gitlab.com/elixxir/primitives/format" @@ -20,6 +19,10 @@ import ( "io" ) +type Client struct { + client *api.Client +} + // Copy of the storage interface. // It is identical to the interface used in Globals, // and a results the types can be passed freely between the two @@ -65,20 +68,19 @@ type Listener interface { // messages sent from all users. // If you pass the zero type (just zero) to Listen() you will hear messages of // all types. -func Listen(userId []byte, messageType int32, newListener Listener) string { +func (cl *Client) Listen(userId []byte, messageType int32, newListener Listener) string { typedUserId := new(id.User).SetBytes(userId) listener := &listenerProxy{proxy: newListener} - return api.Listen(typedUserId, format.None, messageType, - listener, switchboard.Listeners) + return cl.client.Listen(typedUserId, format.None, messageType, listener) } // Returns a parsed message // Pass the listener handle that Listen() returned to delete the listener -func StopListening(listenerHandle string) { - api.StopListening(listenerHandle, switchboard.Listeners) +func (cl *Client) StopListening(listenerHandle string) { + cl.client.StopListening(listenerHandle) } func FormatTextMessage(message string) []byte { @@ -98,15 +100,15 @@ func FormatTextMessage(message string) []byte { // loc is a string. If you're using DefaultStorage for your storage, // this would be the filename of the file that you're storing the user // session in. -func InitClient(storage Storage, loc string) error { +func InitClient(storage Storage, loc string) (*Client, error) { if storage == nil { - return errors.New("could not init client: Storage was nil") + return nil, errors.New("could not init client: Storage was nil") } proxy := &storageProxy{boundStorage: storage} - err := api.InitClient(globals.Storage(proxy), loc) + cl, err := api.InitClient(globals.Storage(proxy), loc) - return err + return &Client{client: cl}, err } // Registers user and returns the User ID. Returns null if registration fails. @@ -145,7 +147,7 @@ func InitClient(storage Storage, loc string) error { // 10 // “Jono” // OIF3OJ5I -func Register(registrationCode string, gwAddr string, numNodes int, +func (cl *Client) Register(registrationCode string, gwAddr string, numNodes int, mint bool, grpJSON string) ([]byte, error) { if numNodes < 1 { @@ -159,7 +161,7 @@ func Register(registrationCode string, gwAddr string, numNodes int, return id.ZeroID[:], err } - UID, err := api.Register(registrationCode, gwAddr, uint(numNodes), mint, &grp) + UID, err := cl.client.Register(registrationCode, gwAddr, uint(numNodes), mint, &grp) if err != nil { return id.ZeroID[:], err @@ -176,30 +178,26 @@ func Register(registrationCode string, gwAddr string, numNodes int, // certificate and it's in the crypto repository. So, if you leave the TLS // certificate string empty, the bindings will use that certificate. We probably // need to rethink this. In other words, tlsCert is optional. -func Login(UID []byte, addr string, tlsCert string) (string, error) { +func (cl *Client) Login(UID []byte, addr string, tlsCert string) (string, error) { userID := new(id.User).SetBytes(UID) var err error - var session user.Session + var nick string if tlsCert == "" { - session, err = api.Login(userID, addr, certs.GatewayTLS) - } else { - session, err = api.Login(userID, addr, tlsCert) - } - if err != nil || session == nil { - return "", err + nick, err = cl.client.Login(userID, addr, certs.GatewayTLS) } else { - return session.GetCurrentUser().Nick, err + nick, err = cl.client.Login(userID, addr, tlsCert) } + return nick, err } //Sends a message structured via the message interface // Automatically serializes the message type before the rest of the payload // Returns an error if either sender or recipient are too short -func Send(m Message) error { +func (cl *Client) Send(m Message) error { sender := new(id.User).SetBytes(m.GetSender()) recipient := new(id.User).SetBytes(m.GetRecipient()) - return api.Send(&parse.Message{ + return cl.client.Send(&parse.Message{ TypedBody: parse.TypedBody{ MessageType: m.GetMessageType(), Body: m.GetPayload(), @@ -212,24 +210,24 @@ func Send(m Message) error { // Logs the user out, saving the state for the system and clearing all data // from RAM -func Logout() error { - return api.Logout() +func (cl *Client) Logout() error { + return cl.client.Logout() } // Turns off blocking transmission so multiple messages can be sent // simultaneously -func DisableBlockingTransmission() { - api.DisableBlockingTransmission() +func (cl *Client) DisableBlockingTransmission() { + cl.client.DisableBlockingTransmission() } // Sets the minimum amount of time, in ms, between message transmissions // Just for testing, probably to be removed in production -func SetRateLimiting(limit int) { - api.SetRateLimiting(uint32(limit)) +func (cl *Client) SetRateLimiting(limit int) { + cl.client.SetRateLimiting(uint32(limit)) } -func RegisterForUserDiscovery(emailAddress string) error { - return api.RegisterForUserDiscovery(emailAddress) +func (cl *Client) RegisterForUserDiscovery(emailAddress string) error { + return cl.client.RegisterForUserDiscovery(emailAddress) } type SearchResult struct { @@ -237,8 +235,8 @@ type SearchResult struct { PublicKey []byte } -func SearchForUser(emailAddress string) (*SearchResult, error) { - searchedUser, key, err := api.SearchForUser(emailAddress) +func (cl *Client) SearchForUser(emailAddress string) (*SearchResult, error) { + searchedUser, key, err := cl.client.SearchForUser(emailAddress) if err != nil { return nil, err } else { @@ -292,6 +290,6 @@ func SetLogOutput(w Writer) { } // Call this to get the session data without getting Save called from the Go side -func GetSessionData() ([]byte, error) { - return api.GetSessionData() +func (cl *Client) GetSessionData() ([]byte, error) { + return cl.client.GetSessionData() } diff --git a/bindings/payment.go b/bindings/payment.go index a2db5b0ad..4b243cea1 100644 --- a/bindings/payment.go +++ b/bindings/payment.go @@ -9,18 +9,17 @@ package bindings import ( "errors" "fmt" - "gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/cmixproto" "gitlab.com/elixxir/client/parse" "gitlab.com/elixxir/client/payment" - "gitlab.com/elixxir/primitives/id" "gitlab.com/elixxir/primitives/format" + "gitlab.com/elixxir/primitives/id" ) // Currently there's only one wallet that you can get // There may be many in the future -func GetActiveWallet() *Wallet { - return &Wallet{wallet: api.Wallet()} +func (cl *Client) GetActiveWallet() *Wallet { + return &Wallet{wallet: cl.client.Wallet()} } func (w *Wallet) Listen(userId []byte, outerType format.CryptoType, @@ -29,17 +28,17 @@ func (w *Wallet) Listen(userId []byte, outerType format.CryptoType, listener := &listenerProxy{proxy: newListener} - return api.Listen(typedUserId, outerType, innerType, - listener, w.wallet.GetSwitchboard()) + return w.wallet.GetSwitchboard(). + Register(typedUserId, outerType, innerType, listener) } func (w *Wallet) StopListening(listenerHandle string) { - api.StopListening(listenerHandle, w.wallet.GetSwitchboard()) + w.wallet.GetSwitchboard().Unregister(listenerHandle) } // Returns the currently available balance in the wallet func (w *Wallet) GetAvailableFunds() int64 { - return int64(api.Wallet().GetAvailableFunds()) + return int64(w.wallet.GetAvailableFunds()) } // Payer: user ID, 256 bits diff --git a/bots/userDiscovery.go b/bots/userDiscovery.go index ce4013d01..4b7251aa7 100644 --- a/bots/userDiscovery.go +++ b/bots/userDiscovery.go @@ -14,6 +14,7 @@ import ( "gitlab.com/elixxir/client/globals" "gitlab.com/elixxir/client/io" "gitlab.com/elixxir/client/parse" + "gitlab.com/elixxir/client/user" "gitlab.com/elixxir/primitives/switchboard" "gitlab.com/elixxir/crypto/hash" "gitlab.com/elixxir/primitives/id" @@ -21,11 +22,14 @@ import ( "gitlab.com/elixxir/primitives/format" ) -// UdbID is the ID of the user discovery bot, which is always 13 +// UdbID is the ID of the user discovery bot, which is always 3 var UdbID *id.User type udbResponseListener chan string +var session user.Session +var messaging io.Communications + var pushKeyResponseListener udbResponseListener var getKeyResponseListener udbResponseListener var registerResponseListener udbResponseListener @@ -36,8 +40,8 @@ func (l *udbResponseListener) Hear(msg switchboard.Item, isHeardElsewhere bool) *l <- string(m.Body) } -// The go runtime calls init() before calling any methods in the package -func init() { +// InitUDB is called internally by the InitClient API +func InitUDB(s user.Session,m io.Communications, l *switchboard.Switchboard) { UdbID = new(id.User).SetUints(&[4]uint64{0, 0, 0, 3}) pushKeyResponseListener = make(udbResponseListener) @@ -45,16 +49,19 @@ func init() { registerResponseListener = make(udbResponseListener) searchResponseListener = make(udbResponseListener) - switchboard.Listeners.Register(UdbID, + session = s + messaging = m + + l.Register(UdbID, format.None, int32(cmixproto.Type_UDB_PUSH_KEY_RESPONSE), &pushKeyResponseListener) - switchboard.Listeners.Register(UdbID, + l.Register(UdbID, format.None, int32(cmixproto.Type_UDB_GET_KEY_RESPONSE), &getKeyResponseListener) - switchboard.Listeners.Register(UdbID, + l.Register(UdbID, format.None, int32(cmixproto.Type_UDB_REGISTER_RESPONSE), ®isterResponseListener) - switchboard.Listeners.Register(UdbID, + l.Register(UdbID, format.None, int32(cmixproto.Type_UDB_SEARCH_RESPONSE), &searchResponseListener) } @@ -224,5 +231,5 @@ func fingerprint(publicKey []byte) string { // Callers that need to wait on a response should implement waiting with a // listener. func sendCommand(botID *id.User, command []byte) error { - return io.Messaging.SendMessage(botID, command) + return messaging.SendMessage(session, botID, command) } diff --git a/cmd/root.go b/cmd/root.go index 09f44335b..3e237639d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -22,6 +22,7 @@ import ( "gitlab.com/elixxir/client/parse" "gitlab.com/elixxir/client/user" "gitlab.com/elixxir/comms/connect" + "gitlab.com/elixxir/crypto/cyclic" "gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/id" "gitlab.com/elixxir/primitives/switchboard" @@ -46,6 +47,7 @@ var mint bool var rateLimiting uint32 var showVer bool var certPath string +var client *api.Client // Execute adds all child commands to the root command and sets flags // appropriately. This is called by main.main(). It only needs to @@ -58,18 +60,12 @@ func Execute() { } func sessionInitialization() { - if noBlockingTransmission { - api.DisableBlockingTransmission() - } - - bindings.SetRateLimiting(int(rateLimiting)) - var err error register := false //If no session file is passed initialize with RAM Storage if sessionFile == "" { - err = bindings.InitClient(&globals.RamStorage{}, "") + client, err = api.InitClient(&globals.RamStorage{}, "") if err != nil { fmt.Printf("Could Not Initialize Ram Storage: %s\n", err.Error()) @@ -92,7 +88,7 @@ func sessionInitialization() { } //Initialize client with OS Storage - err = bindings.InitClient(&globals.DefaultStorage{}, sessionFile) + client, err = api.InitClient(&globals.DefaultStorage{}, sessionFile) if err != nil { fmt.Printf("Could Not Initialize OS Storage: %s\n", err.Error()) @@ -100,6 +96,12 @@ func sessionInitialization() { } } + if noBlockingTransmission { + client.DisableBlockingTransmission() + } + + client.SetRateLimiting(rateLimiting) + // Handle parsing gateway addresses from the config file gateways := viper.GetStringSlice("gateways") if gwAddr == "" { @@ -123,8 +125,15 @@ func sessionInitialization() { // 64 bits grpJSON := viper.GetString("group") + // Unmarshal group JSON + var grp cyclic.Group + err := grp.UnmarshalJSON([]byte(grpJSON)) + if err != nil { + return + } + regCode := new(id.User).SetUints(&[4]uint64{0, 0, 0, userId}).RegistrationCode() - _, err = bindings.Register(regCode, gwAddr, int(numNodes), mint, grpJSON) + _, err = client.Register(regCode, gwAddr, numNodes, mint, &grp) if err != nil { fmt.Printf("Could Not Register User: %s\n", err.Error()) return @@ -133,7 +142,7 @@ func sessionInitialization() { // Log the user in uid := id.NewUserFromUint(userId, nil) - _, err = bindings.Login(uid[:], gwAddr, "") + _, err = client.Login(uid, gwAddr, "") if err != nil { fmt.Printf("Could Not Log In\n") @@ -211,7 +220,7 @@ func (l *ChannelListener) Hear(item switchboard.Item, isHeardElsewhere bool) { new(big.Int).SetBytes(message.Sender[:]).Text(10), senderNick) typedBody, _ := parse.Parse(result.Message) speakerId := new(id.User).SetBytes(result.SpeakerID) - switchboard.Listeners.Speak(&parse.Message{ + client.GetSwitchboard().Speak(&parse.Message{ TypedBody: *typedBody, Sender: speakerId, Receiver: id.ZeroID, @@ -241,21 +250,21 @@ var rootCmd = &cobra.Command{ // Set the GatewayCertPath explicitly to avoid data races SetCertPath(certPath) + sessionInitialization() // Set up the listeners for both of the types the client needs for // the integration test // Normal text messages text := TextListener{} - api.Listen(id.ZeroID, format.None, int32(cmixproto.Type_TEXT_MESSAGE), - &text, switchboard.Listeners) + client.Listen(id.ZeroID, format.None, int32(cmixproto.Type_TEXT_MESSAGE), + &text) // Channel messages channel := ChannelListener{} - api.Listen(id.ZeroID, format.None, - int32(cmixproto.Type_CHANNEL_MESSAGE), &channel, - switchboard.Listeners) + client.Listen(id.ZeroID, format.None, + int32(cmixproto.Type_CHANNEL_MESSAGE), &channel) // All other messages fallback := FallbackListener{} - api.Listen(id.ZeroID, format.None, int32(cmixproto.Type_NO_TYPE), - &fallback, switchboard.Listeners) + client.Listen(id.ZeroID, format.None, int32(cmixproto.Type_NO_TYPE), + &fallback) // Do calculation for dummy messages if the flag is set if dummyFrequency != 0 { @@ -263,8 +272,6 @@ var rootCmd = &cobra.Command{ (time.Duration(float64(1000000000) * (float64(1.0) / dummyFrequency))) } - sessionInitialization() - // Only send a message if we have a message to send (except dummy messages) recipientId := new(id.User).SetUints(&[4]uint64{0, 0, 0, destinationUserId}) senderId := new(id.User).SetUints(&[4]uint64{0, 0, 0, userId}) @@ -278,8 +285,7 @@ var rootCmd = &cobra.Command{ // Handle sending to UDB if *recipientId == *bots.UdbID { - grp := user.TheSession.GetGroup() - fmt.Println(parseUdbMessage(message, grp)) + fmt.Println(parseUdbMessage(message, client)) } else { // Handle sending to any other destination wireOut := bindings.FormatTextMessage(message) @@ -288,7 +294,7 @@ var rootCmd = &cobra.Command{ recipientNick, message) // Send the message - bindings.Send(&parse.BindingsMessageProxy{&parse.Message{ + client.Send(&parse.Message{ Sender: senderId, TypedBody: parse.TypedBody{ MessageType: int32(cmixproto.Type_TEXT_MESSAGE), @@ -296,7 +302,7 @@ var rootCmd = &cobra.Command{ }, CryptoType: format.Unencrypted, Receiver: recipientId, - }}) + }) } } @@ -317,15 +323,15 @@ var rootCmd = &cobra.Command{ fmt.Printf("Sending Message to %d, %v: %s\n", destinationUserId, contact, message) - message := &parse.BindingsMessageProxy{&parse.Message{ + message := &parse.Message{ Sender: senderId, TypedBody: parse.TypedBody{ MessageType: int32(cmixproto.Type_TEXT_MESSAGE), Body: bindings.FormatTextMessage(message), }, CryptoType: format.Unencrypted, - Receiver: recipientId}} - bindings.Send(message) + Receiver: recipientId} + client.Send(message) timer = time.NewTimer(dummyPeriod) } @@ -343,7 +349,7 @@ var rootCmd = &cobra.Command{ } //Logout - err := bindings.Logout() + err := client.Logout() if err != nil { fmt.Printf("Could not logout: %s\n", err.Error()) diff --git a/cmd/udb.go b/cmd/udb.go index bfe0596a1..2d638c6b2 100644 --- a/cmd/udb.go +++ b/cmd/udb.go @@ -9,13 +9,12 @@ package cmd import ( "fmt" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/bindings" - "gitlab.com/elixxir/crypto/cyclic" + "gitlab.com/elixxir/client/api" "strings" ) // Determines what UDB send function to call based on the text in the message -func parseUdbMessage(msg string, grp *cyclic.Group) string { +func parseUdbMessage(msg string, client *api.Client) string { // Split the message on spaces args := strings.Fields(msg) if len(args) < 3 { @@ -27,16 +26,15 @@ func parseUdbMessage(msg string, grp *cyclic.Group) string { keyword := args[0] // Case-insensitive match the keyword to a command if strings.EqualFold(keyword, "SEARCH") { - result, err := bindings.SearchForUser(args[2]) + userID, pubKey, err := client.SearchForUser(args[2]) if err != nil { return fmt.Sprintf("UDB search failed: %v", err.Error()) } else { - userIdText := grp.NewIntFromBytes(result.ResultID).Text(10) return fmt.Sprintf("UDB search successful. Returned user %v, "+ - "public key %q", userIdText, result.PublicKey) + "public key %q", *userID, pubKey) } } else if strings.EqualFold(keyword, "REGISTER") { - err := bindings.RegisterForUserDiscovery(args[2]) + err := client.RegisterForUserDiscovery(args[2]) if err != nil { return fmt.Sprintf("UDB registration failed: %v", err.Error()) } else { diff --git a/io/collate.go b/io/collate.go index 934519e10..158c26961 100644 --- a/io/collate.go +++ b/io/collate.go @@ -11,7 +11,6 @@ import ( "fmt" "gitlab.com/elixxir/client/globals" "gitlab.com/elixxir/client/parse" - "gitlab.com/elixxir/client/user" "gitlab.com/elixxir/primitives/format" "sync" "time" @@ -75,7 +74,7 @@ func (mb *collator) AddMessage(message *format.Message, TypedBody: *typedBody, CryptoType: format.Unencrypted, Sender: sender, - Receiver: user.TheSession.GetCurrentUser().User, + Receiver: recipient, } return &msg diff --git a/io/interface.go b/io/interface.go index fe8b169a7..b04aecf3d 100644 --- a/io/interface.go +++ b/io/interface.go @@ -8,14 +8,17 @@ package io import ( + "gitlab.com/elixxir/client/user" "gitlab.com/elixxir/primitives/id" + "gitlab.com/elixxir/primitives/switchboard" "time" ) // Communication interface implements send/receive functionality with the server type Communications interface { // SendMessage to the server - SendMessage(recipientID *id.User, message []byte) error + SendMessage(session user.Session, recipientID *id.User, message []byte) error // MessageReceiver thread to get new messages - MessageReceiver(delay time.Duration, quit chan bool) + MessageReceiver(session user.Session, sw *switchboard.Switchboard, + delay time.Duration, quit chan bool) } diff --git a/io/messaging.go b/io/messaging.go index 8ced2ef77..aaf786b1d 100644 --- a/io/messaging.go +++ b/io/messaging.go @@ -28,38 +28,40 @@ import ( "time" ) -type messaging struct { +// Messaging implements the Communications interface +type Messaging struct { nextId func() []byte + // SendAddress is the address of the server to send messages + SendAddress string + // ReceiveAddress is the address of the server to receive messages from + ReceiveAddress string + // BlockTransmissions will use a mutex to prevent multiple threads from sending + // messages at the same time. + BlockTransmissions bool + // TransmitDelay is the minimum delay between transmissions. + TransmitDelay time.Duration + // Map that holds a record of the messages that this client successfully + // received during this session + ReceivedMessages map[string]struct{} + sendLock sync.Mutex } -// Messaging implements the Communications interface -var Messaging Communications = &messaging{nextId: parse.IDCounter()} - -// SendAddress is the address of the server to send messages -var SendAddress string - -// ReceiveAddress is the address of the server to receive messages from -var ReceiveAddress string - -// BlockTransmissions will use a mutex to prevent multiple threads from sending -// messages at the same time. -var BlockTransmissions = true - -// TransmitDelay is the minimum delay between transmissions. -var TransmitDelay = 1000 * time.Millisecond - -// Map that holds a record of the messages that this client successfully -// received during this session -var ReceivedMessages map[string]struct{} - -var sendLock sync.Mutex +func NewMessenger() *Messaging { + return &Messaging{ + nextId: parse.IDCounter(), + BlockTransmissions: true, + TransmitDelay: 1000 * time.Millisecond, + ReceivedMessages: make(map[string]struct{}), + } +} // SendMessage to the provided Recipient // TODO: It's not clear why we wouldn't hand off a sender object (with // the keys) here. I won't touch crypto at this time, though... // TODO This method would be cleaner if it took a parse.Message (particularly // w.r.t. generating message IDs for multi-part messages.) -func (m *messaging) SendMessage(recipientID *id.User, +func (m *Messaging) SendMessage(session user.Session, + recipientID *id.User, message []byte) error { // FIXME: We should really bring the plaintext parts of the NewMessage logic // into this module, then have an EncryptedMessage type that is sent to/from @@ -70,8 +72,8 @@ func (m *messaging) SendMessage(recipientID *id.User, // TBD: Is there a really good reason why we'd ever have more than one user // in this library? why not pass a sender object instead? globals.Log.DEBUG.Printf("Sending message to %q: %q", *recipientID, message) - userID := user.TheSession.GetCurrentUser().User - grp := user.TheSession.GetGroup() + userID := session.GetCurrentUser().User + grp := session.GetGroup() parts, err := parse.Partition([]byte(message), m.nextId()) if err != nil { @@ -101,7 +103,7 @@ func (m *messaging) SendMessage(recipientID *id.User, // The timestamp will be encrypted later message.SetTimestamp(nowBytes) message.SetPayloadData(parts[i]) - err = send(userID, message, grp) + err = m.send(session, userID, message, grp) if err != nil { return fmt.Errorf("SendMessage send() error: %v", err.Error()) } @@ -110,13 +112,14 @@ func (m *messaging) SendMessage(recipientID *id.User, } // send actually sends the message to the server -func send(senderID *id.User, message *format.Message, grp *cyclic.Group) error { +func (m *Messaging) send(session user.Session, + senderID *id.User, message *format.Message, grp *cyclic.Group) error { // Enable transmission blocking if enabled - if BlockTransmissions { - sendLock.Lock() + if m.BlockTransmissions { + m.sendLock.Lock() defer func() { - time.Sleep(TransmitDelay) - sendLock.Unlock() + time.Sleep(m.TransmitDelay) + m.sendLock.Unlock() }() } @@ -127,7 +130,7 @@ func send(senderID *id.User, message *format.Message, grp *cyclic.Group) error { // Generate a compound encryption key encryptionKey := grp.NewInt(1) - for _, key := range user.TheSession.GetKeys() { + for _, key := range session.GetKeys() { baseKey := key.TransmissionKeys.Base partialEncryptionKey := cmix.NewEncryptionKey(salt, baseKey, grp) grp.Mul(encryptionKey, partialEncryptionKey, encryptionKey) @@ -150,7 +153,7 @@ func send(senderID *id.User, message *format.Message, grp *cyclic.Group) error { var err error globals.Log.INFO.Println("Sending put message to gateway") - err = client.SendPutMessage(SendAddress, msgPacket) + err = client.SendPutMessage(m.SendAddress, msgPacket) return err } @@ -160,13 +163,15 @@ func send(senderID *id.User, message *format.Message, grp *cyclic.Group) error { // list for the listeners? // Accessing all of these global variables is extremely problematic for this // kind of thread. -func (m *messaging) MessageReceiver(delay time.Duration, quit chan bool) { +func (m *Messaging) MessageReceiver(session user.Session, + sw *switchboard.Switchboard, + delay time.Duration, quit chan bool) { // FIXME: It's not clear we should be doing decryption here. - if user.TheSession == nil { + if session == nil { globals.Log.FATAL.Panicf("No user session available") } pollingMessage := pb.ClientPollMessage{ - UserID: user.TheSession.GetCurrentUser().User.Bytes(), + UserID: session.GetCurrentUser().User.Bytes(), } for { @@ -177,14 +182,14 @@ func (m *messaging) MessageReceiver(delay time.Duration, quit chan bool) { default: time.Sleep(delay) globals.Log.INFO.Printf("Attempting to receive message from gateway") - decryptedMessages := m.receiveMessagesFromGateway(&pollingMessage) + decryptedMessages := m.receiveMessagesFromGateway(session, &pollingMessage) if decryptedMessages != nil { for i := range decryptedMessages { assembledMessage := GetCollator().AddMessage( decryptedMessages[i], time.Minute) if assembledMessage != nil { // we got a fully assembled message. let's broadcast it - broadcastMessageReception(assembledMessage, switchboard.Listeners) + broadcastMessageReception(assembledMessage, sw) } } } @@ -192,11 +197,11 @@ func (m *messaging) MessageReceiver(delay time.Duration, quit chan bool) { } } -func (m *messaging) receiveMessagesFromGateway( +func (m *Messaging) receiveMessagesFromGateway(session user.Session, pollingMessage *pb.ClientPollMessage) []*format.Message { - if user.TheSession != nil { - pollingMessage.MessageID = user.TheSession.GetLastMessageID() - messages, err := client.SendCheckMessages(user.TheSession.GetGWAddress(), + if session != nil { + pollingMessage.MessageID = session.GetLastMessageID() + messages, err := client.SendCheckMessages(session.GetGWAddress(), pollingMessage) if err != nil { @@ -206,24 +211,20 @@ func (m *messaging) receiveMessagesFromGateway( globals.Log.INFO.Printf("Checking novelty of %v messages", len(messages.MessageIDs)) - if ReceivedMessages == nil { - ReceivedMessages = make(map[string]struct{}) - } - results := make([]*format.Message, 0, len(messages.MessageIDs)) - grp := user.TheSession.GetGroup() + grp := session.GetGroup() for _, messageID := range messages.MessageIDs { // Get the first unseen message from the list of IDs - _, received := ReceivedMessages[messageID] + _, received := m.ReceivedMessages[messageID] if !received { globals.Log.INFO.Printf("Got a message waiting on the gateway: %v", messageID) // We haven't seen this message before. // So, we should retrieve it from the gateway. - newMessage, err := client.SendGetMessage(user. - TheSession.GetGWAddress(), + newMessage, err := client.SendGetMessage( + session.GetGWAddress(), &pb.ClientPollMessage{ - UserID: user.TheSession.GetCurrentUser().User. + UserID: session.GetCurrentUser().User. Bytes(), MessageID: messageID, }) @@ -241,7 +242,7 @@ func (m *messaging) receiveMessagesFromGateway( // Generate a compound decryption key salt := newMessage.Salt decryptionKey := grp.NewInt(1) - for _, key := range user.TheSession.GetKeys() { + for _, key := range session.GetKeys() { baseKey := key.ReceptionKeys.Base partialDecryptionKey := cmix.NewDecryptionKey(salt, baseKey, grp) @@ -251,9 +252,9 @@ func (m *messaging) receiveMessagesFromGateway( globals.Log.INFO.Printf( "Adding message ID %v to received message IDs", messageID) - ReceivedMessages[messageID] = struct{}{} - user.TheSession.SetLastMessageID(messageID) - user.TheSession.StoreSession() + m.ReceivedMessages[messageID] = struct{}{} + session.SetLastMessageID(messageID) + session.StoreSession() decryptedMsg, err2 := crypto.Decrypt(decryptionKey, grp, newMessage) diff --git a/payment/wallet.go b/payment/wallet.go index eedd7762b..e3e485f21 100644 --- a/payment/wallet.go +++ b/payment/wallet.go @@ -44,6 +44,7 @@ type Wallet struct { completedOutboundPayments *TransactionList session user.Session + comm io.Communications // Listen to this switchboard to get UI messages from the wallet. // This includes the types PAYMENT_INVOICE_UI, PAYMENT_RESPONSE, and @@ -81,7 +82,9 @@ func (w *Wallet) GetSwitchboard() *switchboard.Switchboard { // If you want the wallet to be able to receive messages you must register its // listeners // api.Login does this, while api.Register (during minting) does not -func CreateWallet(s user.Session, doMint bool) (*Wallet, error) { +func CreateWallet(s user.Session, + comm io.Communications, + doMint bool) (*Wallet, error) { cs, err := CreateOrderedStorage(CoinStorageTag, s) @@ -136,6 +139,7 @@ func CreateWallet(s user.Session, doMint bool) (*Wallet, error) { completedInboundPayments: ip, completedOutboundPayments: op, session: s, + comm: comm, switchboard: sb, } @@ -145,18 +149,19 @@ func CreateWallet(s user.Session, doMint bool) (*Wallet, error) { // You need to call this method after creating the wallet to have the wallet // behave correctly when receiving messages // TODO: Should this take the listeners as parameters? -func (w *Wallet) RegisterListeners() { - switchboard.Listeners.Register(id.ZeroID, +func (w *Wallet) RegisterListeners( + listeners *switchboard.Switchboard) { + listeners.Register(id.ZeroID, format.None, int32(cmixproto.Type_PAYMENT_INVOICE), &InvoiceListener{ wallet: w, }) - switchboard.Listeners.Register(getPaymentBotID(), + listeners.Register(getPaymentBotID(), format.None, int32(cmixproto.Type_PAYMENT_RESPONSE), &ResponseListener{ wallet: w, }) - switchboard.Listeners.Register(id.ZeroID, + listeners.Register(id.ZeroID, format.None, int32(cmixproto.Type_PAYMENT_RECEIPT), &ReceiptListener{ wallet: w, @@ -428,7 +433,8 @@ func (l *ResponseListener) Hear(msg switchboard.Item, isHeardElsewhere bool) { receipt := l.formatReceipt(transaction) globals.Log.DEBUG.Printf("Attempting to send receipt to transaction"+ " recipient: %v!", transaction.Recipient) - err := io.Messaging.SendMessage(transaction.Recipient, + err := l.wallet.comm.SendMessage(l.wallet.session, + transaction.Recipient, receipt.Pack()) if err != nil { globals.Log.ERROR.Printf("Payment response listener couldn't send"+ diff --git a/payment/wallet_test.go b/payment/wallet_test.go index 36b1c784c..7a3b15081 100644 --- a/payment/wallet_test.go +++ b/payment/wallet_test.go @@ -94,7 +94,7 @@ func TestCreateWallet(t *testing.T) { s := user.NewSession(&user.User{User: id.NewUserFromUint(1, t), Nick: "test"}, "", []user.NodeKeys{}, grp.NewInt(1), grp) - _, err := CreateWallet(s, false) + _, err := CreateWallet(s, io.NewMessenger(), false) if err != nil { t.Errorf("CreateWallet: error returned on valid wallet creation: %s", err.Error()) diff --git a/user/session.go b/user/session.go index 5611d5eb6..a20372da7 100644 --- a/user/session.go +++ b/user/session.go @@ -24,10 +24,6 @@ import ( // Errors var ErrQuery = errors.New("element not in map") -// Globally instantiated Session -// FIXME remove this sick filth -var TheSession Session - // Interface for User Session operations type Session interface { GetCurrentUser() (currentUser *User) @@ -132,8 +128,6 @@ func LoadSession(UID *id.User) (Session, error) { km.GenerateKeys(session.Grp, UID) } - TheSession = &session - return &session, nil } @@ -283,8 +277,6 @@ func (s *SessionObj) Immolate() error { clearRatchetKeys(&s.Keys[i].ReceptionKeys) } - TheSession = nil - s.UnlockStorage() return nil -- GitLab