From 96ad3f2143870f1569c3ab633098ada06d102305 Mon Sep 17 00:00:00 2001 From: Benjamin Wenger <ben@elixxir.ioo> Date: Mon, 14 Dec 2020 09:33:49 -0800 Subject: [PATCH] finished basic implementation --- api/client.go | 9 ++++ api/userDiscovery.go | 14 +++++- go.mod | 4 +- go.sum | 2 + ud/confirmFact.go | 19 +++++--- ud/confirmFact_test.go | 10 ++--- ud/lookup.go | 13 +++--- ud/lookup_test.go | 23 +++++++++- ud/manager.go | 83 ++++++++++++++++++++++++++++++++--- ud/register.go | 9 ++++ ud/register_test.go | 3 ++ ud/registered.go | 56 ++++++++++++++++++++++++ ud/remove.go | 6 +++ ud/remove_test.go | 3 ++ ud/search.go | 46 +++++++++++++++++++- ud/search_test.go | 99 ++++++++++++++++++++++++++++++++++++++++++ 16 files changed, 370 insertions(+), 29 deletions(-) create mode 100644 ud/registered.go diff --git a/api/client.go b/api/client.go index ba9817414..c593151d8 100644 --- a/api/client.go +++ b/api/client.go @@ -29,6 +29,8 @@ import ( "time" ) +type ServiceProcess func()stoppable.Stoppable + type Client struct { //generic RNG for client rng *fastRNG.StreamGenerator @@ -52,6 +54,9 @@ type Client struct { //contains stopables for all running threads runner *stoppable.Multi status *statusTracker + + serviceProcessies []ServiceProcess + } // NewClient creates client storage, generates keys, connects, and registers @@ -287,6 +292,10 @@ func (c *Client) StartNetworkFollower() error { return errors.WithMessage(err, "Failed to Start the Network Follower") } + for _, p := range c.serviceProcessies{ + c.runner.Add(p()) + } + return nil } diff --git a/api/userDiscovery.go b/api/userDiscovery.go index 680ed4fac..b51fef51d 100644 --- a/api/userDiscovery.go +++ b/api/userDiscovery.go @@ -1,10 +1,11 @@ package api import ( - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/contact" + "gitlab.com/elixxir/client/ud" ) +/* // Returns true if the cryptographic identity has been registered with // the CMIX user discovery agent. // Note that clients do not need to perform this step if they use @@ -73,4 +74,15 @@ func (c *Client) SearchWithCallback(data, separator string, searchTypes []byte, //TODO: Timer } }(resultCh, cb) +}*/ + +func (c *Client) StartUD() (*ud.Manager, error) { + m, err := ud.NewManager(c.comms, c.rng, c.switchboard, c.storage, c.network) + if err!=nil{ + return nil, err + } + + c.serviceProcessies = append(c.serviceProcessies, m.StartProcesses()) + c.runner.Add(m.StartProcesses()) + return m, nil } diff --git a/go.mod b/go.mod index 7d98b995c..cdf783e11 100644 --- a/go.mod +++ b/go.mod @@ -19,12 +19,12 @@ require ( github.com/spf13/viper v1.7.1 gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 gitlab.com/elixxir/comms v0.0.4-0.20201125010058-1bd873622e92 - gitlab.com/elixxir/crypto v0.0.5-0.20201125005724-bcc603df02d3 + gitlab.com/elixxir/crypto v0.0.6 gitlab.com/elixxir/ekv v0.1.3 gitlab.com/elixxir/primitives v0.0.3-0.20201116174806-97f190989704 gitlab.com/xx_network/comms v0.0.4-0.20201119231004-a67d08045535 gitlab.com/xx_network/crypto v0.0.5-0.20201124194022-366c10b1bce0 - gitlab.com/xx_network/primitives v0.0.3-0.20201116234927-44e42fc91e7c + gitlab.com/xx_network/primitives v0.0.3-0.20201209182507-be7e190879a6 golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 google.golang.org/protobuf v1.25.0 gopkg.in/ini.v1 v1.61.0 // indirect diff --git a/go.sum b/go.sum index 66f26427e..68badd9a8 100644 --- a/go.sum +++ b/go.sum @@ -302,6 +302,8 @@ gitlab.com/xx_network/primitives v0.0.2 h1:r45yKenJ9e7PylI1ZXJ1Es09oYNaYXjxVy9+u gitlab.com/xx_network/primitives v0.0.2/go.mod h1:cs0QlFpdMDI6lAo61lDRH2JZz+3aVkHy+QogOB6F/qc= gitlab.com/xx_network/primitives v0.0.3-0.20201116234927-44e42fc91e7c h1:mYId667WIN97E6KhPw4HDYyCjWzsG7gCM/HLTNTCXZQ= gitlab.com/xx_network/primitives v0.0.3-0.20201116234927-44e42fc91e7c/go.mod h1:cs0QlFpdMDI6lAo61lDRH2JZz+3aVkHy+QogOB6F/qc= +gitlab.com/xx_network/primitives v0.0.3-0.20201209182507-be7e190879a6 h1:qDDXJFvVGDil3InrgRNe7WEOnk34ZioGYwpkAMhfxuo= +gitlab.com/xx_network/primitives v0.0.3-0.20201209182507-be7e190879a6/go.mod h1:cs0QlFpdMDI6lAo61lDRH2JZz+3aVkHy+QogOB6F/qc= gitlab.com/xx_network/ring v0.0.2 h1:TlPjlbFdhtJrwvRgIg4ScdngMTaynx/ByHBRZiXCoL0= gitlab.com/xx_network/ring v0.0.2/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/ud/confirmFact.go b/ud/confirmFact.go index ab00ea74c..de56805f6 100644 --- a/ud/confirmFact.go +++ b/ud/confirmFact.go @@ -1,6 +1,7 @@ package ud import ( + "github.com/pkg/errors" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/comms/messages" @@ -10,15 +11,23 @@ type confirmFactComm interface { SendConfirmFact(host *connect.Host, message *pb.FactConfirmRequest) (*messages.Ack, error) } -func (m *Manager) SendConfirmFact(confirmationID, code string) (*messages.Ack, error) { - return m.confirmFact(confirmationID, code, m.comms) +func (m *Manager) SendConfirmFact(confirmationID, code string) error { + if err := m.confirmFact(confirmationID, code, m.comms); err!=nil{ + return errors.WithMessage(err, "Failed to confirm fact") + } + return nil } -func (m *Manager) confirmFact(confirmationID, code string, comm confirmFactComm) (*messages.Ack, error) { +func (m *Manager) confirmFact(confirmationID, code string, comm confirmFactComm) error { + if !m.IsRegistered(){ + return errors.New("Failed to confirm fact: " + + "client is not registered") + } + msg := &pb.FactConfirmRequest{ ConfirmationID: confirmationID, Code: code, } - - return comm.SendConfirmFact(m.host, msg) + _, err := comm.SendConfirmFact(m.host, msg) + return err } diff --git a/ud/confirmFact_test.go b/ud/confirmFact_test.go index b048ad7e8..5645898e8 100644 --- a/ud/confirmFact_test.go +++ b/ud/confirmFact_test.go @@ -26,9 +26,12 @@ func TestManager_confirmFact(t *testing.T) { t.Fatalf("Could not create a new host: %+v", err) } + isReg := uint32(1) + // Set up manager m := &Manager{ host: host, + registered:&isReg, } c := &testComm{} @@ -38,16 +41,11 @@ func TestManager_confirmFact(t *testing.T) { Code: "1234", } - msg, err := m.confirmFact(expectedRequest.ConfirmationID, expectedRequest.Code, c) + err = m.confirmFact(expectedRequest.ConfirmationID, expectedRequest.Code, c) if err != nil { t.Errorf("confirmFact() returned an error: %+v", err) } - if !reflect.DeepEqual(*msg, messages.Ack{}) { - t.Errorf("confirmFact() did not return the expected Ack message."+ - "\n\texpected: %+v\n\treceived: %+v", messages.Ack{}, *msg) - } - if !reflect.DeepEqual(expectedRequest, c.request) { t.Errorf("end point did not recieve the expected request."+ "\n\texpected: %+v\n\treceived: %+v", expectedRequest, c.request) diff --git a/ud/lookup.go b/ud/lookup.go index 9730bedd0..7733a1523 100644 --- a/ud/lookup.go +++ b/ud/lookup.go @@ -21,11 +21,7 @@ func (m *Manager) lookupProcess(c chan message.Receive, quitCh <-chan struct{}) case <-quitCh: return case response := <-c: - // Edge check the encryption - if response.Encryption != message.E2E { - jww.WARN.Printf("Dropped a lookup response from user " + - "discovery due to incorrect encryption") - } + // Unmarshal the message lookupResponse := &LookupResponse{} @@ -61,6 +57,11 @@ func (m *Manager) lookupProcess(c chan message.Receive, quitCh <-chan struct{}) // system or returns by the timeout. func (m *Manager) Lookup(uid *id.ID, callback lookupCallback, timeout time.Duration) error { + if !m.IsRegistered(){ + return errors.New("Failed to lookup: " + + "client is not registered") + } + // Get the ID of this comm so it can be connected to its response commID := m.getCommID() @@ -88,7 +89,7 @@ func (m *Manager) Lookup(uid *id.ID, callback lookupCallback, timeout time.Durat m.inProgressLookupMux.Unlock() // Send the request - rounds, _, err := m.net.SendE2E(msg, params.GetDefaultE2E()) + rounds, err := m.net.SendUnsafe(msg, params.GetDefaultUnsafe()) if err != nil { return errors.WithMessage(err, "Failed to send the lookup request") } diff --git a/ud/lookup_test.go b/ud/lookup_test.go index 791e50046..d732c7004 100644 --- a/ud/lookup_test.go +++ b/ud/lookup_test.go @@ -27,6 +27,8 @@ import ( // Happy path. func TestManager_Lookup(t *testing.T) { + isReg := uint32(1) + // Set up manager m := &Manager{ rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), @@ -35,6 +37,7 @@ func TestManager_Lookup(t *testing.T) { udID: &id.UDB, inProgressLookup: map[uint64]chan *LookupResponse{}, net: newTestNetworkManager(t), + registered: &isReg, } // Generate callback function @@ -113,6 +116,7 @@ func TestManager_Lookup(t *testing.T) { // Error path: the callback returns an error. func TestManager_Lookup_CallbackError(t *testing.T) { + isReg := uint32(1) // Set up manager m := &Manager{ rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), @@ -121,6 +125,7 @@ func TestManager_Lookup_CallbackError(t *testing.T) { udID: &id.UDB, inProgressLookup: map[uint64]chan *LookupResponse{}, net: newTestNetworkManager(t), + registered: &isReg, } // Generate callback function @@ -174,6 +179,7 @@ func TestManager_Lookup_CallbackError(t *testing.T) { // Error path: the round event chan times out. func TestManager_Lookup_EventChanTimeout(t *testing.T) { + isReg := uint32(1) // Set up manager m := &Manager{ rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), @@ -182,6 +188,7 @@ func TestManager_Lookup_EventChanTimeout(t *testing.T) { udID: &id.UDB, inProgressLookup: map[uint64]chan *LookupResponse{}, net: newTestNetworkManager(t), + registered: &isReg, } // Generate callback function @@ -226,6 +233,7 @@ func TestManager_Lookup_EventChanTimeout(t *testing.T) { // Happy path. func TestManager_lookupProcess(t *testing.T) { + isReg := uint32(1) m := &Manager{ rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), grp: cyclic.NewGroup(large.NewInt(107), large.NewInt(2)), @@ -233,6 +241,7 @@ func TestManager_lookupProcess(t *testing.T) { udID: &id.UDB, inProgressLookup: map[uint64]chan *LookupResponse{}, net: newTestNetworkManager(t), + registered: &isReg, } c := make(chan message.Receive) @@ -280,6 +289,7 @@ func TestManager_lookupProcess(t *testing.T) { // Error path: dropped lookup response due to incorrect message.Receive. func TestManager_lookupProcess_NoLookupResponse(t *testing.T) { + isReg := uint32(1) m := &Manager{ rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), grp: cyclic.NewGroup(large.NewInt(107), large.NewInt(2)), @@ -287,6 +297,7 @@ func TestManager_lookupProcess_NoLookupResponse(t *testing.T) { udID: &id.UDB, inProgressLookup: map[uint64]chan *LookupResponse{}, net: newTestNetworkManager(t), + registered: &isReg, } c := make(chan message.Receive) @@ -329,8 +340,16 @@ func (t *testNetworkManager) SendE2E(m message.Send, _ params.E2E) ([]id.Round, return rounds, e2e.MessageID{}, nil } -func (t *testNetworkManager) SendUnsafe(message.Send, params.Unsafe) ([]id.Round, error) { - return nil, nil +func (t *testNetworkManager) SendUnsafe(m message.Send, _ params.Unsafe) ([]id.Round, error) { + rounds := []id.Round{ + id.Round(0), + id.Round(1), + id.Round(2), + } + + t.msg = m + + return rounds, nil } func (t *testNetworkManager) SendCMIX(format.Message, params.CMIX) (id.Round, error) { diff --git a/ud/manager.go b/ud/manager.go index fac9e1eb8..e0c379170 100644 --- a/ud/manager.go +++ b/ud/manager.go @@ -1,6 +1,7 @@ package ud import ( + "github.com/pkg/errors" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/stoppable" @@ -15,28 +16,88 @@ import ( ) type Manager struct { + //external comms *client.Comms - host *connect.Host - privKey *rsa.PrivateKey rng *fastRNG.StreamGenerator - grp *cyclic.Group sw interfaces.Switchboard storage *storage.Session + net interfaces.NetworkManager + //loaded from external access udID *id.ID + privKey *rsa.PrivateKey + grp *cyclic.Group + //internal maps + host *connect.Host inProgressLookup map[uint64]chan *LookupResponse inProgressLookupMux sync.RWMutex inProgressSearch map[uint64]chan *SearchResponse - inProgressSearchMux sync.Mutex - - net interfaces.NetworkManager + inProgressSearchMux sync.RWMutex + //State tracking commID uint64 commIDLock sync.Mutex + + registered *uint32 +} + +// New manager builds a new user discovery manager. It requires that an +// updated NDF is available and will error if one is not. +func NewManager(comms *client.Comms, rng *fastRNG.StreamGenerator, + sw interfaces.Switchboard, storage *storage.Session, + net interfaces.NetworkManager)(*Manager, error){ + + m := &Manager{ + comms: comms, + rng: rng, + sw: sw, + storage: storage, + net: net, + inProgressLookup: make(map[uint64]chan *LookupResponse), + inProgressSearch: make(map[uint64]chan *SearchResponse), + } + + var err error + + //check that user discovery is available in the ndf + def := net.GetInstance().GetPartialNdf().Get() + if m.udID, err = id.Unmarshal(def.UDB.ID); err!=nil{ + return nil, errors.WithMessage(err,"NDF does not have User " + + "Discovery information, is there network access?: ID could not be " + + "unmarshaled") + } + + if def.UDB.Cert==""{ + return nil, errors.New("NDF does not have User " + + "Discovery information, is there network access?: Cert " + + "not present") + } + + //create the user discovery host object + if m.host, err = m.comms.AddHost(m.udID, def.UDB.Address,[]byte(def.UDB.Cert), + connect.GetDefaultHostParams()); err!=nil{ + return nil, errors.WithMessage(err, "User Discovery host " + + "object could not be constructed") + } + + //get the commonly used data from storage + m.privKey = m.storage.GetUser().RSA + + //load the last used commID + m.loadCommID() + + //load if the client is registered + m.loadRegistered() + + //store the pointer to the group locally for easy access + m.grp = m.storage.E2e().GetGroup() + + return m, nil } + func (m *Manager) StartProcesses() stoppable.Stoppable { lookupStop := stoppable.NewSingle("UDLookup") @@ -44,7 +105,17 @@ func (m *Manager) StartProcesses() stoppable.Stoppable { m.sw.RegisterChannel("UDLookupResponse", m.udID, message.UdLookupResponse, lookupChan) go m.lookupProcess(lookupChan, lookupStop.Quit()) + searchStop := stoppable.NewSingle("UDSearch") + searchChan := make(chan message.Receive, 100) + m.sw.RegisterChannel("UDSearchResponse", m.udID, message.UdSearchResponse, searchChan) + go m.searchProcess(searchChan, searchStop.Quit()) + udMulti := stoppable.NewMulti("UD") udMulti.Add(lookupStop) + udMulti.Add(searchStop) return lookupStop } + + + + diff --git a/ud/register.go b/ud/register.go index 505c08d88..bac837203 100644 --- a/ud/register.go +++ b/ud/register.go @@ -23,6 +23,11 @@ func (m *Manager) Register(username string) error { // register registers a user with user discovery with a specified comm for // easier testing. func (m *Manager) register(username string, comm registerUserComms) error { + if m.IsRegistered(){ + return errors.New("cannot register client with User Discovery: " + + "client is already registered") + } + var err error user := m.storage.User() cryptoUser := m.storage.User().GetCryptographicIdentity() @@ -71,5 +76,9 @@ func (m *Manager) register(username string, comm registerUserComms) error { // Register user with user discovery _, err = comm.SendRegisterUser(m.host, msg) + if err==nil{ + err = m.setRegistered() + } + return err } diff --git a/ud/register_test.go b/ud/register_test.go index 274eebe9c..8591ff9de 100644 --- a/ud/register_test.go +++ b/ud/register_test.go @@ -34,11 +34,14 @@ func TestManager_register(t *testing.T) { t.Fatalf("Could not create a new host: %+v", err) } + isReg := uint32(0) + // Set up manager m := &Manager{ host: host, rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), storage: storage.InitTestingSession(t), + registered: &isReg, } c := &testRegisterComm{} diff --git a/ud/registered.go b/ud/registered.go new file mode 100644 index 000000000..688d789eb --- /dev/null +++ b/ud/registered.go @@ -0,0 +1,56 @@ +package ud + +import ( + "encoding/binary" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/storage/versioned" + "sync/atomic" + "time" +) + +const isRegisteredKey = "isRegisteredKey" +const isRegisteredVersion = 0 + +// loadRegistered loads from storage if the client is registered with user +// discovery. +func (m *Manager) loadRegistered() { + + obj, err := m.storage.Get(isRegisteredKey) + if err != nil { + jww.INFO.Printf("Failed to load is registered; " + + "assuming un-registered: %s", err) + return + } + + isReg := binary.BigEndian.Uint32(obj.Data) + m.registered = &isReg +} + +// IsRegistered returns if the client is registered with user discovery +func (m *Manager) IsRegistered()bool { + return atomic.LoadUint32(m.registered)==1 +} + +// IsRegistered returns if the client is registered with user discovery +func (m *Manager) setRegistered()error { + if !atomic.CompareAndSwapUint32(m.registered,0,1){ + return errors.New("cannot register with User Discovery when " + + "already registered") + } + + data := make([]byte, 4) + binary.BigEndian.PutUint32(data, 1) + + obj := &versioned.Object{ + Version: isRegisteredVersion, + Timestamp: time.Now(), + Data: data, + } + + if err := m.storage.Set(isRegisteredKey, obj); err != nil { + jww.FATAL.Panicf("Failed to store that the client is " + + "registered: %+v", err) + } + return nil +} diff --git a/ud/remove.go b/ud/remove.go index 46a2f0ac5..147cf4964 100644 --- a/ud/remove.go +++ b/ud/remove.go @@ -1,6 +1,7 @@ package ud import ( + "github.com/pkg/errors" "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/primitives/fact" "gitlab.com/xx_network/comms/connect" @@ -16,6 +17,11 @@ func (m *Manager) RemoveFact(fact fact.Fact) error { } func (m *Manager) removeFact(fact fact.Fact, rFC removeFactComms) error { + if !m.IsRegistered(){ + return errors.New("Failed to remove fact: " + + "client is not registered") + } + // Construct the message to send // Convert our Fact to a mixmessages Fact for sending mmFact := mixmessages.Fact{ diff --git a/ud/remove_test.go b/ud/remove_test.go index 573893efc..cedc4a53c 100644 --- a/ud/remove_test.go +++ b/ud/remove_test.go @@ -29,10 +29,13 @@ func TestRemoveFact(t *testing.T) { t.Fatal(err) } + isReg := uint32(1) + m := Manager{ comms: nil, host: h, privKey: cpk, + registered: &isReg, } f := fact.Fact{ diff --git a/ud/search.go b/ud/search.go index 6dc632aa0..029b38f57 100644 --- a/ud/search.go +++ b/ud/search.go @@ -3,6 +3,7 @@ package ud import ( "github.com/golang/protobuf/proto" "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/contact" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" @@ -16,8 +17,51 @@ import ( type searchCallback func([]contact.Contact, error) + +func (m *Manager) searchProcess(c chan message.Receive, quitCh <-chan struct{}) { + for true { + select { + case <-quitCh: + return + case response := <-c: + // Unmarshal the message + searchResponse := &SearchResponse{} + if err := proto.Unmarshal(response.Payload, searchResponse); err != nil { + jww.WARN.Printf("Dropped a search response from user "+ + "discovery due to failed unmarshal: %s", err) + } + + // Get the appropriate channel from the lookup + m.inProgressSearchMux.RLock() + ch, ok := m.inProgressSearch[searchResponse.CommID] + m.inProgressSearchMux.RUnlock() + if !ok { + jww.WARN.Printf("Dropped a search response from user "+ + "discovery due to unknown comm ID: %d", + searchResponse.CommID) + } + + // Send the response on the correct channel + // Drop if the send cannot be completed + select { + case ch <- searchResponse: + default: + jww.WARN.Printf("Dropped a search response from user "+ + "discovery due to failure to transmit to handling thread: "+ + "commID: %d", searchResponse.CommID) + } + } + } +} + + // Search... func (m *Manager) Search(list fact.FactList, callback searchCallback, timeout time.Duration) error { + if !m.IsRegistered(){ + return errors.New("Failed to search: " + + "client is not registered") + } + // Get the ID of this comm so it can be connected to its response commID := m.getCommID() @@ -47,7 +91,7 @@ func (m *Manager) Search(list fact.FactList, callback searchCallback, timeout ti m.inProgressSearchMux.Unlock() // Send the request - rounds, _, err := m.net.SendE2E(msg, params.GetDefaultE2E()) + rounds, err := m.net.SendUnsafe(msg, params.GetDefaultUnsafe()) if err != nil { return errors.WithMessage(err, "Failed to send the search request") } diff --git a/ud/search_test.go b/ud/search_test.go index ee6b77658..4b4d7eba0 100644 --- a/ud/search_test.go +++ b/ud/search_test.go @@ -19,6 +19,7 @@ import ( // Happy path. func TestManager_Search(t *testing.T) { + isReg := uint32(1) // Set up manager m := &Manager{ rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), @@ -27,6 +28,7 @@ func TestManager_Search(t *testing.T) { udID: &id.UDB, inProgressSearch: map[uint64]chan *SearchResponse{}, net: newTestNetworkManager(t), + registered: &isReg, } // Generate callback function @@ -122,6 +124,7 @@ func TestManager_Search(t *testing.T) { // Error path: the callback returns an error. func TestManager_Search_CallbackError(t *testing.T) { + isReg := uint32(1) // Set up manager m := &Manager{ rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), @@ -130,6 +133,7 @@ func TestManager_Search_CallbackError(t *testing.T) { udID: &id.UDB, inProgressSearch: map[uint64]chan *SearchResponse{}, net: newTestNetworkManager(t), + registered: &isReg, } // Generate callback function @@ -188,6 +192,7 @@ func TestManager_Search_CallbackError(t *testing.T) { // Error path: the round event chan times out. func TestManager_Search_EventChanTimeout(t *testing.T) { + isReg := uint32(1) // Set up manager m := &Manager{ rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), @@ -196,6 +201,7 @@ func TestManager_Search_EventChanTimeout(t *testing.T) { udID: &id.UDB, inProgressSearch: map[uint64]chan *SearchResponse{}, net: newTestNetworkManager(t), + registered: &isReg, } // Generate callback function @@ -242,3 +248,96 @@ func TestManager_Search_EventChanTimeout(t *testing.T) { t.Error("Failed to delete SearchResponse from inProgressSearch.") } } + +// Happy path. +func TestManager_searchProcess(t *testing.T) { + isReg := uint32(1) + m := &Manager{ + rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), + grp: cyclic.NewGroup(large.NewInt(107), large.NewInt(2)), + storage: storage.InitTestingSession(t), + udID: &id.UDB, + inProgressSearch: map[uint64]chan *SearchResponse{}, + net: newTestNetworkManager(t), + registered: &isReg, + } + + c := make(chan message.Receive) + quitCh := make(chan struct{}) + + // Generate expected Send message + payload, err := proto.Marshal(&SearchSend{ + Fact: []*HashFact{&HashFact{ + Hash: []byte{1}, + Type: 0, + }}, + CommID: m.commID, + }) + if err != nil { + t.Fatalf("Failed to marshal LookupSend: %+v", err) + } + + m.inProgressSearch[m.commID] = make(chan *SearchResponse, 1) + + // Trigger response chan + go func() { + time.Sleep(1 * time.Millisecond) + c <- message.Receive{ + Payload: payload, + Encryption: message.E2E, + } + time.Sleep(1 * time.Millisecond) + quitCh <- struct{}{} + }() + + m.searchProcess(c, quitCh) + + select { + case response := <-m.inProgressSearch[m.commID]: + expectedResponse := &SearchResponse{} + if err := proto.Unmarshal(payload, expectedResponse); err != nil { + t.Fatalf("Failed to unmarshal payload: %+v", err) + } + + if !reflect.DeepEqual(expectedResponse, response) { + t.Errorf("Recieved unexpected response."+ + "\n\texpected: %+v\n\trecieved: %+v", expectedResponse, response) + } + case <-time.After(100 * time.Millisecond): + t.Error("Response not sent.") + } +} + +// Error path: dropped lookup response due to incorrect message.Receive. +func TestManager_searchpProcess_NoSearchResponse(t *testing.T) { + isReg := uint32(1) + m := &Manager{ + rng: fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG), + grp: cyclic.NewGroup(large.NewInt(107), large.NewInt(2)), + storage: storage.InitTestingSession(t), + udID: &id.UDB, + inProgressSearch: map[uint64]chan *SearchResponse{}, + net: newTestNetworkManager(t), + registered: &isReg, + } + + c := make(chan message.Receive) + quitCh := make(chan struct{}) + + // Trigger response chan + go func() { + time.Sleep(1 * time.Millisecond) + c <- message.Receive{} + time.Sleep(1 * time.Millisecond) + quitCh <- struct{}{} + }() + + m.lookupProcess(c, quitCh) + + select { + case response := <-m.inProgressSearch[m.commID]: + t.Errorf("Received unexpected response: %+v", response) + case <-time.After(10 * time.Millisecond): + return + } +} \ No newline at end of file -- GitLab