Skip to content
Snippets Groups Projects
Select Git revision
  • 33d3c9124bc2d9129551f6f03d2db75e2ce6f930
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
41 results

messageReceive.go

Blame
  • single.go 9.50 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    // Package cmd initializes the CLI and config parsers as well as the logger.
    package cmd
    
    import (
    	"bytes"
    	"fmt"
    	"time"
    
    	"github.com/spf13/cobra"
    	jww "github.com/spf13/jwalterweatherman"
    	"github.com/spf13/viper"
    	"gitlab.com/elixxir/client/cmix"
    	"gitlab.com/elixxir/client/cmix/identity/receptionID"
    	"gitlab.com/elixxir/client/cmix/rounds"
    	"gitlab.com/elixxir/client/single"
    	"gitlab.com/elixxir/client/xxdk"
    	"gitlab.com/elixxir/crypto/contact"
    	"gitlab.com/xx_network/primitives/utils"
    )
    
    // singleCmd is the single-use subcommand that allows for sending and responding
    // to single-use messages.
    //
    // Its Run function initializes the client, writes the user's contact, starts
    // the network follower, waits on waitUntilConnected, registers a single-use
    // listener for the configured tag and polls the node registration status. It
    // then sends a message when the "send" flag is set, replies to a received
    // message when the "reply" flag is set, and finally stops the listener.
    var singleCmd = &cobra.Command{
    	Use:   "single",
    	Short: "Send and respond to single-use messages.",
    	Args:  cobra.NoArgs,
    	Run: func(cmd *cobra.Command, args []string) {
    
    		client := initClient()
    
    		// Write user contact to file
    		user := client.GetUser()
    		jww.INFO.Printf("User: %s", user.ReceptionID)
    		jww.INFO.Printf("User Transmission: %s", user.TransmissionID)
    		writeContact(user.GetContact())
    
    		// Start the network follower; any error is fatal.
    		// NOTE(review): the 5 second argument is presumably a timeout —
    		// confirm against StartNetworkFollower.
    		err := client.StartNetworkFollower(5 * time.Second)
    		if err != nil {
    			jww.FATAL.Panicf("%+v", err)
    		}
    
    		// Wait until connected or crash on timeout. Every health update is
    		// forwarded into a channel with capacity 10 that is handed to
    		// waitUntilConnected.
    		connected := make(chan bool, 10)
    		client.GetCmix().AddHealthCallback(
    			func(isconnected bool) {
    				connected <- isconnected
    			})
    		waitUntilConnected(connected)
    
    		// get the tag
    		tag := viper.GetString("tag")
    
    		// Register the callback. The receiver forwards every request it is
    		// given over the unbuffered recvCh, which replySingleUse reads from.
    		receiver := &Receiver{
    			recvCh: make(chan struct {
    				request *single.Request
    				ephID   receptionID.EphemeralIdentity
    				round   []rounds.Round
    			}),
    		}
    
    		// Listen for single-use requests addressed to this user's reception
    		// ID under the configured tag.
    		myID := client.GetUser().ReceptionID
    		listener := single.Listen(tag, myID,
    			client.GetUser().E2eDhPrivateKey,
    			client.GetCmix(),
    			client.GetStorage().GetE2EGroup(),
    			receiver)
    
    		// Poll the node registration status once per second until numReg is
    		// at least (total*3)/4 (integer arithmetic). The initial values 1 and
    		// 100 only ensure that the loop body runs at least once.
    		for numReg, total := 1, 100; numReg < (total*3)/4; {
    			time.Sleep(1 * time.Second)
    			numReg, total, err = client.GetNodeRegistrationStatus()
    			if err != nil {
    				jww.FATAL.Panicf("%+v", err)
    			}
    			jww.INFO.Printf("Registering with nodes (%d/%d)...",
    				numReg, total)
    		}
    
    		// The same timeout value is handed to both the send and reply paths.
    		timeout := viper.GetDuration("timeout")
    
    		// If the send flag is set, then send a message
    		if viper.GetBool("send") {
    			// get message details
    			// NOTE(review): the "message" key is not bound by this file's
    			// init; presumably it is registered elsewhere — confirm.
    			payload := []byte(viper.GetString("message"))
    			partner := readSingleUseContact("contact")
    			maxMessages := uint8(viper.GetUint("maxMessages"))
    
    			sendSingleUse(client.Cmix, partner, payload,
    				maxMessages, timeout, tag)
    		}
    
    		// If the reply flag is set, then start waiting for a
    		// message and reply when it is received
    		if viper.GetBool("reply") {
    			replySingleUse(timeout, receiver)
    		}
    
    		// Stop the listener once the send and reply steps have returned.
    		listener.Stop()
    	},
    }
    
    // init declares the flags of the single-use subcommand, binds each of them to
    // the viper key of the same name and attaches the subcommand to rootCmd.
    func init() {
    	flags := singleCmd.Flags()
    
    	// bind ties the named flag to the viper key of the same name. The error
    	// returned by BindPFlag is intentionally discarded.
    	bind := func(name string) {
    		_ = viper.BindPFlag(name, flags.Lookup(name))
    	}
    
    	flags.Bool("send", false, "Sends a single-use message.")
    	bind("send")
    
    	flags.Bool("reply", false,
    		"Listens for a single-use message and sends a reply.")
    	bind("reply")
    
    	flags.StringP("contact", "c", "",
    		"Path to contact file to send message to.")
    	bind("contact")
    
    	flags.StringP("tag", "", "testTag",
    		"The tag that specifies the callback to trigger on reception.")
    	bind("tag")
    
    	flags.Uint8("maxMessages", 1,
    		"The max number of single-use response messages.")
    	bind("maxMessages")
    
    	flags.DurationP("timeout", "t", 30*time.Second,
    		"Duration before stopping to wait for single-use message.")
    	bind("timeout")
    
    	rootCmd.AddCommand(singleCmd)
    }
    
    // Response receives the outcome of a single-use request. Its Callback method
    // forwards each payload and error it is given over callbackChan.
    type Response struct {
    	// callbackChan carries the payload and error passed to Callback.
    	callbackChan chan struct {
    		payload []byte
    		err     error
    	}
    }
    
    // Callback logs the reply details at debug level and then passes the payload
    // and error on through callbackChan. The reception identity and rounds are
    // only logged; they are not forwarded.
    func (r *Response) Callback(payload []byte, receptionID receptionID.EphemeralIdentity,
    	round []rounds.Round, err error) {
    	jww.DEBUG.Printf("Payload: %v, receptionID: %v, round: %v, err: %v",
    		payload, receptionID, round, err)
    
    	result := struct {
    		payload []byte
    		err     error
    	}{payload, err}
    	r.callbackChan <- result
    }
    
    // sendSingleUse sends a single-use message to partner and waits for the
    // response callback.
    //
    // Parameters:
    //   - m supplies the RNG stream, the E2E group and the cMix client.
    //   - partner is the contact the request is transmitted to.
    //   - payload is the message body.
    //   - maxMessages is set as MaxResponseMessages on the request parameters.
    //   - timeout bounds how long to wait for the response callback; a
    //     non-positive value waits indefinitely.
    //   - tag is passed through to single.TransmitRequest.
    //
    // The function panics via jww.FATAL when the transmission fails, when the
    // callback reports an error, or when no callback arrives within timeout.
    func sendSingleUse(m *xxdk.Cmix, partner contact.Contact, payload []byte,
    	maxMessages uint8, timeout time.Duration, tag string) {
    	// Construct callback. The channel has room for one result so that a
    	// callback arriving after the wait below has ended does not block on
    	// the send.
    	callback := &Response{
    		callbackChan: make(chan struct {
    			payload []byte
    			err     error
    		}, 1),
    	}
    
    	jww.INFO.Printf("Sending single-use message to contact: %+v", partner)
    	jww.INFO.Printf("Payload: \"%s\"", payload)
    	jww.INFO.Printf("Max number of replies: %d", maxMessages)
    	jww.INFO.Printf("Timeout: %s", timeout)
    
    	// Send single-use message
    	fmt.Printf("Sending single-use transmission message: %s\n", payload)
    	jww.DEBUG.Printf("Sending single-use transmission to %s: %s",
    		partner.ID, payload)
    	params := single.GetDefaultRequestParams()
    	params.MaxResponseMessages = maxMessages
    	rng := m.GetRng().GetStream()
    	defer rng.Close()
    
    	e2eGrp := m.GetStorage().GetE2EGroup()
    	rnd, ephID, err := single.TransmitRequest(partner, tag, payload, callback, params,
    		m.GetCmix(), rng, e2eGrp)
    	if err != nil {
    		jww.FATAL.Panicf("Failed to transmit single-use message: %+v", err)
    	}
    
    	jww.INFO.Printf("Single Use request sent on round %v with id %v", rnd,
    		ephID)
    
    	// Enforce the caller's timeout on the wait. A nil channel is never ready
    	// in a select, so a non-positive timeout keeps waiting indefinitely.
    	var expired <-chan time.Time
    	if timeout > 0 {
    		timer := time.NewTimer(timeout)
    		defer timer.Stop()
    		expired = timer.C
    	}
    
    	// Wait for callback to be called
    	fmt.Println("Waiting for response.")
    	select {
    	case results := <-callback.callbackChan:
    		if results.payload != nil {
    			fmt.Printf("Message received: %s\n", results.payload)
    			jww.DEBUG.Printf("Received single-use reply payload: %s", results.payload)
    		} else {
    			jww.ERROR.Print("Failed to receive single-use reply payload.")
    		}
    
    		if results.err != nil {
    			jww.FATAL.Panicf("Received error when waiting for reply: %+v", results.err)
    		}
    
    	case <-expired:
    		fmt.Println("Timed out!")
    		jww.FATAL.Panicf("Failed to receive single-use reply after %s.", timeout)
    	}
    }
    
    // replySingleUse waits for one single-use message on receiver.recvCh and
    // replies to it with the same payload, repeated so that it fills every
    // response part.
    //
    // timeout bounds how long to wait for the incoming message; when it elapses
    // the function panics via jww.FATAL. It also panics when sending the response
    // fails. The response itself is sent with a fixed 30 second timeout argument,
    // independent of timeout.
    func replySingleUse(timeout time.Duration, receiver *Receiver) {
    	// Wait to receive a message or stop after timeout occurs
    	fmt.Println("Waiting for single-use message.")
    	timer := time.NewTimer(timeout)
    	// Release the timer as soon as the function returns instead of leaving
    	// it pending when a message arrives first.
    	defer timer.Stop()
    
    	select {
    	case results := <-receiver.recvCh:
    		payload := results.request.GetPayload()
    		if payload != nil {
    			fmt.Printf("Single-use transmission received: %s\n", payload)
    			jww.DEBUG.Printf("Received single-use transmission from %s: %s",
    				results.request.GetPartner(), payload)
    		} else {
    			// A missing payload is only logged; the reply is still sent.
    			jww.ERROR.Print("Failed to receive single-use payload.")
    		}
    
    		// Create new payload from repeated received payloads so that each
    		// message part contains the same payload
    		resPayload := makeResponsePayload(payload, results.request.GetMaxParts(),
    			results.request.GetMaxResponsePartSize())
    
    		fmt.Printf("Sending single-use response message: %s\n", payload)
    		jww.DEBUG.Printf("Sending single-use response to %s: %s",
    			results.request.GetPartner(), payload)
    		roundID, err := results.request.Respond(resPayload, cmix.GetDefaultCMIXParams(),
    			30*time.Second)
    		if err != nil {
    			jww.FATAL.Panicf("Failed to send response: %+v", err)
    		}
    
    		jww.INFO.Printf("response sent on roundID: %v", roundID)
    
    	case <-timer.C:
    		fmt.Println("Timed out!")
    		jww.FATAL.Panicf("Failed to receive transmission after %s.", timeout)
    	}
    }
    
    // Receiver receives incoming single-use requests. Its Callback method
    // forwards each request, with its ephemeral identity and rounds, over recvCh.
    type Receiver struct {
    	// recvCh carries the request, ephemeral identity and rounds passed to
    	// Callback.
    	recvCh chan struct {
    		request *single.Request
    		ephID   receptionID.EphemeralIdentity
    		round   []rounds.Round
    	}
    }
    
    // Callback bundles the received request with its ephemeral identity and rounds
    // and sends the bundle on recvCh.
    func (r *Receiver) Callback(req *single.Request, ephID receptionID.EphemeralIdentity,
    	round []rounds.Round) {
    	received := struct {
    		request *single.Request
    		ephID   receptionID.EphemeralIdentity
    		round   []rounds.Round
    	}{req, ephID, round}
    
    	r.recvCh <- received
    }
    
    // makeResponsePayload builds a payload that fills maxParts message parts of
    // maxSizePerPart bytes each. Every part holds the padded part produced by
    // makeResponsePayloadPart for the supplied payload, and the parts are
    // concatenated without a separator.
    func makeResponsePayload(payload []byte, maxParts uint8, maxSizePerPart int) []byte {
    	part := makeResponsePayloadPart(payload, maxSizePerPart)
    
    	// bytes.Join copies every element into its result, so all entries can
    	// refer to the one padded part.
    	parts := make([][]byte, maxParts)
    	for i := range parts {
    		parts[i] = part
    	}
    
    	return bytes.Join(parts, nil)
    }
    
    // makeResponsePayloadPart creates a single response part of maxSize bytes by
    // copying the given payload into it and filling the remainder with spaces. A
    // payload longer than maxSize is cut off at maxSize bytes.
    func makeResponsePayloadPart(payload []byte, maxSize int) []byte {
    	padded := make([]byte, maxSize)
    
    	// Copy first, then pad only the bytes the payload did not cover.
    	for i := copy(padded, payload); i < maxSize; i++ {
    		padded[i] = ' '
    	}
    
    	return padded
    }
    
    // readSingleUseContact reads and unmarshals the contact file whose path is
    // stored under the given viper key.
    //
    // It panics via jww.FATAL if no path is configured for key, if the file
    // cannot be read, or if its contents cannot be unmarshalled into a contact.
    func readSingleUseContact(key string) contact.Contact {
    	// get path
    	filePath := viper.GetString(key)
    	if filePath == "" {
    		jww.FATAL.Panicf("Failed to read contact file: no file path provided.")
    	}
    
    	// Read from file
    	data, err := utils.ReadFile(filePath)
    	if err != nil {
    		jww.FATAL.Panicf("Failed to read contact file: %+v", err)
    	}
    
    	// Report the size only after the read is known to have succeeded; data
    	// carries no meaning when err is non-nil.
    	jww.INFO.Printf("Contact file size read in: %d bytes", len(data))
    
    	// Unmarshal contact
    	c, err := contact.Unmarshal(data)
    	if err != nil {
    		jww.FATAL.Panicf("Failed to unmarshal contact: %+v", err)
    	}
    
    	return c
    }