Skip to content
Snippets Groups Projects
Commit 485f06ff authored by Jake Taylor's avatar Jake Taylor
Browse files

add tests to tracker

parent 8d81a53e
No related branches found
No related tags found
4 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast,!202add tests to tracker
......@@ -27,23 +27,22 @@ import (
"gitlab.com/xx_network/primitives/netTime"
)
const validityGracePeriod = 5 * time.Minute
const TrackerListKey = "TrackerListKey"
const TrackerListVersion = 0
const TimestampKey = "IDTrackingTimestamp"
const ephemeralStoppable = "EphemeralCheck"
const addressSpaceSizeChanTag = "ephemeralTracker"
var Forever = time.Time{}
const (
validityGracePeriod = 5 * time.Minute
TrackerListKey = "TrackerListKey"
TrackerListVersion = 0
TimestampKey = "IDTrackingTimestamp"
ephemeralStoppable = "EphemeralCheck"
addressSpaceSizeChanTag = "ephemeralTracker"
trackedIDChanSize = 1000
deleteIDChanSize = 1000
)
// DefaultExtraChecks is the default value for ExtraChecks on
// receptionID.Identity.
const DefaultExtraChecks = 10
// DefaultExtraChecks is the default value for ExtraChecks on receptionID.Identity.
DefaultExtraChecks = 10
)
type Tracker interface {
StartProcesses() stoppable.Stoppable
......@@ -55,7 +54,7 @@ type Tracker interface {
type manager struct {
tracked []TrackedID
store *receptionID.Store
ephemeral *receptionID.Store
session storage.Session
newIdentity chan TrackedID
deleteIdentity chan *id.ID
......@@ -123,7 +122,7 @@ func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager
jww.FATAL.Panicf("Unable to create new Tracker: %+v", err)
}
t.store = receptionID.NewOrLoadStore(session.GetKV())
t.ephemeral = receptionID.NewOrLoadStore(session.GetKV())
return t
}
......@@ -158,7 +157,7 @@ func (t *manager) RemoveIdentity(id *id.ID) {
// GetEphemeralIdentity returns an ephemeral Identity to poll the network with.
func (t *manager) GetEphemeralIdentity(rng io.Reader, addressSize uint8) (
receptionID.IdentityUse, error) {
return t.store.GetIdentity(rng, addressSize)
return t.ephemeral.GetIdentity(rng, addressSize)
}
// GetIdentity returns a currently tracked identity
......@@ -177,57 +176,79 @@ func (t *manager) track(stop *stoppable.Single) {
// Wait until the ID size is retrieved from the network
addressSize := t.addrSpace.GetAddressSpace()
// Wait for next event
trackerLoop:
for {
edits := false
var toRemove map[int]struct{}
nextEvent := t.tracked[0].ValidUntil
// Process new and old identities
nextEvent := t.processIdentities(addressSize)
// Loop through every tracked ID and see if any operations are needed
// Trigger events early. This will cause generations to happen early as
// well as message pickup. As a result, if there are time sync issues
// between clients, and they begin sending to ephemeral IDs early, then
// messages will still be picked up.
nextUpdate := nextEvent.Add(-validityGracePeriod)
// Sleep until the last ID has expired
select {
case <-time.NewTimer(nextUpdate.Sub(nextUpdate)).C:
case newIdentity := <-t.newIdentity:
// If the identity is old, then update its properties
isOld := false
for i := range t.tracked {
inQuestion := t.tracked[i]
// Generate new ephemeral if is time for it
if netTime.Now().After(inQuestion.NextGeneration) {
edits = true
// Ensure that ephemeral IDs will not be generated after the
// identity is invalid
generateUntil := inQuestion.NextGeneration
if inQuestion.ValidUntil != Forever &&
generateUntil.After(inQuestion.ValidUntil) {
generateUntil = inQuestion.ValidUntil
if inQuestion.Source.Cmp(newIdentity.Source) {
inQuestion.Persistent = newIdentity.Persistent
inQuestion.ValidUntil = newIdentity.ValidUntil
isOld = true
break
}
}
if !isOld {
// Otherwise, add it to the list and run
t.tracked = append(t.tracked, newIdentity)
}
// Generate all not yet existing ephemeral IDs
identities, nextNextGeneration := generateIdentitiesOverRange(
inQuestion.LastGeneration, inQuestion.NextGeneration,
inQuestion.Source, addressSize)
t.save()
continue
// Add all ephemeral IDs to the ephemeral handler
for _, identity := range identities {
// Move up the end time if the source identity is invalid
// before the natural end of the ephemeral identity
if inQuestion.ValidUntil != Forever &&
identity.End.After(inQuestion.ValidUntil) {
identity.End = inQuestion.ValidUntil
case deleteID := <-t.deleteIdentity:
for i := range t.tracked {
inQuestion := t.tracked[i]
if inQuestion.Source.Cmp(deleteID) {
t.tracked = append(t.tracked[:i], t.tracked[i+1:]...)
t.save()
// Requires manual deletion in case identity is deleted before expiration
t.ephemeral.RemoveIdentities(deleteID)
break
}
}
case <-stop.Quit():
t.addrSpace.UnregisterAddressSpaceNotification(addressSpaceSizeChanTag)
stop.ToStopped()
return
}
identity.Ephemeral = !inQuestion.Persistent
if err := t.store.AddIdentity(identity); err != nil {
jww.FATAL.Panicf("Could not insert identity: %+v", err)
}
}
// processIdentities builds and adds new identities and removes old identities from the tracker
// and returns the timestamp of the next ID event
func (t *manager) processIdentities(addressSize uint8) time.Time {
edits := false
toRemove := make(map[int]struct{})
nextEvent := t.tracked[0].ValidUntil
// Loop through every tracked ID and see if any operations are needed
for i, inQuestion := range t.tracked {
// Generate new ephemeral if is time for it
if netTime.Now().After(inQuestion.NextGeneration) {
nextGeneration := t.generateIdentitiesOverRange(inQuestion, addressSize)
// Move forward the tracking of when generation should occur
inQuestion.LastGeneration = inQuestion.NextGeneration
inQuestion.NextGeneration = nextNextGeneration.Add(time.Millisecond)
inQuestion.NextGeneration = nextGeneration.Add(time.Millisecond)
edits = true
}
// If it is time to delete the ID, then process the deletion
if inQuestion.ValidUntil != Forever &&
netTime.Now().After(inQuestion.ValidUntil) {
if inQuestion.ValidUntil != Forever && netTime.Now().After(inQuestion.ValidUntil) {
edits = true
toRemove[i] = struct{}{}
} else {
......@@ -239,6 +260,7 @@ trackerLoop:
nextEvent = inQuestion.ValidUntil
}
}
}
// Process any deletions
......@@ -249,7 +271,6 @@ trackerLoop:
newTracked = append(newTracked, t.tracked[i])
}
}
t.tracked = newTracked
}
......@@ -257,48 +278,7 @@ trackerLoop:
t.save()
}
// Trigger events early. This will cause generations to happen early as
// well as message pickup. As a result, if there are time sync issues
// between clients, and they begin sending to ephemeral IDs early, then
// messages will still be picked up.
nextUpdate := nextEvent.Add(-validityGracePeriod)
// Sleep until the last ID has expired
select {
case <-time.NewTimer(nextUpdate.Sub(nextUpdate)).C:
case newIdentity := <-t.newIdentity:
// If the identity is old, then update its properties
for i := range t.tracked {
inQuestion := t.tracked[i]
if inQuestion.Source.Cmp(newIdentity.Source) {
inQuestion.Persistent = newIdentity.Persistent
inQuestion.ValidUntil = newIdentity.ValidUntil
t.save()
continue trackerLoop
}
}
// Otherwise, add it to the list and run
t.tracked = append(t.tracked, newIdentity)
t.save()
continue trackerLoop
case deleteID := <-t.deleteIdentity:
for i := range t.tracked {
inQuestion := t.tracked[i]
if inQuestion.Source.Cmp(deleteID) {
t.tracked = append(t.tracked[:i], t.tracked[i+1:]...)
t.save()
t.store.RemoveIdentities(deleteID)
break
}
}
case <-stop.Quit():
t.addrSpace.UnregisterAddressSpaceNotification(addressSpaceSizeChanTag)
stop.ToStopped()
return
}
}
return nextEvent
}
func getOldTimestampStore(session storage.Session) (time.Time, error) {
......@@ -321,29 +301,31 @@ func unmarshalTimestamp(lastTimestampObj *versioned.Object) (time.Time, error) {
return lastTimestamp, err
}
func generateIdentitiesOverRange(lastGeneration, generateThrough time.Time,
source *id.ID, addressSize uint8) ([]receptionID.Identity, time.Time) {
protoIds, err := ephemeral.GetIdsByRange(source, uint(addressSize),
lastGeneration, generateThrough.Sub(lastGeneration))
jww.DEBUG.Printf("Now: %s, LastCheck: %s, Different: %s",
generateThrough, generateThrough, generateThrough.Sub(lastGeneration))
jww.DEBUG.Printf("protoIds count: %d", len(protoIds))
// generateIdentitiesOverRange generates and adds all not yet existing ephemeral Ids
// and returns the timestamp of the next generation for the given TrackedID
func (t *manager) generateIdentitiesOverRange(inQuestion TrackedID, addressSize uint8) time.Time {
// Ensure that ephemeral IDs will not be generated after the
// identity is invalid
generateUntil := inQuestion.NextGeneration
if inQuestion.ValidUntil != Forever && generateUntil.After(inQuestion.ValidUntil) {
generateUntil = inQuestion.ValidUntil
}
// Generate list of identities
protoIds, err := ephemeral.GetIdsByRange(inQuestion.Source, uint(addressSize),
inQuestion.LastGeneration, generateUntil.Sub(inQuestion.LastGeneration))
if err != nil {
jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err)
}
// Generate identities off of that list
identities := make([]receptionID.Identity, len(protoIds))
// Add identities for every address ID
lastIdentityEnd := time.Time{}
for i, eid := range protoIds {
// Expand the grace period for both start and end
identities[i] = receptionID.Identity{
newIdentity := receptionID.Identity{
EphemeralIdentity: receptionID.EphemeralIdentity{
EphId: eid.Id,
Source: source,
Source: inQuestion.Source,
},
AddressSize: addressSize,
End: eid.End,
......@@ -352,19 +334,33 @@ func generateIdentitiesOverRange(lastGeneration, generateThrough time.Time,
Ephemeral: false,
ExtraChecks: DefaultExtraChecks,
}
// Move up the end time if the source identity is invalid
// before the natural end of the ephemeral identity
if inQuestion.ValidUntil != Forever && newIdentity.End.After(inQuestion.ValidUntil) {
newIdentity.End = inQuestion.ValidUntil
}
newIdentity.Ephemeral = !inQuestion.Persistent
if err := t.ephemeral.AddIdentity(newIdentity); err != nil {
jww.FATAL.Panicf("Could not insert identity: %+v", err)
}
jww.INFO.Printf("Number of identities generated: %d", len(identities))
// Print debug information and set return value
if isLastIdentity := i == len(protoIds)-1; isLastIdentity {
jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, End: %s",
identities[len(identities)-1].EphId.Int64(),
identities[len(identities)-1].Source,
identities[len(identities)-1].StartValid,
identities[len(identities)-1].EndValid)
newIdentity.EphId.Int64(),
newIdentity.Source,
newIdentity.StartValid,
newIdentity.EndValid)
lastIdentityEnd = newIdentity.End
}
}
return identities, identities[len(identities)-1].End
jww.INFO.Printf("Number of identities generated: %d", len(protoIds))
return lastIdentityEnd
}
// save persistent TrackedID to storage
func (t *manager) save() {
t.mux.Lock()
defer t.mux.Unlock()
......@@ -397,6 +393,7 @@ func (t *manager) save() {
}
}
// load persistent IDs from storage
func (t *manager) load() error {
t.mux.Lock()
defer t.mux.Unlock()
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 Privategrity Corporation /
// /
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package identity
import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/cmix/address"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/elixxir/client/storage"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime"
"sync"
"testing"
"time"
)
func TestManager_processIdentities_expired(t *testing.T) {
// Initialization
addrSpace := address.NewAddressSpace()
addrSpace.UpdateAddressSpace(18)
session := storage.InitTestingSession(t)
m := &manager{
tracked: make([]TrackedID, 0),
session: session,
newIdentity: make(chan TrackedID, trackedIDChanSize),
deleteIdentity: make(chan *id.ID, deleteIDChanSize),
addrSpace: addrSpace,
ephemeral: receptionID.NewOrLoadStore(session.GetKV()),
mux: &sync.Mutex{},
}
// Add some expired test IDs
for i := uint64(0); i < 10; i++ {
testId := id.NewIdFromUInt(i, id.User, t)
validUntil := time.Now()
m.tracked = append(m.tracked, TrackedID{
NextGeneration: netTime.Now().Add(-time.Second),
LastGeneration: time.Time{},
Source: testId,
ValidUntil: validUntil,
Persistent: false,
Creation: netTime.Now(),
})
}
expected := m.tracked[0].ValidUntil
nextEvent := m.processIdentities(addrSpace.GetAddressSpace())
if len(m.tracked) != 0 {
t.Errorf("Failed to remove expired identities, %d remain", len(m.tracked))
}
if nextEvent != expected {
t.Errorf("Invalid nextEvent, expected %v got %v", expected, nextEvent)
}
}
func TestManager_processIdentities(t *testing.T) {
jww.SetStdoutThreshold(jww.LevelDebug)
// Initialization
addrSpace := address.NewAddressSpace()
addrSpace.UpdateAddressSpace(18)
session := storage.InitTestingSession(t)
m := &manager{
tracked: make([]TrackedID, 0),
session: session,
newIdentity: make(chan TrackedID, trackedIDChanSize),
deleteIdentity: make(chan *id.ID, deleteIDChanSize),
addrSpace: addrSpace,
ephemeral: receptionID.NewOrLoadStore(session.GetKV()),
mux: &sync.Mutex{},
}
// Add some expired test IDs
testId := id.NewIdFromUInt(0, id.User, t)
validUntil := time.Now().Add(time.Minute)
m.tracked = append(m.tracked, TrackedID{
NextGeneration: netTime.Now(),
LastGeneration: time.Time{},
Source: testId,
ValidUntil: validUntil,
Persistent: true,
Creation: netTime.Now(),
})
_ = m.processIdentities(addrSpace.GetAddressSpace())
if len(m.tracked) != 1 {
t.Errorf("Unexpectedly removed identity, %d remain", len(m.tracked))
}
}
......@@ -13,9 +13,9 @@ require (
github.com/spf13/viper v1.7.1
gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228
gitlab.com/elixxir/comms v0.0.4-0.20220323190139-9ed75f3a8b2c
gitlab.com/elixxir/crypto v0.0.7-0.20220414225314-6f3eb9c073a5
gitlab.com/elixxir/crypto v0.0.7-0.20220415180223-ec8d560270a1
gitlab.com/elixxir/ekv v0.1.7
gitlab.com/elixxir/primitives v0.0.3-0.20220323183834-b98f255361b8
gitlab.com/elixxir/primitives v0.0.3-0.20220330212736-cce83b5f948f
gitlab.com/xx_network/comms v0.0.4-0.20220315161313-76acb14429ac
gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71
gitlab.com/xx_network/primitives v0.0.4-0.20220324193139-b292d1ae6e7e
......
......@@ -226,8 +226,7 @@ func InitTestingSession(i interface{}) Session {
}
privKey, _ := rsa.LoadPrivateKeyFromPem([]byte("-----BEGIN PRIVATE KEY-----\nMIIJQQIBADANBgkqhkiG9w0BAQEFAASCCSswggknAgEAAoICAQC7Dkb6VXFn4cdp\nU0xh6ji0nTDQUyT9DSNW9I3jVwBrWfqMc4ymJuonMZbuqK+cY2l+suS2eugevWZr\ntzujFPBRFp9O14Jl3fFLfvtjZvkrKbUMHDHFehascwzrp3tXNryiRMmCNQV55TfI\nTVCv8CLE0t1ibiyOGM9ZWYB2OjXt59j76lPARYww5qwC46vS6+3Cn2Yt9zkcrGes\nkWEFa2VttHqF910TP+DZk2R5C7koAh6wZYK6NQ4S83YQurdHAT51LKGrbGehFKXq\n6/OAXCU1JLi3kW2PovTb6MZuvxEiRmVAONsOcXKu7zWCmFjuZZwfRt2RhnpcSgzf\nrarmsGM0LZh6JY3MGJ9YdPcVGSz+Vs2E4zWbNW+ZQoqlcGeMKgsIiQ670g0xSjYI\nCqldpt79gaET9PZsoXKEmKUaj6pq1d4qXDk7s63HRQazwVLGBdJQK8qX41eCdR8V\nMKbrCaOkzD5zgnEu0jBBAwdMtcigkMIk1GRv91j7HmqwryOBHryLi6NWBY3tjb4S\no9AppDQB41SH3SwNenAbNO1CXeUqN0hHX6I1bE7OlbjqI7tXdrTllHAJTyVVjenP\nel2ApMXp+LVRdDbKtwBiuM6+n+z0I7YYerxN1gfvpYgcXm4uye8dfwotZj6H2J/u\nSALsU2v9UHBzprdrLSZk2YpozJb+CQIDAQABAoICAARjDFUYpeU6zVNyCauOM7BA\ns4FfQdHReg+zApTfWHosDQ04NIc9CGbM6e5E9IFlb3byORzyevkllf5WuMZVWmF8\nd1YBBeTftKYBn2Gwa42Ql9dl3eD0wQ1gUWBBeEoOVZQ0qskr9ynpr0o6TfciWZ5m\nF50UWmUmvc4ppDKhoNwogNU/pKEwwF3xOv2CW2hB8jyLQnk3gBZlELViX3UiFKni\n/rCfoYYvDFXt+ABCvx/qFNAsQUmerurQ3Ob9igjXRaC34D7F9xQ3CMEesYJEJvc9\nGjvr5DbnKnjx152HS56TKhK8gp6vGHJz17xtWECXD3dIUS/1iG8bqXuhdg2c+2aW\nm3MFpa5jgpAawUWc7c32UnqbKKf+HI7/x8J1yqJyNeU5SySyYSB5qtwTShYzlBW/\nyCYD41edeJcmIp693nUcXzU+UAdtpt0hkXS59WSWlTrB/huWXy6kYXLNocNk9L7g\niyx0cOmkuxREMHAvK0fovXdVyflQtJYC7OjJxkzj2rWO+QtHaOySXUyinkuTb5ev\nxNhs+ROWI/HAIE9buMqXQIpHx6MSgdKOL6P6AEbBan4RAktkYA6y5EtH/7x+9V5E\nQTIz4LrtI6abaKb4GUlZkEsc8pxrkNwCqOAE/aqEMNh91Na1TOj3f0/a6ckGYxYH\npyrvwfP2Ouu6e5FhDcCBAoIBAQDcN8mK99jtrH3q3Q8vZAWFXHsOrVvnJXyHLz9V\n1Rx/7TnMUxvDX1PIVxhuJ/tmHtxrNIXOlps80FCZXGgxfET/YFrbf4H/BaMNJZNP\nag1wBV5VQSnTPdTR+Ijice+/ak37S2NKHt8+ut6yoZjD7sf28qiO8bzNua/OYHkk\nV+RkRkk68Uk2tFMluQOSyEjdsrDNGbESvT+R1Eotupr0Vy/9JRY/TFMc4MwJwOoy\ns7wYr9SUCq/cYn7FIOBTI+PRaTx1WtpfkaErDc5O+nLLEp1yOrfktl4LhU/r61i7\nfdtafUACTKrXG2qxTd3w++mHwTwVl2MwhiMZfxvKDkx0L2gxAoIBAQDZcxKwyZOy\ns6Aw7igw1ftLny/dpjPaG0p6myaNpeJISjTOU7HKwLXmlTGLKAbeRFJpOHTTs63y\ngcmcuE+vGCpdBHQkaCev8cve1urpJRcxurura6+bYaENO6ua5VzF9BQlDYve0YwY\nlbJiRKmEWEAyULjbIebZW41Z4UqVG3MQI750PRWPW4WJ2kDhksFXN1gwSnaM46KR\nPmVA0SL+RCPcAp/VkImCv0eqv9exsglY0K/QiJfLy3zZ8QvAn0wYgZ3AvH3lr9rJ\nT7pg9WDb+OkfeEQ7INubqSthhaqCLd4zwbMRlpyvg1cMSq0zRvrFpwVlSY85lW4F\ng/tgjJ99W9VZAoIBAH3OYRVDAmrFYCoMn+AzA/RsIOEBqL8kaz/Pfh9K4D01CQ/x\naqryiqqpFwvXS4fLmaClIMwkvgq/90ulvuCGXeSG52D+NwW58qxQCxgTPhoA9yM9\nVueXKz3I/mpfLNftox8sskxl1qO/nfnu15cXkqVBe4ouD+53ZjhAZPSeQZwHi05h\nCbJ20gl66M+yG+6LZvXE96P8+ZQV80qskFmGdaPozAzdTZ3xzp7D1wegJpTz3j20\n3ULKAiIb5guZNU0tEZz5ikeOqsQt3u6/pVTeDZR0dxnyFUf/oOjmSorSG75WT3sA\n0ZiR0SH5mhFR2Nf1TJ4JHmFaQDMQqo+EG6lEbAECggEAA7kGnuQ0lSCiI3RQV9Wy\nAa9uAFtyE8/XzJWPaWlnoFk04jtoldIKyzHOsVU0GOYOiyKeTWmMFtTGANre8l51\nizYiTuVBmK+JD/2Z8/fgl8dcoyiqzvwy56kX3QUEO5dcKO48cMohneIiNbB7PnrM\nTpA3OfkwnJQGrX0/66GWrLYP8qmBDv1AIgYMilAa40VdSyZbNTpIdDgfP6bU9Ily\nG7gnyF47HHPt5Cx4ouArbMvV1rof7ytCrfCEhP21Lc46Ryxy81W5ZyzoQfSxfdKb\nGyDR+jkryVRyG69QJf5nCXfNewWbFR4ohVtZ78DNVkjvvLYvr4qxYYLK8PI3YMwL\nsQKCAQB9lo7JadzKVio+C18EfNikOzoriQOaIYowNaaGDw3/9KwIhRsKgoTs+K5O\ngt/gUoPRGd3M2z4hn5j4wgeuFi7HC1MdMWwvgat93h7R1YxiyaOoCTxH1klbB/3K\n4fskdQRxuM8McUebebrp0qT5E0xs2l+ABmt30Dtd3iRrQ5BBjnRc4V//sQiwS1aC\nYi5eNYCQ96BSAEo1dxJh5RI/QxF2HEPUuoPM8iXrIJhyg9TEEpbrEJcxeagWk02y\nOMEoUbWbX07OzFVvu+aJaN/GlgiogMQhb6IiNTyMlryFUleF+9OBA8xGHqGWA6nR\nOaRA5ZbdE7g7vxKRV36jT3wvD7W+\n-----END PRIVATE KEY-----\n"))
store := make(ekv.Memstore)
kv := versioned.NewKV(store)
kv := versioned.NewKV(ekv.MakeMemstore())
s := &session{kv: kv}
uid := id.NewIdFromString("zezima", id.User, i)
u, err := user.NewUser(kv, uid, uid, []byte("salt"), []byte("salt"), privKey, privKey, false)
......
......@@ -152,7 +152,7 @@ func (v *KV) Prefix(prefix string) *KV {
}
func (v *KV) IsMemStore() bool {
_, success := v.r.data.(ekv.Memstore)
_, success := v.r.data.(*ekv.Memstore)
return success
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment