Skip to content
Snippets Groups Projects
Commit 2fb406e1 authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

Merge branch 'trackerTests' into 'restructure'

add tests to tracker

See merge request !202
parents 8d81a53e 485f06ff
No related branches found
No related tags found
4 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast,!202add tests to tracker
......@@ -27,23 +27,22 @@ import (
"gitlab.com/xx_network/primitives/netTime"
)
const validityGracePeriod = 5 * time.Minute
const TrackerListKey = "TrackerListKey"
const TrackerListVersion = 0
const TimestampKey = "IDTrackingTimestamp"
const ephemeralStoppable = "EphemeralCheck"
const addressSpaceSizeChanTag = "ephemeralTracker"
var Forever = time.Time{}
const (
validityGracePeriod = 5 * time.Minute
TrackerListKey = "TrackerListKey"
TrackerListVersion = 0
TimestampKey = "IDTrackingTimestamp"
ephemeralStoppable = "EphemeralCheck"
addressSpaceSizeChanTag = "ephemeralTracker"
trackedIDChanSize = 1000
deleteIDChanSize = 1000
)
// DefaultExtraChecks is the default value for ExtraChecks on
// receptionID.Identity.
const DefaultExtraChecks = 10
// DefaultExtraChecks is the default value for ExtraChecks on receptionID.Identity.
DefaultExtraChecks = 10
)
type Tracker interface {
StartProcesses() stoppable.Stoppable
......@@ -55,7 +54,7 @@ type Tracker interface {
type manager struct {
tracked []TrackedID
store *receptionID.Store
ephemeral *receptionID.Store
session storage.Session
newIdentity chan TrackedID
deleteIdentity chan *id.ID
......@@ -123,7 +122,7 @@ func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager
jww.FATAL.Panicf("Unable to create new Tracker: %+v", err)
}
t.store = receptionID.NewOrLoadStore(session.GetKV())
t.ephemeral = receptionID.NewOrLoadStore(session.GetKV())
return t
}
......@@ -158,7 +157,7 @@ func (t *manager) RemoveIdentity(id *id.ID) {
// GetEphemeralIdentity returns an ephemeral Identity to poll the network with.
func (t *manager) GetEphemeralIdentity(rng io.Reader, addressSize uint8) (
receptionID.IdentityUse, error) {
return t.store.GetIdentity(rng, addressSize)
return t.ephemeral.GetIdentity(rng, addressSize)
}
// GetIdentity returns a currently tracked identity
......@@ -177,57 +176,79 @@ func (t *manager) track(stop *stoppable.Single) {
// Wait until the ID size is retrieved from the network
addressSize := t.addrSpace.GetAddressSpace()
// Wait for next event
trackerLoop:
for {
edits := false
var toRemove map[int]struct{}
nextEvent := t.tracked[0].ValidUntil
// Process new and old identities
nextEvent := t.processIdentities(addressSize)
// Loop through every tracked ID and see if any operations are needed
// Trigger events early. This will cause generations to happen early as
// well as message pickup. As a result, if there are time sync issues
// between clients, and they begin sending to ephemeral IDs early, then
// messages will still be picked up.
nextUpdate := nextEvent.Add(-validityGracePeriod)
// Sleep until the last ID has expired
select {
case <-time.NewTimer(nextUpdate.Sub(nextUpdate)).C:
case newIdentity := <-t.newIdentity:
// If the identity is old, then update its properties
isOld := false
for i := range t.tracked {
inQuestion := t.tracked[i]
// Generate new ephemeral if is time for it
if netTime.Now().After(inQuestion.NextGeneration) {
edits = true
// Ensure that ephemeral IDs will not be generated after the
// identity is invalid
generateUntil := inQuestion.NextGeneration
if inQuestion.ValidUntil != Forever &&
generateUntil.After(inQuestion.ValidUntil) {
generateUntil = inQuestion.ValidUntil
if inQuestion.Source.Cmp(newIdentity.Source) {
inQuestion.Persistent = newIdentity.Persistent
inQuestion.ValidUntil = newIdentity.ValidUntil
isOld = true
break
}
}
if !isOld {
// Otherwise, add it to the list and run
t.tracked = append(t.tracked, newIdentity)
}
// Generate all not yet existing ephemeral IDs
identities, nextNextGeneration := generateIdentitiesOverRange(
inQuestion.LastGeneration, inQuestion.NextGeneration,
inQuestion.Source, addressSize)
t.save()
continue
// Add all ephemeral IDs to the ephemeral handler
for _, identity := range identities {
// Move up the end time if the source identity is invalid
// before the natural end of the ephemeral identity
if inQuestion.ValidUntil != Forever &&
identity.End.After(inQuestion.ValidUntil) {
identity.End = inQuestion.ValidUntil
case deleteID := <-t.deleteIdentity:
for i := range t.tracked {
inQuestion := t.tracked[i]
if inQuestion.Source.Cmp(deleteID) {
t.tracked = append(t.tracked[:i], t.tracked[i+1:]...)
t.save()
// Requires manual deletion in case identity is deleted before expiration
t.ephemeral.RemoveIdentities(deleteID)
break
}
}
case <-stop.Quit():
t.addrSpace.UnregisterAddressSpaceNotification(addressSpaceSizeChanTag)
stop.ToStopped()
return
}
identity.Ephemeral = !inQuestion.Persistent
if err := t.store.AddIdentity(identity); err != nil {
jww.FATAL.Panicf("Could not insert identity: %+v", err)
}
}
// processIdentities builds and adds new identities and removes old identities from the tracker
// and returns the timestamp of the next ID event
func (t *manager) processIdentities(addressSize uint8) time.Time {
edits := false
toRemove := make(map[int]struct{})
nextEvent := t.tracked[0].ValidUntil
// Loop through every tracked ID and see if any operations are needed
for i, inQuestion := range t.tracked {
// Generate new ephemeral if is time for it
if netTime.Now().After(inQuestion.NextGeneration) {
nextGeneration := t.generateIdentitiesOverRange(inQuestion, addressSize)
// Move forward the tracking of when generation should occur
inQuestion.LastGeneration = inQuestion.NextGeneration
inQuestion.NextGeneration = nextNextGeneration.Add(time.Millisecond)
inQuestion.NextGeneration = nextGeneration.Add(time.Millisecond)
edits = true
}
// If it is time to delete the ID, then process the deletion
if inQuestion.ValidUntil != Forever &&
netTime.Now().After(inQuestion.ValidUntil) {
if inQuestion.ValidUntil != Forever && netTime.Now().After(inQuestion.ValidUntil) {
edits = true
toRemove[i] = struct{}{}
} else {
......@@ -239,6 +260,7 @@ trackerLoop:
nextEvent = inQuestion.ValidUntil
}
}
}
// Process any deletions
......@@ -249,7 +271,6 @@ trackerLoop:
newTracked = append(newTracked, t.tracked[i])
}
}
t.tracked = newTracked
}
......@@ -257,48 +278,7 @@ trackerLoop:
t.save()
}
// Trigger events early. This will cause generations to happen early as
// well as message pickup. As a result, if there are time sync issues
// between clients, and they begin sending to ephemeral IDs early, then
// messages will still be picked up.
nextUpdate := nextEvent.Add(-validityGracePeriod)
// Sleep until the last ID has expired
select {
case <-time.NewTimer(nextUpdate.Sub(nextUpdate)).C:
case newIdentity := <-t.newIdentity:
// If the identity is old, then update its properties
for i := range t.tracked {
inQuestion := t.tracked[i]
if inQuestion.Source.Cmp(newIdentity.Source) {
inQuestion.Persistent = newIdentity.Persistent
inQuestion.ValidUntil = newIdentity.ValidUntil
t.save()
continue trackerLoop
}
}
// Otherwise, add it to the list and run
t.tracked = append(t.tracked, newIdentity)
t.save()
continue trackerLoop
case deleteID := <-t.deleteIdentity:
for i := range t.tracked {
inQuestion := t.tracked[i]
if inQuestion.Source.Cmp(deleteID) {
t.tracked = append(t.tracked[:i], t.tracked[i+1:]...)
t.save()
t.store.RemoveIdentities(deleteID)
break
}
}
case <-stop.Quit():
t.addrSpace.UnregisterAddressSpaceNotification(addressSpaceSizeChanTag)
stop.ToStopped()
return
}
}
return nextEvent
}
func getOldTimestampStore(session storage.Session) (time.Time, error) {
......@@ -321,29 +301,31 @@ func unmarshalTimestamp(lastTimestampObj *versioned.Object) (time.Time, error) {
return lastTimestamp, err
}
func generateIdentitiesOverRange(lastGeneration, generateThrough time.Time,
source *id.ID, addressSize uint8) ([]receptionID.Identity, time.Time) {
protoIds, err := ephemeral.GetIdsByRange(source, uint(addressSize),
lastGeneration, generateThrough.Sub(lastGeneration))
jww.DEBUG.Printf("Now: %s, LastCheck: %s, Different: %s",
generateThrough, generateThrough, generateThrough.Sub(lastGeneration))
jww.DEBUG.Printf("protoIds count: %d", len(protoIds))
// generateIdentitiesOverRange generates and adds all not yet existing ephemeral Ids
// and returns the timestamp of the next generation for the given TrackedID
func (t *manager) generateIdentitiesOverRange(inQuestion TrackedID, addressSize uint8) time.Time {
// Ensure that ephemeral IDs will not be generated after the
// identity is invalid
generateUntil := inQuestion.NextGeneration
if inQuestion.ValidUntil != Forever && generateUntil.After(inQuestion.ValidUntil) {
generateUntil = inQuestion.ValidUntil
}
// Generate list of identities
protoIds, err := ephemeral.GetIdsByRange(inQuestion.Source, uint(addressSize),
inQuestion.LastGeneration, generateUntil.Sub(inQuestion.LastGeneration))
if err != nil {
jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err)
}
// Generate identities off of that list
identities := make([]receptionID.Identity, len(protoIds))
// Add identities for every address ID
lastIdentityEnd := time.Time{}
for i, eid := range protoIds {
// Expand the grace period for both start and end
identities[i] = receptionID.Identity{
newIdentity := receptionID.Identity{
EphemeralIdentity: receptionID.EphemeralIdentity{
EphId: eid.Id,
Source: source,
Source: inQuestion.Source,
},
AddressSize: addressSize,
End: eid.End,
......@@ -352,19 +334,33 @@ func generateIdentitiesOverRange(lastGeneration, generateThrough time.Time,
Ephemeral: false,
ExtraChecks: DefaultExtraChecks,
}
// Move up the end time if the source identity is invalid
// before the natural end of the ephemeral identity
if inQuestion.ValidUntil != Forever && newIdentity.End.After(inQuestion.ValidUntil) {
newIdentity.End = inQuestion.ValidUntil
}
newIdentity.Ephemeral = !inQuestion.Persistent
if err := t.ephemeral.AddIdentity(newIdentity); err != nil {
jww.FATAL.Panicf("Could not insert identity: %+v", err)
}
jww.INFO.Printf("Number of identities generated: %d", len(identities))
// Print debug information and set return value
if isLastIdentity := i == len(protoIds)-1; isLastIdentity {
jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, End: %s",
identities[len(identities)-1].EphId.Int64(),
identities[len(identities)-1].Source,
identities[len(identities)-1].StartValid,
identities[len(identities)-1].EndValid)
newIdentity.EphId.Int64(),
newIdentity.Source,
newIdentity.StartValid,
newIdentity.EndValid)
lastIdentityEnd = newIdentity.End
}
}
return identities, identities[len(identities)-1].End
jww.INFO.Printf("Number of identities generated: %d", len(protoIds))
return lastIdentityEnd
}
// save persistent TrackedID to storage
func (t *manager) save() {
t.mux.Lock()
defer t.mux.Unlock()
......@@ -397,6 +393,7 @@ func (t *manager) save() {
}
}
// load persistent IDs from storage
func (t *manager) load() error {
t.mux.Lock()
defer t.mux.Unlock()
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 Privategrity Corporation /
// /
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package identity
import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/cmix/address"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/storage"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime"
"sync"
"testing"
"time"
)
func TestManager_processIdentities_expired(t *testing.T) {
// Initialization
addrSpace := address.NewAddressSpace()
addrSpace.UpdateAddressSpace(18)
session := storage.InitTestingSession(t)
m := &manager{
tracked: make([]TrackedID, 0),
session: session,
newIdentity: make(chan TrackedID, trackedIDChanSize),
deleteIdentity: make(chan *id.ID, deleteIDChanSize),
addrSpace: addrSpace,
ephemeral: receptionID.NewOrLoadStore(session.GetKV()),
mux: &sync.Mutex{},
}
// Add some expired test IDs
for i := uint64(0); i < 10; i++ {
testId := id.NewIdFromUInt(i, id.User, t)
validUntil := time.Now()
m.tracked = append(m.tracked, TrackedID{
NextGeneration: netTime.Now().Add(-time.Second),
LastGeneration: time.Time{},
Source: testId,
ValidUntil: validUntil,
Persistent: false,
Creation: netTime.Now(),
})
}
expected := m.tracked[0].ValidUntil
nextEvent := m.processIdentities(addrSpace.GetAddressSpace())
if len(m.tracked) != 0 {
t.Errorf("Failed to remove expired identities, %d remain", len(m.tracked))
}
if nextEvent != expected {
t.Errorf("Invalid nextEvent, expected %v got %v", expected, nextEvent)
}
}
func TestManager_processIdentities(t *testing.T) {
jww.SetStdoutThreshold(jww.LevelDebug)
// Initialization
addrSpace := address.NewAddressSpace()
addrSpace.UpdateAddressSpace(18)
session := storage.InitTestingSession(t)
m := &manager{
tracked: make([]TrackedID, 0),
session: session,
newIdentity: make(chan TrackedID, trackedIDChanSize),
deleteIdentity: make(chan *id.ID, deleteIDChanSize),
addrSpace: addrSpace,
ephemeral: receptionID.NewOrLoadStore(session.GetKV()),
mux: &sync.Mutex{},
}
// Add some expired test IDs
testId := id.NewIdFromUInt(0, id.User, t)
validUntil := time.Now().Add(time.Minute)
m.tracked = append(m.tracked, TrackedID{
NextGeneration: netTime.Now(),
LastGeneration: time.Time{},
Source: testId,
ValidUntil: validUntil,
Persistent: true,
Creation: netTime.Now(),
})
_ = m.processIdentities(addrSpace.GetAddressSpace())
if len(m.tracked) != 1 {
t.Errorf("Unexpectedly removed identity, %d remain", len(m.tracked))
}
}
......@@ -13,9 +13,9 @@ require (
github.com/spf13/viper v1.7.1
gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228
gitlab.com/elixxir/comms v0.0.4-0.20220323190139-9ed75f3a8b2c
gitlab.com/elixxir/crypto v0.0.7-0.20220414225314-6f3eb9c073a5
gitlab.com/elixxir/crypto v0.0.7-0.20220415180223-ec8d560270a1
gitlab.com/elixxir/ekv v0.1.7
gitlab.com/elixxir/primitives v0.0.3-0.20220323183834-b98f255361b8
gitlab.com/elixxir/primitives v0.0.3-0.20220330212736-cce83b5f948f
gitlab.com/xx_network/comms v0.0.4-0.20220315161313-76acb14429ac
gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71
gitlab.com/xx_network/primitives v0.0.4-0.20220324193139-b292d1ae6e7e
......
......@@ -226,8 +226,7 @@ func InitTestingSession(i interface{}) Session {
}
privKey, _ := rsa.LoadPrivateKeyFromPem([]byte("-----BEGIN PRIVATE KEY-----\nMIIJQQIBADANBgkqhkiG9w0BAQEFAASCCSswggknAgEAAoICAQC7Dkb6VXFn4cdp\nU0xh6ji0nTDQUyT9DSNW9I3jVwBrWfqMc4ymJuonMZbuqK+cY2l+suS2eugevWZr\ntzujFPBRFp9O14Jl3fFLfvtjZvkrKbUMHDHFehascwzrp3tXNryiRMmCNQV55TfI\nTVCv8CLE0t1ibiyOGM9ZWYB2OjXt59j76lPARYww5qwC46vS6+3Cn2Yt9zkcrGes\nkWEFa2VttHqF910TP+DZk2R5C7koAh6wZYK6NQ4S83YQurdHAT51LKGrbGehFKXq\n6/OAXCU1JLi3kW2PovTb6MZuvxEiRmVAONsOcXKu7zWCmFjuZZwfRt2RhnpcSgzf\nrarmsGM0LZh6JY3MGJ9YdPcVGSz+Vs2E4zWbNW+ZQoqlcGeMKgsIiQ670g0xSjYI\nCqldpt79gaET9PZsoXKEmKUaj6pq1d4qXDk7s63HRQazwVLGBdJQK8qX41eCdR8V\nMKbrCaOkzD5zgnEu0jBBAwdMtcigkMIk1GRv91j7HmqwryOBHryLi6NWBY3tjb4S\no9AppDQB41SH3SwNenAbNO1CXeUqN0hHX6I1bE7OlbjqI7tXdrTllHAJTyVVjenP\nel2ApMXp+LVRdDbKtwBiuM6+n+z0I7YYerxN1gfvpYgcXm4uye8dfwotZj6H2J/u\nSALsU2v9UHBzprdrLSZk2YpozJb+CQIDAQABAoICAARjDFUYpeU6zVNyCauOM7BA\ns4FfQdHReg+zApTfWHosDQ04NIc9CGbM6e5E9IFlb3byORzyevkllf5WuMZVWmF8\nd1YBBeTftKYBn2Gwa42Ql9dl3eD0wQ1gUWBBeEoOVZQ0qskr9ynpr0o6TfciWZ5m\nF50UWmUmvc4ppDKhoNwogNU/pKEwwF3xOv2CW2hB8jyLQnk3gBZlELViX3UiFKni\n/rCfoYYvDFXt+ABCvx/qFNAsQUmerurQ3Ob9igjXRaC34D7F9xQ3CMEesYJEJvc9\nGjvr5DbnKnjx152HS56TKhK8gp6vGHJz17xtWECXD3dIUS/1iG8bqXuhdg2c+2aW\nm3MFpa5jgpAawUWc7c32UnqbKKf+HI7/x8J1yqJyNeU5SySyYSB5qtwTShYzlBW/\nyCYD41edeJcmIp693nUcXzU+UAdtpt0hkXS59WSWlTrB/huWXy6kYXLNocNk9L7g\niyx0cOmkuxREMHAvK0fovXdVyflQtJYC7OjJxkzj2rWO+QtHaOySXUyinkuTb5ev\nxNhs+ROWI/HAIE9buMqXQIpHx6MSgdKOL6P6AEbBan4RAktkYA6y5EtH/7x+9V5E\nQTIz4LrtI6abaKb4GUlZkEsc8pxrkNwCqOAE/aqEMNh91Na1TOj3f0/a6ckGYxYH\npyrvwfP2Ouu6e5FhDcCBAoIBAQDcN8mK99jtrH3q3Q8vZAWFXHsOrVvnJXyHLz9V\n1Rx/7TnMUxvDX1PIVxhuJ/tmHtxrNIXOlps80FCZXGgxfET/YFrbf4H/BaMNJZNP\nag1wBV5VQSnTPdTR+Ijice+/ak37S2NKHt8+ut6yoZjD7sf28qiO8bzNua/OYHkk\nV+RkRkk68Uk2tFMluQOSyEjdsrDNGbESvT+R1Eotupr0Vy/9JRY/TFMc4MwJwOoy\ns7wYr9SUCq/cYn7FIOBTI+PRaTx1WtpfkaErDc5O+nLLEp1yOrfktl4LhU/r61i7\nfdtafUACTKrXG2qxTd3w++mHwTwVl2MwhiMZfxvKDkx0L2gxAoIBAQDZcxKwyZOy\ns6Aw7igw1ftLny/dpjPaG0p6myaNpeJISjTOU7HKwLXmlTGLKAbeRFJpOHTTs63y\ngcmcuE+vGCpdBHQkaCev8cve1urpJRcxurura6+bYaENO6ua5VzF9BQlDYve0YwY\nlbJiRKmEWEAyULjbIebZW41Z4UqVG3MQI750PRWPW4WJ2kDhksFXN1gwSnaM46KR\nPmVA0SL+RCPcAp/VkImCv0eqv9exsglY0K/QiJfLy3zZ8QvAn0wYgZ3AvH3lr9rJ\nT7pg9WDb+OkfeEQ7INubqSthhaqCLd4zwbMRlpyvg1cMSq0zRvrFpwVlSY85lW4F\ng/tgjJ99W9VZAoIBAH3OYRVDAmrFYCoMn+AzA/RsIOEBqL8kaz/Pfh9K4D01CQ/x\naqryiqqpFwvXS4fLmaClIMwkvgq/90ulvuCGXeSG52D+NwW58qxQCxgTPhoA9yM9\nVueXKz3I/mpfLNftox8sskxl1qO/nfnu15cXkqVBe4ouD+53ZjhAZPSeQZwHi05h\nCbJ20gl66M+yG+6LZvXE96P8+ZQV80qskFmGdaPozAzdTZ3xzp7D1wegJpTz3j20\n3ULKAiIb5guZNU0tEZz5ikeOqsQt3u6/pVTeDZR0dxnyFUf/oOjmSorSG75WT3sA\n0ZiR0SH5mhFR2Nf1TJ4JHmFaQDMQqo+EG6lEbAECggEAA7kGnuQ0lSCiI3RQV9Wy\nAa9uAFtyE8/XzJWPaWlnoFk04jtoldIKyzHOsVU0GOYOiyKeTWmMFtTGANre8l51\nizYiTuVBmK+JD/2Z8/fgl8dcoyiqzvwy56kX3QUEO5dcKO48cMohneIiNbB7PnrM\nTpA3OfkwnJQGrX0/66GWrLYP8qmBDv1AIgYMilAa40VdSyZbNTpIdDgfP6bU9Ily\nG7gnyF47HHPt5Cx4ouArbMvV1rof7ytCrfCEhP21Lc46Ryxy81W5ZyzoQfSxfdKb\nGyDR+jkryVRyG69QJf5nCXfNewWbFR4ohVtZ78DNVkjvvLYvr4qxYYLK8PI3YMwL\nsQKCAQB9lo7JadzKVio+C18EfNikOzoriQOaIYowNaaGDw3/9KwIhRsKgoTs+K5O\ngt/gUoPRGd3M2z4hn5j4wgeuFi7HC1MdMWwvgat93h7R1YxiyaOoCTxH1klbB/3K\n4fskdQRxuM8McUebebrp0qT5E0xs2l+ABmt30Dtd3iRrQ5BBjnRc4V//sQiwS1aC\nYi5eNYCQ96BSAEo1dxJh5RI/QxF2HEPUuoPM8iXrIJhyg9TEEpbrEJcxeagWk02y\nOMEoUbWbX07OzFVvu+aJaN/GlgiogMQhb6IiNTyMlryFUleF+9OBA8xGHqGWA6nR\nOaRA5ZbdE7g7vxKRV36jT3wvD7W+\n-----END PRIVATE KEY-----\n"))
store := make(ekv.Memstore)
kv := versioned.NewKV(store)
kv := versioned.NewKV(ekv.MakeMemstore())
s := &session{kv: kv}
uid := id.NewIdFromString("zezima", id.User, i)
u, err := user.NewUser(kv, uid, uid, []byte("salt"), []byte("salt"), privKey, privKey, false)
......
......@@ -152,7 +152,7 @@ func (v *KV) Prefix(prefix string) *KV {
}
func (v *KV) IsMemStore() bool {
_, success := v.r.data.(ekv.Memstore)
_, success := v.r.data.(*ekv.Memstore)
return success
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment