Skip to content
Snippets Groups Projects
Commit 4732d426 authored by Josh Brooks's avatar Josh Brooks
Browse files

Add code for proccessing ephemeral Ids

parent 63fcc21a
No related branches found
No related tags found
No related merge requests found
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package ephemeral
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/globals"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/reception"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
ephemeralStore "gitlab.com/elixxir/client/storage/ephemeral"
"time"
)
const checkInterval = time.Duration(500) * time.Second
const ephemeralIdSie = 64
const validityGracePeriod = 5 * time.Minute
// Check runs a thread which checks for past and present ephemeral ids
func Check(session *storage.Session, ourId *id.ID) stoppable.Stoppable {
stop := stoppable.NewSingle("EphemeralCheck")
go check(session, ourId, stop)
return stop
}
// check is a thread which continuously processes ephemeral ids. If any error occurs,
// the thread crashes
func check(session *storage.Session, ourId *id.ID, stop *stoppable.Single) {
t := time.NewTicker(checkInterval)
ephStore := session.Ephemeral()
identityStore := session.Reception()
for true {
select {
case <-t.C:
err := processEphemeralIds(ourId, ephStore, identityStore)
if err != nil {
globals.Log.FATAL.Panicf("Could not " +
"process ephemeral ids: %v", err)
}
err = ephStore.UpdateTimestamp(time.Now())
if err != nil {
break
}
case <-stop.Quit():
break
}
}
}
// processEphemeralIds periodically checks for past and present ephemeral ids.
// It then adds identities for these ids if needed
func processEphemeralIds(ourId *id.ID, ephemeralStore *ephemeralStore.Store,
identityStore *reception.Store) error {
// Get the timestamp of the last check
lastCheck, err := ephemeralStore.GetTimestamp()
if err != nil {
return errors.Errorf("Could not get time stamp in " +
"ephemeral store: %v", err)
}
// Find out how long that last check was
timeSinceLastCheck := time.Now().Sub(lastCheck)
// Generate ephemeral ids in the range of the last check
eids, err := ephemeral.GetIdsByRange(ourId, ephemeralIdSie,
time.Now().UnixNano(), timeSinceLastCheck)
if err != nil {
return errors.Errorf("Could not generate ephemeral ids: %v", err)
}
// Add identities for every ephemeral id
for _, eid := range eids {
err = identityStore.AddIdentity(reception.Identity{
EphId: eid.Id,
Source: ourId,
End: time.Now().Add(validityGracePeriod),
StartValid: eid.Start,
EndValid: eid.End,
Ephemeral: false,
})
if err != nil {
return errors.Errorf("Could not add identity for " +
"generated ephemeral ID: %v", err)
}
}
return nil
}
...@@ -7,13 +7,14 @@ ...@@ -7,13 +7,14 @@
package network package network
// manager.go controls access to network resources. Interprocess communications // check.go controls access to network resources. Interprocess communications
// and intraclient state are accessible through the context object. // and intraclient state are accessible through the context object.
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network/ephemeral"
"gitlab.com/elixxir/client/network/health" "gitlab.com/elixxir/client/network/health"
"gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/internal"
"gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/message"
...@@ -44,7 +45,6 @@ type manager struct { ...@@ -44,7 +45,6 @@ type manager struct {
//sub-managers //sub-managers
round *rounds.Manager round *rounds.Manager
message *message.Manager message *message.Manager
//atomic denotes if the network is running //atomic denotes if the network is running
running *uint32 running *uint32
} }
...@@ -126,6 +126,9 @@ func (m *manager) Follow() (stoppable.Stoppable, error) { ...@@ -126,6 +126,9 @@ func (m *manager) Follow() (stoppable.Stoppable, error) {
// Round processing // Round processing
multi.Add(m.round.StartProcessors()) multi.Add(m.round.StartProcessors())
m.Session.Ephemeral()
// Ephemeral ID tracking
multi.Add(ephemeral.Check(m.Session, m.Comms.Id))
//set the running status back to 0 so it can be started again //set the running status back to 0 so it can be started again
closer := stoppable.NewCleanup(multi, func(time.Duration) error { closer := stoppable.NewCleanup(multi, func(time.Duration) error {
......
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// Copyright © 2021 xx network SEZC // // Copyright © 2020 xx network SEZC //
// // // //
// Use of this source code is governed by a license that can be found in the // // Use of this source code is governed by a license that can be found in the //
// LICENSE file // // LICENSE file //
......
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// Copyright © 2021 xx network SEZC // // Copyright © 2020 xx network SEZC //
// // // //
// Use of this source code is governed by a license that can be found in the // // Use of this source code is governed by a license that can be found in the //
// LICENSE file // // LICENSE file //
......
...@@ -17,7 +17,9 @@ import ( ...@@ -17,7 +17,9 @@ import (
"gitlab.com/elixxir/client/storage/cmix" "gitlab.com/elixxir/client/storage/cmix"
"gitlab.com/elixxir/client/storage/conversation" "gitlab.com/elixxir/client/storage/conversation"
"gitlab.com/elixxir/client/storage/e2e" "gitlab.com/elixxir/client/storage/e2e"
"gitlab.com/elixxir/client/storage/ephemeral"
"gitlab.com/elixxir/client/storage/partition" "gitlab.com/elixxir/client/storage/partition"
"gitlab.com/elixxir/client/storage/reception"
"gitlab.com/elixxir/client/storage/user" "gitlab.com/elixxir/client/storage/user"
"gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/utility"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
...@@ -56,6 +58,9 @@ type Session struct { ...@@ -56,6 +58,9 @@ type Session struct {
criticalRawMessages *utility.CmixMessageBuffer criticalRawMessages *utility.CmixMessageBuffer
garbledMessages *utility.MeteredCmixMessageBuffer garbledMessages *utility.MeteredCmixMessageBuffer
checkedRounds *utility.KnownRounds checkedRounds *utility.KnownRounds
ephemeral *ephemeral.Store
reception *reception.Store
} }
// Initialize a new Session object // Initialize a new Session object
...@@ -136,6 +141,13 @@ func New(baseDir, password string, u userInterface.User, cmixGrp, ...@@ -136,6 +141,13 @@ func New(baseDir, password string, u userInterface.User, cmixGrp,
s.conversations = conversation.NewStore(s.kv) s.conversations = conversation.NewStore(s.kv)
s.partition = partition.New(s.kv) s.partition = partition.New(s.kv)
s.ephemeral, err = ephemeral.NewStore(s.kv)
if err != nil {
return nil, errors.WithMessage(err, "Failed to ephemeralId tracking store")
}
s.reception = reception.NewStore(s.kv)
return s, nil return s, nil
} }
...@@ -197,6 +209,11 @@ func Load(baseDir, password string, rng *fastRNG.StreamGenerator) (*Session, err ...@@ -197,6 +209,11 @@ func Load(baseDir, password string, rng *fastRNG.StreamGenerator) (*Session, err
s.conversations = conversation.NewStore(s.kv) s.conversations = conversation.NewStore(s.kv)
s.partition = partition.New(s.kv) s.partition = partition.New(s.kv)
s.ephemeral, err = ephemeral.LoadStore(s.kv)
if err != nil {
return nil, errors.WithMessage(err, "Failed to ephemeral store")
}
return s, nil return s, nil
} }
...@@ -260,6 +277,18 @@ func (s *Session) Partition() *partition.Store { ...@@ -260,6 +277,18 @@ func (s *Session) Partition() *partition.Store {
return s.partition return s.partition
} }
func (s *Session) Ephemeral() *ephemeral.Store {
s.mux.RLock()
defer s.mux.RUnlock()
return s.ephemeral
}
func (s *Session) Reception() *reception.Store {
s.mux.RLock()
defer s.mux.RUnlock()
return s.reception
}
// Get an object from the session // Get an object from the session
func (s *Session) Get(key string) (*versioned.Object, error) { func (s *Session) Get(key string) (*versioned.Object, error) {
return s.kv.Get(key) return s.kv.Get(key)
...@@ -344,5 +373,10 @@ func InitTestingSession(i interface{}) *Session { ...@@ -344,5 +373,10 @@ func InitTestingSession(i interface{}) *Session {
s.conversations = conversation.NewStore(s.kv) s.conversations = conversation.NewStore(s.kv)
s.partition = partition.New(s.kv) s.partition = partition.New(s.kv)
s.ephemeral, err = ephemeral.NewStore(s.kv)
if err != nil {
globals.Log.FATAL.Panicf("Failed to create ephemeral store: %+v", err)
}
return s return s
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment