Skip to content
Snippets Groups Projects
Commit aa54e738 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

more work on channels

parent 2c6a5951
No related branches found
No related tags found
1 merge request!510Release
...@@ -51,20 +51,20 @@ type Tracker interface { ...@@ -51,20 +51,20 @@ type Tracker interface {
AddIdentity(id *id.ID, validUntil time.Time, persistent bool) AddIdentity(id *id.ID, validUntil time.Time, persistent bool)
RemoveIdentity(id *id.ID) RemoveIdentity(id *id.ID)
GetEphemeralIdentity(rng io.Reader, addressSize uint8) (receptionID.IdentityUse, error) GetEphemeralIdentity(rng io.Reader, addressSize uint8) (receptionID.IdentityUse, error)
GetIdentity(get *id.ID) (TrackedID, error) GetIdentity(get *id.ID) (trackedID, error)
} }
type manager struct { type manager struct {
tracked []TrackedID tracked []*trackedID
ephemeral *receptionID.Store ephemeral *receptionID.Store
session storage.Session session storage.Session
newIdentity chan TrackedID newIdentity chan trackedID
deleteIdentity chan *id.ID deleteIdentity chan *id.ID
addrSpace address.Space addrSpace address.Space
mux *sync.Mutex mux *sync.Mutex
} }
type TrackedID struct { type trackedID struct {
NextGeneration time.Time NextGeneration time.Time
LastGeneration time.Time LastGeneration time.Time
Source *id.ID Source *id.ID
...@@ -76,9 +76,9 @@ type TrackedID struct { ...@@ -76,9 +76,9 @@ type TrackedID struct {
func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager { func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager {
// Initialization // Initialization
t := &manager{ t := &manager{
tracked: make([]TrackedID, 0), tracked: make([]*trackedID, 0),
session: session, session: session,
newIdentity: make(chan TrackedID, trackedIDChanSize), newIdentity: make(chan trackedID, trackedIDChanSize),
deleteIdentity: make(chan *id.ID, deleteIDChanSize), deleteIdentity: make(chan *id.ID, deleteIDChanSize),
addrSpace: addrSpace, addrSpace: addrSpace,
mux: &sync.Mutex{}, mux: &sync.Mutex{},
...@@ -92,7 +92,7 @@ func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager ...@@ -92,7 +92,7 @@ func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager
jww.WARN.Printf("No tracked identities found, creating a new " + jww.WARN.Printf("No tracked identities found, creating a new " +
"tracked identity from legacy stored timestamp.") "tracked identity from legacy stored timestamp.")
t.tracked = append(t.tracked, TrackedID{ t.tracked = append(t.tracked, &trackedID{
// Make the next generation now so a generation triggers on // Make the next generation now so a generation triggers on
// first run // first run
NextGeneration: netTime.Now(), NextGeneration: netTime.Now(),
...@@ -128,7 +128,7 @@ func (t *manager) StartProcesses() stoppable.Stoppable { ...@@ -128,7 +128,7 @@ func (t *manager) StartProcesses() stoppable.Stoppable {
// AddIdentity adds an identity to be tracked. // AddIdentity adds an identity to be tracked.
func (t *manager) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) { func (t *manager) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) {
t.newIdentity <- TrackedID{ t.newIdentity <- trackedID{
NextGeneration: netTime.Now().Add(-time.Second), NextGeneration: netTime.Now().Add(-time.Second),
LastGeneration: netTime.Now().Add(-time.Duration(ephemeral.Period)), LastGeneration: netTime.Now().Add(-time.Duration(ephemeral.Period)),
Source: id, Source: id,
...@@ -150,7 +150,7 @@ func (t *manager) GetEphemeralIdentity(rng io.Reader, addressSize uint8) ( ...@@ -150,7 +150,7 @@ func (t *manager) GetEphemeralIdentity(rng io.Reader, addressSize uint8) (
} }
// GetIdentity returns a currently tracked identity // GetIdentity returns a currently tracked identity
func (t *manager) GetIdentity(get *id.ID) (TrackedID, error) { func (t *manager) GetIdentity(get *id.ID) (*trackedID, error) {
t.mux.Lock() t.mux.Lock()
defer t.mux.Unlock() defer t.mux.Unlock()
for i := range t.tracked { for i := range t.tracked {
...@@ -158,7 +158,7 @@ func (t *manager) GetIdentity(get *id.ID) (TrackedID, error) { ...@@ -158,7 +158,7 @@ func (t *manager) GetIdentity(get *id.ID) (TrackedID, error) {
return t.tracked[i], nil return t.tracked[i], nil
} }
} }
return TrackedID{}, errors.Errorf("could not find id %s", get) return nil, errors.Errorf("could not find id %s", get)
} }
func (t *manager) track(stop *stoppable.Single) { func (t *manager) track(stop *stoppable.Single) {
...@@ -201,7 +201,7 @@ func (t *manager) track(stop *stoppable.Single) { ...@@ -201,7 +201,7 @@ func (t *manager) track(stop *stoppable.Single) {
if !isOld { if !isOld {
jww.DEBUG.Printf("Tracking new identity %s", newIdentity.Source) jww.DEBUG.Printf("Tracking new identity %s", newIdentity.Source)
// Otherwise, add it to the list and run // Otherwise, add it to the list and run
t.tracked = append(t.tracked, newIdentity) t.tracked = append(t.tracked, &newIdentity)
} }
t.save() t.save()
...@@ -236,7 +236,8 @@ func (t *manager) processIdentities(addressSize uint8) time.Time { ...@@ -236,7 +236,8 @@ func (t *manager) processIdentities(addressSize uint8) time.Time {
nextEvent := netTime.Now().Add(time.Duration(ephemeral.Period)) nextEvent := netTime.Now().Add(time.Duration(ephemeral.Period))
// Loop through every tracked ID and see if any operations are needed // Loop through every tracked ID and see if any operations are needed
for i, inQuestion := range t.tracked { for i := range t.tracked {
inQuestion := t.tracked[i]
// Generate new ephemeral if is time for it // Generate new ephemeral if is time for it
if netTime.Now().After(inQuestion.NextGeneration) { if netTime.Now().After(inQuestion.NextGeneration) {
nextGeneration := t.generateIdentitiesOverRange(inQuestion, addressSize) nextGeneration := t.generateIdentitiesOverRange(inQuestion, addressSize)
...@@ -267,7 +268,7 @@ func (t *manager) processIdentities(addressSize uint8) time.Time { ...@@ -267,7 +268,7 @@ func (t *manager) processIdentities(addressSize uint8) time.Time {
// Process any deletions // Process any deletions
if len(toRemove) > 0 { if len(toRemove) > 0 {
newTracked := make([]TrackedID, 0, len(t.tracked)) newTracked := make([]*trackedID, 0, len(t.tracked))
for i := range t.tracked { for i := range t.tracked {
if _, remove := toRemove[i]; !remove { if _, remove := toRemove[i]; !remove {
newTracked = append(newTracked, t.tracked[i]) newTracked = append(newTracked, t.tracked[i])
...@@ -305,7 +306,7 @@ func unmarshalTimestamp(lastTimestampObj *versioned.Object) (time.Time, error) { ...@@ -305,7 +306,7 @@ func unmarshalTimestamp(lastTimestampObj *versioned.Object) (time.Time, error) {
// generateIdentitiesOverRange generates and adds all not yet existing ephemeral Ids // generateIdentitiesOverRange generates and adds all not yet existing ephemeral Ids
// and returns the timestamp of the next generation for the given TrackedID // and returns the timestamp of the next generation for the given TrackedID
func (t *manager) generateIdentitiesOverRange(inQuestion TrackedID, func (t *manager) generateIdentitiesOverRange(inQuestion *trackedID,
addressSize uint8) time.Time { addressSize uint8) time.Time {
// Ensure that ephemeral IDs will not be generated after the // Ensure that ephemeral IDs will not be generated after the
// identity is invalid // identity is invalid
...@@ -370,7 +371,7 @@ func (t *manager) generateIdentitiesOverRange(inQuestion TrackedID, ...@@ -370,7 +371,7 @@ func (t *manager) generateIdentitiesOverRange(inQuestion TrackedID,
func (t *manager) save() { func (t *manager) save() {
t.mux.Lock() t.mux.Lock()
defer t.mux.Unlock() defer t.mux.Unlock()
persistent := make([]TrackedID, 0, len(t.tracked)) persistent := make([]*trackedID, 0, len(t.tracked))
for i := range t.tracked { for i := range t.tracked {
if t.tracked[i].Persistent { if t.tracked[i].Persistent {
......
...@@ -67,9 +67,9 @@ func TestManager_processIdentities(t *testing.T) { ...@@ -67,9 +67,9 @@ func TestManager_processIdentities(t *testing.T) {
addrSpace.UpdateAddressSpace(18) addrSpace.UpdateAddressSpace(18)
session := storage.InitTestingSession(t) session := storage.InitTestingSession(t)
m := &manager{ m := &manager{
tracked: make([]TrackedID, 0), tracked: make([]*trackedID, 0),
session: session, session: session,
newIdentity: make(chan TrackedID, trackedIDChanSize), newIdentity: make(chan trackedID, trackedIDChanSize),
deleteIdentity: make(chan *id.ID, deleteIDChanSize), deleteIdentity: make(chan *id.ID, deleteIDChanSize),
addrSpace: addrSpace, addrSpace: addrSpace,
ephemeral: receptionID.NewOrLoadStore(session.GetKV()), ephemeral: receptionID.NewOrLoadStore(session.GetKV()),
...@@ -79,7 +79,7 @@ func TestManager_processIdentities(t *testing.T) { ...@@ -79,7 +79,7 @@ func TestManager_processIdentities(t *testing.T) {
// Add some expired test IDs // Add some expired test IDs
testId := id.NewIdFromUInt(0, id.User, t) testId := id.NewIdFromUInt(0, id.User, t)
validUntil := netTime.Now().Add(time.Minute) validUntil := netTime.Now().Add(time.Minute)
m.tracked = append(m.tracked, TrackedID{ m.tracked = append(m.tracked, &trackedID{
NextGeneration: netTime.Now(), NextGeneration: netTime.Now(),
LastGeneration: time.Time{}, LastGeneration: time.Time{},
Source: testId, Source: testId,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment