Skip to content
Snippets Groups Projects

Release

11 files
+ 440
264
Compare changes
  • Side-by-side
  • Inline

Files

+ 31
8
package notifications
import (
"errors"
"fmt"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/notifications-bot/storage"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gorm.io/gorm"
"strconv"
"time"
)
const offsetPhase = ephemeral.Period / ephemeral.NumOffsets
const creationLead = 5 * time.Minute
const deletionDelay = -(time.Duration(ephemeral.Period) + creationLead)
const ephemeralStateKey = "lastEphemeralOffset"
// EphIdCreator runs as a thread to track ephemeral IDs for users who registered to receive push notifications
func (nb *Impl) EphIdCreator() {
@@ -28,14 +29,16 @@ func (nb *Impl) EphIdCreator() {
func (nb *Impl) initCreator() {
// Retrieve most recent ephemeral from storage
var lastEpochTime time.Time
lastEph, err := nb.Storage.GetLatestEphemeral()
if err != nil && errors.Is(err, gorm.ErrRecordNotFound) {
jww.WARN.Printf("Failed to get latest ephemeral (no records found): %+v", err)
lastEphEpoch, err := nb.Storage.GetStateValue(ephemeralStateKey)
if err != nil {
jww.WARN.Printf("Failed to get latest ephemeral: %+v", err)
lastEpochTime = time.Now().Add(-time.Duration(ephemeral.Period))
} else if err != nil {
jww.FATAL.Panicf("Database lookup for latest ephemeral failed: %+v", err)
} else {
lastEpochTime = time.Unix(0, int64(lastEph.Epoch)*offsetPhase) // Epoch time of last ephemeral ID
lastEpochInt, err := strconv.Atoi(lastEphEpoch)
if err != nil {
jww.FATAL.Printf("Failed to convert last epoch to int: %+v", err)
}
lastEpochTime = time.Unix(0, int64(lastEpochInt)*offsetPhase) // Epoch time of last ephemeral ID
// If the last epoch is further back than the ephemeral ID period, only go back one period for generation
if lastEpochTime.Before(time.Now().Add(-time.Duration(ephemeral.Period))) {
lastEpochTime = time.Now().Add(-time.Duration(ephemeral.Period))
@@ -49,6 +52,22 @@ func (nb *Impl) initCreator() {
// handle the next epoch
_, epoch := ephemeral.HandleQuantization(lastEpochTime)
nextTrigger := time.Unix(0, int64(epoch)*offsetPhase)
// Check for users with no associated ephemerals, add them if found (this should not happen unless there were issues)
orphaned, err := nb.Storage.GetOrphanedUsers()
if err != nil {
jww.FATAL.Panicf("Failed to retrieve orphaned users: %+v", err)
}
if len(orphaned) > 0 {
jww.WARN.Printf("Found %d orphaned users in database", len(orphaned))
}
for _, u := range orphaned {
_, err := nb.Storage.AddLatestEphemeral(u, epoch, uint(nb.inst.GetPartialNdf().Get().AddressSpace[0].Size)) // TODO: is this the correct epoch? Should we do the previous one as well?
if err != nil {
jww.WARN.Printf("Failed to add latest ephemeral for orphaned user %+v: %+v", u.TransmissionRSAHash, err)
}
}
jww.INFO.Println(fmt.Sprintf("Sleeping until next trigger at %+v", nextTrigger))
time.Sleep(time.Until(nextTrigger))
}
@@ -61,6 +80,10 @@ func (nb *Impl) addEphemerals(start time.Time) {
if err != nil {
jww.WARN.Printf("failed to update ephemerals: %+v", err)
}
err = nb.Storage.UpsertState(&storage.State{
Key: ephemeralStateKey,
Value: strconv.Itoa(int(epoch)),
})
}
func (nb *Impl) EphIdDeleter() {
Loading