Skip to content
Snippets Groups Projects
Commit 78bb5649 authored by Josh Brooks's avatar Josh Brooks
Browse files

Respond to MR review

parent 377af159
No related branches found
No related tags found
No related merge requests found
...@@ -28,10 +28,10 @@ require ( ...@@ -28,10 +28,10 @@ require (
gitlab.com/xx_network/crypto v0.0.5-0.20210107183440-804e0f8b7d22 gitlab.com/xx_network/crypto v0.0.5-0.20210107183440-804e0f8b7d22
gitlab.com/xx_network/primitives v0.0.4-0.20210202001929-8bf8bf35a234 gitlab.com/xx_network/primitives v0.0.4-0.20210202001929-8bf8bf35a234
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect golang.org/x/net v0.0.0-20201224014010-6772e930b67b
golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 // indirect golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 // indirect
google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect google.golang.org/genproto v0.0.0-20210105202744-fe13368bc0e1 // indirect
google.golang.org/grpc v1.34.0 // indirect google.golang.org/grpc v1.34.0
google.golang.org/protobuf v1.25.0 google.golang.org/protobuf v1.25.0
gopkg.in/ini.v1 v1.62.0 // indirect gopkg.in/ini.v1 v1.62.0 // indirect
) )
......
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package ephemeral
import "gitlab.com/elixxir/client/storage/reception"
type IdentityStoreInterface interface {
IsAlreadyIdentity(identity reception.Identity) bool
AddIdentity(identity reception.Identity) error
InsertIdentity(identity reception.Identity) error
}
type IdentityStore struct {
*reception.Store
tracker map[reception.Identity]bool
}
func NewTracker(store *reception.Store) *IdentityStore {
return &IdentityStore{
tracker: make(map[reception.Identity]bool),
Store: store,
}
}
func (is *IdentityStore) IsAlreadyIdentity(identity reception.Identity) bool {
return is.tracker[identity]
}
func (is *IdentityStore) InsertIdentity(identity reception.Identity) error {
is.tracker[identity] = true
return is.AddIdentity(identity)
}
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package ephemeral
import (
"gitlab.com/elixxir/client/globals"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/comms/testkeys"
"gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf"
"gitlab.com/xx_network/primitives/utils"
"testing"
)
// testNetworkManager is a test implementation of NetworkManager interface.
type testNetworkManager struct {
instance *network.Instance
msg message.Send
}
func (t *testNetworkManager) SendE2E(m message.Send, _ params.E2E) ([]id.Round,
e2e.MessageID, error) {
rounds := []id.Round{
id.Round(0),
id.Round(1),
id.Round(2),
}
t.msg = m
return rounds, e2e.MessageID{}, nil
}
func (t *testNetworkManager) SendUnsafe(m message.Send, _ params.Unsafe) ([]id.Round, error) {
rounds := []id.Round{
id.Round(0),
id.Round(1),
id.Round(2),
}
t.msg = m
return rounds, nil
}
func (t *testNetworkManager) SendCMIX(format.Message, *id.ID, params.CMIX) (id.Round, ephemeral.Id, error) {
return 0, ephemeral.Id{}, nil
}
func (t *testNetworkManager) GetInstance() *network.Instance {
return t.instance
}
func (t *testNetworkManager) GetHealthTracker() interfaces.HealthTracker {
return nil
}
func (t *testNetworkManager) Follow() (stoppable.Stoppable, error) {
return nil, nil
}
func (t *testNetworkManager) CheckGarbledMessages() {}
func NewTestNetworkManager(i interface{}) interfaces.NetworkManager {
switch i.(type) {
case *testing.T, *testing.M, *testing.B:
break
default:
globals.Log.FATAL.Panicf("initTesting is restricted to testing only."+
"Got %T", i)
}
commsManager := connect.NewManagerTesting(i)
cert, err := utils.ReadFile(testkeys.GetNodeCertPath())
if err != nil {
globals.Log.FATAL.Panicf("Failed to create new test Instance: %v", err)
}
commsManager.AddHost(&id.Permissioning, "", cert, connect.GetDefaultHostParams())
instanceComms := &connect.ProtoComms{
Manager: commsManager,
}
thisInstance, err := network.NewInstanceTesting(instanceComms, getNDF(), getNDF(), nil, nil, i)
if err != nil {
globals.Log.FATAL.Panicf("Failed to create new test Instance: %v", err)
}
thisManager := &testNetworkManager{instance: thisInstance}
return thisManager
}
func getNDF() *ndf.NetworkDefinition {
return &ndf.NetworkDefinition{
E2E: ndf.Group{
Prime: "E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B" +
"7A8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3DD2AE" +
"DF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E7861575E745D31F" +
"8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC6ADC718DD2A3E041" +
"023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C4A530E8FFB1BC51DADDF45" +
"3B0B2717C2BC6669ED76B4BDD5C9FF558E88F26E5785302BEDBCA23EAC5ACE9209" +
"6EE8A60642FB61E8F3D24990B8CB12EE448EEF78E184C7242DD161C7738F32BF29" +
"A841698978825B4111B4BC3E1E198455095958333D776D8B2BEEED3A1A1A221A6E" +
"37E664A64B83981C46FFDDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F2" +
"78DE8014A47323631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696" +
"015CB79C3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E" +
"6319BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC35873" +
"847AEF49F66E43873",
Generator: "2",
},
CMIX: ndf.Group{
Prime: "9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642F0B5C48" +
"C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757264E5A1A44F" +
"FE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F9716BFE6117C6B5" +
"B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091EB51743BF33050C38DE2" +
"35567E1B34C3D6A5C0CEAA1A0F368213C3D19843D0B4B09DCB9FC72D39C8DE41" +
"F1BF14D4BB4563CA28371621CAD3324B6A2D392145BEBFAC748805236F5CA2FE" +
"92B871CD8F9C36D3292B5509CA8CAA77A2ADFC7BFD77DDA6F71125A7456FEA15" +
"3E433256A2261C6A06ED3693797E7995FAD5AABBCFBE3EDA2741E375404AE25B",
Generator: "5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E24809670716C613" +
"D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D1AA58C4328A06C4" +
"6A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A338661D10461C0D135472" +
"085057F3494309FFA73C611F78B32ADBB5740C361C9F35BE90997DB2014E2EF5" +
"AA61782F52ABEB8BD6432C4DD097BC5423B285DAFB60DC364E8161F4A2A35ACA" +
"3A10B1C4D203CC76A470A33AFDCBDD92959859ABD8B56E1725252D78EAC66E71" +
"BA9AE3F1DD2487199874393CD4D832186800654760E1E34C09E4D155179F9EC0" +
"DC4473F996BDCE6EED1CABED8B6F116F7AD9CF505DF0F998E34AB27514B0FFE7",
},
}
}
...@@ -13,28 +13,30 @@ import ( ...@@ -13,28 +13,30 @@ import (
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/reception" "gitlab.com/elixxir/client/storage/reception"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/comms/network"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"time" "time"
) )
const ephemeralIdSie = 64
const validityGracePeriod = 5 * time.Minute const validityGracePeriod = 5 * time.Minute
const TimestampKey = "IDTrackingTimestamp" const TimestampKey = "IDTrackingTimestamp"
const ephemeralStoppable = "EphemeralCheck" const ephemeralStoppable = "EphemeralCheck"
// Track runs a thread which checks for past and present ephemeral ids // Track runs a thread which checks for past and present ephemeral ids
func Track(session *storage.Session, ourId *id.ID, identityStore *IdentityStore) stoppable.Stoppable { func Track(session *storage.Session, instance *network.Instance, ourId *id.ID) stoppable.Stoppable {
stop := stoppable.NewSingle(ephemeralStoppable) stop := stoppable.NewSingle(ephemeralStoppable)
go track(session, ourId, stop, identityStore) go track(session, instance, ourId, stop)
return stop return stop
} }
// track is a thread which continuously processes ephemeral ids. // track is a thread which continuously processes ephemeral ids.
// If any error occurs, the thread crashes // If any error occurs, the thread crashes
func track(session *storage.Session, ourId *id.ID, stop *stoppable.Single, identityStore *IdentityStore) { func track(session *storage.Session, instance *network.Instance, ourId *id.ID, stop *stoppable.Single) {
identityStore := session.Reception()
// Get the latest timestamp from store // Get the latest timestamp from store
lastTimestampObj, err := session.Get(TimestampKey) lastTimestampObj, err := session.Get(TimestampKey)
if err != nil { if err != nil {
...@@ -47,9 +49,18 @@ func track(session *storage.Session, ourId *id.ID, stop *stoppable.Single, ident ...@@ -47,9 +49,18 @@ func track(session *storage.Session, ourId *id.ID, stop *stoppable.Single, ident
} }
for true { for true {
// Generates the IDs since the last track
now := time.Now() now := time.Now()
protoIds, err := getUpcomingIDs(ourId, now, lastCheck)
// Pull out the round information
ri, err := instance.GetRound(instance.GetLastRoundID())
if err != nil {
globals.Log.FATAL.Panicf("Could not pull round information: %v", err)
}
// Generates the IDs since the last track
protoIds, err := ephemeral.GetIdsByRange(ourId, uint(ri.AddressSpaceSize),
now.UnixNano(), now.Sub(lastCheck))
if err != nil { if err != nil {
globals.Log.FATAL.Panicf("Could not generate "+ globals.Log.FATAL.Panicf("Could not generate "+
"upcoming IDs: %v", err) "upcoming IDs: %v", err)
...@@ -61,9 +72,9 @@ func track(session *storage.Session, ourId *id.ID, stop *stoppable.Single, ident ...@@ -61,9 +72,9 @@ func track(session *storage.Session, ourId *id.ID, stop *stoppable.Single, ident
// Add identities to storage if unique // Add identities to storage if unique
for _, identity := range identities { for _, identity := range identities {
// Track if identity has been generated already // Track if identity has been generated already
if !identityStore.IsAlreadyIdentity(identity) { if identity.StartValid.After(lastCheck) {
// If not not, insert identity into store // If not not, insert identity into store
if err = identityStore.InsertIdentity(identity); err != nil { if err = identityStore.AddIdentity(identity); err != nil {
globals.Log.FATAL.Panicf("Could not insert "+ globals.Log.FATAL.Panicf("Could not insert "+
"identity: %v", err) "identity: %v", err)
} }
...@@ -104,9 +115,9 @@ func generateIdentities(protoIds []ephemeral.ProtoIdentity, ...@@ -104,9 +115,9 @@ func generateIdentities(protoIds []ephemeral.ProtoIdentity,
// Add identities for every ephemeral id // Add identities for every ephemeral id
for _, eid := range protoIds { for _, eid := range protoIds {
// Expand the grace period // Expand the grace period for both start and end
eid.End.Add(validityGracePeriod) eid.End.Add(validityGracePeriod)
eid.Start.Add(-validityGracePeriod)
identities = append(identities, reception.Identity{ identities = append(identities, reception.Identity{
EphId: eid.Id, EphId: eid.Id,
Source: ourId, Source: ourId,
...@@ -143,15 +154,6 @@ func MarshalTimestamp(timeToStore time.Time) (*versioned.Object, error) { ...@@ -143,15 +154,6 @@ func MarshalTimestamp(timeToStore time.Time) (*versioned.Object, error) {
}, err }, err
} }
// Wrapper for GetIdsByRange. Generates ephemeral ids in the time period
// since the last track
func getUpcomingIDs(ourId *id.ID, now,
lastCheck time.Time) ([]ephemeral.ProtoIdentity, error) {
return ephemeral.GetIdsByRange(ourId, ephemeralIdSie,
now.UnixNano(), now.Sub(lastCheck))
}
// Helper function which calculates the time for the ticker based // Helper function which calculates the time for the ticker based
// off of the last ephemeral ID to expire // off of the last ephemeral ID to expire
func calculateTickerTime(baseIDs []ephemeral.ProtoIdentity) time.Duration { func calculateTickerTime(baseIDs []ephemeral.ProtoIdentity) time.Duration {
......
...@@ -8,9 +8,19 @@ ...@@ -8,9 +8,19 @@
package ephemeral package ephemeral
import ( import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/comms/testkeys"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/comms/signature"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/utils"
"testing" "testing"
"time" "time"
) )
...@@ -18,7 +28,10 @@ import ( ...@@ -18,7 +28,10 @@ import (
// Smoke test for Track function // Smoke test for Track function
func TestCheck(t *testing.T) { func TestCheck(t *testing.T) {
session := storage.InitTestingSession(t) session := storage.InitTestingSession(t)
identityStore := NewTracker(session.Reception()) instance := NewTestNetworkManager(t)
if err := setupInstance(instance); err != nil {
t.Errorf("Could not set up instance: %v", err)
}
/// Store a mock initial timestamp the store /// Store a mock initial timestamp the store
now := time.Now() now := time.Now()
...@@ -33,7 +46,7 @@ func TestCheck(t *testing.T) { ...@@ -33,7 +46,7 @@ func TestCheck(t *testing.T) {
} }
ourId := id.NewIdFromBytes([]byte("Sauron"), t) ourId := id.NewIdFromBytes([]byte("Sauron"), t)
stop := Track(session, ourId, identityStore) stop := Track(session, instance.GetInstance(), ourId)
err = stop.Close(3 * time.Second) err = stop.Close(3 * time.Second)
if err != nil { if err != nil {
...@@ -46,40 +59,50 @@ func TestCheck(t *testing.T) { ...@@ -46,40 +59,50 @@ func TestCheck(t *testing.T) {
func TestCheck_Thread(t *testing.T) { func TestCheck_Thread(t *testing.T) {
session := storage.InitTestingSession(t) session := storage.InitTestingSession(t)
instance := NewTestNetworkManager(t)
if err := setupInstance(instance); err != nil {
t.Errorf("Could not set up instance: %v", err)
}
ourId := id.NewIdFromBytes([]byte("Sauron"), t) ourId := id.NewIdFromBytes([]byte("Sauron"), t)
stop := stoppable.NewSingle(ephemeralStoppable) stop := stoppable.NewSingle(ephemeralStoppable)
identityStore := NewTracker(session.Reception())
/// Store a mock initial timestamp the store /// Store a mock initial timestamp the store
now := time.Now() now := time.Now()
twoDaysAgo := now.Add(-2 * 24 * time.Hour) yesterday := now.Add(-24 * time.Hour)
twoDaysTimestamp, err := MarshalTimestamp(twoDaysAgo) yesterdayTimestamp, err := MarshalTimestamp(yesterday)
if err != nil { if err != nil {
t.Errorf("Could not marshal timestamp for test setup: %v", err) t.Errorf("Could not marshal timestamp for test setup: %v", err)
} }
err = session.Set(TimestampKey, twoDaysTimestamp) err = session.Set(TimestampKey, yesterdayTimestamp)
if err != nil { if err != nil {
t.Errorf("Could not set mock timestamp for test setup: %v", err) t.Errorf("Could not set mock timestamp for test setup: %v", err)
} }
// Run the tracker // Run the tracker
go func() { go func() {
track(session, ourId, stop, identityStore) track(session, instance.GetInstance(), ourId, stop)
}() }()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// Manually generate identities // Manually generate identities
eids, err := getUpcomingIDs(ourId, time.Now(), twoDaysAgo)
eids, err := ephemeral.GetIdsByRange(ourId, 64, now.UnixNano(), now.Sub(yesterday))
if err != nil { if err != nil {
t.Errorf("Could not generate upcoming ids: %v", err) t.Errorf("Could not generate upcoming ids: %v", err)
} }
identities := generateIdentities(eids, ourId) identities := generateIdentities(eids, ourId)
rngStreamGen := fastRNG.NewStreamGenerator(12, 3, csprng.NewSystemRNG)
retrieved, err := session.Reception().GetIdentity(rngStreamGen.GetStream())
if err != nil {
t.Errorf("Could not retrieve identity: %v", err)
}
// Check if store has been updated for new identities // Check if store has been updated for new identities
if !identityStore.IsAlreadyIdentity(identities[0]) { if identities[0].String() != retrieved.String() {
t.Errorf("Store was not updated for newly generated identies") t.Errorf("Store was not updated for newly generated identies")
} }
...@@ -88,5 +111,39 @@ func TestCheck_Thread(t *testing.T) { ...@@ -88,5 +111,39 @@ func TestCheck_Thread(t *testing.T) {
t.Errorf("Could not close thread: %v", err) t.Errorf("Could not close thread: %v", err)
} }
}
func setupInstance(instance interfaces.NetworkManager) error {
cert, err := utils.ReadFile(testkeys.GetNodeKeyPath())
if err != nil {
return errors.Errorf("Failed to read cert from from file: %v", err)
}
ri := &mixmessages.RoundInfo{
ID: 1,
AddressSpaceSize: 64,
}
testCert, err := rsa.LoadPrivateKeyFromPem(cert)
if err != nil {
return errors.Errorf("Failed to load cert from from file: %v", err)
}
if err = signature.Sign(ri, testCert); err != nil {
return errors.Errorf("Failed to sign round info: %v", err)
}
if err = instance.GetInstance().RoundUpdate(ri); err != nil {
return errors.Errorf("Failed to RoundUpdate from from file: %v", err)
}
ri = &mixmessages.RoundInfo{
ID: 2,
AddressSpaceSize: 64,
}
if err = signature.Sign(ri, testCert); err != nil {
return errors.Errorf("Failed to sign round info: %v", err)
}
if err = instance.GetInstance().RoundUpdate(ri); err != nil {
return errors.Errorf("Failed to RoundUpdate from from file: %v", err)
}
return nil
} }
...@@ -134,8 +134,8 @@ func (m *manager) Follow() (stoppable.Stoppable, error) { ...@@ -134,8 +134,8 @@ func (m *manager) Follow() (stoppable.Stoppable, error) {
return nil, errors.Errorf("Could not store timestamp " + return nil, errors.Errorf("Could not store timestamp " +
"for ephemeral ID tracking: %v", err) "for ephemeral ID tracking: %v", err)
} }
identityStore := ephemeral.NewTracker(m.Session.Reception())
multi.Add(ephemeral.Track(m.Session, m.Comms.Id, identityStore)) multi.Add(ephemeral.Track(m.Session, m.Instance, m.Comms.Id))
//set the running status back to 0 so it can be started again //set the running status back to 0 so it can be started again
closer := stoppable.NewCleanup(multi, func(time.Duration) error { closer := stoppable.NewCleanup(multi, func(time.Duration) error {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment