Skip to content
Snippets Groups Projects

Restructure notification code for batching

Files

+ 90
89
@@ -33,17 +33,12 @@ import (
"gitlab.com/xx_network/primitives/ndf"
"gitlab.com/xx_network/primitives/netTime"
"gitlab.com/xx_network/primitives/utils"
"gorm.io/gorm"
"strings"
"time"
)
const notificationsTag = "notificationData"
// Function type definitions for the main operations (poll and notify)
type NotifyFunc func(int64, []*notifications.Data, *apns.ApnsComm,
*firebase.FirebaseComm, *storage.Storage, int, int) ([]*notifications.Data, error)
// Params struct holds info passed in for configuration
type Params struct {
Address string
@@ -55,6 +50,8 @@ type Params struct {
NotificationRate int
APNS APNSParams
}
// APNSParams holds config info specific to apple's push notification service
type APNSParams struct {
KeyPath string
KeyID string
@@ -63,12 +60,11 @@ type APNSParams struct {
Dev bool
}
// Local impl for notifications; holds comms, storage object, creds and main functions
// Impl for notifications; holds comms, storage object, creds and main functions
type Impl struct {
Comms *notificationBot.Comms
Storage *storage.Storage
inst *network.Instance
notifyFunc NotifyFunc
fcm *firebase.FirebaseComm
apnsClient *apns.ApnsComm
receivedNdf *uint32
@@ -114,7 +110,6 @@ func StartNotifications(params Params, noTLS, noFirebase bool) (*Impl, error) {
receivedNdf := uint32(0)
impl := &Impl{
notifyFunc: notifyUser,
fcm: fbComm,
receivedNdf: &receivedNdf,
maxNotifications: params.NotificationsPerBatch,
@@ -186,93 +181,92 @@ func NewImplementation(instance *Impl) *notificationBot.Implementation {
return impl
}
// NotifyUser accepts a UID and service key file path.
// It handles the logic involved in retrieving a user's token and sending the notification
func notifyUser(ephID int64, data []*notifications.Data, apnsClient *apns.ApnsComm,
fc *firebase.FirebaseComm, db *storage.Storage, maxBatchSize, maxBytes int) ([]*notifications.Data, error) {
elist, err := db.GetEphemeral(ephID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
jww.DEBUG.Printf("No registration found for ephemeral ID %+v", ephID)
// This path is not an error. if no results are returned, the user hasn't registered for notifications
return nil, nil
// SendBatch accepts the map of ephemeralID:list[notifications.Data]
// It handles logic for building the CSV & sending to devices
func (nb *Impl) SendBatch(data map[int64][]*notifications.Data) ([]*notifications.Data, error) {
csvs := map[int64]string{}
var ephemerals []int64
var unsent []*notifications.Data
for i, ilist := range data {
var overflow, trimmed []*notifications.Data
if len(data) > nb.maxNotifications {
overflow = ilist[nb.maxNotifications:]
trimmed = ilist[:nb.maxNotifications]
}
return nil, errors.WithMessagef(err, "Could not retrieve registration for ephemeral ID %+v", ephID)
notifs, rest := notifications.BuildNotificationCSV(trimmed, nb.maxPayloadBytes-len([]byte(notificationsTag)))
for _, nd := range rest {
nb.Storage.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd})
}
overflow = append(overflow, rest...)
csvs[i] = string(notifs)
ephemerals = append(ephemerals, i)
unsent = append(unsent, overflow...)
}
var overflow []*notifications.Data
if len(data) > maxBatchSize {
overflow = data[maxBatchSize:]
data = data[:maxBatchSize]
toNotify, err := nb.Storage.GetToNotify(ephemerals)
if err != nil {
return nil, errors.WithMessage(err, "Failed to get list of tokens to notify")
}
notifs, rest := notifications.BuildNotificationCSV(data, maxBytes-len([]byte(notificationsTag)))
for _, nd := range rest {
db.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd}) // TODO build lists by rid for more efficient re-insertion? Accumulator would let us do this with the other size check in swap
for _, n := range toNotify {
nb.notify(csvs[n.EphemeralId], n)
}
overflow = append(overflow, rest...)
notificationsCSV := string(notifs)
return unsent, nil
}
for _, e := range elist {
u, err := db.GetUserByHash(e.TransmissionRSAHash)
if err != nil {
jww.ERROR.Printf("Failed to lookup user with tRSA hash %+v: %+v", e.TransmissionRSAHash, err)
continue
// notify is a helper function which handles sending notifications to either APNS or firebase
func (nb *Impl) notify(csv string, toNotify storage.GTNResult) {
isAPNS := !strings.Contains(toNotify.Token, ":")
// mutableContent := 1
if isAPNS {
jww.INFO.Printf("Notifying ephemeral ID %+v via APNS to token %+v", toNotify.EphemeralId, toNotify.Token)
notifPayload := payload.NewPayload().AlertTitle("Privacy: protected!").AlertBody(
"Some notifications are not for you to ensure privacy; we hope to remove this notification soon").MutableContent().Custom(
notificationsTag, csv)
notif := &apns2.Notification{
CollapseID: base64.StdEncoding.EncodeToString(toNotify.TransmissionRSAHash),
DeviceToken: toNotify.Token,
Expiration: time.Now().Add(time.Hour * 24 * 7),
Priority: apns2.PriorityHigh,
Payload: notifPayload,
PushType: apns2.PushTypeAlert,
Topic: nb.apnsClient.GetTopic(),
}
isAPNS := !strings.Contains(u.Token, ":")
// mutableContent := 1
if isAPNS {
jww.INFO.Printf("Notifying ephemeral ID %+v via APNS to token %+v", ephID, u.Token)
notifPayload := payload.NewPayload().AlertTitle("Privacy: protected!").AlertBody(
"Some notifications are not for you to ensure privacy; we hope to remove this notification soon").MutableContent().Custom(
notificationsTag, notificationsCSV)
notif := &apns2.Notification{
CollapseID: base64.StdEncoding.EncodeToString(u.TransmissionRSAHash),
DeviceToken: u.Token,
Expiration: time.Now().Add(time.Hour * 24 * 7),
Priority: apns2.PriorityHigh,
Payload: notifPayload,
PushType: apns2.PushTypeAlert,
Topic: apnsClient.GetTopic(),
}
resp, err := apnsClient.Push(notif)
if err != nil {
jww.ERROR.Printf("Failed to send notification via APNS: %+v: %+v", resp, err)
// TODO : Should be re-enabled for specific error cases? deep dive on apns docs may be helpful
//err := db.DeleteUserByHash(u.TransmissionRSAHash)
//if err != nil {
// return errors.WithMessagef(err, "Failed to remove user registration tRSA hash: %+v", u.TransmissionRSAHash)
//}
} else {
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, u.Token, resp)
}
resp, err := nb.apnsClient.Push(notif)
if err != nil {
jww.ERROR.Printf("Failed to send notification via APNS: %+v: %+v", resp, err)
// TODO : Should be re-enabled for specific error cases? deep dive on apns docs may be helpful
//err := db.DeleteUserByHash(u.TransmissionRSAHash)
//if err != nil {
// return errors.WithMessagef(err, "Failed to remove user registration tRSA hash: %+v", u.TransmissionRSAHash)
//}
} else {
resp, err := fc.SendNotification(fc.Client, u.Token, notificationsCSV)
if err != nil {
// Catch firebase errors that we don't want to crash on
// 404 indicate that the token stored is incorrect
// this means rather than crashing we should log and unregister the user
// 400 can also indicate incorrect token, do extra checking on this (12/27/2021)
// Error documentation: https://firebase.google.com/docs/reference/fcm/rest/v1/ErrorCode
// Stale token documentation: https://firebase.google.com/docs/cloud-messaging/manage-tokens
jww.ERROR.Printf("Error sending notification: %+v", err)
invalidToken := strings.Contains(err.Error(), "400") &&
strings.Contains(err.Error(), "Invalid registration")
if strings.Contains(err.Error(), "404") || invalidToken {
jww.ERROR.Printf("User with Transmission RSA hash %+v has invalid token, unregistering...", u.TransmissionRSAHash)
err := db.DeleteUserByHash(u.TransmissionRSAHash)
if err != nil {
jww.ERROR.Printf("Failed to remove user registration tRSA hash %+v: %+v", u.TransmissionRSAHash, err)
}
} else {
jww.ERROR.Printf("Failed to send notification to user with tRSA hash %+v: %+v", u.TransmissionRSAHash, err)
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", toNotify.EphemeralId, toNotify.Token, resp)
}
} else {
resp, err := nb.fcm.SendNotification(nb.fcm.Client, toNotify.Token, csv)
if err != nil {
// Catch firebase errors that we don't want to crash on
// 404 indicate that the token stored is incorrect
// this means rather than crashing we should log and unregister the user
// 400 can also indicate incorrect token, do extra checking on this (12/27/2021)
// Error documentation: https://firebase.google.com/docs/reference/fcm/rest/v1/ErrorCode
// Stale token documentation: https://firebase.google.com/docs/cloud-messaging/manage-tokens
jww.ERROR.Printf("Error sending notification via FCM: %+v", err)
invalidToken := strings.Contains(err.Error(), "400") &&
strings.Contains(err.Error(), "Invalid registration")
if strings.Contains(err.Error(), "404") || invalidToken {
jww.ERROR.Printf("User with Transmission RSA hash %+v has invalid token, unregistering...", toNotify.TransmissionRSAHash)
err := nb.Storage.DeleteUserByHash(toNotify.TransmissionRSAHash)
if err != nil {
jww.ERROR.Printf("Failed to remove user registration tRSA hash %+v: %+v", toNotify.TransmissionRSAHash, err)
}
} else {
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, u.Token, resp)
jww.ERROR.Printf("Failed to send notification to user with tRSA hash %+v: %+v", toNotify.TransmissionRSAHash, err)
}
} else {
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", toNotify.EphemeralId, toNotify.Token, resp)
}
}
return overflow, nil
}
// RegisterForNotifications is called by the client, and adds a user registration to our database
@@ -425,20 +419,27 @@ func (nb *Impl) Sender(sendFreq int) {
select {
case <-sendTicker.C:
go func() {
// Retreive & swap notification buffer
notifBuf := nb.Storage.GetNotificationBuffer()
notifMap := notifBuf.Swap()
unsent := map[uint64][]*notifications.Data{}
for ephID := range notifMap {
localEphID := ephID
notifList := notifMap[localEphID]
rest, err := nb.notifyFunc(localEphID, notifList, nb.apnsClient, nb.fcm, nb.Storage, nb.maxNotifications, nb.maxPayloadBytes)
if err != nil {
jww.ERROR.Printf("Failed to notify %d: %+v", localEphID, err)
rest, err := nb.SendBatch(notifMap)
if err != nil {
jww.ERROR.Printf("Failed to send notification batch: %+v", err)
// If we fail to run SendBatch, put everything back in unsent
for _, elist := range notifMap {
for _, n := range elist {
unsent[n.RoundID] = append(unsent[n.RoundID], n)
}
}
} else {
// Loop through rest and add to unsent map
for _, n := range rest {
unsent[n.RoundID] = append(unsent[n.RoundID], n)
}
}
// Re-add unsent notifications to the buffer
for rid, nd := range unsent {
notifBuf.Add(id.Round(rid), nd)
}
Loading