diff --git a/broadcast/client.go b/broadcast/client.go index 1b4891a00bdf4d8ae9ce6bf31c2b020b356fa114..e24c72248f6a7105b9bd6675b79a39a6c74134fd 100644 --- a/broadcast/client.go +++ b/broadcast/client.go @@ -14,6 +14,7 @@ import ( "gitlab.com/elixxir/client/cmix/message" crypto "gitlab.com/elixxir/crypto/broadcast" "gitlab.com/elixxir/crypto/fastRNG" + "time" ) // broadcastClient implements the [broadcast.Channel] interface for sending/ @@ -42,7 +43,8 @@ func NewBroadcastChannel(channel *crypto.Channel, net Client, } // Add channel's identity - net.AddIdentity(channel.ReceptionID, identity.Forever, true) + net.AddIdentityWithHistory(channel.ReceptionID, identity.Forever, + time.Now().Add(-500*time.Second), true) jww.INFO.Printf("New broadcast channel client created for channel %q (%s)", channel.Name, channel.ReceptionID) diff --git a/broadcast/interface.go b/broadcast/interface.go index 3c674c0db3438862296e8980afcb8177eb3d2a9a..4c390995abaf0b942e5d17ae03b14064fd7fac0d 100644 --- a/broadcast/interface.go +++ b/broadcast/interface.go @@ -92,7 +92,7 @@ type Client interface { SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler, cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) IsHealthy() bool - AddIdentity(id *id.ID, validUntil time.Time, persistent bool) + AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool) AddService(clientID *id.ID, newService message.Service, response message.Processor) DeleteClientService(clientID *id.ID) diff --git a/broadcast/utils_test.go b/broadcast/utils_test.go index ee1dfebc264c208e037bcbd45385bac6160fa154..299a2f663d387ea0f911f58bf2a18623e1d7f8c2 100644 --- a/broadcast/utils_test.go +++ b/broadcast/utils_test.go @@ -110,6 +110,17 @@ func (m *mockCmix) AddIdentity(id *id.ID, _ time.Time, _ bool) { m.handler.processorMap[*id] = make(map[string][]message.Processor) } +func (m *mockCmix) AddIdentityWithHistory(id *id.ID, _, _ time.Time, _ bool) { + m.handler.Lock() + defer m.handler.Unlock() + + if _, exists := m.handler.processorMap[*id]; exists { + return + } + + m.handler.processorMap[*id] = make(map[string][]message.Processor) +} + func (m *mockCmix) AddService(clientID *id.ID, newService message.Service, response message.Processor) { m.handler.Lock() diff --git a/cmix/identity/receptionID/identity.go b/cmix/identity/receptionID/identity.go index bba7823789480636710b520d79d4deeadbfa1dbd..e8126015c290ab86ef44bc317b5bf65d727c0ddd 100644 --- a/cmix/identity/receptionID/identity.go +++ b/cmix/identity/receptionID/identity.go @@ -48,6 +48,11 @@ type Identity struct { // Makes the identity not store on disk Ephemeral bool + + // When this identity expired, it will auto add processNext to the identity list + // to be processed. In practice this is a reverse ordered list and is added whenever + // many identities are added at once in order to pick up sequentially + ProcessNext *Identity } func loadIdentity(kv *versioned.KV) (Identity, error) { diff --git a/cmix/identity/receptionID/store.go b/cmix/identity/receptionID/store.go index 967c22e034270d726b2c4e216d460dc767e0dbd2..35efbb2266a03bbe16600766ca452f3a287ff53f 100644 --- a/cmix/identity/receptionID/store.go +++ b/cmix/identity/receptionID/store.go @@ -203,9 +203,14 @@ func (s *Store) ForEach(n int, rng io.Reader, } func (s *Store) AddIdentity(identity Identity) error { - idH := makeIdHash(identity.EphId, identity.Source) + s.mux.Lock() defer s.mux.Unlock() + return s.addIdentity(identity) +} + +func (s *Store) addIdentity(identity Identity) error { + idH := makeIdHash(identity.EphId, identity.Source) // Do not make duplicates of IDs if _, ok := s.present[idH]; ok { @@ -319,6 +324,13 @@ func (s *Store) prune(now time.Time) { for i := 0; i < len(s.active); i++ { inQuestion := s.active[i] if now.After(inQuestion.End) && inQuestion.ExtraChecks == 0 { + if inQuestion.ProcessNext != nil { + if err := s.AddIdentity(*inQuestion.ProcessNext); err != nil { + jww.ERROR.Printf("Failed to add identity to process next "+ + "for %d(%s). The identity chain may be lost", + inQuestion.EphId.Int64(), inQuestion.Source) + } + } if err := inQuestion.Delete(); err != nil { jww.ERROR.Printf("Failed to delete Identity for %s: %+v", inQuestion, err) diff --git a/cmix/identity/tracker.go b/cmix/identity/tracker.go index 7ab08da034fbdd1dbbbfd35ff54b7fc72c273929..912ec8849b71dd3fb1fb9c9c5bea28012a47ff31 100644 --- a/cmix/identity/tracker.go +++ b/cmix/identity/tracker.go @@ -43,11 +43,14 @@ const ( // DefaultExtraChecks is the default value for ExtraChecks // on receptionID.Identity. DefaultExtraChecks = 10 + + NetworkRetention = 500 * time.Hour ) type Tracker interface { StartProcesses() stoppable.Stoppable AddIdentity(id *id.ID, validUntil time.Time, persistent bool) + AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool) RemoveIdentity(id *id.ID) ForEach(n int, rng io.Reader, addressSize uint8, operator func([]receptionID.IdentityUse) error) error @@ -73,7 +76,7 @@ type TrackedID struct { Creation time.Time } -func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager { +func NewOrLoadTracker(session storage.Session, addrSpace address.Space) Tracker { // Initialization t := &manager{ tracked: make([]*TrackedID, 0), @@ -128,9 +131,27 @@ func (t *manager) StartProcesses() stoppable.Stoppable { // AddIdentity adds an identity to be tracked. func (t *manager) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) { + lastGeneration := netTime.Now().Add(-time.Duration(ephemeral.Period)) + t.newIdentity <- TrackedID{ + NextGeneration: netTime.Now().Add(-time.Second), + LastGeneration: lastGeneration, + Source: id, + ValidUntil: validUntil, + Persistent: persistent, + Creation: netTime.Now(), + } +} + +// AddIdentityWithHistory adds an identity to be tracked which will slowly pick up history. +func (t *manager) AddIdentityWithHistory(id *id.ID, validUntil, historicalBeginning time.Time, persistent bool) { + retention := netTime.Now().Add(-NetworkRetention) + if historicalBeginning.Before(retention) { + historicalBeginning = retention + } + t.newIdentity <- TrackedID{ NextGeneration: netTime.Now().Add(-time.Second), - LastGeneration: netTime.Now().Add(-time.Duration(ephemeral.Period)), + LastGeneration: historicalBeginning, Source: id, ValidUntil: validUntil, Persistent: persistent, @@ -331,9 +352,13 @@ func (t *manager) generateIdentitiesOverRange(inQuestion *TrackedID, jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err) } + identitiesToAdd := make([]receptionID.Identity, 0, len(protoIds)) + identitiesToChain := make([]receptionID.Identity, 0, len(protoIds)) + // Add identities for every address ID lastIdentityEnd := time.Time{} - for i, eid := range protoIds { + for i, _ := range protoIds { + eid := protoIds[i] // Expand the grace period for both start and end newIdentity := receptionID.Identity{ EphemeralIdentity: receptionID.EphemeralIdentity{ @@ -355,20 +380,43 @@ func (t *manager) generateIdentitiesOverRange(inQuestion *TrackedID, } newIdentity.Ephemeral = !inQuestion.Persistent - if err := t.ephemeral.AddIdentity(newIdentity); err != nil { - jww.FATAL.Panicf("Could not insert identity: %+v", err) + + // If the identity expired before the current time, we know it + // is no longer valid and should be added to the chain + if netTime.Now().After(newIdentity.EndValid) { + identitiesToChain = append(identitiesToChain, newIdentity) + } else { + identitiesToAdd = append(identitiesToAdd, newIdentity) } - // Print debug information and set return value if isLastIdentity := i == len(protoIds)-1; isLastIdentity { jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, "+ "End: %s, addrSize: %d", - newIdentity.EphId.Int64(), - newIdentity.Source, - newIdentity.StartValid, - newIdentity.EndValid, + identitiesToAdd[i].EphId.Int64(), + identitiesToAdd[i].Source, + identitiesToAdd[i].StartValid, + identitiesToAdd[i].EndValid, addressSize) - lastIdentityEnd = newIdentity.End + lastIdentityEnd = identitiesToAdd[i].End + } + } + + //link the chain + if len(identitiesToChain) > 0 { + firstLink := identitiesToChain[len(identitiesToChain)-1] + currentLink := firstLink + if len(identitiesToChain) > 1 { + for i := len(identitiesToChain) - 1; i >= 0; i-- { + currentLink.ProcessNext = &identitiesToChain[i] + } + } + identitiesToAdd = append(identitiesToAdd, firstLink) + } + + //add the identities + for i := 0; i < len(identitiesToAdd); i++ { + if err = t.ephemeral.AddIdentity(identitiesToAdd[i]); err != nil { + jww.FATAL.Panicf("Could not insert identity: %+v", err) } } diff --git a/cmix/interface.go b/cmix/interface.go index f6572fb1dbdf6811052cb5d541e05286288c20cb..1d0bfdaeb09c25ab4b26018233ab0b8b52e93964 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -116,6 +116,13 @@ type Client interface { // the identity will not be stored to disk and will be dropped on reload. AddIdentity(id *id.ID, validUntil time.Time, persistent bool) + // AddIdentityWithHistory adds an identity to be tracked. If persistent is + // false, the identity will not be stored to disk and will be dropped on + // reload. It will pickup messages slowly back in the history or up back + // until beginning or the start of message retention, which should be ~500 + // houses back + AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool) + // RemoveIdentity removes a currently tracked identity. RemoveIdentity(id *id.ID)