Skip to content
Snippets Groups Projects

Restructure notification code for batching

Files

+ 87
88
@@ -33,17 +33,12 @@ import (
@@ -33,17 +33,12 @@ import (
"gitlab.com/xx_network/primitives/ndf"
"gitlab.com/xx_network/primitives/ndf"
"gitlab.com/xx_network/primitives/netTime"
"gitlab.com/xx_network/primitives/netTime"
"gitlab.com/xx_network/primitives/utils"
"gitlab.com/xx_network/primitives/utils"
"gorm.io/gorm"
"strings"
"strings"
"time"
"time"
)
)
const notificationsTag = "notificationData"
const notificationsTag = "notificationData"
// Function type definitions for the main operations (poll and notify)
type NotifyFunc func(int64, []*notifications.Data, *apns.ApnsComm,
*firebase.FirebaseComm, *storage.Storage, int, int) ([]*notifications.Data, error)
// Params struct holds info passed in for configuration
// Params struct holds info passed in for configuration
type Params struct {
type Params struct {
Address string
Address string
@@ -68,7 +63,6 @@ type Impl struct {
@@ -68,7 +63,6 @@ type Impl struct {
Comms *notificationBot.Comms
Comms *notificationBot.Comms
Storage *storage.Storage
Storage *storage.Storage
inst *network.Instance
inst *network.Instance
notifyFunc NotifyFunc
fcm *firebase.FirebaseComm
fcm *firebase.FirebaseComm
apnsClient *apns.ApnsComm
apnsClient *apns.ApnsComm
receivedNdf *uint32
receivedNdf *uint32
@@ -114,7 +108,6 @@ func StartNotifications(params Params, noTLS, noFirebase bool) (*Impl, error) {
@@ -114,7 +108,6 @@ func StartNotifications(params Params, noTLS, noFirebase bool) (*Impl, error) {
receivedNdf := uint32(0)
receivedNdf := uint32(0)
impl := &Impl{
impl := &Impl{
notifyFunc: notifyUser,
fcm: fbComm,
fcm: fbComm,
receivedNdf: &receivedNdf,
receivedNdf: &receivedNdf,
maxNotifications: params.NotificationsPerBatch,
maxNotifications: params.NotificationsPerBatch,
@@ -186,93 +179,92 @@ func NewImplementation(instance *Impl) *notificationBot.Implementation {
@@ -186,93 +179,92 @@ func NewImplementation(instance *Impl) *notificationBot.Implementation {
return impl
return impl
}
}
// NotifyUser accepts a UID and service key file path.
// SendBatch accepts the map of ephemeralID:list[notifications.Data]
// It handles the logic involved in retrieving a user's token and sending the notification
// It handles logic for building the CSV & sending to devices
func notifyUser(ephID int64, data []*notifications.Data, apnsClient *apns.ApnsComm,
func (nb *Impl) SendBatch(data map[int64][]*notifications.Data) ([]*notifications.Data, error) {
fc *firebase.FirebaseComm, db *storage.Storage, maxBatchSize, maxBytes int) ([]*notifications.Data, error) {
csvs := map[int64]string{}
elist, err := db.GetEphemeral(ephID)
var ephemerals []int64
if err != nil {
var unsent []*notifications.Data
if errors.Is(err, gorm.ErrRecordNotFound) {
for i, ilist := range data {
jww.DEBUG.Printf("No registration found for ephemeral ID %+v", ephID)
var overflow, trimmed []*notifications.Data
// This path is not an error. if no results are returned, the user hasn't registered for notifications
if len(data) > nb.maxNotifications {
return nil, nil
overflow = ilist[nb.maxNotifications:]
 
trimmed = ilist[:nb.maxNotifications]
 
}
 
 
notifs, rest := notifications.BuildNotificationCSV(trimmed, nb.maxPayloadBytes-len([]byte(notificationsTag)))
 
for _, nd := range rest {
 
nb.Storage.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd})
}
}
return nil, errors.WithMessagef(err, "Could not retrieve registration for ephemeral ID %+v", ephID)
overflow = append(overflow, rest...)
 
csvs[i] = string(notifs)
 
ephemerals = append(ephemerals, i)
 
unsent = append(unsent, overflow...)
}
}
var overflow []*notifications.Data
toNotify, err := nb.Storage.GetToNotify(ephemerals)
if len(data) > maxBatchSize {
if err != nil {
overflow = data[maxBatchSize:]
return nil, errors.WithMessage(err, "Failed to get list of tokens to notify")
data = data[:maxBatchSize]
}
}
for _, n := range toNotify {
notifs, rest := notifications.BuildNotificationCSV(data, maxBytes-len([]byte(notificationsTag)))
nb.notify(csvs[n.EphemeralId], n)
for _, nd := range rest {
db.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd}) // TODO build lists by rid for more efficient re-insertion? Accumulator would let us do this with the other size check in swap
}
}
overflow = append(overflow, rest...)
return unsent, nil
notificationsCSV := string(notifs)
}
for _, e := range elist {
// notify is a helper function which handles sending notifications to either APNS or firebase
u, err := db.GetUserByHash(e.TransmissionRSAHash)
func (nb *Impl) notify(csv string, toNotify storage.GTNResult) {
if err != nil {
isAPNS := !strings.Contains(toNotify.Token, ":")
jww.ERROR.Printf("Failed to lookup user with tRSA hash %+v: %+v", e.TransmissionRSAHash, err)
// mutableContent := 1
continue
if isAPNS {
 
jww.INFO.Printf("Notifying ephemeral ID %+v via APNS to token %+v", toNotify.EphemeralId, toNotify.Token)
 
notifPayload := payload.NewPayload().AlertTitle("Privacy: protected!").AlertBody(
 
"Some notifications are not for you to ensure privacy; we hope to remove this notification soon").MutableContent().Custom(
 
notificationsTag, csv)
 
notif := &apns2.Notification{
 
CollapseID: base64.StdEncoding.EncodeToString(toNotify.TransmissionRSAHash),
 
DeviceToken: toNotify.Token,
 
Expiration: time.Now().Add(time.Hour * 24 * 7),
 
Priority: apns2.PriorityHigh,
 
Payload: notifPayload,
 
PushType: apns2.PushTypeAlert,
 
Topic: nb.apnsClient.GetTopic(),
}
}
resp, err := nb.apnsClient.Push(notif)
isAPNS := !strings.Contains(u.Token, ":")
if err != nil {
// mutableContent := 1
jww.ERROR.Printf("Failed to send notification via APNS: %+v: %+v", resp, err)
if isAPNS {
// TODO : Should be re-enabled for specific error cases? deep dive on apns docs may be helpful
jww.INFO.Printf("Notifying ephemeral ID %+v via APNS to token %+v", ephID, u.Token)
//err := db.DeleteUserByHash(u.TransmissionRSAHash)
notifPayload := payload.NewPayload().AlertTitle("Privacy: protected!").AlertBody(
//if err != nil {
"Some notifications are not for you to ensure privacy; we hope to remove this notification soon").MutableContent().Custom(
// return errors.WithMessagef(err, "Failed to remove user registration tRSA hash: %+v", u.TransmissionRSAHash)
notificationsTag, notificationsCSV)
//}
notif := &apns2.Notification{
CollapseID: base64.StdEncoding.EncodeToString(u.TransmissionRSAHash),
DeviceToken: u.Token,
Expiration: time.Now().Add(time.Hour * 24 * 7),
Priority: apns2.PriorityHigh,
Payload: notifPayload,
PushType: apns2.PushTypeAlert,
Topic: apnsClient.GetTopic(),
}
resp, err := apnsClient.Push(notif)
if err != nil {
jww.ERROR.Printf("Failed to send notification via APNS: %+v: %+v", resp, err)
// TODO : Should be re-enabled for specific error cases? deep dive on apns docs may be helpful
//err := db.DeleteUserByHash(u.TransmissionRSAHash)
//if err != nil {
// return errors.WithMessagef(err, "Failed to remove user registration tRSA hash: %+v", u.TransmissionRSAHash)
//}
} else {
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, u.Token, resp)
}
} else {
} else {
resp, err := fc.SendNotification(fc.Client, u.Token, notificationsCSV)
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", toNotify.EphemeralId, toNotify.Token, resp)
if err != nil {
}
// Catch firebase errors that we don't want to crash on
} else {
// 404 indicate that the token stored is incorrect
resp, err := nb.fcm.SendNotification(nb.fcm.Client, toNotify.Token, csv)
// this means rather than crashing we should log and unregister the user
if err != nil {
// 400 can also indicate incorrect token, do extra checking on this (12/27/2021)
// Catch firebase errors that we don't want to crash on
// Error documentation: https://firebase.google.com/docs/reference/fcm/rest/v1/ErrorCode
// 404 indicate that the token stored is incorrect
// Stale token documentation: https://firebase.google.com/docs/cloud-messaging/manage-tokens
// this means rather than crashing we should log and unregister the user
jww.ERROR.Printf("Error sending notification: %+v", err)
// 400 can also indicate incorrect token, do extra checking on this (12/27/2021)
invalidToken := strings.Contains(err.Error(), "400") &&
// Error documentation: https://firebase.google.com/docs/reference/fcm/rest/v1/ErrorCode
strings.Contains(err.Error(), "Invalid registration")
// Stale token documentation: https://firebase.google.com/docs/cloud-messaging/manage-tokens
if strings.Contains(err.Error(), "404") || invalidToken {
jww.ERROR.Printf("Error sending notification: %+v", err)
jww.ERROR.Printf("User with Transmission RSA hash %+v has invalid token, unregistering...", u.TransmissionRSAHash)
invalidToken := strings.Contains(err.Error(), "400") &&
err := db.DeleteUserByHash(u.TransmissionRSAHash)
strings.Contains(err.Error(), "Invalid registration")
if err != nil {
if strings.Contains(err.Error(), "404") || invalidToken {
jww.ERROR.Printf("Failed to remove user registration tRSA hash %+v: %+v", u.TransmissionRSAHash, err)
jww.ERROR.Printf("User with Transmission RSA hash %+v has invalid token, unregistering...", toNotify.TransmissionRSAHash)
}
err := nb.Storage.DeleteUserByHash(toNotify.TransmissionRSAHash)
} else {
if err != nil {
jww.ERROR.Printf("Failed to send notification to user with tRSA hash %+v: %+v", u.TransmissionRSAHash, err)
jww.ERROR.Printf("Failed to remove user registration tRSA hash %+v: %+v", toNotify.TransmissionRSAHash, err)
}
}
} else {
} else {
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, u.Token, resp)
jww.ERROR.Printf("Failed to send notification to user with tRSA hash %+v: %+v", toNotify.TransmissionRSAHash, err)
}
}
 
} else {
 
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", toNotify.EphemeralId, toNotify.Token, resp)
}
}
}
}
return overflow, nil
}
}
// RegisterForNotifications is called by the client, and adds a user registration to our database
// RegisterForNotifications is called by the client, and adds a user registration to our database
@@ -425,20 +417,27 @@ func (nb *Impl) Sender(sendFreq int) {
@@ -425,20 +417,27 @@ func (nb *Impl) Sender(sendFreq int) {
select {
select {
case <-sendTicker.C:
case <-sendTicker.C:
go func() {
go func() {
 
// Retreive & swap notification buffer
notifBuf := nb.Storage.GetNotificationBuffer()
notifBuf := nb.Storage.GetNotificationBuffer()
notifMap := notifBuf.Swap()
notifMap := notifBuf.Swap()
 
unsent := map[uint64][]*notifications.Data{}
unsent := map[uint64][]*notifications.Data{}
for ephID := range notifMap {
rest, err := nb.SendBatch(notifMap)
localEphID := ephID
if err != nil {
notifList := notifMap[localEphID]
jww.ERROR.Printf("Failed to send notification batch: %+v", err)
rest, err := nb.notifyFunc(localEphID, notifList, nb.apnsClient, nb.fcm, nb.Storage, nb.maxNotifications, nb.maxPayloadBytes)
// If we fail to run SendBatch, put everything back in unsent
if err != nil {
for _, elist := range notifMap {
jww.ERROR.Printf("Failed to notify %d: %+v", localEphID, err)
for _, n := range elist {
 
unsent[n.RoundID] = append(unsent[n.RoundID], n)
 
}
}
}
 
} else {
 
// Loop through rest and add to unsent map
for _, n := range rest {
for _, n := range rest {
unsent[n.RoundID] = append(unsent[n.RoundID], n)
unsent[n.RoundID] = append(unsent[n.RoundID], n)
}
}
}
}
 
// Re-add unsent notifications to the buffer
for rid, nd := range unsent {
for rid, nd := range unsent {
notifBuf.Add(id.Round(rid), nd)
notifBuf.Add(id.Round(rid), nd)
}
}
Loading