Skip to content
Snippets Groups Projects
Commit 8b1604d9 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

finished the implementation of the message retreval

parent f40bff36
Branches
Tags
No related merge requests found
...@@ -264,7 +264,7 @@ type TextMessage struct { ...@@ -264,7 +264,7 @@ type TextMessage struct {
Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
// Timestamp (Unix time in seconds) // Timestamp (Unix time in seconds)
// You can use this to display the time when the other user sent the message // You can use this to display the time when the other user sent the message
// TODO Remove this when all messages have timestamps // TODO Done this when all messages have timestamps
Time int64 `protobuf:"varint,4,opt,name=time,proto3" json:"time,omitempty"` Time int64 `protobuf:"varint,4,opt,name=time,proto3" json:"time,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
......
...@@ -12,6 +12,8 @@ type Rounds struct { ...@@ -12,6 +12,8 @@ type Rounds struct {
// maximum period of time a pending historical round query will wait before // maximum period of time a pending historical round query will wait before
// it si transmitted // it si transmitted
HistoricalRoundsPeriod time.Duration HistoricalRoundsPeriod time.Duration
// number of worker threads for retreiving messages from gateways
NumMessageRetrievalWorkers uint
//Length of historical rounds channel buffer //Length of historical rounds channel buffer
HistoricalRoundsBufferLen uint HistoricalRoundsBufferLen uint
...@@ -24,6 +26,7 @@ func GetDefaultRounds() Rounds { ...@@ -24,6 +26,7 @@ func GetDefaultRounds() Rounds {
MaxAttemptsCheckingARound: 5, MaxAttemptsCheckingARound: 5,
MaxHistoricalRounds: 100, MaxHistoricalRounds: 100,
HistoricalRoundsPeriod: 100 * time.Millisecond, HistoricalRoundsPeriod: 100 * time.Millisecond,
NumMessageRetrievalWorkers: 8,
HistoricalRoundsBufferLen: 1000, HistoricalRoundsBufferLen: 1000,
LookupRoundsBufferLen: 2000, LookupRoundsBufferLen: 2000,
......
...@@ -21,7 +21,7 @@ require ( ...@@ -21,7 +21,7 @@ require (
gitlab.com/elixxir/primitives v0.0.0-20200916172343-37503735c7a1 gitlab.com/elixxir/primitives v0.0.0-20200916172343-37503735c7a1
gitlab.com/xx_network/comms v0.0.0-20200915154643-d533291041b7 gitlab.com/xx_network/comms v0.0.0-20200915154643-d533291041b7
gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686 gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686
gitlab.com/xx_network/primitives v0.0.0-20200915204206-eb0287ed0031 gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect
google.golang.org/protobuf v1.25.0 google.golang.org/protobuf v1.25.0
......
package message
import (
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
)
type Bundle struct {
Round id.Round
Messages []format.Message
Finish func()
}
...@@ -16,7 +16,7 @@ func (m *Manager) Checker(roundID id.Round, instance *network.Instance) bool { ...@@ -16,7 +16,7 @@ func (m *Manager) Checker(roundID id.Round, instance *network.Instance) bool {
return false return false
} }
if count == m.params.MaxAttemptsCheckingARound { if count == m.params.MaxAttemptsCheckingARound {
m.p.Remove(roundID) m.p.Done(roundID)
return true return true
} }
// FIXME: Spec has us SETTING processing, but not REMOVING it // FIXME: Spec has us SETTING processing, but not REMOVING it
...@@ -25,7 +25,7 @@ func (m *Manager) Checker(roundID id.Round, instance *network.Instance) bool { ...@@ -25,7 +25,7 @@ func (m *Manager) Checker(roundID id.Round, instance *network.Instance) bool {
// a state that lives with the round info as soon as we know // a state that lives with the round info as soon as we know
// about it that gets updated at different parts...not clear // about it that gets updated at different parts...not clear
// needs to be thought through. // needs to be thought through.
//defer processing.Remove(roundID) //defer processing.Done(roundID)
// TODO: Bloom filter lookup -- return true when we don't have // TODO: Bloom filter lookup -- return true when we don't have
// Go get the round from the round infos, if it exists // Go get the round from the round infos, if it exists
......
...@@ -30,12 +30,13 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c ...@@ -30,12 +30,13 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
rng := m.rngGen.GetStream() rng := m.rngGen.GetStream()
var rounds []uint64 var rounds []uint64
for { done := false
for !done {
shouldProcess := false shouldProcess := false
select { select {
case <-quitCh: case <-quitCh:
rng.Close() rng.Close()
break done = true
case <-ticker.C: case <-ticker.C:
if len(rounds) > 0 { if len(rounds) > 0 {
shouldProcess = true shouldProcess = true
...@@ -74,7 +75,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c ...@@ -74,7 +75,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
"historical round %d", rounds[i]) "historical round %d", rounds[i])
continue continue
} }
m.p.Remove(id.Round(rounds[i])) m.p.Done(id.Round(rounds[i]))
m.lookupRoundMessages <- roundInfo m.lookupRoundMessages <- roundInfo
} }
} }
......
package rounds package rounds
import ( import (
"fmt"
"gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/client" "gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/mixmessages"
...@@ -24,10 +26,12 @@ type Manager struct { ...@@ -24,10 +26,12 @@ type Manager struct {
historicalRounds chan id.Round historicalRounds chan id.Round
lookupRoundMessages chan *mixmessages.RoundInfo lookupRoundMessages chan *mixmessages.RoundInfo
messageBundles chan message.Bundle
} }
func New(comms *client.Comms, instance *network.Instance, session *storage.Session, func New(comms *client.Comms, instance *network.Instance, session *storage.Session,
rngGen *fastRNG.StreamGenerator, params params.Rounds) (*Manager, error) { rngGen *fastRNG.StreamGenerator, bundles chan message.Bundle,
params params.Rounds) (*Manager, error) {
return &Manager{ return &Manager{
params: params, params: params,
p: newProcessingRounds(), p: newProcessingRounds(),
...@@ -38,6 +42,7 @@ func New(comms *client.Comms, instance *network.Instance, session *storage.Sessi ...@@ -38,6 +42,7 @@ func New(comms *client.Comms, instance *network.Instance, session *storage.Sessi
historicalRounds: make(chan id.Round, params.HistoricalRoundsBufferLen), historicalRounds: make(chan id.Round, params.HistoricalRoundsBufferLen),
lookupRoundMessages: make(chan *mixmessages.RoundInfo, params.LookupRoundsBufferLen), lookupRoundMessages: make(chan *mixmessages.RoundInfo, params.LookupRoundsBufferLen),
messageBundles: bundles,
}, nil }, nil
} }
...@@ -45,7 +50,16 @@ func (m *Manager) StartProcessors() stoppable.Stoppable { ...@@ -45,7 +50,16 @@ func (m *Manager) StartProcessors() stoppable.Stoppable {
multi := stoppable.NewMulti("Rounds") multi := stoppable.NewMulti("Rounds")
//start the historical rounds thread
historicalRoundsStopper := stoppable.NewSingle("ProcessHistoricalRounds") historicalRoundsStopper := stoppable.NewSingle("ProcessHistoricalRounds")
go m.processHistoricalRounds(m.comms, historicalRoundsStopper.Quit()) go m.processHistoricalRounds(m.comms, historicalRoundsStopper.Quit())
multi.Add(historicalRoundsStopper) multi.Add(historicalRoundsStopper)
//start the message retrieval worker pool
for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ {
stopper := stoppable.NewSingle(fmt.Sprintf("Messager Retriever %v", i))
go m.processMessageRetrieval(m.comms, stopper.Quit())
multi.Add(stopper)
}
} }
...@@ -65,8 +65,8 @@ func (pr *processing) Fail(id id.Round) { ...@@ -65,8 +65,8 @@ func (pr *processing) Fail(id id.Round) {
} }
} }
// Remove a round from the processing list // Done a round from the processing list
func (pr *processing) Remove(id id.Round) { func (pr *processing) Done(id id.Round) {
pr.Lock() pr.Lock()
defer pr.Unlock() defer pr.Unlock()
delete(pr.rounds, id) delete(pr.rounds, id)
......
...@@ -3,7 +3,7 @@ package rounds ...@@ -3,7 +3,7 @@ package rounds
// Testing functions for Processing Round structure // Testing functions for Processing Round structure
import ( import (
"gitlab.com/elixxir/client/vendor/gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"testing" "testing"
) )
...@@ -29,7 +29,7 @@ func TestProcessingRounds_IsProcessing(t *testing.T) { ...@@ -29,7 +29,7 @@ func TestProcessingRounds_IsProcessing(t *testing.T) {
func TestProcessingRounds_Remove(t *testing.T) { func TestProcessingRounds_Remove(t *testing.T) {
pr := processing{rounds: make(map[id.Round]struct{})} pr := processing{rounds: make(map[id.Round]struct{})}
pr.rounds[id.Round(10)] = struct{}{} pr.rounds[id.Round(10)] = struct{}{}
pr.Remove(id.Round(10)) pr.Done(id.Round(10))
if _, ok := pr.rounds[id.Round(10)]; ok { if _, ok := pr.rounds[id.Round(10)]; ok {
t.Errorf("Round 10 was not removed from processing list when calling Done") t.Errorf("Round 10 was not removed from processing list when calling Done")
} }
......
...@@ -3,55 +3,105 @@ package rounds ...@@ -3,55 +3,105 @@ package rounds
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/storage/user" "gitlab.com/elixxir/client/storage/user"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
jww "github.com/spf13/jwalterweatherman"
) )
type messageRetrievalComms interface {
GetHost(hostId *id.ID) (*connect.Host, bool)
RequestMessages(host *connect.Host,
message *pb.GetMessages) (*pb.GetMessagesResponse, error)
} }
func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
quitCh <-chan struct{}) {
done := false
for !done {
select {
case <-quitCh:
done = true
case ri := <-m.lookupRoundMessages:
bundle, err := m.getMessagesFromGateway(ri, comms)
if err != nil {
jww.WARN.Printf("Failed to get messages for round %v: %s",
ri.ID, err)
break
}
if len(bundle.Messages) != 0 {
m.messageBundles <- bundle
}
}
}
}
func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo,
comms messageRetrievalComms) (message.Bundle, error) {
func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo) ([]*pb.Slot, error) { rid := id.Round(roundInfo.ID)
gwHost, err := gateway.GetLast(m.comms, roundInfo) //Get the host object for the gateway to send to
gwHost, err := gateway.GetLast(comms, roundInfo)
if err != nil { if err != nil {
return nil, errors.WithMessage(err, "Failed to get Gateway "+ return message.Bundle{}, errors.WithMessage(err, "Failed to get Gateway "+
"to request from") "to request from")
} }
user := m.session.User().GetCryptographicIdentity() user := m.session.User().GetCryptographicIdentity()
userID := user.GetUserID().Bytes() userID := user.GetUserID().Bytes()
// First get message id list // send the request
msgReq := &pb.GetMessages{ msgReq := &pb.GetMessages{
ClientID: userID, ClientID: userID,
RoundID: roundInfo.ID, RoundID: uint64(rid),
} }
msgResp, err := m.comms.RequestMessages(gwHost, msgReq) msgResp, err := comms.RequestMessages(gwHost, msgReq)
// Fail the round if an error occurs so it can be tried again later
if err != nil { if err != nil {
return nil, errors.WithMessagef(err, "Failed to request "+ m.p.Fail(id.Round(roundInfo.ID))
"messages from %s for round %s", gwHost.GetId(), roundInfo.ID) return message.Bundle{}, errors.WithMessagef(err, "Failed to "+
"request messages from %s for round %s", gwHost.GetId(), rid)
} }
// if the gateway doesnt have the round, return an error
// If no error, then we have checked the round and finished processing
ctx.Session.GetCheckedRounds.Check(roundInfo.ID)
network.Processing.Done(roundInfo.ID)
if !msgResp.GetHasRound() { if !msgResp.GetHasRound() {
jww.ERROR.Printf("host %s does not have roundID: %d", m.p.Fail(rid)
gwHost, roundInfo.ID) return message.Bundle{}, errors.Errorf("host %s does not have "+
return nil "roundID: %d", gwHost.String(), rid)
} }
// If there are no messages print a warning. Due to the probabilistic nature
// of the bloom filters, false positives will happen some times
msgs := msgResp.GetMessages() msgs := msgResp.GetMessages()
if msgs == nil || len(msgs) == 0 { if msgs == nil || len(msgs) == 0 {
jww.ERROR.Printf("host %s has no messages for client %s "+ jww.WARN.Printf("host %s has no messages for client %s "+
" in round %d", gwHost, user, roundInfo.ID) " in round %d. This happening every once in a while is normal,"+
return nil " but can be indicitive of a problem if it is consistant", gwHost,
user.GetUserID(), rid)
return message.Bundle{}, nil
} }
return msgs //build the bundle of messages to send to the message processor
bundle := message.Bundle{
Round: rid,
Messages: make([]format.Message, len(msgs)),
Finish: func() {
m.session.GetCheckedRounds().Check(rid)
m.p.Done(rid)
},
}
for i, slot := range msgs {
msg := format.NewMessage(m.session.E2e().GetGroup().GetP().ByteLen())
msg.SetPayloadA(slot.PayloadA)
msg.SetPayloadB(slot.PayloadB)
bundle.Messages[i] = msg
}
return bundle, nil
} }
...@@ -108,7 +108,7 @@ func (s *Store) Add(nid *id.ID, k *cyclic.Int) { ...@@ -108,7 +108,7 @@ func (s *Store) Add(nid *id.ID, k *cyclic.Int) {
} }
} }
// Remove a Node key from the nodes map and save // Done a Node key from the nodes map and save
func (s *Store) Remove(nid *id.ID) { func (s *Store) Remove(nid *id.ID) {
s.mux.Lock() s.mux.Lock()
defer s.mux.Unlock() defer s.mux.Unlock()
......
...@@ -40,7 +40,7 @@ func TestMain(m *testing.M) { ...@@ -40,7 +40,7 @@ func TestMain(m *testing.M) {
os.Exit(runFunc()) os.Exit(runFunc())
} }
// Happy path Add/Remove test // Happy path Add/Done test
func TestStore_AddRemove(t *testing.T) { func TestStore_AddRemove(t *testing.T) {
nodeId := id.NewIdFromString("test", id.Node, t) nodeId := id.NewIdFromString("test", id.Node, t)
key := testStore.grp.NewInt(5) key := testStore.grp.NewInt(5)
......
...@@ -164,7 +164,7 @@ func (s *Session) save() error { ...@@ -164,7 +164,7 @@ func (s *Session) save() error {
} }
/*METHODS*/ /*METHODS*/
// Remove all unused key fingerprints // Done all unused key fingerprints
// Delete this session and its key states from the storage // Delete this session and its key states from the storage
func (s *Session) Delete() { func (s *Session) Delete() {
s.mux.Lock() s.mux.Lock()
......
...@@ -268,10 +268,10 @@ func (mb *MessageBuffer) Succeeded(m interface{}) { ...@@ -268,10 +268,10 @@ func (mb *MessageBuffer) Succeeded(m interface{}) {
mb.mux.Lock() mb.mux.Lock()
defer mb.mux.Unlock() defer mb.mux.Unlock()
// Remove message from buffer // Done message from buffer
delete(mb.processingMessages, h) delete(mb.processingMessages, h)
// Remove message from key value store // Done message from key value store
err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h)) err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h))
if err != nil { if err != nil {
jww.FATAL.Fatalf("Failed to save: %v", err) jww.FATAL.Fatalf("Failed to save: %v", err)
...@@ -292,7 +292,7 @@ func (mb *MessageBuffer) Failed(m interface{}) { ...@@ -292,7 +292,7 @@ func (mb *MessageBuffer) Failed(m interface{}) {
mb.mux.Lock() mb.mux.Lock()
defer mb.mux.Unlock() defer mb.mux.Unlock()
// Remove from "processing" state // Done from "processing" state
delete(mb.processingMessages, h) delete(mb.processingMessages, h)
// Add to "not processed" state // Add to "not processed" state
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment