From 8b1604d95b3a65094c9489c5f03eff10165de0c7 Mon Sep 17 00:00:00 2001
From: Benjamin Wenger <ben@elixxir.ioo>
Date: Mon, 21 Sep 2020 10:57:24 -0700
Subject: [PATCH] finished the implementation of the message retreval

---
 cmixproto/types.pb.go                   |  2 +-
 context/params/rounds.go                |  9 ++-
 go.mod                                  |  2 +-
 network/message/bundle.go               | 12 ++++
 network/rounds/check.go                 |  4 +-
 network/rounds/historical.go            |  7 +-
 network/rounds/manager.go               | 16 ++++-
 network/rounds/processingrounds.go      |  4 +-
 network/rounds/processingrounds_test.go |  4 +-
 network/rounds/retreive.go              | 92 +++++++++++++++++++------
 storage/cmix/store.go                   |  2 +-
 storage/cmix/store_test.go              |  2 +-
 storage/e2e/session.go                  |  2 +-
 storage/utility/messageBuffer.go        |  6 +-
 14 files changed, 122 insertions(+), 42 deletions(-)
 create mode 100644 network/message/bundle.go

diff --git a/cmixproto/types.pb.go b/cmixproto/types.pb.go
index c0b870e2a..09d39016b 100644
--- a/cmixproto/types.pb.go
+++ b/cmixproto/types.pb.go
@@ -264,7 +264,7 @@ type TextMessage struct {
 	Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
 	// Timestamp (Unix time in seconds)
 	// You can use this to display the time when the other user sent the message
-	// TODO Remove this when all messages have timestamps
+	// TODO Done this when all messages have timestamps
 	Time                 int64    `protobuf:"varint,4,opt,name=time,proto3" json:"time,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
diff --git a/context/params/rounds.go b/context/params/rounds.go
index 75de16707..9cd03e4c7 100644
--- a/context/params/rounds.go
+++ b/context/params/rounds.go
@@ -12,6 +12,8 @@ type Rounds struct {
 	// maximum period of time a pending historical round query will wait before
 	// it si transmitted
 	HistoricalRoundsPeriod time.Duration
+	// number of worker threads for retreiving messages from gateways
+	NumMessageRetrievalWorkers uint
 
 	//Length of historical rounds channel buffer
 	HistoricalRoundsBufferLen uint
@@ -21,9 +23,10 @@ type Rounds struct {
 
 func GetDefaultRounds() Rounds {
 	return Rounds{
-		MaxAttemptsCheckingARound: 5,
-		MaxHistoricalRounds:       100,
-		HistoricalRoundsPeriod:    100 * time.Millisecond,
+		MaxAttemptsCheckingARound:  5,
+		MaxHistoricalRounds:        100,
+		HistoricalRoundsPeriod:     100 * time.Millisecond,
+		NumMessageRetrievalWorkers: 8,
 
 		HistoricalRoundsBufferLen: 1000,
 		LookupRoundsBufferLen:     2000,
diff --git a/go.mod b/go.mod
index f69405f7d..8e9b9723e 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ require (
 	gitlab.com/elixxir/primitives v0.0.0-20200916172343-37503735c7a1
 	gitlab.com/xx_network/comms v0.0.0-20200915154643-d533291041b7
 	gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686
-	gitlab.com/xx_network/primitives v0.0.0-20200915204206-eb0287ed0031
+	gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2
 	golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
 	golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect
 	google.golang.org/protobuf v1.25.0
diff --git a/network/message/bundle.go b/network/message/bundle.go
new file mode 100644
index 000000000..af35baf9e
--- /dev/null
+++ b/network/message/bundle.go
@@ -0,0 +1,12 @@
+package message
+
+import (
+	"gitlab.com/elixxir/primitives/format"
+	"gitlab.com/xx_network/primitives/id"
+)
+
+type Bundle struct {
+	Round    id.Round
+	Messages []format.Message
+	Finish   func()
+}
diff --git a/network/rounds/check.go b/network/rounds/check.go
index f583376e9..31166b93c 100644
--- a/network/rounds/check.go
+++ b/network/rounds/check.go
@@ -16,7 +16,7 @@ func (m *Manager) Checker(roundID id.Round, instance *network.Instance) bool {
 		return false
 	}
 	if count == m.params.MaxAttemptsCheckingARound {
-		m.p.Remove(roundID)
+		m.p.Done(roundID)
 		return true
 	}
 	// FIXME: Spec has us SETTING processing, but not REMOVING it
@@ -25,7 +25,7 @@ func (m *Manager) Checker(roundID id.Round, instance *network.Instance) bool {
 	// a state that lives with the round info as soon as we know
 	// about it that gets updated at different parts...not clear
 	// needs to be thought through.
-	//defer processing.Remove(roundID)
+	//defer processing.Done(roundID)
 
 	// TODO: Bloom filter lookup -- return true when we don't have
 	// Go get the round from the round infos, if it exists
diff --git a/network/rounds/historical.go b/network/rounds/historical.go
index 5d0a9b48c..1fa672216 100644
--- a/network/rounds/historical.go
+++ b/network/rounds/historical.go
@@ -30,12 +30,13 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
 	rng := m.rngGen.GetStream()
 	var rounds []uint64
 
-	for {
+	done := false
+	for !done {
 		shouldProcess := false
 		select {
 		case <-quitCh:
 			rng.Close()
-			break
+			done = true
 		case <-ticker.C:
 			if len(rounds) > 0 {
 				shouldProcess = true
@@ -74,7 +75,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
 					"historical round %d", rounds[i])
 				continue
 			}
-			m.p.Remove(id.Round(rounds[i]))
+			m.p.Done(id.Round(rounds[i]))
 			m.lookupRoundMessages <- roundInfo
 		}
 	}
diff --git a/network/rounds/manager.go b/network/rounds/manager.go
index 4e9060d21..c03b97cc9 100644
--- a/network/rounds/manager.go
+++ b/network/rounds/manager.go
@@ -1,9 +1,11 @@
 package rounds
 
 import (
+	"fmt"
 	"gitlab.com/elixxir/client/context"
 	"gitlab.com/elixxir/client/context/params"
 	"gitlab.com/elixxir/client/context/stoppable"
+	"gitlab.com/elixxir/client/network/message"
 	"gitlab.com/elixxir/client/storage"
 	"gitlab.com/elixxir/comms/client"
 	"gitlab.com/elixxir/comms/mixmessages"
@@ -24,10 +26,12 @@ type Manager struct {
 
 	historicalRounds    chan id.Round
 	lookupRoundMessages chan *mixmessages.RoundInfo
+	messageBundles      chan message.Bundle
 }
 
 func New(comms *client.Comms, instance *network.Instance, session *storage.Session,
-	rngGen *fastRNG.StreamGenerator, params params.Rounds) (*Manager, error) {
+	rngGen *fastRNG.StreamGenerator, bundles chan message.Bundle,
+	params params.Rounds) (*Manager, error) {
 	return &Manager{
 		params:   params,
 		p:        newProcessingRounds(),
@@ -38,6 +42,7 @@ func New(comms *client.Comms, instance *network.Instance, session *storage.Sessi
 
 		historicalRounds:    make(chan id.Round, params.HistoricalRoundsBufferLen),
 		lookupRoundMessages: make(chan *mixmessages.RoundInfo, params.LookupRoundsBufferLen),
+		messageBundles:      bundles,
 	}, nil
 }
 
@@ -45,7 +50,16 @@ func (m *Manager) StartProcessors() stoppable.Stoppable {
 
 	multi := stoppable.NewMulti("Rounds")
 
+	//start the historical rounds thread
 	historicalRoundsStopper := stoppable.NewSingle("ProcessHistoricalRounds")
 	go m.processHistoricalRounds(m.comms, historicalRoundsStopper.Quit())
 	multi.Add(historicalRoundsStopper)
+
+	//start the message retrieval worker pool
+	for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ {
+		stopper := stoppable.NewSingle(fmt.Sprintf("Messager Retriever %v", i))
+		go m.processMessageRetrieval(m.comms, stopper.Quit())
+		multi.Add(stopper)
+	}
+
 }
diff --git a/network/rounds/processingrounds.go b/network/rounds/processingrounds.go
index 8b558f122..829088dc4 100644
--- a/network/rounds/processingrounds.go
+++ b/network/rounds/processingrounds.go
@@ -65,8 +65,8 @@ func (pr *processing) Fail(id id.Round) {
 	}
 }
 
-// Remove a round from the processing list
-func (pr *processing) Remove(id id.Round) {
+// Done a round from the processing list
+func (pr *processing) Done(id id.Round) {
 	pr.Lock()
 	defer pr.Unlock()
 	delete(pr.rounds, id)
diff --git a/network/rounds/processingrounds_test.go b/network/rounds/processingrounds_test.go
index 5e4fb25ee..763a4948a 100644
--- a/network/rounds/processingrounds_test.go
+++ b/network/rounds/processingrounds_test.go
@@ -3,7 +3,7 @@ package rounds
 // Testing functions for Processing Round structure
 
 import (
-	"gitlab.com/elixxir/client/vendor/gitlab.com/xx_network/primitives/id"
+	"gitlab.com/xx_network/primitives/id"
 	"testing"
 )
 
@@ -29,7 +29,7 @@ func TestProcessingRounds_IsProcessing(t *testing.T) {
 func TestProcessingRounds_Remove(t *testing.T) {
 	pr := processing{rounds: make(map[id.Round]struct{})}
 	pr.rounds[id.Round(10)] = struct{}{}
-	pr.Remove(id.Round(10))
+	pr.Done(id.Round(10))
 	if _, ok := pr.rounds[id.Round(10)]; ok {
 		t.Errorf("Round 10 was not removed from processing list when calling Done")
 	}
diff --git a/network/rounds/retreive.go b/network/rounds/retreive.go
index 828927a76..964ae30e8 100644
--- a/network/rounds/retreive.go
+++ b/network/rounds/retreive.go
@@ -3,55 +3,105 @@ package rounds
 import (
 	"github.com/pkg/errors"
 	"gitlab.com/elixxir/client/network/gateway"
+	"gitlab.com/elixxir/client/network/message"
 	"gitlab.com/elixxir/client/storage/user"
+	"gitlab.com/elixxir/primitives/format"
+	"gitlab.com/xx_network/comms/connect"
 	"gitlab.com/xx_network/primitives/id"
 	pb "gitlab.com/elixxir/comms/mixmessages"
+	jww "github.com/spf13/jwalterweatherman"
 )
+
+type messageRetrievalComms interface {
+	GetHost(hostId *id.ID) (*connect.Host, bool)
+	RequestMessages(host *connect.Host,
+		message *pb.GetMessages) (*pb.GetMessagesResponse, error)
 }
 
+func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
+	quitCh <-chan struct{}) {
 
+	done := false
+	for !done {
+		select {
+		case <-quitCh:
+			done = true
+		case ri := <-m.lookupRoundMessages:
+			bundle, err := m.getMessagesFromGateway(ri, comms)
+			if err != nil {
+				jww.WARN.Printf("Failed to get messages for round %v: %s",
+					ri.ID, err)
+				break
+			}
+			if len(bundle.Messages) != 0 {
+				m.messageBundles <- bundle
+			}
+		}
+	}
 
+}
 
-func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo) ([]*pb.Slot, error) {
+func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo,
+	comms messageRetrievalComms) (message.Bundle, error) {
 
-	gwHost, err := gateway.GetLast(m.comms, roundInfo)
+	rid := id.Round(roundInfo.ID)
+
+	//Get the host object for the gateway to send to
+	gwHost, err := gateway.GetLast(comms, roundInfo)
 	if err != nil {
-		return nil, errors.WithMessage(err, "Failed to get Gateway "+
+		return message.Bundle{}, errors.WithMessage(err, "Failed to get Gateway "+
 			"to request from")
 	}
 
 	user := m.session.User().GetCryptographicIdentity()
 	userID := user.GetUserID().Bytes()
 
-	// First get message id list
+	// send the request
 	msgReq := &pb.GetMessages{
 		ClientID: userID,
-		RoundID:  roundInfo.ID,
+		RoundID:  uint64(rid),
 	}
-	msgResp, err := m.comms.RequestMessages(gwHost, msgReq)
+	msgResp, err := comms.RequestMessages(gwHost, msgReq)
+	// Fail the round if an error occurs so it can be tried again later
 	if err != nil {
-		return nil, errors.WithMessagef(err, "Failed to request "+
-			"messages from %s for round %s", gwHost.GetId(), roundInfo.ID)
+		m.p.Fail(id.Round(roundInfo.ID))
+		return message.Bundle{}, errors.WithMessagef(err, "Failed to "+
+			"request messages from %s for round %s", gwHost.GetId(), rid)
 	}
-
-	// If no error, then we have checked the round and finished processing
-	ctx.Session.GetCheckedRounds.Check(roundInfo.ID)
-	network.Processing.Done(roundInfo.ID)
-
+	// if the gateway doesnt have the round, return an error
 	if !msgResp.GetHasRound() {
-		jww.ERROR.Printf("host %s does not have roundID: %d",
-			gwHost, roundInfo.ID)
-		return nil
+		m.p.Fail(rid)
+		return message.Bundle{}, errors.Errorf("host %s does not have "+
+			"roundID: %d", gwHost.String(), rid)
 	}
 
+	// If there are no messages print a warning. Due to the probabilistic nature
+	// of the bloom filters, false positives will happen some times
 	msgs := msgResp.GetMessages()
-
 	if msgs == nil || len(msgs) == 0 {
-		jww.ERROR.Printf("host %s has no messages for client %s "+
-			" in round %d", gwHost, user, roundInfo.ID)
-		return nil
+		jww.WARN.Printf("host %s has no messages for client %s "+
+			" in round %d. This happening every once in a while is normal,"+
+			" but can be indicitive of a problem if it is consistant", gwHost,
+			user.GetUserID(), rid)
+		return message.Bundle{}, nil
 	}
 
-	return msgs
+	//build the bundle of messages to send to the message processor
+	bundle := message.Bundle{
+		Round:    rid,
+		Messages: make([]format.Message, len(msgs)),
+		Finish: func() {
+			m.session.GetCheckedRounds().Check(rid)
+			m.p.Done(rid)
+		},
+	}
+
+	for i, slot := range msgs {
+		msg := format.NewMessage(m.session.E2e().GetGroup().GetP().ByteLen())
+		msg.SetPayloadA(slot.PayloadA)
+		msg.SetPayloadB(slot.PayloadB)
+		bundle.Messages[i] = msg
+	}
 
+	return bundle, nil
 }
diff --git a/storage/cmix/store.go b/storage/cmix/store.go
index 0dd31917f..9a34000ec 100644
--- a/storage/cmix/store.go
+++ b/storage/cmix/store.go
@@ -108,7 +108,7 @@ func (s *Store) Add(nid *id.ID, k *cyclic.Int) {
 	}
 }
 
-// Remove a Node key from the nodes map and save
+// Done a Node key from the nodes map and save
 func (s *Store) Remove(nid *id.ID) {
 	s.mux.Lock()
 	defer s.mux.Unlock()
diff --git a/storage/cmix/store_test.go b/storage/cmix/store_test.go
index 0076a41c9..df305ebb3 100644
--- a/storage/cmix/store_test.go
+++ b/storage/cmix/store_test.go
@@ -40,7 +40,7 @@ func TestMain(m *testing.M) {
 	os.Exit(runFunc())
 }
 
-// Happy path Add/Remove test
+// Happy path Add/Done test
 func TestStore_AddRemove(t *testing.T) {
 	nodeId := id.NewIdFromString("test", id.Node, t)
 	key := testStore.grp.NewInt(5)
diff --git a/storage/e2e/session.go b/storage/e2e/session.go
index f1cf7afee..4ccf315eb 100644
--- a/storage/e2e/session.go
+++ b/storage/e2e/session.go
@@ -164,7 +164,7 @@ func (s *Session) save() error {
 }
 
 /*METHODS*/
-// Remove all unused key fingerprints
+// Done all unused key fingerprints
 // Delete this session and its key states from the storage
 func (s *Session) Delete() {
 	s.mux.Lock()
diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go
index e79d29e59..fe8d376b2 100644
--- a/storage/utility/messageBuffer.go
+++ b/storage/utility/messageBuffer.go
@@ -268,10 +268,10 @@ func (mb *MessageBuffer) Succeeded(m interface{}) {
 	mb.mux.Lock()
 	defer mb.mux.Unlock()
 
-	// Remove message from buffer
+	// Done message from buffer
 	delete(mb.processingMessages, h)
 
-	// Remove message from key value store
+	// Done message from key value store
 	err := mb.handler.DeleteMessage(mb.kv, makeStoredMessageKey(mb.key, h))
 	if err != nil {
 		jww.FATAL.Fatalf("Failed to save: %v", err)
@@ -292,7 +292,7 @@ func (mb *MessageBuffer) Failed(m interface{}) {
 	mb.mux.Lock()
 	defer mb.mux.Unlock()
 
-	// Remove from "processing" state
+	// Done from "processing" state
 	delete(mb.processingMessages, h)
 
 	// Add to "not processed" state
-- 
GitLab