Skip to content
Snippets Groups Projects
Select Git revision
  • bcd7c4bb832bf8ce44d50796ebb13bf26e2af2c6
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.8.0
  • v4.7.9
  • v4.7.8
  • v4.7.7
  • v4.7.6
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
41 results

transmission.go

Blame
  • transmission.go 10.45 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    package single
    
    import (
    	"fmt"
    	"github.com/pkg/errors"
    	jww "github.com/spf13/jwalterweatherman"
    	"gitlab.com/elixxir/client/interfaces/params"
    	"gitlab.com/elixxir/client/interfaces/utility"
    	"gitlab.com/elixxir/client/storage/reception"
    	ds "gitlab.com/elixxir/comms/network/dataStructures"
    	contact2 "gitlab.com/elixxir/crypto/contact"
    	"gitlab.com/elixxir/crypto/cyclic"
    	"gitlab.com/elixxir/crypto/e2e/auth"
    	"gitlab.com/elixxir/crypto/e2e/singleUse"
    	"gitlab.com/elixxir/primitives/format"
    	"gitlab.com/elixxir/primitives/states"
    	"gitlab.com/xx_network/crypto/csprng"
    	"gitlab.com/xx_network/primitives/id"
    	"gitlab.com/xx_network/primitives/id/ephemeral"
    	"gitlab.com/xx_network/primitives/netTime"
    	"io"
    	"sync/atomic"
    	"time"
    )
    
    // GetMaxTransmissionPayloadSize reports the largest number of content bytes
    // that fit in the payload of a single-use transmission message. The value is
    // derived by constructing empty messages sized from the byte lengths of the
    // cMix and E2E group primes and asking the innermost payload for its capacity.
    func (m *Manager) GetMaxTransmissionPayloadSize() int {
    	// The prime byte lengths determine the size of every message layer
    	cmixPrimeLen := m.store.Cmix().GetGroup().GetP().ByteLen()
    	e2ePrimeLen := m.store.E2e().GetGroup().GetP().ByteLen()
    
    	// Nest empty containers: cMix message -> transmit message -> payload
    	outer := format.NewMessage(cmixPrimeLen)
    	transmit := newTransmitMessage(outer.ContentsSize(), e2ePrimeLen)
    	inner := newTransmitMessagePayload(transmit.GetPayloadSize())
    
    	return inner.GetMaxContentsSize()
    }
    
    // TransmitSingleUse creates a single-use CMIX message for partner and starts
    // sending it. It delegates to transmitSingleUse, supplying an RNG stream from
    // the manager (closed when this function returns) and the round events of the
    // network instance. A non-nil error is returned when the message cannot be
    // built or its pending state cannot be registered; the send itself and the
    // delivery tracking run in a goroutine started by transmitSingleUse.
    func (m *Manager) TransmitSingleUse(partner contact2.Contact, payload []byte,
    	tag string, maxMsgs uint8, callback ReplyComm, timeout time.Duration) error {
    	stream := m.rng.GetStream()
    	defer stream.Close()
    
    	events := m.net.GetInstance().GetRoundEvents()
    
    	return m.transmitSingleUse(
    		partner, payload, tag, maxMsgs, stream, callback, timeout, events)
    }
    
    // roundEvents is the round-event registration that transmitSingleUse needs.
    // It is declared as an interface so that tests can pass in their own
    // implementation instead of the network instance's round events.
    type roundEvents interface {
    	AddRoundEventChan(rid id.Round, eventChan chan ds.EventReturn,
    		timeout time.Duration, validStates ...states.Round) *ds.EventCallback
    }
    
    // transmitSingleUse is the implementation behind TransmitSingleUse; the RNG
    // and the round events are parameters so that tests can inject their own.
    //
    // It builds the transmission CMIX message, registers the pending state for
    // the newly generated reception ID, adds the matching ephemeral identity to
    // reception storage, and then sends the message and tracks its round in a
    // separate goroutine.
    //
    // A non-nil error is returned only when the message cannot be created or the
    // pending state cannot be added. Every failure after the state has been
    // added (adding the identity, sending, round tracking) tears the state down
    // and is reported exactly once through callback; in those cases the function
    // itself returns nil.
    func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte,
    	tag string, MaxMsgs uint8, rng io.Reader, callback ReplyComm, timeout time.Duration, roundEvents roundEvents) error {
    
    	// Get ephemeral ID address space size; this blocks until the address space
    	// size is set for the first time
    	addressSize := m.net.GetAddressSize()
    
    	// Create new CMIX message containing the transmission payload
    	cmixMsg, dhKey, rid, ephID, err := m.makeTransmitCmixMessage(partner,
    		payload, tag, MaxMsgs, addressSize, timeout, netTime.Now(), rng)
    	if err != nil {
    		return errors.Errorf("failed to create new CMIX message: %+v", err)
    	}
    
    	jww.DEBUG.Printf("Created single-use transmission CMIX message with new ID "+
    		"%s and ephemeral ID %d", rid, ephID.Int64())
    
    	timeStart := netTime.Now()
    
    	// Add message state to map
    	quitChan, quit, err := m.p.addState(rid, dhKey, MaxMsgs, callback, timeout)
    	if err != nil {
    		return errors.Errorf("failed to add pending state: %+v", err)
    	}
    
    	// abort logs the failure, signals the state timeout handler to exit,
    	// deletes the state from the map, and returns the error on the callback.
    	// Callers must stop processing after calling it so that the handler is
    	// signalled and the callback is invoked only once.
    	abort := func(errorString string) {
    		jww.ERROR.Print(errorString)
    
    		quitChan <- struct{}{}
    		m.p.Lock()
    		delete(m.p.singleUse, *rid)
    		m.p.Unlock()
    		go callback(nil, errors.New(errorString))
    	}
    
    	// Add identity for newly generated ID
    	err = m.reception.AddIdentity(reception.Identity{
    		EphId:       ephID,
    		Source:      rid,
    		AddressSize: addressSize,
    		End:         timeStart.Add(2 * timeout),
    		ExtraChecks: 10,
    		StartValid:  timeStart.Add(-2 * timeout),
    		EndValid:    timeStart.Add(2 * timeout),
    		Ephemeral:   true,
    	})
    	if err != nil {
    		abort(fmt.Sprintf("failed to add new identity to "+
    			"reception storage for single-use: %+v", err))
    
    		// Without a reception identity and with the state already removed
    		// there is nothing left to send for; the error has been delivered on
    		// the callback.
    		return nil
    	}
    
    	go func() {
    		// Send Message
    		jww.DEBUG.Printf("Sending single-use transmission CMIX "+
    			"message to %s.", partner.ID)
    		round, _, err := m.net.SendCMIX(cmixMsg, partner.ID, params.GetDefaultCMIX())
    		if err != nil {
    			abort(fmt.Sprintf("failed to send single-use transmission "+
    				"CMIX message: %+v", err))
    
    			// The message was never sent, so there is no round to track
    			return
    		}
    
    		// Check if the state timeout handler has quit
    		if atomic.LoadInt32(quit) == 1 {
    			jww.ERROR.Print("Stopping to send single-use transmission CMIX " +
    				"message because the timeout handler quit.")
    			return
    		}
    
    		// Update the timeout for the elapsed time
    		roundEventTimeout := timeout - netTime.Since(timeStart) - time.Millisecond
    
    		// Check message delivery
    		sendResults := make(chan ds.EventReturn, 1)
    		roundEvents.AddRoundEventChan(round, sendResults, roundEventTimeout,
    			states.COMPLETED, states.FAILED)
    
    		im := fmt.Sprintf("Sent single-use transmission CMIX "+
    			"message to %s and ephemeral ID %d on round %d.",
    			partner.ID, ephID.Int64(), round)
    		jww.DEBUG.Print(im)
    		if m.client != nil {
    			m.client.ReportEvent(1, "SingleUse", "MessageSend", im)
    		}
    
    		// Wait until the result tracking responds
    		success, numRoundFail, numTimeOut := utility.TrackResults(sendResults, 1)
    		if !success {
    			abort(fmt.Sprintf("failed to send single-use transmission "+
    				"message: %d round failures, %d round event time outs.",
    				numRoundFail, numTimeOut))
    			return
    		}
    		jww.DEBUG.Print("Tracked single-use transmission message round.")
    	}()
    
    	return nil
    }
    
    // makeTransmitCmixMessage assembles the CMIX message for a single-use
    // transmission. The provided payload is placed in a transmit message payload
    // together with the tag fingerprint and maxMsgs, encrypted, wrapped in a
    // transmit message alongside a freshly generated public key, and set as the
    // contents of the CMIX message.
    //
    // It returns the CMIX message, the generated DH key, the newly generated
    // reception ID, and its ephemeral ID. An error is returned if the payload is
    // larger than the available contents size, or if key or ID generation fails.
    func (m *Manager) makeTransmitCmixMessage(partner contact2.Contact,
    	payload []byte, tag string, maxMsgs uint8, addressSize uint8,
    	timeout time.Duration, timeNow time.Time, rng io.Reader) (format.Message,
    	*cyclic.Int, *id.ID, ephemeral.Id, error) {
    	grp := m.store.E2e().GetGroup()
    
    	// Build empty containers, sized from the group primes, to learn how much
    	// room is available for the caller's payload
    	cmixPrimeLen := m.store.Cmix().GetGroup().GetP().ByteLen()
    	cmixMsg := format.NewMessage(cmixPrimeLen)
    	contentsSize := cmixMsg.ContentsSize()
    	transmitMsg := newTransmitMessage(contentsSize, grp.GetP().ByteLen())
    	msgPayload := newTransmitMessagePayload(transmitMsg.GetPayloadSize())
    
    	// Reject payloads that cannot fit
    	if len(payload) > msgPayload.GetMaxContentsSize() {
    		return format.Message{}, nil, nil, ephemeral.Id{},
    			errors.Errorf("length of provided payload (%d) too long for message "+
    				"payload capacity (%d).", len(payload), len(msgPayload.contents))
    	}
    
    	// Create the DH key and the public key that accompanies the message
    	dhKey, publicKey, err := generateDhKeys(grp, partner.DhPubKey, rng)
    	if err != nil {
    		return format.Message{}, nil, nil, ephemeral.Id{}, err
    	}
    
    	// Fill in the payload fields
    	tagFP := singleUse.NewTagFP(tag)
    	msgPayload.SetTagFP(tagFP)
    	msgPayload.SetMaxParts(maxMsgs)
    	msgPayload.SetContents(payload)
    
    	// Derive the reception ID and ephemeral ID; this must happen before the
    	// payload is marshalled because makeIDs sets the payload's nonce
    	rid, ephID, err := makeIDs(&msgPayload, publicKey, addressSize, timeout,
    		timeNow, rng)
    	if err != nil {
    		return format.Message{}, nil, nil, ephemeral.Id{},
    			errors.Errorf("failed to generate IDs: %+v", err)
    	}
    
    	// Encrypt the marshalled payload and compute the MAC over the ciphertext
    	fp := singleUse.NewTransmitFingerprint(partner.DhPubKey)
    	key := singleUse.NewTransmitKey(dhKey)
    	plaintext := msgPayload.Marshal()
    	ciphertext := auth.Crypt(key, fp[:24], plaintext)
    	mac := singleUse.MakeMAC(key, ciphertext)
    
    	// Populate the transmit message
    	transmitMsg.SetPubKey(publicKey)
    	transmitMsg.SetPayload(ciphertext)
    
    	// Populate the CMIX message
    	cmixMsg.SetContents(transmitMsg.Marshal())
    	cmixMsg.SetKeyFP(fp)
    	cmixMsg.SetMac(mac)
    
    	return cmixMsg, dhKey, rid, ephID, nil
    }
    
    // generateDhKeys creates a fresh private key in the group using rng and
    // derives two values from it: the DH key, computed from the partner's public
    // key dhPubKey, and the matching public key. The DH key is returned first and
    // the public key second. An error is returned if the private key cannot be
    // generated.
    func generateDhKeys(grp *cyclic.Group, dhPubKey *cyclic.Int,
    	rng io.Reader) (*cyclic.Int, *cyclic.Int, error) {
    	prime := grp.GetP()
    
    	// Draw the private key bytes from the group
    	rawPrivKey, err := csprng.GenerateInGroup(prime.Bytes(), prime.ByteLen(), rng)
    	if err != nil {
    		return nil, nil, errors.Errorf("failed to generate key in group: %+v",
    			err)
    	}
    	privateKey := grp.NewIntFromBytes(rawPrivKey)
    
    	// Derive the public key, then the DH key shared with the partner
    	// NOTE(review): the grp.NewInt(1) arguments are presumably the destination
    	// for each result - confirm against cyclic.Group
    	pubKey := grp.ExpG(privateKey, grp.NewInt(1))
    	sharedKey := grp.Exp(dhPubKey, privateKey, grp.NewInt(1))
    
    	return sharedKey, pubKey, nil
    }
    
    // makeIDs generates a new reception ID and its ephemeral ID. The reception ID
    // is derived from the unencrypted message payload, which contains a nonce,
    // and the given public key. A new nonce is drawn from rng and the IDs are
    // regenerated until the start and end returned by ephemeral.GetId cover the
    // window of timeNow +/- 2*timeout, that is, start is not after the window
    // start and end is not before the window end.
    //
    // The IDs are always generated at least once, so a non-nil ID is returned
    // whenever the error is nil, including when timeout is zero or negative.
    //
    // msg is modified: its nonce is overwritten on every attempt. An error is
    // returned if the nonce cannot be generated or ephemeral.GetId fails.
    //
    // NOTE(review): if no ephemeral ID validity span can cover the window (for
    // example with a very large timeout) this loop would not terminate - confirm
    // against the behavior of ephemeral.GetId.
    func makeIDs(msg *transmitMessagePayload, publicKey *cyclic.Int,
    	addressSize uint8, timeout time.Duration, timeNow time.Time,
    	rng io.Reader) (*id.ID, ephemeral.Id, error) {
    	var rid *id.ID
    	var ephID ephemeral.Id
    
    	// Generate acceptable window for the ephemeral ID to exist in
    	windowStart, windowEnd := timeNow.Add(-2*timeout), timeNow.Add(2*timeout)
    
    	// Loop until the ephemeral ID's start and end cover the window; the body
    	// runs before the bounds are checked so that IDs exist on return
    	for {
    		// Generate new nonce
    		if err := msg.SetNonce(rng); err != nil {
    			return nil, ephemeral.Id{},
    				errors.Errorf("failed to generate nonce: %+v", err)
    		}
    
    		// Generate ID from unencrypted payload
    		rid = msg.GetRID(publicKey)
    
    		// Generate the ephemeral ID
    		var (
    			start, end time.Time
    			err        error
    		)
    		ephID, start, end, err = ephemeral.GetId(rid, uint(addressSize), timeNow.UnixNano())
    		if err != nil {
    			return nil, ephemeral.Id{}, errors.Errorf("failed to generate "+
    				"ephemeral ID from newly generated ID: %+v", err)
    		}
    		jww.DEBUG.Printf("ephemeral.GetId(%s, %d, %d) = %d", rid, addressSize, timeNow.UnixNano(), ephID.Int64())
    
    		// Accept the IDs once the window no longer extends past either bound
    		if !windowStart.Before(start) && !windowEnd.After(end) {
    			break
    		}
    	}
    
    	jww.INFO.Printf("generated by singe use sender reception id for single use: %s, " +
    		"ephId: %d, pubkey: %x, msg: %s", rid, ephID.Int64(), publicKey.Bytes(), msg)
    
    	return rid, ephID, nil
    }