Skip to content
Snippets Groups Projects
Commit 76728af6 authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

made several changes in an attempt for solidify behavior of receiveHelper

parent c5c6120d
No related branches found
No related tags found
2 merge requests!88made several changes in an attempt for solidify behavior of receiveHelper,!67fix for latest client release
......@@ -13,8 +13,6 @@ import (
"crypto/ed25519"
"encoding/json"
"strconv"
"strings"
"sync"
"syscall/js"
"time"
......@@ -36,13 +34,14 @@ import (
// wasmModel implements [channels.EventModel] interface, which uses the channels
// system passed an object that adheres to in order to get events on the
// channel.
// NOTE: This model is NOT thread safe - it is the responsibility of the
// caller to ensure that its methods are called sequentially.
type wasmModel struct {
db *idb.Database
cipher cryptoChannel.Cipher
receivedMessageCB wChannels.MessageReceivedCallback
deletedMessageCB wChannels.DeletedMessageCallback
mutedUserCB wChannels.MutedUserCallback
updateMux sync.Mutex
}
// JoinChannel is called whenever a channel is joined locally.
......@@ -166,7 +165,7 @@ func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID message.ID,
textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType,
false, hidden, status)
uuid, err := w.receiveHelper(msgToInsert, false)
uuid, err := w.upsertMessage(msgToInsert)
if err != nil {
jww.ERROR.Printf("Failed to receive Message: %+v", err)
}
......@@ -202,7 +201,7 @@ func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID,
replyTo.Bytes(), nickname, textBytes, pubKey, dmToken, codeset,
timestamp, lease, round.ID, mType, hidden, false, status)
uuid, err := w.receiveHelper(msgToInsert, false)
uuid, err := w.upsertMessage(msgToInsert)
if err != nil {
jww.ERROR.Printf("Failed to receive reply: %+v", err)
......@@ -239,7 +238,7 @@ func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID,
textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType,
false, hidden, status)
uuid, err := w.receiveHelper(msgToInsert, false)
uuid, err := w.upsertMessage(msgToInsert)
if err != nil {
jww.ERROR.Printf("Failed to receive reaction: %+v", err)
}
......@@ -257,24 +256,25 @@ func (w *wasmModel) UpdateFromUUID(uuid uint64, messageID *message.ID,
status *channels.SentStatus) {
parentErr := errors.New("failed to UpdateFromUUID")
// FIXME: this is a bit of race condition without the mux.
// This should be done via the transactions (i.e., make a
// special version of receiveHelper)
w.updateMux.Lock()
defer w.updateMux.Unlock()
// Convert messageID to the key generated by json.Marshal
key := js.ValueOf(uuid)
// Use the key to get the existing Message
currentMsg, err := impl.Get(w.db, messageStoreName, key)
msgObj, err := impl.Get(w.db, messageStoreName, key)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to get message: %+v", err))
return
}
_, err = w.updateMessage(utils.JsToJson(currentMsg), messageID, timestamp,
currentMsg, err := valueToMessage(msgObj)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Failed to marshal Message: %+v", err))
return
}
_, err = w.updateMessage(currentMsg, messageID, timestamp,
round, pinned, hidden, status)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
......@@ -296,13 +296,7 @@ func (w *wasmModel) UpdateFromMessageID(messageID message.ID,
status *channels.SentStatus) uint64 {
parentErr := errors.New("failed to UpdateFromMessageID")
// FIXME: this is a bit of race condition without the mux.
// This should be done via the transactions (i.e., make a
// special version of receiveHelper)
w.updateMux.Lock()
defer w.updateMux.Unlock()
currentMsgObj, err := impl.GetIndex(w.db, messageStoreName,
msgObj, err := impl.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, impl.EncodeBytes(messageID.Marshal()))
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
......@@ -310,7 +304,13 @@ func (w *wasmModel) UpdateFromMessageID(messageID message.ID,
return 0
}
currentMsg := utils.JsToJson(currentMsgObj)
currentMsg, err := valueToMessage(msgObj)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Failed to marshal Message: %+v", err))
return 0
}
uuid, err := w.updateMessage(currentMsg, &messageID, timestamp,
round, pinned, hidden, status)
if err != nil {
......@@ -320,52 +320,6 @@ func (w *wasmModel) UpdateFromMessageID(messageID message.ID,
return uuid
}
// updateMessage is a helper for updating a stored message.
func (w *wasmModel) updateMessage(currentMsgJson string, messageID *message.ID,
timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
status *channels.SentStatus) (uint64, error) {
newMessage := &Message{}
err := json.Unmarshal([]byte(currentMsgJson), newMessage)
if err != nil {
return 0, err
}
if status != nil {
newMessage.Status = uint8(*status)
}
if messageID != nil {
newMessage.MessageID = messageID.Bytes()
}
if round != nil {
newMessage.Round = uint64(round.ID)
}
if timestamp != nil {
newMessage.Timestamp = *timestamp
}
if pinned != nil {
newMessage.Pinned = *pinned
}
if hidden != nil {
newMessage.Hidden = *hidden
}
// Store the updated Message
uuid, err := w.receiveHelper(newMessage, true)
if err != nil {
return 0, err
}
channelID := &id.ID{}
copy(channelID[:], newMessage.ChannelID)
go w.receivedMessageCB(uuid, channelID, true)
return uuid, nil
}
// buildMessage is a private helper that converts typical [channels.EventModel]
// inputs into a basic Message structure for insertion into storage.
//
......@@ -397,11 +351,51 @@ func buildMessage(channelID, messageID, parentID []byte, nickname string,
}
}
// receiveHelper is a private helper for receiving any sort of message.
func (w *wasmModel) receiveHelper(
newMessage *Message, isUpdate bool) (uint64, error) {
// updateMessage is a helper for updating a stored message.
func (w *wasmModel) updateMessage(currentMsg *Message, messageID *message.ID,
timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
status *channels.SentStatus) (uint64, error) {
if status != nil {
currentMsg.Status = uint8(*status)
}
if messageID != nil {
currentMsg.MessageID = messageID.Bytes()
}
if round != nil {
currentMsg.Round = uint64(round.ID)
}
if timestamp != nil {
currentMsg.Timestamp = *timestamp
}
if pinned != nil {
currentMsg.Pinned = *pinned
}
if hidden != nil {
currentMsg.Hidden = *hidden
}
// Store the updated Message
uuid, err := w.upsertMessage(currentMsg)
if err != nil {
return 0, err
}
channelID := &id.ID{}
copy(channelID[:], currentMsg.ChannelID)
go w.receivedMessageCB(uuid, channelID, true)
return uuid, nil
}
// upsertMessage is a helper function that will update an existing record
// if Message.ID is specified. Otherwise, it will perform an insert.
func (w *wasmModel) upsertMessage(msg *Message) (uint64, error) {
// Convert to jsObject
newMessageJson, err := json.Marshal(newMessage)
newMessageJson, err := json.Marshal(msg)
if err != nil {
return 0, errors.Errorf("Unable to marshal Message: %+v", err)
}
......@@ -410,43 +404,29 @@ func (w *wasmModel) receiveHelper(
return 0, errors.Errorf("Unable to marshal Message: %+v", err)
}
// Unset the primaryKey for inserts so that it can be auto-populated and
// incremented
if !isUpdate {
messageObj.Delete("id")
}
// Store message to database
result, err := impl.Put(w.db, messageStoreName, messageObj)
// FIXME: The following is almost certainly causing a bug
// where all of our upsert operations are failing.
if err != nil && !strings.Contains(err.Error(), impl.ErrUniqueConstraint) {
// Only return non-unique constraint errors so that the case
// below this one can be hit and handle duplicate entries properly.
msgIdObj, err := impl.Put(w.db, messageStoreName, messageObj)
if err != nil {
return 0, errors.Errorf("Unable to put Message: %+v", err)
}
// NOTE: Sometimes the insert fails to return an error but hits a duplicate
// insert, so this fallthrough returns the UUID entry in that case.
if result.IsUndefined() {
msgID := message.ID{}
copy(msgID[:], newMessage.MessageID)
msg, errLookup := w.msgIDLookup(msgID)
if errLookup == nil && msg.ID != 0 {
return msg.ID, nil
}
return 0, errors.Errorf("uuid lookup failure: %+v", err)
}
uuid := uint64(result.Int())
uuid := msgIdObj.Int()
jww.DEBUG.Printf("Successfully stored message %d", uuid)
return uuid, nil
return uint64(uuid), nil
}
// GetMessage returns the message with the given [channel.MessageID].
func (w *wasmModel) GetMessage(
messageID message.ID) (channels.ModelMessage, error) {
lookupResult, err := w.msgIDLookup(messageID)
msgIDStr := impl.EncodeBytes(messageID.Marshal())
resultObj, err := impl.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, msgIDStr)
if err != nil {
return channels.ModelMessage{}, err
}
lookupResult, err := valueToMessage(resultObj)
if err != nil {
return channels.ModelMessage{}, err
}
......@@ -514,20 +494,10 @@ func (w *wasmModel) MuteUser(
go w.mutedUserCB(channelID, pubKey, unmute)
}
// msgIDLookup gets the UUID of the Message with the given messageID.
func (w *wasmModel) msgIDLookup(messageID message.ID) (*Message, error) {
msgIDStr := impl.EncodeBytes(messageID.Marshal())
resultObj, err := impl.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, msgIDStr)
if err != nil {
return nil, err
} else if resultObj.IsUndefined() {
return nil, errors.Errorf("no message for %s found", msgIDStr)
}
// Process result into string
// valueToMessage is a helper for converting js.Value to Message.
func valueToMessage(msgObj js.Value) (*Message, error) {
resultMsg := &Message{}
err = json.Unmarshal([]byte(utils.JsToJson(resultObj)), resultMsg)
err := json.Unmarshal([]byte(utils.JsToJson(msgObj)), resultMsg)
if err != nil {
return nil, err
}
......
......@@ -47,7 +47,7 @@ func dummyStoreEncryptionStatus(_ string, encryptionStatus bool) (bool, error) {
}
// Happy path, insert message and look it up
func TestWasmModel_msgIDLookup(t *testing.T) {
func TestWasmModel_GetMessage(t *testing.T) {
cipher, err := cryptoChannel.NewCipher(
[]byte("testPass"), []byte("testSalt"), 128, csprng.NewSystemRNG())
if err != nil {
......@@ -58,32 +58,32 @@ func TestWasmModel_msgIDLookup(t *testing.T) {
if c != nil {
cs = "_withCipher"
}
t.Run(fmt.Sprintf("TestWasmModel_msgIDLookup%s", cs), func(t *testing.T) {
storage.GetLocalStorage().Clear()
testString := "TestWasmModel_msgIDLookup" + cs
t.Run(testString, func(t *testing.T) {
storage.GetLocalStorage().Clear()
testMsgId := message.DeriveChannelMessageID(&id.ID{1}, 0, []byte(testString))
eventModel, err2 := newWASMModel(testString, c,
eventModel, err := newWASMModel(testString, c,
dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB,
dummyStoreDatabaseName, dummyStoreEncryptionStatus)
if err2 != nil {
t.Fatal(err2)
if err != nil {
t.Fatal(err)
}
testMsg := buildMessage([]byte(testString), testMsgId.Bytes(), nil,
testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0,
netTime.Now(), time.Second, 0, 0, false, false, channels.Sent)
_, err = eventModel.receiveHelper(testMsg, false)
testMsg := buildMessage(id.NewIdFromBytes([]byte(testString), t).Marshal(),
testMsgId.Bytes(), nil, testString, []byte(testString),
[]byte{8, 6, 7, 5}, 0, 0, netTime.Now(),
time.Second, 0, 0, false, false, channels.Sent)
_, err = eventModel.upsertMessage(testMsg)
if err != nil {
t.Fatal(err)
}
msg, err2 := eventModel.msgIDLookup(testMsgId)
if err2 != nil {
t.Fatal(err2)
msg, err := eventModel.GetMessage(testMsgId)
if err != nil {
t.Fatal(err)
}
if msg.ID == 0 {
if msg.UUID == 0 {
t.Fatalf("Expected to get a UUID!")
}
})
......@@ -106,7 +106,7 @@ func TestWasmModel_DeleteMessage(t *testing.T) {
testMsg := buildMessage([]byte(testString), testMsgId.Bytes(), nil,
testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0, netTime.Now(),
time.Second, 0, 0, false, false, channels.Sent)
_, err = eventModel.receiveHelper(testMsg, false)
_, err = eventModel.upsertMessage(testMsg)
if err != nil {
t.Fatal(err)
}
......@@ -164,7 +164,7 @@ func Test_wasmModel_UpdateSentStatus(t *testing.T) {
testMsg := buildMessage([]byte(testString), testMsgId.Bytes(), nil,
testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0,
netTime.Now(), time.Second, 0, 0, false, false, channels.Sent)
uuid, err2 := eventModel.receiveHelper(testMsg, false)
uuid, err2 := eventModel.upsertMessage(testMsg)
if err2 != nil {
t.Fatal(err2)
}
......@@ -322,36 +322,35 @@ func Test_wasmModel_DuplicateReceives(t *testing.T) {
if c != nil {
cs = "_withCipher"
}
t.Run("Test_wasmModel_DuplicateReceives"+cs, func(t *testing.T) {
testString := "Test_wasmModel_DuplicateReceives" + cs
t.Run(testString, func(t *testing.T) {
storage.GetLocalStorage().Clear()
testString := "testHello"
eventModel, err2 := newWASMModel(testString, c,
eventModel, err := newWASMModel(testString, c,
dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB,
dummyStoreDatabaseName, dummyStoreEncryptionStatus)
if err2 != nil {
t.Fatal(err2)
if err != nil {
t.Fatal(err)
}
uuids := make([]uint64, 10)
// Store a test message
msgID := message.ID{}
copy(msgID[:], testString)
for i := 0; i < 10; i++ {
// Store a test message
channelID := id.NewIdFromBytes([]byte(testString), t)
rnd := rounds.Round{ID: id.Round(42)}
uuid := eventModel.ReceiveMessage(channelID, msgID, "test",
testString+fmt.Sprintf("%d", i), []byte{8, 6, 7, 5}, 0, 0,
testString, []byte{8, 6, 7, 5}, 0, 0,
netTime.Now(), time.Hour, rnd, 0, channels.Sent, false)
uuids[i] = uuid
if uuid != 1 {
t.Fatalf("Expected UUID to be one for first receive")
}
// Store duplicate messages with same messageID
for i := 0; i < 10; i++ {
for j := i + 1; j < 10; j++ {
if uuids[i] != uuids[j] {
t.Fatalf("uuid failed: %d[%d] != %d[%d]",
uuids[i], i, uuids[j], j)
}
uuid = eventModel.ReceiveMessage(channelID, msgID, "test",
testString+fmt.Sprintf("%d", i), []byte{8, 6, 7, 5}, 0, 0,
netTime.Now(), time.Hour, rnd, 0, channels.Sent, false)
if uuid != 0 {
t.Fatalf("Expected UUID to be zero for duplicate receives")
}
}
})
......@@ -371,16 +370,16 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) {
if c != nil {
cs = "_withCipher"
}
t.Run("Test_wasmModel_deleteMsgByChannel"+cs, func(t *testing.T) {
testString := "Test_wasmModel_deleteMsgByChannel" + cs
t.Run(testString, func(t *testing.T) {
storage.GetLocalStorage().Clear()
testString := "test_deleteMsgByChannel"
totalMessages := 10
expectedMessages := 5
eventModel, err2 := newWASMModel(testString, c,
eventModel, err := newWASMModel(testString, c,
dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB,
dummyStoreDatabaseName, dummyStoreEncryptionStatus)
if err2 != nil {
t.Fatal(err2)
if err != nil {
t.Fatal(err)
}
// Create a test channel id
......@@ -406,12 +405,12 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) {
}
// Check pre-results
result, err2 := impl.Dump(eventModel.db, messageStoreName)
if err2 != nil {
t.Fatal(err2)
result, err := impl.Dump(eventModel.db, messageStoreName)
if err != nil {
t.Fatal(err)
}
if len(result) != totalMessages {
t.Errorf("Expected %d messages, got %d", totalMessages, len(result))
t.Fatalf("Expected %d messages, got %d", totalMessages, len(result))
}
// Do delete
......@@ -426,7 +425,7 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) {
t.Fatal(err)
}
if len(result) != expectedMessages {
t.Errorf("Expected %d messages, got %d", expectedMessages, len(result))
t.Fatalf("Expected %d messages, got %d", expectedMessages, len(result))
}
})
}
......@@ -448,26 +447,26 @@ func TestWasmModel_receiveHelper_UniqueIndex(t *testing.T) {
t.Run("TestWasmModel_receiveHelper_UniqueIndex"+cs, func(t *testing.T) {
storage.GetLocalStorage().Clear()
testString := fmt.Sprintf("test_receiveHelper_UniqueIndex_%d", i)
eventModel, err2 := newWASMModel(testString, c,
eventModel, err := newWASMModel(testString, c,
dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB,
dummyStoreDatabaseName, dummyStoreEncryptionStatus)
if err2 != nil {
t.Fatal(err2)
if err != nil {
t.Fatal(err)
}
// Ensure index is unique
txn, err2 := eventModel.db.Transaction(
txn, err := eventModel.db.Transaction(
idb.TransactionReadOnly, messageStoreName)
if err2 != nil {
t.Fatal(err2)
if err != nil {
t.Fatal(err)
}
store, err2 := txn.ObjectStore(messageStoreName)
if err2 != nil {
t.Fatal(err2)
store, err := txn.ObjectStore(messageStoreName)
if err != nil {
t.Fatal(err)
}
idx, err2 := store.Index(messageStoreMessageIndex)
if err2 != nil {
t.Fatal(err2)
idx, err := store.Index(messageStoreMessageIndex)
if err != nil {
t.Fatal(err)
}
if isUnique, err3 := idx.Unique(); !isUnique {
t.Fatalf("Index is not unique!")
......@@ -475,49 +474,49 @@ func TestWasmModel_receiveHelper_UniqueIndex(t *testing.T) {
t.Fatal(err3)
}
// First message insert should succeed
testMsgId := message.DeriveChannelMessageID(&id.ID{1}, 0, []byte(testString))
testMsg := buildMessage([]byte(testString), testMsgId.Bytes(), nil,
testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0,
netTime.Now(), time.Second, 0, 0, false, false, channels.Sent)
uuid, err2 := eventModel.receiveHelper(testMsg, false)
if err2 != nil {
t.Fatal(err2)
testMsgId2 := message.DeriveChannelMessageID(&id.ID{2}, 0, []byte(testString))
testMsg2 := buildMessage([]byte(testString), testMsgId2.Bytes(), nil,
testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0,
netTime.Now(), time.Second, 0, 0, false, false, channels.Sent)
// First message insert should succeed
uuid, err := eventModel.upsertMessage(testMsg)
if err != nil {
t.Fatal(err)
}
// The duplicate entry should return the same UUID
duplicateUuid, err2 := eventModel.receiveHelper(testMsg, false)
if err2 != nil {
t.Fatal(err2)
// The duplicate entry should fail
duplicateUuid, err := eventModel.upsertMessage(testMsg)
if err == nil {
t.Fatal("Expected error to happen")
}
if uuid != duplicateUuid {
t.Fatalf("Expected UUID %d to match %d", uuid, duplicateUuid)
if duplicateUuid != 0 {
t.Fatalf("Expected UUID %d to be 0", duplicateUuid)
}
// Now insert a message with a different message ID from the first
testMsgId2 := message.DeriveChannelMessageID(
&id.ID{2}, 0, []byte(testString))
testMsg = buildMessage([]byte(testString), testMsgId2.Bytes(), nil,
testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0,
netTime.Now(), time.Second, 0, 0, false, false, channels.Sent)
uuid2, err2 := eventModel.receiveHelper(testMsg, false)
if err2 != nil {
t.Fatal(err2)
uuid2, err := eventModel.upsertMessage(testMsg2)
if err != nil {
t.Fatal(err)
}
if uuid2 == uuid {
t.Fatalf("Expected UUID %d to NOT match %d", uuid, duplicateUuid)
t.Fatalf("Expected UUID %d to NOT match %d", uuid, uuid2)
}
// Except this time, we update the second entry to have the same
// message ID as the first
testMsg.ID = uuid
testMsg.MessageID = testMsgId.Bytes()
duplicateUuid2, err2 := eventModel.receiveHelper(testMsg, true)
if err2 != nil {
t.Fatal(err2)
testMsg2.MessageID = testMsgId.Bytes()
duplicateUuid, err = eventModel.upsertMessage(testMsg)
if err == nil {
t.Fatal("Expected error to happen")
}
if duplicateUuid2 != duplicateUuid {
t.Fatalf("Expected UUID %d to match %d", uuid, duplicateUuid)
if duplicateUuid != 0 {
t.Fatalf("Expected UUID %d to be 0", uuid)
}
})
}
......
......@@ -45,7 +45,7 @@ const (
// The user's nickname can change each message, but the rest does not. We
// still duplicate all of it for each entry to simplify code for now.
type Message struct {
ID uint64 `json:"id"` // Matches pkeyName
ID uint64 `json:"id,omitempty"` // Matches pkeyName
Nickname string `json:"nickname"`
MessageID []byte `json:"message_id"` // Index
ChannelID []byte `json:"channel_id"` // Index
......
......@@ -14,7 +14,6 @@ import (
"crypto/ed25519"
"encoding/json"
"strings"
"sync"
"syscall/js"
"time"
......@@ -33,11 +32,12 @@ import (
// wasmModel implements dm.EventModel interface, which uses the channels system
// passed an object that adheres to in order to get events on the channel.
// NOTE: This model is NOT thread safe - it is the responsibility of the
// caller to ensure that its methods are called sequentially.
type wasmModel struct {
db *idb.Database
cipher cryptoChannel.Cipher
receivedMessageCB MessageReceivedCallback
updateMux sync.Mutex
}
// upsertConversation is used for joining or updating a Conversation.
......@@ -162,13 +162,7 @@ func (w *wasmModel) ReceiveReaction(messageID, reactionTo message.ID, nickname,
func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID,
timestamp time.Time, round rounds.Round, status dm.Status) {
parentErr := errors.New("failed to UpdateSentStatus")
// FIXME: this is a bit of race condition without the mux.
// This should be done via the transactions (i.e., make a
// special version of receiveHelper)
w.updateMux.Lock()
defer w.updateMux.Unlock()
parentErr := errors.New("[DM indexedDB] failed to UpdateSentStatus")
jww.TRACE.Printf(
"[DM indexedDB] UpdateSentStatus(%d, %s, ...)", uuid, messageID)
......@@ -178,7 +172,7 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID,
// Use the key to get the existing Message
currentMsg, err := impl.Get(w.db, messageStoreName, key)
if err != nil {
jww.ERROR.Printf("[DM indexedDB] %+v", errors.WithMessagef(parentErr,
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to get message: %+v", err))
return
}
......@@ -187,7 +181,7 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID,
newMessage := &Message{}
err = json.Unmarshal([]byte(utils.JsToJson(currentMsg)), newMessage)
if err != nil {
jww.ERROR.Printf("[DM indexedDB] %+v", errors.WithMessagef(parentErr,
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Could not JSON unmarshal message: %+v", err))
return
}
......@@ -206,10 +200,9 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID,
}
// Store the updated Message
_, err = w.receiveHelper(newMessage, true)
_, err = w.upsertMessage(newMessage)
if err != nil {
jww.ERROR.Printf("[DM indexedDB] %+v",
errors.Wrap(parentErr, err.Error()))
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
return
}
......@@ -219,7 +212,7 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID,
true, false)
}
// receiveWrapper is a higher-level wrapper of receiveHelper.
// receiveWrapper is a higher-level wrapper of upsertMessage.
func (w *wasmModel) receiveWrapper(messageID message.ID, parentID *message.ID, nickname,
data string, partnerKey, senderKey ed25519.PublicKey, dmToken uint32, codeset uint8,
timestamp time.Time, round rounds.Round, mType dm.MessageType, status dm.Status) (uint64, error) {
......@@ -278,7 +271,7 @@ func (w *wasmModel) receiveWrapper(messageID message.ID, parentID *message.ID, n
msgToInsert := buildMessage(messageID.Bytes(), parentIdBytes, textBytes,
partnerKey, senderKey, timestamp, round.ID, mType, codeset, status)
uuid, err := w.receiveHelper(msgToInsert, false)
uuid, err := w.upsertMessage(msgToInsert)
if err != nil {
return 0, err
}
......@@ -289,11 +282,11 @@ func (w *wasmModel) receiveWrapper(messageID message.ID, parentID *message.ID, n
return uuid, nil
}
// receiveHelper is a private helper for receiving any sort of message.
func (w *wasmModel) receiveHelper(
newMessage *Message, isUpdate bool) (uint64, error) {
// upsertMessage is a helper function that will update an existing record
// if Message.ID is specified. Otherwise, it will perform an insert.
func (w *wasmModel) upsertMessage(msg *Message) (uint64, error) {
// Convert to jsObject
newMessageJson, err := json.Marshal(newMessage)
newMessageJson, err := json.Marshal(msg)
if err != nil {
return 0, errors.Errorf("Unable to marshal Message: %+v", err)
}
......@@ -302,54 +295,15 @@ func (w *wasmModel) receiveHelper(
return 0, errors.Errorf("Unable to marshal Message: %+v", err)
}
// Unset the primaryKey for inserts so that it can be autopopulated and
// incremented
if !isUpdate {
messageObj.Delete("id")
}
// Store message to database
result, err := impl.Put(w.db, messageStoreName, messageObj)
// FIXME: The following is almost certainly causing a bug
// where all of our upsert operations are failing.
if err != nil && !strings.Contains(err.Error(), impl.ErrUniqueConstraint) {
// Only return non-unique constraint errors so that the case
// below this one can be hit and handle duplicate entries properly.
msgIdObj, err := impl.Put(w.db, messageStoreName, messageObj)
if err != nil {
return 0, errors.Errorf("Unable to put Message: %+v", err)
}
// NOTE: Sometimes the insert fails to return an error but hits a duplicate
// insert, so this fallthrough returns the UUID entry in that case.
if result.IsUndefined() {
msgID := message.ID{}
copy(msgID[:], newMessage.MessageID)
uuid, errLookup := w.msgIDLookup(msgID)
if uuid != 0 && errLookup == nil {
jww.WARN.Printf("[DM indexedDB] Result undefined, but found"+
" duplicate? %d, %s", uuid, msgID)
return uuid, nil
}
return 0, errors.Errorf("uuid lookup failure: %+v", err)
}
uuid := uint64(result.Int())
uuid := msgIdObj.Int()
jww.DEBUG.Printf("[DM indexedDB] Successfully stored message %d", uuid)
return uuid, nil
}
// msgIDLookup gets the UUID of the Message with the given messageID.
func (w *wasmModel) msgIDLookup(messageID message.ID) (uint64, error) {
resultObj, err := impl.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, impl.EncodeBytes(messageID.Marshal()))
if err != nil {
return 0, err
}
uuid := uint64(0)
if !resultObj.IsUndefined() {
uuid = uint64(resultObj.Get("id").Int())
}
return uuid, nil
return uint64(uuid), nil
}
// BlockSender silences messages sent by the indicated sender
......
......@@ -16,7 +16,7 @@ import (
"testing"
)
func dummyReceivedMessageCB(uint64, ed25519.PublicKey, bool) {}
func dummyReceivedMessageCB(uint64, ed25519.PublicKey, bool, bool) {}
func dummyStoreDatabaseName(string) error { return nil }
func dummyStoreEncryptionStatus(_ string, encryptionStatus bool) (bool, error) {
return encryptionStatus, nil
......@@ -37,7 +37,7 @@ func TestWasmModel_BlockSender(t *testing.T) {
// Insert a test convo
testPubKey := ed25519.PublicKey{}
err = m.joinConversation("test", testPubKey, 0, 0)
err = m.upsertConversation("test", testPubKey, 0, 0, false)
if err != nil {
t.Fatal(err.Error())
}
......
......@@ -176,7 +176,8 @@ func GetIndex(db *idb.Database, objectStoreName,
}
// Put is a generic helper for putting values into the given [idb.ObjectStore].
// Equivalent to insert if not exists else update.
// Equivalent to insert if not exists else update. Returns the primary key of
// the stored object as a js.Value.
func Put(db *idb.Database, objectStoreName string, value js.Value) (js.Value, error) {
// Prepare the Transaction
txn, err := db.Transaction(idb.TransactionReadWrite, objectStoreName)
......@@ -199,7 +200,8 @@ func Put(db *idb.Database, objectStoreName string, value js.Value) (js.Value, er
result, err := request.Await(ctx)
cancel()
if err != nil {
return js.Undefined(), errors.Errorf("Putting value failed: %+v", err)
return js.Undefined(), errors.Errorf("Putting value failed: %+v\n%s",
err, utils.JsToJson(value))
}
jww.DEBUG.Printf("Successfully put value in %s: %s",
objectStoreName, utils.JsToJson(value))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment