Skip to content
Snippets Groups Projects
Commit 9cf7a004 authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

Merge branch 'hotfix/RestructureGroupChat' into 'restructure'

refactor send group chat code

See merge request !194
parents 79485048 d6d8e637
Branches
Tags
4 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast,!194refactor send group chat code
......@@ -13,7 +13,6 @@ import (
"gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e"
gs "gitlab.com/elixxir/client/groupChat/groupStore"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/storage/versioned"
......@@ -33,7 +32,6 @@ const (
// Manager handles the list of groups a user is a part of.
type Manager struct {
e2e e2e.Handler
net interfaces.NetworkManager
receptionId *id.ID
rng *fastRNG.StreamGenerator
......@@ -46,13 +44,13 @@ type Manager struct {
}
// NewManager creates a new group chat manager
func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.NetworkManager, receptionId *id.ID,
rng *fastRNG.StreamGenerator, grp *cyclic.Group, userDhKey *cyclic.Int,
kv *versioned.KV, requestFunc RequestCallback, receiveFunc ReceiveCallback) (*Manager, error) {
func NewManager(services network.Manager, e2e e2e.Handler, receptionId *id.ID,
rng *fastRNG.StreamGenerator, grp *cyclic.Group, kv *versioned.KV,
requestFunc RequestCallback, receiveFunc ReceiveCallback) (*Manager, error) {
// Load the group chat storage or create one if one does not exist
gStore, err := gs.NewOrLoadStore(
kv, group.Member{ID: receptionId, DhKey: userDhKey})
kv, group.Member{ID: receptionId, DhKey: e2e.GetDefaultHistoricalDHPubkey()})
if err != nil {
return nil, errors.Errorf(newGroupStoreErr, err)
}
......@@ -60,7 +58,6 @@ func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.Networ
// Define the manager object
m := &Manager{
e2e: e2e,
net: net,
rng: rng,
receptionId: receptionId,
grp: grp,
......@@ -87,16 +84,13 @@ func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.Networ
continue
}
err = m.JoinGroup(g)
if err != nil {
return nil, err
}
m.joinGroup(g)
}
return m, nil
}
// JoinGroup adds the group to the list of group chats the user is a part of.
// JoinGroup adds the group to storage, and enables requisite services.
// An error is returned if the user is already part of the group or if the
// maximum number of groups have already been joined.
func (m Manager) JoinGroup(g gs.Group) error {
......@@ -104,15 +98,19 @@ func (m Manager) JoinGroup(g gs.Group) error {
return errors.Errorf(joinGroupErr, g.ID, err)
}
m.joinGroup(g)
jww.DEBUG.Printf("Joined group %q with ID %s.", g.Name, g.ID)
return nil
}
// joinGroup adds the group services
func (m Manager) joinGroup(g gs.Group) {
newService := message.Service{
Identifier: g.ID[:],
Tag: catalog.Group,
Metadata: g.ID[:],
}
m.services.AddService(m.receptionId, newService, &receptionProcessor{m: &m, g: g})
jww.DEBUG.Printf("Joined group %q with ID %s.", g.Name, g.ID)
return nil
}
// LeaveGroup removes a group from a list of groups the user is a part of.
......
......@@ -16,7 +16,6 @@ import (
"gitlab.com/elixxir/crypto/group"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/primitives/id"
"time"
)
......@@ -42,71 +41,70 @@ func (p *receptionProcessor) Process(message format.Message, receptionID recepti
jww.TRACE.Print("Group message reception received cMix message.")
// Attempt to read the message
roundTimeStamp := round.Timestamps[states.QUEUED]
msgID, timestamp, senderID, msg, err := decryptMessage(p.g, message, roundTimeStamp)
roundTimestamp := round.Timestamps[states.QUEUED]
// Unmarshal cMix message contents to get public message format
pubMsg, err := unmarshalPublicMsg(message.GetContents())
if err != nil {
jww.WARN.Printf("Failed to unmarshal: %+v", errors.Errorf(unmarshalPublicMsgErr, err))
}
// Obtain the cryptKey for the public message
key, err := getCryptKey(p.g.Key, pubMsg.GetSalt(), message.GetMac(),
pubMsg.GetPayload(), p.g.DhKeys, roundTimestamp)
if err != nil {
jww.WARN.Printf("Unable to getCryptKey: %+v", err)
return
}
// Decrypt the message payload using the cryptKey
result, err := decryptMessage(p.g, message.GetKeyFP(), key, pubMsg.GetPayload())
if err != nil {
jww.WARN.Printf("Group message reception failed to read "+
"cMix message: %+v", err)
return
}
// Populate remaining fields from the top level
result.GroupID = p.g.ID
result.RecipientID = receptionID.Source
result.EphemeralID = receptionID.EphId
result.RoundID = round.ID
result.RoundTimestamp = roundTimestamp
jww.DEBUG.Printf("Received group message with ID %s from sender "+
"%s in group %s with ID %s at %s.", msgID, senderID, p.g.Name,
p.g.ID, timestamp)
"%s in group %s with ID %s at %s.", result.ID, result.SenderID, p.g.Name,
p.g.ID, result.Timestamp)
// If the message was read correctly, send it to the callback
go p.m.receiveFunc(MessageReceive{
GroupID: p.g.ID,
ID: msgID,
Payload: msg,
SenderID: senderID,
RecipientID: receptionID.Source,
EphemeralID: receptionID.EphId,
Timestamp: timestamp,
RoundID: round.ID,
RoundTimestamp: roundTimeStamp,
})
p.m.receiveFunc(result)
}
// decryptMessage decrypts the group message payload and returns its message ID,
// timestamp, sender ID, and message contents.
func decryptMessage(g gs.Group, cMixMsg format.Message, roundTimestamp time.Time) (
group.MessageID, time.Time, *id.ID, []byte, error) {
// Unmarshal cMix message contents to get public message format
pubMsg, err := unmarshalPublicMsg(cMixMsg.GetContents())
if err != nil {
return group.MessageID{}, time.Time{}, nil, nil,
errors.Errorf(unmarshalPublicMsgErr, err)
}
key, err := getCryptKey(g.Key, pubMsg.GetSalt(), cMixMsg.GetMac(),
pubMsg.GetPayload(), g.DhKeys, roundTimestamp)
if err != nil {
return group.MessageID{}, time.Time{}, nil, nil, err
}
func decryptMessage(g gs.Group, fingerprint format.Fingerprint, key group.CryptKey, payload []byte) (
MessageReceive, error) {
// Decrypt internal message
decryptedPayload := group.Decrypt(key, cMixMsg.GetKeyFP(),
pubMsg.GetPayload())
decryptedPayload := group.Decrypt(key, fingerprint, payload)
// Unmarshal internal message
intlMsg, err := unmarshalInternalMsg(decryptedPayload)
if err != nil {
return group.MessageID{}, time.Time{}, nil, nil,
errors.Errorf(unmarshalInternalMsgErr, err)
return MessageReceive{}, errors.Errorf(unmarshalInternalMsgErr, err)
}
// Unmarshal sender ID
senderID, err := intlMsg.GetSenderID()
if err != nil {
return group.MessageID{}, time.Time{}, nil, nil,
errors.Errorf(unmarshalSenderIdErr, err)
return MessageReceive{}, errors.Errorf(unmarshalSenderIdErr, err)
}
messageID := group.NewMessageID(g.ID, intlMsg.Marshal())
return messageID, intlMsg.GetTimestamp(), senderID, intlMsg.GetPayload(), nil
return MessageReceive{
ID: group.NewMessageID(g.ID, intlMsg.Marshal()),
Payload: intlMsg.GetPayload(),
SenderID: senderID,
Timestamp: intlMsg.GetTimestamp(),
}, nil
}
// getCryptKey generates the decryption key for a group internal message. The
......@@ -117,6 +115,7 @@ func decryptMessage(g gs.Group, cMixMsg format.Message, roundTimestamp time.Time
// DH key is tried until there is a match.
func getCryptKey(key group.Key, salt [group.SaltLen]byte, mac, payload []byte,
dhKeys gs.DhKeyList, roundTimestamp time.Time) (group.CryptKey, error) {
// Compute the current epoch
epoch := group.ComputeEpoch(roundTimestamp)
......
......@@ -47,7 +47,7 @@ func (l *requestListener) Hear(item receive.Message) {
jww.DEBUG.Printf("Received group request for "+
"group %s with ID %s.", g.Name, g.ID)
go l.m.requestFunc(g)
l.m.requestFunc(g)
}
}
......@@ -56,7 +56,7 @@ func (l *requestListener) Name() string {
return catalog.GroupRq
}
// readRequest returns the group describes in the group request message. An
// readRequest returns the group described in the group request message. An
// error is returned if the request is of the wrong type or cannot be read.
func (m *Manager) readRequest(msg receive.Message) (gs.Group, error) {
// Return an error if the message is not of the right type
......
......@@ -14,6 +14,7 @@ import (
gs "gitlab.com/elixxir/client/groupChat/groupStore"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/elixxir/crypto/group"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
......@@ -36,121 +37,89 @@ const (
saltReadLengthErr = "length of generated salt %d != %d required"
)
// Send sends a message to all group members using Client.SendManyCMIX. The
// send fails if the message is too long.
func (m *Manager) Send(groupID *id.ID, message []byte) (id.Round, time.Time, group.MessageID,
error) {
// Send sends a message to all group members using Client.SendManyCMIX.
// The send fails if the message is too long.
func (m *Manager) Send(groupID *id.ID, message []byte) (id.Round, time.Time, group.MessageID, error) {
// Get the relevant group
g, exists := m.GetGroup(groupID)
if !exists {
return 0, time.Time{}, group.MessageID{},
errors.Errorf(newNoGroupErr, groupID)
}
// get the current time stripped of the monotonic clock
timeNow := netTime.Now().Round(0)
// Create a cMix message for each group member
messages, msgID, err := m.createMessages(groupID, message, timeNow)
groupMessages, err := m.newMessages(g, message, timeNow)
if err != nil {
return 0, time.Time{}, group.MessageID{}, errors.Errorf(newCmixMsgErr, err)
}
// Obtain message ID
msgId, err := getGroupMessageId(m.grp, groupID, m.receptionId, timeNow, message)
if err != nil {
return 0, time.Time{}, group.MessageID{}, err
}
// Send all the groupMessages
param := network.GetDefaultCMIXParams()
param.DebugTag = "group.Message"
rid, _, err := m.net.SendManyCMIX(messages, param)
rid, _, err := m.services.SendManyCMIX(groupMessages, param)
if err != nil {
return 0, time.Time{}, group.MessageID{},
errors.Errorf(sendManyCmixErr, m.gs.GetUser().ID, groupID, err)
errors.Errorf(sendManyCmixErr, m.receptionId, groupID, err)
}
jww.DEBUG.Printf("Sent message to %d members in group %s at %s.",
len(messages), groupID, timeNow)
return rid, timeNow, msgID, nil
len(groupMessages), groupID, timeNow)
return rid, timeNow, msgId, nil
}
// createMessages generates a list of cMix messages and a list of corresponding
// recipient IDs.
func (m *Manager) createMessages(groupID *id.ID, msg []byte, timestamp time.Time) (
[]network.TargetedCmixMessage, group.MessageID, error) {
//make the message ID
cmixMsg := format.NewMessage(m.grp.GetP().ByteLen())
_, intlMsg, err := newMessageParts(cmixMsg.ContentsSize())
if err != nil {
return nil, group.MessageID{}, errors.WithMessage(err, "Failed to make message parts for message ID")
}
messageID := group.NewMessageID(groupID, setInternalPayload(intlMsg, timestamp, m.gs.GetUser().ID, msg))
g, exists := m.gs.Get(groupID)
if !exists {
return []network.TargetedCmixMessage{}, group.MessageID{},
errors.Errorf(newNoGroupErr, groupID)
}
NewMessages, err := m.newMessages(g, msg, timestamp)
return NewMessages, messageID, err
}
// newMessages is a private function that allows the passing in of a timestamp
// and streamGen instead of a fastRNG.StreamGenerator for easier testing.
// newMessages quickly builds messages for all group chat members in multiple threads
func (m *Manager) newMessages(g gs.Group, msg []byte, timestamp time.Time) (
[]network.TargetedCmixMessage, error) {
// Create list of cMix messages
messages := make([]network.TargetedCmixMessage, 0, len(g.Members))
// Create channels to receive messages and errors on
type msgInfo struct {
msg format.Message
id *id.ID
}
msgChan := make(chan network.TargetedCmixMessage, len(g.Members)-1)
errChan := make(chan error, len(g.Members)-1)
rng := m.rng.GetStream()
defer rng.Close()
// Create cMix messages in parallel
for i, member := range g.Members {
for _, member := range g.Members {
// Do not send to the sender
if m.gs.GetUser().ID.Cmp(member.ID) {
if m.receptionId.Cmp(member.ID) {
continue
}
// Start thread to build cMix message
go func(member group.Member, i int) {
// Create new stream
rng := m.rng.GetStream()
defer rng.Close()
// Add cMix message to list
msgChan <- network.TargetedCmixMessage{
Recipient: member.ID,
Payload: msg,
Service: message.Service{
Identifier: g.ID[:],
Tag: catalog.Group,
Metadata: g.ID[:],
},
}
}(member, i)
}
// Wait for messages or errors
for len(messages) < len(g.Members)-1 {
select {
case err := <-errChan:
// Return on the first error that occurs
cMixMsg, err := newCmixMsg(g, msg, timestamp, member, rng, m.receptionId, m.grp)
if err != nil {
return nil, err
case info := <-msgChan:
messages = append(messages, info)
}
messages = append(messages, cMixMsg)
}
return messages, nil
}
// newCmixMsg generates a new cMix message to be sent to a group member.
func (m *Manager) newCmixMsg(g gs.Group, msg []byte, timestamp time.Time,
mem group.Member, rng io.Reader) (format.Message, error) {
func newCmixMsg(g gs.Group, msg []byte, timestamp time.Time,
mem group.Member, rng io.Reader, senderId *id.ID, grp *cyclic.Group) (network.TargetedCmixMessage, error) {
// Initialize targeted message
cmixMsg := network.TargetedCmixMessage{
Recipient: mem.ID,
Service: message.Service{
Identifier: g.ID[:],
Tag: catalog.Group,
Metadata: g.ID[:],
},
}
// Create three message layers
cmixMsg := format.NewMessage(m.grp.GetP().ByteLen())
pubMsg, intlMsg, err := newMessageParts(cmixMsg.ContentsSize())
pubMsg, intlMsg, err := newMessageParts(grp.GetP().ByteLen())
if err != nil {
return cmixMsg, err
}
......@@ -168,7 +137,7 @@ func (m *Manager) newCmixMsg(g gs.Group, msg []byte, timestamp time.Time,
}
// Generate key fingerprint
keyFp := group.NewKeyFingerprint(g.Key, salt, mem.ID)
cmixMsg.Fingerprint = group.NewKeyFingerprint(g.Key, salt, mem.ID)
// Generate key
key, err := group.NewKdfKey(g.Key, group.ComputeEpoch(timestamp), salt)
......@@ -177,25 +146,30 @@ func (m *Manager) newCmixMsg(g gs.Group, msg []byte, timestamp time.Time,
}
// Generate internal message
payload := setInternalPayload(intlMsg, timestamp, m.gs.GetUser().ID, msg)
payload := setInternalPayload(intlMsg, timestamp, senderId, msg)
// Encrypt internal message
encryptedPayload := group.Encrypt(key, keyFp, payload)
encryptedPayload := group.Encrypt(key, cmixMsg.Fingerprint, payload)
// Generate public message
publicPayload := setPublicPayload(pubMsg, salt, encryptedPayload)
cmixMsg.Payload = setPublicPayload(pubMsg, salt, encryptedPayload)
// Generate MAC
mac := group.NewMAC(key, encryptedPayload, g.DhKeys[*mem.ID])
// Construct cMix message
cmixMsg.SetContents(publicPayload)
cmixMsg.SetKeyFP(keyFp)
cmixMsg.SetMac(mac)
cmixMsg.Mac = group.NewMAC(key, encryptedPayload, g.DhKeys[*mem.ID])
return cmixMsg, nil
}
// Build the group message ID
func getGroupMessageId(grp *cyclic.Group, groupId, senderId *id.ID, timestamp time.Time, msg []byte) (group.MessageID, error) {
cmixMsg := format.NewMessage(grp.GetP().ByteLen())
_, intlMsg, err := newMessageParts(cmixMsg.ContentsSize())
if err != nil {
return group.MessageID{}, errors.WithMessage(err, "Failed to make message parts for message ID")
}
return group.NewMessageID(groupId, setInternalPayload(intlMsg, timestamp, senderId, msg)), nil
}
// newMessageParts generates a public payload message and the internal payload
// message. An error is returned if the messages cannot fit in the payloadSize.
func newMessageParts(payloadSize int) (publicMsg, internalMsg, error) {
......
......@@ -30,7 +30,7 @@ const (
// ResendRequest allows a groupChat request to be sent again.
func (m Manager) ResendRequest(groupID *id.ID) ([]id.Round, RequestStatus, error) {
g, exists := m.gs.Get(groupID)
g, exists := m.GetGroup(groupID)
if !exists {
return nil, NotSent, errors.Errorf(resendGroupIdErr, groupID)
}
......@@ -94,10 +94,13 @@ func (m Manager) sendRequests(g gs.Group) ([]id.Round, RequestStatus, error) {
errors.Errorf(sendRequestAllErr, len(errs), strings.Join(errs, "\n"))
}
// Convert roundIdMap to List
roundList := roundIdMap2List(roundIDs)
// If some sends returned an error, then return a list of round IDs for the
// successful sends and a list of errors for the failed ones
if len(errs) > 0 {
return roundIdMap2List(roundIDs), PartialSent,
return roundList, PartialSent,
errors.Errorf(sendRequestPartialErr, len(errs), n,
strings.Join(errs, "\n"))
}
......@@ -106,7 +109,7 @@ func (m Manager) sendRequests(g gs.Group) ([]id.Round, RequestStatus, error) {
len(g.Members), g.Name, g.ID)
// If all sends succeeded, return a list of roundIDs
return roundIdMap2List(roundIDs), AllSent, nil
return roundList, AllSent, nil
}
// sendRequest sends the group request to the user via E2E.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment