Skip to content
Snippets Groups Projects
Select Git revision
  • 20e1c77f57935371091c58765788ba45973aefd2
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
41 results

retrieve_test.go

Blame
  • retrieve_test.go 13.14 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    package rounds
    
    import (
    	"bytes"
    	"gitlab.com/elixxir/client/network/gateway"
    	ephemeral2 "gitlab.com/elixxir/client/network/identity/receptionID"
    	"gitlab.com/elixxir/client/network/message"
    	"gitlab.com/elixxir/client/stoppable"
    	pb "gitlab.com/elixxir/comms/mixmessages"
    	"gitlab.com/elixxir/crypto/fastRNG"
    	"gitlab.com/xx_network/comms/connect"
    	"gitlab.com/xx_network/crypto/csprng"
    	"gitlab.com/xx_network/primitives/id"
    	"gitlab.com/xx_network/primitives/id/ephemeral"
    	"gitlab.com/xx_network/primitives/ndf"
    	"reflect"
    	"testing"
    	"time"
    )
    
    // TestManager_ProcessMessageRetrieval is the happy path: a round lookup whose
    // topology contains the ReturningGateway is queued, and the test expects a
    // message bundle carrying the requested ephemeral ID and the mock payload to
    // arrive on the bundle channel.
    func TestManager_ProcessMessageRetrieval(t *testing.T) {
    	// General initializations
    	connect.TestingOnlyDisableTLS = true
    	testManager := newManager(t)
    	roundId := id.Round(5)
    	mockComms := &mockMessageRetrievalComms{testingSignature: t}
    	stop := stoppable.NewSingle("singleStoppable")
    	testNdf := getNDF()
    	nodeId := id.NewIdFromString(ReturningGateway, id.Node, t)
    	gwId := nodeId.DeepCopy()
    	gwId.SetType(id.Gateway)
    	testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
    	testManager.rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)

    	p := gateway.DefaultPoolParams()
    	p.MaxPoolSize = 1
    	var err error
    	testManager.sender, err = gateway.NewSender(p, testManager.rng,
    		testNdf, mockComms, testManager.session, nil)
    	if err != nil {
    		// Nothing below can work without a sender, so stop immediately
    		t.Fatalf("Failed to create gateway sender: %+v", err)
    	}

    	// Create a local channel so reception is possible (the channel installed
    	// by the newManager call above is replaced with one this test can read)
    	messageBundleChan := make(chan message.Bundle)
    	testManager.messageBundles = messageBundleChan

    	// Initialize the message retrieval
    	go testManager.processMessageRetrieval(mockComms, stop)

    	// Construct expected values for checking
    	expectedEphID := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8}
    	payloadMsg := []byte(PayloadMessage)
    	expectedPayload := make([]byte, 256)
    	copy(expectedPayload, payloadMsg)

    	go func() {
    		requestGateway := id.NewIdFromString(ReturningGateway, id.Gateway, t)

    		// Construct the round lookup
    		ephIdentity := ephemeral2.EphemeralIdentity{
    			EphId:  expectedEphID,
    			Source: requestGateway,
    		}

    		idList := [][]byte{requestGateway.Bytes()}

    		roundInfo := &pb.RoundInfo{
    			ID:       uint64(roundId),
    			Topology: idList,
    		}

    		// Send a round look up request
    		testManager.lookupRoundMessages <- roundLookup{
    			RoundInfo: roundInfo,
    			Identity:  ephIdentity,
    		}
    	}()

    	// Receive the bundle on the test goroutine itself. Waiting with a select
    	// (instead of sleeping while another goroutine writes the variable)
    	// avoids a data race on testBundle and returns as soon as data arrives.
    	var testBundle message.Bundle
    	select {
    	case testBundle = <-messageBundleChan:
    	case <-time.After(2 * time.Second):
    	}

    	// Close the process regardless of the outcome so the worker is not leaked
    	if err = stop.Close(); err != nil {
    		t.Errorf("Failed to signal close to process: %+v", err)
    	}

    	// Ensure bundle received and has expected values
    	if reflect.DeepEqual(testBundle, message.Bundle{}) {
    		t.Fatalf("Did not receive a message bundle over the channel")
    	}

    	if testBundle.Identity.EphId.Int64() != expectedEphID.Int64() {
    		t.Errorf("Unexpected address ID in bundle."+
    			"\n\tExpected: %v"+
    			"\n\tReceived: %v", expectedEphID, testBundle.Identity.EphId)
    	}

    	// Guard the index below so a bundle without messages fails cleanly
    	// instead of panicking
    	if len(testBundle.Messages) == 0 {
    		t.Fatalf("Received bundle contains no messages")
    	}

    	if !bytes.Equal(expectedPayload, testBundle.Messages[0].GetPayloadA()) {
    		t.Errorf("Unexpected payload in bundle."+
    			"\n\tExpected: %v"+
    			"\n\tReceived: %v", expectedPayload, testBundle.Messages[0].GetPayloadA())
    	}
    }
    
    // TestManager_ProcessMessageRetrieval_NoRound queues a round lookup whose
    // topology only contains a gateway ID that is not one of the keyword IDs the
    // mock comms recognizes (see utils_test.go), i.e. a gateway which does not
    // have the round. No message bundle may be delivered in that case.
    func TestManager_ProcessMessageRetrieval_NoRound(t *testing.T) {
    	// General initializations
    	testManager := newManager(t)
    	p := gateway.DefaultPoolParams()
    	p.MaxPoolSize = 1
    	roundId := id.Round(5)
    	mockComms := &mockMessageRetrievalComms{testingSignature: t}
    	testNdf := getNDF()
    	nodeId := id.NewIdFromString(FalsePositive, id.Node, t)
    	gwId := nodeId.DeepCopy()
    	gwId.SetType(id.Gateway)
    	testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
    	testManager.rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)

    	var err error
    	testManager.sender, err = gateway.NewSender(p,
    		testManager.rng,
    		testNdf, mockComms, testManager.session, nil)
    	if err != nil {
    		// A negative test must not pass merely because setup was broken
    		t.Fatalf("Failed to create gateway sender: %+v", err)
    	}
    	stop := stoppable.NewSingle("singleStoppable")

    	// Create a local channel so reception is possible (the channel installed
    	// by the newManager call above is replaced with one this test can read)
    	messageBundleChan := make(chan message.Bundle)
    	testManager.messageBundles = messageBundleChan

    	// Initialize the message retrieval
    	go testManager.processMessageRetrieval(mockComms, stop)

    	expectedEphID := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8}

    	// Construct a gateway without keyword ID in utils_test.go
    	// ie mockComms does not return a round
    	dummyGateway := id.NewIdFromString("Sauron", id.Gateway, t)

    	go func() {
    		// Construct the round lookup
    		identity := ephemeral2.EphemeralIdentity{
    			EphId:  expectedEphID,
    			Source: dummyGateway,
    		}

    		idList := [][]byte{dummyGateway.Marshal()}

    		roundInfo := &pb.RoundInfo{
    			ID:       uint64(roundId),
    			Topology: idList,
    		}

    		// Send a round look up request
    		testManager.lookupRoundMessages <- roundLookup{
    			RoundInfo: roundInfo,
    			Identity:  identity,
    		}
    	}()

    	// Wait on the test goroutine for a bundle that should never come. The
    	// select avoids sharing testBundle with a second goroutine (data race)
    	// and avoids leaving a receiver blocked forever after the test returns.
    	var testBundle message.Bundle
    	select {
    	case testBundle = <-messageBundleChan:
    	case <-time.After(2 * time.Second):
    	}

    	// Close the process regardless of the outcome so the worker is not leaked
    	if err = stop.Close(); err != nil {
    		t.Errorf("Failed to signal close to process: %+v", err)
    	}

    	if !reflect.DeepEqual(testBundle, message.Bundle{}) {
    		t.Errorf("Should not receive a message bundle, mock gateway should not return round."+
    			"\n\tExpected: %v"+
    			"\n\tReceived: %v", message.Bundle{}, testBundle)
    	}
    }
    
    // TestManager_ProcessMessageRetrieval_FalsePositive tests the path where
    // there are no messages, simulating a false positive in a bloom filter. The
    // lookup targets the FalsePositive gateway and no message bundle may be
    // delivered on the bundle channel.
    func TestManager_ProcessMessageRetrieval_FalsePositive(t *testing.T) {
    	// General initializations
    	testManager := newManager(t)
    	roundId := id.Round(5)
    	mockComms := &mockMessageRetrievalComms{testingSignature: t}
    	stop := stoppable.NewSingle("singleStoppable")
    	testNdf := getNDF()
    	nodeId := id.NewIdFromString(FalsePositive, id.Node, t)
    	gwId := nodeId.DeepCopy()
    	gwId.SetType(id.Gateway)
    	testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
    	testManager.rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)

    	p := gateway.DefaultPoolParams()
    	p.MaxPoolSize = 1
    	var err error
    	testManager.sender, err = gateway.NewSender(p,
    		testManager.rng,
    		testNdf, mockComms, testManager.session, nil)
    	if err != nil {
    		// A negative test must not pass merely because setup was broken
    		t.Fatalf("Failed to create gateway sender: %+v", err)
    	}

    	// Create a local channel so reception is possible (the channel installed
    	// by the newManager call above is replaced with one this test can read)
    	messageBundleChan := make(chan message.Bundle)
    	testManager.messageBundles = messageBundleChan

    	// Initialize the message retrieval
    	go testManager.processMessageRetrieval(mockComms, stop)

    	expectedEphID := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8}

    	go func() {
    		// Construct the round lookup
    		identity := ephemeral2.EphemeralIdentity{
    			EphId:  expectedEphID,
    			Source: id.NewIdFromString("Source", id.User, t),
    		}

    		requestGateway := id.NewIdFromString(FalsePositive, id.Gateway, t)

    		idList := [][]byte{requestGateway.Bytes()}

    		roundInfo := &pb.RoundInfo{
    			ID:       uint64(roundId),
    			Topology: idList,
    		}

    		// Send a round look up request
    		testManager.lookupRoundMessages <- roundLookup{
    			RoundInfo: roundInfo,
    			Identity:  identity,
    		}
    	}()

    	// Wait on the test goroutine for a bundle that should never come. The
    	// select avoids sharing testBundle with a second goroutine (data race)
    	// and avoids leaving a receiver blocked forever after the test returns.
    	var testBundle message.Bundle
    	select {
    	case testBundle = <-messageBundleChan:
    	case <-time.After(2 * time.Second):
    	}

    	// Close the process regardless of the outcome so the worker is not leaked
    	if err = stop.Close(); err != nil {
    		t.Errorf("Failed to signal close to process: %+v", err)
    	}

    	// Ensure no bundle was received due to false positive test
    	if !reflect.DeepEqual(testBundle, message.Bundle{}) {
    		t.Fatalf("Received a message bundle over the channel, should receive empty message list")
    	}
    }
    
    // TestManager_ProcessMessageRetrieval_Quit ensures that closing the stoppable
    // ends the retrieval process, on an otherwise happy path: the process is
    // stopped first, a round lookup is queued afterwards, and no message bundle
    // may be delivered.
    func TestManager_ProcessMessageRetrieval_Quit(t *testing.T) {
    	// General initializations
    	testManager := newManager(t)
    	roundId := id.Round(5)
    	mockComms := &mockMessageRetrievalComms{testingSignature: t}
    	stop := stoppable.NewSingle("singleStoppable")

    	// Create a local channel so reception is possible (the channel installed
    	// by the newManager call above is replaced with one this test can read)
    	messageBundleChan := make(chan message.Bundle)
    	testManager.messageBundles = messageBundleChan

    	// Initialize the message retrieval
    	go testManager.processMessageRetrieval(mockComms, stop)

    	// Close the process early, before any logic below can be completed
    	if err := stop.Close(); err != nil {
    		t.Errorf("Failed to signal close to process: %+v", err)
    	}

    	if err := stoppable.WaitForStopped(stop, 300*time.Millisecond); err != nil {
    		t.Fatalf("Failed to stop stoppable: %+v", err)
    	}

    	expectedEphID := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8}

    	go func() {
    		// Construct the round lookup
    		identity := ephemeral2.EphemeralIdentity{
    			EphId: expectedEphID,
    		}

    		requestGateway := id.NewIdFromString(ReturningGateway, id.Gateway, t)

    		idList := [][]byte{requestGateway.Bytes()}

    		roundInfo := &pb.RoundInfo{
    			ID:       uint64(roundId),
    			Topology: idList,
    		}

    		// Send a round look up request
    		testManager.lookupRoundMessages <- roundLookup{
    			RoundInfo: roundInfo,
    			Identity:  identity,
    		}
    	}()

    	// Wait on the test goroutine for a bundle that should never come. The
    	// select avoids sharing testBundle with a second goroutine (data race)
    	// and avoids leaving a receiver blocked forever after the test returns.
    	var testBundle message.Bundle
    	select {
    	case testBundle = <-messageBundleChan:
    	case <-time.After(1 * time.Second):
    	}

    	// Ensure no bundle was received due to quitting process early
    	if !reflect.DeepEqual(testBundle, message.Bundle{}) {
    		t.Fatalf("Received a message bundle over the channel, process should have quit before reception")
    	}
    }
    
    // TestManager_ProcessMessageRetrieval_MultipleGateways covers the path in
    // which multiple error comms are encountered before a happy path comms: the
    // round topology lists the ErrorGateway twice ahead of the ReturningGateway,
    // and the expected bundle must still be delivered.
    func TestManager_ProcessMessageRetrieval_MultipleGateways(t *testing.T) {
    	// General initializations
    	testManager := newManager(t)
    	roundId := id.Round(5)
    	mockComms := &mockMessageRetrievalComms{testingSignature: t}
    	stop := stoppable.NewSingle("singleStoppable")
    	testNdf := getNDF()
    	nodeId := id.NewIdFromString(ReturningGateway, id.Node, t)
    	gwId := nodeId.DeepCopy()
    	gwId.SetType(id.Gateway)
    	testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
    	testManager.rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)

    	p := gateway.DefaultPoolParams()
    	p.MaxPoolSize = 1
    	var err error
    	testManager.sender, err = gateway.NewSender(p,
    		testManager.rng,
    		testNdf, mockComms, testManager.session, nil)
    	if err != nil {
    		// Nothing below can work without a sender, so stop immediately
    		t.Fatalf("Failed to create gateway sender: %+v", err)
    	}

    	// Create a local channel so reception is possible (the channel installed
    	// by the newManager call above is replaced with one this test can read)
    	messageBundleChan := make(chan message.Bundle)
    	testManager.messageBundles = messageBundleChan

    	// Initialize the message retrieval
    	go testManager.processMessageRetrieval(mockComms, stop)

    	// Construct expected values for checking
    	expectedEphID := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8}
    	payloadMsg := []byte(PayloadMessage)
    	expectedPayload := make([]byte, 256)
    	copy(expectedPayload, payloadMsg)

    	go func() {
    		requestGateway := id.NewIdFromString(ReturningGateway, id.Gateway, t)
    		errorGateway := id.NewIdFromString(ErrorGateway, id.Gateway, t)
    		// Construct the round lookup
    		identity := ephemeral2.EphemeralIdentity{
    			EphId:  expectedEphID,
    			Source: requestGateway,
    		}

    		// Create a list of ID's in which some error gateways must be contacted before the happy path
    		idList := [][]byte{errorGateway.Bytes(), errorGateway.Bytes(), requestGateway.Bytes()}

    		roundInfo := &pb.RoundInfo{
    			ID:       uint64(roundId),
    			Topology: idList,
    		}

    		// Send a round look up request
    		testManager.lookupRoundMessages <- roundLookup{
    			RoundInfo: roundInfo,
    			Identity:  identity,
    		}
    	}()

    	// Receive the bundle on the test goroutine itself. Waiting with a select
    	// (instead of sleeping while another goroutine writes the variable)
    	// avoids a data race on testBundle and returns as soon as data arrives.
    	var testBundle message.Bundle
    	select {
    	case testBundle = <-messageBundleChan:
    	case <-time.After(2 * time.Second):
    	}

    	// Close the process regardless of the outcome so the worker is not leaked
    	if err = stop.Close(); err != nil {
    		t.Errorf("Failed to signal close to process: %+v", err)
    	}

    	// Ensure that expected bundle is still received from happy comm
    	// despite initial errors
    	if reflect.DeepEqual(testBundle, message.Bundle{}) {
    		t.Fatalf("Did not receive a message bundle over the channel")
    	}

    	if testBundle.Identity.EphId.Int64() != expectedEphID.Int64() {
    		t.Errorf("Unexpected address ID in bundle."+
    			"\n\tExpected: %v"+
    			"\n\tReceived: %v", expectedEphID, testBundle.Identity.EphId)
    	}

    	// Guard the index below so a bundle without messages fails cleanly
    	// instead of panicking
    	if len(testBundle.Messages) == 0 {
    		t.Fatalf("Received bundle contains no messages")
    	}

    	if !bytes.Equal(expectedPayload, testBundle.Messages[0].GetPayloadA()) {
    		t.Errorf("Unexpected payload in bundle."+
    			"\n\tExpected: %v"+
    			"\n\tReceived: %v", expectedPayload, testBundle.Messages[0].GetPayloadA())
    	}
    }