diff --git a/indexedDb/impl/channels/implementation.go b/indexedDb/impl/channels/implementation.go index 703e4bb0f22fee52209bb5eb63818ca887910e24..b7d14d8b72c0dbb5285018d37fc0c1c91a2de8d5 100644 --- a/indexedDb/impl/channels/implementation.go +++ b/indexedDb/impl/channels/implementation.go @@ -13,8 +13,6 @@ import ( "crypto/ed25519" "encoding/json" "strconv" - "strings" - "sync" "syscall/js" "time" @@ -36,13 +34,14 @@ import ( // wasmModel implements [channels.EventModel] interface, which uses the channels // system passed an object that adheres to in order to get events on the // channel. +// NOTE: This model is NOT thread safe - it is the responsibility of the +// caller to ensure that its methods are called sequentially. type wasmModel struct { db *idb.Database cipher cryptoChannel.Cipher receivedMessageCB wChannels.MessageReceivedCallback deletedMessageCB wChannels.DeletedMessageCallback mutedUserCB wChannels.MutedUserCallback - updateMux sync.Mutex } // JoinChannel is called whenever a channel is joined locally. @@ -166,7 +165,7 @@ func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID message.ID, textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType, false, hidden, status) - uuid, err := w.receiveHelper(msgToInsert, false) + uuid, err := w.upsertMessage(msgToInsert) if err != nil { jww.ERROR.Printf("Failed to receive Message: %+v", err) } @@ -202,7 +201,7 @@ func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID, replyTo.Bytes(), nickname, textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType, hidden, false, status) - uuid, err := w.receiveHelper(msgToInsert, false) + uuid, err := w.upsertMessage(msgToInsert) if err != nil { jww.ERROR.Printf("Failed to receive reply: %+v", err) @@ -239,7 +238,7 @@ func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID, textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType, false, hidden, status) - uuid, err := w.receiveHelper(msgToInsert, false) + uuid, err := w.upsertMessage(msgToInsert) if err != nil { jww.ERROR.Printf("Failed to receive reaction: %+v", err) } @@ -257,24 +256,25 @@ func (w *wasmModel) UpdateFromUUID(uuid uint64, messageID *message.ID, status *channels.SentStatus) { parentErr := errors.New("failed to UpdateFromUUID") - // FIXME: this is a bit of race condition without the mux. - // This should be done via the transactions (i.e., make a - // special version of receiveHelper) - w.updateMux.Lock() - defer w.updateMux.Unlock() - // Convert messageID to the key generated by json.Marshal key := js.ValueOf(uuid) // Use the key to get the existing Message - currentMsg, err := impl.Get(w.db, messageStoreName, key) + msgObj, err := impl.Get(w.db, messageStoreName, key) if err != nil { jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, "Unable to get message: %+v", err)) return } - _, err = w.updateMessage(utils.JsToJson(currentMsg), messageID, timestamp, + currentMsg, err := valueToMessage(msgObj) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Failed to marshal Message: %+v", err)) + return + } + + _, err = w.updateMessage(currentMsg, messageID, timestamp, round, pinned, hidden, status) if err != nil { jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, @@ -296,13 +296,7 @@ func (w *wasmModel) UpdateFromMessageID(messageID message.ID, status *channels.SentStatus) uint64 { parentErr := errors.New("failed to UpdateFromMessageID") - // FIXME: this is a bit of race condition without the mux. - // This should be done via the transactions (i.e., make a - // special version of receiveHelper) - w.updateMux.Lock() - defer w.updateMux.Unlock() - - currentMsgObj, err := impl.GetIndex(w.db, messageStoreName, + msgObj, err := impl.GetIndex(w.db, messageStoreName, messageStoreMessageIndex, impl.EncodeBytes(messageID.Marshal())) if err != nil { jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, @@ -310,7 +304,13 @@ func (w *wasmModel) UpdateFromMessageID(messageID message.ID, return 0 } - currentMsg := utils.JsToJson(currentMsgObj) + currentMsg, err := valueToMessage(msgObj) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Failed to marshal Message: %+v", err)) + return 0 + } + uuid, err := w.updateMessage(currentMsg, &messageID, timestamp, round, pinned, hidden, status) if err != nil { @@ -320,52 +320,6 @@ func (w *wasmModel) UpdateFromMessageID(messageID message.ID, return uuid } -// updateMessage is a helper for updating a stored message. -func (w *wasmModel) updateMessage(currentMsgJson string, messageID *message.ID, - timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, - status *channels.SentStatus) (uint64, error) { - - newMessage := &Message{} - err := json.Unmarshal([]byte(currentMsgJson), newMessage) - if err != nil { - return 0, err - } - - if status != nil { - newMessage.Status = uint8(*status) - } - if messageID != nil { - newMessage.MessageID = messageID.Bytes() - } - - if round != nil { - newMessage.Round = uint64(round.ID) - } - - if timestamp != nil { - newMessage.Timestamp = *timestamp - } - - if pinned != nil { - newMessage.Pinned = *pinned - } - - if hidden != nil { - newMessage.Hidden = *hidden - } - - // Store the updated Message - uuid, err := w.receiveHelper(newMessage, true) - if err != nil { - return 0, err - } - channelID := &id.ID{} - copy(channelID[:], newMessage.ChannelID) - go w.receivedMessageCB(uuid, channelID, true) - - return uuid, nil -} - // buildMessage is a private helper that converts typical [channels.EventModel] // inputs into a basic Message structure for insertion into storage. // @@ -397,11 +351,51 @@ func buildMessage(channelID, messageID, parentID []byte, nickname string, } } -// receiveHelper is a private helper for receiving any sort of message. -func (w *wasmModel) receiveHelper( - newMessage *Message, isUpdate bool) (uint64, error) { +// updateMessage is a helper for updating a stored message. +func (w *wasmModel) updateMessage(currentMsg *Message, messageID *message.ID, + timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, + status *channels.SentStatus) (uint64, error) { + + if status != nil { + currentMsg.Status = uint8(*status) + } + if messageID != nil { + currentMsg.MessageID = messageID.Bytes() + } + + if round != nil { + currentMsg.Round = uint64(round.ID) + } + + if timestamp != nil { + currentMsg.Timestamp = *timestamp + } + + if pinned != nil { + currentMsg.Pinned = *pinned + } + + if hidden != nil { + currentMsg.Hidden = *hidden + } + + // Store the updated Message + uuid, err := w.upsertMessage(currentMsg) + if err != nil { + return 0, err + } + channelID := &id.ID{} + copy(channelID[:], currentMsg.ChannelID) + go w.receivedMessageCB(uuid, channelID, true) + + return uuid, nil +} + +// upsertMessage is a helper function that will update an existing record +// if Message.ID is specified. Otherwise, it will perform an insert. +func (w *wasmModel) upsertMessage(msg *Message) (uint64, error) { // Convert to jsObject - newMessageJson, err := json.Marshal(newMessage) + newMessageJson, err := json.Marshal(msg) if err != nil { return 0, errors.Errorf("Unable to marshal Message: %+v", err) } @@ -410,43 +404,29 @@ func (w *wasmModel) receiveHelper( return 0, errors.Errorf("Unable to marshal Message: %+v", err) } - // Unset the primaryKey for inserts so that it can be auto-populated and - // incremented - if !isUpdate { - messageObj.Delete("id") - } - // Store message to database - result, err := impl.Put(w.db, messageStoreName, messageObj) - // FIXME: The following is almost certainly causing a bug - // where all of our upsert operations are failing. - if err != nil && !strings.Contains(err.Error(), impl.ErrUniqueConstraint) { - // Only return non-unique constraint errors so that the case - // below this one can be hit and handle duplicate entries properly. + msgIdObj, err := impl.Put(w.db, messageStoreName, messageObj) + if err != nil { return 0, errors.Errorf("Unable to put Message: %+v", err) } - // NOTE: Sometimes the insert fails to return an error but hits a duplicate - // insert, so this fallthrough returns the UUID entry in that case. - if result.IsUndefined() { - msgID := message.ID{} - copy(msgID[:], newMessage.MessageID) - msg, errLookup := w.msgIDLookup(msgID) - if errLookup == nil && msg.ID != 0 { - return msg.ID, nil - } - return 0, errors.Errorf("uuid lookup failure: %+v", err) - } - uuid := uint64(result.Int()) + uuid := msgIdObj.Int() jww.DEBUG.Printf("Successfully stored message %d", uuid) - - return uuid, nil + return uint64(uuid), nil } // GetMessage returns the message with the given [channel.MessageID]. func (w *wasmModel) GetMessage( messageID message.ID) (channels.ModelMessage, error) { - lookupResult, err := w.msgIDLookup(messageID) + msgIDStr := impl.EncodeBytes(messageID.Marshal()) + + resultObj, err := impl.GetIndex(w.db, messageStoreName, + messageStoreMessageIndex, msgIDStr) + if err != nil { + return channels.ModelMessage{}, err + } + + lookupResult, err := valueToMessage(resultObj) if err != nil { return channels.ModelMessage{}, err } @@ -514,20 +494,10 @@ func (w *wasmModel) MuteUser( go w.mutedUserCB(channelID, pubKey, unmute) } -// msgIDLookup gets the UUID of the Message with the given messageID. -func (w *wasmModel) msgIDLookup(messageID message.ID) (*Message, error) { - msgIDStr := impl.EncodeBytes(messageID.Marshal()) - resultObj, err := impl.GetIndex(w.db, messageStoreName, - messageStoreMessageIndex, msgIDStr) - if err != nil { - return nil, err - } else if resultObj.IsUndefined() { - return nil, errors.Errorf("no message for %s found", msgIDStr) - } - - // Process result into string +// valueToMessage is a helper for converting js.Value to Message. +func valueToMessage(msgObj js.Value) (*Message, error) { resultMsg := &Message{} - err = json.Unmarshal([]byte(utils.JsToJson(resultObj)), resultMsg) + err := json.Unmarshal([]byte(utils.JsToJson(msgObj)), resultMsg) if err != nil { return nil, err } diff --git a/indexedDb/impl/channels/implementation_test.go b/indexedDb/impl/channels/implementation_test.go index 20bb4ed7aa9041a1339b54b34b234022b97e9933..67aaa6514777d2419414d800041bc84c6d6bec53 100644 --- a/indexedDb/impl/channels/implementation_test.go +++ b/indexedDb/impl/channels/implementation_test.go @@ -47,7 +47,7 @@ func dummyStoreEncryptionStatus(_ string, encryptionStatus bool) (bool, error) { } // Happy path, insert message and look it up -func TestWasmModel_msgIDLookup(t *testing.T) { +func TestWasmModel_GetMessage(t *testing.T) { cipher, err := cryptoChannel.NewCipher( []byte("testPass"), []byte("testSalt"), 128, csprng.NewSystemRNG()) if err != nil { @@ -58,32 +58,32 @@ func TestWasmModel_msgIDLookup(t *testing.T) { if c != nil { cs = "_withCipher" } - t.Run(fmt.Sprintf("TestWasmModel_msgIDLookup%s", cs), func(t *testing.T) { - + testString := "TestWasmModel_msgIDLookup" + cs + t.Run(testString, func(t *testing.T) { storage.GetLocalStorage().Clear() - testString := "TestWasmModel_msgIDLookup" + cs testMsgId := message.DeriveChannelMessageID(&id.ID{1}, 0, []byte(testString)) - eventModel, err2 := newWASMModel(testString, c, + eventModel, err := newWASMModel(testString, c, dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, dummyStoreDatabaseName, dummyStoreEncryptionStatus) - if err2 != nil { - t.Fatal(err2) + if err != nil { + t.Fatal(err) } - testMsg := buildMessage([]byte(testString), testMsgId.Bytes(), nil, - testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0, - netTime.Now(), time.Second, 0, 0, false, false, channels.Sent) - _, err = eventModel.receiveHelper(testMsg, false) + testMsg := buildMessage(id.NewIdFromBytes([]byte(testString), t).Marshal(), + testMsgId.Bytes(), nil, testString, []byte(testString), + []byte{8, 6, 7, 5}, 0, 0, netTime.Now(), + time.Second, 0, 0, false, false, channels.Sent) + _, err = eventModel.upsertMessage(testMsg) if err != nil { t.Fatal(err) } - msg, err2 := eventModel.msgIDLookup(testMsgId) - if err2 != nil { - t.Fatal(err2) + msg, err := eventModel.GetMessage(testMsgId) + if err != nil { + t.Fatal(err) } - if msg.ID == 0 { + if msg.UUID == 0 { t.Fatalf("Expected to get a UUID!") } }) @@ -106,7 +106,7 @@ func TestWasmModel_DeleteMessage(t *testing.T) { testMsg := buildMessage([]byte(testString), testMsgId.Bytes(), nil, testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0, netTime.Now(), time.Second, 0, 0, false, false, channels.Sent) - _, err = eventModel.receiveHelper(testMsg, false) + _, err = eventModel.upsertMessage(testMsg) if err != nil { t.Fatal(err) } @@ -164,7 +164,7 @@ func Test_wasmModel_UpdateSentStatus(t *testing.T) { testMsg := buildMessage([]byte(testString), testMsgId.Bytes(), nil, testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0, netTime.Now(), time.Second, 0, 0, false, false, channels.Sent) - uuid, err2 := eventModel.receiveHelper(testMsg, false) + uuid, err2 := eventModel.upsertMessage(testMsg) if err2 != nil { t.Fatal(err2) } @@ -322,36 +322,35 @@ func Test_wasmModel_DuplicateReceives(t *testing.T) { if c != nil { cs = "_withCipher" } - t.Run("Test_wasmModel_DuplicateReceives"+cs, func(t *testing.T) { + testString := "Test_wasmModel_DuplicateReceives" + cs + t.Run(testString, func(t *testing.T) { storage.GetLocalStorage().Clear() - testString := "testHello" - eventModel, err2 := newWASMModel(testString, c, + eventModel, err := newWASMModel(testString, c, dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, dummyStoreDatabaseName, dummyStoreEncryptionStatus) - if err2 != nil { - t.Fatal(err2) + if err != nil { + t.Fatal(err) } - uuids := make([]uint64, 10) - + // Store a test message msgID := message.ID{} copy(msgID[:], testString) - for i := 0; i < 10; i++ { - // Store a test message - channelID := id.NewIdFromBytes([]byte(testString), t) - rnd := rounds.Round{ID: id.Round(42)} - uuid := eventModel.ReceiveMessage(channelID, msgID, "test", - testString+fmt.Sprintf("%d", i), []byte{8, 6, 7, 5}, 0, 0, - netTime.Now(), time.Hour, rnd, 0, channels.Sent, false) - uuids[i] = uuid + channelID := id.NewIdFromBytes([]byte(testString), t) + rnd := rounds.Round{ID: id.Round(42)} + uuid := eventModel.ReceiveMessage(channelID, msgID, "test", + testString, []byte{8, 6, 7, 5}, 0, 0, + netTime.Now(), time.Hour, rnd, 0, channels.Sent, false) + if uuid != 1 { + t.Fatalf("Expected UUID to be one for first receive") } + // Store duplicate messages with same messageID for i := 0; i < 10; i++ { - for j := i + 1; j < 10; j++ { - if uuids[i] != uuids[j] { - t.Fatalf("uuid failed: %d[%d] != %d[%d]", - uuids[i], i, uuids[j], j) - } + uuid = eventModel.ReceiveMessage(channelID, msgID, "test", + testString+fmt.Sprintf("%d", i), []byte{8, 6, 7, 5}, 0, 0, + netTime.Now(), time.Hour, rnd, 0, channels.Sent, false) + if uuid != 0 { + t.Fatalf("Expected UUID to be zero for duplicate receives") } } }) @@ -371,16 +370,16 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) { if c != nil { cs = "_withCipher" } - t.Run("Test_wasmModel_deleteMsgByChannel"+cs, func(t *testing.T) { + testString := "Test_wasmModel_deleteMsgByChannel" + cs + t.Run(testString, func(t *testing.T) { storage.GetLocalStorage().Clear() - testString := "test_deleteMsgByChannel" totalMessages := 10 expectedMessages := 5 - eventModel, err2 := newWASMModel(testString, c, + eventModel, err := newWASMModel(testString, c, dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, dummyStoreDatabaseName, dummyStoreEncryptionStatus) - if err2 != nil { - t.Fatal(err2) + if err != nil { + t.Fatal(err) } // Create a test channel id @@ -406,12 +405,12 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) { } // Check pre-results - result, err2 := impl.Dump(eventModel.db, messageStoreName) - if err2 != nil { - t.Fatal(err2) + result, err := impl.Dump(eventModel.db, messageStoreName) + if err != nil { + t.Fatal(err) } if len(result) != totalMessages { - t.Errorf("Expected %d messages, got %d", totalMessages, len(result)) + t.Fatalf("Expected %d messages, got %d", totalMessages, len(result)) } // Do delete @@ -426,7 +425,7 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) { t.Fatal(err) } if len(result) != expectedMessages { - t.Errorf("Expected %d messages, got %d", expectedMessages, len(result)) + t.Fatalf("Expected %d messages, got %d", expectedMessages, len(result)) } }) } @@ -448,26 +447,26 @@ func TestWasmModel_receiveHelper_UniqueIndex(t *testing.T) { t.Run("TestWasmModel_receiveHelper_UniqueIndex"+cs, func(t *testing.T) { storage.GetLocalStorage().Clear() testString := fmt.Sprintf("test_receiveHelper_UniqueIndex_%d", i) - eventModel, err2 := newWASMModel(testString, c, + eventModel, err := newWASMModel(testString, c, dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, dummyStoreDatabaseName, dummyStoreEncryptionStatus) - if err2 != nil { - t.Fatal(err2) + if err != nil { + t.Fatal(err) } // Ensure index is unique - txn, err2 := eventModel.db.Transaction( + txn, err := eventModel.db.Transaction( idb.TransactionReadOnly, messageStoreName) - if err2 != nil { - t.Fatal(err2) + if err != nil { + t.Fatal(err) } - store, err2 := txn.ObjectStore(messageStoreName) - if err2 != nil { - t.Fatal(err2) + store, err := txn.ObjectStore(messageStoreName) + if err != nil { + t.Fatal(err) } - idx, err2 := store.Index(messageStoreMessageIndex) - if err2 != nil { - t.Fatal(err2) + idx, err := store.Index(messageStoreMessageIndex) + if err != nil { + t.Fatal(err) } if isUnique, err3 := idx.Unique(); !isUnique { t.Fatalf("Index is not unique!") @@ -475,49 +474,49 @@ func TestWasmModel_receiveHelper_UniqueIndex(t *testing.T) { t.Fatal(err3) } - // First message insert should succeed testMsgId := message.DeriveChannelMessageID(&id.ID{1}, 0, []byte(testString)) testMsg := buildMessage([]byte(testString), testMsgId.Bytes(), nil, testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0, netTime.Now(), time.Second, 0, 0, false, false, channels.Sent) - uuid, err2 := eventModel.receiveHelper(testMsg, false) - if err2 != nil { - t.Fatal(err2) + + testMsgId2 := message.DeriveChannelMessageID(&id.ID{2}, 0, []byte(testString)) + testMsg2 := buildMessage([]byte(testString), testMsgId2.Bytes(), nil, + testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0, + netTime.Now(), time.Second, 0, 0, false, false, channels.Sent) + + // First message insert should succeed + uuid, err := eventModel.upsertMessage(testMsg) + if err != nil { + t.Fatal(err) } - // The duplicate entry should return the same UUID - duplicateUuid, err2 := eventModel.receiveHelper(testMsg, false) - if err2 != nil { - t.Fatal(err2) + // The duplicate entry should fail + duplicateUuid, err := eventModel.upsertMessage(testMsg) + if err == nil { + t.Fatal("Expected error to happen") } - if uuid != duplicateUuid { - t.Fatalf("Expected UUID %d to match %d", uuid, duplicateUuid) + if duplicateUuid != 0 { + t.Fatalf("Expected UUID %d to be 0", duplicateUuid) } // Now insert a message with a different message ID from the first - testMsgId2 := message.DeriveChannelMessageID( - &id.ID{2}, 0, []byte(testString)) - testMsg = buildMessage([]byte(testString), testMsgId2.Bytes(), nil, - testString, []byte(testString), []byte{8, 6, 7, 5}, 0, 0, - netTime.Now(), time.Second, 0, 0, false, false, channels.Sent) - uuid2, err2 := eventModel.receiveHelper(testMsg, false) - if err2 != nil { - t.Fatal(err2) + uuid2, err := eventModel.upsertMessage(testMsg2) + if err != nil { + t.Fatal(err) } if uuid2 == uuid { - t.Fatalf("Expected UUID %d to NOT match %d", uuid, duplicateUuid) + t.Fatalf("Expected UUID %d to NOT match %d", uuid, uuid2) } // Except this time, we update the second entry to have the same // message ID as the first - testMsg.ID = uuid - testMsg.MessageID = testMsgId.Bytes() - duplicateUuid2, err2 := eventModel.receiveHelper(testMsg, true) - if err2 != nil { - t.Fatal(err2) + testMsg2.MessageID = testMsgId.Bytes() + duplicateUuid, err = eventModel.upsertMessage(testMsg) + if err == nil { + t.Fatal("Expected error to happen") } - if duplicateUuid2 != duplicateUuid { - t.Fatalf("Expected UUID %d to match %d", uuid, duplicateUuid) + if duplicateUuid != 0 { + t.Fatalf("Expected UUID %d to be 0", uuid) } }) } diff --git a/indexedDb/impl/channels/model.go b/indexedDb/impl/channels/model.go index 5ea934d7c7ca64d5dad74c1ebafc9fff2427b9ab..d1dcee88b3efd5f4a877685366154caefaa46a82 100644 --- a/indexedDb/impl/channels/model.go +++ b/indexedDb/impl/channels/model.go @@ -45,7 +45,7 @@ const ( // The user's nickname can change each message, but the rest does not. We // still duplicate all of it for each entry to simplify code for now. type Message struct { - ID uint64 `json:"id"` // Matches pkeyName + ID uint64 `json:"id,omitempty"` // Matches pkeyName Nickname string `json:"nickname"` MessageID []byte `json:"message_id"` // Index ChannelID []byte `json:"channel_id"` // Index diff --git a/indexedDb/impl/dm/implementation.go b/indexedDb/impl/dm/implementation.go index 43dc52c62f2485c2b83dc3a33f3288db741414d0..e4c314a09b5c639a0012355c3acb1f3fc59ab6b7 100644 --- a/indexedDb/impl/dm/implementation.go +++ b/indexedDb/impl/dm/implementation.go @@ -14,7 +14,6 @@ import ( "crypto/ed25519" "encoding/json" "strings" - "sync" "syscall/js" "time" @@ -33,11 +32,12 @@ import ( // wasmModel implements dm.EventModel interface, which uses the channels system // passed an object that adheres to in order to get events on the channel. +// NOTE: This model is NOT thread safe - it is the responsibility of the +// caller to ensure that its methods are called sequentially. type wasmModel struct { db *idb.Database cipher cryptoChannel.Cipher receivedMessageCB MessageReceivedCallback - updateMux sync.Mutex } // upsertConversation is used for joining or updating a Conversation. @@ -162,13 +162,7 @@ func (w *wasmModel) ReceiveReaction(messageID, reactionTo message.ID, nickname, func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID, timestamp time.Time, round rounds.Round, status dm.Status) { - parentErr := errors.New("failed to UpdateSentStatus") - - // FIXME: this is a bit of race condition without the mux. - // This should be done via the transactions (i.e., make a - // special version of receiveHelper) - w.updateMux.Lock() - defer w.updateMux.Unlock() + parentErr := errors.New("[DM indexedDB] failed to UpdateSentStatus") jww.TRACE.Printf( "[DM indexedDB] UpdateSentStatus(%d, %s, ...)", uuid, messageID) @@ -178,7 +172,7 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID, // Use the key to get the existing Message currentMsg, err := impl.Get(w.db, messageStoreName, key) if err != nil { - jww.ERROR.Printf("[DM indexedDB] %+v", errors.WithMessagef(parentErr, + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, "Unable to get message: %+v", err)) return } @@ -187,7 +181,7 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID, newMessage := &Message{} err = json.Unmarshal([]byte(utils.JsToJson(currentMsg)), newMessage) if err != nil { - jww.ERROR.Printf("[DM indexedDB] %+v", errors.WithMessagef(parentErr, + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, "Could not JSON unmarshal message: %+v", err)) return } @@ -206,10 +200,9 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID, } // Store the updated Message - _, err = w.receiveHelper(newMessage, true) + _, err = w.upsertMessage(newMessage) if err != nil { - jww.ERROR.Printf("[DM indexedDB] %+v", - errors.Wrap(parentErr, err.Error())) + jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error())) return } @@ -219,7 +212,7 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID, true, false) } -// receiveWrapper is a higher-level wrapper of receiveHelper. +// receiveWrapper is a higher-level wrapper of upsertMessage. func (w *wasmModel) receiveWrapper(messageID message.ID, parentID *message.ID, nickname, data string, partnerKey, senderKey ed25519.PublicKey, dmToken uint32, codeset uint8, timestamp time.Time, round rounds.Round, mType dm.MessageType, status dm.Status) (uint64, error) { @@ -278,7 +271,7 @@ func (w *wasmModel) receiveWrapper(messageID message.ID, parentID *message.ID, n msgToInsert := buildMessage(messageID.Bytes(), parentIdBytes, textBytes, partnerKey, senderKey, timestamp, round.ID, mType, codeset, status) - uuid, err := w.receiveHelper(msgToInsert, false) + uuid, err := w.upsertMessage(msgToInsert) if err != nil { return 0, err } @@ -289,11 +282,11 @@ func (w *wasmModel) receiveWrapper(messageID message.ID, parentID *message.ID, n return uuid, nil } -// receiveHelper is a private helper for receiving any sort of message. -func (w *wasmModel) receiveHelper( - newMessage *Message, isUpdate bool) (uint64, error) { +// upsertMessage is a helper function that will update an existing record +// if Message.ID is specified. Otherwise, it will perform an insert. +func (w *wasmModel) upsertMessage(msg *Message) (uint64, error) { // Convert to jsObject - newMessageJson, err := json.Marshal(newMessage) + newMessageJson, err := json.Marshal(msg) if err != nil { return 0, errors.Errorf("Unable to marshal Message: %+v", err) } @@ -302,54 +295,15 @@ func (w *wasmModel) receiveHelper( return 0, errors.Errorf("Unable to marshal Message: %+v", err) } - // Unset the primaryKey for inserts so that it can be autopopulated and - // incremented - if !isUpdate { - messageObj.Delete("id") - } - // Store message to database - result, err := impl.Put(w.db, messageStoreName, messageObj) - // FIXME: The following is almost certainly causing a bug - // where all of our upsert operations are failing. - if err != nil && !strings.Contains(err.Error(), impl.ErrUniqueConstraint) { - // Only return non-unique constraint errors so that the case - // below this one can be hit and handle duplicate entries properly. + msgIdObj, err := impl.Put(w.db, messageStoreName, messageObj) + if err != nil { return 0, errors.Errorf("Unable to put Message: %+v", err) } - // NOTE: Sometimes the insert fails to return an error but hits a duplicate - // insert, so this fallthrough returns the UUID entry in that case. - if result.IsUndefined() { - msgID := message.ID{} - copy(msgID[:], newMessage.MessageID) - uuid, errLookup := w.msgIDLookup(msgID) - if uuid != 0 && errLookup == nil { - jww.WARN.Printf("[DM indexedDB] Result undefined, but found"+ - " duplicate? %d, %s", uuid, msgID) - return uuid, nil - } - return 0, errors.Errorf("uuid lookup failure: %+v", err) - } - uuid := uint64(result.Int()) + uuid := msgIdObj.Int() jww.DEBUG.Printf("[DM indexedDB] Successfully stored message %d", uuid) - - return uuid, nil -} - -// msgIDLookup gets the UUID of the Message with the given messageID. -func (w *wasmModel) msgIDLookup(messageID message.ID) (uint64, error) { - resultObj, err := impl.GetIndex(w.db, messageStoreName, - messageStoreMessageIndex, impl.EncodeBytes(messageID.Marshal())) - if err != nil { - return 0, err - } - - uuid := uint64(0) - if !resultObj.IsUndefined() { - uuid = uint64(resultObj.Get("id").Int()) - } - return uuid, nil + return uint64(uuid), nil } // BlockSender silences messages sent by the indicated sender diff --git a/indexedDb/impl/dm/implementation_test.go b/indexedDb/impl/dm/implementation_test.go index 3765266925985c09d4eceb22818dee68311f07ad..5936c83ba3b6537c350cec3a852f489a3911e913 100644 --- a/indexedDb/impl/dm/implementation_test.go +++ b/indexedDb/impl/dm/implementation_test.go @@ -16,8 +16,8 @@ import ( "testing" ) -func dummyReceivedMessageCB(uint64, ed25519.PublicKey, bool) {} -func dummyStoreDatabaseName(string) error { return nil } +func dummyReceivedMessageCB(uint64, ed25519.PublicKey, bool, bool) {} +func dummyStoreDatabaseName(string) error { return nil } func dummyStoreEncryptionStatus(_ string, encryptionStatus bool) (bool, error) { return encryptionStatus, nil } @@ -37,7 +37,7 @@ func TestWasmModel_BlockSender(t *testing.T) { // Insert a test convo testPubKey := ed25519.PublicKey{} - err = m.joinConversation("test", testPubKey, 0, 0) + err = m.upsertConversation("test", testPubKey, 0, 0, false) if err != nil { t.Fatal(err.Error()) } diff --git a/indexedDb/impl/utils.go b/indexedDb/impl/utils.go index 70974c5245233357f365aca05bce6626d8d4a1ba..3765cfce5e01698ca625844b5fe495906985366a 100644 --- a/indexedDb/impl/utils.go +++ b/indexedDb/impl/utils.go @@ -176,7 +176,8 @@ func GetIndex(db *idb.Database, objectStoreName, } // Put is a generic helper for putting values into the given [idb.ObjectStore]. -// Equivalent to insert if not exists else update. +// Equivalent to insert if not exists else update. Returns the primary key of +// the stored object as a js.Value. func Put(db *idb.Database, objectStoreName string, value js.Value) (js.Value, error) { // Prepare the Transaction txn, err := db.Transaction(idb.TransactionReadWrite, objectStoreName) @@ -199,7 +200,8 @@ func Put(db *idb.Database, objectStoreName string, value js.Value) (js.Value, er result, err := request.Await(ctx) cancel() if err != nil { - return js.Undefined(), errors.Errorf("Putting value failed: %+v", err) + return js.Undefined(), errors.Errorf("Putting value failed: %+v\n%s", + err, utils.JsToJson(value)) } jww.DEBUG.Printf("Successfully put value in %s: %s", objectStoreName, utils.JsToJson(value))