Skip to content
Snippets Groups Projects
Commit f55ed1c7 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

more tracker implementation

parent 1d5c6a44
No related branches found
No related tags found
4 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast,!187Xx 3829/triggers
Showing
with 373 additions and 54 deletions
...@@ -397,7 +397,7 @@ func (gmr *GroupMessageReceive) GetRecipientID() []byte { ...@@ -397,7 +397,7 @@ func (gmr *GroupMessageReceive) GetRecipientID() []byte {
return gmr.RecipientID.Bytes() return gmr.RecipientID.Bytes()
} }
// GetEphemeralID returns the ephemeral ID of the recipient. // GetEphemeralID returns the address ID of the recipient.
func (gmr *GroupMessageReceive) GetEphemeralID() int64 { func (gmr *GroupMessageReceive) GetEphemeralID() int64 {
return gmr.EphemeralID.Int64() return gmr.EphemeralID.Int64()
} }
......
...@@ -73,9 +73,9 @@ type NetworkManager interface { ...@@ -73,9 +73,9 @@ type NetworkManager interface {
// and Identity is Defined by a source ID and a current EphemeralID // and Identity is Defined by a source ID and a current EphemeralID
// In its IdentityParams, paremeters describing the properties // In its IdentityParams, paremeters describing the properties
// of the identity as well as how long it will last are described // of the identity as well as how long it will last are described
AddIdentity(Identity, IdentityParams) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) error
// RemoveIdentity removes a currently tracked identity. // RemoveIdentity removes a currently tracked identity.
RemoveIdentity(Identity) RemoveIdentity(id *id.ID)
/* Fingerprints are the primary mechanisim of identifying a picked up message over /* Fingerprints are the primary mechanisim of identifying a picked up message over
cMix. They are a unique one time use 255 bit vector generally cMix. They are a unique one time use 255 bit vector generally
...@@ -84,8 +84,7 @@ type NetworkManager interface { ...@@ -84,8 +84,7 @@ type NetworkManager interface {
The */ The */
//AddFingerprint - Adds a fingerprint which will be handled by a specific processor //AddFingerprint - Adds a fingerprint which will be handled by a specific processor
AddFingerprint(fingerprint format.Fingerprint, processor MessageProcessorFP) AddFingerprint(fingerprint format.Fingerprint, processor MessageProcessor)
AddFingerprints(fingerprints map[format.Fingerprint]MessageProcessorFP)
RemoveFingerprint(fingerprint format.Fingerprint) RemoveFingerprint(fingerprint format.Fingerprint)
RemoveFingerprints(fingerprints []format.Fingerprint) RemoveFingerprints(fingerprints []format.Fingerprint)
CheckFingerprint(fingerprint format.Fingerprint) bool CheckFingerprint(fingerprint format.Fingerprint) bool
...@@ -106,7 +105,7 @@ type NetworkManager interface { ...@@ -106,7 +105,7 @@ type NetworkManager interface {
Due to the extra overhead of trial hashing, triggers are processed after fingerprints. Due to the extra overhead of trial hashing, triggers are processed after fingerprints.
If a fingerprint match occurs on the message, triggers will not be handled. If a fingerprint match occurs on the message, triggers will not be handled.
Triggers are ephemeral to the session. When starting a new client, all triggers must be Triggers are address to the session. When starting a new client, all triggers must be
re-added before StartNetworkFollower is called. re-added before StartNetworkFollower is called.
*/ */
...@@ -152,7 +151,7 @@ type IdentityParams struct { ...@@ -152,7 +151,7 @@ type IdentityParams struct {
EndValid time.Time // Timestamp when the ephID stops being valid EndValid time.Time // Timestamp when the ephID stops being valid
// Makes the identity not store on disk // Makes the identity not store on disk
// When an ephemeral identity is deleted, all fingerprints & triggers // When an address identity is deleted, all fingerprints & triggers
// associated with it also delete. // associated with it also delete.
// TODO: This should not be confused with EphID for checking // TODO: This should not be confused with EphID for checking
// when messages are for the the user. That's a different type // when messages are for the the user. That's a different type
......
package ephemeral package address
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
...@@ -13,18 +13,18 @@ const ( ...@@ -13,18 +13,18 @@ const (
initSize = 1 initSize = 1
) )
// AddressSpace contains the current address space size used for creating // Space contains the current address space size used for creating
// ephemeral IDs and the infrastructure to alert other processes when an Update // address IDs and the infrastructure to alert other processes when an Update
// occurs. // occurs.
type AddressSpace struct { type Space struct {
size uint8 size uint8
notifyMap map[string]chan uint8 notifyMap map[string]chan uint8
cond *sync.Cond cond *sync.Cond
} }
// NewAddressSpace initialises a new AddressSpace and returns it. // NewAddressSpace initialises a new AddressSpace and returns it.
func NewAddressSpace() *AddressSpace { func NewAddressSpace() *Space {
return &AddressSpace{ return &Space{
size: initSize, size: initSize,
notifyMap: make(map[string]chan uint8), notifyMap: make(map[string]chan uint8),
cond: sync.NewCond(&sync.Mutex{}), cond: sync.NewCond(&sync.Mutex{}),
...@@ -33,7 +33,7 @@ func NewAddressSpace() *AddressSpace { ...@@ -33,7 +33,7 @@ func NewAddressSpace() *AddressSpace {
// Get returns the current address space size. It blocks until an address space // Get returns the current address space size. It blocks until an address space
// size is set. // size is set.
func (as *AddressSpace) Get() uint8 { func (as *Space) Get() uint8 {
as.cond.L.Lock() as.cond.L.Lock()
defer as.cond.L.Unlock() defer as.cond.L.Unlock()
...@@ -50,7 +50,7 @@ func (as *AddressSpace) Get() uint8 { ...@@ -50,7 +50,7 @@ func (as *AddressSpace) Get() uint8 {
// GetWithoutWait returns the current address space size regardless if it has // GetWithoutWait returns the current address space size regardless if it has
// been set yet. // been set yet.
func (as *AddressSpace) GetWithoutWait() uint8 { func (as *Space) GetWithoutWait() uint8 {
as.cond.L.Lock() as.cond.L.Lock()
defer as.cond.L.Unlock() defer as.cond.L.Unlock()
return as.size return as.size
...@@ -60,7 +60,7 @@ func (as *AddressSpace) GetWithoutWait() uint8 { ...@@ -60,7 +60,7 @@ func (as *AddressSpace) GetWithoutWait() uint8 {
// each registered channel is notified of the Update. If this was the first time // each registered channel is notified of the Update. If this was the first time
// that the address space size was set, then the conditional broadcasts to stop // that the address space size was set, then the conditional broadcasts to stop
// blocking for all threads waiting on Get. // blocking for all threads waiting on Get.
func (as *AddressSpace) Update(newSize uint8) { func (as *Space) Update(newSize uint8) {
as.cond.L.Lock() as.cond.L.Lock()
defer as.cond.L.Unlock() defer as.cond.L.Unlock()
...@@ -93,7 +93,7 @@ func (as *AddressSpace) Update(newSize uint8) { ...@@ -93,7 +93,7 @@ func (as *AddressSpace) Update(newSize uint8) {
// RegisterNotification returns a channel that will trigger for every address // RegisterNotification returns a channel that will trigger for every address
// space size Update. The provided tag is the unique ID for the channel. // space size Update. The provided tag is the unique ID for the channel.
// Returns an error if the tag is already used. // Returns an error if the tag is already used.
func (as *AddressSpace) RegisterNotification(tag string) (chan uint8, error) { func (as *Space) RegisterNotification(tag string) (chan uint8, error) {
as.cond.L.Lock() as.cond.L.Lock()
defer as.cond.L.Unlock() defer as.cond.L.Unlock()
...@@ -108,7 +108,7 @@ func (as *AddressSpace) RegisterNotification(tag string) (chan uint8, error) { ...@@ -108,7 +108,7 @@ func (as *AddressSpace) RegisterNotification(tag string) (chan uint8, error) {
// UnregisterNotification stops broadcasting address space size updates on the // UnregisterNotification stops broadcasting address space size updates on the
// channel with the specified tag. // channel with the specified tag.
func (as *AddressSpace) UnregisterNotification(tag string) { func (as *Space) UnregisterNotification(tag string) {
as.cond.L.Lock() as.cond.L.Lock()
defer as.cond.L.Unlock() defer as.cond.L.Unlock()
...@@ -117,7 +117,7 @@ func (as *AddressSpace) UnregisterNotification(tag string) { ...@@ -117,7 +117,7 @@ func (as *AddressSpace) UnregisterNotification(tag string) {
// NewTestAddressSpace initialises a new AddressSpace for testing with the given // NewTestAddressSpace initialises a new AddressSpace for testing with the given
// size. // size.
func NewTestAddressSpace(newSize uint8, x interface{}) *AddressSpace { func NewTestAddressSpace(newSize uint8, x interface{}) *Space {
switch x.(type) { switch x.(type) {
case *testing.T, *testing.M, *testing.B, *testing.PB: case *testing.T, *testing.M, *testing.B, *testing.PB:
break break
...@@ -126,7 +126,7 @@ func NewTestAddressSpace(newSize uint8, x interface{}) *AddressSpace { ...@@ -126,7 +126,7 @@ func NewTestAddressSpace(newSize uint8, x interface{}) *AddressSpace {
"Got %T", x) "Got %T", x)
} }
as := &AddressSpace{ as := &Space{
size: initSize, size: initSize,
notifyMap: make(map[string]chan uint8), notifyMap: make(map[string]chan uint8),
cond: sync.NewCond(&sync.Mutex{}), cond: sync.NewCond(&sync.Mutex{}),
......
package ephemeral package address
import ( import (
"reflect" "reflect"
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
// LICENSE file // // LICENSE file //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
package ephemeral package address
import ( import (
"gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/gateway"
......
package reception package receptionID
import ( import (
"fmt" "fmt"
......
package reception package receptionID
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
...@@ -23,12 +23,12 @@ func generateFakeIdentity(rng io.Reader, addressSize uint8, ...@@ -23,12 +23,12 @@ func generateFakeIdentity(rng io.Reader, addressSize uint8,
copy(randID[:id.ArrIDLen-1], randIdBytes) copy(randID[:id.ArrIDLen-1], randIdBytes)
randID.SetType(id.User) randID.SetType(id.User)
// Generate the current ephemeral ID from the random identity // Generate the current address ID from the random identity
ephID, start, end, err := ephemeral.GetId( ephID, start, end, err := ephemeral.GetId(
randID, uint(addressSize), now.UnixNano()) randID, uint(addressSize), now.UnixNano())
if err != nil { if err != nil {
return IdentityUse{}, errors.WithMessage(err, "failed to generate an "+ return IdentityUse{}, errors.WithMessage(err, "failed to generate an "+
"ephemeral ID for random identity when none is available") "address ID for random identity when none is available")
} }
return IdentityUse{ return IdentityUse{
......
package reception package receptionID
import ( import (
"encoding/json" "encoding/json"
...@@ -54,14 +54,14 @@ func Test_generateFakeIdentity_RngError(t *testing.T) { ...@@ -54,14 +54,14 @@ func Test_generateFakeIdentity_RngError(t *testing.T) {
} }
} }
// Error path: fails to get the ephemeral ID. // Error path: fails to get the address ID.
func Test_generateFakeIdentity_GetEphemeralIdError(t *testing.T) { func Test_generateFakeIdentity_GetEphemeralIdError(t *testing.T) {
rng := rand.New(rand.NewSource(42)) rng := rand.New(rand.NewSource(42))
timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC)
_, err := generateFakeIdentity(rng, math.MaxInt8, timestamp) _, err := generateFakeIdentity(rng, math.MaxInt8, timestamp)
if err == nil || !strings.Contains(err.Error(), "ephemeral ID") { if err == nil || !strings.Contains(err.Error(), "address ID") {
t.Errorf("generateFakeIdentity() did not return the correct error on "+ t.Errorf("generateFakeIdentity() did not return the correct error on "+
"failure to generate ephemeral ID: %+v", err) "failure to generate address ID: %+v", err)
} }
} }
package reception package receptionID
import ( import (
"encoding/json" "encoding/json"
......
package reception package receptionID
import ( import (
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
......
package reception package receptionID
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
...@@ -55,7 +55,7 @@ func newRegistration(reg Identity, kv *versioned.KV) (*registration, error) { ...@@ -55,7 +55,7 @@ func newRegistration(reg Identity, kv *versioned.KV) (*registration, error) {
} }
r.CR = cr r.CR = cr
// If this is not ephemeral, then store everything // If this is not address, then store everything
if !reg.Ephemeral { if !reg.Ephemeral {
// Store known rounds // Store known rounds
var err error var err error
......
package reception package receptionID
import ( import (
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
......
package reception package receptionID
import ( import (
"encoding/json" "encoding/json"
...@@ -117,7 +117,7 @@ func (s *Store) save() error { ...@@ -117,7 +117,7 @@ func (s *Store) save() error {
return nil return nil
} }
// makeStoredReferences generates a reference of any non-ephemeral identities // makeStoredReferences generates a reference of any non-address identities
// for storage. // for storage.
func (s *Store) makeStoredReferences() []storedReference { func (s *Store) makeStoredReferences() []storedReference {
identities := make([]storedReference, len(s.active)) identities := make([]storedReference, len(s.active))
...@@ -249,7 +249,6 @@ func (s *Store) SetToExpire(addressSize uint8) { ...@@ -249,7 +249,6 @@ func (s *Store) SetToExpire(addressSize uint8) {
func (s *Store) prune(now time.Time) { func (s *Store) prune(now time.Time) {
lengthBefore := len(s.active) lengthBefore := len(s.active)
var pruned []int64 var pruned []int64
// Prune the list // Prune the list
for i := 0; i < len(s.active); i++ { for i := 0; i < len(s.active); i++ {
inQuestion := s.active[i] inQuestion := s.active[i]
...@@ -268,6 +267,7 @@ func (s *Store) prune(now time.Time) { ...@@ -268,6 +267,7 @@ func (s *Store) prune(now time.Time) {
// Save the list if it changed // Save the list if it changed
if lengthBefore != len(s.active) { if lengthBefore != len(s.active) {
jww.INFO.Printf("Pruned %d identities [%+v]", lengthBefore-len(s.active), pruned) jww.INFO.Printf("Pruned %d identities [%+v]", lengthBefore-len(s.active), pruned)
if err := s.save(); err != nil { if err := s.save(); err != nil {
jww.FATAL.Panicf("Failed to store reception storage: %+v", err) jww.FATAL.Panicf("Failed to store reception storage: %+v", err)
......
package reception package receptionID
import ( import (
"bytes" "bytes"
......
...@@ -5,125 +5,196 @@ ...@@ -5,125 +5,196 @@
// LICENSE file // // LICENSE file //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
package ephemeral package identity
import ( import (
"encoding/json"
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/network/address"
"gitlab.com/elixxir/client/network/identity/receptionID"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/reception"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
"sort"
"sync"
"time" "time"
) )
const validityGracePeriod = 5 * time.Minute const validityGracePeriod = 5 * time.Minute
const TrackerListKey = "TrackerListKey"
const TrackerListVersion = 0
const TimestampKey = "IDTrackingTimestamp" const TimestampKey = "IDTrackingTimestamp"
const TimestampStoreVersion = 0 const TimestampStoreVersion = 0
const ephemeralStoppable = "EphemeralCheck" const ephemeralStoppable = "EphemeralCheck"
const addressSpaceSizeChanTag = "ephemeralTracker" const addressSpaceSizeChanTag = "ephemeralTracker"
var Forever time.Time = time.Time{}
// Track runs a thread which checks for past and present ephemeral ID.
func Track(session *storage.Session, addrSpace *AddressSpace, ourId *id.ID) stoppable.Stoppable {
stop := stoppable.NewSingle(ephemeralStoppable)
go track(session, addrSpace, ourId, stop)
return stop type Tracker struct{
tracked []trackedID
store *receptionID.Store
session *storage.Session
newIdentity chan trackedID
deleteIdentity chan *id.ID
} }
// track is a thread which continuously processes ephemeral IDs. Panics if any type trackedID struct{
// error occurs. NextGeneration time.Time
func track(session *storage.Session, addrSpace *AddressSpace, ourId *id.ID, stop *stoppable.Single) { LastGeneration time.Time
Source *id.ID
// Check that there is a timestamp in store at all ValidUntil time.Time
err := checkTimestampStore(session) Persistent bool
if err != nil {
jww.FATAL.Panicf("Could not store timestamp for ephemeral ID "+
"tracking: %+v", err)
} }
// get the latest timestamp from store func NewTracker(session *storage.Session, addrSpace *address.Space)*Tracker{
lastTimestampObj, err := session.Get(TimestampKey) //intilization
if err != nil { //loading
jww.FATAL.Panicf("Could not get timestamp: %+v", err) //if load fails, make a new
//check if there is an old timestamp, recreate the basic ID from it
//initilize the ephemeral identiies system
} }
lastCheck, err := unmarshalTimestamp(lastTimestampObj) //make the GetIdentities call on the ephemerals system public on this
if err != nil {
jww.FATAL.Panicf("Could not parse stored timestamp: %+v", err)
}
// Wait until we get the ID size from the network // Track runs a thread which checks for past and present address ID.
receptionStore := session.Reception() func (tracker Tracker)StartProcessies() stoppable.Stoppable {
addrSpace.UnregisterNotification(addressSpaceSizeChanTag) stop := stoppable.NewSingle(ephemeralStoppable)
addressSizeUpdate, err := addrSpace.RegisterNotification(addressSpaceSizeChanTag)
if err != nil {
jww.FATAL.Panicf("failed to register address size notification "+
"channel: %+v", err)
}
addressSize := addrSpace.Get()
for { go track(session, addrSpace, ourId, stop)
now := netTime.Now()
// Hack for inconsistent time on android return stop
if now.Before(lastCheck) || now.Equal(lastCheck) {
now = lastCheck.Add(time.Nanosecond)
} }
// Generates the IDs since the last track
protoIds, err := ephemeral.GetIdsByRange(
ourId, uint(addressSize), lastCheck, now.Add(validityGracePeriod).Sub(lastCheck))
jww.DEBUG.Printf("Now: %s, LastCheck: %s, Different: %s",
now, lastCheck, now.Sub(lastCheck))
jww.DEBUG.Printf("protoIds Count: %d", len(protoIds))
if err != nil { // AddIdentity adds an identity to be tracked
jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err) func (tracker *Tracker)AddIdentity(id *id.ID, validUntil time.Time, persistent bool){
tracker.newIdentity<-trackedID{
NextGeneration: netTime.Now().Add(-time.Second),
LastGeneration: time.Time{},
Source: id,
ValidUntil: validUntil,
Persistent: persistent,
}
} }
// Generate identities off of that list // RemoveIdentity removes a currently tracked identity.
identities := generateIdentities(protoIds, ourId, addressSize) func (tracker *Tracker) RemoveIdentity(id *id.ID){
tracker.deleteIdentity<-id
}
jww.INFO.Printf("Number of Identities Generated: %d", len(identities)) func (tracker *Tracker)track(session *storage.Session, addrSpace *address.Space, ourId *id.ID, stop *stoppable.Single) {
jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, End: %s",
identities[len(identities)-1].EphId.Int64(), // Wait until we get the ID size from the network
identities[len(identities)-1].Source, addressSize := addrSpace.Get()
identities[len(identities)-1].StartValid,
identities[len(identities)-1].EndValid)
// Add identities to storage, if unique /*wait for next event*/
trackerLoop:
for {
edits := false
var toRemove map[int]struct{}
nextEvent := tracker.tracked[0].ValidUntil
//loop through every tracked ID and see if any operatrions are needed
for i := range tracker.tracked {
inQuestion := tracker.tracked[i]
//generate new ephmerals if is time for it
if netTime.Now().After(inQuestion.NextGeneration) {
edits = true
//ensure that ephemerals will not be generated after the identity is invalid
generateUntil := inQuestion.NextGeneration
if inQuestion.ValidUntil !=Forever && generateUntil.After(inQuestion.ValidUntil){
generateUntil = inQuestion.ValidUntil
}
//generate all not yet existing ephemerals
identities, nextNextGeneration := generateIdentitiesOverRange(inQuestion.LastGeneration,
inQuestion.NextGeneration, inQuestion.Source, addressSize)
//add all ephemerals to the ephemeral handler
for _, identity := range identities { for _, identity := range identities {
if err = receptionStore.AddIdentity(identity); err != nil { // move up the end time if the source identity is invalid before the natural end
// of the ephemeral identity.
if inQuestion.ValidUntil != Forever && identity.End.After(inQuestion.ValidUntil) {
identity.End = inQuestion.ValidUntil
}
if err := tracker.store.AddIdentity(identity); err != nil {
jww.FATAL.Panicf("Could not insert identity: %+v", err) jww.FATAL.Panicf("Could not insert identity: %+v", err)
} }
} }
//move forward the tracking of when generation should occur
inQuestion.LastGeneration = inQuestion.NextGeneration
inQuestion.NextGeneration = nextNextGeneration.Add(time.Millisecond)
}
// Generate the timestamp for storage // if it is time to delete the id, process the delete
vo, err := marshalTimestamp(now) if inQuestion.ValidUntil != Forever && netTime.Now().After(inQuestion.ValidUntil) {
if err != nil { edits = true
jww.FATAL.Panicf("Could not marshal timestamp for storage: %+v", err) toRemove[i] = struct{}{}
} else {
// otherwise see if it is responsible for the next event
if inQuestion.NextGeneration.Before(nextEvent){
nextEvent = inQuestion.NextGeneration
}
if inQuestion.ValidUntil.Before(nextEvent){
nextEvent = inQuestion.ValidUntil
}
}
}
//process any deletions
if len(toRemove)>0{
newTracked := make([]trackedID,len(tracker.tracked))
for i := range tracker.tracked{
if _, remove := toRemove[i]; !remove {
newTracked = append(newTracked, tracker.tracked[i])
}
}
tracker.tracked=newTracked
} }
lastCheck = now
// Store the timestamp if edits{
if err = session.Set(TimestampKey, vo); err != nil { tracker.save()
jww.FATAL.Panicf("Could not store timestamp: %+v", err)
} }
// trigger events early. this will cause generations to happen
// early as well as message pickup. As a result, if there
// are time sync issues between clients and they begin sending
// to ephemerals early, messages will still be picked up
nextUpdate := nextEvent.Add(-validityGracePeriod)
// Sleep until the last ID has expired // Sleep until the last ID has expired
timeToSleep := calculateTickerTime(protoIds, now)
select { select {
case <-time.NewTimer(timeToSleep).C: case <-time.NewTimer(nextUpdate.Sub(nextUpdate)).C:
case addressSize = <-addressSizeUpdate: case newIdentity := <- tracker.newIdentity:
receptionStore.SetToExpire(addressSize) // if the identity is old, just update its properties
for i := range tracker.tracked{
inQuestion := tracker.tracked[i]
if inQuestion.Source.Cmp(newIdentity.Source){
inQuestion.Persistent = newIdentity.Persistent
inQuestion.ValidUntil = newIdentity.ValidUntil
tracker.save()
continue trackerLoop
}
}
//otherwise, add it to the list and run
tracker.tracked = append(tracker.tracked,newIdentity)
tracker.save()
continue trackerLoop
case deleteID := <- tracker.deleteIdentity:
for i := range tracker.tracked{
inQuestion := tracker.tracked[i]
if inQuestion.Source.Cmp(deleteID){
tracker.tracked = append(tracker.tracked[:i], tracker.tracked[i+1:]...)
tracker.save()
break
}
}
case <-stop.Quit(): case <-stop.Quit():
addrSpace.UnregisterNotification(addressSpaceSizeChanTag) addrSpace.UnregisterNotification(addressSpaceSizeChanTag)
stop.ToStopped() stop.ToStopped()
...@@ -132,32 +203,6 @@ func track(session *storage.Session, addrSpace *AddressSpace, ourId *id.ID, stop ...@@ -132,32 +203,6 @@ func track(session *storage.Session, addrSpace *AddressSpace, ourId *id.ID, stop
} }
} }
// generateIdentities generates a list of identities off of the list of passed
// in ProtoIdentity.
func generateIdentities(protoIds []ephemeral.ProtoIdentity, ourId *id.ID,
addressSize uint8) []reception.Identity {
identities := make([]reception.Identity, len(protoIds))
// Add identities for every ephemeral ID
for i, eid := range protoIds {
// Expand the grace period for both start and end
identities[i] = reception.Identity{
EphId: eid.Id,
Source: ourId,
AddressSize: addressSize,
End: eid.End,
StartValid: eid.Start.Add(-validityGracePeriod),
EndValid: eid.End.Add(validityGracePeriod),
Ephemeral: false,
ExtraChecks: interfaces.DefaultExtraChecks,
}
}
return identities
}
// checkTimestampStore performs a sanitation check of timestamp store. If a // checkTimestampStore performs a sanitation check of timestamp store. If a
// value has not been stored yet, then the current time is stored. // value has not been stored yet, then the current time is stored.
func checkTimestampStore(session *storage.Session) error { func checkTimestampStore(session *storage.Session) error {
...@@ -199,18 +244,75 @@ func marshalTimestamp(timeToStore time.Time) (*versioned.Object, error) { ...@@ -199,18 +244,75 @@ func marshalTimestamp(timeToStore time.Time) (*versioned.Object, error) {
}, err }, err
} }
// calculateTickerTime calculates the time for the ticker based off of the last func generateIdentitiesOverRange(lastGeneration, generateThrough time.Time,
// ephemeral ID to expire. source *id.ID, addressSize uint8 )([]receptionID.Identity, time.Time){
func calculateTickerTime(baseIDs []ephemeral.ProtoIdentity, now time.Time) time.Duration { protoIds, err := ephemeral.GetIdsByRange(
if len(baseIDs) == 0 { source, uint(addressSize), lastGeneration, generateThrough.Sub(lastGeneration))
return time.Duration(0)
jww.DEBUG.Printf("Now: %s, LastCheck: %s, Different: %s",
generateThrough, generateThrough, generateThrough.Sub(lastGeneration))
jww.DEBUG.Printf("protoIds Count: %d", len(protoIds))
if err != nil {
jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err)
} }
// get the last identity in the list // Generate identities off of that list
lastIdentity := baseIDs[len(baseIDs)-1] identities := make([]receptionID.Identity, len(protoIds))
// Add identities for every address ID
for i, eid := range protoIds {
// Expand the grace period for both start and end
identities[i] = receptionID.Identity{
EphId: eid.Id,
Source: source,
AddressSize: addressSize,
End: eid.End,
StartValid: eid.Start.Add(-validityGracePeriod),
EndValid: eid.End.Add(validityGracePeriod),
Ephemeral: false,
ExtraChecks: interfaces.DefaultExtraChecks,
}
// Factor out the grace period previously expanded upon }
// Calculate and return that duration
gracePeriod := lastIdentity.End.Add(-1 * validityGracePeriod) jww.INFO.Printf("Number of Identities Generated: %d", len(identities))
return gracePeriod.Sub(now) jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, End: %s",
identities[len(identities)-1].EphId.Int64(),
identities[len(identities)-1].Source,
identities[len(identities)-1].StartValid,
identities[len(identities)-1].EndValid)
return identities, identities[len(identities)-1].End
}
func (tracker *Tracker)save(){
persistant := make([]trackedID, 0, len(tracker.tracked))
for i := range tracker.tracked{
if tracker.tracked[i].Persistent{
persistant = append(persistant, tracker.tracked[i])
}
}
if len(persistant)==0{
return
}
data, err := json.Marshal(&persistant)
if err!=nil{
jww.FATAL.Panicf("Failed to marshal the tracked users")
}
obj := &versioned.Object{
Version: ,
Timestamp: netTime.Now(),
Data: data,
}
err = tracker.session.GetKV().Set(TrackerListKey, TrackerListVersion, obj)
if err!=nil{
jww.FATAL.Panicf("Failed to store the tracked users")
}
} }
\ No newline at end of file
...@@ -5,11 +5,12 @@ ...@@ -5,11 +5,12 @@
// LICENSE file // // LICENSE file //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
package ephemeral package identity
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
ephemeral2 "gitlab.com/elixxir/client/network/address"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/mixmessages"
...@@ -27,7 +28,7 @@ import ( ...@@ -27,7 +28,7 @@ import (
// Smoke test for Track function // Smoke test for Track function
func TestCheck(t *testing.T) { func TestCheck(t *testing.T) {
session := storage.InitTestingSession(t) session := storage.InitTestingSession(t)
instance := NewTestNetworkManager(t) instance := ephemeral2.NewTestNetworkManager(t)
if err := setupInstance(instance); err != nil { if err := setupInstance(instance); err != nil {
t.Errorf("Could not set up instance: %+v", err) t.Errorf("Could not set up instance: %+v", err)
} }
...@@ -46,7 +47,7 @@ func TestCheck(t *testing.T) { ...@@ -46,7 +47,7 @@ func TestCheck(t *testing.T) {
} }
ourId := id.NewIdFromBytes([]byte("Sauron"), t) ourId := id.NewIdFromBytes([]byte("Sauron"), t)
stop := Track(session, NewTestAddressSpace(15, t), ourId) stop := Track(session, ephemeral2.NewTestAddressSpace(15, t), ourId)
err = stop.Close() err = stop.Close()
if err != nil { if err != nil {
...@@ -58,7 +59,7 @@ func TestCheck(t *testing.T) { ...@@ -58,7 +59,7 @@ func TestCheck(t *testing.T) {
// Unit test for track. // Unit test for track.
func TestCheck_Thread(t *testing.T) { func TestCheck_Thread(t *testing.T) {
session := storage.InitTestingSession(t) session := storage.InitTestingSession(t)
instance := NewTestNetworkManager(t) instance := ephemeral2.NewTestNetworkManager(t)
if err := setupInstance(instance); err != nil { if err := setupInstance(instance); err != nil {
t.Errorf("Could not set up instance: %v", err) t.Errorf("Could not set up instance: %v", err)
} }
...@@ -80,7 +81,7 @@ func TestCheck_Thread(t *testing.T) { ...@@ -80,7 +81,7 @@ func TestCheck_Thread(t *testing.T) {
// Run the tracker // Run the tracker
go func() { go func() {
track(session, NewTestAddressSpace(15, t), ourId, stop) track(session, ephemeral2.NewTestAddressSpace(15, t), ourId, stop)
}() }()
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
......
...@@ -18,9 +18,10 @@ import ( ...@@ -18,9 +18,10 @@ import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network/ephemeral" "gitlab.com/elixxir/client/network/address"
"gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/network/health" "gitlab.com/elixxir/client/network/health"
"gitlab.com/elixxir/client/network/identity"
"gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/network/nodes"
"gitlab.com/elixxir/client/network/rounds" "gitlab.com/elixxir/client/network/rounds"
...@@ -79,7 +80,7 @@ type manager struct { ...@@ -79,7 +80,7 @@ type manager struct {
verboseRounds *RoundTracker verboseRounds *RoundTracker
// Address space size // Address space size
addrSpace *ephemeral.AddressSpace addrSpace *address.AddressSpace
// Event reporting api // Event reporting api
events interfaces.EventManager events interfaces.EventManager
...@@ -104,7 +105,7 @@ func NewManager(session *storage.Session, ...@@ -104,7 +105,7 @@ func NewManager(session *storage.Session,
m := manager{ m := manager{
param: params, param: params,
tracker: &tracker, tracker: &tracker,
addrSpace: ephemeral.NewAddressSpace(), addrSpace: address.NewAddressSpace(),
events: events, events: events,
earliestRound: &earliest, earliestRound: &earliest,
session: session, session: session,
...@@ -170,7 +171,7 @@ func NewManager(session *storage.Session, ...@@ -170,7 +171,7 @@ func NewManager(session *storage.Session,
// - health Tracker (/network/health) // - health Tracker (/network/health)
// - Garbled Messages (/network/message/inProgress.go) // - Garbled Messages (/network/message/inProgress.go)
// - Critical Messages (/network/message/critical.go) // - Critical Messages (/network/message/critical.go)
// - Ephemeral ID tracking (network/ephemeral/tracker.go) // - Ephemeral ID tracking (network/address/tracker.go)
func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppable, error) { func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppable, error) {
multi := stoppable.NewMulti("networkManager") multi := stoppable.NewMulti("networkManager")
...@@ -196,7 +197,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab ...@@ -196,7 +197,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab
// Round processing // Round processing
multi.Add(m.round.StartProcessors()) multi.Add(m.round.StartProcessors())
multi.Add(ephemeral.Track(m.session, m.addrSpace, m.ReceptionID)) multi.Add(identity.Track(m.session, m.addrSpace, m.ReceptionID))
return multi, nil return multi, nil
} }
......
...@@ -32,7 +32,7 @@ Due to the extra overhead of trial hashing, triggers are processed after ...@@ -32,7 +32,7 @@ Due to the extra overhead of trial hashing, triggers are processed after
fingerprints. If a fingerprint match occurs on the message, triggers will not be fingerprints. If a fingerprint match occurs on the message, triggers will not be
handled. handled.
Triggers are ephemeral to the session. When starting a new client, all triggers Triggers are address to the session. When starting a new client, all triggers
must be re-added before StartNetworkFollower is called. must be re-added before StartNetworkFollower is called.
*/ */
......
...@@ -10,7 +10,7 @@ package rounds ...@@ -10,7 +10,7 @@ package rounds
import ( import (
"encoding/binary" "encoding/binary"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/storage/reception" "gitlab.com/elixxir/client/network/identity/receptionID"
"gitlab.com/elixxir/client/storage/rounds" "gitlab.com/elixxir/client/storage/rounds"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
...@@ -53,7 +53,7 @@ func serializeRound(roundId id.Round) []byte { ...@@ -53,7 +53,7 @@ func serializeRound(roundId id.Round) []byte {
return b return b
} }
func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.IdentityUse) { func (m *Manager) GetMessagesFromRound(roundID id.Round, identity receptionID.IdentityUse) {
ri, err := m.Instance.GetRound(roundID) ri, err := m.Instance.GetRound(roundID)
if err != nil || m.params.ForceHistoricalRounds { if err != nil || m.params.ForceHistoricalRounds {
if m.params.ForceHistoricalRounds { if m.params.ForceHistoricalRounds {
......
...@@ -10,8 +10,8 @@ package rounds ...@@ -10,8 +10,8 @@ package rounds
import ( import (
"fmt" "fmt"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/network/identity/receptionID"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/reception"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
...@@ -36,7 +36,7 @@ type historicalRoundsComms interface { ...@@ -36,7 +36,7 @@ type historicalRoundsComms interface {
//structure which contains a historical round lookup //structure which contains a historical round lookup
type historicalRoundRequest struct { type historicalRoundRequest struct {
rid id.Round rid id.Round
identity reception.IdentityUse identity receptionID.IdentityUse
numAttempts uint numAttempts uint
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment