diff --git a/groupChat/manager.go b/groupChat/manager.go index 7c038c16070820ebdfe6e746c0978e6992cb424d..b903547ddd76a79ba9baa19e39d597d04624d2ac 100644 --- a/groupChat/manager.go +++ b/groupChat/manager.go @@ -13,7 +13,6 @@ import ( "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/e2e" gs "gitlab.com/elixxir/client/groupChat/groupStore" - "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/storage/versioned" @@ -33,7 +32,6 @@ const ( // Manager handles the list of groups a user is a part of. type Manager struct { e2e e2e.Handler - net interfaces.NetworkManager receptionId *id.ID rng *fastRNG.StreamGenerator @@ -46,13 +44,13 @@ type Manager struct { } // NewManager creates a new group chat manager -func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.NetworkManager, receptionId *id.ID, - rng *fastRNG.StreamGenerator, grp *cyclic.Group, userDhKey *cyclic.Int, - kv *versioned.KV, requestFunc RequestCallback, receiveFunc ReceiveCallback) (*Manager, error) { +func NewManager(services network.Manager, e2e e2e.Handler, receptionId *id.ID, + rng *fastRNG.StreamGenerator, grp *cyclic.Group, kv *versioned.KV, + requestFunc RequestCallback, receiveFunc ReceiveCallback) (*Manager, error) { // Load the group chat storage or create one if one does not exist gStore, err := gs.NewOrLoadStore( - kv, group.Member{ID: receptionId, DhKey: userDhKey}) + kv, group.Member{ID: receptionId, DhKey: e2e.GetDefaultHistoricalDHPubkey()}) if err != nil { return nil, errors.Errorf(newGroupStoreErr, err) } @@ -60,7 +58,6 @@ func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.Networ // Define the manager object m := &Manager{ e2e: e2e, - net: net, rng: rng, receptionId: receptionId, grp: grp, @@ -87,16 +84,13 @@ func NewManager(services network.Manager, e2e e2e.Handler, net interfaces.Networ continue } - err = m.JoinGroup(g) - if err != nil { - return nil, err - } + m.joinGroup(g) } return m, nil } -// JoinGroup adds the group to the list of group chats the user is a part of. +// JoinGroup adds the group to storage, and enables requisite services. // An error is returned if the user is already part of the group or if the // maximum number of groups have already been joined. func (m Manager) JoinGroup(g gs.Group) error { @@ -104,15 +98,19 @@ func (m Manager) JoinGroup(g gs.Group) error { return errors.Errorf(joinGroupErr, g.ID, err) } + m.joinGroup(g) + jww.DEBUG.Printf("Joined group %q with ID %s.", g.Name, g.ID) + return nil +} + +// joinGroup adds the group services +func (m Manager) joinGroup(g gs.Group) { newService := message.Service{ Identifier: g.ID[:], Tag: catalog.Group, Metadata: g.ID[:], } m.services.AddService(m.receptionId, newService, &receptionProcessor{m: &m, g: g}) - - jww.DEBUG.Printf("Joined group %q with ID %s.", g.Name, g.ID) - return nil } // LeaveGroup removes a group from a list of groups the user is a part of. diff --git a/groupChat/receive.go b/groupChat/receive.go index 9bebd6bba907a056eb8ded7f926f09531ae44138..faf1ddd9548b0ecb56436822a855d281377792cb 100644 --- a/groupChat/receive.go +++ b/groupChat/receive.go @@ -76,7 +76,7 @@ func (p *receptionProcessor) Process(message format.Message, receptionID recepti p.g.ID, result.Timestamp) // If the message was read correctly, send it to the callback - go p.m.receiveFunc(result) + p.m.receiveFunc(result) } // decryptMessage decrypts the group message payload and returns its message ID, diff --git a/groupChat/receiveRequest.go b/groupChat/receiveRequest.go index 3149c122048a8a65415e17786dac372b55941e74..4adbdb96f92e5eb1c24710942245c786ff60c858 100644 --- a/groupChat/receiveRequest.go +++ b/groupChat/receiveRequest.go @@ -47,7 +47,7 @@ func (l *requestListener) Hear(item receive.Message) { jww.DEBUG.Printf("Received group request for "+ "group %s with ID %s.", g.Name, g.ID) - go l.m.requestFunc(g) + l.m.requestFunc(g) } } diff --git a/groupChat/send.go b/groupChat/send.go index 706706b57e20c03f67e461bfa4d3f19f25fb4f75..29761e7adf53abe3b1bb039cf6e7f1bfdc101465 100644 --- a/groupChat/send.go +++ b/groupChat/send.go @@ -66,7 +66,7 @@ func (m *Manager) Send(groupID *id.ID, message []byte) (id.Round, time.Time, gro // Send all the groupMessages param := network.GetDefaultCMIXParams() param.DebugTag = "group.Message" - rid, _, err := m.net.SendManyCMIX(groupMessages, param) + rid, _, err := m.services.SendManyCMIX(groupMessages, param) if err != nil { return 0, time.Time{}, group.MessageID{}, errors.Errorf(sendManyCmixErr, m.receptionId, groupID, err) @@ -80,45 +80,25 @@ func (m *Manager) Send(groupID *id.ID, message []byte) (id.Round, time.Time, gro // newMessages quickly builds messages for all group chat members in multiple threads func (m *Manager) newMessages(g gs.Group, msg []byte, timestamp time.Time) ( []network.TargetedCmixMessage, error) { - // Create channels to receive messages and errors on - msgChan := make(chan network.TargetedCmixMessage, len(g.Members)-1) - errChan := make(chan error, len(g.Members)-1) + + // Create list of cMix messages + messages := make([]network.TargetedCmixMessage, 0, len(g.Members)) + rng := m.rng.GetStream() + defer rng.Close() // Create cMix messages in parallel - for i, member := range g.Members { + for _, member := range g.Members { // Do not send to the sender if m.receptionId.Cmp(member.ID) { continue } - // Start thread to build cMix message - go func(member group.Member, i int) { - // Create new stream - rng := m.rng.GetStream() - defer rng.Close() - - // Add cMix message to list - cMixMsg, err := newCmixMsg(g, msg, timestamp, member, rng, m.receptionId, m.grp) - if err != nil { - errChan <- errors.Errorf(newCmixErr, i, member.ID, g.ID, err) - } - msgChan <- cMixMsg - - }(member, i) - } - - // Create list of cMix messages - messages := make([]network.TargetedCmixMessage, 0, len(g.Members)) - - // Wait for messages or errors - for len(messages) < len(g.Members)-1 { - select { - case err := <-errChan: - // Return on the first error that occurs + // Add cMix message to list + cMixMsg, err := newCmixMsg(g, msg, timestamp, member, rng, m.receptionId, m.grp) + if err != nil { return nil, err - case info := <-msgChan: - messages = append(messages, info) } + messages = append(messages, cMixMsg) } return messages, nil