Skip to content
Snippets Groups Projects
Commit 2dce69c4 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Merge branch 'jonah/lastGoodUpdates' into 'release'

Revert "Revert "Merge branch 'xx-4509/batch-pickup' into 'release'""

See merge request !562
parents 57d2936c 8b6834a7
No related branches found
No related tags found
3 merge requests!562Revert "Revert "Merge branch 'xx-4509/batch-pickup' into 'release'"",!560Revert "Revert "Merge branch 'xx-4509/batch-pickup' into 'release'"",!515Release
......@@ -295,6 +295,10 @@ Flags:
--verify-sends Ensure successful message sending by checking for round completion
--waitTimeout uint The number of seconds to wait for messages to arrive (default 15)
-w, --writeContact string Write contact information, if any, to this file, defaults to stdout (default "-")
--batchMessagePickup Enables alternate message pickup logic which processes batches
--batchPickupDelay int Sets the delay (in MS) before a batch pickup request is sent, even if the batch is not full (default 50)
--batchPickupTimeout int Sets the timeout duration (in MS) sent to gateways that proxy batch message pickup requests (default 250)
--maxPickupBatchSize int Set the maximum number of requests in a batch pickup message (default 20)
Use "client [command] --help" for more information about a command.
```
......
......@@ -71,6 +71,10 @@ const (
forceHistoricalRoundsFlag = "forceHistoricalRounds"
slowPollingFlag = "slowPolling"
forceMessagePickupRetryFlag = "forceMessagePickupRetry"
batchMessagePickupFlag = "batchMessagePickup"
maxPickupBatchSizeFlag = "maxPickupBatchSize"
batchPickupDelayFlag = "batchPickupDelay"
batchPickupTimeoutFlag = "batchPickupTimeout"
// E2E Params
e2eMinKeysFlag = "e2eMinKeys"
......
......@@ -457,6 +457,11 @@ func initParams() (xxdk.CMIXParams, xxdk.E2EParams) {
cmixParams.Network.WhitelistedGateways = viper.GetStringSlice(gatewayWhitelistFlag)
cmixParams.Network.Pickup.BatchMessageRetrieval = viper.GetBool(batchMessagePickupFlag)
cmixParams.Network.Pickup.MaxBatchSize = viper.GetInt(maxPickupBatchSizeFlag)
cmixParams.Network.Pickup.BatchPickupTimeout = viper.GetInt(batchPickupTimeoutFlag)
cmixParams.Network.Pickup.BatchDelay = viper.GetInt(batchPickupDelayFlag)
return cmixParams, e2eParams
}
......@@ -1131,6 +1136,26 @@ func init() {
viper.BindPFlag(forceMessagePickupRetryFlag,
rootCmd.Flags().Lookup(forceMessagePickupRetryFlag))
rootCmd.PersistentFlags().Bool(batchMessagePickupFlag, false,
"Enables alternate message pickup logic which processes batches")
viper.BindPFlag(batchMessagePickupFlag,
rootCmd.PersistentFlags().Lookup(batchMessagePickupFlag))
rootCmd.PersistentFlags().Int(maxPickupBatchSizeFlag, 20,
"Set the maximum number of requests in a batch pickup message")
viper.BindPFlag(maxPickupBatchSizeFlag,
rootCmd.PersistentFlags().Lookup(maxPickupBatchSizeFlag))
rootCmd.PersistentFlags().Int(batchPickupDelayFlag, 50,
"Sets the delay (in MS) before a batch pickup request is sent, even if the batch is not full")
viper.BindPFlag(batchPickupDelayFlag,
rootCmd.PersistentFlags().Lookup(batchPickupDelayFlag))
rootCmd.PersistentFlags().Int(batchPickupTimeoutFlag, 250,
"Sets the timeout duration (in MS) sent to gateways that proxy batch message pickup requests")
viper.BindPFlag(batchPickupTimeoutFlag,
rootCmd.PersistentFlags().Lookup(batchPickupTimeoutFlag))
// E2E Params
defaultE2EParams := session.GetDefaultParams()
rootCmd.Flags().UintP(e2eMinKeysFlag,
......
......@@ -56,6 +56,8 @@ type followNetworkComms interface {
*pb.GatewayPollResponse, time.Time, time.Duration, error)
RequestMessages(host *connect.Host, message *pb.GetMessages) (
*pb.GetMessagesResponse, error)
RequestBatchMessages(host *connect.Host,
message *pb.GetMessagesBatch) (*pb.GetMessagesResponseBatch, error)
}
// followNetwork polls the network to get updated on the state of nodes, the
......@@ -151,14 +153,14 @@ func (c *client) followNetwork(report ClientErrorReport,
"last %s, with an average newest packet latency of %s",
numPolls, debugTrackPeriod, latencyAvg)
jww.INFO.Printf("[Follow] " + infoMsg)
jww.INFO.Println(infoMsg)
c.events.Report(1, "Polling", "MetricsWithLatency", infoMsg)
} else {
infoMsg := fmt.Sprintf(
"[Follow] Polled the network %d times in the last %s", numPolls,
debugTrackPeriod)
jww.INFO.Printf("[Follow] " + infoMsg)
jww.INFO.Println(infoMsg)
c.events.Report(1, "Polling", "Metrics", infoMsg)
}
}
......@@ -365,7 +367,6 @@ func (c *client) follow(identity receptionID.IdentityUse,
// are messages waiting in rounds and then sends signals to the appropriate
// handling threads
roundChecker := func(rid id.Round) bool {
jww.TRACE.Printf("[Follow] checking round: %d", rid)
hasMessage := Checker(rid, filterList, identity.CR)
if !hasMessage && c.verboseRounds != nil {
c.verboseRounds.denote(rid, RoundState(NoMessageAvailable))
......
......@@ -37,6 +37,11 @@ type Params struct {
// Toggles if historical rounds should always be used
ForceHistoricalRounds bool
BatchMessageRetrieval bool
MaxBatchSize int
BatchDelay int
BatchPickupTimeout int
}
// paramsDisk will be the marshal-able and umarshal-able object.
......@@ -60,6 +65,9 @@ func GetDefaultParams() Params {
UncheckRoundPeriod: 120 * time.Second,
ForceMessagePickupRetry: false,
SendTimeout: 3 * time.Second,
MaxBatchSize: 20,
BatchPickupTimeout: 250,
BatchDelay: 100,
}
}
......
......@@ -41,6 +41,7 @@ type pickup struct {
lookupRoundMessages chan roundLookup
messageBundles chan<- message.Bundle
gatewayMessageRequests chan *pickupRequest
unchecked *store.UncheckedRoundStore
}
......@@ -51,6 +52,7 @@ func NewPickup(params Params, bundles chan<- message.Bundle,
rng *fastRNG.StreamGenerator, instance RoundGetter,
session storage.Session) Pickup {
unchecked := store.NewOrLoadUncheckedStore(session.GetKV())
m := &pickup{
params: params,
lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen),
......@@ -62,6 +64,7 @@ func NewPickup(params Params, bundles chan<- message.Bundle,
unchecked: unchecked,
session: session,
comms: comms,
gatewayMessageRequests: make(chan *pickupRequest, params.LookupRoundsBufferLen),
}
return m
......@@ -72,12 +75,18 @@ func (m *pickup) StartProcessors() stoppable.Stoppable {
multi := stoppable.NewMulti("Pickup")
// Start the message retrieval worker pool
if m.params.BatchMessageRetrieval {
stopper := stoppable.NewSingle("Batch Message Retriever")
go m.processBatchMessageRetrieval(m.comms, stopper)
multi.Add(stopper)
} else {
for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ {
stopper := stoppable.NewSingle(
"Message Retriever " + strconv.Itoa(int(i)))
go m.processMessageRetrieval(m.comms, stopper)
multi.Add(stopper)
}
}
// Start the periodic unchecked round worker
stopper := stoppable.NewSingle("UncheckRound")
......
......@@ -30,6 +30,8 @@ type MessageRetrievalComms interface {
GetHost(hostId *id.ID) (*connect.Host, bool)
RequestMessages(host *connect.Host, message *pb.GetMessages) (
*pb.GetMessagesResponse, error)
RequestBatchMessages(host *connect.Host,
message *pb.GetMessagesBatch) (*pb.GetMessagesResponseBatch, error)
}
type roundLookup struct {
......@@ -39,7 +41,7 @@ type roundLookup struct {
const noRoundError = "does not have round %d"
// processMessageRetrieval received a roundLookup request and pings the gateways
// processMessageRetrieval receives a roundLookup request and pings the gateways
// of that round for messages for the requested Identity in the roundLookup.
func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
stop *stoppable.Single) {
......@@ -51,8 +53,7 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
return
case rl := <-m.lookupRoundMessages:
ri := rl.Round
jww.DEBUG.Printf("Checking for messages in round %d", ri.ID)
jww.DEBUG.Printf("[processMessageRetrieval] Checking for messages in round %d", ri.ID)
err := m.unchecked.AddRound(id.Round(ri.ID), ri.Raw,
rl.Identity.Source, rl.Identity.EphId)
if err != nil {
......@@ -61,35 +62,10 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
id.Round(ri.ID))
}
// Convert gateways in round to proper ID format
gwIds := make([]*id.ID, ri.Topology.Len())
for i := 0; i < ri.Topology.Len(); i++ {
gwId := ri.Topology.GetNodeAtIndex(i).DeepCopy()
gwId.SetType(id.Gateway)
gwIds[i] = gwId
}
if len(gwIds) == 0 {
jww.WARN.Printf("Empty gateway ID List")
gwIds := m.getGatewayList(rl)
if gwIds == nil {
continue
}
// Target the last nodes in the team first because it has messages
// first, randomize other members of the team
var rndBytes [32]byte
stream := m.rng.GetStream()
_, err = stream.Read(rndBytes[:])
stream.Close()
if err != nil {
jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+
"from all gateways (%v): %s", ri.ID, gwIds, err)
}
gwIds[0], gwIds[len(gwIds)-1] = gwIds[len(gwIds)-1], gwIds[0]
shuffle.ShuffleSwap(rndBytes[:], len(gwIds)-1, func(i, j int) {
gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1]
})
// If ForceMessagePickupRetry, we are forcing processUncheckedRounds
// by randomly not picking up messages (FOR INTEGRATION TEST). Only
// done if round has not been ignored before.
......@@ -104,7 +80,7 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
continue
}
if err != nil {
jww.ERROR.Printf("Failed to get pickup round %d from all "+
jww.ERROR.Printf("[processMessageRetrieval] Failed to get pickup round %d from all "+
"gateways (%v): %s", ri.ID, gwIds, err)
}
} else {
......@@ -121,40 +97,54 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
// After trying all gateways, if none returned we mark the round
// as a failure and print out the last error
if err != nil {
jww.ERROR.Printf("Failed to get pickup round %d "+
"from all gateways (%v): %s", ri.ID, gwIds, err)
jww.ERROR.Printf("[processMessageRetrieval] Failed to get pickup round %d "+
"from all gateways (%v): %s", rl.Round.ID, gwIds, err)
}
}
m.processBundle(bundle, rl.Identity, rl.Round)
}
}
}
jww.TRACE.Printf("messages: %v\n", bundle.Messages)
// getGatewayList returns a shuffled list of gateways for a roundLookup request.
func (m *pickup) getGatewayList(rl roundLookup) []*id.ID {
ri := rl.Round
if len(bundle.Messages) != 0 {
// If successful and there are messages, we send them to another
// thread
bundle.Identity = receptionID.EphemeralIdentity{
EphId: rl.Identity.EphId,
Source: rl.Identity.Source,
// Convert gateways in round to proper ID format
gwIds := make([]*id.ID, ri.Topology.Len())
for i := 0; i < ri.Topology.Len(); i++ {
gwId := ri.Topology.GetNodeAtIndex(i).DeepCopy()
gwId.SetType(id.Gateway)
gwIds[i] = gwId
}
bundle.RoundInfo = rl.Round
m.messageBundles <- bundle
jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID)
err = m.unchecked.Remove(
id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId)
if err != nil {
jww.ERROR.Printf("Could not remove round %d from "+
"unchecked rounds store: %v", ri.ID, err)
if len(gwIds) == 0 {
jww.WARN.Printf("Empty gateway ID List")
return nil
}
// Target the last nodes in the team first because it has messages
// first, randomize other members of the team
var rndBytes [32]byte
stream := m.rng.GetStream()
_, err := stream.Read(rndBytes[:])
stream.Close()
if err != nil {
jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+
"from all gateways (%v): %s", ri.ID, gwIds, err)
}
}
}
gwIds[0], gwIds[len(gwIds)-1] = gwIds[len(gwIds)-1], gwIds[0]
shuffle.ShuffleSwap(rndBytes[:], len(gwIds)-1, func(i, j int) {
gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1]
})
return gwIds
}
// getMessagesFromGateway attempts to get messages from their assigned gateway
// host in the round specified. If successful
// getMessagesFromGateway attempts to pick up messages from their assigned
// gateway in the round specified. If successful, it returns a message.Bundle.
func (m *pickup) getMessagesFromGateway(roundID id.Round,
identity receptionID.EphemeralIdentity, comms MessageRetrievalComms,
gwIds []*id.ID, stop *stoppable.Single) (message.Bundle, error) {
......@@ -191,8 +181,6 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round,
return msgResp, nil
}, stop, m.params.SendTimeout)
jww.INFO.Printf("Received messages for round %d, processing...", roundID)
// Fail the round if an error occurs so that it can be tried again later
if err != nil {
return message.Bundle{}, errors.WithMessagef(
......@@ -200,6 +188,48 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round,
}
msgResp := result.(*pb.GetMessagesResponse)
bundle, err := m.buildMessageBundle(msgResp, identity, roundID)
if err != nil {
return message.Bundle{}, errors.WithMessagef(err, "Failed to process pickup response for round %d", roundID)
}
jww.INFO.Printf("Received %d messages in Round %d for %d (%s) in %s",
len(bundle.Messages), roundID, identity.EphId.Int64(), identity.Source,
netTime.Now().Sub(start))
return bundle, nil
}
// processBundle accepts a message.Bundle, EphemeralIdentity and round ID.
// If the bundle contains any messages, it iterates through them, sending
// them to the bundle channel for handling, and removing the associated
// rounds from m.unchecked.
func (m *pickup) processBundle(bundle message.Bundle, rid receptionID.EphemeralIdentity, ri rounds.Round) {
jww.TRACE.Printf("messages: %v\n", bundle.Messages)
if len(bundle.Messages) != 0 {
// If successful and there are messages, we send them to another
// thread
bundle.Identity = receptionID.EphemeralIdentity{
EphId: rid.EphId,
Source: rid.Source,
}
bundle.RoundInfo = ri
m.messageBundles <- bundle
jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID)
err := m.unchecked.Remove(
id.Round(ri.ID), rid.Source, rid.EphId)
if err != nil {
jww.ERROR.Printf("Could not remove round %d from "+
"unchecked rounds store: %v", ri.ID, err)
}
}
}
// buildMessageBundle builds a message.Bundle from a passed in
// pb.GetMessagesResponse, EphemeralIdentity and round ID.
func (m *pickup) buildMessageBundle(msgResp *pb.GetMessagesResponse, identity receptionID.EphemeralIdentity, roundID id.Round) (message.Bundle, error) {
// If there are no messages, print a warning. Due to the probabilistic
// nature of the bloom filters, false positives will happen sometimes
msgs := msgResp.GetMessages()
......@@ -209,7 +239,7 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round,
" but can be indicative of a problem if it is consistent",
identity.Source, roundID)
err = m.unchecked.EndCheck(roundID, identity.Source, identity.EphId)
err := m.unchecked.EndCheck(roundID, identity.Source, identity.EphId)
if err != nil {
jww.ERROR.Printf("Failed to end the check for the round round %d: %+v", roundID, err)
}
......@@ -217,10 +247,6 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round,
return message.Bundle{}, nil
}
jww.INFO.Printf("Received %d messages in Round %d for %d (%s) in %s",
len(msgs), roundID, identity.EphId.Int64(), identity.Source,
netTime.Now().Sub(start))
// Build the bundle of messages to send to the message processor
bundle := message.Bundle{
Round: roundID,
......@@ -237,18 +263,31 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round,
msg.Digest(), roundID)
bundle.Messages[i] = msg
}
return bundle, nil
}
// Helper function which forces processUncheckedRounds by randomly not looking
// up messages.
// forceMessagePickupRetry is a helper function which forces
// processUncheckedRounds by randomly not looking up messages.
func (m *pickup) forceMessagePickupRetry(ri rounds.Round, rl roundLookup,
comms MessageRetrievalComms, gwIds []*id.ID,
stop *stoppable.Single) (bundle message.Bundle, err error) {
if m.shouldForceMessagePickupRetry(rl) {
// Do not call get message, leaving the round to be picked up in
// unchecked round scheduler process
return
}
// Attempt to request for this gateway
return m.getMessagesFromGateway(
ri.ID, rl.Identity, comms, gwIds, stop)
}
// shouldForceMessagePickupRetry randomly determines if a roundLookup
// should be skipped to force a retry.
func (m *pickup) shouldForceMessagePickupRetry(rl roundLookup) bool {
rnd, _ := m.unchecked.GetRound(
ri.ID, rl.Identity.Source, rl.Identity.EphId)
rl.Round.ID, rl.Identity.Source, rl.Identity.EphId)
var err error
if rnd.NumChecks == 0 {
// Flip a coin to determine whether to pick up message
b := make([]byte, 8)
......@@ -261,15 +300,9 @@ func (m *pickup) forceMessagePickupRetry(ri rounds.Round, rl roundLookup,
result := binary.BigEndian.Uint64(b)
if result%2 == 0 {
jww.INFO.Printf("Forcing a message pickup retry for round %d", ri.ID)
// Do not call get message, leaving the round to be picked up in
// unchecked round scheduler process
return
jww.INFO.Printf("Forcing a message pickup retry for round %d", rl.Round.ID)
return true
}
}
// Attempt to request for this gateway
return m.getMessagesFromGateway(
ri.ID, rl.Identity, comms, gwIds, stop)
return false
}
package pickup
import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/v4/cmix/identity/receptionID"
"gitlab.com/elixxir/client/v4/cmix/rounds"
"gitlab.com/elixxir/client/v4/stoppable"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"time"
)
type pickupRequest struct {
target *id.ID
round rounds.Round
id receptionID.EphemeralIdentity
uncheckedGateways []*id.ID
checkedGateways []*id.ID
}
// processBatchMessageRetrieval is an alternative to processMessageRetrieval.
// It receives a roundLookup request and adds it to a batch, then pings a
// random gateway with the batch when either batch size == maxBatchSize or
// batchDelay milliseconds elapses after the first request is added to
// the batch. If a pickup fails, it is returned to the batch targeting the
// next gateway in the batch.
func (m *pickup) processBatchMessageRetrieval(comms MessageRetrievalComms, stop *stoppable.Single) {
maxBatchSize := m.params.MaxBatchSize
batchDelay := time.Duration(m.params.BatchDelay) * time.Millisecond
batch := make(map[id.Round]*pickupRequest)
var timer = &time.Timer{
C: make(<-chan time.Time),
}
for {
shouldProcess := false
var req *pickupRequest
select {
case <-stop.Quit():
stop.ToStopped()
return
case <-timer.C:
shouldProcess = true
case rl := <-m.lookupRoundMessages:
// Add incoming lookup reqeust to unchecked
ri := rl.Round
err := m.unchecked.AddRound(id.Round(ri.ID), ri.Raw,
rl.Identity.Source, rl.Identity.EphId)
if err != nil {
jww.FATAL.Panicf(
"Failed to denote Unchecked Round for round %d",
id.Round(ri.ID))
}
// Get shuffled list of gateways in round
gwIds := m.getGatewayList(rl)
if m.params.ForceMessagePickupRetry && m.shouldForceMessagePickupRetry(rl) {
// Do not add to the batch, leaving the round to be picked up in
// unchecked round scheduler process
continue
}
// Initialize pickupRequest for new round
req = &pickupRequest{gwIds[0], rl.Round, rl.Identity, gwIds[1:], nil}
case req = <-m.gatewayMessageRequests: // Request retries
}
if req != nil {
rid := req.round.ID
// Add incoming request to batch
_, ok := batch[rid]
if !ok {
jww.DEBUG.Printf("[processBatchMessageRetrieval] Added round %d to batch", rid)
batch[req.round.ID] = req
if len(batch) >= maxBatchSize {
shouldProcess = true
} else if len(batch) == 1 {
timer = time.NewTimer(batchDelay)
}
} else {
jww.DEBUG.Printf("[processBatchMessageRetrieval] Ignoring request to add round %d; already in batch", rid)
}
}
// Continue unless batch is full or timer has elapsed
if !shouldProcess {
continue
}
jww.TRACE.Printf("[processBatchMessageRetrieval] Sending batch message request for %d rounds", len(batch))
// Reset timer
timer.Stop()
timer = &time.Timer{
C: make(<-chan time.Time),
}
// Build batch message request
msg := &pb.GetMessagesBatch{
Requests: make([]*pb.GetMessages, len(batch)),
Timeout: uint64(m.params.BatchPickupTimeout),
}
orderedBatch := make([]*pickupRequest, len(batch))
index := 0
for _, v := range batch {
orderedBatch[index] = v
msg.Requests[index] = &pb.GetMessages{
ClientID: v.id.EphId[:],
RoundID: uint64(v.round.ID),
Target: v.target.Marshal(),
}
index++
}
// Send batch pickup request to any gateway
resp, err := m.sender.SendToAny(func(host *connect.Host) (interface{}, error) {
return comms.RequestBatchMessages(host, msg)
}, stop)
if err != nil {
jww.ERROR.Printf("Failed to request batch of messages: %+v", err)
continue
}
// Process responses
batchResponse := resp.(*pb.GetMessagesResponseBatch)
for i, result := range batchResponse.GetResults() {
proxiedRequest := orderedBatch[i]
// Handler gw did not receive response in time/did not have contact with proxiedRequest
if result == nil {
jww.DEBUG.Printf("[processBatchMessageRetrieval] Handler gateway did not receive anything from target %s", proxiedRequest.target)
go m.tryNextGateway(proxiedRequest)
continue
}
// Handler gw encountered error getting messages from proxiedRequest
respErr := batchResponse.GetErrors()[i]
if respErr != "" {
jww.ERROR.Printf("[processBatchMessageRetrieval] Handler gateway encountered error attempting to pick up messages from target %s: %s", proxiedRequest.target, respErr)
go m.tryNextGateway(proxiedRequest)
continue
}
// Process response from proxiedRequest gateway
bundle, err := m.buildMessageBundle(result, proxiedRequest.id, proxiedRequest.round.ID)
if err != nil {
jww.ERROR.Printf("[processBatchMessageRetrieval] Failed to process pickup response from proxiedRequest gateway %s: %+v", proxiedRequest.target, err)
go m.tryNextGateway(proxiedRequest)
continue
}
// Handle received bundle
m.processBundle(bundle, proxiedRequest.id, proxiedRequest.round)
}
// Empty batch before restarting loop
batch = make(map[id.Round]*pickupRequest)
}
}
// tryNextGateway sends a pickupRequest back in the batch, targeting the next
// gateway in list of unchecked gateways.
func (m *pickup) tryNextGateway(req *pickupRequest) {
// If there are no more unchecked gateways, log an error & return
if len(req.uncheckedGateways) == 0 {
jww.ERROR.Printf("[processBatchMessageRetrieval] Failed to get pickup round %d "+
"from all gateways (%v)", req.round.ID, append(req.checkedGateways, req.target))
return
}
select {
case m.gatewayMessageRequests <- &pickupRequest{
target: req.uncheckedGateways[0],
round: req.round,
id: req.id,
uncheckedGateways: req.uncheckedGateways[1:],
checkedGateways: append(req.checkedGateways, req.target),
}:
default:
}
}
package pickup
import (
"bytes"
"gitlab.com/elixxir/client/v4/cmix/gateway"
ephemeral2 "gitlab.com/elixxir/client/v4/cmix/identity/receptionID"
"gitlab.com/elixxir/client/v4/cmix/message"
"gitlab.com/elixxir/client/v4/cmix/rounds"
"gitlab.com/elixxir/client/v4/stoppable"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf"
"reflect"
"testing"
"time"
)
// Happy path.
func Test_manager_processBatchMessageRetrieval(t *testing.T) {
// General initializations
connect.TestingOnlyDisableTLS = true
testManager := newManager(t)
roundId := id.Round(5)
mockComms := &mockMessageRetrievalComms{testingSignature: t}
stop := stoppable.NewSingle("singleStoppable")
testNdf := getNDF()
nodeId := id.NewIdFromString(ReturningGateway, id.Node, &testing.T{})
gwId := nodeId.DeepCopy()
gwId.SetType(id.Gateway)
testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
testManager.rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
p := gateway.DefaultPoolParams()
p.MaxPoolSize = 1
var err error
addChan := make(chan network.NodeGateway, 1)
testManager.sender, err = gateway.NewTestingSender(p, testManager.rng,
testNdf, mockComms, testManager.session, addChan, t)
if err != nil {
t.Errorf(err.Error())
}
// Create a local channel so reception is possible
// (testManager.messageBundles is sent only via newManager call above)
messageBundleChan := make(chan message.Bundle)
testManager.messageBundles = messageBundleChan
// Initialize the message retrieval
go testManager.processBatchMessageRetrieval(mockComms, stop)
// Construct expected values for checking
expectedEphID := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8}
payloadMsg := []byte(PayloadMessage)
expectedPayload := make([]byte, 256)
copy(expectedPayload, payloadMsg)
go func() {
requestGateway := id.NewIdFromString(ReturningGateway, id.Gateway, t)
// Construct the round lookup
ephIdentity := ephemeral2.EphemeralIdentity{
EphId: expectedEphID,
Source: requestGateway,
}
round := rounds.Round{
ID: roundId,
Topology: connect.NewCircuit([]*id.ID{requestGateway}),
}
// Send a round look up request
testManager.lookupRoundMessages <- roundLookup{
Round: round,
Identity: ephIdentity,
}
}()
var testBundle message.Bundle
select {
case testBundle = <-messageBundleChan:
case <-time.After(1500 * time.Millisecond):
t.Errorf("Timed out waiting for messageBundleChan.")
}
err = stop.Close()
if err != nil {
t.Errorf("Failed to signal close to process: %+v", err)
}
// Ensure bundle received and has expected values
time.Sleep(2 * time.Second)
if reflect.DeepEqual(testBundle, message.Bundle{}) {
t.Fatal("Did not receive a message bundle over the channel")
}
if testBundle.Identity.EphId.Int64() != expectedEphID.Int64() {
t.Errorf("Unexpected address ID in bundle."+
"\n\tExpected: %v"+
"\n\tReceived: %v", expectedEphID, testBundle.Identity.EphId)
}
if !bytes.Equal(expectedPayload, testBundle.Messages[0].GetPayloadA()) {
t.Errorf("Unexpected address ID in bundle."+
"\n\tExpected: %v"+
"\n\tReceived: %v", expectedPayload, testBundle.Messages[0].GetPayloadA())
}
}
......@@ -143,6 +143,25 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host,
return nil, nil
}
func (mmrc *mockMessageRetrievalComms) RequestBatchMessages(host *connect.Host, req *pb.GetMessagesBatch) (*pb.GetMessagesResponseBatch, error) {
ret := make([]*pb.GetMessagesResponse, len(req.GetRequests()))
for i, mreq := range req.GetRequests() {
targetId, err := id.Unmarshal(mreq.Target)
if err != nil {
}
h, err := connect.NewHost(targetId, "0.0.0.0", nil, connect.GetDefaultHostParams())
if err != nil {
}
ret[i], err = mmrc.RequestMessages(h, mreq)
}
return &pb.GetMessagesResponseBatch{
Results: ret,
Errors: make([]string, len(ret)),
}, nil
}
func newTestBackoffTable(face interface{}) [cappedTries]time.Duration {
switch face.(type) {
case *testing.T, *testing.M, *testing.B, *testing.PB:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment