Select Git revision
stoppable_test.go
single.go 9.50 KiB
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
// Package cmd initializes the CLI and config parsers as well as the logger.
package cmd
import (
"bytes"
"fmt"
"time"
"github.com/spf13/cobra"
jww "github.com/spf13/jwalterweatherman"
"github.com/spf13/viper"
"gitlab.com/elixxir/client/cmix"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/single"
"gitlab.com/elixxir/client/xxdk"
"gitlab.com/elixxir/crypto/contact"
"gitlab.com/xx_network/primitives/utils"
)
// singleCmd is the single-use subcommand that allows for sending and responding
// to single-use messages.
var singleCmd = &cobra.Command{
Use: "single",
Short: "Send and respond to single-use messages.",
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
client := initClient()
// Write user contact to file
user := client.GetUser()
jww.INFO.Printf("User: %s", user.ReceptionID)
jww.INFO.Printf("User Transmission: %s", user.TransmissionID)
writeContact(user.GetContact())
err := client.StartNetworkFollower(5 * time.Second)
if err != nil {
jww.FATAL.Panicf("%+v", err)
}
// Wait until connected or crash on timeout
connected := make(chan bool, 10)
client.GetCmix().AddHealthCallback(
func(isconnected bool) {
connected <- isconnected
})
waitUntilConnected(connected)
// get the tag
tag := viper.GetString("tag")
// Register the callback
receiver := &Receiver{
recvCh: make(chan struct {
request *single.Request
ephID receptionID.EphemeralIdentity
round []rounds.Round
}),
}
myID := client.GetUser().ReceptionID
listener := single.Listen(tag, myID,
client.GetUser().E2eDhPrivateKey,
client.GetCmix(),
client.GetStorage().GetE2EGroup(),
receiver)
for numReg, total := 1, 100; numReg < (total*3)/4; {
time.Sleep(1 * time.Second)
numReg, total, err = client.GetNodeRegistrationStatus()
if err != nil {
jww.FATAL.Panicf("%+v", err)
}
jww.INFO.Printf("Registering with nodes (%d/%d)...",
numReg, total)
}
timeout := viper.GetDuration("timeout")
// If the send flag is set, then send a message
if viper.GetBool("send") {
// get message details
payload := []byte(viper.GetString("message"))
partner := readSingleUseContact("contact")
maxMessages := uint8(viper.GetUint("maxMessages"))
sendSingleUse(client.Cmix, partner, payload,
maxMessages, timeout, tag)
}
// If the reply flag is set, then start waiting for a
// message and reply when it is received
if viper.GetBool("reply") {
replySingleUse(timeout, receiver)
}
listener.Stop()
},
}
func init() {
// Single-use subcommand options
singleCmd.Flags().Bool("send", false, "Sends a single-use message.")
_ = viper.BindPFlag("send", singleCmd.Flags().Lookup("send"))
singleCmd.Flags().Bool("reply", false,
"Listens for a single-use message and sends a reply.")
_ = viper.BindPFlag("reply", singleCmd.Flags().Lookup("reply"))
singleCmd.Flags().StringP("contact", "c", "",
"Path to contact file to send message to.")
_ = viper.BindPFlag("contact", singleCmd.Flags().Lookup("contact"))
singleCmd.Flags().StringP("tag", "", "testTag",
"The tag that specifies the callback to trigger on reception.")
_ = viper.BindPFlag("tag", singleCmd.Flags().Lookup("tag"))
singleCmd.Flags().Uint8("maxMessages", 1,
"The max number of single-use response messages.")
_ = viper.BindPFlag("maxMessages", singleCmd.Flags().Lookup("maxMessages"))
singleCmd.Flags().DurationP("timeout", "t", 30*time.Second,
"Duration before stopping to wait for single-use message.")
_ = viper.BindPFlag("timeout", singleCmd.Flags().Lookup("timeout"))
rootCmd.AddCommand(singleCmd)
}
type Response struct {
callbackChan chan struct {
payload []byte
err error
}
}
func (r *Response) Callback(payload []byte, receptionID receptionID.EphemeralIdentity,
round []rounds.Round, err error) {
jww.DEBUG.Printf("Payload: %v, receptionID: %v, round: %v, err: %v",
payload, receptionID, round, err)
r.callbackChan <- struct {
payload []byte
err error
}{payload: payload, err: err}
}
// sendSingleUse sends a single use message.
func sendSingleUse(m *xxdk.Cmix, partner contact.Contact, payload []byte,
maxMessages uint8, timeout time.Duration, tag string) {
// Construct callback
callback := &Response{
callbackChan: make(chan struct {
payload []byte
err error
}),
}
jww.INFO.Printf("Sending single-use message to contact: %+v", partner)
jww.INFO.Printf("Payload: \"%s\"", payload)
jww.INFO.Printf("Max number of replies: %d", maxMessages)
jww.INFO.Printf("Timeout: %s", timeout)
// Send single-use message
fmt.Printf("Sending single-use transmission message: %s\n", payload)
jww.DEBUG.Printf("Sending single-use transmission to %s: %s",
partner.ID, payload)
params := single.GetDefaultRequestParams()
params.MaxResponseMessages = maxMessages
rng := m.GetRng().GetStream()
defer rng.Close()
e2eGrp := m.GetStorage().GetE2EGroup()
rnd, ephID, err := single.TransmitRequest(partner, tag, payload, callback, params,
m.GetCmix(), rng, e2eGrp)
if err != nil {
jww.FATAL.Panicf("Failed to transmit single-use message: %+v", err)
}
jww.INFO.Printf("Single Use request sent on round %v with id %v", rnd,
ephID)
// Wait for callback to be called
fmt.Println("Waiting for response.")
results := <-callback.callbackChan
if results.payload != nil {
fmt.Printf("Message received: %s\n", results.payload)
jww.DEBUG.Printf("Received single-use reply payload: %s", results.payload)
} else {
jww.ERROR.Print("Failed to receive single-use reply payload.")
}
if results.err != nil {
jww.FATAL.Panicf("Received error when waiting for reply: %+v", results.err)
}
}
// replySingleUse responds to any single-use message it receives by replying\
// with the same payload.
func replySingleUse(timeout time.Duration, receiver *Receiver) {
// Wait to receive a message or stop after timeout occurs
fmt.Println("Waiting for single-use message.")
timer := time.NewTimer(timeout)
select {
case results := <-receiver.recvCh:
payload := results.request.GetPayload()
if payload != nil {
fmt.Printf("Single-use transmission received: %s\n", payload)
jww.DEBUG.Printf("Received single-use transmission from %s: %s",
results.request.GetPartner(), payload)
} else {
jww.ERROR.Print("Failed to receive single-use payload.")
}
// Create new payload from repeated received payloads so that each
// message part contains the same payload
resPayload := makeResponsePayload(payload, results.request.GetMaxParts(),
results.request.GetMaxResponsePartSize())
fmt.Printf("Sending single-use response message: %s\n", payload)
jww.DEBUG.Printf("Sending single-use response to %s: %s",
results.request.GetPartner(), payload)
roundId, err := results.request.Respond(resPayload, cmix.GetDefaultCMIXParams(),
30*time.Second)
if err != nil {
jww.FATAL.Panicf("Failed to send response: %+v", err)
}
jww.INFO.Printf("response sent on roundID: %v", roundId)
case <-timer.C:
fmt.Println("Timed out!")
jww.FATAL.Panicf("Failed to receive transmission after %s.", timeout)
}
}
type Receiver struct {
recvCh chan struct {
request *single.Request
ephID receptionID.EphemeralIdentity
round []rounds.Round
}
}
func (r *Receiver) Callback(req *single.Request, ephID receptionID.EphemeralIdentity,
round []rounds.Round) {
r.recvCh <- struct {
request *single.Request
ephID receptionID.EphemeralIdentity
round []rounds.Round
}{
request: req,
ephID: ephID,
round: round,
}
}
// makeResponsePayload generates a new payload that will span the max number of
// message parts in the contact. Each resulting message payload will contain a
// copy of the supplied payload with spaces taking up any remaining data.
func makeResponsePayload(payload []byte, maxParts uint8, maxSizePerPart int) []byte {
payloads := make([][]byte, maxParts)
payloadPart := makeResponsePayloadPart(payload, maxSizePerPart)
for i := range payloads {
payloads[i] = make([]byte, maxSizePerPart)
copy(payloads[i], payloadPart)
}
return bytes.Join(payloads, []byte{})
}
// makeResponsePayloadPart creates a single response payload by coping the given
// payload and filling the rest with spaces.
func makeResponsePayloadPart(payload []byte, maxSize int) []byte {
payloadPart := make([]byte, maxSize)
for i := range payloadPart {
payloadPart[i] = ' '
}
copy(payloadPart, payload)
return payloadPart
}
// readSingleUseContact opens the contact specified in the CLI flags. Panics if
// no file provided or if an error occurs while reading or unmarshalling it.
func readSingleUseContact(key string) contact.Contact {
// get path
filePath := viper.GetString(key)
if filePath == "" {
jww.FATAL.Panicf("Failed to read contact file: no file path provided.")
}
// Read from file
data, err := utils.ReadFile(filePath)
jww.INFO.Printf("Contact file size read in: %d bytes", len(data))
if err != nil {
jww.FATAL.Panicf("Failed to read contact file: %+v", err)
}
// Unmarshal contact
c, err := contact.Unmarshal(data)
if err != nil {
jww.FATAL.Panicf("Failed to unmarshal contact: %+v", err)
}
return c
}