From b7040c47b55215173aa13440069292482db2feae Mon Sep 17 00:00:00 2001 From: benjamin <ben@elixxir.io> Date: Tue, 25 Oct 2022 09:33:22 -0700 Subject: [PATCH] built the system to sequentually pick up old ephemerals, breaks interfaces and tests --- broadcast/client.go | 4 +- broadcast/interface.go | 2 +- broadcast/utils_test.go | 11 +++++ cmix/identity/receptionID/identity.go | 5 ++ cmix/identity/receptionID/store.go | 14 +++++- cmix/identity/tracker.go | 70 ++++++++++++++++++++++----- cmix/interface.go | 7 +++ 7 files changed, 99 insertions(+), 14 deletions(-) diff --git a/broadcast/client.go b/broadcast/client.go index 1b4891a00..e24c72248 100644 --- a/broadcast/client.go +++ b/broadcast/client.go @@ -14,6 +14,7 @@ import ( "gitlab.com/elixxir/client/cmix/message" crypto "gitlab.com/elixxir/crypto/broadcast" "gitlab.com/elixxir/crypto/fastRNG" + "time" ) // broadcastClient implements the [broadcast.Channel] interface for sending/ @@ -42,7 +43,8 @@ func NewBroadcastChannel(channel *crypto.Channel, net Client, } // Add channel's identity - net.AddIdentity(channel.ReceptionID, identity.Forever, true) + net.AddIdentityWithHistory(channel.ReceptionID, identity.Forever, + time.Now().Add(-500*time.Second), true) jww.INFO.Printf("New broadcast channel client created for channel %q (%s)", channel.Name, channel.ReceptionID) diff --git a/broadcast/interface.go b/broadcast/interface.go index 3c674c0db..4c390995a 100644 --- a/broadcast/interface.go +++ b/broadcast/interface.go @@ -92,7 +92,7 @@ type Client interface { SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler, cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) IsHealthy() bool - AddIdentity(id *id.ID, validUntil time.Time, persistent bool) + AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool) AddService(clientID *id.ID, newService message.Service, response message.Processor) DeleteClientService(clientID *id.ID) diff --git a/broadcast/utils_test.go b/broadcast/utils_test.go index ee1dfebc2..299a2f663 100644 --- a/broadcast/utils_test.go +++ b/broadcast/utils_test.go @@ -110,6 +110,17 @@ func (m *mockCmix) AddIdentity(id *id.ID, _ time.Time, _ bool) { m.handler.processorMap[*id] = make(map[string][]message.Processor) } +func (m *mockCmix) AddIdentityWithHistory(id *id.ID, _, _ time.Time, _ bool) { + m.handler.Lock() + defer m.handler.Unlock() + + if _, exists := m.handler.processorMap[*id]; exists { + return + } + + m.handler.processorMap[*id] = make(map[string][]message.Processor) +} + func (m *mockCmix) AddService(clientID *id.ID, newService message.Service, response message.Processor) { m.handler.Lock() diff --git a/cmix/identity/receptionID/identity.go b/cmix/identity/receptionID/identity.go index bba782378..e8126015c 100644 --- a/cmix/identity/receptionID/identity.go +++ b/cmix/identity/receptionID/identity.go @@ -48,6 +48,11 @@ type Identity struct { // Makes the identity not store on disk Ephemeral bool + + // When this identity expired, it will auto add processNext to the identity list + // to be processed. In practice this is a reverse ordered list and is added whenever + // many identities are added at once in order to pick up sequentially + ProcessNext *Identity } func loadIdentity(kv *versioned.KV) (Identity, error) { diff --git a/cmix/identity/receptionID/store.go b/cmix/identity/receptionID/store.go index 967c22e03..35efbb226 100644 --- a/cmix/identity/receptionID/store.go +++ b/cmix/identity/receptionID/store.go @@ -203,9 +203,14 @@ func (s *Store) ForEach(n int, rng io.Reader, } func (s *Store) AddIdentity(identity Identity) error { - idH := makeIdHash(identity.EphId, identity.Source) + s.mux.Lock() defer s.mux.Unlock() + return s.addIdentity(identity) +} + +func (s *Store) addIdentity(identity Identity) error { + idH := makeIdHash(identity.EphId, identity.Source) // Do not make duplicates of IDs if _, ok := s.present[idH]; ok { @@ -319,6 +324,13 @@ func (s *Store) prune(now time.Time) { for i := 0; i < len(s.active); i++ { inQuestion := s.active[i] if now.After(inQuestion.End) && inQuestion.ExtraChecks == 0 { + if inQuestion.ProcessNext != nil { + if err := s.AddIdentity(*inQuestion.ProcessNext); err != nil { + jww.ERROR.Printf("Failed to add identity to process next "+ + "for %d(%s). The identity chain may be lost", + inQuestion.EphId.Int64(), inQuestion.Source) + } + } if err := inQuestion.Delete(); err != nil { jww.ERROR.Printf("Failed to delete Identity for %s: %+v", inQuestion, err) diff --git a/cmix/identity/tracker.go b/cmix/identity/tracker.go index 7ab08da03..912ec8849 100644 --- a/cmix/identity/tracker.go +++ b/cmix/identity/tracker.go @@ -43,11 +43,14 @@ const ( // DefaultExtraChecks is the default value for ExtraChecks // on receptionID.Identity. DefaultExtraChecks = 10 + + NetworkRetention = 500 * time.Hour ) type Tracker interface { StartProcesses() stoppable.Stoppable AddIdentity(id *id.ID, validUntil time.Time, persistent bool) + AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool) RemoveIdentity(id *id.ID) ForEach(n int, rng io.Reader, addressSize uint8, operator func([]receptionID.IdentityUse) error) error @@ -73,7 +76,7 @@ type TrackedID struct { Creation time.Time } -func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager { +func NewOrLoadTracker(session storage.Session, addrSpace address.Space) Tracker { // Initialization t := &manager{ tracked: make([]*TrackedID, 0), @@ -128,9 +131,27 @@ func (t *manager) StartProcesses() stoppable.Stoppable { // AddIdentity adds an identity to be tracked. func (t *manager) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) { + lastGeneration := netTime.Now().Add(-time.Duration(ephemeral.Period)) + t.newIdentity <- TrackedID{ + NextGeneration: netTime.Now().Add(-time.Second), + LastGeneration: lastGeneration, + Source: id, + ValidUntil: validUntil, + Persistent: persistent, + Creation: netTime.Now(), + } +} + +// AddIdentityWithHistory adds an identity to be tracked which will slowly pick up history. +func (t *manager) AddIdentityWithHistory(id *id.ID, validUntil, historicalBeginning time.Time, persistent bool) { + retention := netTime.Now().Add(-NetworkRetention) + if historicalBeginning.Before(retention) { + historicalBeginning = retention + } + t.newIdentity <- TrackedID{ NextGeneration: netTime.Now().Add(-time.Second), - LastGeneration: netTime.Now().Add(-time.Duration(ephemeral.Period)), + LastGeneration: historicalBeginning, Source: id, ValidUntil: validUntil, Persistent: persistent, @@ -331,9 +352,13 @@ func (t *manager) generateIdentitiesOverRange(inQuestion *TrackedID, jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err) } + identitiesToAdd := make([]receptionID.Identity, 0, len(protoIds)) + identitiesToChain := make([]receptionID.Identity, 0, len(protoIds)) + // Add identities for every address ID lastIdentityEnd := time.Time{} - for i, eid := range protoIds { + for i, _ := range protoIds { + eid := protoIds[i] // Expand the grace period for both start and end newIdentity := receptionID.Identity{ EphemeralIdentity: receptionID.EphemeralIdentity{ @@ -355,20 +380,43 @@ func (t *manager) generateIdentitiesOverRange(inQuestion *TrackedID, } newIdentity.Ephemeral = !inQuestion.Persistent - if err := t.ephemeral.AddIdentity(newIdentity); err != nil { - jww.FATAL.Panicf("Could not insert identity: %+v", err) + + // If the identity expired before the current time, we know it + // is no longer valid and should be added to the chain + if netTime.Now().After(newIdentity.EndValid) { + identitiesToChain = append(identitiesToChain, newIdentity) + } else { + identitiesToAdd = append(identitiesToAdd, newIdentity) } - // Print debug information and set return value if isLastIdentity := i == len(protoIds)-1; isLastIdentity { jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, "+ "End: %s, addrSize: %d", - newIdentity.EphId.Int64(), - newIdentity.Source, - newIdentity.StartValid, - newIdentity.EndValid, + identitiesToAdd[i].EphId.Int64(), + identitiesToAdd[i].Source, + identitiesToAdd[i].StartValid, + identitiesToAdd[i].EndValid, addressSize) - lastIdentityEnd = newIdentity.End + lastIdentityEnd = identitiesToAdd[i].End + } + } + + //link the chain + if len(identitiesToChain) > 0 { + firstLink := identitiesToChain[len(identitiesToChain)-1] + currentLink := firstLink + if len(identitiesToChain) > 1 { + for i := len(identitiesToChain) - 1; i >= 0; i-- { + currentLink.ProcessNext = &identitiesToChain[i] + } + } + identitiesToAdd = append(identitiesToAdd, firstLink) + } + + //add the identities + for i := 0; i < len(identitiesToAdd); i++ { + if err = t.ephemeral.AddIdentity(identitiesToAdd[i]); err != nil { + jww.FATAL.Panicf("Could not insert identity: %+v", err) } } diff --git a/cmix/interface.go b/cmix/interface.go index f6572fb1d..1d0bfdaeb 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -116,6 +116,13 @@ type Client interface { // the identity will not be stored to disk and will be dropped on reload. AddIdentity(id *id.ID, validUntil time.Time, persistent bool) + // AddIdentityWithHistory adds an identity to be tracked. If persistent is + // false, the identity will not be stored to disk and will be dropped on + // reload. It will pickup messages slowly back in the history or up back + // until beginning or the start of message retention, which should be ~500 + // houses back + AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool) + // RemoveIdentity removes a currently tracked identity. RemoveIdentity(id *id.ID) -- GitLab