Skip to content
Snippets Groups Projects
Select Git revision
  • 41f485a546702500c8710a87f2906b961bbfb58e
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
41 results

status.go

Blame
  • sendCmix.go 10.34 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    package cmix
    
    import (
    	"fmt"
    	"github.com/pkg/errors"
    	jww "github.com/spf13/jwalterweatherman"
    	"gitlab.com/elixxir/client/cmix/gateway"
    	"gitlab.com/elixxir/client/cmix/message"
    	"gitlab.com/elixxir/client/cmix/nodes"
    	"gitlab.com/elixxir/client/event"
    	"gitlab.com/elixxir/client/stoppable"
    	pb "gitlab.com/elixxir/comms/mixmessages"
    	"gitlab.com/elixxir/comms/network"
    	"gitlab.com/elixxir/crypto/cmix"
    	"gitlab.com/elixxir/crypto/cyclic"
    	"gitlab.com/elixxir/crypto/fastRNG"
    	"gitlab.com/elixxir/primitives/excludedRounds"
    	"gitlab.com/elixxir/primitives/format"
    	"gitlab.com/xx_network/comms/connect"
    	"gitlab.com/xx_network/primitives/id"
    	"gitlab.com/xx_network/primitives/id/ephemeral"
    	"gitlab.com/xx_network/primitives/netTime"
    	"gitlab.com/xx_network/primitives/rateLimiting"
    	"strings"
    	"time"
    )
    
    // Send sends a "raw" cMix message payload to the provided recipient.
    // Returns the round ID of the round the payload was sent or an error if it
    // fails.
    // This does not have end-to-end encryption on it and is used exclusively as
    // a send for higher order cryptographic protocols. Do not use unless
    // implementing a protocol on top.
    //   recipient - cMix ID of the recipient.
    //   fingerprint - Key Fingerprint. 256-bit field to store a 255-bit
    //      fingerprint, highest order bit must be 0 (panic otherwise). If your
    //      system does not use key fingerprints, this must be random bits.
    //   service - Reception Service. The backup way for a client to identify
    //      messages on receipt via trial hashing and to identify notifications.
    //      If unused, use message.GetRandomService to fill the field with
    //      random data.
    //   payload - Contents of the message. Cannot exceed the payload size for a
    //      cMix message (panic otherwise).
    //   mac - 256-bit field to store a 255-bit mac, highest order bit must be 0
    //      (panic otherwise). If used, fill with random bits.
    // Will return an error if the network is unhealthy or if it fails to send
    // (along with the reason). Blocks until successful sends or errors.
    // WARNING: Do not roll your own crypto.
    func (c *client) Send(recipient *id.ID, fingerprint format.Fingerprint,
    	service message.Service, payload, mac []byte, cmixParams CMIXParams) (
    	id.Round, ephemeral.Id, error) {
    	if !c.Monitor.IsHealthy() {
    		return 0, ephemeral.Id{}, errors.New(
    			"Cannot send cmix message when the network is not healthy")
    	}
    
    	// Build message. Will panic if inputs are not correct.
    	msg := format.NewMessage(c.session.GetCmixGroup().GetP().ByteLen())
    	msg.SetKeyFP(fingerprint)
    	msg.SetContents(payload)
    	msg.SetMac(mac)
    	msg.SetSIH(service.Hash(msg.GetContents()))
    
    	if cmixParams.Critical {
    		c.crit.AddProcessing(msg, recipient, cmixParams)
    	}
    
    	rid, ephID, rtnErr := sendCmixHelper(c.Sender, msg, recipient, cmixParams,
    		c.instance, c.session.GetCmixGroup(), c.Registrar, c.rng, c.events,
    		c.session.GetTransmissionID(), c.comms)
    
    	if cmixParams.Critical {
    		c.crit.handle(msg, recipient, rid, rtnErr)
    	}
    
    	return rid, ephID, rtnErr
    }
    
    // sendCmixHelper is a helper function for client.SendCMIX.
    // NOTE: Payloads sent are not end-to-end encrypted; metadata is NOT protected
    // with this call. See SendE2E for end-to-end encryption and full privacy
    // protection.
    // Internal SendCmix, which bypasses the network check, will attempt to send to
    // the network without checking state. It has a built-in retry system which can
    // be configured through the params object.
    // If the message is successfully sent, the ID of the round sent it is returned,
    // which can be registered with the network instance to get a callback on its
    // status.
    func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
    	cmixParams CMIXParams, instance *network.Instance, grp *cyclic.Group,
    	nodes nodes.Registrar, rng *fastRNG.StreamGenerator, events event.Manager,
    	senderId *id.ID, comms SendCmixCommsInterface) (id.Round, ephemeral.Id, error) {
    
    	timeStart := netTime.Now()
    	maxTimeout := sender.GetHostParams().SendTimeout
    
    	var attempted excludedRounds.ExcludedRounds
    	if cmixParams.ExcludedRounds != nil {
    		attempted = cmixParams.ExcludedRounds
    	} else {
    		attempted = excludedRounds.NewSet()
    	}
    
    	jww.INFO.Printf("[Send-%s] Looking for round to send cMix message to "+
    		"%s (msgDigest: %s)", cmixParams.DebugTag, recipient, msg.Digest())
    
    	stream := rng.GetStream()
    	defer stream.Close()
    
    	// Flip leading bits randomly to thwart a tagging attack.
    	// See cmix.SetGroupBits for more info.
    	cmix.SetGroupBits(msg, grp, stream)
    
    	for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ {
    		elapsed := netTime.Since(timeStart)
    		jww.TRACE.Printf("[Send-%s] try %d, elapsed: %s",
    			cmixParams.DebugTag, numRoundTries, elapsed)
    
    		if elapsed > cmixParams.Timeout {
    			jww.INFO.Printf("[Send-%s] No rounds to send to %s "+
    				"(msgDigest: %s) were found before timeout %s",
    				cmixParams.DebugTag, recipient, msg.Digest(), cmixParams.Timeout)
    			return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out")
    		}
    
    		if numRoundTries > 0 {
    			jww.INFO.Printf("[Send-%s] Attempt %d to find round to send "+
    				"message to %s (msgDigest: %s)", cmixParams.DebugTag,
    				numRoundTries+1, recipient, msg.Digest())
    		}
    
    		// Find the best round to send to, excluding attempted rounds
    		remainingTime := cmixParams.Timeout - elapsed
    		bestRound, err := instance.GetWaitingRounds().GetUpcomingRealtime(
    			remainingTime, attempted, sendTimeBuffer)
    		if err != nil {
    			jww.WARN.Printf("[Send-%s] Failed to GetUpcomingRealtime "+
    				"(msgDigest: %s): %+v", cmixParams.DebugTag, msg.Digest(), err)
    		}
    
    		if bestRound == nil {
    			jww.WARN.Printf(
    				"[Send-%s] Best round on send is nil", cmixParams.DebugTag)
    			continue
    		}
    
    		jww.TRACE.Printf("[Send-%s] Best round found: %+v",
    			cmixParams.DebugTag, bestRound)
    
    		// Determine whether the selected round contains any nodes that are
    		// blacklisted by the CMIXParams object
    		containsBlacklisted := false
    		if cmixParams.BlacklistedNodes != nil {
    			for _, nodeId := range bestRound.Topology {
    				var nid id.ID
    				copy(nid[:], nodeId)
    				if _, isBlacklisted := cmixParams.BlacklistedNodes[nid]; isBlacklisted {
    					containsBlacklisted = true
    					break
    				}
    			}
    		}
    
    		if containsBlacklisted {
    			jww.WARN.Printf("[Send-%s] Round %d contains blacklisted "+
    				"nodes, skipping...", cmixParams.DebugTag, bestRound.ID)
    			continue
    		}
    
    		// Retrieve host and key information from round
    		firstGateway, roundKeys, err := processRound(
    			nodes, bestRound, recipient.String(), msg.Digest())
    		if err != nil {
    			jww.WARN.Printf("[Send-%s] SendCmix failed to process round "+
    				"(will retry): %v", cmixParams.DebugTag, err)
    			continue
    		}
    
    		jww.TRACE.Printf("[Send-%s] Round %v processed, firstGW: %s",
    			cmixParams.DebugTag, bestRound, firstGateway)
    
    		// Build the messages to send
    		wrappedMsg, encMsg, ephID, err := buildSlotMessage(msg, recipient,
    			firstGateway, stream, senderId, bestRound, roundKeys)
    		if err != nil {
    			return 0, ephemeral.Id{}, err
    		}
    
    		jww.INFO.Printf("[Send-%s] Sending to EphID %d (%s), on round %d "+
    			"(msgDigest: %s, ecrMsgDigest: %s) via gateway %s",
    			cmixParams.DebugTag, ephID.Int64(), recipient, bestRound.ID,
    			msg.Digest(), encMsg.Digest(), firstGateway.String())
    
    		// Send the payload
    		sendFunc := func(host *connect.Host, target *id.ID,
    			timeout time.Duration) (interface{}, error) {
    			wrappedMsg.Target = target.Marshal()
    
    			jww.TRACE.Printf(
    				"[Send-%s] sendFunc %s", cmixParams.DebugTag, host)
    
    			// Use the smaller of the two timeout durations
    			timeout = calculateSendTimeout(bestRound, maxTimeout)
    			calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout)
    			if calculatedTimeout < timeout {
    				timeout = calculatedTimeout
    			}
    
    			// Send the message
    			result, err := comms.SendPutMessage(host, wrappedMsg, timeout)
    			jww.TRACE.Printf("[Send-%s] sendFunc %s put message",
    				cmixParams.DebugTag, host)
    
    			if err != nil {
    				err := handlePutMessageError(
    					firstGateway, nodes, recipient.String(), bestRound, err)
    				jww.TRACE.Printf("[Send-%s] sendFunc %s error: %+v",
    					cmixParams.DebugTag, host, err)
    				return result, errors.WithMessagef(
    					err, "SendCmix %s", unrecoverableError)
    			}
    
    			return result, err
    		}
    
    		jww.TRACE.Printf("[Send-%s] sendToPreferred %s",
    			cmixParams.DebugTag, firstGateway)
    
    		result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc,
    			cmixParams.Stop, cmixParams.SendTimeout)
    		jww.DEBUG.Printf("[Send-%s] sendToPreferred %s returned",
    			cmixParams.DebugTag, firstGateway)
    
    		// Exit if the thread has been stopped
    		if stoppable.CheckErr(err) {
    			return 0, ephemeral.Id{}, err
    		}
    
    		// If the comm errors or the message fails to send, continue retrying
    		if err != nil {
    			if strings.Contains(err.Error(), rateLimiting.ClientRateLimitErr) {
    				jww.ERROR.Printf("[Send-%s] SendCmix failed to send to "+
    					"EphID %d (%s) on round %d: %+v", cmixParams.DebugTag,
    					ephID.Int64(), recipient, bestRound.ID, err)
    				return 0, ephemeral.Id{}, err
    			}
    
    			jww.ERROR.Printf("[Send-%s] SendCmix failed to send to "+
    				"EphID %d (%s) on round %d, trying a new round: %+v",
    				cmixParams.DebugTag, ephID.Int64(), recipient, bestRound.ID, err)
    			continue
    		}
    
    		// Return if it sends properly
    		gwSlotResp := result.(*pb.GatewaySlotResponse)
    		if gwSlotResp.Accepted {
    			m := fmt.Sprintf("[Send-%s] Successfully sent to EphID %v "+
    				"(source: %s) in round %d (msgDigest: %s), elapsed: %s "+
    				"numRoundTries: %d", cmixParams.DebugTag, ephID.Int64(),
    				recipient, bestRound.ID, msg.Digest(), elapsed, numRoundTries)
    
    			jww.INFO.Print(m)
    			events.Report(1, "MessageSend", "Metric", m)
    
    			return id.Round(bestRound.ID), ephID, nil
    		} else {
    			jww.FATAL.Panicf("[Send-%s] Gateway %s returned no error, "+
    				"but failed to accept message when sending to EphID %d (%s) "+
    				"on round %d", cmixParams.DebugTag, firstGateway, ephID.Int64(),
    				recipient, bestRound.ID)
    		}
    
    	}
    	return 0, ephemeral.Id{},
    		errors.New("failed to send the message, unknown error")
    }