diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index 82c8b898e14c9e9b225a5d74141ec6e61925c8bd..8f513bdb948a61d11b7486eea80848a97a9876b0 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -203,11 +203,19 @@ func (mb *MessageBuffer) Add(m interface{}) interface{} { defer mb.mux.Unlock() // Ensure message does not already exist in buffer - if face1, exists1 := mb.messages[h]; exists1 { - return face1 + if _, exists1 := mb.messages[h]; exists1 { + msg, err := mb.handler.LoadMessage(mb.kv, MakeStoredMessageKey(mb.key, h)) + if err != nil { + jww.FATAL.Panicf("Error loading message %s: %v", h, err) + } + return msg } - if face2, exists2 := mb.processingMessages[h]; exists2 { - return face2 + if _, exists2 := mb.processingMessages[h]; exists2 { + msg, err := mb.handler.LoadMessage(mb.kv, MakeStoredMessageKey(mb.key, h)) + if err != nil { + jww.FATAL.Panicf("Error loading processing message %s: %v", h, err) + } + return msg } // Save message as versioned object