diff --git a/broadcast/asymmetricClient.go b/broadcast/asymmetricClient.go new file mode 100644 index 0000000000000000000000000000000000000000..ef397c2e50c925fcab4bd03a98a7645e91c76a10 --- /dev/null +++ b/broadcast/asymmetricClient.go @@ -0,0 +1,103 @@ +package broadcast + +import ( + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity" + "gitlab.com/elixxir/client/cmix/message" + crypto "gitlab.com/elixxir/crypto/broadcast" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/crypto/multicastRSA" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" +) + +const ( + asymmetricBroadcastServiceTag = "AsymmBcast" + asymmCMixSendTag = "AsymmetricBroadcast" +) + +type asymmetricClient struct { + channel crypto.Asymmetric + net Client + rng *fastRNG.StreamGenerator +} + +// Creates a +func NewAsymmetricClient(channel crypto.Asymmetric, listenerCb ListenerFunc, net Client, rng *fastRNG.StreamGenerator) Asymmetric { + // Add channel's identity + net.AddIdentity(channel.ReceptionID, identity.Forever, true) + + p := &asymmetricProcessor{ + ac: &channel, + cb: listenerCb, + } + + service := message.Service{ + Identifier: channel.ReceptionID.Bytes(), + Tag: asymmetricBroadcastServiceTag, + } + + net.AddService(channel.ReceptionID, service, p) + + jww.INFO.Printf("New asymmetric broadcast client created for channel %q (%s)", + channel.Name, channel.ReceptionID) + + return &asymmetricClient{ + channel: channel, + net: net, + rng: rng, + } +} + +// Broadcast broadcasts the payload to the channel. Requires a healthy network state to send +// Payload must be equal to ac.MaxPayloadSize, and the channel PrivateKey must be passed in +func (ac *asymmetricClient) Broadcast(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) ( + id.Round, ephemeral.Id, error) { + if !ac.net.IsHealthy() { + return 0, ephemeral.Id{}, errors.New(errNetworkHealth) + } + + if len(payload) != ac.MaxPayloadSize() { + return 0, ephemeral.Id{}, + errors.Errorf(errPayloadSize, len(payload), ac.MaxPayloadSize()) + } + // Encrypt payload to send using asymmetric channel + encryptedPayload, mac, fp, err := ac.channel.Encrypt(payload, pk, ac.rng.GetStream()) + if err != nil { + return 0, ephemeral.Id{}, errors.WithMessage(err, "Failed to encrypt asymmetric broadcast message") + } + + // Create service object to send message + service := message.Service{ + Identifier: ac.channel.ReceptionID.Bytes(), + Tag: asymmetricBroadcastServiceTag, + } + + if cMixParams.DebugTag == cmix.DefaultDebugTag { + cMixParams.DebugTag = asymmCMixSendTag + } + + return ac.net.Send( + ac.channel.ReceptionID, fp, service, encryptedPayload, mac, cMixParams) +} + +// MaxPayloadSize returns the maximum size for a broadcasted payload. +func (ac *asymmetricClient) MaxPayloadSize() int { + return ac.net.GetMaxMessageLength() +} + +// Stop unregisters the listener callback and stops the channel's identity +// from being tracked. +func (ac *asymmetricClient) Stop() { + // Removes currently tracked identity + ac.net.RemoveIdentity(ac.channel.ReceptionID) + + // Delete all registered services + ac.net.DeleteClientService(ac.channel.ReceptionID) +} + +func (ac *asymmetricClient) Get() crypto.Asymmetric { + return ac.channel +} diff --git a/broadcast/asymmetricClient_test.go b/broadcast/asymmetricClient_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b26e86f7cf143e7711964580aa9d891cd32de20f --- /dev/null +++ b/broadcast/asymmetricClient_test.go @@ -0,0 +1,129 @@ +package broadcast + +import ( + "bytes" + "fmt" + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + crypto "gitlab.com/elixxir/crypto/broadcast" + cMixCrypto "gitlab.com/elixxir/crypto/cmix" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/crypto/signature/rsa" + "gitlab.com/xx_network/primitives/id" + "reflect" + "sync" + "testing" + "time" +) + +// Tests that symmetricClient adheres to the Symmetric interface. +var _ Asymmetric = (*asymmetricClient)(nil) + +// Tests that symmetricClient adheres to the Symmetric interface. +var _ Client = (cmix.Client)(nil) + +func Test_asymmetricClient_Smoke(t *testing.T) { + cMixHandler := newMockCmixHandler() + rngGen := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG) + pk, err := rsa.GenerateKey(rngGen.GetStream(), 8192) + if err != nil { + t.Fatalf("Failed to generate priv key: %+v", err) + } + channel := crypto.Asymmetric{ + ReceptionID: id.NewIdFromString("ReceptionID", id.User, t), + Name: "MyChannel", + Description: "This is my channel about stuff.", + Salt: cMixCrypto.NewSalt(csprng.NewSystemRNG(), 32), + RsaPubKey: pk.GetPublic(), + } + + const n = 5 + cbChans := make([]chan []byte, n) + clients := make([]Asymmetric, n) + for i := range clients { + cbChan := make(chan []byte, 10) + cb := func(payload []byte, _ receptionID.EphemeralIdentity, + _ rounds.Round) { + cbChan <- payload + } + + s := NewAsymmetricClient(channel, cb, newMockCmix(cMixHandler), rngGen) + + cbChans[i] = cbChan + clients[i] = s + + // Test that Get returns the expected channel + if !reflect.DeepEqual(s.Get(), channel) { + t.Errorf("Client %d returned wrong channel."+ + "\nexpected: %+v\nreceived: %+v", i, channel, s.Get()) + } + } + + // Send broadcast from each client + for i := range clients { + payload := make([]byte, newMockCmix(cMixHandler).GetMaxMessageLength()) + copy(payload, + fmt.Sprintf("Hello from client %d of %d.", i, len(clients))) + + // Start processes that waits for each client to receive broadcast + var wg sync.WaitGroup + for j := range cbChans { + wg.Add(1) + go func(i, j int, cbChan chan []byte) { + defer wg.Done() + select { + case r := <-cbChan: + if !bytes.Equal(payload, r) { + t.Errorf("Client %d failed to receive expected "+ + "payload from client %d."+ + "\nexpected: %q\nreceived: %q", j, i, payload, r) + } + case <-time.After(25 * time.Millisecond): + t.Errorf("Client %d timed out waiting for broadcast "+ + "payload from client %d.", j, i) + } + }(i, j, cbChans[j]) + } + + // Broadcast payload + _, _, err := clients[i].Broadcast(pk, payload, cmix.GetDefaultCMIXParams()) + if err != nil { + t.Errorf("Client %d failed to send broadcast: %+v", i, err) + } + + // Wait for all clients to receive payload or time out + wg.Wait() + } + + // Stop each client + for i := range clients { + clients[i].Stop() + } + + payload := make([]byte, newMockCmix(cMixHandler).GetMaxMessageLength()) + copy(payload, "This message should not get through.") + + // Start waiting on channels and error if anything is received + var wg sync.WaitGroup + for i := range cbChans { + wg.Add(1) + go func(i int, cbChan chan []byte) { + defer wg.Done() + select { + case r := <-cbChan: + t.Errorf("Client %d received message: %q", i, r) + case <-time.After(25 * time.Millisecond): + } + }(i, cbChans[i]) + } + + // Broadcast payload + _, _, err = clients[0].Broadcast(pk, payload, cmix.GetDefaultCMIXParams()) + if err != nil { + t.Errorf("Client 0 failed to send broadcast: %+v", err) + } + + wg.Wait() +} diff --git a/broadcast/interface.go b/broadcast/interface.go index 1da049fc8807fd0d170242b4d1c0de9ff68060e4..c12ae350fe9c6b0737b050ccbaf2178685e57af4 100644 --- a/broadcast/interface.go +++ b/broadcast/interface.go @@ -12,11 +12,12 @@ import ( "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/rounds" crypto "gitlab.com/elixxir/crypto/broadcast" + "gitlab.com/xx_network/crypto/multicastRSA" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" ) -// ListenerFunc is registered when creating a new symmetric broadcasting channel +// ListenerFunc is registered when creating a new broadcasting channel // and receives all new broadcast messages for the channel. type ListenerFunc func(payload []byte, receptionID receptionID.EphemeralIdentity, round rounds.Round) @@ -40,3 +41,23 @@ type Symmetric interface { // from being tracked. Stop() } + +// Asymmetric manages the listening and broadcasting of an asymmetric broadcast +// channel. +type Asymmetric interface { + // MaxPayloadSize returns the maximum size for a broadcasted payload. + MaxPayloadSize() int + + // Get returns the crypto.Asymmetric object containing the cryptographic and + // identifying information about the channel. + Get() crypto.Asymmetric + + // Broadcast broadcasts the payload to the channel. The payload size must be + // equal to MaxPayloadSize. + Broadcast(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) ( + id.Round, ephemeral.Id, error) + + // Stop unregisters the listener callback and stops the channel's identity + // from being tracked. + Stop() +} diff --git a/broadcast/processor.go b/broadcast/processor.go index 419072fdc06517f57b0ec1d788d9ec284dc4acfd..1fe50906852714822a8dd7c4d694df60f1b6f361 100644 --- a/broadcast/processor.go +++ b/broadcast/processor.go @@ -20,15 +20,15 @@ const ( errDecrypt = "[BCAST] Failed to decrypt payload for broadcast %s (%q): %+v" ) -// processor manages the reception and decryption of a broadcast message. +// symmetricProcessor manages the reception and decryption of a symmetric broadcast message. // Adheres to the message.Processor interface. -type processor struct { +type symmetricProcessor struct { s *crypto.Symmetric cb ListenerFunc } // Process decrypts the broadcast message and sends the results on the callback. -func (p *processor) Process(msg format.Message, +func (p *symmetricProcessor) Process(msg format.Message, receptionID receptionID.EphemeralIdentity, round rounds.Round) { payload, err := p.s.Decrypt(msg.GetContents(), msg.GetMac(), msg.GetKeyFP()) @@ -40,7 +40,32 @@ func (p *processor) Process(msg format.Message, go p.cb(payload, receptionID, round) } -// String returns a string identifying the processor for debugging purposes. -func (p *processor) String() string { +// String returns a string identifying the symmetricProcessor for debugging purposes. +func (p *symmetricProcessor) String() string { return "symmetricChannel-" + p.s.Name } + +// asymmetricProcessor manages the reception and decryption of an asymmetric broadcast message +// Adheres to the message.Processor interface. +type asymmetricProcessor struct { + ac *crypto.Asymmetric + cb ListenerFunc +} + +// Process decrypts the broadcast message and sends the results on the callback. +func (p *asymmetricProcessor) Process(msg format.Message, + receptionID receptionID.EphemeralIdentity, round rounds.Round) { + + payload, err := p.ac.Decrypt(msg.GetContents()) + if err != nil { + jww.ERROR.Printf(errDecrypt, p.ac.ReceptionID, p.ac.Name, err) + return + } + + go p.cb(payload, receptionID, round) +} + +// String returns a string identifying the asymmetricProcessor for debugging purposes. +func (p *asymmetricProcessor) String() string { + return "symmetricChannel-" + p.ac.Name +} diff --git a/broadcast/processor_test.go b/broadcast/processor_test.go index 04e2f4a791485e7b5b7410f62785164d9d18f753..c7b56e05c92a57d47a2f9c8640ff09668db7d470 100644 --- a/broadcast/processor_test.go +++ b/broadcast/processor_test.go @@ -42,7 +42,7 @@ func Test_processor_Process(t *testing.T) { cbChan <- payload } - p := &processor{ + p := &symmetricProcessor{ s: s, cb: cb, } diff --git a/broadcast/symmetricClient.go b/broadcast/symmetricClient.go index 28d718868a0659f9f2816f7fd2b16a4feb9cb116..22832bbd06caf9b472a93cfeaa20e43bb4cd8127 100644 --- a/broadcast/symmetricClient.go +++ b/broadcast/symmetricClient.go @@ -30,7 +30,7 @@ const ( // Tags. const ( - cMixSendTag = "SymmBcast" + symmCMixSendTag = "SymmBcast" symmetricBroadcastServiceTag = "SymmetricBroadcast" ) @@ -71,8 +71,8 @@ func NewSymmetricClient(channel crypto.Symmetric, listenerCb ListenerFunc, Tag: symmetricBroadcastServiceTag, } - // Create new message processor - p := &processor{ + // Create new message symmetricProcessor + p := &symmetricProcessor{ s: &channel, cb: listenerCb, } @@ -125,7 +125,7 @@ func (s *symmetricClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) } if cMixParams.DebugTag == cmix.DefaultDebugTag { - cMixParams.DebugTag = cMixSendTag + cMixParams.DebugTag = symmCMixSendTag } return s.net.Send( diff --git a/go.mod b/go.mod index 5cc7bdbc98c27ad9aad0f873ef32a516cc24c9f8..7472776aa336ba3749e8d33cb45ba502a1696211 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,11 @@ require ( github.com/spf13/viper v1.7.1 gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f gitlab.com/elixxir/comms v0.0.4-0.20220323190139-9ed75f3a8b2c - gitlab.com/elixxir/crypto v0.0.7-0.20220425192911-a23209a58073 + gitlab.com/elixxir/crypto v0.0.7-0.20220510191648-70e5e956d3d5 gitlab.com/elixxir/ekv v0.1.7 gitlab.com/elixxir/primitives v0.0.3-0.20220330212736-cce83b5f948f gitlab.com/xx_network/comms v0.0.4-0.20220315161313-76acb14429ac - gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71 + gitlab.com/xx_network/crypto v0.0.5-0.20220502201458-dabab1ef2982 gitlab.com/xx_network/primitives v0.0.4-0.20220324193139-b292d1ae6e7e go.uber.org/ratelimit v0.2.0 golang.org/x/crypto v0.0.0-20220128200615-198e4374d7ed diff --git a/go.sum b/go.sum index b2b24d4d75babae71f71578a773220de0cf06b99..32805755ccd38e8cb756ad131883bd921f24d28d 100644 --- a/go.sum +++ b/go.sum @@ -317,6 +317,10 @@ gitlab.com/elixxir/crypto v0.0.7-0.20220425192754-f9f5ca6ad2eb h1:K10bnRhnZS6XFx gitlab.com/elixxir/crypto v0.0.7-0.20220425192754-f9f5ca6ad2eb/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= gitlab.com/elixxir/crypto v0.0.7-0.20220425192911-a23209a58073 h1:T0BK30t1F9M8RCdsuMN1bL34fSLNPqjqC+oufnjPh3I= gitlab.com/elixxir/crypto v0.0.7-0.20220425192911-a23209a58073/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= +gitlab.com/elixxir/crypto v0.0.7-0.20220509151343-6e5a514a65fd h1:3lU8kF5ItUZYIuKHF6qrC3o1yiL6fKdF5p1HJRB9iik= +gitlab.com/elixxir/crypto v0.0.7-0.20220509151343-6e5a514a65fd/go.mod h1:cJF80ad9YCR+UcOlZNzfDVBAQqGEEhhs3y5taMEvXaE= +gitlab.com/elixxir/crypto v0.0.7-0.20220510191648-70e5e956d3d5 h1:uzzFrmqx0CnqQ7AInGi6PA8w1mm2032hIpYn2G2SDKA= +gitlab.com/elixxir/crypto v0.0.7-0.20220510191648-70e5e956d3d5/go.mod h1:cJF80ad9YCR+UcOlZNzfDVBAQqGEEhhs3y5taMEvXaE= gitlab.com/elixxir/ekv v0.1.6 h1:M2hUSNhH/ChxDd+s8xBqSEKgoPtmE6hOEBqQ73KbN6A= gitlab.com/elixxir/ekv v0.1.6/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4= gitlab.com/elixxir/ekv v0.1.7 h1:OW2z+N4QCqqMFzouAwFTWWMKz0Y/PDhyYReN7gQ5NiQ= @@ -345,6 +349,8 @@ gitlab.com/xx_network/crypto v0.0.5-0.20220222212031-750f7e8a01f4 h1:95dZDMn/hpL gitlab.com/xx_network/crypto v0.0.5-0.20220222212031-750f7e8a01f4/go.mod h1:6apvsoHCQJDjO0J4E3uhR3yO9tTz/Mq5be5rjB3tQPU= gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71 h1:N2+Jja4xNg66entu6rGvzRcf3Vc785xgiaHeDPYnBvg= gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71/go.mod h1:/SJf+R75E+QepdTLh0H1/udsovxx2Q5ru34q1v0umKk= +gitlab.com/xx_network/crypto v0.0.5-0.20220502201458-dabab1ef2982 h1:iUMwO/lIeOEmqNTMPtr9d7H6Y0cGe4DaHhd+ISCts6E= +gitlab.com/xx_network/crypto v0.0.5-0.20220502201458-dabab1ef2982/go.mod h1:/SJf+R75E+QepdTLh0H1/udsovxx2Q5ru34q1v0umKk= gitlab.com/xx_network/primitives v0.0.0-20200803231956-9b192c57ea7c/go.mod h1:wtdCMr7DPePz9qwctNoAUzZtbOSHSedcK++3Df3psjA= gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug= gitlab.com/xx_network/primitives v0.0.2/go.mod h1:cs0QlFpdMDI6lAo61lDRH2JZz+3aVkHy+QogOB6F/qc=