Skip to content
Snippets Groups Projects
Select Git revision
  • 2dce69c4f1dcaf0dd3d1369f98d37d5bb463b916
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
41 results

retrieve.go

  • retrieve.go 10.02 KiB
    ////////////////////////////////////////////////////////////////////////////////
    // Copyright © 2022 xx foundation                                             //
    //                                                                            //
    // Use of this source code is governed by a license that can be found in the  //
    // LICENSE file.                                                              //
    ////////////////////////////////////////////////////////////////////////////////
    
    package pickup
    
    import (
    	"encoding/binary"
    	"time"
    
    	"github.com/pkg/errors"
    	jww "github.com/spf13/jwalterweatherman"
    	"gitlab.com/elixxir/client/v4/cmix/gateway"
    	"gitlab.com/elixxir/client/v4/cmix/identity/receptionID"
    	"gitlab.com/elixxir/client/v4/cmix/message"
    	"gitlab.com/elixxir/client/v4/cmix/rounds"
    	"gitlab.com/elixxir/client/v4/stoppable"
    	pb "gitlab.com/elixxir/comms/mixmessages"
    	"gitlab.com/elixxir/crypto/shuffle"
    	"gitlab.com/elixxir/primitives/format"
    	"gitlab.com/xx_network/comms/connect"
    	"gitlab.com/xx_network/primitives/id"
    	"gitlab.com/xx_network/primitives/netTime"
    )
    
    type MessageRetrievalComms interface {
    	GetHost(hostId *id.ID) (*connect.Host, bool)
    	RequestMessages(host *connect.Host, message *pb.GetMessages) (
    		*pb.GetMessagesResponse, error)
    	RequestBatchMessages(host *connect.Host,
    		message *pb.GetMessagesBatch) (*pb.GetMessagesResponseBatch, error)
    }
    
    type roundLookup struct {
    	Round    rounds.Round
    	Identity receptionID.EphemeralIdentity
    }
    
    const noRoundError = "does not have round %d"
    
    // processMessageRetrieval receives a roundLookup request and pings the gateways
    // of that round for messages for the requested Identity in the roundLookup.
    func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
    	stop *stoppable.Single) {
    
    	for {
    		select {
    		case <-stop.Quit():
    			stop.ToStopped()
    			return
    		case rl := <-m.lookupRoundMessages:
    			ri := rl.Round
    			jww.DEBUG.Printf("[processMessageRetrieval] Checking for messages in round %d", ri.ID)
    			err := m.unchecked.AddRound(id.Round(ri.ID), ri.Raw,
    				rl.Identity.Source, rl.Identity.EphId)
    			if err != nil {
    				jww.FATAL.Panicf(
    					"Failed to denote Unchecked Round for round %d",
    					id.Round(ri.ID))
    			}
    
    			gwIds := m.getGatewayList(rl)
    			if gwIds == nil {
    				continue
    			}
    			// If ForceMessagePickupRetry, we are forcing processUncheckedRounds
    			// by randomly not picking up messages (FOR INTEGRATION TEST). Only
    			// done if round has not been ignored before.
    			var bundle message.Bundle
    			if m.params.ForceMessagePickupRetry {
    				bundle, err = m.forceMessagePickupRetry(
    					ri, rl, comms, gwIds, stop)
    
    				// Exit if the thread has been stopped
    				if stoppable.CheckErr(err) {
    					jww.ERROR.Print(err)
    					continue
    				}
    				if err != nil {
    					jww.ERROR.Printf("[processMessageRetrieval] Failed to get pickup round %d from all "+
    						"gateways (%v): %s", ri.ID, gwIds, err)
    				}
    			} else {
    				// Attempt to request for this gateway
    				bundle, err = m.getMessagesFromGateway(
    					id.Round(ri.ID), rl.Identity, comms, gwIds, stop)
    
    				// Exit if the thread has been stopped
    				if stoppable.CheckErr(err) {
    					jww.ERROR.Print(err)
    					continue
    				}
    
    				// After trying all gateways, if none returned we mark the round
    				// as a failure and print out the last error
    				if err != nil {
    					jww.ERROR.Printf("[processMessageRetrieval] Failed to get pickup round %d "+
    						"from all gateways (%v): %s", rl.Round.ID, gwIds, err)
    				}
    			}
    
    			m.processBundle(bundle, rl.Identity, rl.Round)
    		}
    	}
    }
    
    // getGatewayList returns a shuffled list of gateways for a roundLookup request.
    func (m *pickup) getGatewayList(rl roundLookup) []*id.ID {
    	ri := rl.Round
    
    	// Convert gateways in round to proper ID format
    	gwIds := make([]*id.ID, ri.Topology.Len())
    	for i := 0; i < ri.Topology.Len(); i++ {
    		gwId := ri.Topology.GetNodeAtIndex(i).DeepCopy()
    		gwId.SetType(id.Gateway)
    		gwIds[i] = gwId
    	}
    
    	if len(gwIds) == 0 {
    		jww.WARN.Printf("Empty gateway ID List")
    		return nil
    	}
    
    	// Target the last nodes in the team first because it has messages
    	// first, randomize other members of the team
    	var rndBytes [32]byte
    	stream := m.rng.GetStream()
    	_, err := stream.Read(rndBytes[:])
    	stream.Close()
    	if err != nil {
    		jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+
    			"from all gateways (%v): %s", ri.ID, gwIds, err)
    	}
    
    	gwIds[0], gwIds[len(gwIds)-1] = gwIds[len(gwIds)-1], gwIds[0]
    	shuffle.ShuffleSwap(rndBytes[:], len(gwIds)-1, func(i, j int) {
    		gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1]
    	})
    
    	return gwIds
    }
    
    // getMessagesFromGateway attempts to pick up messages from their assigned
    // gateway in the round specified. If successful, it returns a message.Bundle.
    func (m *pickup) getMessagesFromGateway(roundID id.Round,
    	identity receptionID.EphemeralIdentity, comms MessageRetrievalComms,
    	gwIds []*id.ID, stop *stoppable.Single) (message.Bundle, error) {
    	start := netTime.Now()
    	// Send to the gateways using backup proxies
    	result, err := m.sender.SendToPreferred(gwIds,
    		func(host *connect.Host, target *id.ID, _ time.Duration) (interface{}, error) {
    			jww.DEBUG.Printf("Trying to get messages for round %d for "+
    				"ephemeralID %d (%s) via Gateway: %s", roundID,
    				identity.EphId.Int64(), identity.Source, host.GetId())
    
    			// send the request
    			msgReq := &pb.GetMessages{
    				ClientID: identity.EphId[:],
    				RoundID:  uint64(roundID),
    				Target:   target.Marshal(),
    			}
    
    			// If the gateway doesn't have the round, return an error
    			msgResp, err := comms.RequestMessages(host, msgReq)
    
    			if err != nil {
    				// You need to default to a retryable errors because otherwise
    				// we cannot enumerate all errors
    				return nil, errors.WithMessage(err, gateway.RetryableError)
    			}
    
    			if !msgResp.GetHasRound() {
    				errRtn := errors.Errorf(noRoundError, roundID)
    				return message.Bundle{},
    					errors.WithMessage(errRtn, gateway.RetryableError)
    			}
    
    			return msgResp, nil
    		}, stop, m.params.SendTimeout)
    
    	// Fail the round if an error occurs so that it can be tried again later
    	if err != nil {
    		return message.Bundle{}, errors.WithMessagef(
    			err, "Failed to request messages for round %d", roundID)
    	}
    	msgResp := result.(*pb.GetMessagesResponse)
    
    	bundle, err := m.buildMessageBundle(msgResp, identity, roundID)
    	if err != nil {
    		return message.Bundle{}, errors.WithMessagef(err, "Failed to process pickup response for round %d", roundID)
    	}
    
    	jww.INFO.Printf("Received %d messages in Round %d for %d (%s) in %s",
    		len(bundle.Messages), roundID, identity.EphId.Int64(), identity.Source,
    		netTime.Now().Sub(start))
    
    	return bundle, nil
    }
    
    // processBundle accepts a message.Bundle, EphemeralIdentity and round ID.
    // If the bundle contains any messages, it iterates through them, sending
    // them to the bundle channel for handling, and removing the associated
    // rounds from m.unchecked.
    func (m *pickup) processBundle(bundle message.Bundle, rid receptionID.EphemeralIdentity, ri rounds.Round) {
    	jww.TRACE.Printf("messages: %v\n", bundle.Messages)
    
    	if len(bundle.Messages) != 0 {
    		// If successful and there are messages, we send them to another
    		// thread
    		bundle.Identity = receptionID.EphemeralIdentity{
    			EphId:  rid.EphId,
    			Source: rid.Source,
    		}
    		bundle.RoundInfo = ri
    		m.messageBundles <- bundle
    
    		jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID)
    		err := m.unchecked.Remove(
    			id.Round(ri.ID), rid.Source, rid.EphId)
    		if err != nil {
    			jww.ERROR.Printf("Could not remove round %d from "+
    				"unchecked rounds store: %v", ri.ID, err)
    		}
    	}
    }
    
    // buildMessageBundle builds a message.Bundle from a passed in
    // pb.GetMessagesResponse, EphemeralIdentity and round ID.
    func (m *pickup) buildMessageBundle(msgResp *pb.GetMessagesResponse, identity receptionID.EphemeralIdentity, roundID id.Round) (message.Bundle, error) {
    	// If there are no messages, print a warning. Due to the probabilistic
    	// nature of the bloom filters, false positives will happen sometimes
    	msgs := msgResp.GetMessages()
    	if len(msgs) == 0 {
    		jww.WARN.Printf("no messages for client %s "+
    			" in round %d. This happening every once in a while is normal,"+
    			" but can be indicative of a problem if it is consistent",
    			identity.Source, roundID)
    
    		err := m.unchecked.EndCheck(roundID, identity.Source, identity.EphId)
    		if err != nil {
    			jww.ERROR.Printf("Failed to end the check for the round round %d: %+v", roundID, err)
    		}
    
    		return message.Bundle{}, nil
    	}
    
    	// Build the bundle of messages to send to the message processor
    	bundle := message.Bundle{
    		Round:    roundID,
    		Messages: make([]format.Message, len(msgs)),
    		Finish:   func() {},
    	}
    
    	mSize := m.session.GetCmixGroup().GetP().ByteLen()
    	for i, slot := range msgs {
    		msg := format.NewMessage(mSize)
    		msg.SetPayloadA(slot.PayloadA)
    		msg.SetPayloadB(slot.PayloadB)
    		jww.INFO.Printf("Received message of msgDigest: %s, round %d",
    			msg.Digest(), roundID)
    		bundle.Messages[i] = msg
    	}
    	return bundle, nil
    }
    
    // forceMessagePickupRetry is a helper function which forces
    // processUncheckedRounds by randomly not looking up messages.
    func (m *pickup) forceMessagePickupRetry(ri rounds.Round, rl roundLookup,
    	comms MessageRetrievalComms, gwIds []*id.ID,
    	stop *stoppable.Single) (bundle message.Bundle, err error) {
    	if m.shouldForceMessagePickupRetry(rl) {
    		// Do not call get message, leaving the round to be picked up in
    		// unchecked round scheduler process
    		return
    	}
    
    	// Attempt to request for this gateway
    	return m.getMessagesFromGateway(
    		ri.ID, rl.Identity, comms, gwIds, stop)
    }
    
    // shouldForceMessagePickupRetry randomly determines if a roundLookup
    // should be skipped to force a retry.
    func (m *pickup) shouldForceMessagePickupRetry(rl roundLookup) bool {
    	rnd, _ := m.unchecked.GetRound(
    		rl.Round.ID, rl.Identity.Source, rl.Identity.EphId)
    	var err error
    	if rnd.NumChecks == 0 {
    		// Flip a coin to determine whether to pick up message
    		b := make([]byte, 8)
    		stream := m.rng.GetStream()
    		_, err = stream.Read(b)
    		if err != nil {
    			jww.FATAL.Panic(err)
    		}
    		stream.Close()
    
    		result := binary.BigEndian.Uint64(b)
    		if result%2 == 0 {
    			jww.INFO.Printf("Forcing a message pickup retry for round %d", rl.Round.ID)
    			return true
    		}
    	}
    	return false
    }