diff --git a/broadcast/asymmetric.go b/broadcast/asymmetric.go index 79fc4b4d0988319d6f9e0380fa6bbb4cd6d038ff..9f913b8c2611cc15a1f5bc51425dcb40a00ca8f2 100644 --- a/broadcast/asymmetric.go +++ b/broadcast/asymmetric.go @@ -29,15 +29,10 @@ func (bc *broadcastClient) maxAsymmetricPayload() int { // BroadcastAsymmetric broadcasts the payload to the channel. Requires a healthy network state to send // Payload must be equal to bc.MaxAsymmetricPayloadSize, and the channel PrivateKey must be passed in -// Broadcast method must be set to asymmetric // When a payload is sent, it is split into partitons of size bc.channel.MaxAsymmetricPayloadSize // which are each encrypted using multicastRSA func (bc *broadcastClient) BroadcastAsymmetric(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) ( id.Round, ephemeral.Id, error) { - if bc.param.Method != Asymmetric { - return 0, ephemeral.Id{}, errors.Errorf(errBroadcastMethodType, Asymmetric, bc.param.Method) - } - if !bc.net.IsHealthy() { return 0, ephemeral.Id{}, errors.New(errNetworkHealth) } diff --git a/broadcast/broadcastClient.go b/broadcast/broadcastClient.go index 54555a3875db03af4331438e0dcb92922ad3aa06..45ec34edfde9156071775af0d19c74c0697474d6 100644 --- a/broadcast/broadcastClient.go +++ b/broadcast/broadcastClient.go @@ -17,26 +17,19 @@ import ( "gitlab.com/xx_network/crypto/signature/rsa" ) -// Param encapsulates configuration options for a broadcastClient -type Param struct { - Method Method -} - // broadcastClient implements the Channel interface for sending/receiving asymmetric or symmetric broadcast messages type broadcastClient struct { channel crypto.Channel net Client rng *fastRNG.StreamGenerator - param Param } // NewBroadcastChannel creates a channel interface based on crypto.Channel, accepts net client connection & callback for received messages -func NewBroadcastChannel(channel crypto.Channel, listenerCb ListenerFunc, net Client, rng *fastRNG.StreamGenerator, param Param) (Channel, error) { +func NewBroadcastChannel(channel crypto.Channel, net Client, rng *fastRNG.StreamGenerator) (Channel, error) { bc := &broadcastClient{ channel: channel, net: net, rng: rng, - param: param, } if !bc.verifyID() { @@ -46,31 +39,37 @@ func NewBroadcastChannel(channel crypto.Channel, listenerCb ListenerFunc, net Cl // Add channel's identity net.AddIdentity(channel.ReceptionID, identity.Forever, true) - p := &processor{ - c: &channel, - cb: listenerCb, - method: param.Method, - } + jww.INFO.Printf("New broadcast channel client created for channel %q (%s)", + channel.Name, channel.ReceptionID) + + return bc, nil +} + +// RegisterListener adds a service to hear broadcast messages of a given type via the passed in callback +func (bc *broadcastClient) RegisterListener(listenerCb ListenerFunc, method Method) error { var tag string - switch param.Method { + switch method { case Symmetric: tag = symmetricBroadcastServiceTag case Asymmetric: tag = asymmetricBroadcastServiceTag default: - return nil, errors.Errorf("Cannot make broadcast client for unknown broadcast method %s", param.Method) + return errors.Errorf("Cannot register listener for broadcast method %s", method) } + + p := &processor{ + c: &bc.channel, + cb: listenerCb, + method: method, + } + service := message.Service{ - Identifier: channel.ReceptionID.Bytes(), + Identifier: bc.channel.ReceptionID.Bytes(), Tag: tag, } - net.AddService(channel.ReceptionID, service, p) - - jww.INFO.Printf("New %s broadcast client created for channel %q (%s)", - param.Method, channel.Name, channel.ReceptionID) - - return bc, nil + bc.net.AddService(bc.channel.ReceptionID, service, p) + return nil } // Stop unregisters the listener callback and stops the channel's identity @@ -89,7 +88,6 @@ func (bc *broadcastClient) Get() crypto.Channel { } // verifyID generates a symmetric ID based on the info in the channel & compares it to the one passed in -// TODO: it seems very odd to me that we do this, rather than just making the ID a private/ephemeral component like the key func (bc *broadcastClient) verifyID() bool { gen, err := crypto.NewChannelID(bc.channel.Name, bc.channel.Description, bc.channel.Salt, rsa.CreatePublicKeyPem(bc.channel.RsaPubKey)) if err != nil { @@ -100,12 +98,9 @@ func (bc *broadcastClient) verifyID() bool { } func (bc *broadcastClient) MaxPayloadSize() int { - switch bc.param.Method { - case Symmetric: - return bc.maxSymmetricPayload() - case Asymmetric: - return bc.maxAsymmetricPayload() - default: - return -1 - } + return bc.maxSymmetricPayload() +} + +func (bc *broadcastClient) MaxAsymmetricPayloadSize() int { + return bc.maxAsymmetricPayload() } diff --git a/broadcast/interface.go b/broadcast/interface.go index 2c7d9e33820ce78a1532b693ace989fa7ea386f2..badfc8bc7e4009c59753d2fd39bf55b5d5043cad 100644 --- a/broadcast/interface.go +++ b/broadcast/interface.go @@ -26,9 +26,12 @@ type ListenerFunc func(payload []byte, receptionID receptionID.EphemeralIdentity, round rounds.Round) type Channel interface { - // MaxPayloadSize returns the maximum size for a broadcast payload. Different math depending on broadcast method. + // MaxPayloadSize returns the maximum size for a symmetric broadcast payload MaxPayloadSize() int + // MaxAsymmetricPayloadSize returns the maximum size for an asymmetric broadcast payload + MaxAsymmetricPayloadSize() int + // Get returns the underlying crypto.Channel Get() crypto.Channel @@ -42,6 +45,8 @@ type Channel interface { BroadcastAsymmetric(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) ( id.Round, ephemeral.Id, error) + RegisterListener(listenerCb ListenerFunc, method Method) error + // Stop unregisters the listener callback and stops the channel's identity // from being tracked. Stop() diff --git a/broadcast/symmetric.go b/broadcast/symmetric.go index ca0c528c588b184a72dc788a2fa83c0423ca5268..10ca5a5b33bad72316973ee8dc4dc959331a20a1 100644 --- a/broadcast/symmetric.go +++ b/broadcast/symmetric.go @@ -35,15 +35,10 @@ func (bc *broadcastClient) maxSymmetricPayload() int { } // Broadcast broadcasts a payload over a symmetric channel. -// broadcast method must be set to Symmetric // Network must be healthy to send // Requires a payload of size bc.MaxSymmetricPayloadSize() func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) ( id.Round, ephemeral.Id, error) { - if bc.param.Method != Symmetric { - return 0, ephemeral.Id{}, errors.Errorf(errBroadcastMethodType, Symmetric, bc.param.Method) - } - if !bc.net.IsHealthy() { return 0, ephemeral.Id{}, errors.New(errNetworkHealth) } diff --git a/cmd/broadcast.go b/cmd/broadcast.go index 8b9ae2358d5e4b6b51d2dd15bc6b6e22e74e5c05..3d0a11de1b90f140510d90157049ed562e75a47c 100644 --- a/cmd/broadcast.go +++ b/cmd/broadcast.go @@ -16,7 +16,7 @@ import ( "time" ) -// singleCmd is the single-use subcommand that allows for sending and responding +// singleCmd is the single-use subcommand that allows for sening and responding // to single-use messages. var broadcastCmd = &cobra.Command{ Use: "broadcast", @@ -43,7 +43,6 @@ var broadcastCmd = &cobra.Command{ connected <- isconnected }) waitUntilConnected(connected) - /* Set up underlying crypto broadcast.Channel */ var channel *crypto.Channel var pk *rsa.PrivateKey @@ -69,7 +68,6 @@ var broadcastCmd = &cobra.Command{ jww.FATAL.Panicf("description cannot be empty") } - var channel *crypto.Channel if viper.GetBool("new") { // Create a new broadcast channel channel, pk, err = crypto.NewChannel(name, desc, client.GetRng().GetStream()) @@ -78,7 +76,7 @@ var broadcastCmd = &cobra.Command{ } if keyPath != "" { - err = utils.WriteFile(path, rsa.CreatePrivateKeyPem(pk), os.ModePerm, os.ModeDir) + err = utils.WriteFile(keyPath, rsa.CreatePrivateKeyPem(pk), os.ModePerm, os.ModeDir) if err != nil { jww.ERROR.Printf("Failed to write private key to path %s: %+v", path, err) } @@ -106,23 +104,6 @@ var broadcastCmd = &cobra.Command{ Salt: salt, RsaPubKey: pubKey, } - - // Load key if it's there - if keyPath != "" { - if ep, err := utils.ExpandPath(keyPath); err == nil { - keyBytes, err := utils.ReadFile(ep) - if err != nil { - jww.ERROR.Printf("Failed to read private key from %s: %+v", ep, err) - } - pk, err = rsa.LoadPrivateKeyFromPem(keyBytes) - if err != nil { - jww.ERROR.Printf("Failed to load private key %+v: %+v", keyBytes, err) - } - } else { - jww.ERROR.Printf("Failed to expand private key path: %+v", err) - } - - } } // Save channel to disk @@ -141,66 +122,99 @@ var broadcastCmd = &cobra.Command{ } } + // Load key if needed + if pk == nil && keyPath != "" { + jww.DEBUG.Printf("Attempting to load private key at %s") + if ep, err := utils.ExpandPath(keyPath); err == nil { + keyBytes, err := utils.ReadFile(ep) + if err != nil { + jww.ERROR.Printf("Failed to read private key from %s: %+v", ep, err) + } + pk, err = rsa.LoadPrivateKeyFromPem(keyBytes) + if err != nil { + jww.ERROR.Printf("Failed to load private key %+v: %+v", keyBytes, err) + } + } else { + jww.ERROR.Printf("Failed to expand private key path: %+v", err) + } + } + /* Broadcast client setup */ - // Create receiver callback + // Select broadcast method + symmetric := viper.GetString("symmetric") + asymmetric := viper.GetString("asymmetric") + + // Connect to broadcast channel + bcl, err := broadcast.NewBroadcastChannel(*channel, client.GetCmix(), client.GetRng()) + + // Create & register symmetric receiver callback receiveChan := make(chan []byte, 100) - cb := func(payload []byte, + scb := func(payload []byte, receptionID receptionID.EphemeralIdentity, round rounds.Round) { jww.INFO.Printf("Received symmetric message from %s over round %d", receptionID, round.ID) receiveChan <- payload } + err = bcl.RegisterListener(scb, broadcast.Symmetric) + if err != nil { + jww.FATAL.Panicf("Failed to register symmetric listener: %+v", err) + } - // Select broadcast method - var method broadcast.Method - symmetric := viper.GetBool("symmetric") - asymmetric := viper.GetBool("asymmetric") - if symmetric && asymmetric { - jww.FATAL.Panicf("Cannot simultaneously broadcast symmetric & asymmetric") + // Create & register asymmetric receiver callback + asymmetricReceiveChan := make(chan []byte, 100) + acb := func(payload []byte, + receptionID receptionID.EphemeralIdentity, round rounds.Round) { + jww.INFO.Printf("Received asymmetric message from %s over round %d", receptionID, round.ID) + asymmetricReceiveChan <- payload } - if symmetric { - method = broadcast.Symmetric - } else if asymmetric { - method = broadcast.Asymmetric + err = bcl.RegisterListener(acb, broadcast.Asymmetric) + if err != nil { + jww.FATAL.Panicf("Failed to register asymmetric listener: %+v", err) } - // Connect to broadcast channel - bcl, err := broadcast.NewBroadcastChannel(*channel, cb, client.GetCmix(), client.GetRng(), broadcast.Param{Method: method}) - - /* Create properly sized broadcast message */ - message := viper.GetString("broadcast") - fmt.Println(message) - var broadcastMessage []byte - if message != "" { - broadcastMessage, err = broadcast.NewSizedBroadcast(bcl.MaxPayloadSize(), []byte(message)) - if err != nil { - jww.ERROR.Printf("Failed to create sized broadcast: %+v", err) - } + jww.INFO.Printf("Broadcast listeners registered...") - } + /* Broadcast messages to the channel */ + if symmetric != "" || asymmetric != "" { + jww.INFO.Printf("Attempting to send broadcasts...") + // Wait for sendDelay before sending (to allow connection to establish) + sendDelay := time.Duration(viper.GetUint("sendDelay")) + time.Sleep(sendDelay) - /* Broadcast message to the channel */ - switch method { - case broadcast.Symmetric: - rid, eid, err := bcl.Broadcast(broadcastMessage, cmix.GetDefaultCMIXParams()) - if err != nil { - jww.ERROR.Printf("Failed to send symmetric broadcast message: %+v", err) - } - jww.INFO.Printf("Sent symmetric broadcast message to %s over round %d", eid, rid) - case broadcast.Asymmetric: - if pk == nil { - jww.FATAL.Panicf("CANNOT SEND ASYMMETRIC BROADCAST WITHOUT PRIVATE KEY") + /* Send symmetric broadcast */ + if symmetric != "" { + // Create properly sized broadcast message + broadcastMessage, err := broadcast.NewSizedBroadcast(bcl.MaxPayloadSize(), []byte(symmetric)) + if err != nil { + jww.ERROR.Printf("Failed to create sized broadcast: %+v", err) + } + rid, eid, err := bcl.Broadcast(broadcastMessage, cmix.GetDefaultCMIXParams()) + if err != nil { + jww.FATAL.Panicf("Failed to send symmetric broadcast message: %+v", err) + } + jww.INFO.Printf("Sent symmetric broadcast message to %s over round %d", eid, rid) } - rid, eid, err := bcl.BroadcastAsymmetric(pk, broadcastMessage, cmix.GetDefaultCMIXParams()) - if err != nil { - jww.ERROR.Printf("Failed to send asymmetric broadcast message: %+v", err) + + /* Send asymmetric broadcast */ + if asymmetric != "" { + // Create properly sized broadcast message + broadcastMessage, err := broadcast.NewSizedBroadcast(bcl.MaxAsymmetricPayloadSize(), []byte(asymmetric)) + if err != nil { + jww.ERROR.Printf("Failed to create sized broadcast: %+v", err) + } + if pk == nil { + jww.FATAL.Panicf("CANNOT SEND ASYMMETRIC BROADCAST WITHOUT PRIVATE KEY") + } + rid, eid, err := bcl.BroadcastAsymmetric(pk, broadcastMessage, cmix.GetDefaultCMIXParams()) + if err != nil { + jww.FATAL.Panicf("Failed to send asymmetric broadcast message: %+v", err) + } + jww.INFO.Printf("Sent asymmetric broadcast message to %s over round %d", eid, rid) } - jww.INFO.Printf("Sent asymmetric broadcast message to %s over round %d", eid, rid) - default: - jww.WARN.Printf("Unknown broadcast type (this should not happen)") } /* Receive broadcast messages over the channel */ + jww.INFO.Printf("Waiting for message reception...") waitSecs := viper.GetUint("waitTimeout") expectedCnt := viper.GetUint("receiveCount") waitTimeout := time.Duration(waitSecs) * time.Second @@ -209,6 +223,17 @@ var broadcastCmd = &cobra.Command{ for !done && expectedCnt != 0 { timeout := time.NewTimer(waitTimeout) select { + case receivedPayload := <-asymmetricReceiveChan: + receivedCount++ + receivedBroadcast, err := broadcast.DecodeSizedBroadcast(receivedPayload) + if err != nil { + jww.ERROR.Printf("Failed to decode sized broadcast: %+v", err) + continue + } + fmt.Printf("Asymmetric broadcast message %d/%d received: %s\n", receivedCount, expectedCnt, string(receivedBroadcast)) + if receivedCount == expectedCnt { + done = true + } case receivedPayload := <-receiveChan: receivedCount++ receivedBroadcast, err := broadcast.DecodeSizedBroadcast(receivedPayload) @@ -266,16 +291,12 @@ func init() { "Create new broadcast channel") _ = viper.BindPFlag("new", broadcastCmd.Flags().Lookup("new")) - broadcastCmd.Flags().StringP("broadcast", "", "", - "Message contents for broadcast") - _ = viper.BindPFlag("broadcast", broadcastCmd.Flags().Lookup("broadcast")) - - broadcastCmd.Flags().BoolP("symmetric", "", false, - "Set broadcast method to symmetric") + broadcastCmd.Flags().StringP("symmetric", "", "", + "Send symmetric broadcast message") _ = viper.BindPFlag("symmetric", broadcastCmd.Flags().Lookup("symmetric")) - broadcastCmd.Flags().BoolP("asymmetric", "", false, - "Set broadcast method to asymmetric") + broadcastCmd.Flags().StringP("asymmetric", "", "", + "Send asymmetric broadcast message (must be used with keyPath)") _ = viper.BindPFlag("asymmetric", broadcastCmd.Flags().Lookup("asymmetric")) rootCmd.AddCommand(broadcastCmd) diff --git a/cmd/root.go b/cmd/root.go index 8bdd2944b77cb12258a4d73804b8ec22d3377c6a..094d3a3bc403b27a533902595027a438136b7c91 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1059,9 +1059,9 @@ func init() { rootCmd.Flags().UintP("sendCount", "", 1, "The number of times to send the message") viper.BindPFlag("sendCount", rootCmd.Flags().Lookup("sendCount")) - rootCmd.Flags().UintP("sendDelay", + rootCmd.PersistentFlags().UintP("sendDelay", "", 500, "The delay between sending the messages in ms") - viper.BindPFlag("sendDelay", rootCmd.Flags().Lookup("sendDelay")) + viper.BindPFlag("sendDelay", rootCmd.PersistentFlags().Lookup("sendDelay")) rootCmd.Flags().BoolP("splitSends", "", false, "Force sends to go over multiple rounds if possible") viper.BindPFlag("splitSends", rootCmd.Flags().Lookup("splitSends"))