diff --git a/README.md b/README.md index 3282a035ed29e2863a8c7756e0851a808c2cbf69..3ed4e0cc137593d5e542ab08a9b4a037cc1df1a0 100644 --- a/README.md +++ b/README.md @@ -42,5 +42,9 @@ apnsKeyID: "" apnsIssuer: "" apnsBundleID: "" apnsDev: true + +# Notification params +notificationRate: 30 # Duration in seconds +notificationsPerBatch: 20 # === END YAML ``` diff --git a/cmd/root.go b/cmd/root.go index 2acf7824f0d4bd0ee23981986b1286dd2f257eb7..8707c4cd34107ad978dc5fb7a75f62df2cad53ea 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -69,13 +69,16 @@ var rootCmd = &cobra.Command{ if err != nil { jww.FATAL.Panicf("Unable to expand apns key path: %+v", err) } - + viper.SetDefault("notificationRate", 30) + viper.SetDefault("notificationsPerBatch", 20) // Populate params NotificationParams = notifications.Params{ - Address: localAddress, - CertPath: certPath, - KeyPath: keyPath, - FBCreds: fbCreds, + Address: localAddress, + CertPath: certPath, + KeyPath: keyPath, + FBCreds: fbCreds, + NotificationRate: viper.GetInt("notificationRate"), + NotificationsPerBatch: viper.GetInt("notificationsPerBatch"), APNS: notifications.APNSParams{ KeyPath: apnsKeyPath, KeyID: viper.GetString("apnsKeyID"), diff --git a/go.mod b/go.mod index 28c874524e4a1d261acc3b06f09878ba0c215de2..227d9aaad94b2fcf55a75b09db4143df08e234e4 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/viper v1.7.0 github.com/stretchr/testify v1.7.0 // indirect - gitlab.com/elixxir/comms v0.0.4-0.20211220224127-670d882e4067 + gitlab.com/elixxir/comms v0.0.4-0.20211222154743-2f5bc6365f8d gitlab.com/elixxir/crypto v0.0.7-0.20211220224022-1f518df56c0f gitlab.com/xx_network/comms v0.0.4-0.20211220223937-660809b69338 gitlab.com/xx_network/crypto v0.0.5-0.20211220223913-6088f05a1191 diff --git a/go.sum b/go.sum index 339bf9451a925704cd7165ee7a3824ebdec69d42..afdf054208ebdc89e40dfcda7d7b7b2688297196 100644 --- a/go.sum +++ b/go.sum @@ -381,8 +381,8 @@ github.com/zeebo/pcg v0.0.0-20181207190024-3cdc6b625a05/go.mod h1:Gr+78ptB0MwXxm github.com/zeebo/pcg v1.0.0 h1:dt+dx+HvX8g7Un32rY9XWoYnd0NmKmrIzpHF7qiTDj0= github.com/zeebo/pcg v1.0.0/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= -gitlab.com/elixxir/comms v0.0.4-0.20211220224127-670d882e4067 h1:FlZjW7xdzFkivwTu+D/ZSvH7JLqDYEICuf7OPePNs6U= -gitlab.com/elixxir/comms v0.0.4-0.20211220224127-670d882e4067/go.mod h1:kvn6jAHZcRTXYdo9LvZDbO+Nw8V7Gn749fpbyUYlSN8= +gitlab.com/elixxir/comms v0.0.4-0.20211222154743-2f5bc6365f8d h1:jcEIpFn2DISLNe1nWiCocJCBgd81KUoxVCe4EVP+QUk= +gitlab.com/elixxir/comms v0.0.4-0.20211222154743-2f5bc6365f8d/go.mod h1:kvn6jAHZcRTXYdo9LvZDbO+Nw8V7Gn749fpbyUYlSN8= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= gitlab.com/elixxir/crypto v0.0.7-0.20211220224022-1f518df56c0f h1:/VKJYB++EcVy5WvCRm/yIOWHmtqNYDodWZiNKc4bsXg= diff --git a/notifications/ephemeral_test.go b/notifications/ephemeral_test.go index 8a8d3d7b1db8c1d67f7869a2d86c29389e4fcf4d..212e84e86ab4a5ce03a87d445fb47ab81f5f2525 100644 --- a/notifications/ephemeral_test.go +++ b/notifications/ephemeral_test.go @@ -65,10 +65,12 @@ func TestImpl_InitCreator(t *testing.T) { t.FailNow() } impl, err := StartNotifications(Params{ - Address: "", - CertPath: "", - KeyPath: "", - FBCreds: "", + NotificationsPerBatch: 20, + NotificationRate: 30, + Address: "", + CertPath: "", + KeyPath: "", + FBCreds: "", }, true, true) if err != nil { t.Errorf("Failed to create impl: %+v", err) diff --git a/notifications/firebase/fcm.go b/notifications/firebase/fcm.go index 94bbfb4d0eb0a0af00c0cd7f7badc82bee0ad183..0c79f568258acba70c5b5e4fc5fb22549bb7f2cd 100644 --- a/notifications/firebase/fcm.go +++ b/notifications/firebase/fcm.go @@ -6,10 +6,8 @@ package firebase import ( - "encoding/base64" "firebase.google.com/go/messaging" "github.com/pkg/errors" - "gitlab.com/elixxir/comms/mixmessages" "testing" "time" @@ -21,7 +19,7 @@ import ( ) // function types for use in notificationsbot struct -type SendFunc func(FBSender, string, *mixmessages.NotificationData) (string, error) +type SendFunc func(FBSender, string, string) (string, error) // FirebaseComm is a struct which holds the functions to setup the messaging app and sending notifications // Using a struct in this manner allows us to properly unit test the NotifyUser function @@ -74,13 +72,12 @@ func SetupMessagingApp(serviceKeyPath string) (*messaging.Client, error) { // sendNotification accepts a registration token and service account file // It gets the proper infrastructure, then builds & sends a notification through the firebase admin API // returns string, error (string is of dubious use, but is returned for the time being) -func sendNotification(app FBSender, token string, data *mixmessages.NotificationData) (string, error) { +func sendNotification(app FBSender, token string, data string) (string, error) { ctx := context.Background() ttl := 7 * 24 * time.Hour message := &messaging.Message{ Data: map[string]string{ - "messagehash": base64.StdEncoding.EncodeToString(data.MessageHash), - "identityfingerprint": base64.StdEncoding.EncodeToString(data.IdentityFP), + "notificationsTag": data, }, Android: &messaging.AndroidConfig{ Priority: "high", diff --git a/notifications/firebase/fcm_test.go b/notifications/firebase/fcm_test.go index 08650bbdc49e3a9401a13a0debe9c0383695235a..16085467edb5136da4d9fcda44d5a37a1510f912 100644 --- a/notifications/firebase/fcm_test.go +++ b/notifications/firebase/fcm_test.go @@ -8,7 +8,6 @@ package firebase import ( "context" "firebase.google.com/go/messaging" - "gitlab.com/elixxir/comms/mixmessages" "testing" ) @@ -25,11 +24,7 @@ func (MockSender) Send(ctx context.Context, app *messaging.Message) (string, err func TestSendNotification(t *testing.T) { app := MockSender{} - _, err := sendNotification(app, token, &mixmessages.NotificationData{ - EphemeralID: 12345, - IdentityFP: []byte("testfp"), - MessageHash: []byte("testmsghash"), - }) + _, err := sendNotification(app, token, "data") if err != nil { t.Error(err.Error()) } diff --git a/notifications/notifications.go b/notifications/notifications.go index 72a5e282b1a4c177f7153a96daa200183376a102..827de1abcaa9df4a1c5830141387fbb82876be98 100644 --- a/notifications/notifications.go +++ b/notifications/notifications.go @@ -38,16 +38,20 @@ import ( "time" ) +const notificationsTag = "notificationData" + // Function type definitions for the main operations (poll and notify) -type NotifyFunc func(*pb.NotificationData, *apns.ApnsComm, *firebase.FirebaseComm, *storage.Storage) error +type NotifyFunc func(int64, []*pb.NotificationData, *apns.ApnsComm, *firebase.FirebaseComm, *storage.Storage) error // Params struct holds info passed in for configuration type Params struct { - Address string - CertPath string - KeyPath string - FBCreds string - APNS APNSParams + Address string + CertPath string + KeyPath string + FBCreds string + NotificationsPerBatch int + NotificationRate int + APNS APNSParams } type APNSParams struct { KeyPath string @@ -59,14 +63,15 @@ type APNSParams struct { // Local impl for notifications; holds comms, storage object, creds and main functions type Impl struct { - Comms *notificationBot.Comms - Storage *storage.Storage - inst *network.Instance - notifyFunc NotifyFunc - fcm *firebase.FirebaseComm - apnsClient *apns.ApnsComm - receivedNdf *uint32 - roundStore sync.Map + Comms *notificationBot.Comms + Storage *storage.Storage + inst *network.Instance + notifyFunc NotifyFunc + fcm *firebase.FirebaseComm + apnsClient *apns.ApnsComm + receivedNdf *uint32 + roundStore sync.Map + maxNotifications int ndfStopper Stopper } @@ -106,9 +111,10 @@ func StartNotifications(params Params, noTLS, noFirebase bool) (*Impl, error) { receivedNdf := uint32(0) impl := &Impl{ - notifyFunc: notifyUser, - fcm: fbComm, - receivedNdf: &receivedNdf, + notifyFunc: notifyUser, + fcm: fbComm, + receivedNdf: &receivedNdf, + maxNotifications: params.NotificationsPerBatch, } if params.APNS.KeyPath == "" { @@ -152,6 +158,7 @@ func StartNotifications(params Params, noTLS, noFirebase bool) (*Impl, error) { impl.inst = i go impl.Cleaner() + go impl.Sender(params.NotificationRate) return impl, nil } @@ -177,15 +184,15 @@ func NewImplementation(instance *Impl) *notificationBot.Implementation { // NotifyUser accepts a UID and service key file path. // It handles the logic involved in retrieving a user's token and sending the notification -func notifyUser(data *pb.NotificationData, apnsClient *apns.ApnsComm, fc *firebase.FirebaseComm, db *storage.Storage) error { - elist, err := db.GetEphemeral(data.EphemeralID) +func notifyUser(ephID int64, data []*pb.NotificationData, apnsClient *apns.ApnsComm, fc *firebase.FirebaseComm, db *storage.Storage) error { + elist, err := db.GetEphemeral(ephID) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - jww.DEBUG.Printf("No registration found for ephemeral ID %+v", data.EphemeralID) + jww.DEBUG.Printf("No registration found for ephemeral ID %+v", ephID) // This path is not an error. if no results are returned, the user hasn't registered for notifications return nil } - return errors.WithMessagef(err, "Could not retrieve registration for ephemeral ID %+v", data.EphemeralID) + return errors.WithMessagef(err, "Could not retrieve registration for ephemeral ID %+v", ephID) } for _, e := range elist { u, err := db.GetUserByHash(e.TransmissionRSAHash) @@ -193,14 +200,15 @@ func notifyUser(data *pb.NotificationData, apnsClient *apns.ApnsComm, fc *fireba return errors.WithMessagef(err, "Failed to lookup user with tRSA hash %+v", e.TransmissionRSAHash) } + notificationsCSV := pb.MakeNotificationsCSV(data) + isAPNS := !strings.Contains(u.Token, ":") // mutableContent := 1 if isAPNS { - jww.INFO.Printf("Notifying ephemeral ID %+v via APNS to token %+v", data.EphemeralID, u.Token) + jww.INFO.Printf("Notifying ephemeral ID %+v via APNS to token %+v", ephID, u.Token) notifPayload := payload.NewPayload().AlertTitle("Privacy: protected!").AlertBody( "Some notifications are not for you to ensure privacy; we hope to remove this notification soon").MutableContent().Custom( - "messagehash", base64.StdEncoding.EncodeToString(data.MessageHash)).Custom( - "identityfingerprint", base64.StdEncoding.EncodeToString(data.IdentityFP)) + notificationsTag, notificationsCSV) notif := &apns2.Notification{ CollapseID: base64.StdEncoding.EncodeToString(u.TransmissionRSAHash), DeviceToken: u.Token, @@ -219,10 +227,10 @@ func notifyUser(data *pb.NotificationData, apnsClient *apns.ApnsComm, fc *fireba // return errors.WithMessagef(err, "Failed to remove user registration tRSA hash: %+v", u.TransmissionRSAHash) //} } else { - jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", data.EphemeralID, u.Token, resp) + jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, u.Token, resp) } } else { - resp, err := fc.SendNotification(fc.Client, u.Token, data) + resp, err := fc.SendNotification(fc.Client, u.Token, notificationsCSV) if err != nil { // Catch two firebase errors that we don't want to crash on // 400 and 404 indicate that the token stored is incorrect @@ -240,7 +248,7 @@ func notifyUser(data *pb.NotificationData, apnsClient *apns.ApnsComm, fc *fireba return errors.WithMessagef(err, "Failed to send notification to user with tRSA hash %+v", u.TransmissionRSAHash) } } - jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", data.EphemeralID, u.Token, resp) + jww.INFO.Printf("Notified ephemeral ID %+v [%+v] and received response %+v", ephID, u.Token, resp) } } return nil @@ -347,11 +355,10 @@ func (nb *Impl) ReceiveNotificationBatch(notifBatch *pb.NotificationBatch, auth jww.INFO.Printf("Received notification batch for round %+v", notifBatch.RoundID) + buffer := nb.Storage.GetNotificationBuffer() + for _, notifData := range notifBatch.GetNotifications() { - err := nb.notifyFunc(notifData, nb.apnsClient, nb.fcm, nb.Storage) - if err != nil { - return err - } + buffer.Add(notifData) } return nil @@ -362,15 +369,41 @@ func (nb *Impl) ReceivedNdf() *uint32 { } func (nb *Impl) Cleaner() { + cleanF := func(key, val interface{}) bool { + t := val.(time.Time) + if time.Since(t) > (5 * time.Minute) { + nb.roundStore.Delete(key) + } + return true + } + + cleanTicker := time.NewTicker(time.Minute * 10) + + for { + select { + case <-cleanTicker.C: + nb.roundStore.Range(cleanF) + } + } +} + +func (nb *Impl) Sender(sendFreq int) { + sendTicker := time.NewTicker(time.Duration(sendFreq) * time.Second) for { - f := func(key, val interface{}) bool { - t := val.(time.Time) - if time.Since(t) > (5 * time.Minute) { - nb.roundStore.Delete(key) + select { + case <-sendTicker.C: + notifBuf := nb.Storage.GetNotificationBuffer() + notifMap := notifBuf.Swap(uint(nb.maxNotifications)) + for ephID := range notifMap { + localEphID := ephID + notifList := notifMap[localEphID] + go func() { + err := nb.notifyFunc(localEphID, notifList, nb.apnsClient, nb.fcm, nb.Storage) + if err != nil { + jww.ERROR.Printf("Failed to notify %d: %+v", localEphID, err) + } + }() } - return true } - nb.roundStore.Range(f) - time.Sleep(time.Minute * 10) } } diff --git a/notifications/notifications_test.go b/notifications/notifications_test.go index b2d9de848ced32e0be43f6cad0ee17d3c09bf80e..8b2cefc71ec507c672d035a0a5f21cb56e340def 100644 --- a/notifications/notifications_test.go +++ b/notifications/notifications_test.go @@ -22,7 +22,7 @@ import ( "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/utils" "os" - "reflect" + "strings" "testing" "time" ) @@ -32,10 +32,10 @@ var port = 4200 // Test notificationbot's notifyuser function // this mocks the setup and send functions, and only tests the core logic of this function func TestNotifyUser(t *testing.T) { - badsend := func(firebase.FBSender, string, *pb.NotificationData) (string, error) { + badsend := func(firebase.FBSender, string, string) (string, error) { return "", errors.New("Failed") } - send := func(firebase.FBSender, string, *pb.NotificationData) (string, error) { + send := func(firebase.FBSender, string, string) (string, error) { return "", nil } fcBadSend := firebase.NewMockFirebaseComm(t, badsend) @@ -64,20 +64,20 @@ func TestNotifyUser(t *testing.T) { } ac := apns.NewApnsComm(apns2.NewTokenClient(nil), "") - err = notifyUser(&pb.NotificationData{ + err = notifyUser(eph.EphemeralId, []*pb.NotificationData{{ EphemeralID: eph.EphemeralId, IdentityFP: nil, MessageHash: nil, - }, ac, fcBadSend, s) + }}, ac, fcBadSend, s) if err == nil { t.Errorf("Should have returned an error") } - err = notifyUser(&pb.NotificationData{ + err = notifyUser(eph.EphemeralId, []*pb.NotificationData{{ EphemeralID: eph.EphemeralId, IdentityFP: nil, MessageHash: nil, - }, ac, fc, s) + }}, ac, fc, s) if err != nil { t.Errorf("Failed to notify user properly") } @@ -85,35 +85,37 @@ func TestNotifyUser(t *testing.T) { // Unit test for startnotifications // tests logic including error cases -//func TestStartNotifications(t *testing.T) { -// wd, err := os.Getwd() -// if err != nil { -// t.Errorf("Failed to get working dir: %+v", err) -// return -// } -// -// params := Params{ -// Address: "0.0.0.0:42010", -// APNS: APNSParams{ -// KeyPath: wd + "/../testutil/apnsKey.key", -// KeyID: "WQT68265C5", -// Issuer: "S6JDM2WW29", -// BundleID: "io.xxlabs.messenger", -// }, -// } -// -// params.KeyPath = wd + "/../testutil/cmix.rip.key" -// _, err = StartNotifications(params, false, true) -// if err == nil || !strings.Contains(err.Error(), "failed to read certificate at") { -// t.Errorf("Should have thrown an error for no cert path") -// } -// -// params.CertPath = wd + "/../testutil/cmix.rip.crt" -// _, err = StartNotifications(params, false, true) -// if err != nil { -// t.Errorf("Failed to start notifications successfully: %+v", err) -// } -//} +func TestStartNotifications(t *testing.T) { + wd, err := os.Getwd() + if err != nil { + t.Errorf("Failed to get working dir: %+v", err) + return + } + + params := Params{ + Address: "0.0.0.0:42010", + NotificationsPerBatch: 20, + NotificationRate: 30, + APNS: APNSParams{ + KeyPath: "", + KeyID: "WQT68265C5", + Issuer: "S6JDM2WW29", + BundleID: "io.xxlabs.messenger", + }, + } + + params.KeyPath = wd + "/../testutil/cmix.rip.key" + _, err = StartNotifications(params, false, true) + if err == nil || !strings.Contains(err.Error(), "failed to read certificate at") { + t.Errorf("Should have thrown an error for no cert path") + } + + params.CertPath = wd + "/../testutil/cmix.rip.crt" + _, err = StartNotifications(params, false, true) + if err != nil { + t.Errorf("Failed to start notifications successfully: %+v", err) + } +} // unit test for newimplementation // tests logic and error cases @@ -288,8 +290,8 @@ func TestImpl_UnregisterForNotifications(t *testing.T) { func TestImpl_ReceiveNotificationBatch(t *testing.T) { impl := getNewImpl() dataChan := make(chan *pb.NotificationData) - impl.notifyFunc = func(data *pb.NotificationData, apns *apns.ApnsComm, fc *firebase.FirebaseComm, s *storage.Storage) error { - go func() { dataChan <- data }() + impl.notifyFunc = func(eph int64, data []*pb.NotificationData, apns *apns.ApnsComm, fc *firebase.FirebaseComm, s *storage.Storage) error { + go func() { dataChan <- data[0] }() return nil } @@ -313,14 +315,9 @@ func TestImpl_ReceiveNotificationBatch(t *testing.T) { t.Errorf("ReceiveNotificationBatch() returned an error: %+v", err) } - select { - case result := <-dataChan: - if !reflect.DeepEqual(notifBatch.Notifications[0], result) { - t.Errorf("Failed to receive expected NotificationData."+ - "\nexpected: %s\nreceived: %s", notifBatch.Notifications[0], result) - } - case <-time.NewTimer(50 * time.Millisecond).C: - t.Error("Timed out while waiting for NotificationData.") + nbm := impl.Storage.GetNotificationBuffer().Swap(20) + if len(nbm[5]) < 1 { + t.Errorf("Notification was not added to notification buffer") } } @@ -328,12 +325,15 @@ func TestImpl_ReceiveNotificationBatch(t *testing.T) { func getNewImpl() *Impl { wd, _ := os.Getwd() params := Params{ - Address: fmt.Sprintf("0.0.0.0:%d", port), - KeyPath: wd + "/../testutil/cmix.rip.key", - CertPath: wd + "/../testutil/cmix.rip.crt", - FBCreds: "", + NotificationsPerBatch: 20, + NotificationRate: 30, + Address: fmt.Sprintf("0.0.0.0:%d", port), + KeyPath: wd + "/../testutil/cmix.rip.key", + CertPath: wd + "/../testutil/cmix.rip.crt", + FBCreds: "", } port += 1 instance, _ := StartNotifications(params, false, true) + instance.Storage, _ = storage.NewStorage("", "", "", "", "") return instance } diff --git a/storage/buffer.go b/storage/buffer.go new file mode 100644 index 0000000000000000000000000000000000000000..7edcfa092af59344e0cf8c185aa74da7a7a34b3e --- /dev/null +++ b/storage/buffer.go @@ -0,0 +1,55 @@ +package storage + +import ( + pb "gitlab.com/elixxir/comms/mixmessages" + "strconv" + "sync" + "sync/atomic" +) + +type NotificationBuffer struct { + bufMap atomic.Value + counter *int64 +} + +func NewNotificationBuffer() *NotificationBuffer { + u := int64(0) + sm := sync.Map{} + nb := &NotificationBuffer{ + counter: &u, + bufMap: atomic.Value{}, + } + nb.bufMap.Store(&sm) + return nb +} + +func (bnm *NotificationBuffer) Swap(maxNotifications uint) map[int64][]*pb.NotificationData { + newSM := &sync.Map{} + m := bnm.bufMap.Swap(newSM).(*sync.Map) + + outMap := make(map[int64][]*pb.NotificationData) + f := func(_, value interface{}) bool { + n := value.(*pb.NotificationData) + nSlice, exists := outMap[n.EphemeralID] + if exists { + if uint(len(nSlice)) >= maxNotifications { + bnm.Add(n) + } else { + nSlice = append(nSlice, n) + } + } else { + nSlice = []*pb.NotificationData{n} + } + outMap[n.EphemeralID] = nSlice + return true + } + + m.Range(f) + + return outMap +} + +func (bnm *NotificationBuffer) Add(n *pb.NotificationData) { + c := atomic.AddInt64(bnm.counter, 1) + bnm.bufMap.Load().(*sync.Map).Store(strconv.FormatInt(c, 16), n) +} diff --git a/storage/storage.go b/storage/storage.go index 87641059be7d93a9d679bcfaac481f01061fc5d6..0272cf399164ed4148c27ac0641233db3e5b194b 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -11,13 +11,15 @@ import ( type Storage struct { database + notificationBuffer *NotificationBuffer } // Create a new Storage object wrapping a database interface // Returns a Storage object and error func NewStorage(username, password, dbName, address, port string) (*Storage, error) { db, err := newDatabase(username, password, dbName, address, port) - storage := &Storage{db} + nb := NewNotificationBuffer() + storage := &Storage{db, nb} return storage, err } @@ -117,3 +119,7 @@ func (s *Storage) AddEphemeralsForOffset(offset int64, epoch int32, size uint, t } return nil } + +func (s *Storage) GetNotificationBuffer()*NotificationBuffer { + return s.notificationBuffer +}