diff --git a/broadcast/asymmetric.go b/broadcast/asymmetric.go index 79fc4b4d0988319d6f9e0380fa6bbb4cd6d038ff..8fab1b5a24dad08ade68a369857f2dca06795c1e 100644 --- a/broadcast/asymmetric.go +++ b/broadcast/asymmetric.go @@ -8,10 +8,10 @@ package broadcast import ( + "encoding/binary" "github.com/pkg/errors" "gitlab.com/elixxir/client/cmix" "gitlab.com/elixxir/client/cmix/message" - "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/crypto/multicastRSA" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" @@ -20,6 +20,7 @@ import ( const ( asymmetricBroadcastServiceTag = "AsymmBcast" asymmCMixSendTag = "AsymmetricBroadcast" + internalPayloadSizeLength = 2 ) // MaxAsymmetricPayloadSize returns the maximum size for an asymmetric broadcast payload @@ -29,39 +30,28 @@ func (bc *broadcastClient) maxAsymmetricPayload() int { // BroadcastAsymmetric broadcasts the payload to the channel. Requires a healthy network state to send // Payload must be equal to bc.MaxAsymmetricPayloadSize, and the channel PrivateKey must be passed in -// Broadcast method must be set to asymmetric -// When a payload is sent, it is split into partitons of size bc.channel.MaxAsymmetricPayloadSize -// which are each encrypted using multicastRSA func (bc *broadcastClient) BroadcastAsymmetric(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) ( id.Round, ephemeral.Id, error) { - if bc.param.Method != Asymmetric { - return 0, ephemeral.Id{}, errors.Errorf(errBroadcastMethodType, Asymmetric, bc.param.Method) - } - + // Confirm network health if !bc.net.IsHealthy() { return 0, ephemeral.Id{}, errors.New(errNetworkHealth) } - if len(payload) != bc.maxAsymmetricPayload() { + // Check payload size + if len(payload) > bc.MaxAsymmetricPayloadSize() { return 0, ephemeral.Id{}, errors.Errorf(errPayloadSize, len(payload), bc.maxAsymmetricPayload()) } + payloadLength := uint16(len(payload)) - numParts := bc.maxParts() - size := bc.channel.MaxAsymmetricPayloadSize() - var mac []byte - var fp format.Fingerprint - var sequential []byte - for i := 0; i < numParts; i++ { - // Encrypt payload to send using asymmetric channel - var encryptedPayload []byte - var err error - encryptedPayload, mac, fp, err = bc.channel.EncryptAsymmetric(payload[:size], pk, bc.rng.GetStream()) - if err != nil { - return 0, ephemeral.Id{}, errors.WithMessage(err, "Failed to encrypt asymmetric broadcast message") - } - payload = payload[size:] - sequential = append(sequential, encryptedPayload...) + finalPayload := make([]byte, bc.maxAsymmetricPayloadSizeRaw()) + binary.BigEndian.PutUint16(finalPayload[:internalPayloadSizeLength], payloadLength) + copy(finalPayload[internalPayloadSizeLength:], payload) + + // Encrypt payload + encryptedPayload, mac, fp, err := bc.channel.EncryptAsymmetric(finalPayload, pk, bc.rng.GetStream()) + if err != nil { + return 0, ephemeral.Id{}, errors.WithMessage(err, "Failed to encrypt asymmetric broadcast message") } // Create service object to send message @@ -74,10 +64,14 @@ func (bc *broadcastClient) BroadcastAsymmetric(pk multicastRSA.PrivateKey, paylo cMixParams.DebugTag = asymmCMixSendTag } - sizedPayload, err := NewSizedBroadcast(bc.net.GetMaxMessageLength(), sequential) + // Create payload sized for sending over cmix + sizedPayload := make([]byte, bc.net.GetMaxMessageLength()) + // Read random data into sized payload + _, err = bc.rng.GetStream().Read(sizedPayload) if err != nil { - return id.Round(0), ephemeral.Id{}, err + return 0, ephemeral.Id{}, errors.WithMessage(err, "Failed to add random data to sized broadcast") } + copy(sizedPayload[:len(encryptedPayload)], encryptedPayload) return bc.net.Send( bc.channel.ReceptionID, fp, service, sizedPayload, mac, cMixParams) diff --git a/broadcast/asymmetric_test.go b/broadcast/asymmetric_test.go index 88aa2219d3495beb4659b97f5aa34ca4baf9c27f..776879127b040946b8147c184341cc4860de3296 100644 --- a/broadcast/asymmetric_test.go +++ b/broadcast/asymmetric_test.go @@ -56,11 +56,16 @@ func Test_asymmetricClient_Smoke(t *testing.T) { cbChan <- payload } - s, err := NewBroadcastChannel(channel, cb, newMockCmix(cMixHandler), rngGen, Param{Method: Asymmetric}) + s, err := NewBroadcastChannel(channel, newMockCmix(cMixHandler), rngGen) if err != nil { t.Errorf("Failed to create broadcast channel: %+v", err) } + err = s.RegisterListener(cb, Asymmetric) + if err != nil { + t.Errorf("Failed to register listener: %+v", err) + } + cbChans[i] = cbChan clients[i] = s @@ -73,7 +78,7 @@ func Test_asymmetricClient_Smoke(t *testing.T) { // Send broadcast from each client for i := range clients { - payload := make([]byte, clients[i].MaxPayloadSize()) + payload := make([]byte, clients[i].MaxAsymmetricPayloadSize()) copy(payload, fmt.Sprintf("Hello from client %d of %d.", i, len(clients))) @@ -112,7 +117,7 @@ func Test_asymmetricClient_Smoke(t *testing.T) { clients[i].Stop() } - payload := make([]byte, clients[0].MaxPayloadSize()) + payload := make([]byte, clients[0].MaxAsymmetricPayloadSize()) copy(payload, "This message should not get through.") // Start waiting on channels and error if anything is received diff --git a/broadcast/broadcastClient.go b/broadcast/broadcastClient.go index b34740f7ed5de80f4e50cfc692d69bb65156630c..6e1c7391cc6f97ed429cfd4164508444ac3eb0b3 100644 --- a/broadcast/broadcastClient.go +++ b/broadcast/broadcastClient.go @@ -17,26 +17,19 @@ import ( "gitlab.com/xx_network/crypto/signature/rsa" ) -// Param encapsulates configuration options for a broadcastClient -type Param struct { - Method Method -} - // broadcastClient implements the Channel interface for sending/receiving asymmetric or symmetric broadcast messages type broadcastClient struct { channel crypto.Channel net Client rng *fastRNG.StreamGenerator - param Param } // NewBroadcastChannel creates a channel interface based on crypto.Channel, accepts net client connection & callback for received messages -func NewBroadcastChannel(channel crypto.Channel, listenerCb ListenerFunc, net Client, rng *fastRNG.StreamGenerator, param Param) (Channel, error) { +func NewBroadcastChannel(channel crypto.Channel, net Client, rng *fastRNG.StreamGenerator) (Channel, error) { bc := &broadcastClient{ channel: channel, net: net, rng: rng, - param: param, } if !bc.verifyID() { @@ -46,31 +39,37 @@ func NewBroadcastChannel(channel crypto.Channel, listenerCb ListenerFunc, net Cl // Add channel's identity net.AddIdentity(channel.ReceptionID, identity.Forever, true) - p := &processor{ - c: &channel, - cb: listenerCb, - method: param.Method, - } + jww.INFO.Printf("New broadcast channel client created for channel %q (%s)", + channel.Name, channel.ReceptionID) + + return bc, nil +} + +// RegisterListener adds a service to hear broadcast messages of a given type via the passed in callback +func (bc *broadcastClient) RegisterListener(listenerCb ListenerFunc, method Method) error { var tag string - switch param.Method { + switch method { case Symmetric: tag = symmetricBroadcastServiceTag case Asymmetric: tag = asymmetricBroadcastServiceTag default: - return nil, errors.Errorf("Cannot make broadcast client for unknown broadcast method %s", param.Method) + return errors.Errorf("Cannot register listener for broadcast method %s", method) + } + + p := &processor{ + c: &bc.channel, + cb: listenerCb, + method: method, } + service := message.Service{ - Identifier: channel.ReceptionID.Bytes(), + Identifier: bc.channel.ReceptionID.Bytes(), Tag: tag, } - net.AddService(channel.ReceptionID, service, p) - - jww.INFO.Printf("New %s broadcast client created for channel %q (%s)", - param.Method, channel.Name, channel.ReceptionID) - - return bc, nil + bc.net.AddService(bc.channel.ReceptionID, service, p) + return nil } // Stop unregisters the listener callback and stops the channel's identity @@ -89,7 +88,6 @@ func (bc *broadcastClient) Get() crypto.Channel { } // verifyID generates a symmetric ID based on the info in the channel & compares it to the one passed in -// TODO: it seems very odd to me that we do this, rather than just making the ID a private/ephemeral component like the key func (bc *broadcastClient) verifyID() bool { gen, err := crypto.NewChannelID(bc.channel.Name, bc.channel.Description, bc.channel.Salt, rsa.CreatePublicKeyPem(bc.channel.RsaPubKey)) if err != nil { @@ -100,12 +98,13 @@ func (bc *broadcastClient) verifyID() bool { } func (bc *broadcastClient) MaxPayloadSize() int { - switch bc.param.Method { - case Symmetric: - return bc.maxSymmetricPayload() - case Asymmetric: - return bc.maxAsymmetricPayload() - default: - return -1 - } + return bc.maxSymmetricPayload() +} + +func (bc *broadcastClient) MaxAsymmetricPayloadSize() int { + return bc.maxAsymmetricPayloadSizeRaw() - internalPayloadSizeLength +} + +func (bc *broadcastClient) maxAsymmetricPayloadSizeRaw() int { + return bc.channel.MaxAsymmetricPayloadSize() } diff --git a/broadcast/interface.go b/broadcast/interface.go index 2c7d9e33820ce78a1532b693ace989fa7ea386f2..badfc8bc7e4009c59753d2fd39bf55b5d5043cad 100644 --- a/broadcast/interface.go +++ b/broadcast/interface.go @@ -26,9 +26,12 @@ type ListenerFunc func(payload []byte, receptionID receptionID.EphemeralIdentity, round rounds.Round) type Channel interface { - // MaxPayloadSize returns the maximum size for a broadcast payload. Different math depending on broadcast method. + // MaxPayloadSize returns the maximum size for a symmetric broadcast payload MaxPayloadSize() int + // MaxAsymmetricPayloadSize returns the maximum size for an asymmetric broadcast payload + MaxAsymmetricPayloadSize() int + // Get returns the underlying crypto.Channel Get() crypto.Channel @@ -42,6 +45,8 @@ type Channel interface { BroadcastAsymmetric(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) ( id.Round, ephemeral.Id, error) + RegisterListener(listenerCb ListenerFunc, method Method) error + // Stop unregisters the listener callback and stops the channel's identity // from being tracked. Stop() diff --git a/broadcast/processor.go b/broadcast/processor.go index ec9ab810dc0aa16eefbb0add3ee44c53f55597d3..651974c4cb648aba5df3e56fa8f46901cc1a2899 100644 --- a/broadcast/processor.go +++ b/broadcast/processor.go @@ -8,6 +8,7 @@ package broadcast import ( + "encoding/binary" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/rounds" @@ -35,25 +36,15 @@ func (p *processor) Process(msg format.Message, var err error switch p.method { case Asymmetric: - // We use sized broadcast to fill any remaining bytes in the cmix payload, decode it here - unsizedPayload, err := DecodeSizedBroadcast(msg.GetContents()) - if err != nil { - jww.ERROR.Printf("Failed to decode sized broadcast: %+v", err) + encPartSize := p.c.RsaPubKey.Size() // Size returned by multicast RSA encryption + encodedMessage := msg.GetContents()[:encPartSize] // Only one message is encoded, rest of it is random data + decodedMessage, decryptErr := p.c.DecryptAsymmetric(encodedMessage) + if decryptErr != nil { + jww.ERROR.Printf(errDecrypt, p.c.ReceptionID, p.c.Name, decryptErr) return } - encPartSize := p.c.RsaPubKey.Size() // Size of each chunk returned by multicast RSA encryption - numParts := len(unsizedPayload) / encPartSize // Number of chunks in the payload - // Iterate through & decrypt each chunk, appending to aggregate payload - for i := 0; i < numParts; i++ { - var decrypted []byte - decrypted, err = p.c.DecryptAsymmetric(unsizedPayload[:encPartSize]) - if err != nil { - jww.ERROR.Printf(errDecrypt, p.c.ReceptionID, p.c.Name, err) - return - } - unsizedPayload = unsizedPayload[encPartSize:] - payload = append(payload, decrypted...) - } + size := binary.BigEndian.Uint16(decodedMessage[:internalPayloadSizeLength]) + payload = decodedMessage[internalPayloadSizeLength : size+internalPayloadSizeLength] case Symmetric: payload, err = p.c.DecryptSymmetric(msg.GetContents(), msg.GetMac(), msg.GetKeyFP()) diff --git a/broadcast/symmetric.go b/broadcast/symmetric.go index ca0c528c588b184a72dc788a2fa83c0423ca5268..e9b96f75c845dc0da8d91f87235d2e011db1cf2c 100644 --- a/broadcast/symmetric.go +++ b/broadcast/symmetric.go @@ -19,7 +19,7 @@ import ( const ( // symmetricClient.Broadcast errNetworkHealth = "cannot send broadcast when the network is not healthy" - errPayloadSize = "size of payload %d must be %d" + errPayloadSize = "size of payload %d must be less than %d" errBroadcastMethodType = "cannot call %s broadcast using %s channel" ) @@ -35,15 +35,10 @@ func (bc *broadcastClient) maxSymmetricPayload() int { } // Broadcast broadcasts a payload over a symmetric channel. -// broadcast method must be set to Symmetric // Network must be healthy to send // Requires a payload of size bc.MaxSymmetricPayloadSize() func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) ( id.Round, ephemeral.Id, error) { - if bc.param.Method != Symmetric { - return 0, ephemeral.Id{}, errors.Errorf(errBroadcastMethodType, Symmetric, bc.param.Method) - } - if !bc.net.IsHealthy() { return 0, ephemeral.Id{}, errors.New(errNetworkHealth) } diff --git a/broadcast/symmetric_test.go b/broadcast/symmetric_test.go index de76da504ca02fa6b8b1aa558bbef20a865a7730..2eb26d29eb3b56db1e894ad5b3da3aaabadf114f 100644 --- a/broadcast/symmetric_test.go +++ b/broadcast/symmetric_test.go @@ -63,10 +63,16 @@ func Test_symmetricClient_Smoke(t *testing.T) { cbChan <- payload } - s, err := NewBroadcastChannel(channel, cb, newMockCmix(cMixHandler), rngGen, Param{Method: Symmetric}) + s, err := NewBroadcastChannel(channel, newMockCmix(cMixHandler), rngGen) if err != nil { t.Errorf("Failed to create broadcast channel: %+v", err) } + + err = s.RegisterListener(cb, Symmetric) + if err != nil { + t.Errorf("Failed to register listener: %+v", err) + } + cbChans[i] = cbChan clients[i] = s diff --git a/cmd/broadcast.go b/cmd/broadcast.go index ccb86bc76e6111ce6b736859264ab66b48ab8500..1e43d12867cfb7bb2708d962c27ad897ebe7f477 100644 --- a/cmd/broadcast.go +++ b/cmd/broadcast.go @@ -15,9 +15,10 @@ import ( crypto "gitlab.com/elixxir/crypto/broadcast" "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/utils" + "sync" ) -// singleCmd is the single-use subcommand that allows for sending and responding +// singleCmd is the single-use subcommand that allows for sening and responding // to single-use messages. var broadcastCmd = &cobra.Command{ Use: "broadcast", @@ -46,7 +47,6 @@ var broadcastCmd = &cobra.Command{ connected <- isConnected }) waitUntilConnected(connected) - /* Set up underlying crypto broadcast.Channel */ var channel *crypto.Channel var pk *rsa.PrivateKey @@ -81,13 +81,14 @@ var broadcastCmd = &cobra.Command{ } if keyPath != "" { - err = utils.WriteFile(path, rsa.CreatePrivateKeyPem(pk), os.ModePerm, os.ModeDir) + err = utils.WriteFile(keyPath, rsa.CreatePrivateKeyPem(pk), os.ModePerm, os.ModeDir) if err != nil { jww.ERROR.Printf("Failed to write private key to path %s: %+v", path, err) } } else { fmt.Printf("Private key generated for channel: %+v", rsa.CreatePrivateKeyPem(pk)) } + fmt.Printf("New broadcast channel generated") } else { // Read rest of info from config & build object manually pubKeyBytes := []byte(viper.GetString(broadcastRsaPubFlag)) @@ -109,23 +110,6 @@ var broadcastCmd = &cobra.Command{ Salt: salt, RsaPubKey: pubKey, } - - // Load key if it's there - if keyPath != "" { - if ep, err := utils.ExpandPath(keyPath); err == nil { - keyBytes, err := utils.ReadFile(ep) - if err != nil { - jww.ERROR.Printf("Failed to read private key from %s: %+v", ep, err) - } - pk, err = rsa.LoadPrivateKeyFromPem(keyBytes) - if err != nil { - jww.ERROR.Printf("Failed to load private key %+v: %+v", keyBytes, err) - } - } else { - jww.ERROR.Printf("Failed to expand private key path: %+v", err) - } - - } } // Save channel to disk @@ -144,66 +128,123 @@ var broadcastCmd = &cobra.Command{ } } + // Load key if needed + if pk == nil && keyPath != "" { + jww.DEBUG.Printf("Attempting to load private key at %s", keyPath) + if ep, err := utils.ExpandPath(keyPath); err == nil { + keyBytes, err := utils.ReadFile(ep) + if err != nil { + jww.ERROR.Printf("Failed to read private key from %s: %+v", ep, err) + } + pk, err = rsa.LoadPrivateKeyFromPem(keyBytes) + if err != nil { + jww.ERROR.Printf("Failed to load private key %+v: %+v", keyBytes, err) + } + } else { + jww.ERROR.Printf("Failed to expand private key path: %+v", err) + } + } + /* Broadcast client setup */ - // Create receiver callback + // Select broadcast method + symmetric := viper.GetString("symmetric") + asymmetric := viper.GetString("asymmetric") + + // Connect to broadcast channel + bcl, err := broadcast.NewBroadcastChannel(*channel, client.GetCmix(), client.GetRng()) + + // Create & register symmetric receiver callback receiveChan := make(chan []byte, 100) - cb := func(payload []byte, + scb := func(payload []byte, receptionID receptionID.EphemeralIdentity, round rounds.Round) { jww.INFO.Printf("Received symmetric message from %s over round %d", receptionID, round.ID) receiveChan <- payload } + err = bcl.RegisterListener(scb, broadcast.Symmetric) + if err != nil { + jww.FATAL.Panicf("Failed to register asymmetric listener: %+v", err) + } - // Select broadcast method - var method broadcast.Method - symmetric := viper.GetBool(broadcastSymmetricFlag) - asymmetric := viper.GetBool(broadcastAsymmetricFlag) - if symmetric && asymmetric { - jww.FATAL.Panicf("Cannot simultaneously broadcast symmetric & asymmetric") + // Create & register asymmetric receiver callback + asymmetricReceiveChan := make(chan []byte, 100) + acb := func(payload []byte, + receptionID receptionID.EphemeralIdentity, round rounds.Round) { + jww.INFO.Printf("Received asymmetric message from %s over round %d", receptionID, round.ID) + asymmetricReceiveChan <- payload } - if symmetric { - method = broadcast.Symmetric - } else if asymmetric { - method = broadcast.Asymmetric + err = bcl.RegisterListener(acb, broadcast.Asymmetric) + if err != nil { + jww.FATAL.Panicf("Failed to register asymmetric listener: %+v", err) } - // Connect to broadcast channel - bcl, err := broadcast.NewBroadcastChannel(*channel, cb, client.GetCmix(), client.GetRng(), broadcast.Param{Method: method}) + jww.INFO.Printf("Broadcast listeners registered...") + + /* Broadcast messages to the channel */ + if symmetric != "" || asymmetric != "" { + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + jww.INFO.Printf("Attempting to send broadcasts...") + + sendDelay := time.Duration(viper.GetUint("sendDelay")) + maxRetries := 10 + retries := 0 + for { + // Wait for sendDelay before sending (to allow connection to establish) + if maxRetries == retries { + jww.FATAL.Panicf("Max retries reached") + } + time.Sleep(sendDelay*time.Millisecond*time.Duration(retries) + 1) - /* Create properly sized broadcast message */ - message := viper.GetString(broadcastFlag) - fmt.Println(message) - var broadcastMessage []byte - if message != "" { - broadcastMessage, err = broadcast.NewSizedBroadcast(bcl.MaxPayloadSize(), []byte(message)) - if err != nil { - jww.ERROR.Printf("Failed to create sized broadcast: %+v", err) - } + /* Send symmetric broadcast */ + if symmetric != "" { + // Create properly sized broadcast message + broadcastMessage, err := broadcast.NewSizedBroadcast(bcl.MaxPayloadSize(), []byte(symmetric)) + if err != nil { + jww.FATAL.Panicf("Failed to create sized broadcast: %+v", err) + } + rid, eid, err := bcl.Broadcast(broadcastMessage, cmix.GetDefaultCMIXParams()) + if err != nil { + jww.ERROR.Printf("Failed to send symmetric broadcast message: %+v", err) + retries++ + continue + } + fmt.Printf("Sent symmetric broadcast message: %s", symmetric) + jww.INFO.Printf("Sent symmetric broadcast message to %s over round %d", eid, rid) + } - } + /* Send asymmetric broadcast */ + if asymmetric != "" { + // Create properly sized broadcast message + broadcastMessage, err := broadcast.NewSizedBroadcast(bcl.MaxAsymmetricPayloadSize(), []byte(asymmetric)) + if err != nil { + jww.FATAL.Panicf("Failed to create sized broadcast: %+v", err) + } + if pk == nil { + jww.FATAL.Panicf("CANNOT SEND ASYMMETRIC BROADCAST WITHOUT PRIVATE KEY") + } + rid, eid, err := bcl.BroadcastAsymmetric(pk, broadcastMessage, cmix.GetDefaultCMIXParams()) + if err != nil { + jww.ERROR.Printf("Failed to send asymmetric broadcast message: %+v", err) + retries++ + continue + } + fmt.Printf("Sent asymmetric broadcast message: %s", asymmetric) + jww.INFO.Printf("Sent asymmetric broadcast message to %s over round %d", eid, rid) + } - /* Broadcast message to the channel */ - switch method { - case broadcast.Symmetric: - rid, eid, err := bcl.Broadcast(broadcastMessage, cmix.GetDefaultCMIXParams()) - if err != nil { - jww.ERROR.Printf("Failed to send symmetric broadcast message: %+v", err) - } - jww.INFO.Printf("Sent symmetric broadcast message to %s over round %d", eid, rid) - case broadcast.Asymmetric: - if pk == nil { - jww.FATAL.Panicf("CANNOT SEND ASYMMETRIC BROADCAST WITHOUT PRIVATE KEY") - } - rid, eid, err := bcl.BroadcastAsymmetric(pk, broadcastMessage, cmix.GetDefaultCMIXParams()) - if err != nil { - jww.ERROR.Printf("Failed to send asymmetric broadcast message: %+v", err) - } - jww.INFO.Printf("Sent asymmetric broadcast message to %s over round %d", eid, rid) - default: - jww.WARN.Printf("Unknown broadcast type (this should not happen)") + wg.Done() + break + } + }() + + wg.Wait() } + /* Create properly sized broadcast message */ /* Receive broadcast messages over the channel */ + jww.INFO.Printf("Waiting for message reception...") waitSecs := viper.GetUint(waitTimeoutFlag) expectedCnt := viper.GetUint(receiveCountFlag) waitTimeout := time.Duration(waitSecs) * time.Second @@ -212,6 +253,17 @@ var broadcastCmd = &cobra.Command{ for !done && expectedCnt != 0 { timeout := time.NewTimer(waitTimeout) select { + case receivedPayload := <-asymmetricReceiveChan: + receivedCount++ + receivedBroadcast, err := broadcast.DecodeSizedBroadcast(receivedPayload) + if err != nil { + jww.ERROR.Printf("Failed to decode sized broadcast: %+v", err) + continue + } + fmt.Printf("Asymmetric broadcast message received: %s\n", string(receivedBroadcast)) + if receivedCount == expectedCnt { + done = true + } case receivedPayload := <-receiveChan: receivedCount++ receivedBroadcast, err := broadcast.DecodeSizedBroadcast(receivedPayload) @@ -219,7 +271,7 @@ var broadcastCmd = &cobra.Command{ jww.ERROR.Printf("Failed to decode sized broadcast: %+v", err) continue } - fmt.Printf("Symmetric broadcast message %d/%d received: %s\n", receivedCount, expectedCnt, string(receivedBroadcast)) + fmt.Printf("Symmetric broadcast message received: %s\n", string(receivedBroadcast)) if receivedCount == expectedCnt { done = true } @@ -269,16 +321,14 @@ func init() { "Create new broadcast channel") bindFlagHelper(broadcastNewFlag, broadcastCmd) - broadcastCmd.Flags().StringP(broadcastFlag, "", "", - "Message contents for broadcast") - bindFlagHelper(broadcastFlag, broadcastCmd) - - broadcastCmd.Flags().BoolP(broadcastSymmetricFlag, "", false, - "Set broadcast method to symmetric") + broadcastCmd.Flags().StringP(broadcastSymmetricFlag, "", "", + "Send symmetric broadcast message") + _ = viper.BindPFlag("symmetric", broadcastCmd.Flags().Lookup("symmetric")) bindFlagHelper(broadcastSymmetricFlag, broadcastCmd) - broadcastCmd.Flags().BoolP(broadcastAsymmetricFlag, "", false, - "Set broadcast method to asymmetric") + broadcastCmd.Flags().StringP(broadcastAsymmetricFlag, "", "", + "Send asymmetric broadcast message (must be used with keyPath)") + _ = viper.BindPFlag("asymmetric", broadcastCmd.Flags().Lookup("asymmetric")) bindFlagHelper(broadcastAsymmetricFlag, broadcastCmd) rootCmd.AddCommand(broadcastCmd) diff --git a/cmd/root.go b/cmd/root.go index c5a83d0329e6cc623d0590bdac661c9d2539d16b..905b5c25954dcc476f85b7c9aa8fbda5b530f303 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -993,9 +993,9 @@ func init() { rootCmd.PersistentFlags().UintP(sendCountFlag, "", 1, "The number of times to send the message") viper.BindPFlag(sendCountFlag, rootCmd.PersistentFlags().Lookup(sendCountFlag)) - rootCmd.Flags().UintP(sendDelayFlag, + rootCmd.PersistentFlags().UintP(sendDelayFlag, "", 500, "The delay between sending the messages in ms") - viper.BindPFlag(sendDelayFlag, rootCmd.Flags().Lookup(sendDelayFlag)) + viper.BindPFlag(sendDelayFlag, rootCmd.PersistentFlags().Lookup(sendDelayFlag)) rootCmd.Flags().BoolP(splitSendsFlag, "", false, "Force sends to go over multiple rounds if possible") viper.BindPFlag(splitSendsFlag, rootCmd.Flags().Lookup(splitSendsFlag))