Skip to content
Snippets Groups Projects
Commit d6d8e637 authored by Jake Taylor's avatar Jake Taylor
Browse files

improvements

parent f5fa0d0c
Branches
Tags
4 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast,!194refactor send group chat code
...@@ -13,7 +13,6 @@ import ( ...@@ -13,7 +13,6 @@ import (
"gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/e2e"
gs "gitlab.com/elixxir/client/groupChat/groupStore" gs "gitlab.com/elixxir/client/groupChat/groupStore"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/network" "gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
...@@ -33,7 +32,6 @@ const ( ...@@ -33,7 +32,6 @@ const (
// Manager handles the list of groups a user is a part of. // Manager handles the list of groups a user is a part of.
type Manager struct { type Manager struct {
e2e e2e.Handler e2e e2e.Handler
net interfaces.NetworkManager
receptionId *id.ID receptionId *id.ID
rng *fastRNG.StreamGenerator rng *fastRNG.StreamGenerator
...@@ -46,13 +44,13 @@ type Manager struct { ...@@ -46,13 +44,13 @@ type Manager struct {
} }
// NewManager creates a new group chat manager // NewManager creates a new group chat manager
func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.NetworkManager, receptionId *id.ID, func NewManager(services network.Manager, e2e e2e.Handler, receptionId *id.ID,
rng *fastRNG.StreamGenerator, grp *cyclic.Group, userDhKey *cyclic.Int, rng *fastRNG.StreamGenerator, grp *cyclic.Group, kv *versioned.KV,
kv *versioned.KV, requestFunc RequestCallback, receiveFunc ReceiveCallback) (*Manager, error) { requestFunc RequestCallback, receiveFunc ReceiveCallback) (*Manager, error) {
// Load the group chat storage or create one if one does not exist // Load the group chat storage or create one if one does not exist
gStore, err := gs.NewOrLoadStore( gStore, err := gs.NewOrLoadStore(
kv, group.Member{ID: receptionId, DhKey: userDhKey}) kv, group.Member{ID: receptionId, DhKey: e2e.GetDefaultHistoricalDHPubkey()})
if err != nil { if err != nil {
return nil, errors.Errorf(newGroupStoreErr, err) return nil, errors.Errorf(newGroupStoreErr, err)
} }
...@@ -60,7 +58,6 @@ func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.Networ ...@@ -60,7 +58,6 @@ func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.Networ
// Define the manager object // Define the manager object
m := &Manager{ m := &Manager{
e2e: e2e, e2e: e2e,
net: net,
rng: rng, rng: rng,
receptionId: receptionId, receptionId: receptionId,
grp: grp, grp: grp,
...@@ -87,16 +84,13 @@ func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.Networ ...@@ -87,16 +84,13 @@ func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.Networ
continue continue
} }
err = m.JoinGroup(g) m.joinGroup(g)
if err != nil {
return nil, err
}
} }
return m, nil return m, nil
} }
// JoinGroup adds the group to the list of group chats the user is a part of. // JoinGroup adds the group to storage, and enables requisite services.
// An error is returned if the user is already part of the group or if the // An error is returned if the user is already part of the group or if the
// maximum number of groups have already been joined. // maximum number of groups have already been joined.
func (m Manager) JoinGroup(g gs.Group) error { func (m Manager) JoinGroup(g gs.Group) error {
...@@ -104,15 +98,19 @@ func (m Manager) JoinGroup(g gs.Group) error { ...@@ -104,15 +98,19 @@ func (m Manager) JoinGroup(g gs.Group) error {
return errors.Errorf(joinGroupErr, g.ID, err) return errors.Errorf(joinGroupErr, g.ID, err)
} }
m.joinGroup(g)
jww.DEBUG.Printf("Joined group %q with ID %s.", g.Name, g.ID)
return nil
}
// joinGroup adds the group services
func (m Manager) joinGroup(g gs.Group) {
newService := message.Service{ newService := message.Service{
Identifier: g.ID[:], Identifier: g.ID[:],
Tag: catalog.Group, Tag: catalog.Group,
Metadata: g.ID[:], Metadata: g.ID[:],
} }
m.services.AddService(m.receptionId, newService, &receptionProcessor{m: &m, g: g}) m.services.AddService(m.receptionId, newService, &receptionProcessor{m: &m, g: g})
jww.DEBUG.Printf("Joined group %q with ID %s.", g.Name, g.ID)
return nil
} }
// LeaveGroup removes a group from a list of groups the user is a part of. // LeaveGroup removes a group from a list of groups the user is a part of.
......
...@@ -76,7 +76,7 @@ func (p *receptionProcessor) Process(message format.Message, receptionID recepti ...@@ -76,7 +76,7 @@ func (p *receptionProcessor) Process(message format.Message, receptionID recepti
p.g.ID, result.Timestamp) p.g.ID, result.Timestamp)
// If the message was read correctly, send it to the callback // If the message was read correctly, send it to the callback
go p.m.receiveFunc(result) p.m.receiveFunc(result)
} }
// decryptMessage decrypts the group message payload and returns its message ID, // decryptMessage decrypts the group message payload and returns its message ID,
......
...@@ -47,7 +47,7 @@ func (l *requestListener) Hear(item receive.Message) { ...@@ -47,7 +47,7 @@ func (l *requestListener) Hear(item receive.Message) {
jww.DEBUG.Printf("Received group request for "+ jww.DEBUG.Printf("Received group request for "+
"group %s with ID %s.", g.Name, g.ID) "group %s with ID %s.", g.Name, g.ID)
go l.m.requestFunc(g) l.m.requestFunc(g)
} }
} }
......
...@@ -66,7 +66,7 @@ func (m *Manager) Send(groupID *id.ID, message []byte) (id.Round, time.Time, gro ...@@ -66,7 +66,7 @@ func (m *Manager) Send(groupID *id.ID, message []byte) (id.Round, time.Time, gro
// Send all the groupMessages // Send all the groupMessages
param := network.GetDefaultCMIXParams() param := network.GetDefaultCMIXParams()
param.DebugTag = "group.Message" param.DebugTag = "group.Message"
rid, _, err := m.net.SendManyCMIX(groupMessages, param) rid, _, err := m.services.SendManyCMIX(groupMessages, param)
if err != nil { if err != nil {
return 0, time.Time{}, group.MessageID{}, return 0, time.Time{}, group.MessageID{},
errors.Errorf(sendManyCmixErr, m.receptionId, groupID, err) errors.Errorf(sendManyCmixErr, m.receptionId, groupID, err)
...@@ -80,45 +80,25 @@ func (m *Manager) Send(groupID *id.ID, message []byte) (id.Round, time.Time, gro ...@@ -80,45 +80,25 @@ func (m *Manager) Send(groupID *id.ID, message []byte) (id.Round, time.Time, gro
// newMessages quickly builds messages for all group chat members in multiple threads // newMessages quickly builds messages for all group chat members in multiple threads
func (m *Manager) newMessages(g gs.Group, msg []byte, timestamp time.Time) ( func (m *Manager) newMessages(g gs.Group, msg []byte, timestamp time.Time) (
[]network.TargetedCmixMessage, error) { []network.TargetedCmixMessage, error) {
// Create channels to receive messages and errors on
msgChan := make(chan network.TargetedCmixMessage, len(g.Members)-1) // Create list of cMix messages
errChan := make(chan error, len(g.Members)-1) messages := make([]network.TargetedCmixMessage, 0, len(g.Members))
rng := m.rng.GetStream()
defer rng.Close()
// Create cMix messages in parallel // Create cMix messages in parallel
for i, member := range g.Members { for _, member := range g.Members {
// Do not send to the sender // Do not send to the sender
if m.receptionId.Cmp(member.ID) { if m.receptionId.Cmp(member.ID) {
continue continue
} }
// Start thread to build cMix message
go func(member group.Member, i int) {
// Create new stream
rng := m.rng.GetStream()
defer rng.Close()
// Add cMix message to list // Add cMix message to list
cMixMsg, err := newCmixMsg(g, msg, timestamp, member, rng, m.receptionId, m.grp) cMixMsg, err := newCmixMsg(g, msg, timestamp, member, rng, m.receptionId, m.grp)
if err != nil { if err != nil {
errChan <- errors.Errorf(newCmixErr, i, member.ID, g.ID, err)
}
msgChan <- cMixMsg
}(member, i)
}
// Create list of cMix messages
messages := make([]network.TargetedCmixMessage, 0, len(g.Members))
// Wait for messages or errors
for len(messages) < len(g.Members)-1 {
select {
case err := <-errChan:
// Return on the first error that occurs
return nil, err return nil, err
case info := <-msgChan:
messages = append(messages, info)
} }
messages = append(messages, cMixMsg)
} }
return messages, nil return messages, nil
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment