Skip to content
Snippets Groups Projects
Commit 1bac77e1 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

more work on channels

parent af82b7b7
No related branches found
No related tags found
5 merge requests!510Release,!419rewrote the health tracker to both consider if there are waiting rounds and...,!371[Channel RSAtoPrivate] Implement Reverse Asymmetric in Client/Broadcast,!354Channels impl,!340Project/channels
...@@ -24,6 +24,8 @@ type broadcastClient struct { ...@@ -24,6 +24,8 @@ type broadcastClient struct {
rng *fastRNG.StreamGenerator rng *fastRNG.StreamGenerator
} }
type NewBroadcastChannelFunc func(channel crypto.Channel, net Client, rng *fastRNG.StreamGenerator) (Channel, error)
// NewBroadcastChannel creates a channel interface based on crypto.Channel, accepts net client connection & callback for received messages // NewBroadcastChannel creates a channel interface based on crypto.Channel, accepts net client connection & callback for received messages
func NewBroadcastChannel(channel crypto.Channel, net Client, rng *fastRNG.StreamGenerator) (Channel, error) { func NewBroadcastChannel(channel crypto.Channel, net Client, rng *fastRNG.StreamGenerator) (Channel, error) {
bc := &broadcastClient{ bc := &broadcastClient{
......
...@@ -13,7 +13,6 @@ import ( ...@@ -13,7 +13,6 @@ import (
"gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/rounds"
crypto "gitlab.com/elixxir/crypto/broadcast" crypto "gitlab.com/elixxir/crypto/broadcast"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/multicastRSA" "gitlab.com/xx_network/crypto/multicastRSA"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
...@@ -41,6 +40,14 @@ type Channel interface { ...@@ -41,6 +40,14 @@ type Channel interface {
Broadcast(payload []byte, cMixParams cmix.CMIXParams) ( Broadcast(payload []byte, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error) id.Round, ephemeral.Id, error)
// BroadcastWithAssembler broadcasts a payload over a symmetric channel. With
// a payload assembled after the round is selected, allowing the round
// info to be included in the payload.
// Network must be healthy to send
// Requires a payload of size bc.MaxSymmetricPayloadSize()
BroadcastWithAssembler(assembler Assembler, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error)
// BroadcastAsymmetric broadcasts an asymmetric payload to the channel. The payload size must be // BroadcastAsymmetric broadcasts an asymmetric payload to the channel. The payload size must be
// equal to MaxPayloadSize & private key for channel must be passed in // equal to MaxPayloadSize & private key for channel must be passed in
BroadcastAsymmetric(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) ( BroadcastAsymmetric(pk multicastRSA.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) (
...@@ -54,13 +61,15 @@ type Channel interface { ...@@ -54,13 +61,15 @@ type Channel interface {
Stop() Stop()
} }
// Assembler is a function which allows a bre
type Assembler func(rid id.Round) (payload []byte, err error)
// Client contains the methods from cmix.Client that are required by // Client contains the methods from cmix.Client that are required by
// symmetricClient. // symmetricClient.
type Client interface { type Client interface {
GetMaxMessageLength() int GetMaxMessageLength() int
Send(recipient *id.ID, fingerprint format.Fingerprint, SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler,
service message.Service, payload, mac []byte, cmixParams cmix.CMIXParams) (id.Round, ephemeral.Id, error)
cMixParams cmix.CMIXParams) (id.Round, ephemeral.Id, error)
IsHealthy() bool IsHealthy() bool
AddIdentity(id *id.ID, validUntil time.Time, persistent bool) AddIdentity(id *id.ID, validUntil time.Time, persistent bool)
AddService(clientID *id.ID, newService message.Service, AddService(clientID *id.ID, newService message.Service,
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/cmix" "gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/message"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
) )
...@@ -38,24 +39,46 @@ func (bc *broadcastClient) maxSymmetricPayload() int { ...@@ -38,24 +39,46 @@ func (bc *broadcastClient) maxSymmetricPayload() int {
// Network must be healthy to send // Network must be healthy to send
// Requires a payload of size bc.MaxSymmetricPayloadSize() // Requires a payload of size bc.MaxSymmetricPayloadSize()
func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) ( func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error) {
assemble := func(rid id.Round) ([]byte, error) {
return payload, nil
}
return bc.BroadcastWithAssembler(assemble, cMixParams)
}
// BroadcastWithAssembler broadcasts a payload over a symmetric channel. With
// a payload assembled after the round is selected, allowing the round
// info to be included in the payload.
// Network must be healthy to send
// Requires a payload of size bc.MaxSymmetricPayloadSize()
func (bc *broadcastClient) BroadcastWithAssembler(assembler Assembler, cMixParams cmix.CMIXParams) (
id.Round, ephemeral.Id, error) { id.Round, ephemeral.Id, error) {
if !bc.net.IsHealthy() { if !bc.net.IsHealthy() {
return 0, ephemeral.Id{}, errors.New(errNetworkHealth) return 0, ephemeral.Id{}, errors.New(errNetworkHealth)
} }
assemble := func(rid id.Round) (fp format.Fingerprint,
service message.Service, encryptedPayload, mac []byte, err error) {
//assemble the passed payload
payload, err := assembler(rid)
if err != nil {
return format.Fingerprint{}, message.Service{}, nil, nil, err
}
if len(payload) != bc.maxSymmetricPayload() { if len(payload) != bc.maxSymmetricPayload() {
return 0, ephemeral.Id{}, return format.Fingerprint{}, message.Service{}, nil, nil,
errors.Errorf(errPayloadSize, len(payload), bc.maxSymmetricPayload()) errors.Errorf(errPayloadSize, len(payload), bc.maxSymmetricPayload())
} }
// Encrypt payload // Encrypt payload
rng := bc.rng.GetStream() rng := bc.rng.GetStream()
encryptedPayload, mac, fp := bc.channel.EncryptSymmetric(payload, rng) defer rng.Close()
rng.Close() encryptedPayload, mac, fp = bc.channel.EncryptSymmetric(payload, rng)
// Create service using symmetric broadcast service tag & channel reception ID // Create service using symmetric broadcast service tag & channel reception ID
// Allows anybody with this info to listen for messages on this channel // Allows anybody with this info to listen for messages on this channel
service := message.Service{ service = message.Service{
Identifier: bc.channel.ReceptionID.Bytes(), Identifier: bc.channel.ReceptionID.Bytes(),
Tag: symmetricBroadcastServiceTag, Tag: symmetricBroadcastServiceTag,
} }
...@@ -63,7 +86,9 @@ func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) ...@@ -63,7 +86,9 @@ func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams)
if cMixParams.DebugTag == cmix.DefaultDebugTag { if cMixParams.DebugTag == cmix.DefaultDebugTag {
cMixParams.DebugTag = symmCMixSendTag cMixParams.DebugTag = symmCMixSendTag
} }
return
}
return bc.net.Send( return bc.net.SendWithAssembler(bc.channel.ReceptionID, assemble,
bc.channel.ReceptionID, fp, service, encryptedPayload, mac, cMixParams) cMixParams)
} }
package channels
import (
"errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/primitives/states"
"sync"
"time"
"gitlab.com/elixxir/client/cmix/rounds"
cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast"
cryptoChannel "gitlab.com/elixxir/crypto/channel"
"gitlab.com/xx_network/primitives/id"
)
var (
MessageTypeAlreadyRegistered = errors.New("the given message type has " +
"already been registered")
)
type EventModel interface {
JoinChannel(channel cryptoBroadcast.Channel)
LeaveChannel(ChannelID *id.ID)
ReceiveTextMessage(ChannelID *id.ID, MessageID cryptoChannel.MessageID,
messageType MessageType, SenderUsername string, Content []byte,
timestamp time.Time, lease time.Duration, round rounds.Round)
ReceiveAdminTextMessage(ChannelID *id.ID, MessageID cryptoChannel.MessageID,
messageType MessageType, SenderUsername string, Content []byte,
timestamp time.Time, lease time.Duration, round rounds.Round)
}
type MessageTypeReceiveMessage func(ChannelID *id.ID,
MessageID cryptoChannel.MessageID, messageType MessageType,
SenderUsername string, Content []byte, timestamp time.Time,
lease time.Duration, round rounds.Round)
type events struct {
model EventModel
registered map[MessageType]MessageTypeReceiveMessage
mux sync.RWMutex
}
func initEvents(model EventModel) *events {
e := &events{
model: model,
registered: make(map[MessageType]MessageTypeReceiveMessage),
mux: sync.RWMutex{},
}
//set up default message types
e.registered[Text] = e.model.ReceiveTextMessage
e.registered[AdminText] = e.model.ReceiveAdminTextMessage
return e
}
func (e *events) RegisterReceiveHandler(messageType MessageType,
listener MessageTypeReceiveMessage) error {
e.mux.Lock()
defer e.mux.Unlock()
//check if the type is already registered
if _, exists := e.registered[messageType]; exists {
return MessageTypeAlreadyRegistered
}
//register the message type
e.registered[messageType] = listener
jww.INFO.Printf("Registered Listener for Message Type %s", messageType)
return nil
}
func (e *events) hear(chID *id.ID, umi *UserMessageInternal,
receptionID receptionID.EphemeralIdentity, round rounds.Round) {
um := umi.GetUserMessage()
cm := umi.GetChannelMessage()
messageType := MessageType(cm.PayloadType)
//check if the type is already registered
e.mux.RLock()
listener, exists := e.registered[messageType]
e.mux.RUnlock()
if !exists {
jww.WARN.Printf("Received message from %s on channel %s in "+
"round %d which could not be handled due to unregistered message "+
"type %s; Contents: %v", um.Username, chID, round.ID, messageType,
cm.Payload)
}
//Call the listener. This is already in an instanced event, no new thread needed.
listener(chID, umi.GetMessageID(), messageType, um.Username,
cm.Payload, round.Timestamps[states.QUEUED], time.Duration(cm.Lease), round)
return
}
package channels
import (
"crypto/ed25519"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/broadcast"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/primitives/id"
"time"
)
type genericUserListener struct {
name NameService
events *events
chID *id.ID
}
func (gul *genericUserListener) Listen(payload []byte,
receptionID receptionID.EphemeralIdentity, round rounds.Round) {
//Remove the padding
payloadUnpadded, err := broadcast.DecodeSizedBroadcast(payload)
if err != nil {
jww.WARN.Printf("Failed to strip the padding on User Message "+
"on channel %s", gul.chID)
return
}
//Decode the message as a user message
umi, err := UnmarshalUserMessageInternal(payloadUnpadded)
if err != nil {
jww.WARN.Printf("Failed to unmarshal User Message on "+
"channel %s", gul.chID)
return
}
um := umi.GetUserMessage()
cm := umi.GetChannelMessage()
msgID := umi.GetMessageID()
/*CRYPTOGRAPHICALLY RELEVANT CHECKS*/
// check the round to ensure the message is not a replay
if id.Round(cm.RoundID) != round.ID {
jww.WARN.Printf("The round message %s send on %d referenced "+
"(%d) was not the same as the round the message was found on (%d)",
msgID, gul.chID, cm.RoundID, round.ID, gul.chID)
return
}
// check that the username lease is valid
usernameLeaseEnd := time.Unix(0, um.UsernameLease)
if usernameLeaseEnd.After(round.Timestamps[states.QUEUED]) {
jww.WARN.Printf("Message %s on channel %s purportedly from %s "+
"has an expired lease, ended %s, round %d was sent at %s", msgID,
gul.chID, um.Username, usernameLeaseEnd, round.ID,
round.Timestamps[states.QUEUED])
return
}
// check that the signature from the nameserver is valid
if !gul.name.ValidateChannelMessage(um.Username,
time.Unix(0, um.UsernameLease), um.ECCPublicKey, um.ValidationSignature) {
jww.WARN.Printf("Message %s on channel %s purportedly from %s "+
"failed the check of its Name Server with signature %v", msgID,
gul.chID, um.Username, um.ValidationSignature)
return
}
// check that the user properly signed the message
if !ed25519.Verify(um.ECCPublicKey, um.Message, um.Signature) {
jww.WARN.Printf("Message %s on channel %s purportedly from %s "+
"failed its user signature with signature %v", msgID,
gul.chID, um.Username, um.Signature)
return
}
//TODO: Processing of the message relative to admin commands will be here
//Submit the message to the event model for listening
gul.events.hear(gul.chID, umi, receptionID, round)
return
}
package channels
type Manager interface {
}
package channels
import (
"encoding/json"
"errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/broadcast"
"gitlab.com/elixxir/client/storage/versioned"
cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime"
)
const (
joinedChannelsVersion = 0
joinedChannelsKey = "JoinedChannelsKey"
joinedChannelVersion = 0
joinedChannelKey = "JoinedChannelKey-"
)
var ChannelAlreadyExistsErr = errors.New("the channel cannot be added " +
"becasue it already exists")
var ChannelDoesNotExistsErr = errors.New("the channel cannot be found")
// store Stores the list of joined channels to disk while taking the read lock
func (m *manager) store() error {
m.mux.RLock()
defer m.mux.RUnlock()
return m.storeUnsafe()
}
// storeUnsafe Stores the list of joined channels to disk without taking the
// read lock. Must be used by another function which has already taken the read
// lock
func (m *manager) storeUnsafe() error {
channelsList := m.getChannelsUnsafe()
data, err := json.Marshal(&channelsList)
if err != nil {
return err
}
obj := &versioned.Object{
Version: joinedChannelsVersion,
Timestamp: netTime.Now(),
Data: data,
}
return m.kv.Set(joinedChannelsKey,
joinedChannelsVersion, obj)
}
// loadChannels loads all currently joined channels from disk and registers
// them for message reception
func (m *manager) loadChannels() map[*id.ID]*joinedChannel {
obj, err := m.kv.Get(joinedChannelsKey,
joinedChannelsVersion)
if err != nil {
jww.FATAL.Panicf("Failed to load channels %+v", err)
}
chList := make([]*id.ID, 0, len(m.channels))
if err = json.Unmarshal(obj.Data, &chList); err != nil {
jww.FATAL.Panicf("Failed to load channels %+v", err)
}
chMap := make(map[*id.ID]*joinedChannel)
for i := range chList {
jc, err := loadJoinedChannel(chList[i], m.kv, m.client, m.rng, m.name,
&m.events, m.broadcastMaker)
if err != nil {
jww.FATAL.Panicf("Failed to load channel %s: %+v",
chList[i], err)
}
chMap[chList[i]] = jc
}
return chMap
}
//addChannel Adds a channel
func (m *manager) addChannel(channel cryptoBroadcast.Channel) error {
m.mux.Lock()
defer m.mux.Unlock()
if _, exists := m.channels[channel.ReceptionID]; exists {
return ChannelAlreadyExistsErr
}
b, err := m.broadcastMaker(channel, m.client, m.rng)
if err != nil {
return err
}
//Connect to listeners
err = b.RegisterListener((&genericUserListener{
name: m.name,
events: &m.events,
chID: channel.ReceptionID,
}).Listen, broadcast.Symmetric)
if err != nil {
return err
}
jc := &joinedChannel{
broadcast: b,
}
if err = jc.Store(m.kv); err != nil {
return err
}
if err = m.storeUnsafe(); err != nil {
return err
}
return nil
}
//getChannel returns the given channel, if it exists
func (m *manager) getChannel(channelId *id.ID) (*joinedChannel, error) {
m.mux.RLock()
defer m.mux.RUnlock()
jc, exists := m.channels[channelId]
if !exists {
return nil, ChannelDoesNotExistsErr
}
return jc, nil
}
//getChannels returns the ids of all channels that have been joined
//use getChannelsUnsafe if you already have taken the mux
func (m *manager) getChannels() []*id.ID {
m.mux.Lock()
defer m.mux.Unlock()
return m.getChannelsUnsafe()
}
//getChannelsUnsafe returns the ids of all channels that have been joined
//is unsafe because it does not take the mux, only use when under a lock.
func (m *manager) getChannelsUnsafe() []*id.ID {
list := make([]*id.ID, 0, len(m.channels))
for chID := range m.channels {
list = append(list, chID)
}
return list
}
// joinedChannel which holds channel info. Will expand to include admin data,
// so will be treated as a struct for now
type joinedChannel struct {
broadcast broadcast.Channel
}
// joinedChannelDisk is the representation for storage
type joinedChannelDisk struct {
broadcast cryptoBroadcast.Channel
}
//Store writes the given channel to a unique storage location within the EKV
func (jc *joinedChannel) Store(kv *versioned.KV) error {
jcd := joinedChannelDisk{broadcast: jc.broadcast.Get()}
data, err := json.Marshal(&jcd)
if err != nil {
return err
}
obj := &versioned.Object{
Version: joinedChannelVersion,
Timestamp: netTime.Now(),
Data: data,
}
return kv.Set(makeJoinedChannelKey(jc.broadcast.Get().ReceptionID),
joinedChannelVersion, obj)
}
//loadJoinedChannel loads a given channel from ekv storage
func loadJoinedChannel(chId *id.ID, kv *versioned.KV, net broadcast.Client,
rngGen *fastRNG.StreamGenerator, name NameService, e *events,
broadcastMaker broadcast.NewBroadcastChannelFunc) (*joinedChannel, error) {
obj, err := kv.Get(makeJoinedChannelKey(chId), joinedChannelVersion)
if err != nil {
return nil, err
}
jcd := &joinedChannelDisk{}
err = json.Unmarshal(obj.Data, jcd)
if err != nil {
return nil, err
}
b, err := broadcastMaker(jcd.broadcast, net, rngGen)
if err != nil {
return nil, err
}
err = b.RegisterListener((&genericUserListener{
name: name,
events: e,
chID: jcd.broadcast.ReceptionID,
}).Listen, broadcast.Symmetric)
if err != nil {
return nil, err
}
jc := &joinedChannel{broadcast: b}
return jc, nil
}
func makeJoinedChannelKey(chId *id.ID) string {
return joinedChannelKey + chId.HexEncode()
}
package channels
import (
"github.com/golang/protobuf/proto"
"gitlab.com/elixxir/client/broadcast"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/storage/versioned"
cryptoChannel "gitlab.com/elixxir/crypto/channel"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"sync"
"time"
)
type manager struct {
//List of all channels
channels map[*id.ID]*joinedChannel
mux sync.RWMutex
//External references
kv *versioned.KV
client broadcast.Client
rng *fastRNG.StreamGenerator
name NameService
//Events model
events
broadcastMaker broadcast.NewBroadcastChannelFunc
}
func (m *manager) Send(channelID *id.ID, msg []byte, validUntil time.Duration,
messageType MessageType, params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id,
error) {
//find the channel
ch, err := m.getChannel(channelID)
if err != nil {
return cryptoChannel.MessageID{}, 0, ephemeral.Id{}, err
}
var msgId cryptoChannel.MessageID
//Note: we are not checking check if message is too long before trying to
//find a round
//Build the function pointer that will build the message
assemble := func(rid id.Round) ([]byte, error) {
//Build the message
chMsg := &ChannelMessage{
Lease: validUntil.Nanoseconds(),
RoundID: uint64(rid),
PayloadType: uint32(messageType),
Payload: msg,
}
//Serialize the message
chMsgSerial, err := proto.Marshal(chMsg)
if err != nil {
return nil, err
}
//Sign the message
messageSig, err := m.name.SignChannelMessage(chMsgSerial)
if err != nil {
return nil, err
}
//Build the user message
validationSig, unameLease := m.name.GetChannelValidationSignature()
usrMsg := &UserMessage{
Message: chMsgSerial,
ValidationSignature: validationSig,
Signature: messageSig,
Username: m.name.GetUsername(),
ECCPublicKey: m.name.GetChannelPubkey(),
UsernameLease: unameLease.Unix(),
}
//Serialize the user message
usrMsgSerial, err := proto.Marshal(usrMsg)
if err != nil {
return nil, err
}
//Fill in any extra bits in the payload to ensure it is the right size
usrMsgSerialSized, err := broadcast.NewSizedBroadcast(
ch.broadcast.MaxAsymmetricPayloadSize(), usrMsgSerial)
if err != nil {
return nil, err
}
msgId = cryptoChannel.MakeMessageID(usrMsgSerialSized)
return usrMsgSerialSized, nil
}
//TODO: send the send message over to reception manually so it is added to
//the database early
rid, ephid, err := ch.broadcast.BroadcastWithAssembler(assemble, params)
return msgId, rid, ephid, err
}
package channels
import "fmt"
type MessageType uint32
const (
Text = MessageType(1)
AdminText = MessageType(2)
)
func (mt MessageType) String() string {
switch mt {
case Text:
return "Text"
default:
return fmt.Sprintf("Unknown messageType %d", mt)
}
}
...@@ -9,51 +9,65 @@ package channels ...@@ -9,51 +9,65 @@ package channels
import ( import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"sync" "gitlab.com/elixxir/crypto/channel"
) )
// UserMessageInternal is the internal structure of a UserMessage protobuf. // UserMessageInternal is the internal structure of a UserMessage protobuf.
type UserMessageInternal struct { type UserMessageInternal struct {
mux sync.RWMutex
userMessage *UserMessage userMessage *UserMessage
channelMessage *ChannelMessage channelMessage *ChannelMessage
messageID channel.MessageID
} }
func NewUserMessageInternal(ursMsg *UserMessage) *UserMessageInternal { func NewUserMessageInternal(ursMsg *UserMessage) (*UserMessageInternal, error) {
chanMessage := &ChannelMessage{}
err := proto.Unmarshal(ursMsg.Message, chanMessage)
if err != nil {
return nil, err
}
channelMessage := chanMessage
return &UserMessageInternal{ return &UserMessageInternal{
mux: sync.RWMutex{},
userMessage: ursMsg, userMessage: ursMsg,
channelMessage: nil, channelMessage: channelMessage,
messageID: channel.MakeMessageID(channelMessage.Payload),
}, nil
} }
func UnmarshalUserMessageInternal(usrMsg []byte) (*UserMessageInternal, error) {
um := &UserMessage{}
if err := proto.Unmarshal(usrMsg, um); err != nil {
return nil, err
}
chanMessage := &ChannelMessage{}
err := proto.Unmarshal(um.Message, chanMessage)
if err != nil {
return nil, err
}
channelMessage := chanMessage
return &UserMessageInternal{
userMessage: um,
channelMessage: channelMessage,
}, nil
} }
// GetUserMessage retrieves the UserMessage within // GetUserMessage retrieves the UserMessage within
// UserMessageInternal. // UserMessageInternal.
func (umi *UserMessageInternal) GetUserMessage() *UserMessage { func (umi *UserMessageInternal) GetUserMessage() *UserMessage {
umi.mux.RLock()
umi.mux.RUnlock()
return umi.userMessage return umi.userMessage
} }
// GetChannelMessage retrieves the ChannelMessage within // GetChannelMessage retrieves the ChannelMessage within
// UserMessageInternal. This is a lazy getter which will // UserMessageInternal.
// deserialize the ChannelMessage within the UserMessage.Message field. func (umi *UserMessageInternal) GetChannelMessage() *ChannelMessage {
// This deserialized ChannelMessage will then be placed into return umi.channelMessage
// UserMessageInternal's channelMessage field and return. On subsequent calls it will return
// the message stored in UserMessageInternal.
func (umi *UserMessageInternal) GetChannelMessage() (*ChannelMessage, error) {
umi.mux.Lock()
defer umi.mux.Unlock()
if umi.channelMessage == nil {
chanMessage := &ChannelMessage{}
err := proto.Unmarshal(umi.userMessage.Message, chanMessage)
if err != nil {
return nil, err
}
umi.channelMessage = chanMessage
} }
return umi.channelMessage, nil // GetMessageID retrieves the messageID for the message.
func (umi *UserMessageInternal) GetMessageID() channel.MessageID {
return umi.messageID
} }
...@@ -30,7 +30,7 @@ func TestUserMessageInternal_GetChannelMessage(t *testing.T) { ...@@ -30,7 +30,7 @@ func TestUserMessageInternal_GetChannelMessage(t *testing.T) {
Username: "hunter", Username: "hunter",
} }
internal := NewUserMessageInternal(usrMsg) internal, _ := NewUserMessageInternal(usrMsg)
received, err := internal.GetChannelMessage() received, err := internal.GetChannelMessage()
if err != nil { if err != nil {
t.Fatalf("GetChannelMessage error: %v", err) t.Fatalf("GetChannelMessage error: %v", err)
......
package channels
import (
"crypto/ed25519"
"time"
)
// NameService is an interface which encapsulates
// the user identity channel tracking service.
type NameService interface {
// GetUsername returns the username.
GetUsername() string
// GetChannelValidationSignature returns the validation
// signature and the time it was signed.
GetChannelValidationSignature() (signature []byte, lease time.Time)
// GetChannelPubkey returns the user's public key.
GetChannelPubkey() ed25519.PublicKey
// SignChannelMessage returns the signature of the
// given message.
SignChannelMessage(message []byte) (signature []byte, err error)
// ValidateChannelMessage
ValidateChannelMessage(username string, lease time.Time,
pubKey ed25519.PublicKey, authorIDSignature []byte) bool
}
...@@ -80,6 +80,24 @@ type Client interface { ...@@ -80,6 +80,24 @@ type Client interface {
SendMany(messages []TargetedCmixMessage, p CMIXParams) ( SendMany(messages []TargetedCmixMessage, p CMIXParams) (
id.Round, []ephemeral.Id, error) id.Round, []ephemeral.Id, error)
// SendWithAssembler sends a variable cmix payload to the provided recipient.
// The payload sent is based on the Complier function passed in, which accepts
// a round ID and returns the necessary payload data.
// Returns the round ID of the round the payload was sent or an error if it
// fails.
// This does not have end-to-end encryption on it and is used exclusively as
// a send for higher order cryptographic protocols. Do not use unless
// implementing a protocol on top.
// recipient - cMix ID of the recipient.
// assembler - MessageAssembler function, accepting round ID and returning
// fingerprint
// format.Fingerprint, service message.Service, payload, mac []byte
// Will return an error if the network is unhealthy or if it fails to send
// (along with the reason). Blocks until successful sends or errors.
// WARNING: Do not roll your own crypto.
SendWithAssembler(recipient *id.ID, assembler MessageAssembler,
cmixParams CMIXParams) (id.Round, ephemeral.Id, error)
/* === Message Reception ================================================ */ /* === Message Reception ================================================ */
/* Identities are all network identities which the client is currently /* Identities are all network identities which the client is currently
trying to pick up message on. An identity must be added to receive trying to pick up message on. An identity must be added to receive
...@@ -290,12 +308,15 @@ type Client interface { ...@@ -290,12 +308,15 @@ type Client interface {
type ClientErrorReport func(source, message, trace string) type ClientErrorReport func(source, message, trace string)
// MessageAssembler func accepts a round ID, returning fingerprint, service, payload & mac. // MessageAssembler func accepts a round ID, returning fingerprint, service,
// This allows users to pass in a paylaod which will contain the round ID over which the message is sent. // payload & mac. This allows users to pass in a paylaod which will contain the
type MessageAssembler func(rid id.Round) (fingerprint format.Fingerprint, service message.Service, payload, mac []byte) // round ID over which the message is sent.
type MessageAssembler func(rid id.Round) (fingerprint format.Fingerprint,
service message.Service, payload, mac []byte, err error)
// messageAssembler is an internal wrapper around MessageAssembler which returns a format.message // messageAssembler is an internal wrapper around MessageAssembler which
// This is necessary to preserve the interaction between sendCmixHelper and critical messages // returns a format.message This is necessary to preserve the interaction
// between sendCmixHelper and critical messages
type messageAssembler func(rid id.Round) (format.Message, error) type messageAssembler func(rid id.Round) (format.Message, error)
type clientCommsInterface interface { type clientCommsInterface interface {
......
...@@ -58,8 +58,9 @@ func (c *client) Send(recipient *id.ID, fingerprint format.Fingerprint, ...@@ -58,8 +58,9 @@ func (c *client) Send(recipient *id.ID, fingerprint format.Fingerprint,
service message.Service, payload, mac []byte, cmixParams CMIXParams) ( service message.Service, payload, mac []byte, cmixParams CMIXParams) (
id.Round, ephemeral.Id, error) { id.Round, ephemeral.Id, error) {
// create an internal assembler function to pass to sendWithAssembler // create an internal assembler function to pass to sendWithAssembler
assembler := func(rid id.Round) (format.Fingerprint, message.Service, []byte, []byte) { assembler := func(rid id.Round) (format.Fingerprint, message.Service,
return fingerprint, service, payload, mac []byte, []byte, error) {
return fingerprint, service, payload, mac, nil
} }
return c.sendWithAssembler(recipient, assembler, cmixParams) return c.sendWithAssembler(recipient, assembler, cmixParams)
} }
...@@ -98,7 +99,11 @@ func (c *client) sendWithAssembler(recipient *id.ID, assembler MessageAssembler, ...@@ -98,7 +99,11 @@ func (c *client) sendWithAssembler(recipient *id.ID, assembler MessageAssembler,
// Create an internal messageAssembler which returns a format.Message // Create an internal messageAssembler which returns a format.Message
assemblerFunc := func(rid id.Round) (format.Message, error) { assemblerFunc := func(rid id.Round) (format.Message, error) {
fingerprint, service, payload, mac := assembler(rid) fingerprint, service, payload, mac, err := assembler(rid)
if err != nil {
return format.Message{}, err
}
if len(payload) != c.maxMsgLen { if len(payload) != c.maxMsgLen {
return format.Message{}, errors.Errorf( return format.Message{}, errors.Errorf(
......
...@@ -2,8 +2,6 @@ module gitlab.com/elixxir/client ...@@ -2,8 +2,6 @@ module gitlab.com/elixxir/client
go 1.17 go 1.17
replace gitlab.com/elixxir/crypto => /home/human/code/crypto
require ( require (
github.com/cloudflare/circl v1.2.0 github.com/cloudflare/circl v1.2.0
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
...@@ -15,7 +13,7 @@ require ( ...@@ -15,7 +13,7 @@ require (
github.com/spf13/viper v1.12.0 github.com/spf13/viper v1.12.0
gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f
gitlab.com/elixxir/comms v0.0.4-0.20220805121030-b95005ac4528 gitlab.com/elixxir/comms v0.0.4-0.20220805121030-b95005ac4528
gitlab.com/elixxir/crypto v0.0.7-0.20220808171640-473891de4c46 gitlab.com/elixxir/crypto v0.0.7-0.20220822200404-0be5ac9167ba
gitlab.com/elixxir/ekv v0.1.7 gitlab.com/elixxir/ekv v0.1.7
gitlab.com/elixxir/primitives v0.0.3-0.20220606195757-40f7a589347f gitlab.com/elixxir/primitives v0.0.3-0.20220606195757-40f7a589347f
gitlab.com/xx_network/comms v0.0.4-0.20220630163702-f3d372ef6acd gitlab.com/xx_network/comms v0.0.4-0.20220630163702-f3d372ef6acd
......
...@@ -431,6 +431,8 @@ gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp0 ...@@ -431,6 +431,8 @@ gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp0
gitlab.com/elixxir/crypto v0.0.7-0.20220317172048-3de167bd9406/go.mod h1:tD6XjtQh87T2nKZL5I/pYPck5M2wLpkZ1Oz7H/LqO10= gitlab.com/elixxir/crypto v0.0.7-0.20220317172048-3de167bd9406/go.mod h1:tD6XjtQh87T2nKZL5I/pYPck5M2wLpkZ1Oz7H/LqO10=
gitlab.com/elixxir/crypto v0.0.7-0.20220808171640-473891de4c46 h1:C8nAiMnL8IOGjQ5qErbpzAjRMVFMoB1GunYk8pGOEz8= gitlab.com/elixxir/crypto v0.0.7-0.20220808171640-473891de4c46 h1:C8nAiMnL8IOGjQ5qErbpzAjRMVFMoB1GunYk8pGOEz8=
gitlab.com/elixxir/crypto v0.0.7-0.20220808171640-473891de4c46/go.mod h1:Oy+VWQ2Sa0Ybata3oTV+Yc46hkaDwAsuIMW0wJ01z2M= gitlab.com/elixxir/crypto v0.0.7-0.20220808171640-473891de4c46/go.mod h1:Oy+VWQ2Sa0Ybata3oTV+Yc46hkaDwAsuIMW0wJ01z2M=
gitlab.com/elixxir/crypto v0.0.7-0.20220822200404-0be5ac9167ba h1:aaz9Xxm1EooDzr644KdSPg+iVkyztndQ6+DfLIBIuv0=
gitlab.com/elixxir/crypto v0.0.7-0.20220822200404-0be5ac9167ba/go.mod h1:Oy+VWQ2Sa0Ybata3oTV+Yc46hkaDwAsuIMW0wJ01z2M=
gitlab.com/elixxir/ekv v0.1.7 h1:OW2z+N4QCqqMFzouAwFTWWMKz0Y/PDhyYReN7gQ5NiQ= gitlab.com/elixxir/ekv v0.1.7 h1:OW2z+N4QCqqMFzouAwFTWWMKz0Y/PDhyYReN7gQ5NiQ=
gitlab.com/elixxir/ekv v0.1.7/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4= gitlab.com/elixxir/ekv v0.1.7/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4=
gitlab.com/elixxir/primitives v0.0.0-20200731184040-494269b53b4d/go.mod h1:OQgUZq7SjnE0b+8+iIAT2eqQF+2IFHn73tOo+aV11mg= gitlab.com/elixxir/primitives v0.0.0-20200731184040-494269b53b4d/go.mod h1:OQgUZq7SjnE0b+8+iIAT2eqQF+2IFHn73tOo+aV11mg=
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment