Skip to content
Snippets Groups Projects

Add a sync map for rounds received

+ 26
0
@@ -11,6 +11,7 @@ package notifications
@@ -11,6 +11,7 @@ package notifications
import (
import (
"encoding/base64"
"encoding/base64"
"gitlab.com/elixxir/notifications-bot/notifications/apns"
"gitlab.com/elixxir/notifications-bot/notifications/apns"
 
"sync"
// "github.com/jonahh-yeti/apns"
// "github.com/jonahh-yeti/apns"
"github.com/pkg/errors"
"github.com/pkg/errors"
@@ -65,6 +66,7 @@ type Impl struct {
@@ -65,6 +66,7 @@ type Impl struct {
fcm *firebase.FirebaseComm
fcm *firebase.FirebaseComm
apnsClient *apns.ApnsComm
apnsClient *apns.ApnsComm
receivedNdf *uint32
receivedNdf *uint32
 
roundStore sync.Map
ndfStopper Stopper
ndfStopper Stopper
}
}
@@ -149,6 +151,8 @@ func StartNotifications(params Params, noTLS, noFirebase bool) (*Impl, error) {
@@ -149,6 +151,8 @@ func StartNotifications(params Params, noTLS, noFirebase bool) (*Impl, error) {
i.SetGatewayAuthentication()
i.SetGatewayAuthentication()
impl.inst = i
impl.inst = i
 
go impl.Cleaner()
 
return impl, nil
return impl, nil
}
}
@@ -333,6 +337,14 @@ func (nb *Impl) ReceiveNotificationBatch(notifBatch *pb.NotificationBatch, auth
@@ -333,6 +337,14 @@ func (nb *Impl) ReceiveNotificationBatch(notifBatch *pb.NotificationBatch, auth
// return errors.New("Cannot receive notification data: client is not authenticated")
// return errors.New("Cannot receive notification data: client is not authenticated")
//}
//}
 
rid := notifBatch.RoundID
 
 
_, loaded := nb.roundStore.LoadOrStore(rid, time.Now())
 
if loaded {
 
jww.DEBUG.Printf("Dropping duplicate notification batch for round %+v", notifBatch.RoundID)
 
return nil
 
}
 
jww.INFO.Printf("Received notification batch for round %+v", notifBatch.RoundID)
jww.INFO.Printf("Received notification batch for round %+v", notifBatch.RoundID)
for _, notifData := range notifBatch.GetNotifications() {
for _, notifData := range notifBatch.GetNotifications() {
@@ -348,3 +360,17 @@ func (nb *Impl) ReceiveNotificationBatch(notifBatch *pb.NotificationBatch, auth
@@ -348,3 +360,17 @@ func (nb *Impl) ReceiveNotificationBatch(notifBatch *pb.NotificationBatch, auth
func (nb *Impl) ReceivedNdf() *uint32 {
func (nb *Impl) ReceivedNdf() *uint32 {
return nb.receivedNdf
return nb.receivedNdf
}
}
 
 
func (nb *Impl) Cleaner() {
 
for {
 
f := func(key, val interface{}) bool {
 
t := val.(time.Time)
 
if time.Since(t) > (5 * time.Minute) {
 
nb.roundStore.Delete(key)
 
}
 
return true
 
}
 
nb.roundStore.Range(f)
 
time.Sleep(time.Minute * 10)
 
}
 
}
Loading