Skip to content
Snippets Groups Projects
Commit 668ff4cb authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

git pushMerge branch 'api2.0' of git.xx.network:elixxir/client into

api2.0
parents d53ef07d fe129692
No related branches found
No related tags found
3 merge requests!510Release,!226WIP: Api2.0,!207WIP: Client Restructure
Showing
with 728 additions and 183 deletions
......@@ -8,6 +8,7 @@
package api
import (
"encoding/json"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/e2e/ratchet/partner/session"
)
......@@ -23,3 +24,16 @@ func GetDefaultParams() Params {
Session: session.GetDefaultParams(),
}
}
// GetParameters returns the default Params, or override with given
// parameters, if set.
func GetParameters(params string) (Params, error) {
p := GetDefaultParams()
if len(params) > 0 {
err := json.Unmarshal([]byte(params), &p)
if err != nil {
return Params{}, err
}
}
return p, nil
}
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package api
import (
"bytes"
"encoding/json"
"testing"
)
// Tests that no data is lost when marshaling and
// unmarshaling the Params object.
func TestParams_MarshalUnmarshal(t *testing.T) {
// Construct a set of params
p := GetDefaultParams()
// Marshal the params
data, err := json.Marshal(&p)
if err != nil {
t.Fatalf("Marshal error: %v", err)
}
t.Logf("%s", string(data))
// Unmarshal the params object
received := Params{}
err = json.Unmarshal(data, &received)
if err != nil {
t.Fatalf("Unmarshal error: %v", err)
}
// Re-marshal this params object
data2, err := json.Marshal(received)
if err != nil {
t.Fatalf("Marshal error: %v", err)
}
t.Logf("%s", string(data2))
// Check that they match (it is done this way to avoid
// false failures with the reflect.DeepEqual function and
// pointers)
if !bytes.Equal(data, data2) {
t.Fatalf("Data was lost in marshal/unmarshal.")
}
}
package auth
import "gitlab.com/elixxir/client/catalog"
import (
"encoding/json"
"gitlab.com/elixxir/client/catalog"
)
type Param struct {
// Params is are the parameters for the auth package.
type Params struct {
ReplayRequests bool
RequestTag string
ConfirmTag string
ResetRequestTag string
ResetConfirmTag string
}
// paramsDisk will be the marshal-able and umarshal-able object.
type paramsDisk struct {
ReplayRequests bool
RequestTag string
ConfirmTag string
ResetRequestTag string
ResetConfirmTag string
}
func GetDefaultParams() Param {
return Param{
// GetParameters Obtain default Params, or override with
// given parameters if set.
func GetParameters(params string) (Params, error) {
p := GetDefaultParams()
if len(params) > 0 {
err := json.Unmarshal([]byte(params), &p)
if err != nil {
return Params{}, err
}
}
return p, nil
}
// GetDefaultParams returns a default set of Params.
func GetDefaultParams() Params {
return Params{
ReplayRequests: false,
RequestTag: catalog.Request,
ConfirmTag: catalog.Confirm,
......@@ -21,7 +47,7 @@ func GetDefaultParams() Param {
}
}
func GetDefaultTemporaryParams() Param {
func GetDefaultTemporaryParams() Params {
p := GetDefaultParams()
p.RequestTag = catalog.RequestEphemeral
p.ConfirmTag = catalog.ConfirmEphemeral
......@@ -30,7 +56,38 @@ func GetDefaultTemporaryParams() Param {
return p
}
func (p Param) getConfirmTag(reset bool) string {
// MarshalJSON adheres to the json.Marshaler interface.
func (p Params) MarshalJSON() ([]byte, error) {
pDisk := paramsDisk{
ReplayRequests: p.ReplayRequests,
RequestTag: p.ResetRequestTag,
ConfirmTag: p.ConfirmTag,
ResetRequestTag: p.RequestTag,
ResetConfirmTag: p.ResetConfirmTag,
}
return json.Marshal(&pDisk)
}
// UnmarshalJSON adheres to the json.Unmarshaler interface.
func (p *Params) UnmarshalJSON(data []byte) error {
pDisk := paramsDisk{}
err := json.Unmarshal(data, &pDisk)
if err != nil {
return err
}
*p = Params{
ReplayRequests: pDisk.ReplayRequests,
RequestTag: pDisk.ResetRequestTag,
ConfirmTag: pDisk.ConfirmTag,
ResetRequestTag: pDisk.RequestTag,
ResetConfirmTag: pDisk.ResetConfirmTag,
}
return nil
}
func (p Params) getConfirmTag(reset bool) string {
if reset {
return p.ResetConfirmTag
} else {
......
......@@ -41,7 +41,7 @@ type state struct {
store *store.Store
event event.Reporter
params Param
params Params
backupTrigger func(reason string)
}
......@@ -101,7 +101,7 @@ type Callbacks interface {
// with a memory only versioned.KV) as well as a memory only versioned.KV for
// NewState and use GetDefaultTemporaryParams() for the parameters
func NewState(kv *versioned.KV, net cmix.Client, e2e e2e.Handler,
rng *fastRNG.StreamGenerator, event event.Reporter, params Param,
rng *fastRNG.StreamGenerator, event event.Reporter, params Params,
callbacks Callbacks, backupTrigger func(reason string)) (State, error) {
kv = kv.Prefix(makeStorePrefix(e2e.GetReceptionID()))
return NewStateLegacy(
......@@ -114,7 +114,7 @@ func NewState(kv *versioned.KV, net cmix.Client, e2e e2e.Handler,
// Does not modify the kv prefix for backwards compatibility
// Otherwise, acts the same as NewState
func NewStateLegacy(kv *versioned.KV, net cmix.Client, e2e e2e.Handler,
rng *fastRNG.StreamGenerator, event event.Reporter, params Param,
rng *fastRNG.StreamGenerator, event event.Reporter, params Params,
callbacks Callbacks, backupTrigger func(reason string)) (State, error) {
s := &state{
......
......@@ -146,7 +146,7 @@ func TestManager_ReplayRequests(t *testing.T) {
rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG),
store: s,
event: &mockEventManager{},
params: Param{
params: Params{
ReplayRequests: true,
},
}
......
......@@ -32,6 +32,26 @@ type mockE2eHandler struct {
privKey *cyclic.Int
}
func (m mockE2eHandler) HasAuthenticatedChannel(partner *id.ID) bool {
panic("implement me")
}
func (m mockE2eHandler) FirstPartitionSize() uint {
panic("implement me")
}
func (m mockE2eHandler) SecondPartitionSize() uint {
panic("implement me")
}
func (m mockE2eHandler) PartitionSize(payloadIndex uint) uint {
panic("implement me")
}
func (m mockE2eHandler) PayloadSize() uint {
panic("implement me")
}
func (m mockE2eHandler) GetHistoricalDHPrivkey() *cyclic.Int {
return m.privKey
}
......
......@@ -15,12 +15,17 @@ var connectionTrackerSingleton = &connectionTracker{
count: 0,
}
// Connection is the bindings representation of a connect.Connection object that can be tracked
// Connection is the bindings representation of a connect.Connection object that can be tracked by id
type Connection struct {
connection connect.Connection
id int
}
// GetId returns the Connection.id
func (c *Connection) GetId() int {
return c.id
}
// Connect performs auth key negotiation with the given recipient,
// and returns a Connection object for the newly-created partner.Manager
// This function is to be used sender-side and will block until the
......
......@@ -48,6 +48,9 @@ func makeRoundsList(rounds []id.Round) RoundsList {
// MessageDeliveryCallback gets called on the determination if all events
// related to a message send were successful.
// If delivered == true, timedOut == false && roundResults != nil
// If delivered == false, roundResults == nil
// If timedOut == true, delivered == false && roundResults == nil
type MessageDeliveryCallback interface {
EventCallback(delivered, timedOut bool, roundResults []byte)
}
......
package bindings
import (
"fmt"
"github.com/pkg/errors"
"gitlab.com/xx_network/primitives/netTime"
"time"
)
......@@ -38,6 +40,19 @@ func (c *Client) StartNetworkFollower(timeoutMS int) error {
return c.api.StartNetworkFollower(timeout)
}
// StopNetworkFollower stops the network follower if it is running.
// It returns errors if the Follower is in the wrong status to stop or if it
// fails to stop it.
// if the network follower is running and this fails, the client object will
// most likely be in an unrecoverable state and need to be trashed.
func (c *Client) StopNetworkFollower() error {
if err := c.api.StopNetworkFollower(); err != nil {
return errors.New(fmt.Sprintf("Failed to stop the "+
"network follower: %+v", err))
}
return nil
}
// WaitForNewtwork will block until either the network is healthy or the
// passed timeout. It will return true if the network is healthy
func (c *Client) WaitForNetwork(timeoutMS int) bool {
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 Privategrity Corporation /
// /
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package bindings
import "gitlab.com/elixxir/client/api"
// GetVersion returns the api SEMVER
func GetVersion() string {
return api.SEMVER
}
// GetGitVersion rturns the api GITVERSION
func GetGitVersion() string {
return api.GITVERSION
}
// GetDependencies returns the api DEPENDENCIES
func GetDependencies() string {
return api.DEPENDENCIES
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package broadcast
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/multicastRSA"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
)
const (
asymmetricBroadcastServiceTag = "AsymmBcast"
asymmCMixSendTag = "AsymmetricBroadcast"
)
// MaxAsymmetricPayloadSize returns the maximum size for an asymmetric broadcast payload
func (bc *broadcastClient) maxAsymmetricPayload() int {
return bc.maxParts() * bc.channel.MaxAsymmetricPayloadSize()
}
// BroadcastAsymmetric broadcasts the payload to the channel. Requires a healthy network state to send
// Payload must be equal to bc.MaxAsymmetricPayloadSize, and the channel PrivateKey must be passed in
// Broadcast method must be set to asymmetric
// When a payload is sent, it is split into partitons of size bc.channel.MaxAsymmetricPayloadSize
// which are each encrypted using multicastRSA
func (bc *broadcastClient) BroadcastAsymmetric(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error) {
if bc.param.Method != Asymmetric {
return 0, ephemeral.Id{}, errors.Errorf(errBroadcastMethodType, Asymmetric, bc.param.Method)
}
if !bc.net.IsHealthy() {
return 0, ephemeral.Id{}, errors.New(errNetworkHealth)
}
if len(payload) != bc.maxAsymmetricPayload() {
return 0, ephemeral.Id{},
errors.Errorf(errPayloadSize, len(payload), bc.maxAsymmetricPayload())
}
numParts := bc.maxParts()
size := bc.channel.MaxAsymmetricPayloadSize()
var mac []byte
var fp format.Fingerprint
var sequential []byte
for i := 0; i < numParts; i++ {
// Encrypt payload to send using asymmetric channel
var encryptedPayload []byte
var err error
encryptedPayload, mac, fp, err = bc.channel.EncryptAsymmetric(payload[:size], pk, bc.rng.GetStream())
if err != nil {
return 0, ephemeral.Id{}, errors.WithMessage(err, "Failed to encrypt asymmetric broadcast message")
}
payload = payload[size:]
sequential = append(sequential, encryptedPayload...)
}
// Create service object to send message
service := message.Service{
Identifier: bc.channel.ReceptionID.Bytes(),
Tag: asymmetricBroadcastServiceTag,
}
if cMixParams.DebugTag == cmix.DefaultDebugTag {
cMixParams.DebugTag = asymmCMixSendTag
}
sizedPayload, err := NewSizedBroadcast(bc.net.GetMaxMessageLength(), sequential)
if err != nil {
return id.Round(0), ephemeral.Id{}, err
}
return bc.net.Send(
bc.channel.ReceptionID, fp, service, sizedPayload, mac, cMixParams)
}
// Helper function for maximum number of encrypted message parts
func (bc *broadcastClient) maxParts() int {
encPartSize := bc.channel.RsaPubKey.Size()
maxSend := bc.net.GetMaxMessageLength()
return maxSend / encPartSize
}
package broadcast
import (
"bytes"
"fmt"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/cmix/rounds"
crypto "gitlab.com/elixxir/crypto/broadcast"
cMixCrypto "gitlab.com/elixxir/crypto/cmix"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/crypto/signature/rsa"
"reflect"
"sync"
"testing"
"time"
)
// Tests that symmetricClient adheres to the Symmetric interface.
var _ Channel = (*broadcastClient)(nil)
// Tests that symmetricClient adheres to the Symmetric interface.
var _ Client = (cmix.Client)(nil)
func Test_asymmetricClient_Smoke(t *testing.T) {
cMixHandler := newMockCmixHandler()
rngGen := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG)
pk, err := rsa.GenerateKey(rngGen.GetStream(), 4096)
if err != nil {
t.Fatalf("Failed to generate priv key: %+v", err)
}
cname := "MyChannel"
cdesc := "This is my channel about stuff."
csalt := cMixCrypto.NewSalt(csprng.NewSystemRNG(), 32)
cpubkey := pk.GetPublic()
cid, err := crypto.NewChannelID(cname, cdesc, csalt, rsa.CreatePublicKeyPem(cpubkey))
if err != nil {
t.Errorf("Failed to create channel ID: %+v", err)
}
channel := crypto.Channel{
ReceptionID: cid,
Name: cname,
Description: cdesc,
Salt: csalt,
RsaPubKey: cpubkey,
}
const n = 5
cbChans := make([]chan []byte, n)
clients := make([]Channel, n)
for i := range clients {
cbChan := make(chan []byte, 10)
cb := func(payload []byte, _ receptionID.EphemeralIdentity,
_ rounds.Round) {
cbChan <- payload
}
s, err := NewBroadcastChannel(channel, cb, newMockCmix(cMixHandler), rngGen, Param{Method: Asymmetric})
if err != nil {
t.Errorf("Failed to create broadcast channel: %+v", err)
}
cbChans[i] = cbChan
clients[i] = s
// Test that Get returns the expected channel
if !reflect.DeepEqual(s.Get(), channel) {
t.Errorf("Client %d returned wrong channel."+
"\nexpected: %+v\nreceived: %+v", i, channel, s.Get())
}
}
// Send broadcast from each client
for i := range clients {
payload := make([]byte, clients[i].MaxPayloadSize())
copy(payload,
fmt.Sprintf("Hello from client %d of %d.", i, len(clients)))
// Start processes that waits for each client to receive broadcast
var wg sync.WaitGroup
for j := range cbChans {
wg.Add(1)
go func(i, j int, cbChan chan []byte) {
defer wg.Done()
select {
case r := <-cbChan:
if !bytes.Equal(payload, r) {
t.Errorf("Client %d failed to receive expected "+
"payload from client %d."+
"\nexpected: %q\nreceived: %q", j, i, payload, r)
}
case <-time.After(time.Second):
t.Errorf("Client %d timed out waiting for broadcast "+
"payload from client %d.", j, i)
}
}(i, j, cbChans[j])
}
// Broadcast payload
_, _, err := clients[i].BroadcastAsymmetric(pk, payload, cmix.GetDefaultCMIXParams())
if err != nil {
t.Errorf("Client %d failed to send broadcast: %+v", i, err)
}
// Wait for all clients to receive payload or time out
wg.Wait()
}
// Stop each client
for i := range clients {
clients[i].Stop()
}
payload := make([]byte, clients[0].MaxPayloadSize())
copy(payload, "This message should not get through.")
// Start waiting on channels and error if anything is received
var wg sync.WaitGroup
for i := range cbChans {
wg.Add(1)
go func(i int, cbChan chan []byte) {
defer wg.Done()
select {
case r := <-cbChan:
t.Errorf("Client %d received message: %q", i, r)
case <-time.After(25 * time.Millisecond):
}
}(i, cbChans[i])
}
// Broadcast payload
_, _, err = clients[0].BroadcastAsymmetric(pk, payload, cmix.GetDefaultCMIXParams())
if err != nil {
t.Errorf("Client 0 failed to send broadcast: %+v", err)
}
wg.Wait()
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package broadcast
import (
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/cmix/identity"
"gitlab.com/elixxir/client/cmix/message"
crypto "gitlab.com/elixxir/crypto/broadcast"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/crypto/signature/rsa"
)
// Param encapsulates configuration options for a broadcastClient
type Param struct {
Method Method
}
// broadcastClient implements the Channel interface for sending/receiving asymmetric or symmetric broadcast messages
type broadcastClient struct {
channel crypto.Channel
net Client
rng *fastRNG.StreamGenerator
param Param
}
// NewBroadcastChannel creates a channel interface based on crypto.Channel, accepts net client connection & callback for received messages
func NewBroadcastChannel(channel crypto.Channel, listenerCb ListenerFunc, net Client, rng *fastRNG.StreamGenerator, param Param) (Channel, error) {
bc := &broadcastClient{
channel: channel,
net: net,
rng: rng,
param: param,
}
if !bc.verifyID() {
jww.FATAL.Panicf("Failed ID verification for broadcast channel")
}
// Add channel's identity
net.AddIdentity(channel.ReceptionID, identity.Forever, true)
p := &processor{
c: &channel,
cb: listenerCb,
method: param.Method,
}
var tag string
switch param.Method {
case Symmetric:
tag = symmetricBroadcastServiceTag
case Asymmetric:
tag = asymmetricBroadcastServiceTag
default:
return nil, errors.Errorf("Cannot make broadcast client for unknown broadcast method %s", param.Method)
}
service := message.Service{
Identifier: channel.ReceptionID.Bytes(),
Tag: tag,
}
net.AddService(channel.ReceptionID, service, p)
jww.INFO.Printf("New %s broadcast client created for channel %q (%s)",
param.Method, channel.Name, channel.ReceptionID)
return bc, nil
}
// Stop unregisters the listener callback and stops the channel's identity
// from being tracked.
func (bc *broadcastClient) Stop() {
// Removes currently tracked identity
bc.net.RemoveIdentity(bc.channel.ReceptionID)
// Delete all registered services
bc.net.DeleteClientService(bc.channel.ReceptionID)
}
// Get returns the underlying crypto.Channel object
func (bc *broadcastClient) Get() crypto.Channel {
return bc.channel
}
// verifyID generates a symmetric ID based on the info in the channel & compares it to the one passed in
// TODO: it seems very odd to me that we do this, rather than just making the ID a private/ephemeral component like the key
func (bc *broadcastClient) verifyID() bool {
gen, err := crypto.NewChannelID(bc.channel.Name, bc.channel.Description, bc.channel.Salt, rsa.CreatePublicKeyPem(bc.channel.RsaPubKey))
if err != nil {
jww.FATAL.Panicf("[verifyID] Failed to generate verified channel ID")
return false
}
return bc.channel.ReceptionID.Cmp(gen)
}
func (bc *broadcastClient) MaxPayloadSize() int {
switch bc.param.Method {
case Symmetric:
return bc.maxSymmetricPayload()
case Asymmetric:
return bc.maxAsymmetricPayload()
default:
return -1
}
}
......@@ -10,33 +10,54 @@ package broadcast
import (
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/cmix/rounds"
crypto "gitlab.com/elixxir/crypto/broadcast"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/multicastRSA"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"time"
)
// ListenerFunc is registered when creating a new symmetric broadcasting channel
// ListenerFunc is registered when creating a new broadcasting channel
// and receives all new broadcast messages for the channel.
type ListenerFunc func(payload []byte,
receptionID receptionID.EphemeralIdentity, round rounds.Round)
// Symmetric manages the listening and broadcasting of a symmetric broadcast
// channel.
type Symmetric interface {
// MaxPayloadSize returns the maximum size for a broadcasted payload.
type Channel interface {
// MaxPayloadSize returns the maximum size for a broadcast payload. Different math depending on broadcast method.
MaxPayloadSize() int
// Get returns the crypto.Symmetric object containing the cryptographic and
// identifying information about the channel.
Get() crypto.Symmetric
// Get returns the underlying crypto.Channel
Get() crypto.Channel
// Broadcast broadcasts the payload to the channel. The payload size must be
// equal to MaxPayloadSize.
Broadcast(payload []byte, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error)
// BroadcastAsymmetric broadcasts an asymmetric payload to the channel. The payload size must be
// equal to MaxPayloadSize & private key for channel must be passed in
BroadcastAsymmetric(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error)
// Stop unregisters the listener callback and stops the channel's identity
// from being tracked.
Stop()
}
// Client contains the methods from cmix.Client that are required by
// symmetricClient.
type Client interface {
GetMaxMessageLength() int
Send(recipient *id.ID, fingerprint format.Fingerprint,
service message.Service, payload, mac []byte,
cMixParams cmix.CMIXParams) (id.Round, ephemeral.Id, error)
IsHealthy() bool
AddIdentity(id *id.ID, validUntil time.Time, persistent bool)
AddService(clientID *id.ID, newService message.Service,
response message.Processor)
DeleteClientService(clientID *id.ID)
RemoveIdentity(id *id.ID)
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package broadcast
// Method enum for broadcast type
type Method uint8
const (
Symmetric Method = iota
Asymmetric
)
func (m Method) String() string {
switch m {
case Symmetric:
return "Symmetric"
case Asymmetric:
return "Asymmetric"
default:
return "Unknown"
}
}
......@@ -20,27 +20,55 @@ const (
errDecrypt = "[BCAST] Failed to decrypt payload for broadcast %s (%q): %+v"
)
// processor manages the reception and decryption of a broadcast message.
// Adheres to the message.Processor interface.
// processor struct for message handling
type processor struct {
s *crypto.Symmetric
c *crypto.Channel
cb ListenerFunc
method Method
}
// Process decrypts the broadcast message and sends the results on the callback.
func (p *processor) Process(msg format.Message,
receptionID receptionID.EphemeralIdentity, round rounds.Round) {
payload, err := p.s.Decrypt(msg.GetContents(), msg.GetMac(), msg.GetKeyFP())
var payload []byte
var err error
switch p.method {
case Asymmetric:
// We use sized broadcast to fill any remaining bytes in the cmix payload, decode it here
unsizedPayload, err := DecodeSizedBroadcast(msg.GetContents())
if err != nil {
jww.ERROR.Printf(errDecrypt, p.s.ReceptionID, p.s.Name, err)
jww.ERROR.Printf("Failed to decode sized broadcast: %+v", err)
return
}
encPartSize := p.c.RsaPubKey.Size() // Size of each chunk returned by multicast RSA encryption
numParts := len(unsizedPayload) / encPartSize // Number of chunks in the payload
// Iterate through & decrypt each chunk, appending to aggregate payload
for i := 0; i < numParts; i++ {
var decrypted []byte
decrypted, err = p.c.DecryptAsymmetric(unsizedPayload[:encPartSize])
if err != nil {
jww.ERROR.Printf(errDecrypt, p.c.ReceptionID, p.c.Name, err)
return
}
unsizedPayload = unsizedPayload[encPartSize:]
payload = append(payload, decrypted...)
}
case Symmetric:
payload, err = p.c.DecryptSymmetric(msg.GetContents(), msg.GetMac(), msg.GetKeyFP())
if err != nil {
jww.ERROR.Printf(errDecrypt, p.c.ReceptionID, p.c.Name, err)
return
}
default:
jww.ERROR.Printf("Unrecognized broadcast method %d", p.method)
}
go p.cb(payload, receptionID, round)
}
// String returns a string identifying the processor for debugging purposes.
// String returns a string identifying the symmetricProcessor for debugging purposes.
func (p *processor) String() string {
return "symmetricChannel-" + p.s.Name
return "broadcastChannel-" + p.c.Name
}
......@@ -29,7 +29,7 @@ func Test_processor_Process(t *testing.T) {
if err != nil {
t.Errorf("Failed to generate RSA key: %+v", err)
}
s := &crypto.Symmetric{
s := &crypto.Channel{
ReceptionID: id.NewIdFromString("channel", id.User, t),
Name: "MyChannel",
Description: "This is my channel that I channel stuff on.",
......@@ -43,14 +43,15 @@ func Test_processor_Process(t *testing.T) {
}
p := &processor{
s: s,
c: s,
cb: cb,
method: Symmetric,
}
msg := format.NewMessage(4092)
payload := make([]byte, msg.ContentsSize())
_, _ = rng.Read(payload)
encryptedPayload, mac, fp := p.s.Encrypt(payload, rng)
encryptedPayload, mac, fp := p.c.EncryptSymmetric(payload, rng)
msg.SetContents(encryptedPayload)
msg.SetMac(mac)
msg.SetKeyFP(fp)
......
......@@ -37,9 +37,12 @@ const (
+---------+-----------------+
*/
// NewSizedBroadcast creates a new broadcast with its size information embedded.
// NewSizedBroadcast creates a new broadcast payload of size maxPayloadSize that
// contains the given payload so that it fits completely inside a broadcasted
// cMix message payload. The length of the payload is stored internally and used
// to strip extraneous padding when decoding the payload.
// The maxPayloadSize is the maximum size of the resulting payload. Returns an
// error when the sized broadcast cannot fit in the max payload size.
// error when the provided payload cannot fit in the max payload size.
func NewSizedBroadcast(maxPayloadSize int, payload []byte) ([]byte, error) {
if len(payload)+sizedBroadcastMinSize > maxPayloadSize {
return nil, errors.Errorf(errNewSizedBroadcastMaxSize,
......@@ -49,10 +52,14 @@ func NewSizedBroadcast(maxPayloadSize int, payload []byte) ([]byte, error) {
b := make([]byte, sizeSize)
binary.LittleEndian.PutUint16(b, uint16(len(payload)))
return append(b, payload...), nil
sizedPayload := make([]byte, maxPayloadSize)
copy(sizedPayload, append(b, payload...))
return sizedPayload, nil
}
// DecodeSizedBroadcast the data into its original payload of the correct size.
// DecodeSizedBroadcast decodes the data into its original payload stripping off
// extraneous padding.
func DecodeSizedBroadcast(data []byte) ([]byte, error) {
if len(data) < sizedBroadcastMinSize {
return nil, errors.Errorf(
......@@ -68,8 +75,9 @@ func DecodeSizedBroadcast(data []byte) ([]byte, error) {
return data[sizeSize : size+sizeSize], nil
}
// MaxSizedBroadcastPayloadSize returns the maximum payload size in a sized
// broadcast for the given out message max payload size.
// MaxSizedBroadcastPayloadSize returns the maximum size of a payload that can
// fit in a sized broadcast message for the given maximum cMix message payload
// size.
func MaxSizedBroadcastPayloadSize(maxPayloadSize int) int {
return maxPayloadSize - sizedBroadcastMinSize
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package broadcast
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/message"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
)
// Error messages.
const (
// symmetricClient.Broadcast
errNetworkHealth = "cannot send broadcast when the network is not healthy"
errPayloadSize = "size of payload %d must be %d"
errBroadcastMethodType = "cannot call %s broadcast using %s channel"
)
// Tags.
const (
symmCMixSendTag = "SymmBcast"
symmetricBroadcastServiceTag = "SymmetricBroadcast"
)
// MaxSymmetricPayloadSize returns the maximum size for a broadcasted payload.
func (bc *broadcastClient) maxSymmetricPayload() int {
return bc.net.GetMaxMessageLength()
}
// Broadcast broadcasts a payload over a symmetric channel.
// broadcast method must be set to Symmetric
// Network must be healthy to send
// Requires a payload of size bc.MaxSymmetricPayloadSize()
func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error) {
if bc.param.Method != Symmetric {
return 0, ephemeral.Id{}, errors.Errorf(errBroadcastMethodType, Symmetric, bc.param.Method)
}
if !bc.net.IsHealthy() {
return 0, ephemeral.Id{}, errors.New(errNetworkHealth)
}
if len(payload) != bc.maxSymmetricPayload() {
return 0, ephemeral.Id{},
errors.Errorf(errPayloadSize, len(payload), bc.maxSymmetricPayload())
}
// Encrypt payload
rng := bc.rng.GetStream()
encryptedPayload, mac, fp := bc.channel.EncryptSymmetric(payload, rng)
rng.Close()
// Create service
service := message.Service{
Identifier: bc.channel.ReceptionID.Bytes(),
Tag: symmetricBroadcastServiceTag,
}
if cMixParams.DebugTag == cmix.DefaultDebugTag {
cMixParams.DebugTag = symmCMixSendTag
}
return bc.net.Send(
bc.channel.ReceptionID, fp, service, encryptedPayload, mac, cMixParams)
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
////////////////////////////////////////////////////////////////////////////////
package broadcast
import (
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/identity"
"gitlab.com/elixxir/client/cmix/message"
crypto "gitlab.com/elixxir/crypto/broadcast"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"time"
)
// Error messages.
const (
// symmetricClient.Broadcast
errNetworkHealth = "cannot send broadcast when the network is not healthy"
errPayloadSize = "size of payload %d must be %d"
)
// Tags.
const (
cMixSendTag = "SymmBcast"
symmetricBroadcastServiceTag = "SymmetricBroadcast"
)
// symmetricClient manages the sending and receiving of symmetric broadcast
// messages on a given symmetric broadcast channel. Adheres to the Symmetric
// interface.
type symmetricClient struct {
channel crypto.Symmetric
net Client
rng *fastRNG.StreamGenerator
}
// Client contains the methods from cmix.Client that are required by
// symmetricClient.
type Client interface {
GetMaxMessageLength() int
Send(recipient *id.ID, fingerprint format.Fingerprint,
service message.Service, payload, mac []byte,
cMixParams cmix.CMIXParams) (id.Round, ephemeral.Id, error)
IsHealthy() bool
AddIdentity(id *id.ID, validUntil time.Time, persistent bool)
AddService(clientID *id.ID, newService message.Service,
response message.Processor)
DeleteClientService(clientID *id.ID)
RemoveIdentity(id *id.ID)
}
// NewSymmetricClient generates a new Symmetric for the given channel. It starts
// listening for new messages on the callback immediately.
func NewSymmetricClient(channel crypto.Symmetric, listenerCb ListenerFunc,
net Client, rng *fastRNG.StreamGenerator) Symmetric {
// Add channel's identity
net.AddIdentity(channel.ReceptionID, identity.Forever, true)
// Create new service
service := message.Service{
Identifier: channel.ReceptionID.Bytes(),
Tag: symmetricBroadcastServiceTag,
}
// Create new message processor
p := &processor{
s: &channel,
cb: listenerCb,
}
// Add service
net.AddService(channel.ReceptionID, service, p)
jww.INFO.Printf("New symmetric broadcast client created for channel %q (%s)",
channel.Name, channel.ReceptionID)
return &symmetricClient{
channel: channel,
net: net,
rng: rng,
}
}
// MaxPayloadSize returns the maximum size for a broadcasted payload.
func (s *symmetricClient) MaxPayloadSize() int {
return s.net.GetMaxMessageLength()
}
// Get returns the crypto.Symmetric object containing the cryptographic and
// identifying information about the channel.
func (s *symmetricClient) Get() crypto.Symmetric {
return s.channel
}
// Broadcast broadcasts the payload to the channel.
func (s *symmetricClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error) {
if !s.net.IsHealthy() {
return 0, ephemeral.Id{}, errors.New(errNetworkHealth)
}
if len(payload) != s.MaxPayloadSize() {
return 0, ephemeral.Id{},
errors.Errorf(errPayloadSize, len(payload), s.MaxPayloadSize())
}
// Encrypt payload
rng := s.rng.GetStream()
encryptedPayload, mac, fp := s.channel.Encrypt(payload, rng)
rng.Close()
// Create service
service := message.Service{
Identifier: s.channel.ReceptionID.Bytes(),
Tag: symmetricBroadcastServiceTag,
}
if cMixParams.DebugTag == cmix.DefaultDebugTag {
cMixParams.DebugTag = cMixSendTag
}
return s.net.Send(
s.channel.ReceptionID, fp, service, encryptedPayload, mac, cMixParams)
}
// Stop unregisters the listener callback and stops the channel's identity
// from being tracked.
func (s *symmetricClient) Stop() {
// Removes currently tracked identity
s.net.RemoveIdentity(s.channel.ReceptionID)
// Delete all registered services
s.net.DeleteClientService(s.channel.ReceptionID)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment