Skip to content
Snippets Groups Projects
Commit b7040c47 authored by benjamin's avatar benjamin
Browse files

built the system to sequentually pick up old ephemerals, breaks interfaces and tests

parent 715fb5a8
No related branches found
No related tags found
2 merge requests!510Release,!428built the system to sequentually pick up old ephemerals, breaks interfaces and tests
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/message"
crypto "gitlab.com/elixxir/crypto/broadcast" crypto "gitlab.com/elixxir/crypto/broadcast"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"time"
) )
// broadcastClient implements the [broadcast.Channel] interface for sending/ // broadcastClient implements the [broadcast.Channel] interface for sending/
...@@ -42,7 +43,8 @@ func NewBroadcastChannel(channel *crypto.Channel, net Client, ...@@ -42,7 +43,8 @@ func NewBroadcastChannel(channel *crypto.Channel, net Client,
} }
// Add channel's identity // Add channel's identity
net.AddIdentity(channel.ReceptionID, identity.Forever, true) net.AddIdentityWithHistory(channel.ReceptionID, identity.Forever,
time.Now().Add(-500*time.Second), true)
jww.INFO.Printf("New broadcast channel client created for channel %q (%s)", jww.INFO.Printf("New broadcast channel client created for channel %q (%s)",
channel.Name, channel.ReceptionID) channel.Name, channel.ReceptionID)
......
...@@ -92,7 +92,7 @@ type Client interface { ...@@ -92,7 +92,7 @@ type Client interface {
SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler, SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler,
cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error)
IsHealthy() bool IsHealthy() bool
AddIdentity(id *id.ID, validUntil time.Time, persistent bool) AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool)
AddService(clientID *id.ID, newService message.Service, AddService(clientID *id.ID, newService message.Service,
response message.Processor) response message.Processor)
DeleteClientService(clientID *id.ID) DeleteClientService(clientID *id.ID)
......
...@@ -110,6 +110,17 @@ func (m *mockCmix) AddIdentity(id *id.ID, _ time.Time, _ bool) { ...@@ -110,6 +110,17 @@ func (m *mockCmix) AddIdentity(id *id.ID, _ time.Time, _ bool) {
m.handler.processorMap[*id] = make(map[string][]message.Processor) m.handler.processorMap[*id] = make(map[string][]message.Processor)
} }
func (m *mockCmix) AddIdentityWithHistory(id *id.ID, _, _ time.Time, _ bool) {
m.handler.Lock()
defer m.handler.Unlock()
if _, exists := m.handler.processorMap[*id]; exists {
return
}
m.handler.processorMap[*id] = make(map[string][]message.Processor)
}
func (m *mockCmix) AddService(clientID *id.ID, newService message.Service, func (m *mockCmix) AddService(clientID *id.ID, newService message.Service,
response message.Processor) { response message.Processor) {
m.handler.Lock() m.handler.Lock()
......
...@@ -48,6 +48,11 @@ type Identity struct { ...@@ -48,6 +48,11 @@ type Identity struct {
// Makes the identity not store on disk // Makes the identity not store on disk
Ephemeral bool Ephemeral bool
// When this identity expired, it will auto add processNext to the identity list
// to be processed. In practice this is a reverse ordered list and is added whenever
// many identities are added at once in order to pick up sequentially
ProcessNext *Identity
} }
func loadIdentity(kv *versioned.KV) (Identity, error) { func loadIdentity(kv *versioned.KV) (Identity, error) {
......
...@@ -203,9 +203,14 @@ func (s *Store) ForEach(n int, rng io.Reader, ...@@ -203,9 +203,14 @@ func (s *Store) ForEach(n int, rng io.Reader,
} }
func (s *Store) AddIdentity(identity Identity) error { func (s *Store) AddIdentity(identity Identity) error {
idH := makeIdHash(identity.EphId, identity.Source)
s.mux.Lock() s.mux.Lock()
defer s.mux.Unlock() defer s.mux.Unlock()
return s.addIdentity(identity)
}
func (s *Store) addIdentity(identity Identity) error {
idH := makeIdHash(identity.EphId, identity.Source)
// Do not make duplicates of IDs // Do not make duplicates of IDs
if _, ok := s.present[idH]; ok { if _, ok := s.present[idH]; ok {
...@@ -319,6 +324,13 @@ func (s *Store) prune(now time.Time) { ...@@ -319,6 +324,13 @@ func (s *Store) prune(now time.Time) {
for i := 0; i < len(s.active); i++ { for i := 0; i < len(s.active); i++ {
inQuestion := s.active[i] inQuestion := s.active[i]
if now.After(inQuestion.End) && inQuestion.ExtraChecks == 0 { if now.After(inQuestion.End) && inQuestion.ExtraChecks == 0 {
if inQuestion.ProcessNext != nil {
if err := s.AddIdentity(*inQuestion.ProcessNext); err != nil {
jww.ERROR.Printf("Failed to add identity to process next "+
"for %d(%s). The identity chain may be lost",
inQuestion.EphId.Int64(), inQuestion.Source)
}
}
if err := inQuestion.Delete(); err != nil { if err := inQuestion.Delete(); err != nil {
jww.ERROR.Printf("Failed to delete Identity for %s: %+v", jww.ERROR.Printf("Failed to delete Identity for %s: %+v",
inQuestion, err) inQuestion, err)
......
...@@ -43,11 +43,14 @@ const ( ...@@ -43,11 +43,14 @@ const (
// DefaultExtraChecks is the default value for ExtraChecks // DefaultExtraChecks is the default value for ExtraChecks
// on receptionID.Identity. // on receptionID.Identity.
DefaultExtraChecks = 10 DefaultExtraChecks = 10
NetworkRetention = 500 * time.Hour
) )
type Tracker interface { type Tracker interface {
StartProcesses() stoppable.Stoppable StartProcesses() stoppable.Stoppable
AddIdentity(id *id.ID, validUntil time.Time, persistent bool) AddIdentity(id *id.ID, validUntil time.Time, persistent bool)
AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool)
RemoveIdentity(id *id.ID) RemoveIdentity(id *id.ID)
ForEach(n int, rng io.Reader, addressSize uint8, ForEach(n int, rng io.Reader, addressSize uint8,
operator func([]receptionID.IdentityUse) error) error operator func([]receptionID.IdentityUse) error) error
...@@ -73,7 +76,7 @@ type TrackedID struct { ...@@ -73,7 +76,7 @@ type TrackedID struct {
Creation time.Time Creation time.Time
} }
func NewOrLoadTracker(session storage.Session, addrSpace address.Space) *manager { func NewOrLoadTracker(session storage.Session, addrSpace address.Space) Tracker {
// Initialization // Initialization
t := &manager{ t := &manager{
tracked: make([]*TrackedID, 0), tracked: make([]*TrackedID, 0),
...@@ -128,9 +131,27 @@ func (t *manager) StartProcesses() stoppable.Stoppable { ...@@ -128,9 +131,27 @@ func (t *manager) StartProcesses() stoppable.Stoppable {
// AddIdentity adds an identity to be tracked. // AddIdentity adds an identity to be tracked.
func (t *manager) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) { func (t *manager) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) {
lastGeneration := netTime.Now().Add(-time.Duration(ephemeral.Period))
t.newIdentity <- TrackedID{
NextGeneration: netTime.Now().Add(-time.Second),
LastGeneration: lastGeneration,
Source: id,
ValidUntil: validUntil,
Persistent: persistent,
Creation: netTime.Now(),
}
}
// AddIdentityWithHistory adds an identity to be tracked which will slowly pick up history.
func (t *manager) AddIdentityWithHistory(id *id.ID, validUntil, historicalBeginning time.Time, persistent bool) {
retention := netTime.Now().Add(-NetworkRetention)
if historicalBeginning.Before(retention) {
historicalBeginning = retention
}
t.newIdentity <- TrackedID{ t.newIdentity <- TrackedID{
NextGeneration: netTime.Now().Add(-time.Second), NextGeneration: netTime.Now().Add(-time.Second),
LastGeneration: netTime.Now().Add(-time.Duration(ephemeral.Period)), LastGeneration: historicalBeginning,
Source: id, Source: id,
ValidUntil: validUntil, ValidUntil: validUntil,
Persistent: persistent, Persistent: persistent,
...@@ -331,9 +352,13 @@ func (t *manager) generateIdentitiesOverRange(inQuestion *TrackedID, ...@@ -331,9 +352,13 @@ func (t *manager) generateIdentitiesOverRange(inQuestion *TrackedID,
jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err) jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err)
} }
identitiesToAdd := make([]receptionID.Identity, 0, len(protoIds))
identitiesToChain := make([]receptionID.Identity, 0, len(protoIds))
// Add identities for every address ID // Add identities for every address ID
lastIdentityEnd := time.Time{} lastIdentityEnd := time.Time{}
for i, eid := range protoIds { for i, _ := range protoIds {
eid := protoIds[i]
// Expand the grace period for both start and end // Expand the grace period for both start and end
newIdentity := receptionID.Identity{ newIdentity := receptionID.Identity{
EphemeralIdentity: receptionID.EphemeralIdentity{ EphemeralIdentity: receptionID.EphemeralIdentity{
...@@ -355,20 +380,43 @@ func (t *manager) generateIdentitiesOverRange(inQuestion *TrackedID, ...@@ -355,20 +380,43 @@ func (t *manager) generateIdentitiesOverRange(inQuestion *TrackedID,
} }
newIdentity.Ephemeral = !inQuestion.Persistent newIdentity.Ephemeral = !inQuestion.Persistent
if err := t.ephemeral.AddIdentity(newIdentity); err != nil {
jww.FATAL.Panicf("Could not insert identity: %+v", err) // If the identity expired before the current time, we know it
// is no longer valid and should be added to the chain
if netTime.Now().After(newIdentity.EndValid) {
identitiesToChain = append(identitiesToChain, newIdentity)
} else {
identitiesToAdd = append(identitiesToAdd, newIdentity)
} }
// Print debug information and set return value
if isLastIdentity := i == len(protoIds)-1; isLastIdentity { if isLastIdentity := i == len(protoIds)-1; isLastIdentity {
jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, "+ jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, "+
"End: %s, addrSize: %d", "End: %s, addrSize: %d",
newIdentity.EphId.Int64(), identitiesToAdd[i].EphId.Int64(),
newIdentity.Source, identitiesToAdd[i].Source,
newIdentity.StartValid, identitiesToAdd[i].StartValid,
newIdentity.EndValid, identitiesToAdd[i].EndValid,
addressSize) addressSize)
lastIdentityEnd = newIdentity.End lastIdentityEnd = identitiesToAdd[i].End
}
}
//link the chain
if len(identitiesToChain) > 0 {
firstLink := identitiesToChain[len(identitiesToChain)-1]
currentLink := firstLink
if len(identitiesToChain) > 1 {
for i := len(identitiesToChain) - 1; i >= 0; i-- {
currentLink.ProcessNext = &identitiesToChain[i]
}
}
identitiesToAdd = append(identitiesToAdd, firstLink)
}
//add the identities
for i := 0; i < len(identitiesToAdd); i++ {
if err = t.ephemeral.AddIdentity(identitiesToAdd[i]); err != nil {
jww.FATAL.Panicf("Could not insert identity: %+v", err)
} }
} }
......
...@@ -116,6 +116,13 @@ type Client interface { ...@@ -116,6 +116,13 @@ type Client interface {
// the identity will not be stored to disk and will be dropped on reload. // the identity will not be stored to disk and will be dropped on reload.
AddIdentity(id *id.ID, validUntil time.Time, persistent bool) AddIdentity(id *id.ID, validUntil time.Time, persistent bool)
// AddIdentityWithHistory adds an identity to be tracked. If persistent is
// false, the identity will not be stored to disk and will be dropped on
// reload. It will pickup messages slowly back in the history or up back
// until beginning or the start of message retention, which should be ~500
// houses back
AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool)
// RemoveIdentity removes a currently tracked identity. // RemoveIdentity removes a currently tracked identity.
RemoveIdentity(id *id.ID) RemoveIdentity(id *id.ID)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment