diff --git a/go.mod b/go.mod index ceb0ae8ab973b1b279918d710552d29f40dca37f..6b959f15b87538d451d007b230622499fb099662 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/hack-pad/go-indexeddb v0.2.0 github.com/pkg/errors v0.9.1 github.com/spf13/jwalterweatherman v1.1.0 - gitlab.com/elixxir/client/v4 v4.3.12-0.20230104212415-a5a511de981d + gitlab.com/elixxir/client/v4 v4.3.12-0.20230104224316-88578485baf0 gitlab.com/elixxir/crypto v0.0.7-0.20230104191842-814e756a51f2 gitlab.com/elixxir/primitives v0.0.3-0.20221214192222-988b44a6958a gitlab.com/xx_network/crypto v0.0.5-0.20221121220724-8eefdbb0eb46 diff --git a/go.sum b/go.sum index fa65c389a85d846eeefbfdbb3bbdfc8433149862..fae8f80a3bc30e1a08d6104e5a82f1dbbe2719e5 100644 --- a/go.sum +++ b/go.sum @@ -382,8 +382,8 @@ github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo= github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f h1:yXGvNBqzZwAhDYlSnxPRbgor6JWoOt1Z7s3z1O9JR40= gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f/go.mod h1:H6jztdm0k+wEV2QGK/KYA+MY9nj9Zzatux/qIvDDv3k= -gitlab.com/elixxir/client/v4 v4.3.12-0.20230104212415-a5a511de981d h1:a5fjyrFWOZqRBydR9a/eIfnoSsTPQYvdYGmgymlS6YQ= -gitlab.com/elixxir/client/v4 v4.3.12-0.20230104212415-a5a511de981d/go.mod h1:4H2aK6ciagpRk39YtZ1kxnI4+0ulxVFpi1eF/RcKOHk= +gitlab.com/elixxir/client/v4 v4.3.12-0.20230104224316-88578485baf0 h1:tmN1PDzqRpPZiI1Rwxa3KNhEE9uk54KGLkTJQvQnZY8= +gitlab.com/elixxir/client/v4 v4.3.12-0.20230104224316-88578485baf0/go.mod h1:4H2aK6ciagpRk39YtZ1kxnI4+0ulxVFpi1eF/RcKOHk= gitlab.com/elixxir/comms v0.0.4-0.20230104190642-585e834d2a73 h1:gRQTICBcP9Dr6DTZo1bw0S5Oji/eXJw6fImTtvg3Brg= gitlab.com/elixxir/comms v0.0.4-0.20230104190642-585e834d2a73/go.mod h1:aFnxDpIxEEFHdAa2dEeydzo00u/IAcfrqPSEnmeffbY= gitlab.com/elixxir/crypto v0.0.7-0.20230104191842-814e756a51f2 h1:/+uUXuy1HcAUmGsd5z9aUtfwrJtSz3mXmCTJ6ku/dKU= diff --git a/indexedDb/channels/implementation.go b/indexedDb/channels/implementation.go index 26c78490d6c522e16a3ef2df816af4f8e5af6998..6538e4f76c7d894cce5b8373364b796f78106528 100644 --- a/indexedDb/channels/implementation.go +++ b/indexedDb/channels/implementation.go @@ -11,25 +11,18 @@ package channels import ( "crypto/ed25519" - "encoding/base64" "encoding/json" - "strings" - "sync" - "syscall/js" "time" "gitlab.com/elixxir/xxdk-wasm/indexedDb" - "github.com/hack-pad/go-indexeddb/idb" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/v4/channels" "gitlab.com/elixxir/client/v4/cmix/rounds" cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast" - cryptoChannel "gitlab.com/elixxir/crypto/channel" "gitlab.com/elixxir/crypto/message" - "gitlab.com/elixxir/xxdk-wasm/utils" "gitlab.com/xx_network/primitives/id" ) @@ -37,115 +30,23 @@ import ( // system passed an object that adheres to in order to get events on the // channel. type wasmModel struct { - db *idb.Database - cipher cryptoChannel.Cipher - receivedMessageCB MessageReceivedCallback - updateMux sync.Mutex -} - -// DeleteMessage removes a message with the given messageID from storage. -func (w *wasmModel) DeleteMessage(messageID message.ID) error { - msgId := js.ValueOf(base64.StdEncoding.EncodeToString(messageID.Bytes())) - return indexedDb.DeleteIndex(w.db, messageStoreName, - messageStoreMessageIndex, pkeyName, msgId) + wh *indexedDb.WorkerHandler } // JoinChannel is called whenever a channel is joined locally. func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) { - parentErr := errors.New("failed to JoinChannel") - - // Build object - newChannel := Channel{ - ID: channel.ReceptionID.Marshal(), - Name: channel.Name, - Description: channel.Description, - } - - // Convert to jsObject - newChannelJson, err := json.Marshal(&newChannel) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to marshal Channel: %+v", err)) - return - } - channelObj, err := utils.JsonToJS(newChannelJson) + data, err := json.Marshal(channel) if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to marshal Channel: %+v", err)) + jww.ERROR.Printf("Could not JSON marshal broadcast.Channel: %+v", err) return } - _, err = indexedDb.Put(w.db, channelsStoreName, channelObj) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to put Channel: %+v", err)) - } + w.wh.SendMessage(indexedDb.JoinChannelTag, data, nil) } // LeaveChannel is called whenever a channel is left locally. func (w *wasmModel) LeaveChannel(channelID *id.ID) { - parentErr := errors.New("failed to LeaveChannel") - - // Delete the channel from storage - err := indexedDb.Delete(w.db, channelsStoreName, - js.ValueOf(channelID.String())) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to delete Channel: %+v", err)) - return - } - - // Clean up lingering data - err = w.deleteMsgByChannel(channelID) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Deleting Channel's Message data failed: %+v", err)) - return - } - jww.DEBUG.Printf("Successfully deleted channel: %s", channelID) -} - -// deleteMsgByChannel is a private helper that uses messageStoreChannelIndex -// to delete all Message with the given Channel ID. -func (w *wasmModel) deleteMsgByChannel(channelID *id.ID) error { - parentErr := errors.New("failed to deleteMsgByChannel") - - // Prepare the Transaction - txn, err := w.db.Transaction(idb.TransactionReadWrite, messageStoreName) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to create Transaction: %+v", err) - } - store, err := txn.ObjectStore(messageStoreName) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to get ObjectStore: %+v", err) - } - index, err := store.Index(messageStoreChannelIndex) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to get Index: %+v", err) - } - - // Perform the operation - channelIdStr := base64.StdEncoding.EncodeToString(channelID.Marshal()) - keyRange, err := idb.NewKeyRangeOnly(js.ValueOf(channelIdStr)) - cursorRequest, err := index.OpenCursorRange(keyRange, idb.CursorNext) - if err != nil { - return errors.WithMessagef(parentErr, "Unable to open Cursor: %+v", err) - } - ctx, cancel := indexedDb.NewContext() - err = cursorRequest.Iter(ctx, - func(cursor *idb.CursorWithValue) error { - _, err := cursor.Delete() - return err - }) - cancel() - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to delete Message data: %+v", err) - } - return nil + w.wh.SendMessage(indexedDb.LeaveChannelTag, channelID.Marshal(), nil) } // ReceiveMessage is called whenever a message is received on a given channel. @@ -156,30 +57,58 @@ func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID message.ID, nickname, text string, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, timestamp time.Time, lease time.Duration, round rounds.Round, mType channels.MessageType, status channels.SentStatus, hidden bool) uint64 { - textBytes := []byte(text) - var err error + msg := channels.ModelMessage{ + Nickname: nickname, + MessageID: messageID, + ChannelID: channelID, + Timestamp: timestamp, + Lease: lease, + Status: status, + Hidden: hidden, + Pinned: false, + Content: []byte(text), + Type: mType, + Round: round.ID, + PubKey: pubKey, + CodesetVersion: codeset, + DmToken: dmToken, + } - // Handle encryption, if it is present - if w.cipher != nil { - textBytes, err = w.cipher.Encrypt([]byte(text)) - if err != nil { - jww.ERROR.Printf("Failed to encrypt Message: %+v", err) - return 0 - } + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal payload for ReceiveMessage: %+v", err) + return 0 } - msgToInsert := buildMessage( - channelID.Marshal(), messageID.Bytes(), nil, nickname, - textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType, - false, hidden, status) + uuidChan := make(chan uint64) + w.wh.SendMessage(indexedDb.ReceiveMessageTag, data, func(data []byte) { + var uuid uint64 + err = json.Unmarshal(data, &uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON unmarshal response to ReceiveMessage: %+v", err) + uuidChan <- 0 + } + uuidChan <- uuid + }) - uuid, err := w.receiveHelper(msgToInsert, false) - if err != nil { - jww.ERROR.Printf("Failed to receive Message: %+v", err) + select { + case uuid := <-uuidChan: + return uuid + case <-time.After(indexedDb.ResponseTimeout): + jww.ERROR.Printf("Timed out after %s waiting for response from the "+ + "worker about ReceiveMessage", indexedDb.ResponseTimeout) } - go w.receivedMessageCB(uuid, channelID, false) - return uuid + return 0 +} + +// ReceiveReplyMessage is JSON marshalled and sent to the worker for +// [wasmModel.ReceiveReply]. +type ReceiveReplyMessage struct { + ReactionTo message.ID `json:"replyTo"` + channels.ModelMessage `json:"message"` } // ReceiveReply is called whenever a message is received that is a reply on a @@ -193,29 +122,54 @@ func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID, dmToken uint32, codeset uint8, timestamp time.Time, lease time.Duration, round rounds.Round, mType channels.MessageType, status channels.SentStatus, hidden bool) uint64 { - textBytes := []byte(text) - var err error + msg := ReceiveReplyMessage{ + ReactionTo: replyTo, + ModelMessage: channels.ModelMessage{ + Nickname: nickname, + MessageID: messageID, + ChannelID: channelID, + Timestamp: timestamp, + Lease: lease, + Status: status, + Hidden: hidden, + Pinned: false, + Content: []byte(text), + Type: mType, + Round: round.ID, + PubKey: pubKey, + CodesetVersion: codeset, + DmToken: dmToken, + }, + } + + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal payload for ReceiveReply: %+v", err) + return 0 + } - // Handle encryption, if it is present - if w.cipher != nil { - textBytes, err = w.cipher.Encrypt([]byte(text)) + uuidChan := make(chan uint64) + w.wh.SendMessage(indexedDb.ReceiveReplyTag, data, func(data []byte) { + var uuid uint64 + err = json.Unmarshal(data, &uuid) if err != nil { - jww.ERROR.Printf("Failed to encrypt Message: %+v", err) - return 0 + jww.ERROR.Printf( + "Could not JSON unmarshal response to ReceiveReply: %+v", err) + uuidChan <- 0 } - } - - msgToInsert := buildMessage(channelID.Marshal(), messageID.Bytes(), - replyTo.Bytes(), nickname, textBytes, pubKey, dmToken, codeset, - timestamp, lease, round.ID, mType, hidden, false, status) - - uuid, err := w.receiveHelper(msgToInsert, false) + uuidChan <- uuid + }) - if err != nil { - jww.ERROR.Printf("Failed to receive reply: %+v", err) + select { + case uuid := <-uuidChan: + return uuid + case <-time.After(indexedDb.ResponseTimeout): + jww.ERROR.Printf("Timed out after %s waiting for response from the "+ + "worker about ReceiveReply", indexedDb.ResponseTimeout) } - go w.receivedMessageCB(uuid, channelID, false) - return uuid + + return 0 } // ReceiveReaction is called whenever a reaction to a message is received on a @@ -229,68 +183,79 @@ func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID, dmToken uint32, codeset uint8, timestamp time.Time, lease time.Duration, round rounds.Round, mType channels.MessageType, status channels.SentStatus, hidden bool) uint64 { - textBytes := []byte(reaction) - var err error - // Handle encryption, if it is present - if w.cipher != nil { - textBytes, err = w.cipher.Encrypt([]byte(reaction)) - if err != nil { - jww.ERROR.Printf("Failed to encrypt Message: %+v", err) - return 0 - } + msg := ReceiveReplyMessage{ + ReactionTo: reactionTo, + ModelMessage: channels.ModelMessage{ + Nickname: nickname, + MessageID: messageID, + ChannelID: channelID, + Timestamp: timestamp, + Lease: lease, + Status: status, + Hidden: hidden, + Pinned: false, + Content: []byte(reaction), + Type: mType, + Round: round.ID, + PubKey: pubKey, + CodesetVersion: codeset, + DmToken: dmToken, + }, + } + + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal payload for ReceiveReaction: %+v", err) + return 0 } - msgToInsert := buildMessage( - channelID.Marshal(), messageID.Bytes(), reactionTo.Bytes(), nickname, - textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType, - false, hidden, status) + uuidChan := make(chan uint64) + w.wh.SendMessage(indexedDb.ReceiveReactionTag, data, func(data []byte) { + var uuid uint64 + err = json.Unmarshal(data, &uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON unmarshal response to ReceiveReaction: %+v", err) + uuidChan <- 0 + } + uuidChan <- uuid + }) - uuid, err := w.receiveHelper(msgToInsert, false) - if err != nil { - jww.ERROR.Printf("Failed to receive reaction: %+v", err) + select { + case uuid := <-uuidChan: + return uuid + case <-time.After(indexedDb.ResponseTimeout): + jww.ERROR.Printf("Timed out after %s waiting for response from the "+ + "worker about ReceiveReply", indexedDb.ResponseTimeout) } - go w.receivedMessageCB(uuid, channelID, false) - return uuid + + return 0 } -// UpdateFromMessageID is called whenever a message with the message ID is -// modified. -// -// The API needs to return the UUID of the modified message that can be -// referenced at a later time. -// -// timestamp, round, pinned, and hidden are all nillable and may be updated -// based upon the UUID at a later date. If a nil value is passed, then make -// no update. -func (w *wasmModel) UpdateFromMessageID(messageID message.ID, - timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, - status *channels.SentStatus) uint64 { - parentErr := errors.New("failed to UpdateFromMessageID") +// MessageUpdateInfo is JSON marshalled and sent to the worker for +// [wasmModel.UpdateFromMessageID] and [wasmModel.UpdateFromUUID]. +type MessageUpdateInfo struct { + UUID uint64 `json:"uuid"` - // FIXME: this is a bit of race condition without the mux. - // This should be done via the transactions (i.e., make a - // special version of receiveHelper) - w.updateMux.Lock() - defer w.updateMux.Unlock() + MessageID message.ID `json:"messageID"` + MessageIDSet bool `json:"messageIDSet"` - msgIDStr := base64.StdEncoding.EncodeToString(messageID.Marshal()) - currentMsgObj, err := indexedDb.GetIndex(w.db, messageStoreName, - messageStoreMessageIndex, js.ValueOf(msgIDStr)) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Failed to get message by index: %+v", err)) - return 0 - } + Timestamp time.Time `json:"timestamp"` + TimestampSet bool `json:"timestampSet"` - currentMsg := utils.JsToJson(currentMsgObj) - uuid, err := w.updateMessage(currentMsg, &messageID, timestamp, - round, pinned, hidden, status) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to updateMessage: %+v", err)) - } - return uuid + RoundID id.Round `json:"round"` + RoundIDSet bool `json:"roundIDSet"` + + Pinned bool `json:"pinned"` + PinnedSet bool `json:"pinnedSet"` + + Hidden bool `json:"hidden"` + HiddenSet bool `json:"hiddenSet"` + + Status channels.SentStatus `json:"status"` + StatusSet bool `json:"statusSet"` } // UpdateFromUUID is called whenever a message at the UUID is modified. @@ -301,215 +266,157 @@ func (w *wasmModel) UpdateFromMessageID(messageID message.ID, func (w *wasmModel) UpdateFromUUID(uuid uint64, messageID *message.ID, timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, status *channels.SentStatus) { - parentErr := errors.New("failed to UpdateFromUUID") - - // FIXME: this is a bit of race condition without the mux. - // This should be done via the transactions (i.e., make a - // special version of receiveHelper) - w.updateMux.Lock() - defer w.updateMux.Unlock() - - // Convert messageID to the key generated by json.Marshal - key := js.ValueOf(uuid) - - // Use the key to get the existing Message - currentMsg, err := indexedDb.Get(w.db, messageStoreName, key) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to get message: %+v", err)) - return - } - - _, err = w.updateMessage(utils.JsToJson(currentMsg), messageID, timestamp, - round, pinned, hidden, status) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to updateMessage: %+v", err)) - } -} - -// updateMessage is a helper for updating a stored message. -func (w *wasmModel) updateMessage(currentMsgJson string, messageID *message.ID, - timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, - status *channels.SentStatus) (uint64, error) { - - newMessage := &Message{} - err := json.Unmarshal([]byte(currentMsgJson), newMessage) - if err != nil { - return 0, err - } - - if status != nil { - newMessage.Status = uint8(*status) - } + msg := MessageUpdateInfo{UUID: uuid} if messageID != nil { - newMessage.MessageID = messageID.Bytes() - } - - if round != nil { - newMessage.Round = uint64(round.ID) + msg.MessageID = *messageID + msg.MessageIDSet = true } - if timestamp != nil { - newMessage.Timestamp = *timestamp + msg.Timestamp = *timestamp + msg.TimestampSet = true + } + if round != nil { + msg.RoundID = round.ID + msg.RoundIDSet = true } - if pinned != nil { - newMessage.Pinned = *pinned + msg.Pinned = *pinned + msg.PinnedSet = true } - if hidden != nil { - newMessage.Hidden = *hidden + msg.Hidden = *hidden + msg.HiddenSet = true + } + if status != nil { + msg.Status = *status + msg.StatusSet = true } - // Store the updated Message - uuid, err := w.receiveHelper(newMessage, true) + data, err := json.Marshal(msg) if err != nil { - return 0, err + jww.ERROR.Printf( + "Could not JSON marshal payload for UpdateFromUUID: %+v", err) + return } - channelID := &id.ID{} - copy(channelID[:], newMessage.ChannelID) - go w.receivedMessageCB(uuid, channelID, true) - return uuid, nil + w.wh.SendMessage(indexedDb.UpdateFromUUIDTag, data, nil) } -// buildMessage is a private helper that converts typical [channels.EventModel] -// inputs into a basic Message structure for insertion into storage. +// UpdateFromMessageID is called whenever a message with the message ID is +// modified. // -// NOTE: ID is not set inside this function because we want to use the -// autoincrement key by default. If you are trying to overwrite an existing -// message, then you need to set it manually yourself. -func buildMessage(channelID, messageID, parentID []byte, nickname string, - text []byte, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, - timestamp time.Time, lease time.Duration, round id.Round, - mType channels.MessageType, pinned, hidden bool, - status channels.SentStatus) *Message { - return &Message{ - MessageID: messageID, - Nickname: nickname, - ChannelID: channelID, - ParentMessageID: parentID, - Timestamp: timestamp, - Lease: lease, - Status: uint8(status), - Hidden: hidden, - Pinned: pinned, - Text: text, - Type: uint16(mType), - Round: uint64(round), - // User Identity Info - Pubkey: pubKey, - DmToken: dmToken, - CodesetVersion: codeset, - } -} +// The API needs to return the UUID of the modified message that can be +// referenced at a later time. +// +// timestamp, round, pinned, and hidden are all nillable and may be updated +// based upon the UUID at a later date. If a nil value is passed, then make +// no update. +func (w *wasmModel) UpdateFromMessageID(messageID message.ID, + timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, + status *channels.SentStatus) uint64 { -// receiveHelper is a private helper for receiving any sort of message. -func (w *wasmModel) receiveHelper( - newMessage *Message, isUpdate bool) (uint64, error) { - // Convert to jsObject - newMessageJson, err := json.Marshal(newMessage) - if err != nil { - return 0, errors.Errorf("Unable to marshal Message: %+v", err) + msg := MessageUpdateInfo{MessageID: messageID, MessageIDSet: true} + if timestamp != nil { + msg.Timestamp = *timestamp + msg.TimestampSet = true } - messageObj, err := utils.JsonToJS(newMessageJson) - if err != nil { - return 0, errors.Errorf("Unable to marshal Message: %+v", err) + if round != nil { + msg.RoundID = round.ID + msg.RoundIDSet = true } - - // Unset the primaryKey for inserts so that it can be auto-populated and - // incremented - if !isUpdate { - messageObj.Delete("id") + if pinned != nil { + msg.Pinned = *pinned + msg.PinnedSet = true + } + if hidden != nil { + msg.Hidden = *hidden + msg.HiddenSet = true + } + if status != nil { + msg.Status = *status + msg.StatusSet = true } - // Store message to database - result, err := indexedDb.Put(w.db, messageStoreName, messageObj) - if err != nil && !strings.Contains(err.Error(), - "at least one key does not satisfy the uniqueness requirements") { - // Only return non-unique constraint errors so that the case - // below this one can be hit and handle duplicate entries properly. - return 0, errors.Errorf("Unable to put Message: %+v", err) + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal payload for UpdateFromMessageID: %+v", err) + return 0 } - // NOTE: Sometimes the insert fails to return an error but hits a duplicate - // insert, so this fallthrough returns the UUID entry in that case. - if result.IsUndefined() { - msgID := message.ID{} - copy(msgID[:], newMessage.MessageID) - msg, errLookup := w.msgIDLookup(msgID) - if errLookup == nil && msg.ID != 0 { - return msg.ID, nil + uuidChan := make(chan uint64) + w.wh.SendMessage(indexedDb.UpdateFromMessageIDTag, data, func(data []byte) { + var uuid uint64 + err = json.Unmarshal(data, &uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON unmarshal response to UpdateFromMessageID: %+v", err) + uuidChan <- 0 } - return 0, errors.Errorf("uuid lookup failure: %+v", err) + uuidChan <- uuid + }) + + select { + case uuid := <-uuidChan: + return uuid + case <-time.After(indexedDb.ResponseTimeout): + jww.ERROR.Printf("Timed out after %s waiting for response from the "+ + "worker about ReceiveReply", indexedDb.ResponseTimeout) } - uuid := uint64(result.Int()) - jww.DEBUG.Printf("Successfully stored message %d", uuid) - return uuid, nil + return 0 +} + +// GetMessageMessage is JSON marshalled and sent to the worker for +// [wasmModel.GetMessage]. +type GetMessageMessage struct { + Message channels.ModelMessage `json:"message"` + Error string `json:"error"` } // GetMessage returns the message with the given [channel.MessageID]. func (w *wasmModel) GetMessage( messageID message.ID) (channels.ModelMessage, error) { - lookupResult, err := w.msgIDLookup(messageID) - if err != nil { - return channels.ModelMessage{}, err - } - - var channelId *id.ID - if lookupResult.ChannelID != nil { - channelId, err = id.Unmarshal(lookupResult.ChannelID) + msgChan := make(chan GetMessageMessage) + w.wh.SendMessage(indexedDb.GetMessageTag, messageID.Marshal(), func(data []byte) { + var msg GetMessageMessage + err := json.Unmarshal(data, &msg) if err != nil { - return channels.ModelMessage{}, err + jww.ERROR.Printf( + "Could not JSON unmarshal response to GetMessage: %+v", err) } - } + msgChan <- msg + }) - var parentMsgId message.ID - if lookupResult.ParentMessageID != nil { - parentMsgId, err = message.UnmarshalID(lookupResult.ParentMessageID) - if err != nil { - return channels.ModelMessage{}, err + select { + case msg := <-msgChan: + if msg.Error != "" { + return channels.ModelMessage{}, errors.New(msg.Error) } + return msg.Message, nil + case <-time.After(indexedDb.ResponseTimeout): + return channels.ModelMessage{}, errors.Errorf("timed out after %s "+ + "waiting for response from the worker about GetMessage", + indexedDb.ResponseTimeout) } - - return channels.ModelMessage{ - UUID: lookupResult.ID, - Nickname: lookupResult.Nickname, - MessageID: messageID, - ChannelID: channelId, - ParentMessageID: parentMsgId, - Timestamp: lookupResult.Timestamp, - Lease: lookupResult.Lease, - Status: channels.SentStatus(lookupResult.Status), - Hidden: lookupResult.Hidden, - Pinned: lookupResult.Pinned, - Content: lookupResult.Text, - Type: channels.MessageType(lookupResult.Type), - Round: id.Round(lookupResult.Round), - PubKey: lookupResult.Pubkey, - CodesetVersion: lookupResult.CodesetVersion, - }, nil } -// msgIDLookup gets the UUID of the Message with the given messageID. -func (w *wasmModel) msgIDLookup(messageID message.ID) (*Message, error) { - msgIDStr := js.ValueOf(base64.StdEncoding.EncodeToString(messageID.Bytes())) - resultObj, err := indexedDb.GetIndex(w.db, messageStoreName, - messageStoreMessageIndex, msgIDStr) - if err != nil { - return nil, err - } else if resultObj.IsUndefined() { - return nil, errors.Errorf("no message for %s found", msgIDStr) - } +// DeleteMessage removes a message with the given messageID from storage. +func (w *wasmModel) DeleteMessage(messageID message.ID) error { + errChan := make(chan error) + w.wh.SendMessage(indexedDb.DeleteMessageTag, messageID.Marshal(), func(data []byte) { + if data != nil { + errChan <- errors.New(string(data)) + } else { + errChan <- nil + } + }) - // Process result into string - resultMsg := &Message{} - err = json.Unmarshal([]byte(utils.JsToJson(resultObj)), resultMsg) - if err != nil { - return nil, err + select { + case err := <-errChan: + return err + case <-time.After(indexedDb.ResponseTimeout): + return errors.Errorf("timed out after %s waiting for response from "+ + "the worker about DeleteMessage", indexedDb.ResponseTimeout) } - return resultMsg, nil - } diff --git a/indexedDb/channels/init.go b/indexedDb/channels/init.go index 12d6bb8840f3d75c8778b871ac883a84ad008ada..292a596148035af097b19a3e1f1909938251268e 100644 --- a/indexedDb/channels/init.go +++ b/indexedDb/channels/init.go @@ -10,7 +10,7 @@ package channels import ( - "github.com/hack-pad/go-indexeddb/idb" + "encoding/json" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/v4/channels" @@ -18,18 +18,12 @@ import ( "gitlab.com/elixxir/xxdk-wasm/indexedDb" "gitlab.com/elixxir/xxdk-wasm/storage" "gitlab.com/xx_network/primitives/id" - "syscall/js" + "time" ) -const ( - // databaseSuffix is the suffix to be appended to the name of - // the database. - databaseSuffix = "_speakeasy" - - // currentVersion is the current version of the IndexDb - // runtime. Used for migration purposes. - currentVersion uint = 1 -) +// WorkerJavascriptFileURL is the URL of the script the worker will execute to +// launch the worker WASM binary. It must obey the same-origin policy. +const WorkerJavascriptFileURL = "/integrations/assets/indexedDbWorker.js" // MessageReceivedCallback is called any time a message is received or updated. // @@ -47,172 +41,134 @@ func NewWASMEventModelBuilder(encryption cryptoChannel.Cipher, return fn } +// NewWASMEventModelMessage is JSON marshalled and sent to the worker for +// [NewWASMEventModel]. +type NewWASMEventModelMessage struct { + Path string `json:"path"` + EncryptionJSON string `json:"encryptionJSON"` +} + // NewWASMEventModel returns a [channels.EventModel] backed by a wasmModel. // The name should be a base64 encoding of the users public key. func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, cb MessageReceivedCallback) (channels.EventModel, error) { - databaseName := path + databaseSuffix - return newWASMModel(databaseName, encryption, cb) -} -// newWASMModel creates the given [idb.Database] and returns a wasmModel. -func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, - cb MessageReceivedCallback) (*wasmModel, error) { - // Attempt to open database object - ctx, cancel := indexedDb.NewContext() - defer cancel() - openRequest, err := idb.Global().Open(ctx, databaseName, currentVersion, - func(db *idb.Database, oldVersion, newVersion uint) error { - if oldVersion == newVersion { - jww.INFO.Printf("IndexDb version is current: v%d", - newVersion) - return nil - } - - jww.INFO.Printf("IndexDb upgrade required: v%d -> v%d", - oldVersion, newVersion) - - if oldVersion == 0 && newVersion >= 1 { - err := v1Upgrade(db) - if err != nil { - return err - } - oldVersion = 1 - } - - // if oldVersion == 1 && newVersion >= 2 { v2Upgrade(), oldVersion = 2 } - return nil - }) + // TODO: bring in URL and name from caller + wh, err := indexedDb.NewWorkerHandler( + WorkerJavascriptFileURL, "indexedDbWorker") if err != nil { return nil, err } - // Wait for database open to finish - db, err := openRequest.Await(ctx) - if err != nil { - return nil, err - } + // Register handler to manage messages for the MessageReceivedCallback + wh.RegisterHandler(indexedDb.GetMessageTag, indexedDb.InitID, false, + messageReceivedCallbackHandler(cb)) - // Save the encryption status to storage - encryptionStatus := encryption != nil - loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( - databaseName, encryptionStatus) + // Register handler to manage checking encryption status from local storage + wh.RegisterHandler(indexedDb.EncryptionStatusTag, indexedDb.InitID, false, + checkDbEncryptionStatusHandler(wh)) + + encryptionJSON, err := json.Marshal(encryption) if err != nil { return nil, err } - // Verify encryption status does not change - if encryptionStatus != loadedEncryptionStatus { - return nil, errors.New( - "Cannot load database with different encryption status.") - } else if !encryptionStatus { - jww.WARN.Printf("IndexedDb encryption disabled!") + message := NewWASMEventModelMessage{ + Path: path, + EncryptionJSON: string(encryptionJSON), } - // Attempt to ensure the database has been properly initialized - openRequest, err = idb.Global().Open(ctx, databaseName, currentVersion, - func(db *idb.Database, oldVersion, newVersion uint) error { - return nil - }) + payload, err := json.Marshal(message) if err != nil { return nil, err } - // Wait for database open to finish - db, err = openRequest.Await(ctx) - if err != nil { - return nil, err - } - wrapper := &wasmModel{db: db, receivedMessageCB: cb, cipher: encryption} - return wrapper, nil -} + errChan := make(chan string) + wh.SendMessage(indexedDb.NewWASMEventModelTag, payload, func(data []byte) { + errChan <- string(data) + }) -// v1Upgrade performs the v0 -> v1 database upgrade. -// -// This can never be changed without permanently breaking backwards -// compatibility. -func v1Upgrade(db *idb.Database) error { - storeOpts := idb.ObjectStoreOptions{ - KeyPath: js.ValueOf(pkeyName), - AutoIncrement: true, - } - indexOpts := idb.IndexOptions{ - Unique: false, - MultiEntry: false, + select { + case workerErr := <-errChan: + if workerErr != "" { + return nil, errors.New(workerErr) + } + case <-time.After(indexedDb.ResponseTimeout): + return nil, errors.Errorf("timed out after %s waiting for indexedDB "+ + "database in worker to intialize", indexedDb.ResponseTimeout) } - // Build Message ObjectStore and Indexes - messageStore, err := db.CreateObjectStore(messageStoreName, storeOpts) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStoreMessageIndex, - js.ValueOf(messageStoreMessage), - idb.IndexOptions{ - Unique: true, - MultiEntry: false, - }) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStoreChannelIndex, - js.ValueOf(messageStoreChannel), indexOpts) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStoreParentIndex, - js.ValueOf(messageStoreParent), indexOpts) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStoreTimestampIndex, - js.ValueOf(messageStoreTimestamp), indexOpts) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStorePinnedIndex, - js.ValueOf(messageStorePinned), indexOpts) - if err != nil { - return err - } + return &wasmModel{wh}, nil +} - // Build Channel ObjectStore - _, err = db.CreateObjectStore(channelsStoreName, storeOpts) - if err != nil { - return err - } +// MessageReceivedCallbackMessage is JSON marshalled and received from the +// worker for the [MessageReceivedCallback] callback. +type MessageReceivedCallbackMessage struct { + UUID uint64 `json:"uuid"` + ChannelID *id.ID `json:"channelID"` + Update bool `json:"update"` +} - // Get the database name and save it to storage - if databaseName, err := db.Name(); err != nil { - return err - } else if err = storage.StoreIndexedDb(databaseName); err != nil { - return err +// messageReceivedCallbackHandler returns a handler to manage messages for the +// MessageReceivedCallback. +func messageReceivedCallbackHandler(cb MessageReceivedCallback) func(data []byte) { + return func(data []byte) { + var msg MessageReceivedCallbackMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Failed to JSON unmarshal "+ + "MessageReceivedCallback message from worker: %+v", err) + return + } + cb(msg.UUID, msg.ChannelID, msg.Update) } +} - return nil +// EncryptionStatusMessage is JSON marshalled and received from the worker when +// the database checks if it is encrypted. +type EncryptionStatusMessage struct { + DatabaseName string `json:"databaseName"` + EncryptionStatus bool `json:"encryptionStatus"` } -// hackTestDb is a horrible function that exists as the result of an extremely -// long discussion about why initializing the IndexedDb sometimes silently -// fails. It ultimately tries to prevent an unrecoverable situation by actually -// inserting some nonsense data and then checking to see if it persists. -// If this function still exists in 2023, god help us all. Amen. -func (w *wasmModel) hackTestDb() error { - testMessage := &Message{ - ID: 0, - Nickname: "test", - MessageID: id.DummyUser.Marshal(), - } - msgId, helper := w.receiveHelper(testMessage, false) - if helper != nil { - return helper - } - result, err := indexedDb.Get(w.db, messageStoreName, js.ValueOf(msgId)) - if err != nil { - return err - } - if result.IsUndefined() { - return errors.Errorf("Failed to test db, record not present") +// EncryptionStatusReply is JSON marshalled and sent to the worker is response +// to the [EncryptionStatusMessage]. +type EncryptionStatusReply struct { + EncryptionStatus bool `json:"encryptionStatus"` + Error string `json:"error"` +} + +// checkDbEncryptionStatusHandler returns a handler to manage checking +// encryption status from local storage. +func checkDbEncryptionStatusHandler(wh *indexedDb.WorkerHandler) func(data []byte) { + return func(data []byte) { + // Unmarshal received message + var msg EncryptionStatusMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Failed to JSON unmarshal "+ + "EncryptionStatusMessage message from worker: %+v", err) + return + } + + // Pass message values to storage + loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( + msg.DatabaseName, msg.EncryptionStatus) + var reply EncryptionStatusReply + if err != nil { + reply.Error = err.Error() + } else { + reply.EncryptionStatus = loadedEncryptionStatus + } + + // Return response + statusData, err := json.Marshal(reply) + if err != nil { + jww.ERROR.Printf( + "Failed to JSON marshal EncryptionStatusReply: %+v", err) + return + } + + wh.SendMessage(indexedDb.EncryptionStatusTag, statusData, nil) } - return nil } diff --git a/indexedDb/tag.go b/indexedDb/tag.go new file mode 100644 index 0000000000000000000000000000000000000000..205da10c49542681c8793bc0acc49e29a9054676 --- /dev/null +++ b/indexedDb/tag.go @@ -0,0 +1,48 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package indexedDb + +// Tag describes how a message sent to or from the worker should be handled. +type Tag string + +// List of tags that can be used when sending a message or registering a handler +// to receive a message. +const ( + ReadyTag Tag = "Ready" + + NewWASMEventModelTag Tag = "NewWASMEventModel" + EncryptionStatusTag Tag = "EncryptionStatus" + StoreDatabaseNameTag Tag = "StoreDatabaseName" + + JoinChannelTag Tag = "JoinChannel" + LeaveChannelTag Tag = "LeaveChannel" + ReceiveMessageTag Tag = "ReceiveMessage" + ReceiveReplyTag Tag = "ReceiveReply" + ReceiveReactionTag Tag = "ReceiveReaction" + UpdateFromUUIDTag Tag = "UpdateFromUUID" + UpdateFromMessageIDTag Tag = "UpdateFromMessageID" + GetMessageTag Tag = "GetMessage" + DeleteMessageTag Tag = "DeleteMessage" +) + +// deleteAfterReceiving is a list of Tags that will have their handler deleted +// after a message is received. This is mainly used for responses where the +// handler will only handle it once and never again. +var deleteAfterReceiving = map[Tag]struct{}{ + ReadyTag: {}, + NewWASMEventModelTag: {}, + EncryptionStatusTag: {}, + JoinChannelTag: {}, + LeaveChannelTag: {}, + ReceiveMessageTag: {}, + ReceiveReplyTag: {}, + ReceiveReactionTag: {}, + UpdateFromUUIDTag: {}, + UpdateFromMessageIDTag: {}, + DeleteMessageTag: {}, +} diff --git a/indexedDb/worker.go b/indexedDb/worker.go new file mode 100644 index 0000000000000000000000000000000000000000..2b40fdf24f4ef69d39847aee8fb86f2714c0cde0 --- /dev/null +++ b/indexedDb/worker.go @@ -0,0 +1,292 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +//go:build js && wasm + +package indexedDb + +import ( + "encoding/json" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/xxdk-wasm/utils" + "sync" + "syscall/js" + "time" +) + +// TODO: +// 1. Fix ID counter +// 2. Use transfer list when sending + +// InitID is the ID for the first item in the handler list. If the list only +// contains one handler, then this is the ID of that handler. If the list has +// autogenerated unique IDs, this is the initial ID to start at. +const InitID = uint64(0) + +// Response timeouts. +const ( + // workerInitialConnectionTimeout is the time to wait to receive initial + // contact from a new worker before timing out. + workerInitialConnectionTimeout = 2 * time.Second + + // ResponseTimeout is the general time to wait after sending a message to + // receive a response before timing out. + ResponseTimeout = 25 * time.Millisecond +) + +// HandlerFn is the function that handles incoming data from the worker. +type HandlerFn func(data []byte) + +// WorkerHandler manages the handling of messages received from the worker. +type WorkerHandler struct { + // worker is the Worker Javascript object. + // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker + worker js.Value + + // handlers are a list of handlers that handle a specific message received + // from the worker. Each handler is keyed on a tag specifying how the + // received message should be handled. If the message is a reply to a + // message sent to the worker, then the handler is also keyed on a unique + // ID. If the message is not a reply, then it appears on InitID. + handlers map[Tag]map[uint64]HandlerFn + + // idCount tracks the newest ID to assign to new handlers. + idCount uint64 + + mux sync.Mutex +} + +// WorkerMessage is the outer message that contains the contents of each message +// sent to the worker. It is transmitted as JSON. +type WorkerMessage struct { + Tag Tag `json:"tag"` + ID uint64 `json:"id"` + Data []byte `json:"data"` +} + +// NewWorkerHandler generates a new WorkerHandler. This functions will only +// return once communication with the worker has been established. +func NewWorkerHandler(aURL, name string) (*WorkerHandler, error) { + // Create new worker options with the given name + opts := newWorkerOptions("", "", name) + + wh := &WorkerHandler{ + worker: js.Global().Get("Worker").New(aURL, opts), + handlers: make(map[Tag]map[uint64]HandlerFn), + idCount: InitID, + } + + // Register listeners on the Javascript worker object that receive messages + // and errors from the worker + wh.addEventListeners() + + // Register a handler that will receive initial message from worker + // indicating that it is ready + ready := make(chan struct{}) + wh.RegisterHandler( + ReadyTag, InitID, false, func([]byte) { ready <- struct{}{} }) + + // Wait for the ready signal from the worker + select { + case <-ready: + case <-time.After(workerInitialConnectionTimeout): + return nil, errors.Errorf("timed out after %s waiting for initial "+ + "message from worker", workerInitialConnectionTimeout) + } + + return wh, nil +} + +// SendMessage sends a message to the worker with the given tag. If a reception +// handler is specified, then the message is given a unique ID to handle the +// reply. Set receptionHandler to nil if no reply is expected. +func (wh *WorkerHandler) SendMessage( + tag Tag, data []byte, receptionHandler HandlerFn) { + var id uint64 + if receptionHandler != nil { + id = wh.RegisterHandler(tag, 0, true, receptionHandler) + } + + message := WorkerMessage{ + Tag: tag, + ID: id, + Data: data, + } + payload, err := json.Marshal(message) + if err != nil { + jww.FATAL.Panicf( + "Failed to marshal payload for %q going to worker: %+v", tag, err) + } + + go wh.postMessage(string(payload)) +} + +// RegisterHandler registers the handler for the given tag and ID unless autoID +// is true, in which case a unique ID is used. Returns the ID that was +// registered. If a previous handler was registered, it is overwritten. +// This function is thread safe. +func (wh *WorkerHandler) RegisterHandler( + tag Tag, id uint64, autoID bool, handler HandlerFn) uint64 { + wh.mux.Lock() + defer wh.mux.Unlock() + + // FIXME: This ID system only really works correctly when used with a + // single tag. This should probably be fixed. + if autoID { + id = wh.getNextID() + } + + if _, exists := wh.handlers[tag]; !exists { + wh.handlers[tag] = make(map[uint64]HandlerFn) + } + wh.handlers[tag][id] = handler + + return id +} + +// getNextID returns the next unique ID. This function is not thread-safe. +func (wh *WorkerHandler) getNextID() uint64 { + id := wh.idCount + wh.idCount++ + return id +} + +// addEventListeners adds the event listeners needed to receive a message from +// the worker. Two listeners were added; one to receive messages from the worker +// and the other to receive errors. +func (wh *WorkerHandler) addEventListeners() { + // Create a listener for when the message event is fired on the worker. This + // occurs when a message is received from the worker. + // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/message_event + messageEvent := js.FuncOf(func(this js.Value, args []js.Value) any { + err := wh.receiveMessage([]byte(args[0].Get("data").String())) + if err != nil { + jww.ERROR.Printf("Failed to receive message from worker: %+v", err) + } + return nil + }) + + // Create listener for when a messageerror event is fired on the worker. + // This occurs when it receives a message that can't be deserialized. + // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/messageerror_event + messageError := js.FuncOf(func(this js.Value, args []js.Value) any { + event := args[0] + jww.ERROR.Printf( + "Error receiving message from worker: %s", utils.JsToJson(event)) + return nil + }) + + wh.worker.Call("addEventListener", "message", messageEvent) + wh.worker.Call("addEventListener", "messageerror", messageError) +} + +// receiveMessage is registered with the Javascript event listener and is called +// every time a new message from the worker is received. +func (wh *WorkerHandler) receiveMessage(data []byte) error { + var message WorkerMessage + err := json.Unmarshal(data, &message) + if err != nil { + return err + } + + handler, err := wh.getHandler(message.Tag, message.ID) + if err != nil { + return err + } + + go handler(message.Data) + + return nil +} + +// getHandler returns the handler with the given ID or returns an error if no +// handler is found. The handler is deleted from the map if specified in +// deleteAfterReceiving. This function is thread safe. +func (wh *WorkerHandler) getHandler(tag Tag, id uint64) (HandlerFn, error) { + wh.mux.Lock() + defer wh.mux.Unlock() + handlers, exists := wh.handlers[tag] + if !exists { + return nil, errors.Errorf("no handlers found for tag %q", tag) + } + + handler, exists := handlers[id] + if !exists { + return nil, errors.Errorf("no %q handler found for ID %d", tag, id) + } + + if _, exists = deleteAfterReceiving[tag]; exists { + delete(wh.handlers[tag], id) + if len(wh.handlers[tag]) == 0 { + delete(wh.handlers, tag) + } + } + + return handler, nil +} + +// postMessage sends a message to the worker. +// +// message is the object to deliver to the worker; this will be in the data +// field in the event delivered to the worker. It must be a js.Value or a +// primitive type that can be converted via js.ValueOf. The Javascript object +// must be "any value or JavaScript object handled by the structured clone +// algorithm, which includes cyclical references.". See the doc for more +// information. +// +// If the message parameter is not provided, a SyntaxError will be thrown by the +// parser. If the data to be passed to the worker is unimportant, js.Null or +// js.Undefined can be passed explicitly. +// +// Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage +func (wh *WorkerHandler) postMessage(message any) { + wh.worker.Call("postMessage", message) +} + +// postMessageTransferList sends an array of Transferable objects to transfer to +// the worker. This is meant to be used to transfer large amounts of binary data +// using a high-performance, zero-copy operation. Refer to the doc for more +// information. +// +// Note: The binary data cannot simply be passed as the transferList. The +// traversable objects must be specified in the aMessage and included in the +// transferList +// +// Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage#transfer +func (wh *WorkerHandler) postMessageTransferList(message, transferList any) { + wh.worker.Call("postMessage", message, transferList) +} + +// newWorkerOptions creates a new Javascript object containing optional +// properties that can be set when creating a new worker. +// +// Each property is optional; leave a property empty to use the defaults (as +// documented). The available properties are: +// - type - The type of worker to create. The value can be either "classic" or +// "module". If not specified, the default used is classic. +// - credentials - The type of credentials to use for the worker. The value +// can be "omit", "same-origin", or "include". If it is not specified, or if +// the type is "classic", then the default used is "omit" (no credentials +// are required). +// - name - An identifying name for the worker, used mainly for debugging +// purposes. +// +// Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/Worker#options +func newWorkerOptions(workerType, credentials, name string) js.Value { + options := make(map[string]any, 3) + if workerType != "" { + options["type"] = workerType + } + if credentials != "" { + options["credentials"] = credentials + } + if name != "" { + options["name"] = name + } + return js.ValueOf(options) +} diff --git a/indexedDbWorker/channels/handlers.go b/indexedDbWorker/channels/handlers.go new file mode 100644 index 0000000000000000000000000000000000000000..7d997e746168aa50ba12240f66ea2d68df3d51ed --- /dev/null +++ b/indexedDbWorker/channels/handlers.go @@ -0,0 +1,372 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package main + +import ( + "encoding/json" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/channels" + "gitlab.com/elixxir/client/v4/cmix/rounds" + cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast" + cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/crypto/message" + "gitlab.com/elixxir/xxdk-wasm/indexedDb" + mChannels "gitlab.com/elixxir/xxdk-wasm/indexedDb/channels" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/primitives/id" + "time" +) + +// manager handles the event model and the message handler, which is used to +// send information between the event model and the main thread. +type manager struct { + mh *indexedDbWorker.MessageHandler + model channels.EventModel +} + +// RegisterHandlers registers all the reception handlers to manage messages from +// the main thread for the channels.EventModel. +func (m *manager) RegisterHandlers() { + + m.mh.RegisterHandler(indexedDb.NewWASMEventModelTag, m.newWASMEventModelHandler) + m.mh.RegisterHandler(indexedDb.JoinChannelTag, m.joinChannelHandler) + m.mh.RegisterHandler(indexedDb.LeaveChannelTag, m.leaveChannelHandler) + m.mh.RegisterHandler(indexedDb.ReceiveMessageTag, m.receiveMessageHandler) + m.mh.RegisterHandler(indexedDb.ReceiveReplyTag, m.receiveReplyHandler) + m.mh.RegisterHandler(indexedDb.ReceiveReactionTag, m.receiveReactionHandler) + m.mh.RegisterHandler(indexedDb.UpdateFromUUIDTag, m.updateFromUUIDHandler) + m.mh.RegisterHandler(indexedDb.UpdateFromMessageIDTag, m.updateFromMessageIDHandler) + m.mh.RegisterHandler(indexedDb.GetMessageTag, m.getMessageHandler) + m.mh.RegisterHandler(indexedDb.DeleteMessageTag, m.deleteMessageHandler) +} + +// newWASMEventModelHandler is the handler for NewWASMEventModel. +func (m *manager) newWASMEventModelHandler(data []byte) []byte { + var msg mChannels.NewWASMEventModelMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal "+ + "NewWASMEventModelMessage from main thread: %+v", err) + return []byte{} + } + + // Create new encryption cipher + rng := fastRNG.NewStreamGenerator(12, 1024, csprng.NewSystemRNG) + encryption, err := cryptoChannel.NewCipherFromJSON( + []byte(msg.EncryptionJSON), rng.GetStream()) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal channel cipher from "+ + "main thread: %+v", err) + return []byte{} + } + + m.model, err = NewWASMEventModel(msg.Path, encryption, + m.messageReceivedCallback, m.storeEncryptionStatus) + if err != nil { + return []byte(err.Error()) + } + return []byte{} +} + +// messageReceivedCallback sends calls to the MessageReceivedCallback in the +// main thread. +// +// storeEncryptionStatus adhere to the indexedDb.MessageReceivedCallback type. +func (m *manager) messageReceivedCallback( + uuid uint64, channelID *id.ID, update bool) { + // Package parameters for sending + msg := &mChannels.MessageReceivedCallbackMessage{ + UUID: uuid, + ChannelID: channelID, + Update: update, + } + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal MessageReceivedCallbackMessage: %+v", err) + return + } + + // Send it to the main thread + m.mh.SendResponse(indexedDb.GetMessageTag, indexedDb.InitID, data) +} + +// storeEncryptionStatus augments the functionality of +// storage.StoreIndexedDbEncryptionStatus. It takes the database name and +// encryption status +// +// storeEncryptionStatus adheres to the storeEncryptionStatusFn type. +func (m *manager) storeEncryptionStatus( + databaseName string, encryption bool) (bool, error) { + // Package parameters for sending + msg := &mChannels.EncryptionStatusMessage{ + DatabaseName: databaseName, + EncryptionStatus: encryption, + } + data, err := json.Marshal(msg) + if err != nil { + return false, err + } + + // Register response handler with channel that will wait for the response + responseChan := make(chan []byte) + m.mh.RegisterHandler(indexedDb.EncryptionStatusTag, + func(data []byte) []byte { + responseChan <- data + return nil + }) + + // Send encryption status to main thread + m.mh.SendResponse(indexedDb.EncryptionStatusTag, indexedDb.InitID, data) + + // Wait for response + var response mChannels.EncryptionStatusReply + select { + case responseData := <-responseChan: + if err = json.Unmarshal(responseData, &response); err != nil { + return false, err + } + case <-time.After(indexedDb.ResponseTimeout): + return false, errors.Errorf("timed out after %s waiting for "+ + "response about the database encryption status from local "+ + "storage in the main thread", indexedDb.ResponseTimeout) + } + + // If the response contain an error, return it + if response.Error != "" { + return false, errors.New(response.Error) + } + + // Return the encryption status + return response.EncryptionStatus, nil +} + +// joinChannelHandler is the handler for wasmModel.JoinChannel. +func (m *manager) joinChannelHandler(data []byte) []byte { + var channel cryptoBroadcast.Channel + err := json.Unmarshal(data, &channel) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal broadcast.Channel from "+ + "main thread: %+v", err) + return nil + } + + m.model.JoinChannel(&channel) + return nil +} + +// leaveChannelHandler is the handler for wasmModel.LeaveChannel. +func (m *manager) leaveChannelHandler(data []byte) []byte { + channelID, err := id.Unmarshal(data) + if err != nil { + jww.ERROR.Printf( + "Could not unmarshal channel ID from main thread: %+v", err) + return nil + } + + m.model.LeaveChannel(channelID) + return nil +} + +// receiveMessageHandler is the handler for wasmModel.ReceiveMessage. +func (m *manager) receiveMessageHandler(data []byte) []byte { + var msg channels.ModelMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal channels.ModelMessage "+ + "from main thread: %+v", err) + return nil + } + + uuid := m.model.ReceiveMessage(msg.ChannelID, msg.MessageID, msg.Nickname, + string(msg.Content), msg.PubKey, msg.DmToken, msg.CodesetVersion, + msg.Timestamp, msg.Lease, rounds.Round{ID: msg.Round}, msg.Type, + msg.Status, msg.Hidden) + + uuidData, err := json.Marshal(uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from ReceiveMessage: %+v", err) + return nil + } + return uuidData +} + +// receiveReplyHandler is the handler for wasmModel.ReceiveReply. +func (m *manager) receiveReplyHandler(data []byte) []byte { + var msg mChannels.ReceiveReplyMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal ReceiveReplyMessage "+ + "from main thread: %+v", err) + return nil + } + + uuid := m.model.ReceiveReply(msg.ChannelID, msg.MessageID, msg.ReactionTo, + msg.Nickname, string(msg.Content), msg.PubKey, msg.DmToken, + msg.CodesetVersion, msg.Timestamp, msg.Lease, + rounds.Round{ID: msg.Round}, msg.Type, msg.Status, msg.Hidden) + + uuidData, err := json.Marshal(uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from ReceiveReply: %+v", err) + return nil + } + return uuidData +} + +// receiveReactionHandler is the handler for wasmModel.ReceiveReaction. +func (m *manager) receiveReactionHandler(data []byte) []byte { + var msg mChannels.ReceiveReplyMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal ReceiveReplyMessage "+ + "from main thread: %+v", err) + return nil + } + + uuid := m.model.ReceiveReaction(msg.ChannelID, msg.MessageID, + msg.ReactionTo, msg.Nickname, string(msg.Content), msg.PubKey, + msg.DmToken, msg.CodesetVersion, msg.Timestamp, msg.Lease, + rounds.Round{ID: msg.Round}, msg.Type, msg.Status, msg.Hidden) + + uuidData, err := json.Marshal(uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from ReceiveReaction: %+v", err) + return nil + } + return uuidData +} + +// updateFromUUIDHandler is the handler for wasmModel.UpdateFromUUID. +func (m *manager) updateFromUUIDHandler(data []byte) []byte { + var msg mChannels.MessageUpdateInfo + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal MessageUpdateInfo "+ + "from main thread: %+v", err) + return nil + } + var messageID *message.ID + var timestamp *time.Time + var round *rounds.Round + var pinned, hidden *bool + var status *channels.SentStatus + if msg.MessageIDSet { + messageID = &msg.MessageID + } + if msg.TimestampSet { + timestamp = &msg.Timestamp + } + if msg.RoundIDSet { + round = &rounds.Round{ID: msg.RoundID} + } + if msg.PinnedSet { + pinned = &msg.Pinned + } + if msg.HiddenSet { + hidden = &msg.Hidden + } + if msg.StatusSet { + status = &msg.Status + } + + m.model.UpdateFromUUID( + msg.UUID, messageID, timestamp, round, pinned, hidden, status) + return nil +} + +// updateFromMessageIDHandler is the handler for wasmModel.UpdateFromMessageID. +func (m *manager) updateFromMessageIDHandler(data []byte) []byte { + var msg mChannels.MessageUpdateInfo + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal MessageUpdateInfo "+ + "from main thread: %+v", err) + return nil + } + var timestamp *time.Time + var round *rounds.Round + var pinned, hidden *bool + var status *channels.SentStatus + if msg.TimestampSet { + timestamp = &msg.Timestamp + } + if msg.RoundIDSet { + round = &rounds.Round{ID: msg.RoundID} + } + if msg.PinnedSet { + pinned = &msg.Pinned + } + if msg.HiddenSet { + hidden = &msg.Hidden + } + if msg.StatusSet { + status = &msg.Status + } + + uuid := m.model.UpdateFromMessageID( + msg.MessageID, timestamp, round, pinned, hidden, status) + + uuidData, err := json.Marshal(uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from UpdateFromMessageID: %+v", err) + return nil + } + return uuidData +} + +// getMessageHandler is the handler for wasmModel.GetMessage. +func (m *manager) getMessageHandler(data []byte) []byte { + messageID, err := message.UnmarshalID(data) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal message ID from main "+ + "thread: %+v", err) + return nil + } + + reply := mChannels.GetMessageMessage{} + + msg, err := m.model.GetMessage(messageID) + if err != nil { + reply.Error = err.Error() + } else { + reply.Message = msg + } + + messageData, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from ReceiveReaction: %+v", err) + return nil + } + return messageData +} + +// deleteMessageHandler is the handler for wasmModel.DeleteMessage. +func (m *manager) deleteMessageHandler(data []byte) []byte { + messageID, err := message.UnmarshalID(data) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal message ID from main "+ + "thread: %+v", err) + return nil + } + + err = m.model.DeleteMessage(messageID) + if err != nil { + return []byte(err.Error()) + } + + return nil +} diff --git a/indexedDbWorker/channels/implementation.go b/indexedDbWorker/channels/implementation.go new file mode 100644 index 0000000000000000000000000000000000000000..8fd7fba2a57fa7c1ba253f813e38e951cab3e888 --- /dev/null +++ b/indexedDbWorker/channels/implementation.go @@ -0,0 +1,515 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +//go:build js && wasm + +package main + +import ( + "crypto/ed25519" + "encoding/base64" + "encoding/json" + "strings" + "sync" + "syscall/js" + "time" + + "gitlab.com/elixxir/xxdk-wasm/indexedDb" + + "github.com/hack-pad/go-indexeddb/idb" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + + "gitlab.com/elixxir/client/v4/channels" + "gitlab.com/elixxir/client/v4/cmix/rounds" + cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast" + cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/crypto/message" + "gitlab.com/elixxir/xxdk-wasm/utils" + "gitlab.com/xx_network/primitives/id" +) + +// wasmModel implements [channels.EventModel] interface, which uses the channels +// system passed an object that adheres to in order to get events on the +// channel. +type wasmModel struct { + db *idb.Database + cipher cryptoChannel.Cipher + receivedMessageCB MessageReceivedCallback + updateMux sync.Mutex +} + +// JoinChannel is called whenever a channel is joined locally. +func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) { + parentErr := errors.New("failed to JoinChannel") + + // Build object + newChannel := Channel{ + ID: channel.ReceptionID.Marshal(), + Name: channel.Name, + Description: channel.Description, + } + + // Convert to jsObject + newChannelJson, err := json.Marshal(&newChannel) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to marshal Channel: %+v", err)) + return + } + channelObj, err := utils.JsonToJS(newChannelJson) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to marshal Channel: %+v", err)) + return + } + + _, err = indexedDb.Put(w.db, channelsStoreName, channelObj) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to put Channel: %+v", err)) + } +} + +// LeaveChannel is called whenever a channel is left locally. +func (w *wasmModel) LeaveChannel(channelID *id.ID) { + parentErr := errors.New("failed to LeaveChannel") + + // Delete the channel from storage + err := indexedDb.Delete(w.db, channelsStoreName, + js.ValueOf(channelID.String())) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to delete Channel: %+v", err)) + return + } + + // Clean up lingering data + err = w.deleteMsgByChannel(channelID) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Deleting Channel's Message data failed: %+v", err)) + return + } + jww.DEBUG.Printf("Successfully deleted channel: %s", channelID) +} + +// deleteMsgByChannel is a private helper that uses messageStoreChannelIndex +// to delete all Message with the given Channel ID. +func (w *wasmModel) deleteMsgByChannel(channelID *id.ID) error { + parentErr := errors.New("failed to deleteMsgByChannel") + + // Prepare the Transaction + txn, err := w.db.Transaction(idb.TransactionReadWrite, messageStoreName) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to create Transaction: %+v", err) + } + store, err := txn.ObjectStore(messageStoreName) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to get ObjectStore: %+v", err) + } + index, err := store.Index(messageStoreChannelIndex) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to get Index: %+v", err) + } + + // Perform the operation + channelIdStr := base64.StdEncoding.EncodeToString(channelID.Marshal()) + keyRange, err := idb.NewKeyRangeOnly(js.ValueOf(channelIdStr)) + cursorRequest, err := index.OpenCursorRange(keyRange, idb.CursorNext) + if err != nil { + return errors.WithMessagef(parentErr, "Unable to open Cursor: %+v", err) + } + ctx, cancel := indexedDb.NewContext() + err = cursorRequest.Iter(ctx, + func(cursor *idb.CursorWithValue) error { + _, err := cursor.Delete() + return err + }) + cancel() + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to delete Message data: %+v", err) + } + return nil +} + +// ReceiveMessage is called whenever a message is received on a given channel. +// +// It may be called multiple times on the same message; it is incumbent on the +// user of the API to filter such called by message ID. +func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID message.ID, + nickname, text string, pubKey ed25519.PublicKey, dmToken uint32, + codeset uint8, timestamp time.Time, lease time.Duration, round rounds.Round, + mType channels.MessageType, status channels.SentStatus, hidden bool) uint64 { + textBytes := []byte(text) + var err error + + // Handle encryption, if it is present + if w.cipher != nil { + textBytes, err = w.cipher.Encrypt([]byte(text)) + if err != nil { + jww.ERROR.Printf("Failed to encrypt Message: %+v", err) + return 0 + } + } + + msgToInsert := buildMessage( + channelID.Marshal(), messageID.Bytes(), nil, nickname, + textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType, + false, hidden, status) + + uuid, err := w.receiveHelper(msgToInsert, false) + if err != nil { + jww.ERROR.Printf("Failed to receive Message: %+v", err) + } + + go w.receivedMessageCB(uuid, channelID, false) + return uuid +} + +// ReceiveReply is called whenever a message is received that is a reply on a +// given channel. It may be called multiple times on the same message; it is +// incumbent on the user of the API to filter such called by message ID. +// +// Messages may arrive our of order, so a reply, in theory, can arrive before +// the initial message. As a result, it may be important to buffer replies. +func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID, + replyTo message.ID, nickname, text string, pubKey ed25519.PublicKey, + dmToken uint32, codeset uint8, timestamp time.Time, lease time.Duration, + round rounds.Round, mType channels.MessageType, status channels.SentStatus, + hidden bool) uint64 { + textBytes := []byte(text) + var err error + + // Handle encryption, if it is present + if w.cipher != nil { + textBytes, err = w.cipher.Encrypt([]byte(text)) + if err != nil { + jww.ERROR.Printf("Failed to encrypt Message: %+v", err) + return 0 + } + } + + msgToInsert := buildMessage(channelID.Marshal(), messageID.Bytes(), + replyTo.Bytes(), nickname, textBytes, pubKey, dmToken, codeset, + timestamp, lease, round.ID, mType, hidden, false, status) + + uuid, err := w.receiveHelper(msgToInsert, false) + + if err != nil { + jww.ERROR.Printf("Failed to receive reply: %+v", err) + } + go w.receivedMessageCB(uuid, channelID, false) + return uuid +} + +// ReceiveReaction is called whenever a reaction to a message is received on a +// given channel. It may be called multiple times on the same reaction; it is +// incumbent on the user of the API to filter such called by message ID. +// +// Messages may arrive our of order, so a reply, in theory, can arrive before +// the initial message. As a result, it may be important to buffer reactions. +func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID, + reactionTo message.ID, nickname, reaction string, pubKey ed25519.PublicKey, + dmToken uint32, codeset uint8, timestamp time.Time, lease time.Duration, + round rounds.Round, mType channels.MessageType, status channels.SentStatus, + hidden bool) uint64 { + textBytes := []byte(reaction) + var err error + + // Handle encryption, if it is present + if w.cipher != nil { + textBytes, err = w.cipher.Encrypt([]byte(reaction)) + if err != nil { + jww.ERROR.Printf("Failed to encrypt Message: %+v", err) + return 0 + } + } + + msgToInsert := buildMessage( + channelID.Marshal(), messageID.Bytes(), reactionTo.Bytes(), nickname, + textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType, + false, hidden, status) + + uuid, err := w.receiveHelper(msgToInsert, false) + if err != nil { + jww.ERROR.Printf("Failed to receive reaction: %+v", err) + } + go w.receivedMessageCB(uuid, channelID, false) + return uuid +} + +// UpdateFromUUID is called whenever a message at the UUID is modified. +// +// messageID, timestamp, round, pinned, and hidden are all nillable and may be +// updated based upon the UUID at a later date. If a nil value is passed, then +// make no update. +func (w *wasmModel) UpdateFromUUID(uuid uint64, messageID *message.ID, + timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, + status *channels.SentStatus) { + parentErr := errors.New("failed to UpdateFromUUID") + + // FIXME: this is a bit of race condition without the mux. + // This should be done via the transactions (i.e., make a + // special version of receiveHelper) + w.updateMux.Lock() + defer w.updateMux.Unlock() + + // Convert messageID to the key generated by json.Marshal + key := js.ValueOf(uuid) + + // Use the key to get the existing Message + currentMsg, err := indexedDb.Get(w.db, messageStoreName, key) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to get message: %+v", err)) + return + } + + _, err = w.updateMessage(utils.JsToJson(currentMsg), messageID, timestamp, + round, pinned, hidden, status) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to updateMessage: %+v", err)) + } +} + +// UpdateFromMessageID is called whenever a message with the message ID is +// modified. +// +// The API needs to return the UUID of the modified message that can be +// referenced at a later time. +// +// timestamp, round, pinned, and hidden are all nillable and may be updated +// based upon the UUID at a later date. If a nil value is passed, then make +// no update. +func (w *wasmModel) UpdateFromMessageID(messageID message.ID, + timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, + status *channels.SentStatus) uint64 { + parentErr := errors.New("failed to UpdateFromMessageID") + + // FIXME: this is a bit of race condition without the mux. + // This should be done via the transactions (i.e., make a + // special version of receiveHelper) + w.updateMux.Lock() + defer w.updateMux.Unlock() + + msgIDStr := base64.StdEncoding.EncodeToString(messageID.Marshal()) + currentMsgObj, err := indexedDb.GetIndex(w.db, messageStoreName, + messageStoreMessageIndex, js.ValueOf(msgIDStr)) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Failed to get message by index: %+v", err)) + return 0 + } + + currentMsg := utils.JsToJson(currentMsgObj) + uuid, err := w.updateMessage(currentMsg, &messageID, timestamp, + round, pinned, hidden, status) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to updateMessage: %+v", err)) + } + return uuid +} + +// updateMessage is a helper for updating a stored message. +func (w *wasmModel) updateMessage(currentMsgJson string, messageID *message.ID, + timestamp *time.Time, round *rounds.Round, pinned, hidden *bool, + status *channels.SentStatus) (uint64, error) { + + newMessage := &Message{} + err := json.Unmarshal([]byte(currentMsgJson), newMessage) + if err != nil { + return 0, err + } + + if status != nil { + newMessage.Status = uint8(*status) + } + if messageID != nil { + newMessage.MessageID = messageID.Bytes() + } + + if round != nil { + newMessage.Round = uint64(round.ID) + } + + if timestamp != nil { + newMessage.Timestamp = *timestamp + } + + if pinned != nil { + newMessage.Pinned = *pinned + } + + if hidden != nil { + newMessage.Hidden = *hidden + } + + // Store the updated Message + uuid, err := w.receiveHelper(newMessage, true) + if err != nil { + return 0, err + } + channelID := &id.ID{} + copy(channelID[:], newMessage.ChannelID) + go w.receivedMessageCB(uuid, channelID, true) + + return uuid, nil +} + +// buildMessage is a private helper that converts typical [channels.EventModel] +// inputs into a basic Message structure for insertion into storage. +// +// NOTE: ID is not set inside this function because we want to use the +// autoincrement key by default. If you are trying to overwrite an existing +// message, then you need to set it manually yourself. +func buildMessage(channelID, messageID, parentID []byte, nickname string, + text []byte, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, + timestamp time.Time, lease time.Duration, round id.Round, + mType channels.MessageType, pinned, hidden bool, + status channels.SentStatus) *Message { + return &Message{ + MessageID: messageID, + Nickname: nickname, + ChannelID: channelID, + ParentMessageID: parentID, + Timestamp: timestamp, + Lease: lease, + Status: uint8(status), + Hidden: hidden, + Pinned: pinned, + Text: text, + Type: uint16(mType), + Round: uint64(round), + // User Identity Info + Pubkey: pubKey, + DmToken: dmToken, + CodesetVersion: codeset, + } +} + +// receiveHelper is a private helper for receiving any sort of message. +func (w *wasmModel) receiveHelper( + newMessage *Message, isUpdate bool) (uint64, error) { + // Convert to jsObject + newMessageJson, err := json.Marshal(newMessage) + if err != nil { + return 0, errors.Errorf("Unable to marshal Message: %+v", err) + } + messageObj, err := utils.JsonToJS(newMessageJson) + if err != nil { + return 0, errors.Errorf("Unable to marshal Message: %+v", err) + } + + // Unset the primaryKey for inserts so that it can be auto-populated and + // incremented + if !isUpdate { + messageObj.Delete("id") + } + + // Store message to database + result, err := indexedDb.Put(w.db, messageStoreName, messageObj) + if err != nil && !strings.Contains(err.Error(), + "at least one key does not satisfy the uniqueness requirements") { + // Only return non-unique constraint errors so that the case + // below this one can be hit and handle duplicate entries properly. + return 0, errors.Errorf("Unable to put Message: %+v", err) + } + + // NOTE: Sometimes the insert fails to return an error but hits a duplicate + // insert, so this fallthrough returns the UUID entry in that case. + if result.IsUndefined() { + msgID := message.ID{} + copy(msgID[:], newMessage.MessageID) + msg, errLookup := w.msgIDLookup(msgID) + if errLookup == nil && msg.ID != 0 { + return msg.ID, nil + } + return 0, errors.Errorf("uuid lookup failure: %+v", err) + } + uuid := uint64(result.Int()) + jww.DEBUG.Printf("Successfully stored message %d", uuid) + + return uuid, nil +} + +// GetMessage returns the message with the given [channel.MessageID]. +func (w *wasmModel) GetMessage( + messageID message.ID) (channels.ModelMessage, error) { + lookupResult, err := w.msgIDLookup(messageID) + if err != nil { + return channels.ModelMessage{}, err + } + + var channelId *id.ID + if lookupResult.ChannelID != nil { + channelId, err = id.Unmarshal(lookupResult.ChannelID) + if err != nil { + return channels.ModelMessage{}, err + } + } + + var parentMsgId message.ID + if lookupResult.ParentMessageID != nil { + parentMsgId, err = message.UnmarshalID(lookupResult.ParentMessageID) + if err != nil { + return channels.ModelMessage{}, err + } + } + + return channels.ModelMessage{ + UUID: lookupResult.ID, + Nickname: lookupResult.Nickname, + MessageID: messageID, + ChannelID: channelId, + ParentMessageID: parentMsgId, + Timestamp: lookupResult.Timestamp, + Lease: lookupResult.Lease, + Status: channels.SentStatus(lookupResult.Status), + Hidden: lookupResult.Hidden, + Pinned: lookupResult.Pinned, + Content: lookupResult.Text, + Type: channels.MessageType(lookupResult.Type), + Round: id.Round(lookupResult.Round), + PubKey: lookupResult.Pubkey, + CodesetVersion: lookupResult.CodesetVersion, + }, nil +} + +// DeleteMessage removes a message with the given messageID from storage. +func (w *wasmModel) DeleteMessage(messageID message.ID) error { + msgId := js.ValueOf(base64.StdEncoding.EncodeToString(messageID.Bytes())) + return indexedDb.DeleteIndex(w.db, messageStoreName, + messageStoreMessageIndex, pkeyName, msgId) +} + +// msgIDLookup gets the UUID of the Message with the given messageID. +func (w *wasmModel) msgIDLookup(messageID message.ID) (*Message, error) { + msgIDStr := js.ValueOf(base64.StdEncoding.EncodeToString(messageID.Bytes())) + resultObj, err := indexedDb.GetIndex(w.db, messageStoreName, + messageStoreMessageIndex, msgIDStr) + if err != nil { + return nil, err + } else if resultObj.IsUndefined() { + return nil, errors.Errorf("no message for %s found", msgIDStr) + } + + // Process result into string + resultMsg := &Message{} + err = json.Unmarshal([]byte(utils.JsToJson(resultObj)), resultMsg) + if err != nil { + return nil, err + } + return resultMsg, nil + +} diff --git a/indexedDb/channels/implementation_test.go b/indexedDbWorker/channels/implementation_test.go similarity index 93% rename from indexedDb/channels/implementation_test.go rename to indexedDbWorker/channels/implementation_test.go index 79a02de774ae701b6b579d356690f931f7a53457..0d035960ad9da8d5ac8baed2aadfe0fa54dea0e6 100644 --- a/indexedDb/channels/implementation_test.go +++ b/indexedDbWorker/channels/implementation_test.go @@ -7,7 +7,7 @@ //go:build js && wasm -package channels +package main import ( "encoding/json" @@ -38,6 +38,11 @@ func TestMain(m *testing.M) { func dummyCallback(uint64, *id.ID, bool) {} +// dummyStoreEncryptionStatus returns the same encryption status passed into it. +func dummyStoreEncryptionStatus(_ string, encryptionStatus bool) (bool, error) { + return encryptionStatus, nil +} + // Happy path, insert message and look it up func TestWasmModel_msgIDLookup(t *testing.T) { cipher, err := cryptoChannel.NewCipher( @@ -56,7 +61,8 @@ func TestWasmModel_msgIDLookup(t *testing.T) { testString := "TestWasmModel_msgIDLookup" + cs testMsgId := message.DeriveChannelMessageID(&id.ID{1}, 0, []byte(testString)) - eventModel, err := newWASMModel(testString, c, dummyCallback) + eventModel, err := newWASMModel( + testString, c, dummyCallback, dummyStoreEncryptionStatus) if err != nil { t.Fatalf("%+v", err) } @@ -85,7 +91,8 @@ func TestWasmModel_DeleteMessage(t *testing.T) { storage.GetLocalStorage().Clear() testString := "TestWasmModel_DeleteMessage" testMsgId := message.DeriveChannelMessageID(&id.ID{1}, 0, []byte(testString)) - eventModel, err := newWASMModel(testString, nil, dummyCallback) + eventModel, err := newWASMModel( + testString, nil, dummyCallback, dummyStoreEncryptionStatus) if err != nil { t.Fatalf("%+v", err) } @@ -141,7 +148,8 @@ func Test_wasmModel_UpdateSentStatus(t *testing.T) { testString := "Test_wasmModel_UpdateSentStatus" + cs testMsgId := message.DeriveChannelMessageID( &id.ID{1}, 0, []byte(testString)) - eventModel, err := newWASMModel(testString, c, dummyCallback) + eventModel, err := newWASMModel( + testString, c, dummyCallback,dummyStoreEncryptionStatus) if err != nil { t.Fatalf("%+v", err) } @@ -208,7 +216,8 @@ func Test_wasmModel_JoinChannel_LeaveChannel(t *testing.T) { } t.Run("Test_wasmModel_JoinChannel_LeaveChannel"+cs, func(t *testing.T) { storage.GetLocalStorage().Clear() - eventModel, err := newWASMModel("test", c, dummyCallback) + eventModel, err := newWASMModel( + "test", c, dummyCallback, dummyStoreEncryptionStatus) if err != nil { t.Fatalf("%+v", err) } @@ -261,7 +270,8 @@ func Test_wasmModel_UUIDTest(t *testing.T) { t.Run("Test_wasmModel_UUIDTest"+cs, func(t *testing.T) { storage.GetLocalStorage().Clear() testString := "testHello" + cs - eventModel, err := newWASMModel(testString, c, dummyCallback) + eventModel, err := newWASMModel( + testString, c, dummyCallback, dummyStoreEncryptionStatus) if err != nil { t.Fatalf("%+v", err) } @@ -307,7 +317,8 @@ func Test_wasmModel_DuplicateReceives(t *testing.T) { t.Run("Test_wasmModel_DuplicateReceives"+cs, func(t *testing.T) { storage.GetLocalStorage().Clear() testString := "testHello" - eventModel, err := newWASMModel(testString, c, dummyCallback) + eventModel, err := newWASMModel( + testString, c, dummyCallback, dummyStoreEncryptionStatus) if err != nil { t.Fatalf("%+v", err) } @@ -356,7 +367,8 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) { testString := "test_deleteMsgByChannel" totalMessages := 10 expectedMessages := 5 - eventModel, err := newWASMModel(testString, c, dummyCallback) + eventModel, err := newWASMModel( + testString, c, dummyCallback, dummyStoreEncryptionStatus) if err != nil { t.Fatalf("%+v", err) } @@ -426,7 +438,8 @@ func TestWasmModel_receiveHelper_UniqueIndex(t *testing.T) { t.Run("TestWasmModel_receiveHelper_UniqueIndex"+cs, func(t *testing.T) { storage.GetLocalStorage().Clear() testString := fmt.Sprintf("test_receiveHelper_UniqueIndex_%d", i) - eventModel, err := newWASMModel(testString, c, dummyCallback) + eventModel, err := newWASMModel( + testString, c, dummyCallback, dummyStoreEncryptionStatus) if err != nil { t.Fatal(err) } diff --git a/indexedDbWorker/channels/init.go b/indexedDbWorker/channels/init.go new file mode 100644 index 0000000000000000000000000000000000000000..b6d350e48a0643063523b561388e23e0b7e3ff98 --- /dev/null +++ b/indexedDbWorker/channels/init.go @@ -0,0 +1,253 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +//go:build js && wasm + +package main + +import ( + "github.com/hack-pad/go-indexeddb/idb" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/channels" + cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/xxdk-wasm/indexedDb" + "gitlab.com/xx_network/primitives/id" + "syscall/js" + "time" +) + +const ( + // databaseSuffix is the suffix to be appended to the name of + // the database. + databaseSuffix = "_speakeasy" + + // currentVersion is the current version of the IndexDb + // runtime. Used for migration purposes. + currentVersion uint = 1 +) + +// MessageReceivedCallback is called any time a message is received or updated. +// +// update is true if the row is old and was edited. +type MessageReceivedCallback func(uuid uint64, channelID *id.ID, update bool) + +// NewWASMEventModel returns a [channels.EventModel] backed by a wasmModel. +// The name should be a base64 encoding of the users public key. +func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, + cb MessageReceivedCallback, storeEncryptionStatus storeEncryptionStatusFn) ( + channels.EventModel, error) { + databaseName := path + databaseSuffix + return newWASMModel(databaseName, encryption, cb, storeEncryptionStatus) +} + +// storeEncryptionStatusFn matches storage.StoreIndexedDbEncryptionStatus so +// that the data can be sent between the worker and main thread. +type storeEncryptionStatusFn func( + databaseName string, encryptionStatus bool) (bool, error) + +// newWASMModel creates the given [idb.Database] and returns a wasmModel. +func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, + cb MessageReceivedCallback, storeEncryptionStatus storeEncryptionStatusFn) ( + *wasmModel, error) { + // Attempt to open database object + ctx, cancel := indexedDb.NewContext() + defer cancel() + openRequest, err := idb.Global().Open(ctx, databaseName, currentVersion, + func(db *idb.Database, oldVersion, newVersion uint) error { + if oldVersion == newVersion { + jww.INFO.Printf("IndexDb version is current: v%d", + newVersion) + return nil + } + + jww.INFO.Printf("IndexDb upgrade required: v%d -> v%d", + oldVersion, newVersion) + + if oldVersion == 0 && newVersion >= 1 { + err := v1Upgrade(db) + if err != nil { + return err + } + oldVersion = 1 + } + + // if oldVersion == 1 && newVersion >= 2 { v2Upgrade(), oldVersion = 2 } + return nil + }) + if err != nil { + return nil, err + } + + // Wait for database open to finish + db, err := openRequest.Await(ctx) + if err != nil { + return nil, err + } + + // Save the encryption status to storage + encryptionStatus := encryption != nil + loadedEncryptionStatus, err2 := + storeEncryptionStatus(databaseName, encryptionStatus) + if err2 != nil { + return nil, err2 + } + + // Verify encryption status does not change + if encryptionStatus != loadedEncryptionStatus { + return nil, errors.New( + "Cannot load database with different encryption status.") + } else if !encryptionStatus { + jww.WARN.Printf("IndexedDb encryption disabled!") + } + + // Attempt to ensure the database has been properly initialized + openRequest, err = idb.Global().Open(ctx, databaseName, currentVersion, + func(db *idb.Database, oldVersion, newVersion uint) error { + return nil + }) + if err != nil { + return nil, err + } + // Wait for database open to finish + db, err = openRequest.Await(ctx) + if err != nil { + return nil, err + } + wrapper := &wasmModel{db: db, receivedMessageCB: cb, cipher: encryption} + + return wrapper, nil +} + +// v1Upgrade performs the v0 -> v1 database upgrade. +// +// This can never be changed without permanently breaking backwards +// compatibility. +func v1Upgrade(db *idb.Database) error { + storeOpts := idb.ObjectStoreOptions{ + KeyPath: js.ValueOf(pkeyName), + AutoIncrement: true, + } + indexOpts := idb.IndexOptions{ + Unique: false, + MultiEntry: false, + } + + // Build Message ObjectStore and Indexes + messageStore, err := db.CreateObjectStore(messageStoreName, storeOpts) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStoreMessageIndex, + js.ValueOf(messageStoreMessage), + idb.IndexOptions{ + Unique: true, + MultiEntry: false, + }) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStoreChannelIndex, + js.ValueOf(messageStoreChannel), indexOpts) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStoreParentIndex, + js.ValueOf(messageStoreParent), indexOpts) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStoreTimestampIndex, + js.ValueOf(messageStoreTimestamp), indexOpts) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStorePinnedIndex, + js.ValueOf(messageStorePinned), indexOpts) + if err != nil { + return err + } + + // Build Channel ObjectStore + _, err = db.CreateObjectStore(channelsStoreName, storeOpts) + if err != nil { + return err + } + + // Get the database name and save it to storage + if databaseName, err2 := db.Name(); err2 != nil { + return err2 + } else if err = storeDatabaseName(databaseName); err != nil { + return err + } + + return nil +} + +// hackTestDb is a horrible function that exists as the result of an extremely +// long discussion about why initializing the IndexedDb sometimes silently +// fails. It ultimately tries to prevent an unrecoverable situation by actually +// inserting some nonsense data and then checking to see if it persists. +// If this function still exists in 2023, god help us all. Amen. +func (w *wasmModel) hackTestDb() error { + testMessage := &Message{ + ID: 0, + Nickname: "test", + MessageID: id.DummyUser.Marshal(), + } + msgId, helper := w.receiveHelper(testMessage, false) + if helper != nil { + return helper + } + result, err := indexedDb.Get(w.db, messageStoreName, js.ValueOf(msgId)) + if err != nil { + return err + } + if result.IsUndefined() { + return errors.Errorf("Failed to test db, record not present") + } + return nil +} + +// storeDatabaseName sends the database name to storage.StoreIndexedDb in the +// main thread to be stored in localstorage and waits for the error to be +// returned. +// +// The function specified below is a placeholder until set by +// registerDatabaseNameStore. registerDatabaseNameStore must be called before +// storeDatabaseName. +var storeDatabaseName = func(databaseName string) error { return nil } + +// RegisterDatabaseNameStore sets storeDatabaseName to send the database to +// storage.StoreIndexedDb in the main thread when called and registers a handler +// to listen for the response. +func RegisterDatabaseNameStore(m *manager) { + storeDatabaseNameResponseChan := make(chan []byte) + // Register handler + m.mh.RegisterHandler(indexedDb.StoreDatabaseNameTag, func(data []byte) []byte { + storeDatabaseNameResponseChan <- data + return nil + }) + + storeDatabaseName = func(databaseName string) error { + m.mh.SendResponse(indexedDb.StoreDatabaseNameTag, indexedDb.InitID, + []byte(databaseName)) + + // Wait for response + select { + case response := <-storeDatabaseNameResponseChan: + if len(response) > 0 { + return errors.New(string(response)) + } + case <-time.After(indexedDb.ResponseTimeout): + return errors.Errorf("timed out after %s waiting for "+ + "response about storing the database name in local "+ + "storage in the main thread", indexedDb.ResponseTimeout) + } + return nil + } +} diff --git a/indexedDbWorker/channels/main.go b/indexedDbWorker/channels/main.go new file mode 100644 index 0000000000000000000000000000000000000000..5610d4d69dfe4c0a3461be5182a0f3018502bd12 --- /dev/null +++ b/indexedDbWorker/channels/main.go @@ -0,0 +1,23 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package main + +import ( + "fmt" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" +) + +func main() { + fmt.Println("Starting xxDK WebAssembly Database Worker.") + + m := &manager{mh: indexedDbWorker.NewMessageHandler()} + m.RegisterHandlers() + RegisterDatabaseNameStore(m) + m.mh.SignalReady() + <-make(chan bool) +} diff --git a/indexedDb/channels/model.go b/indexedDbWorker/channels/model.go similarity index 99% rename from indexedDb/channels/model.go rename to indexedDbWorker/channels/model.go index 078d6bd67f7c43d9e50373b7801f69937cfce2ac..d90d5d0ff932c922634cdc49276074173631aea1 100644 --- a/indexedDb/channels/model.go +++ b/indexedDbWorker/channels/model.go @@ -7,7 +7,7 @@ //go:build js && wasm -package channels +package main import ( "time" diff --git a/indexedDbWorker/dm/implementation.go b/indexedDbWorker/dm/implementation.go new file mode 100644 index 0000000000000000000000000000000000000000..aedecb876c7e4b3a42cc6cb0897e5469027fcc01 --- /dev/null +++ b/indexedDbWorker/dm/implementation.go @@ -0,0 +1,381 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +//go:build js && wasm + +package channelEventModel + +import ( + "crypto/ed25519" + "encoding/json" + "strings" + "sync" + "syscall/js" + "time" + + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/cmix/rounds" + "gitlab.com/elixxir/client/v4/dm" + "gitlab.com/elixxir/xxdk-wasm/indexedDb" + "gitlab.com/elixxir/xxdk-wasm/utils" + "gitlab.com/xx_network/primitives/id" + + "github.com/hack-pad/go-indexeddb/idb" + cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/crypto/message" +) + +// wasmModel implements dm.EventModel interface, which uses the channels system +// passed an object that adheres to in order to get events on the channel. +type wasmModel struct { + db *idb.Database + cipher cryptoChannel.Cipher + receivedMessageCB MessageReceivedCallback + updateMux sync.Mutex +} + +// joinConversation is used for joining new conversations. +func (w *wasmModel) joinConversation(nickname string, + pubKey ed25519.PublicKey, dmToken uint32, codeset uint8) error { + parentErr := errors.New("failed to joinConversation") + + // Build object + newConvo := Conversation{ + Pubkey: pubKey, + Nickname: nickname, + Token: dmToken, + CodesetVersion: codeset, + Blocked: false, + } + + // Convert to jsObject + newConvoJson, err := json.Marshal(&newConvo) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to marshal Conversation: %+v", err) + } + convoObj, err := utils.JsonToJS(newConvoJson) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to marshal Conversation: %+v", err) + } + + _, err = indexedDb.Put(w.db, conversationStoreName, convoObj) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to put Conversation: %+v", err) + } + return nil +} + +// buildMessage is a private helper that converts typical dm.EventModel inputs +// into a basic Message structure for insertion into storage. +// +// NOTE: ID is not set inside this function because we want to use the +// autoincrement key by default. If you are trying to overwrite an existing +// message, then you need to set it manually yourself. +func buildMessage(messageID, parentID, text []byte, pubKey ed25519.PublicKey, + timestamp time.Time, round id.Round, mType dm.MessageType, + status dm.Status) *Message { + return &Message{ + MessageID: messageID, + ConversationPubKey: pubKey, + ParentMessageID: parentID, + Timestamp: timestamp, + Status: uint8(status), + Text: text, + Type: uint16(mType), + Round: uint64(round), + } +} + +func (w *wasmModel) Receive(messageID message.ID, nickname string, text []byte, + pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, timestamp time.Time, + round rounds.Round, mType dm.MessageType, status dm.Status) uint64 { + parentErr := errors.New("failed to Receive") + + // If there is no extant Conversation, create one. + _, err := indexedDb.Get( + w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) + if err != nil { + if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { + err = w.joinConversation(nickname, pubKey, dmToken, codeset) + if err != nil { + jww.ERROR.Printf("%+v", err) + } + } else { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to get Conversation: %+v", err)) + } + return 0 + } else { + jww.DEBUG.Printf("Conversation with %s already joined", nickname) + } + + // Handle encryption, if it is present + if w.cipher != nil { + text, err = w.cipher.Encrypt(text) + if err != nil { + jww.ERROR.Printf("Failed to encrypt Message: %+v", err) + return 0 + } + } + + msgToInsert := buildMessage(messageID.Bytes(), nil, text, pubKey, timestamp, + round.ID, mType, status) + uuid, err := w.receiveHelper(msgToInsert, false) + if err != nil { + jww.ERROR.Printf("Failed to receive Message: %+v", err) + } + + go w.receivedMessageCB(uuid, pubKey, false) + return uuid +} + +func (w *wasmModel) ReceiveText(messageID message.ID, nickname, text string, + pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, + timestamp time.Time, round rounds.Round, status dm.Status) uint64 { + parentErr := errors.New("failed to ReceiveText") + + // If there is no extant Conversation, create one. + _, err := indexedDb.Get( + w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) + if err != nil { + if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { + err = w.joinConversation(nickname, pubKey, dmToken, codeset) + if err != nil { + jww.ERROR.Printf("%+v", err) + } + } else { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to get Conversation: %+v", err)) + } + return 0 + } else { + jww.DEBUG.Printf("Conversation with %s already joined", nickname) + } + + // Handle encryption, if it is present + textBytes := []byte(text) + if w.cipher != nil { + textBytes, err = w.cipher.Encrypt(textBytes) + if err != nil { + jww.ERROR.Printf("Failed to encrypt Message: %+v", err) + return 0 + } + } + + msgToInsert := buildMessage(messageID.Bytes(), nil, textBytes, + pubKey, timestamp, round.ID, dm.TextType, status) + + uuid, err := w.receiveHelper(msgToInsert, false) + if err != nil { + jww.ERROR.Printf("Failed to receive Message: %+v", err) + } + + go w.receivedMessageCB(uuid, pubKey, false) + return uuid +} + +func (w *wasmModel) ReceiveReply(messageID, reactionTo message.ID, nickname, + text string, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, + timestamp time.Time, round rounds.Round, status dm.Status) uint64 { + parentErr := errors.New("failed to ReceiveReply") + + // If there is no extant Conversation, create one. + _, err := indexedDb.Get( + w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) + if err != nil { + if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { + err = w.joinConversation(nickname, pubKey, dmToken, codeset) + if err != nil { + jww.ERROR.Printf("%+v", err) + } + } else { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to get Conversation: %+v", err)) + } + return 0 + } else { + jww.DEBUG.Printf("Conversation with %s already joined", nickname) + } + + // Handle encryption, if it is present + textBytes := []byte(text) + if w.cipher != nil { + textBytes, err = w.cipher.Encrypt(textBytes) + if err != nil { + jww.ERROR.Printf("Failed to encrypt Message: %+v", err) + return 0 + } + } + + msgToInsert := buildMessage(messageID.Bytes(), reactionTo.Marshal(), textBytes, + pubKey, timestamp, round.ID, dm.TextType, status) + + uuid, err := w.receiveHelper(msgToInsert, false) + if err != nil { + jww.ERROR.Printf("Failed to receive Message: %+v", err) + } + + go w.receivedMessageCB(uuid, pubKey, false) + return uuid +} + +func (w *wasmModel) ReceiveReaction(messageID, _ message.ID, nickname, + reaction string, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, + timestamp time.Time, round rounds.Round, status dm.Status) uint64 { + parentErr := errors.New("failed to ReceiveText") + + // If there is no extant Conversation, create one. + _, err := indexedDb.Get( + w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) + if err != nil { + if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { + err = w.joinConversation(nickname, pubKey, dmToken, codeset) + if err != nil { + jww.ERROR.Printf("%+v", err) + } + } else { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to get Conversation: %+v", err)) + } + return 0 + } else { + jww.DEBUG.Printf("Conversation with %s already joined", nickname) + } + + // Handle encryption, if it is present + textBytes := []byte(reaction) + if w.cipher != nil { + textBytes, err = w.cipher.Encrypt(textBytes) + if err != nil { + jww.ERROR.Printf("Failed to encrypt Message: %+v", err) + return 0 + } + } + + msgToInsert := buildMessage(messageID.Bytes(), nil, textBytes, + pubKey, timestamp, round.ID, dm.ReactionType, status) + + uuid, err := w.receiveHelper(msgToInsert, false) + if err != nil { + jww.ERROR.Printf("Failed to receive Message: %+v", err) + } + + go w.receivedMessageCB(uuid, pubKey, false) + return uuid +} + +func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID, + timestamp time.Time, round rounds.Round, status dm.Status) { + parentErr := errors.New("failed to UpdateSentStatus") + + // FIXME: this is a bit of race condition without the mux. + // This should be done via the transactions (i.e., make a + // special version of receiveHelper) + w.updateMux.Lock() + defer w.updateMux.Unlock() + + // Convert messageID to the key generated by json.Marshal + key := js.ValueOf(uuid) + + // Use the key to get the existing Message + currentMsg, err := indexedDb.Get(w.db, messageStoreName, key) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Unable to get message: %+v", err)) + return + } + + // Extract the existing Message and update the Status + newMessage := &Message{} + err = json.Unmarshal([]byte(utils.JsToJson(currentMsg)), newMessage) + if err != nil { + jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, + "Could not JSON unmarshal message: %+v", err)) + return + } + + newMessage.Status = uint8(status) + if !messageID.Equals(message.ID{}) { + newMessage.MessageID = messageID.Bytes() + } + + if round.ID != 0 { + newMessage.Round = uint64(round.ID) + } + + if !timestamp.Equal(time.Time{}) { + newMessage.Timestamp = timestamp + } + + // Store the updated Message + _, err = w.receiveHelper(newMessage, true) + if err != nil { + jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error())) + } + go w.receivedMessageCB(uuid, newMessage.ConversationPubKey, true) +} + +// receiveHelper is a private helper for receiving any sort of message. +func (w *wasmModel) receiveHelper( + newMessage *Message, isUpdate bool) (uint64, error) { + // Convert to jsObject + newMessageJson, err := json.Marshal(newMessage) + if err != nil { + return 0, errors.Errorf("Unable to marshal Message: %+v", err) + } + messageObj, err := utils.JsonToJS(newMessageJson) + if err != nil { + return 0, errors.Errorf("Unable to marshal Message: %+v", err) + } + + // Unset the primaryKey for inserts so that it can be auto-populated and + // incremented + if !isUpdate { + messageObj.Delete("id") + } + + // Store message to database + result, err := indexedDb.Put(w.db, messageStoreName, messageObj) + if err != nil { + return 0, errors.Errorf("Unable to put Message: %+v", err) + } + + // NOTE: Sometimes the insert fails to return an error but hits a duplicate + // insert, so this fallthrough returns the UUID entry in that case. + if result.IsUndefined() { + msgID := message.ID{} + copy(msgID[:], newMessage.MessageID) + uuid, errLookup := w.msgIDLookup(msgID) + if uuid != 0 && errLookup == nil { + return uuid, nil + } + return 0, errors.Errorf("uuid lookup failure: %+v", err) + } + uuid := uint64(result.Int()) + jww.DEBUG.Printf("Successfully stored message %d", uuid) + + return uuid, nil +} + +// msgIDLookup gets the UUID of the Message with the given messageID. +func (w *wasmModel) msgIDLookup(messageID message.ID) (uint64, error) { + resultObj, err := indexedDb.GetIndex(w.db, messageStoreName, + messageStoreMessageIndex, utils.CopyBytesToJS(messageID.Marshal())) + if err != nil { + return 0, err + } + + uuid := uint64(0) + if !resultObj.IsUndefined() { + uuid = uint64(resultObj.Get("id").Int()) + } + return uuid, nil +} diff --git a/indexedDbWorker/dm/init.go b/indexedDbWorker/dm/init.go new file mode 100644 index 0000000000000000000000000000000000000000..29b80911804004539beb210dc85533afc7c98f31 --- /dev/null +++ b/indexedDbWorker/dm/init.go @@ -0,0 +1,183 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +//go:build js && wasm + +package channelEventModel + +import ( + "crypto/ed25519" + "syscall/js" + + "github.com/hack-pad/go-indexeddb/idb" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/dm" + cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/xxdk-wasm/indexedDb" + "gitlab.com/elixxir/xxdk-wasm/storage" +) + +const ( + // databaseSuffix is the suffix to be appended to the name of + // the database. + databaseSuffix = "_speakeasy_dm" + + // currentVersion is the current version of the IndexDb + // runtime. Used for migration purposes. + currentVersion uint = 1 +) + +// MessageReceivedCallback is called any time a message is received or updated. +// +// update is true if the row is old and was edited. +type MessageReceivedCallback func( + uuid uint64, pubKey ed25519.PublicKey, update bool) + +// NewWASMEventModel returns a [channels.EventModel] backed by a wasmModel. +// The name should be a base64 encoding of the users public key. +func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, + cb MessageReceivedCallback) (dm.EventModel, error) { + databaseName := path + databaseSuffix + return newWASMModel(databaseName, encryption, cb) +} + +// newWASMModel creates the given [idb.Database] and returns a wasmModel. +func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, + cb MessageReceivedCallback) (*wasmModel, error) { + // Attempt to open database object + ctx, cancel := indexedDb.NewContext() + defer cancel() + openRequest, err := idb.Global().Open(ctx, databaseName, currentVersion, + func(db *idb.Database, oldVersion, newVersion uint) error { + if oldVersion == newVersion { + jww.INFO.Printf("IndexDb version is current: v%d", + newVersion) + return nil + } + + jww.INFO.Printf("IndexDb upgrade required: v%d -> v%d", + oldVersion, newVersion) + + if oldVersion == 0 && newVersion >= 1 { + err := v1Upgrade(db) + if err != nil { + return err + } + oldVersion = 1 + } + + // if oldVersion == 1 && newVersion >= 2 { v2Upgrade(), oldVersion = 2 } + return nil + }) + if err != nil { + return nil, err + } + + // Wait for database open to finish + db, err := openRequest.Await(ctx) + if err != nil { + return nil, err + } + + // Save the encryption status to storage + encryptionStatus := encryption != nil + loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( + databaseName, encryptionStatus) + if err != nil { + return nil, err + } + + // Verify encryption status does not change + if encryptionStatus != loadedEncryptionStatus { + return nil, errors.New( + "Cannot load database with different encryption status.") + } else if !encryptionStatus { + jww.WARN.Printf("IndexedDb encryption disabled!") + } + + // Attempt to ensure the database has been properly initialized + openRequest, err = idb.Global().Open(ctx, databaseName, currentVersion, + func(db *idb.Database, oldVersion, newVersion uint) error { + return nil + }) + if err != nil { + return nil, err + } + // Wait for database open to finish + db, err = openRequest.Await(ctx) + if err != nil { + return nil, err + } + wrapper := &wasmModel{db: db, receivedMessageCB: cb, cipher: encryption} + + return wrapper, nil +} + +// v1Upgrade performs the v0 -> v1 database upgrade. +// +// This can never be changed without permanently breaking backwards +// compatibility. +func v1Upgrade(db *idb.Database) error { + indexOpts := idb.IndexOptions{ + Unique: false, + MultiEntry: false, + } + + // Build Message ObjectStore and Indexes + messageStoreOpts := idb.ObjectStoreOptions{ + KeyPath: js.ValueOf(msgPkeyName), + AutoIncrement: true, + } + messageStore, err := db.CreateObjectStore(messageStoreName, messageStoreOpts) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStoreMessageIndex, + js.ValueOf(messageStoreMessage), + idb.IndexOptions{ + Unique: true, + MultiEntry: false, + }) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStoreConversationIndex, + js.ValueOf(messageStoreConversation), indexOpts) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStoreParentIndex, + js.ValueOf(messageStoreParent), indexOpts) + if err != nil { + return err + } + _, err = messageStore.CreateIndex(messageStoreTimestampIndex, + js.ValueOf(messageStoreTimestamp), indexOpts) + if err != nil { + return err + } + + // Build Channel ObjectStore + conversationStoreOpts := idb.ObjectStoreOptions{ + KeyPath: js.ValueOf(convoPkeyName), + AutoIncrement: false, + } + _, err = db.CreateObjectStore(conversationStoreName, conversationStoreOpts) + if err != nil { + return err + } + + // Get the database name and save it to storage + if databaseName, err := db.Name(); err != nil { + return err + } else if err = storage.StoreIndexedDb(databaseName); err != nil { + return err + } + + return nil +} diff --git a/indexedDbWorker/dm/model.go b/indexedDbWorker/dm/model.go new file mode 100644 index 0000000000000000000000000000000000000000..bb6f34588aa2976c1689b629c6df31219de6b585 --- /dev/null +++ b/indexedDbWorker/dm/model.go @@ -0,0 +1,63 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +//go:build js && wasm + +package channelEventModel + +import ( + "time" +) + +const ( + // Text representation of primary key value (keyPath). + msgPkeyName = "id" + convoPkeyName = "pub_key" + + // Text representation of the names of the various [idb.ObjectStore]. + messageStoreName = "messages" + conversationStoreName = "conversations" + + // Message index names. + messageStoreMessageIndex = "message_id_index" + messageStoreConversationIndex = "conversation_id_index" + messageStoreParentIndex = "parent_message_id_index" + messageStoreTimestampIndex = "timestamp_index" + + // Message keyPath names (must match json struct tags). + messageStoreMessage = "message_id" + messageStoreConversation = "conversation_id" + messageStoreParent = "parent_message_id" + messageStoreTimestamp = "timestamp" +) + +// Message defines the IndexedDb representation of a single Message. +// +// A Message belongs to one Conversation. +// A Message may belong to one Message (Parent). +type Message struct { + ID uint64 `json:"id"` // Matches msgPkeyName + MessageID []byte `json:"message_id"` // Index + ConversationPubKey []byte `json:"conversation_pub_key"` // Index + ParentMessageID []byte `json:"parent_message_id"` // Index + Timestamp time.Time `json:"timestamp"` // Index + Status uint8 `json:"status"` + Text []byte `json:"text"` + Type uint16 `json:"type"` + Round uint64 `json:"round"` +} + +// Conversation defines the IndexedDb representation of a single +// message exchange between two recipients. +// A Conversation has many Message. +type Conversation struct { + Pubkey []byte `json:"pub_key"` // Matches convoPkeyName + Nickname string `json:"nickname"` + Token uint32 `json:"token"` + CodesetVersion uint8 `json:"codeset_version"` + Blocked bool `json:"blocked"` +} diff --git a/indexedDbWorker/messageHandler.go b/indexedDbWorker/messageHandler.go new file mode 100644 index 0000000000000000000000000000000000000000..7b402d74967beccab060632dec4e51a4a669d289 --- /dev/null +++ b/indexedDbWorker/messageHandler.go @@ -0,0 +1,162 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package indexedDbWorker + +import ( + "encoding/json" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/xxdk-wasm/indexedDb" + "gitlab.com/elixxir/xxdk-wasm/utils" + "sync" + "syscall/js" +) + +// HandlerFn is the function that handles incoming data from the main thread. +type HandlerFn func(data []byte) []byte + +// MessageHandler queues incoming messages from the main thread and handles them +// based on their tag. +type MessageHandler struct { + // messages is a list of queued messages sent from the main thread. + messages chan js.Value + + // handlers is a list of functions to handle messages that come from the + // main thread keyed on the handler tag. + handlers map[indexedDb.Tag]HandlerFn + + mux sync.Mutex +} + +// NewMessageHandler initialises a new MessageHandler. +func NewMessageHandler() *MessageHandler { + mh := &MessageHandler{ + messages: make(chan js.Value, 100), + handlers: make(map[indexedDb.Tag]HandlerFn), + } + + mh.addEventListeners() + + return mh +} + +// SignalReady sends a signal to the main thread indicating that the worker is +// ready. Once the main thread receives this, it will initiate communication. +// Therefore, this should only be run once all listeners are ready. +func (mh *MessageHandler) SignalReady() { + mh.SendResponse(indexedDb.ReadyTag, indexedDb.InitID, nil) +} + +// SendResponse sends a reply to the main thread with the given tag and ID, +func (mh *MessageHandler) SendResponse(tag indexedDb.Tag, id uint64, data []byte) { + message := indexedDb.WorkerMessage{ + Tag: tag, + ID: id, + Data: data, + } + + payload, err := json.Marshal(message) + if err != nil { + jww.FATAL.Panicf("Failed to marshal payload with tag %q and ID %d "+ + "going to main thread: %+v", tag, id, err) + } + + go postMessage(string(payload)) +} + +// receiveMessage is registered with the Javascript event listener and is called +// everytime a message from the main thread is received. If the registered +// handler returns a response, it is sent to the main thread. +func (mh *MessageHandler) receiveMessage(data []byte) error { + var message indexedDb.WorkerMessage + err := json.Unmarshal(data, &message) + if err != nil { + return err + } + + mh.mux.Lock() + handler, exists := mh.handlers[message.Tag] + mh.mux.Unlock() + if !exists { + return errors.Errorf("no handler found for tag %q", message.Tag) + } + + // Call handler and register response with its return + go func() { + response := handler(message.Data) + if response != nil { + mh.SendResponse(message.Tag, message.ID, response) + } + }() + + return nil +} + +// RegisterHandler registers the handler with the given tag overwriting any +// previous registered handler with the same tag. This function is thread safe. +// +// If the handler returns anything but nil, it will be returned as a response. +func (mh *MessageHandler) RegisterHandler(tag indexedDb.Tag, handler HandlerFn) { + mh.mux.Lock() + mh.handlers[tag] = handler + mh.mux.Unlock() +} + +// addEventListeners adds the event listeners needed to receive a message from +// the worker. Two listeners were added; one to receive messages from the worker +// and the other to receive errors. +func (mh *MessageHandler) addEventListeners() { + // Create a listener for when the message event is fire on the worker. This + // occurs when a message is received from the main thread. + // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/message_event + messageEvent := js.FuncOf(func(this js.Value, args []js.Value) any { + err := mh.receiveMessage([]byte(args[0].Get("data").String())) + if err != nil { + jww.ERROR.Printf("Failed to receive message from main thread: %+v", err) + } + return nil + }) + + // Create listener for when a messageerror event is fired on the worker. + // This occurs when it receives a message that can't be deserialized. + // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/messageerror_event + messageError := js.FuncOf(func(this js.Value, args []js.Value) any { + event := args[0] + jww.ERROR.Printf( + "Error receiving message from main thread: %s", utils.JsToJson(event)) + return nil + }) + + js.Global().Call("addEventListener", "message", messageEvent) + js.Global().Call("addEventListener", "messageerror", messageError) +} + +// postMessage sends a message from this worker to the main WASM thread. +// +// aMessage must be a js.Value or a primitive type that can be converted via +// js.ValueOf. The Javascript object must be "any value or JavaScript object +// handled by the structured clone algorithm". See the doc for more information. +// +// Doc: https://developer.mozilla.org/docs/Web/API/DedicatedWorkerGlobalScope/postMessage +func postMessage(aMessage any) { + js.Global().Call("postMessage", aMessage) +} + +// postMessageTransferList sends an array of Transferable objects to transfer to +// the main thread. This is meant to be used to transfer large amounts of binary +// data using a high-performance, zero-copy operation. Refer to the doc for more +// information. +// +// Note: The binary data cannot simply be passed as the transferList. The +// traversable objects must be specified in the aMessage and included in the +// transferList +// +// Doc: https://developer.mozilla.org/en-US/docs/Web/API/DedicatedWorkerGlobalScope/postMessage +func postMessageTransferList(aMessage, transferList any) { + js.Global().Call("postMessage", aMessage, transferList) +} diff --git a/indexedDbWorker/utils.go b/indexedDbWorker/utils.go new file mode 100644 index 0000000000000000000000000000000000000000..321dd082214e7ab88ed6351442882317426f4e87 --- /dev/null +++ b/indexedDbWorker/utils.go @@ -0,0 +1,262 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +//go:build js && wasm + +// This file contains several generic IndexedDB helper functions that +// may be useful for any IndexedDB implementations. + +package indexedDbWorker + +import ( + "context" + "github.com/hack-pad/go-indexeddb/idb" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/xxdk-wasm/utils" + "syscall/js" + "time" +) + +const ( + // dbTimeout is the global timeout for operations with the storage + // [context.Context]. + dbTimeout = time.Second + + // ErrDoesNotExist is an error string for got undefined on Get operations. + ErrDoesNotExist = "result is undefined" +) + +// NewContext builds a context for indexedDb operations. +func NewContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), dbTimeout) +} + +// Get is a generic helper for getting values from the given [idb.ObjectStore]. +// Only usable by primary key. +func Get(db *idb.Database, objectStoreName string, key js.Value) (js.Value, error) { + parentErr := errors.Errorf("failed to Get %s/%s", objectStoreName, key) + + // Prepare the Transaction + txn, err := db.Transaction(idb.TransactionReadOnly, objectStoreName) + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to create Transaction: %+v", err) + } + store, err := txn.ObjectStore(objectStoreName) + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to get ObjectStore: %+v", err) + } + + // Perform the operation + getRequest, err := store.Get(key) + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to Get from ObjectStore: %+v", err) + } + + // Wait for the operation to return + ctx, cancel := NewContext() + resultObj, err := getRequest.Await(ctx) + cancel() + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to get from ObjectStore: %+v", err) + } else if resultObj.IsUndefined() { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to get from ObjectStore: %s", ErrDoesNotExist) + } + + // Process result into string + jww.DEBUG.Printf("Got from %s/%s: %s", + objectStoreName, key, utils.JsToJson(resultObj)) + return resultObj, nil +} + +// GetIndex is a generic helper for getting values from the given +// [idb.ObjectStore] using the given [idb.Index]. +func GetIndex(db *idb.Database, objectStoreName, + indexName string, key js.Value) (js.Value, error) { + parentErr := errors.Errorf("failed to GetIndex %s/%s/%s", + objectStoreName, indexName, key) + + // Prepare the Transaction + txn, err := db.Transaction(idb.TransactionReadOnly, objectStoreName) + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to create Transaction: %+v", err) + } + store, err := txn.ObjectStore(objectStoreName) + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to get ObjectStore: %+v", err) + } + idx, err := store.Index(indexName) + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to get Index: %+v", err) + } + + // Perform the operation + getRequest, err := idx.Get(key) + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to Get from ObjectStore: %+v", err) + } + + // Wait for the operation to return + ctx, cancel := NewContext() + resultObj, err := getRequest.Await(ctx) + cancel() + if err != nil { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to get from ObjectStore: %+v", err) + } else if resultObj.IsUndefined() { + return js.Undefined(), errors.WithMessagef(parentErr, + "Unable to get from ObjectStore: %s", ErrDoesNotExist) + } + + // Process result into string + jww.DEBUG.Printf("Got from %s/%s/%s: %s", + objectStoreName, indexName, key, utils.JsToJson(resultObj)) + return resultObj, nil +} + +// Put is a generic helper for putting values into the given [idb.ObjectStore]. +// Equivalent to insert if not exists else update. +func Put(db *idb.Database, objectStoreName string, value js.Value) (js.Value, error) { + // Prepare the Transaction + txn, err := db.Transaction(idb.TransactionReadWrite, objectStoreName) + if err != nil { + return js.Undefined(), errors.Errorf("Unable to create Transaction: %+v", err) + } + store, err := txn.ObjectStore(objectStoreName) + if err != nil { + return js.Undefined(), errors.Errorf("Unable to get ObjectStore: %+v", err) + } + + // Perform the operation + request, err := store.Put(value) + if err != nil { + return js.Undefined(), errors.Errorf("Unable to Put: %+v", err) + } + + // Wait for the operation to return + ctx, cancel := NewContext() + result, err := request.Await(ctx) + cancel() + if err != nil { + return js.Undefined(), errors.Errorf("Putting value failed: %+v", err) + } + jww.DEBUG.Printf("Successfully put value in %s: %s", + objectStoreName, utils.JsToJson(value)) + return result, nil +} + +// Delete is a generic helper for removing values from the given +// [idb.ObjectStore]. Only usable by primary key. +func Delete(db *idb.Database, objectStoreName string, key js.Value) error { + parentErr := errors.Errorf("failed to Delete %s/%s", objectStoreName, key) + + // Prepare the Transaction + txn, err := db.Transaction(idb.TransactionReadWrite, objectStoreName) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to create Transaction: %+v", err) + } + store, err := txn.ObjectStore(objectStoreName) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to get ObjectStore: %+v", err) + } + + // Perform the operation + _, err = store.Delete(key) + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to Delete from ObjectStore: %+v", err) + } + + // Wait for the operation to return + ctx, cancel := NewContext() + err = txn.Await(ctx) + cancel() + if err != nil { + return errors.WithMessagef(parentErr, + "Unable to Delete from ObjectStore: %+v", err) + } + jww.DEBUG.Printf("Successfully deleted value at %s/%s", + objectStoreName, utils.JsToJson(key)) + return nil +} + +// DeleteIndex is a generic helper for removing values from the +// given [idb.ObjectStore] using the given [idb.Index]. Requires passing +// in the name of the primary key for the store. +func DeleteIndex(db *idb.Database, objectStoreName, + indexName, pkeyName string, key js.Value) error { + parentErr := errors.Errorf("failed to DeleteIndex %s/%s", objectStoreName, key) + + value, err := GetIndex(db, objectStoreName, indexName, key) + if err != nil { + return errors.WithMessagef(parentErr, "%+v", err) + } + + err = Delete(db, objectStoreName, value.Get(pkeyName)) + if err != nil { + return errors.WithMessagef(parentErr, "%+v", err) + } + + jww.DEBUG.Printf("Successfully deleted value at %s/%s/%s", + objectStoreName, indexName, utils.JsToJson(key)) + return nil +} + +// Dump returns the given [idb.ObjectStore] contents to string slice for +// testing and debugging purposes. +func Dump(db *idb.Database, objectStoreName string) ([]string, error) { + parentErr := errors.Errorf("failed to Dump %s", objectStoreName) + + txn, err := db.Transaction(idb.TransactionReadOnly, objectStoreName) + if err != nil { + return nil, errors.WithMessagef(parentErr, + "Unable to create Transaction: %+v", err) + } + store, err := txn.ObjectStore(objectStoreName) + if err != nil { + return nil, errors.WithMessagef(parentErr, + "Unable to get ObjectStore: %+v", err) + } + cursorRequest, err := store.OpenCursor(idb.CursorNext) + if err != nil { + return nil, errors.WithMessagef(parentErr, + "Unable to open Cursor: %+v", err) + } + + // Run the query + jww.DEBUG.Printf("%s values:", objectStoreName) + results := make([]string, 0) + ctx, cancel := NewContext() + err = cursorRequest.Iter(ctx, + func(cursor *idb.CursorWithValue) error { + value, err := cursor.Value() + if err != nil { + return err + } + valueStr := utils.JsToJson(value) + results = append(results, valueStr) + jww.DEBUG.Printf("- %v", valueStr) + return nil + }) + cancel() + if err != nil { + return nil, errors.WithMessagef(parentErr, + "Unable to dump ObjectStore: %+v", err) + } + return results, nil +} diff --git a/indexedDbWorker/utils_test.go b/indexedDbWorker/utils_test.go new file mode 100644 index 0000000000000000000000000000000000000000..62851adb196907b392a85d82b5e333990cc26122 --- /dev/null +++ b/indexedDbWorker/utils_test.go @@ -0,0 +1,80 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +//go:build js && wasm + +package indexedDbWorker + +import ( + "github.com/hack-pad/go-indexeddb/idb" + "strings" + "syscall/js" + "testing" +) + +// Error path: Tests that Get returns an error when trying to get a message that +// does not exist. +func TestGet_NoMessageError(t *testing.T) { + db := newTestDB("messages", "index", t) + + _, err := Get(db, "messages", js.ValueOf(5)) + if err == nil || !strings.Contains(err.Error(), "undefined") { + t.Errorf("Did not get expected error when getting a message that "+ + "does not exist: %+v", err) + } +} + +// Error path: Tests that GetIndex returns an error when trying to get a message +// that does not exist. +func TestGetIndex_NoMessageError(t *testing.T) { + db := newTestDB("messages", "index", t) + + _, err := GetIndex(db, "messages", "index", js.ValueOf(5)) + if err == nil || !strings.Contains(err.Error(), "undefined") { + t.Errorf("Did not get expected error when getting a message that "+ + "does not exist: %+v", err) + } +} + +// newTestDB creates a new idb.Database for testing. +func newTestDB(name, index string, t *testing.T) *idb.Database { + // Attempt to open database object + ctx, cancel := NewContext() + defer cancel() + openRequest, err := idb.Global().Open(ctx, "databaseName", 0, + func(db *idb.Database, _ uint, _ uint) error { + storeOpts := idb.ObjectStoreOptions{ + KeyPath: js.ValueOf("id"), + AutoIncrement: true, + } + + // Build Message ObjectStore and Indexes + messageStore, err := db.CreateObjectStore(name, storeOpts) + if err != nil { + return err + } + + _, err = messageStore.CreateIndex( + index, js.ValueOf("id"), idb.IndexOptions{}) + if err != nil { + return err + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + + // Wait for database open to finish + db, err := openRequest.Await(ctx) + if err != nil { + t.Fatal(err) + } + + return db +}