Skip to content
Snippets Groups Projects

Restructure notification code for batching

Files

+ 69
75
@@ -186,93 +186,87 @@ func NewImplementation(instance *Impl) *notificationBot.Implementation {
return impl
}
// NotifyUser accepts a UID and service key file path.
// It handles the logic involved in retrieving a user's token and sending the notification
func notifyUser(ephID int64, data []*notifications.Data, apnsClient *apns.ApnsComm,
fc *firebase.FirebaseComm, db *storage.Storage, maxBatchSize, maxBytes int) ([]*notifications.Data, error) {
elist, err := db.GetEphemeral(ephID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
jww.DEBUG.Printf("No registration found for ephemeral ID %+v", ephID)
// This path is not an error. if no results are returned, the user hasn't registered for notifications
return nil, nil
func (nb *Impl) SendBatch(data map[int64][]*notifications.Data, maxBatchSize, maxBytes int) error {
csvs := map[int64]string{}
var ephemerals []int64
for i, ilist := range data {
var overflow, trimmed []*notifications.Data
if len(data) > maxBatchSize {
overflow = ilist[maxBatchSize:]
trimmed = ilist[:maxBatchSize]
}
notifs, rest := notifications.BuildNotificationCSV(trimmed, maxBytes-len([]byte(notificationsTag)))
for _, nd := range rest {
nb.Storage.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd}) // TODO build lists by rid for more efficient re-insertion? Accumulator would let us do this with the other size check in swap
}
return nil, errors.WithMessagef(err, "Could not retrieve registration for ephemeral ID %+v", ephID)
overflow = append(overflow, rest...)
csvs[i] = string(notifs)
ephemerals = append(ephemerals, i)
}
var overflow []*notifications.Data
if len(data) > maxBatchSize {
overflow = data[maxBatchSize:]
data = data[:maxBatchSize]
toNotify, err := nb.Storage.GetToNotify(ephemerals)
if err != nil {
return errors.WithMessage(err, "Failed to get list of tokens to notify")
}
notifs, rest := notifications.BuildNotificationCSV(data, maxBytes-len([]byte(notificationsTag)))
for _, nd := range rest {
db.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd}) // TODO build lists by rid for more efficient re-insertion? Accumulator would let us do this with the other size check in swap
for _, n := range toNotify {
nb.notify(n.Token, csvs[n.EphemeralId], n.EphemeralId, n.TransmissionRSAHash)
}
overflow = append(overflow, rest...)
notificationsCSV := string(notifs)
return nil
}
for _, e := range elist {
u, err := db.GetUserByHash(e.TransmissionRSAHash)
if err != nil {
jww.ERROR.Printf("Failed to lookup user with tRSA hash %+v: %+v", e.TransmissionRSAHash, err)
continue
func (nb *Impl) notify(token, csv string, ephID int64, transmissionRSAHash []byte) {
isAPNS := !strings.Contains(token, ":")
// mutableContent := 1
if isAPNS {
jww.INFO.Printf("Notifying ephemeral ID %+v via APNS to token %+v", ephID, token)
notifPayload := payload.NewPayload().AlertTitle("Privacy: protected!").AlertBody(
"Some notifications are not for you to ensure privacy; we hope to remove this notification soon").MutableContent().Custom(
notificationsTag, csv)
notif := &apns2.Notification{
CollapseID: base64.StdEncoding.EncodeToString(transmissionRSAHash),
DeviceToken: token,
Expiration: time.Now().Add(time.Hour * 24 * 7),
Priority: apns2.PriorityHigh,
Payload: notifPayload,
PushType: apns2.PushTypeAlert,
Topic: nb.apnsClient.GetTopic(),
}
isAPNS := !strings.Contains(u.Token, ":")
// mutableContent := 1
if isAPNS {
jww.INFO.Printf("Notifying ephemeral ID %+v via APNS to token %+v", ephID, u.Token)
notifPayload := payload.NewPayload().AlertTitle("Privacy: protected!").AlertBody(
"Some notifications are not for you to ensure privacy; we hope to remove this notification soon").MutableContent().Custom(
notificationsTag, notificationsCSV)
notif := &apns2.Notification{
CollapseID: base64.StdEncoding.EncodeToString(u.TransmissionRSAHash),
DeviceToken: u.Token,
Expiration: time.Now().Add(time.Hour * 24 * 7),
Priority: apns2.PriorityHigh,
Payload: notifPayload,
PushType: apns2.PushTypeAlert,
Topic: apnsClient.GetTopic(),
}
resp, err := apnsClient.Push(notif)
if err != nil {
jww.ERROR.Printf("Failed to send notification via APNS: %+v: %+v", resp, err)
// TODO : Should be re-enabled for specific error cases? deep dive on apns docs may be helpful
//err := db.DeleteUserByHash(u.TransmissionRSAHash)
//if err != nil {
// return errors.WithMessagef(err, "Failed to remove user registration tRSA hash: %+v", u.TransmissionRSAHash)
//}
} else {
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, u.Token, resp)
}
resp, err := nb.apnsClient.Push(notif)
if err != nil {
jww.ERROR.Printf("Failed to send notification via APNS: %+v: %+v", resp, err)
// TODO : Should be re-enabled for specific error cases? deep dive on apns docs may be helpful
//err := db.DeleteUserByHash(u.TransmissionRSAHash)
//if err != nil {
// return errors.WithMessagef(err, "Failed to remove user registration tRSA hash: %+v", u.TransmissionRSAHash)
//}
} else {
resp, err := fc.SendNotification(fc.Client, u.Token, notificationsCSV)
if err != nil {
// Catch firebase errors that we don't want to crash on
// 404 indicate that the token stored is incorrect
// this means rather than crashing we should log and unregister the user
// 400 can also indicate incorrect token, do extra checking on this (12/27/2021)
// Error documentation: https://firebase.google.com/docs/reference/fcm/rest/v1/ErrorCode
// Stale token documentation: https://firebase.google.com/docs/cloud-messaging/manage-tokens
jww.ERROR.Printf("Error sending notification: %+v", err)
invalidToken := strings.Contains(err.Error(), "400") &&
strings.Contains(err.Error(), "Invalid registration")
if strings.Contains(err.Error(), "404") || invalidToken {
jww.ERROR.Printf("User with Transmission RSA hash %+v has invalid token, unregistering...", u.TransmissionRSAHash)
err := db.DeleteUserByHash(u.TransmissionRSAHash)
if err != nil {
jww.ERROR.Printf("Failed to remove user registration tRSA hash %+v: %+v", u.TransmissionRSAHash, err)
}
} else {
jww.ERROR.Printf("Failed to send notification to user with tRSA hash %+v: %+v", u.TransmissionRSAHash, err)
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, token, resp)
}
} else {
resp, err := nb.fcm.SendNotification(nb.fcm.Client, token, csv)
if err != nil {
// Catch firebase errors that we don't want to crash on
// 404 indicate that the token stored is incorrect
// this means rather than crashing we should log and unregister the user
// 400 can also indicate incorrect token, do extra checking on this (12/27/2021)
// Error documentation: https://firebase.google.com/docs/reference/fcm/rest/v1/ErrorCode
// Stale token documentation: https://firebase.google.com/docs/cloud-messaging/manage-tokens
jww.ERROR.Printf("Error sending notification: %+v", err)
invalidToken := strings.Contains(err.Error(), "400") &&
strings.Contains(err.Error(), "Invalid registration")
if strings.Contains(err.Error(), "404") || invalidToken {
jww.ERROR.Printf("User with Transmission RSA hash %+v has invalid token, unregistering...", transmissionRSAHash)
err := nb.Storage.DeleteUserByHash(transmissionRSAHash)
if err != nil {
jww.ERROR.Printf("Failed to remove user registration tRSA hash %+v: %+v", transmissionRSAHash, err)
}
} else {
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, u.Token, resp)
jww.ERROR.Printf("Failed to send notification to user with tRSA hash %+v: %+v", transmissionRSAHash, err)
}
} else {
jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, token, resp)
}
}
return overflow, nil
}
// RegisterForNotifications is called by the client, and adds a user registration to our database
Loading