Skip to content
Snippets Groups Projects
Select Git revision
  • 38896a526f6835c6390dccbad880609b996494f4
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
41 results

single.go

Blame
  • single.go 9.32 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    // Package cmd initializes the CLI and config parsers as well as the logger.
    package cmd
    
    import (
    	"bytes"
    	"fmt"
    	"github.com/spf13/cobra"
    	jww "github.com/spf13/jwalterweatherman"
    	"github.com/spf13/viper"
    	"gitlab.com/elixxir/client/interfaces/message"
    	"gitlab.com/elixxir/client/single"
    	"gitlab.com/elixxir/client/switchboard"
    	"gitlab.com/elixxir/crypto/contact"
    	"gitlab.com/xx_network/primitives/utils"
    	"time"
    )
    
    // singleCmd is the single-use subcommand that allows for sending and responding
    // to single-use messages.
    var singleCmd = &cobra.Command{
    	Use:   "single",
    	Short: "Send and respond to single-use messages.",
    	Args:  cobra.NoArgs,
    	Run: func(cmd *cobra.Command, args []string) {
    
    		client := initClient()
    
    		// Write user contact to file
    		user := client.GetUser()
    		jww.INFO.Printf("User: %s", user.ReceptionID)
    		jww.INFO.Printf("User Transmission: %s", user.TransmissionID)
    		writeContact(user.GetContact())
    
    		// Set up reception handler
    		swBoard := client.GetSwitchboard()
    		recvCh := make(chan message.Receive, 10000)
    		listenerID := swBoard.RegisterChannel("DefaultCLIReceiver",
    			switchboard.AnyUser(), message.Text, recvCh)
    		jww.INFO.Printf("Message ListenerID: %v", listenerID)
    
    		// Set up auth request handler, which simply prints the user ID of the
    		// requester
    		authMgr := client.GetAuthRegistrar()
    		authMgr.AddGeneralRequestCallback(printChanRequest)
    
    		// If unsafe channels, then add auto-acceptor
    		if viper.GetBool("unsafe-channel-creation") {
    			authMgr.AddGeneralRequestCallback(func(
    				requester contact.Contact, message string) {
    				jww.INFO.Printf("Got request: %s", requester.ID)
    				_, err := client.ConfirmAuthenticatedChannel(requester)
    				if err != nil {
    					jww.FATAL.Panicf("%+v", err)
    				}
    			})
    		}
    
    		err := client.StartNetworkFollower(5 * time.Second)
    		if err != nil {
    			jww.FATAL.Panicf("%+v", err)
    		}
    
    		// Wait until connected or crash on timeout
    		connected := make(chan bool, 10)
    		client.GetHealth().AddChannel(connected)
    		waitUntilConnected(connected)
    
    		// Make single-use manager and start receiving process
    		singleMng := single.NewManager(client)
    
    		// Get the tag
    		tag := viper.GetString("tag")
    
    		// Register the callback
    		callbackChan := make(chan responseCallbackChan)
    		callback := func(payload []byte, c single.Contact) {
    			callbackChan <- responseCallbackChan{payload, c}
    		}
    		singleMng.RegisterCallback(tag, callback)
    		err = client.AddService(singleMng.StartProcesses)
    		if err != nil {
    			jww.FATAL.Panicf("Could not add single use process: %+v", err)
    		}
    
    		for numReg, total := 1, 100; numReg < total; {
    			time.Sleep(1 * time.Second)
    			numReg, total, err = client.GetNodeRegistrationStatus()
    			if err != nil {
    				jww.FATAL.Panicf("%+v", err)
    			}
    			jww.INFO.Printf("Registering with nodes (%d/%d)...",
    				numReg, total)
    		}
    
    		timeout := viper.GetDuration("timeout")
    
    		// If the send flag is set, then send a message
    		if viper.GetBool("send") {
    			// Get message details
    			payload := []byte(viper.GetString("message"))
    			partner := readSingleUseContact("contact")
    			maxMessages := uint8(viper.GetUint("maxMessages"))
    
    			sendSingleUse(singleMng, partner, payload, maxMessages, timeout, tag)
    		}
    
    		// If the reply flag is set, then start waiting for a message and reply
    		// when it is received
    		if viper.GetBool("reply") {
    			replySingleUse(singleMng, timeout, callbackChan)
    		}
    	},
    }
    
    func init() {
    	// Single-use subcommand options
    
    	singleCmd.Flags().Bool("send", false, "Sends a single-use message.")
    	_ = viper.BindPFlag("send", singleCmd.Flags().Lookup("send"))
    
    	singleCmd.Flags().Bool("reply", false,
    		"Listens for a single-use message and sends a reply.")
    	_ = viper.BindPFlag("reply", singleCmd.Flags().Lookup("reply"))
    
    	singleCmd.Flags().StringP("contact", "c", "",
    		"Path to contact file to send message to.")
    	_ = viper.BindPFlag("contact", singleCmd.Flags().Lookup("contact"))
    
    	singleCmd.Flags().StringP("tag", "", "testTag",
    		"The tag that specifies the callback to trigger on reception.")
    	_ = viper.BindPFlag("tag", singleCmd.Flags().Lookup("tag"))
    
    	singleCmd.Flags().Uint8("maxMessages", 1,
    		"The max number of single-use response messages.")
    	_ = viper.BindPFlag("maxMessages", singleCmd.Flags().Lookup("maxMessages"))
    
    	singleCmd.Flags().DurationP("timeout", "t", 30*time.Second,
    		"Duration before stopping to wait for single-use message.")
    	_ = viper.BindPFlag("timeout", singleCmd.Flags().Lookup("timeout"))
    
    	rootCmd.AddCommand(singleCmd)
    }
    
    // sendSingleUse sends a single use message.
    func sendSingleUse(m *single.Manager, partner contact.Contact, payload []byte,
    	maxMessages uint8, timeout time.Duration, tag string) {
    	// Construct callback
    	callbackChan := make(chan struct {
    		payload []byte
    		err     error
    	})
    	callback := func(payload []byte, err error) {
    		callbackChan <- struct {
    			payload []byte
    			err     error
    		}{payload: payload, err: err}
    	}
    
    	jww.INFO.Printf("Sending single-use message to contact: %+v", partner)
    	jww.INFO.Printf("Payload: \"%s\"", payload)
    	jww.INFO.Printf("Max number of replies: %d", maxMessages)
    	jww.INFO.Printf("Timeout: %s", timeout)
    
    	// Send single-use message
    	fmt.Printf("Sending single-use transmission message: %s\n", payload)
    	jww.DEBUG.Printf("Sending single-use transmission to %s: %s", partner.ID, payload)
    	err := m.TransmitSingleUse(partner, payload, tag, maxMessages, callback, timeout)
    	if err != nil {
    		jww.FATAL.Panicf("Failed to transmit single-use message: %+v", err)
    	}
    
    	// Wait for callback to be called
    	fmt.Println("Waiting for response.")
    	results := <-callbackChan
    	if results.payload != nil {
    		fmt.Printf("Message received: %s\n", results.payload)
    		jww.DEBUG.Printf("Received single-use reply payload: %s", results.payload)
    	} else {
    		jww.ERROR.Print("Failed to receive single-use reply payload.")
    	}
    
    	if results.err != nil {
    		jww.FATAL.Panicf("Received error when waiting for reply: %+v", results.err)
    	}
    }
    
    // replySingleUse responds to any single-use message it receives by replying\
    // with the same payload.
    func replySingleUse(m *single.Manager, timeout time.Duration, callbackChan chan responseCallbackChan) {
    
    	// Wait to receive a message or stop after timeout occurs
    	fmt.Println("Waiting for single-use message.")
    	timer := time.NewTimer(timeout)
    	select {
    	case results := <-callbackChan:
    		if results.payload != nil {
    			fmt.Printf("Single-use transmission received: %s\n", results.payload)
    			jww.DEBUG.Printf("Received single-use transmission from %s: %s",
    				results.c.GetPartner(), results.payload)
    		} else {
    			jww.ERROR.Print("Failed to receive single-use payload.")
    		}
    
    		// Create new payload from repeated received payloads so that each
    		// message part contains the same payload
    		payload := makeResponsePayload(m, results.payload, results.c.GetMaxParts())
    
    		fmt.Printf("Sending single-use response message: %s\n", payload)
    		jww.DEBUG.Printf("Sending single-use response to %s: %s", results.c.GetPartner(), payload)
    		err := m.RespondSingleUse(results.c, payload, timeout)
    		if err != nil {
    			jww.FATAL.Panicf("Failed to send response: %+v", err)
    		}
    
    	case <-timer.C:
    		fmt.Println("Timed out!")
    		jww.FATAL.Panicf("Failed to receive transmission after %s.", timeout)
    	}
    }
    
    // responseCallbackChan structure used to collect information sent to the
    // response callback.
    type responseCallbackChan struct {
    	payload []byte
    	c       single.Contact
    }
    
    // makeResponsePayload generates a new payload that will span the max number of
    // message parts in the contact. Each resulting message payload will contain a
    // copy of the supplied payload with spaces taking up any remaining data.
    func makeResponsePayload(m *single.Manager, payload []byte, maxParts uint8) []byte {
    	payloads := make([][]byte, maxParts)
    	payloadPart := makeResponsePayloadPart(m, payload)
    	for i := range payloads {
    		payloads[i] = make([]byte, m.GetMaxResponsePayloadSize())
    		copy(payloads[i], payloadPart)
    	}
    	return bytes.Join(payloads, []byte{})
    }
    
    // makeResponsePayloadPart creates a single response payload by coping the given
    // payload and filling the rest with spaces.
    func makeResponsePayloadPart(m *single.Manager, payload []byte) []byte {
    	payloadPart := make([]byte, m.GetMaxResponsePayloadSize())
    	for i := range payloadPart {
    		payloadPart[i] = ' '
    	}
    	copy(payloadPart, payload)
    
    	return payloadPart
    }
    
    // readSingleUseContact opens the contact specified in the CLI flags. Panics if
    // no file provided or if an error occurs while reading or unmarshalling it.
    func readSingleUseContact(key string) contact.Contact {
    	// Get path
    	filePath := viper.GetString(key)
    	if filePath == "" {
    		jww.FATAL.Panicf("Failed to read contact file: no file path provided.")
    	}
    
    	// Read from file
    	data, err := utils.ReadFile(filePath)
    	jww.INFO.Printf("Contact file size read in: %d bytes", len(data))
    	if err != nil {
    		jww.FATAL.Panicf("Failed to read contact file: %+v", err)
    	}
    
    	// Unmarshal contact
    	c, err := contact.Unmarshal(data)
    	if err != nil {
    		jww.FATAL.Panicf("Failed to unmarshal contact: %+v", err)
    	}
    
    	return c
    }