Skip to content
Snippets Groups Projects
Commit 798d786a authored by Jonah Husson's avatar Jonah Husson
Browse files

Comment changes

parent 4bc84faf
No related branches found
No related tags found
2 merge requests!32Release,!25Restructure notification code for batching
......@@ -196,7 +196,7 @@ func (nb *Impl) SendBatch(data map[int64][]*notifications.Data) ([]*notification
notifs, rest := notifications.BuildNotificationCSV(trimmed, nb.maxPayloadBytes-len([]byte(notificationsTag)))
for _, nd := range rest {
nb.Storage.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd}) // TODO build lists by rid for more efficient re-insertion? Accumulator would let us do this with the other size check in swap
nb.Storage.GetNotificationBuffer().Add(id.Round(nd.RoundID), []*notifications.Data{nd})
}
overflow = append(overflow, rest...)
csvs[i] = string(notifs)
......@@ -418,22 +418,27 @@ func (nb *Impl) Sender(sendFreq int) {
select {
case <-sendTicker.C:
go func() {
// Retreive & swap notification buffer
notifBuf := nb.Storage.GetNotificationBuffer()
notifMap := notifBuf.Swap()
unsent := map[uint64][]*notifications.Data{}
rest, err := nb.SendBatch(notifMap)
if err != nil {
jww.ERROR.Printf("Failed to send notification batch: %+v", err)
// If we fail to run SendBatch, put everything back in unsent
for _, elist := range notifMap {
for _, n := range elist {
unsent[n.RoundID] = append(unsent[n.RoundID], n)
}
}
} else {
// Loop through rest and add to unsent map
for _, n := range rest {
unsent[n.RoundID] = append(unsent[n.RoundID], n)
}
}
// Re-add unsent notifications to the buffer
for rid, nd := range unsent {
notifBuf.Add(id.Round(rid), nd)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment