diff --git a/README.md b/README.md index 5920ef91d1af43d95b71c7059553c19750bf5f4d..3c4821c580590d294f8c40f22c165060118ed67c 100644 --- a/README.md +++ b/README.md @@ -295,6 +295,10 @@ Flags: --verify-sends Ensure successful message sending by checking for round completion --waitTimeout uint The number of seconds to wait for messages to arrive (default 15) -w, --writeContact string Write contact information, if any, to this file, defaults to stdout (default "-") + --batchMessagePickup Enables alternate message pickup logic which processes batches + --batchPickupDelay int Sets the delay (in MS) before a batch pickup request is sent, even if the batch is not full (default 50) + --batchPickupTimeout int Sets the timeout duration (in MS) sent to gateways that proxy batch message pickup requests (default 250) + --maxPickupBatchSize int Set the maximum number of requests in a batch pickup message (default 20) Use "client [command] --help" for more information about a command. ``` @@ -508,4 +512,4 @@ use the correct versions as listed below. 7. Check that `protoc-gen-go-grpc` is installed with the correct version. protoc-gen-go-grpc --version - protoc-gen-go-grpc 1.2.0 \ No newline at end of file + protoc-gen-go-grpc 1.2.0 diff --git a/cmd/flags.go b/cmd/flags.go index cb234c2f3233ca3e0c1f30d358bac6807f50436c..bb300a1fdcdc1a1ca2aa07bbdd88d1c9364651b8 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -71,6 +71,10 @@ const ( forceHistoricalRoundsFlag = "forceHistoricalRounds" slowPollingFlag = "slowPolling" forceMessagePickupRetryFlag = "forceMessagePickupRetry" + batchMessagePickupFlag = "batchMessagePickup" + maxPickupBatchSizeFlag = "maxPickupBatchSize" + batchPickupDelayFlag = "batchPickupDelay" + batchPickupTimeoutFlag = "batchPickupTimeout" // E2E Params e2eMinKeysFlag = "e2eMinKeys" diff --git a/cmd/root.go b/cmd/root.go index 356619fa9758d8bc2726f921ed8414fea147c079..cf8c3a0ed7c6ff101b54293b3a6505f85abdef1a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -457,6 +457,11 @@ func initParams() (xxdk.CMIXParams, xxdk.E2EParams) { cmixParams.Network.WhitelistedGateways = viper.GetStringSlice(gatewayWhitelistFlag) + cmixParams.Network.Pickup.BatchMessageRetrieval = viper.GetBool(batchMessagePickupFlag) + cmixParams.Network.Pickup.MaxBatchSize = viper.GetInt(maxPickupBatchSizeFlag) + cmixParams.Network.Pickup.BatchPickupTimeout = viper.GetInt(batchPickupTimeoutFlag) + cmixParams.Network.Pickup.BatchDelay = viper.GetInt(batchPickupDelayFlag) + return cmixParams, e2eParams } @@ -1131,6 +1136,26 @@ func init() { viper.BindPFlag(forceMessagePickupRetryFlag, rootCmd.Flags().Lookup(forceMessagePickupRetryFlag)) + rootCmd.PersistentFlags().Bool(batchMessagePickupFlag, false, + "Enables alternate message pickup logic which processes batches") + viper.BindPFlag(batchMessagePickupFlag, + rootCmd.PersistentFlags().Lookup(batchMessagePickupFlag)) + + rootCmd.PersistentFlags().Int(maxPickupBatchSizeFlag, 20, + "Set the maximum number of requests in a batch pickup message") + viper.BindPFlag(maxPickupBatchSizeFlag, + rootCmd.PersistentFlags().Lookup(maxPickupBatchSizeFlag)) + + rootCmd.PersistentFlags().Int(batchPickupDelayFlag, 50, + "Sets the delay (in MS) before a batch pickup request is sent, even if the batch is not full") + viper.BindPFlag(batchPickupDelayFlag, + rootCmd.PersistentFlags().Lookup(batchPickupDelayFlag)) + + rootCmd.PersistentFlags().Int(batchPickupTimeoutFlag, 250, + "Sets the timeout duration (in MS) sent to gateways that proxy batch message pickup requests") + viper.BindPFlag(batchPickupTimeoutFlag, + rootCmd.PersistentFlags().Lookup(batchPickupTimeoutFlag)) + // E2E Params defaultE2EParams := session.GetDefaultParams() rootCmd.Flags().UintP(e2eMinKeysFlag, diff --git a/cmix/follow.go b/cmix/follow.go index 1c71a9ca625796109ab9126f0f31a5a875c0744f..a0b5fe670d7518d84e090b266e9c6b52b1695c31 100644 --- a/cmix/follow.go +++ b/cmix/follow.go @@ -56,6 +56,8 @@ type followNetworkComms interface { *pb.GatewayPollResponse, time.Time, time.Duration, error) RequestMessages(host *connect.Host, message *pb.GetMessages) ( *pb.GetMessagesResponse, error) + RequestBatchMessages(host *connect.Host, + message *pb.GetMessagesBatch) (*pb.GetMessagesResponseBatch, error) } // followNetwork polls the network to get updated on the state of nodes, the @@ -151,14 +153,14 @@ func (c *client) followNetwork(report ClientErrorReport, "last %s, with an average newest packet latency of %s", numPolls, debugTrackPeriod, latencyAvg) - jww.INFO.Printf("[Follow] " + infoMsg) + jww.INFO.Println(infoMsg) c.events.Report(1, "Polling", "MetricsWithLatency", infoMsg) } else { infoMsg := fmt.Sprintf( "[Follow] Polled the network %d times in the last %s", numPolls, debugTrackPeriod) - jww.INFO.Printf("[Follow] " + infoMsg) + jww.INFO.Println(infoMsg) c.events.Report(1, "Polling", "Metrics", infoMsg) } } @@ -365,7 +367,6 @@ func (c *client) follow(identity receptionID.IdentityUse, // are messages waiting in rounds and then sends signals to the appropriate // handling threads roundChecker := func(rid id.Round) bool { - jww.TRACE.Printf("[Follow] checking round: %d", rid) hasMessage := Checker(rid, filterList, identity.CR) if !hasMessage && c.verboseRounds != nil { c.verboseRounds.denote(rid, RoundState(NoMessageAvailable)) diff --git a/cmix/pickup/params.go b/cmix/pickup/params.go index a30a065290b34701d4f11da91594778c9fdbbfcd..16a0fce3f8d7adec7f67434fc6093d17c0118f79 100644 --- a/cmix/pickup/params.go +++ b/cmix/pickup/params.go @@ -37,6 +37,11 @@ type Params struct { // Toggles if historical rounds should always be used ForceHistoricalRounds bool + + BatchMessageRetrieval bool + MaxBatchSize int + BatchDelay int + BatchPickupTimeout int } // paramsDisk will be the marshal-able and umarshal-able object. @@ -60,6 +65,9 @@ func GetDefaultParams() Params { UncheckRoundPeriod: 120 * time.Second, ForceMessagePickupRetry: false, SendTimeout: 3 * time.Second, + MaxBatchSize: 20, + BatchPickupTimeout: 250, + BatchDelay: 100, } } diff --git a/cmix/pickup/pickup.go b/cmix/pickup/pickup.go index 5d9ea653f5de648b2a0d387583a57a8cf1940308..fafb1ee3dd67121d031fab71224414d38868aa10 100644 --- a/cmix/pickup/pickup.go +++ b/cmix/pickup/pickup.go @@ -39,8 +39,9 @@ type pickup struct { instance RoundGetter - lookupRoundMessages chan roundLookup - messageBundles chan<- message.Bundle + lookupRoundMessages chan roundLookup + messageBundles chan<- message.Bundle + gatewayMessageRequests chan *pickupRequest unchecked *store.UncheckedRoundStore } @@ -51,17 +52,19 @@ func NewPickup(params Params, bundles chan<- message.Bundle, rng *fastRNG.StreamGenerator, instance RoundGetter, session storage.Session) Pickup { unchecked := store.NewOrLoadUncheckedStore(session.GetKV()) + m := &pickup{ - params: params, - lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen), - messageBundles: bundles, - sender: sender, - historical: historical, - rng: rng, - instance: instance, - unchecked: unchecked, - session: session, - comms: comms, + params: params, + lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen), + messageBundles: bundles, + sender: sender, + historical: historical, + rng: rng, + instance: instance, + unchecked: unchecked, + session: session, + comms: comms, + gatewayMessageRequests: make(chan *pickupRequest, params.LookupRoundsBufferLen), } return m @@ -72,11 +75,17 @@ func (m *pickup) StartProcessors() stoppable.Stoppable { multi := stoppable.NewMulti("Pickup") // Start the message retrieval worker pool - for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ { - stopper := stoppable.NewSingle( - "Message Retriever " + strconv.Itoa(int(i))) - go m.processMessageRetrieval(m.comms, stopper) + if m.params.BatchMessageRetrieval { + stopper := stoppable.NewSingle("Batch Message Retriever") + go m.processBatchMessageRetrieval(m.comms, stopper) multi.Add(stopper) + } else { + for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ { + stopper := stoppable.NewSingle( + "Message Retriever " + strconv.Itoa(int(i))) + go m.processMessageRetrieval(m.comms, stopper) + multi.Add(stopper) + } } // Start the periodic unchecked round worker diff --git a/cmix/pickup/retrieve.go b/cmix/pickup/retrieve.go index f7d0925ce2f68a485aa281ff2721976b1f63bb9f..f9f44c0620520806408fbf9e29633cd4f6598a46 100644 --- a/cmix/pickup/retrieve.go +++ b/cmix/pickup/retrieve.go @@ -30,6 +30,8 @@ type MessageRetrievalComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) RequestMessages(host *connect.Host, message *pb.GetMessages) ( *pb.GetMessagesResponse, error) + RequestBatchMessages(host *connect.Host, + message *pb.GetMessagesBatch) (*pb.GetMessagesResponseBatch, error) } type roundLookup struct { @@ -39,7 +41,7 @@ type roundLookup struct { const noRoundError = "does not have round %d" -// processMessageRetrieval received a roundLookup request and pings the gateways +// processMessageRetrieval receives a roundLookup request and pings the gateways // of that round for messages for the requested Identity in the roundLookup. func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, stop *stoppable.Single) { @@ -51,8 +53,7 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, return case rl := <-m.lookupRoundMessages: ri := rl.Round - jww.DEBUG.Printf("Checking for messages in round %d", ri.ID) - + jww.DEBUG.Printf("[processMessageRetrieval] Checking for messages in round %d", ri.ID) err := m.unchecked.AddRound(id.Round(ri.ID), ri.Raw, rl.Identity.Source, rl.Identity.EphId) if err != nil { @@ -61,35 +62,10 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, id.Round(ri.ID)) } - // Convert gateways in round to proper ID format - gwIds := make([]*id.ID, ri.Topology.Len()) - for i := 0; i < ri.Topology.Len(); i++ { - gwId := ri.Topology.GetNodeAtIndex(i).DeepCopy() - gwId.SetType(id.Gateway) - gwIds[i] = gwId - } - - if len(gwIds) == 0 { - jww.WARN.Printf("Empty gateway ID List") + gwIds := m.getGatewayList(rl) + if gwIds == nil { continue } - - // Target the last nodes in the team first because it has messages - // first, randomize other members of the team - var rndBytes [32]byte - stream := m.rng.GetStream() - _, err = stream.Read(rndBytes[:]) - stream.Close() - if err != nil { - jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+ - "from all gateways (%v): %s", ri.ID, gwIds, err) - } - - gwIds[0], gwIds[len(gwIds)-1] = gwIds[len(gwIds)-1], gwIds[0] - shuffle.ShuffleSwap(rndBytes[:], len(gwIds)-1, func(i, j int) { - gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1] - }) - // If ForceMessagePickupRetry, we are forcing processUncheckedRounds // by randomly not picking up messages (FOR INTEGRATION TEST). Only // done if round has not been ignored before. @@ -104,7 +80,7 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, continue } if err != nil { - jww.ERROR.Printf("Failed to get pickup round %d from all "+ + jww.ERROR.Printf("[processMessageRetrieval] Failed to get pickup round %d from all "+ "gateways (%v): %s", ri.ID, gwIds, err) } } else { @@ -121,40 +97,54 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, // After trying all gateways, if none returned we mark the round // as a failure and print out the last error if err != nil { - jww.ERROR.Printf("Failed to get pickup round %d "+ - "from all gateways (%v): %s", ri.ID, gwIds, err) + jww.ERROR.Printf("[processMessageRetrieval] Failed to get pickup round %d "+ + "from all gateways (%v): %s", rl.Round.ID, gwIds, err) } - } - jww.TRACE.Printf("messages: %v\n", bundle.Messages) + m.processBundle(bundle, rl.Identity, rl.Round) + } + } +} - if len(bundle.Messages) != 0 { - // If successful and there are messages, we send them to another - // thread - bundle.Identity = receptionID.EphemeralIdentity{ - EphId: rl.Identity.EphId, - Source: rl.Identity.Source, - } - bundle.RoundInfo = rl.Round - m.messageBundles <- bundle +// getGatewayList returns a shuffled list of gateways for a roundLookup request. +func (m *pickup) getGatewayList(rl roundLookup) []*id.ID { + ri := rl.Round - jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID) - err = m.unchecked.Remove( - id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) - if err != nil { - jww.ERROR.Printf("Could not remove round %d from "+ - "unchecked rounds store: %v", ri.ID, err) - } + // Convert gateways in round to proper ID format + gwIds := make([]*id.ID, ri.Topology.Len()) + for i := 0; i < ri.Topology.Len(); i++ { + gwId := ri.Topology.GetNodeAtIndex(i).DeepCopy() + gwId.SetType(id.Gateway) + gwIds[i] = gwId + } - } + if len(gwIds) == 0 { + jww.WARN.Printf("Empty gateway ID List") + return nil + } - } + // Target the last nodes in the team first because it has messages + // first, randomize other members of the team + var rndBytes [32]byte + stream := m.rng.GetStream() + _, err := stream.Read(rndBytes[:]) + stream.Close() + if err != nil { + jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+ + "from all gateways (%v): %s", ri.ID, gwIds, err) } + + gwIds[0], gwIds[len(gwIds)-1] = gwIds[len(gwIds)-1], gwIds[0] + shuffle.ShuffleSwap(rndBytes[:], len(gwIds)-1, func(i, j int) { + gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1] + }) + + return gwIds } -// getMessagesFromGateway attempts to get messages from their assigned gateway -// host in the round specified. If successful +// getMessagesFromGateway attempts to pick up messages from their assigned +// gateway in the round specified. If successful, it returns a message.Bundle. func (m *pickup) getMessagesFromGateway(roundID id.Round, identity receptionID.EphemeralIdentity, comms MessageRetrievalComms, gwIds []*id.ID, stop *stoppable.Single) (message.Bundle, error) { @@ -191,8 +181,6 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round, return msgResp, nil }, stop, m.params.SendTimeout) - jww.INFO.Printf("Received messages for round %d, processing...", roundID) - // Fail the round if an error occurs so that it can be tried again later if err != nil { return message.Bundle{}, errors.WithMessagef( @@ -200,6 +188,48 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round, } msgResp := result.(*pb.GetMessagesResponse) + bundle, err := m.buildMessageBundle(msgResp, identity, roundID) + if err != nil { + return message.Bundle{}, errors.WithMessagef(err, "Failed to process pickup response for round %d", roundID) + } + + jww.INFO.Printf("Received %d messages in Round %d for %d (%s) in %s", + len(bundle.Messages), roundID, identity.EphId.Int64(), identity.Source, + netTime.Now().Sub(start)) + + return bundle, nil +} + +// processBundle accepts a message.Bundle, EphemeralIdentity and round ID. +// If the bundle contains any messages, it iterates through them, sending +// them to the bundle channel for handling, and removing the associated +// rounds from m.unchecked. +func (m *pickup) processBundle(bundle message.Bundle, rid receptionID.EphemeralIdentity, ri rounds.Round) { + jww.TRACE.Printf("messages: %v\n", bundle.Messages) + + if len(bundle.Messages) != 0 { + // If successful and there are messages, we send them to another + // thread + bundle.Identity = receptionID.EphemeralIdentity{ + EphId: rid.EphId, + Source: rid.Source, + } + bundle.RoundInfo = ri + m.messageBundles <- bundle + + jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID) + err := m.unchecked.Remove( + id.Round(ri.ID), rid.Source, rid.EphId) + if err != nil { + jww.ERROR.Printf("Could not remove round %d from "+ + "unchecked rounds store: %v", ri.ID, err) + } + } +} + +// buildMessageBundle builds a message.Bundle from a passed in +// pb.GetMessagesResponse, EphemeralIdentity and round ID. +func (m *pickup) buildMessageBundle(msgResp *pb.GetMessagesResponse, identity receptionID.EphemeralIdentity, roundID id.Round) (message.Bundle, error) { // If there are no messages, print a warning. Due to the probabilistic // nature of the bloom filters, false positives will happen sometimes msgs := msgResp.GetMessages() @@ -209,7 +239,7 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round, " but can be indicative of a problem if it is consistent", identity.Source, roundID) - err = m.unchecked.EndCheck(roundID, identity.Source, identity.EphId) + err := m.unchecked.EndCheck(roundID, identity.Source, identity.EphId) if err != nil { jww.ERROR.Printf("Failed to end the check for the round round %d: %+v", roundID, err) } @@ -217,10 +247,6 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round, return message.Bundle{}, nil } - jww.INFO.Printf("Received %d messages in Round %d for %d (%s) in %s", - len(msgs), roundID, identity.EphId.Int64(), identity.Source, - netTime.Now().Sub(start)) - // Build the bundle of messages to send to the message processor bundle := message.Bundle{ Round: roundID, @@ -237,18 +263,31 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round, msg.Digest(), roundID) bundle.Messages[i] = msg } - return bundle, nil - } -// Helper function which forces processUncheckedRounds by randomly not looking -// up messages. +// forceMessagePickupRetry is a helper function which forces +// processUncheckedRounds by randomly not looking up messages. func (m *pickup) forceMessagePickupRetry(ri rounds.Round, rl roundLookup, comms MessageRetrievalComms, gwIds []*id.ID, stop *stoppable.Single) (bundle message.Bundle, err error) { + if m.shouldForceMessagePickupRetry(rl) { + // Do not call get message, leaving the round to be picked up in + // unchecked round scheduler process + return + } + + // Attempt to request for this gateway + return m.getMessagesFromGateway( + ri.ID, rl.Identity, comms, gwIds, stop) +} + +// shouldForceMessagePickupRetry randomly determines if a roundLookup +// should be skipped to force a retry. +func (m *pickup) shouldForceMessagePickupRetry(rl roundLookup) bool { rnd, _ := m.unchecked.GetRound( - ri.ID, rl.Identity.Source, rl.Identity.EphId) + rl.Round.ID, rl.Identity.Source, rl.Identity.EphId) + var err error if rnd.NumChecks == 0 { // Flip a coin to determine whether to pick up message b := make([]byte, 8) @@ -261,15 +300,9 @@ func (m *pickup) forceMessagePickupRetry(ri rounds.Round, rl roundLookup, result := binary.BigEndian.Uint64(b) if result%2 == 0 { - jww.INFO.Printf("Forcing a message pickup retry for round %d", ri.ID) - // Do not call get message, leaving the round to be picked up in - // unchecked round scheduler process - return + jww.INFO.Printf("Forcing a message pickup retry for round %d", rl.Round.ID) + return true } - } - - // Attempt to request for this gateway - return m.getMessagesFromGateway( - ri.ID, rl.Identity, comms, gwIds, stop) + return false } diff --git a/cmix/pickup/retrieveBatch.go b/cmix/pickup/retrieveBatch.go new file mode 100644 index 0000000000000000000000000000000000000000..eaf46b5e33694493ef41e9c450a21fe868d3c534 --- /dev/null +++ b/cmix/pickup/retrieveBatch.go @@ -0,0 +1,185 @@ +package pickup + +import ( + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/cmix/identity/receptionID" + "gitlab.com/elixxir/client/v4/cmix/rounds" + "gitlab.com/elixxir/client/v4/stoppable" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/id" + "time" +) + +type pickupRequest struct { + target *id.ID + round rounds.Round + id receptionID.EphemeralIdentity + uncheckedGateways []*id.ID + checkedGateways []*id.ID +} + +// processBatchMessageRetrieval is an alternative to processMessageRetrieval. +// It receives a roundLookup request and adds it to a batch, then pings a +// random gateway with the batch when either batch size == maxBatchSize or +// batchDelay milliseconds elapses after the first request is added to +// the batch. If a pickup fails, it is returned to the batch targeting the +// next gateway in the batch. +func (m *pickup) processBatchMessageRetrieval(comms MessageRetrievalComms, stop *stoppable.Single) { + maxBatchSize := m.params.MaxBatchSize + batchDelay := time.Duration(m.params.BatchDelay) * time.Millisecond + batch := make(map[id.Round]*pickupRequest) + + var timer = &time.Timer{ + C: make(<-chan time.Time), + } + + for { + shouldProcess := false + + var req *pickupRequest + select { + case <-stop.Quit(): + stop.ToStopped() + return + case <-timer.C: + shouldProcess = true + case rl := <-m.lookupRoundMessages: + // Add incoming lookup reqeust to unchecked + ri := rl.Round + err := m.unchecked.AddRound(id.Round(ri.ID), ri.Raw, + rl.Identity.Source, rl.Identity.EphId) + if err != nil { + jww.FATAL.Panicf( + "Failed to denote Unchecked Round for round %d", + id.Round(ri.ID)) + } + + // Get shuffled list of gateways in round + gwIds := m.getGatewayList(rl) + + if m.params.ForceMessagePickupRetry && m.shouldForceMessagePickupRetry(rl) { + // Do not add to the batch, leaving the round to be picked up in + // unchecked round scheduler process + continue + } + + // Initialize pickupRequest for new round + req = &pickupRequest{gwIds[0], rl.Round, rl.Identity, gwIds[1:], nil} + case req = <-m.gatewayMessageRequests: // Request retries + } + + if req != nil { + rid := req.round.ID + // Add incoming request to batch + _, ok := batch[rid] + if !ok { + jww.DEBUG.Printf("[processBatchMessageRetrieval] Added round %d to batch", rid) + batch[req.round.ID] = req + if len(batch) >= maxBatchSize { + shouldProcess = true + } else if len(batch) == 1 { + timer = time.NewTimer(batchDelay) + } + } else { + jww.DEBUG.Printf("[processBatchMessageRetrieval] Ignoring request to add round %d; already in batch", rid) + } + + } + + // Continue unless batch is full or timer has elapsed + if !shouldProcess { + continue + } + + jww.TRACE.Printf("[processBatchMessageRetrieval] Sending batch message request for %d rounds", len(batch)) + + // Reset timer + timer.Stop() + timer = &time.Timer{ + C: make(<-chan time.Time), + } + + // Build batch message request + msg := &pb.GetMessagesBatch{ + Requests: make([]*pb.GetMessages, len(batch)), + Timeout: uint64(m.params.BatchPickupTimeout), + } + + orderedBatch := make([]*pickupRequest, len(batch)) + index := 0 + for _, v := range batch { + orderedBatch[index] = v + msg.Requests[index] = &pb.GetMessages{ + ClientID: v.id.EphId[:], + RoundID: uint64(v.round.ID), + Target: v.target.Marshal(), + } + index++ + } + + // Send batch pickup request to any gateway + resp, err := m.sender.SendToAny(func(host *connect.Host) (interface{}, error) { + return comms.RequestBatchMessages(host, msg) + }, stop) + if err != nil { + jww.ERROR.Printf("Failed to request batch of messages: %+v", err) + continue + } + + // Process responses + batchResponse := resp.(*pb.GetMessagesResponseBatch) + for i, result := range batchResponse.GetResults() { + proxiedRequest := orderedBatch[i] + // Handler gw did not receive response in time/did not have contact with proxiedRequest + if result == nil { + jww.DEBUG.Printf("[processBatchMessageRetrieval] Handler gateway did not receive anything from target %s", proxiedRequest.target) + go m.tryNextGateway(proxiedRequest) + continue + } + + // Handler gw encountered error getting messages from proxiedRequest + respErr := batchResponse.GetErrors()[i] + if respErr != "" { + jww.ERROR.Printf("[processBatchMessageRetrieval] Handler gateway encountered error attempting to pick up messages from target %s: %s", proxiedRequest.target, respErr) + go m.tryNextGateway(proxiedRequest) + continue + } + + // Process response from proxiedRequest gateway + bundle, err := m.buildMessageBundle(result, proxiedRequest.id, proxiedRequest.round.ID) + if err != nil { + jww.ERROR.Printf("[processBatchMessageRetrieval] Failed to process pickup response from proxiedRequest gateway %s: %+v", proxiedRequest.target, err) + go m.tryNextGateway(proxiedRequest) + continue + } + + // Handle received bundle + m.processBundle(bundle, proxiedRequest.id, proxiedRequest.round) + } + + // Empty batch before restarting loop + batch = make(map[id.Round]*pickupRequest) + } +} + +// tryNextGateway sends a pickupRequest back in the batch, targeting the next +// gateway in list of unchecked gateways. +func (m *pickup) tryNextGateway(req *pickupRequest) { + // If there are no more unchecked gateways, log an error & return + if len(req.uncheckedGateways) == 0 { + jww.ERROR.Printf("[processBatchMessageRetrieval] Failed to get pickup round %d "+ + "from all gateways (%v)", req.round.ID, append(req.checkedGateways, req.target)) + return + } + select { + case m.gatewayMessageRequests <- &pickupRequest{ + target: req.uncheckedGateways[0], + round: req.round, + id: req.id, + uncheckedGateways: req.uncheckedGateways[1:], + checkedGateways: append(req.checkedGateways, req.target), + }: + default: + } +} diff --git a/cmix/pickup/retrieveBatch_test.go b/cmix/pickup/retrieveBatch_test.go new file mode 100644 index 0000000000000000000000000000000000000000..03be8fcd93e26e8e08498cdec0097d47628e1f71 --- /dev/null +++ b/cmix/pickup/retrieveBatch_test.go @@ -0,0 +1,115 @@ +package pickup + +import ( + "bytes" + "gitlab.com/elixxir/client/v4/cmix/gateway" + ephemeral2 "gitlab.com/elixxir/client/v4/cmix/identity/receptionID" + "gitlab.com/elixxir/client/v4/cmix/message" + "gitlab.com/elixxir/client/v4/cmix/rounds" + "gitlab.com/elixxir/client/v4/stoppable" + "gitlab.com/elixxir/comms/network" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/ndf" + "reflect" + "testing" + "time" +) + +// Happy path. +func Test_manager_processBatchMessageRetrieval(t *testing.T) { + // General initializations + connect.TestingOnlyDisableTLS = true + testManager := newManager(t) + roundId := id.Round(5) + mockComms := &mockMessageRetrievalComms{testingSignature: t} + stop := stoppable.NewSingle("singleStoppable") + testNdf := getNDF() + nodeId := id.NewIdFromString(ReturningGateway, id.Node, &testing.T{}) + gwId := nodeId.DeepCopy() + gwId.SetType(id.Gateway) + testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} + testManager.rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) + + p := gateway.DefaultPoolParams() + p.MaxPoolSize = 1 + var err error + addChan := make(chan network.NodeGateway, 1) + testManager.sender, err = gateway.NewTestingSender(p, testManager.rng, + testNdf, mockComms, testManager.session, addChan, t) + if err != nil { + t.Errorf(err.Error()) + } + + // Create a local channel so reception is possible + // (testManager.messageBundles is sent only via newManager call above) + messageBundleChan := make(chan message.Bundle) + testManager.messageBundles = messageBundleChan + + // Initialize the message retrieval + go testManager.processBatchMessageRetrieval(mockComms, stop) + + // Construct expected values for checking + expectedEphID := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} + payloadMsg := []byte(PayloadMessage) + expectedPayload := make([]byte, 256) + copy(expectedPayload, payloadMsg) + + go func() { + requestGateway := id.NewIdFromString(ReturningGateway, id.Gateway, t) + + // Construct the round lookup + ephIdentity := ephemeral2.EphemeralIdentity{ + EphId: expectedEphID, + Source: requestGateway, + } + + round := rounds.Round{ + ID: roundId, + Topology: connect.NewCircuit([]*id.ID{requestGateway}), + } + + // Send a round look up request + testManager.lookupRoundMessages <- roundLookup{ + Round: round, + Identity: ephIdentity, + } + + }() + + var testBundle message.Bundle + + select { + case testBundle = <-messageBundleChan: + case <-time.After(1500 * time.Millisecond): + t.Errorf("Timed out waiting for messageBundleChan.") + } + + err = stop.Close() + if err != nil { + t.Errorf("Failed to signal close to process: %+v", err) + } + + // Ensure bundle received and has expected values + time.Sleep(2 * time.Second) + if reflect.DeepEqual(testBundle, message.Bundle{}) { + t.Fatal("Did not receive a message bundle over the channel") + } + + if testBundle.Identity.EphId.Int64() != expectedEphID.Int64() { + t.Errorf("Unexpected address ID in bundle."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expectedEphID, testBundle.Identity.EphId) + } + + if !bytes.Equal(expectedPayload, testBundle.Messages[0].GetPayloadA()) { + t.Errorf("Unexpected address ID in bundle."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expectedPayload, testBundle.Messages[0].GetPayloadA()) + + } + +} diff --git a/cmix/pickup/utils_test.go b/cmix/pickup/utils_test.go index 0f97bdd7da6d196d80885e55cdd662d45f31fcbe..6a03403f31709df6fd7785c38cfbaec8424eb4cf 100644 --- a/cmix/pickup/utils_test.go +++ b/cmix/pickup/utils_test.go @@ -143,6 +143,25 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, return nil, nil } +func (mmrc *mockMessageRetrievalComms) RequestBatchMessages(host *connect.Host, req *pb.GetMessagesBatch) (*pb.GetMessagesResponseBatch, error) { + ret := make([]*pb.GetMessagesResponse, len(req.GetRequests())) + for i, mreq := range req.GetRequests() { + targetId, err := id.Unmarshal(mreq.Target) + if err != nil { + + } + h, err := connect.NewHost(targetId, "0.0.0.0", nil, connect.GetDefaultHostParams()) + if err != nil { + + } + ret[i], err = mmrc.RequestMessages(h, mreq) + } + return &pb.GetMessagesResponseBatch{ + Results: ret, + Errors: make([]string, len(ret)), + }, nil +} + func newTestBackoffTable(face interface{}) [cappedTries]time.Duration { switch face.(type) { case *testing.T, *testing.M, *testing.B, *testing.PB: