Skip to content
Snippets Groups Projects
Commit 4bc84faf authored by Jonah Husson's avatar Jonah Husson
Browse files

Fix tests, add mapimpl func

parent a81b0e52
No related branches found
No related tags found
2 merge requests!32Release,!25Restructure notification code for batching
......@@ -33,7 +33,6 @@ import (
"gitlab.com/xx_network/primitives/ndf"
"gitlab.com/xx_network/primitives/netTime"
"gitlab.com/xx_network/primitives/utils"
"gorm.io/gorm"
"strings"
"time"
)
......@@ -68,7 +67,6 @@ type Impl struct {
Comms *notificationBot.Comms
Storage *storage.Storage
inst *network.Instance
notifyFunc NotifyFunc
fcm *firebase.FirebaseComm
apnsClient *apns.ApnsComm
receivedNdf *uint32
......@@ -114,7 +112,6 @@ func StartNotifications(params Params, noTLS, noFirebase bool) (*Impl, error) {
receivedNdf := uint32(0)
impl := &Impl{
notifyFunc: notifyUser,
fcm: fbComm,
receivedNdf: &receivedNdf,
maxNotifications: params.NotificationsPerBatch,
......@@ -186,32 +183,34 @@ func NewImplementation(instance *Impl) *notificationBot.Implementation {
return impl
}
func (nb *Impl) SendBatch(data map[int64][]*notifications.Data, maxBatchSize, maxBytes int) error {
func (nb *Impl) SendBatch(data map[int64][]*notifications.Data) ([]*notifications.Data, error) {
csvs := map[int64]string{}
var ephemerals []int64
var unsent []*notifications.Data
for i, ilist := range data {
var overflow, trimmed []*notifications.Data
if len(data) > maxBatchSize {
overflow = ilist[maxBatchSize:]
trimmed = ilist[:maxBatchSize]
if len(data) > nb.maxNotifications {
overflow = ilist[nb.maxNotifications:]
trimmed = ilist[:nb.maxNotifications]
}
notifs, rest := notifications.BuildNotificationCSV(trimmed, maxBytes-len([]byte(notificationsTag)))
notifs, rest := notifications.BuildNotificationCSV(trimmed, nb.maxPayloadBytes-len([]byte(notificationsTag)))
for _, nd := range rest {
nb.Storage.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd}) // TODO build lists by rid for more efficient re-insertion? Accumulator would let us do this with the other size check in swap
}
overflow = append(overflow, rest...)
csvs[i] = string(notifs)
ephemerals = append(ephemerals, i)
unsent = append(unsent, overflow...)
}
toNotify, err := nb.Storage.GetToNotify(ephemerals)
if err != nil {
return errors.WithMessage(err, "Failed to get list of tokens to notify")
return nil, errors.WithMessage(err, "Failed to get list of tokens to notify")
}
for _, n := range toNotify {
nb.notify(n.Token, csvs[n.EphemeralId], n.EphemeralId, n.TransmissionRSAHash)
}
return nil
return unsent, nil
}
func (nb *Impl) notify(token, csv string, ephID int64, transmissionRSAHash []byte) {
......@@ -422,13 +421,15 @@ func (nb *Impl) Sender(sendFreq int) {
notifBuf := nb.Storage.GetNotificationBuffer()
notifMap := notifBuf.Swap()
unsent := map[uint64][]*notifications.Data{}
for ephID := range notifMap {
localEphID := ephID
notifList := notifMap[localEphID]
rest, err := nb.notifyFunc(localEphID, notifList, nb.apnsClient, nb.fcm, nb.Storage, nb.maxNotifications, nb.maxPayloadBytes)
rest, err := nb.SendBatch(notifMap)
if err != nil {
jww.ERROR.Printf("Failed to notify %d: %+v", localEphID, err)
jww.ERROR.Printf("Failed to send notification batch: %+v", err)
for _, elist := range notifMap {
for _, n := range elist {
unsent[n.RoundID] = append(unsent[n.RoundID], n)
}
}
} else {
for _, n := range rest {
unsent[n.RoundID] = append(unsent[n.RoundID], n)
}
......
......@@ -7,7 +7,6 @@ package notifications
import (
"fmt"
"github.com/pkg/errors"
"github.com/sideshow/apns2"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/crypto/hash"
......@@ -31,22 +30,39 @@ import (
var port = 4200
// Test notificationbot's notifyuser function
// this mocks the setup and send functions, and only tests the core logic of this function
func TestNotifyUser(t *testing.T) {
badsend := func(firebase.FBSender, string, string) (string, error) {
return "", errors.New("Failed")
func TestImpl_SendBatch(t *testing.T) {
// Init storage
s, err := storage.NewStorage("", "", "", "", "")
if err != nil {
t.Errorf("Failed to make new storage: %+v", err)
}
send := func(firebase.FBSender, string, string) (string, error) {
dchan := make(chan string, 10)
// Init mock firebase comms
//badsend := func(firebase.FBSender, string, string) (string, error) {
// return "", errors.New("Failed")
//}
send := func(s1 firebase.FBSender, s2 string, s3 string) (string, error) {
dchan <- s2
return "", nil
}
fcBadSend := firebase.NewMockFirebaseComm(t, badsend)
//fcBadSend := firebase.NewMockFirebaseComm(t, badsend)
fc := firebase.NewMockFirebaseComm(t, send)
s, err := storage.NewStorage("", "", "", "", "")
if err != nil {
t.Errorf("Failed to make new storage: %+v", err)
// Make apns comm object
ac := apns.NewApnsComm(apns2.NewTokenClient(nil), "")
// Create impl
i := Impl{
Storage: s,
fcm: fc,
apnsClient: ac,
roundStore: sync.Map{},
maxNotifications: 0,
maxPayloadBytes: 0,
}
// Identity setup
uid := id.NewIdFromString("zezima", id.User, t)
iid, err := ephemeral.GetIntermediaryId(uid)
if err != nil {
......@@ -55,7 +71,7 @@ func TestNotifyUser(t *testing.T) {
if err != nil {
t.Errorf("Could not parse precanned time: %v", err.Error())
}
u, err := s.AddUser(iid, []byte("rsacert"), []byte("sig"), ":token")
u, err := s.AddUser(iid, []byte("rsacert"), []byte("sig"), "fcm:token")
if err != nil {
t.Errorf("Failed to add fake user: %+v", err)
}
......@@ -64,24 +80,39 @@ func TestNotifyUser(t *testing.T) {
if err != nil {
t.Errorf("Failed to add latest ephemeral: %+v", err)
}
_, err = i.SendBatch(map[int64][]*notifications.Data{})
if err != nil {
t.Errorf("Error on sending empty batch: %+v", err)
}
ac := apns.NewApnsComm(apns2.NewTokenClient(nil), "")
_, err = notifyUser(eph.EphemeralId, []*notifications.Data{{
EphemeralID: eph.EphemeralId,
IdentityFP: nil,
MessageHash: nil,
}}, ac, fcBadSend, s, 20, 4096)
unsent, err := i.SendBatch(map[int64][]*notifications.Data{
eph.EphemeralId: {{EphemeralID: eph.EphemeralId, RoundID: 3, MessageHash: []byte("hello"), IdentityFP: []byte("identity")}},
})
if err != nil {
t.Errorf("This no longer returns an error but should print one to log - not testable but leaving the case")
t.Errorf("Error on sending small batch: %+v", err)
}
if len(unsent) < 1 {
t.Errorf("Should have received notification back as unsent, instead got %+v", unsent)
}
_, err = notifyUser(eph.EphemeralId, []*notifications.Data{{
EphemeralID: eph.EphemeralId,
IdentityFP: nil,
MessageHash: nil,
}}, ac, fc, s, 20, 4096)
i.maxPayloadBytes = 4096
i.maxNotifications = 20
unsent, err = i.SendBatch(map[int64][]*notifications.Data{
1: {{EphemeralID: eph.EphemeralId, RoundID: 3, MessageHash: []byte("hello"), IdentityFP: []byte("identity")}},
})
if err != nil {
t.Errorf("Failed to notify user properly")
t.Errorf("Error on sending small batch again: %+v", err)
}
if len(unsent) > 0 {
t.Errorf("Should have received notification back as unsent, instead got %+v", unsent)
}
timeout := time.NewTicker(3 * time.Second)
select {
case <-dchan:
t.Logf("Received on data chan!")
case <-timeout.C:
t.Errorf("Did not receive data before timeout")
}
}
......@@ -290,8 +321,6 @@ func TestImpl_UnregisterForNotifications(t *testing.T) {
// Happy path.
func TestImpl_ReceiveNotificationBatch(t *testing.T) {
dataChan := make(chan *notifications.Data)
s, err := storage.NewStorage("", "", "", "", "")
impl := &Impl{
Storage: s,
......@@ -299,10 +328,6 @@ func TestImpl_ReceiveNotificationBatch(t *testing.T) {
maxNotifications: 0,
maxPayloadBytes: 0,
}
impl.notifyFunc = func(eph int64, data []*notifications.Data, apns *apns.ApnsComm, fc *firebase.FirebaseComm, s *storage.Storage, batchSize, maxBytes int) ([]*notifications.Data, error) {
go func() { dataChan <- data[0] }()
return nil, nil
}
notifBatch := &pb.NotificationBatch{
RoundID: 42,
......
......@@ -15,6 +15,21 @@ import (
"gorm.io/gorm"
)
func (m *MapImpl) GetToNotify(ephemeralIds []int64) ([]GTNResult, error) {
var results []GTNResult
for _, eid := range ephemeralIds {
for _, eph := range m.ephemeralsById[eid] {
u := m.usersByRsaHash[string(eph.TransmissionRSAHash)]
results = append(results, GTNResult{
EphemeralId: eid,
TransmissionRSAHash: u.TransmissionRSA,
Token: u.Token,
})
}
}
return results, nil
}
// Obtain User from backend by primary key
func (m *MapImpl) GetUser(userId []byte) (*User, error) {
// Attempt to load from map
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment