diff --git a/indexedDb/dm/implementation.go b/indexedDb/dm/implementation.go index aedecb876c7e4b3a42cc6cb0897e5469027fcc01..77b5fbc73d5ec3bea746597694342b522bd382de 100644 --- a/indexedDb/dm/implementation.go +++ b/indexedDb/dm/implementation.go @@ -12,370 +12,235 @@ package channelEventModel import ( "crypto/ed25519" "encoding/json" - "strings" - "sync" - "syscall/js" "time" - "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/v4/cmix/rounds" "gitlab.com/elixxir/client/v4/dm" - "gitlab.com/elixxir/xxdk-wasm/indexedDb" - "gitlab.com/elixxir/xxdk-wasm/utils" - "gitlab.com/xx_network/primitives/id" - - "github.com/hack-pad/go-indexeddb/idb" - cryptoChannel "gitlab.com/elixxir/crypto/channel" "gitlab.com/elixxir/crypto/message" + "gitlab.com/elixxir/xxdk-wasm/indexedDb" ) // wasmModel implements dm.EventModel interface, which uses the channels system // passed an object that adheres to in order to get events on the channel. type wasmModel struct { - db *idb.Database - cipher cryptoChannel.Cipher - receivedMessageCB MessageReceivedCallback - updateMux sync.Mutex -} - -// joinConversation is used for joining new conversations. -func (w *wasmModel) joinConversation(nickname string, - pubKey ed25519.PublicKey, dmToken uint32, codeset uint8) error { - parentErr := errors.New("failed to joinConversation") - - // Build object - newConvo := Conversation{ - Pubkey: pubKey, - Nickname: nickname, - Token: dmToken, - CodesetVersion: codeset, - Blocked: false, - } - - // Convert to jsObject - newConvoJson, err := json.Marshal(&newConvo) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to marshal Conversation: %+v", err) - } - convoObj, err := utils.JsonToJS(newConvoJson) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to marshal Conversation: %+v", err) - } - - _, err = indexedDb.Put(w.db, conversationStoreName, convoObj) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to put Conversation: %+v", err) - } - return nil + wh *indexedDb.WorkerHandler } -// buildMessage is a private helper that converts typical dm.EventModel inputs -// into a basic Message structure for insertion into storage. -// -// NOTE: ID is not set inside this function because we want to use the -// autoincrement key by default. If you are trying to overwrite an existing -// message, then you need to set it manually yourself. -func buildMessage(messageID, parentID, text []byte, pubKey ed25519.PublicKey, - timestamp time.Time, round id.Round, mType dm.MessageType, - status dm.Status) *Message { - return &Message{ - MessageID: messageID, - ConversationPubKey: pubKey, - ParentMessageID: parentID, - Timestamp: timestamp, - Status: uint8(status), - Text: text, - Type: uint16(mType), - Round: uint64(round), - } +// TransferMessage is JSON marshalled and sent to the worker. +type TransferMessage struct { + UUID uint64 `json:"uuid"` + MessageID message.ID `json:"messageID"` + ReactionTo message.ID `json:"reactionTo"` + Nickname string `json:"nickname"` + Text []byte `json:"text"` + PubKey ed25519.PublicKey `json:"pubKey"` + DmToken uint32 `json:"dmToken"` + Codeset uint8 `json:"codeset"` + Timestamp time.Time `json:"timestamp"` + Round rounds.Round `json:"round"` + MType dm.MessageType `json:"mType"` + Status dm.Status `json:"status"` } func (w *wasmModel) Receive(messageID message.ID, nickname string, text []byte, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, timestamp time.Time, round rounds.Round, mType dm.MessageType, status dm.Status) uint64 { - parentErr := errors.New("failed to Receive") - - // If there is no extant Conversation, create one. - _, err := indexedDb.Get( - w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) - if err != nil { - if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { - err = w.joinConversation(nickname, pubKey, dmToken, codeset) - if err != nil { - jww.ERROR.Printf("%+v", err) - } - } else { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to get Conversation: %+v", err)) - } + msg := TransferMessage{ + MessageID: messageID, + Nickname: nickname, + Text: text, + PubKey: pubKey, + DmToken: dmToken, + Codeset: codeset, + Timestamp: timestamp, + Round: round, + MType: mType, + Status: status, + } + + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal payload for TransferMessage: %+v", err) return 0 - } else { - jww.DEBUG.Printf("Conversation with %s already joined", nickname) } - // Handle encryption, if it is present - if w.cipher != nil { - text, err = w.cipher.Encrypt(text) + uuidChan := make(chan uint64) + w.wh.SendMessage(indexedDb.ReceiveTag, data, func(data []byte) { + var uuid uint64 + err = json.Unmarshal(data, &uuid) if err != nil { - jww.ERROR.Printf("Failed to encrypt Message: %+v", err) - return 0 + jww.ERROR.Printf( + "Could not JSON unmarshal response to Receive: %+v", err) + uuidChan <- 0 } - } + uuidChan <- uuid + }) - msgToInsert := buildMessage(messageID.Bytes(), nil, text, pubKey, timestamp, - round.ID, mType, status) - uuid, err := w.receiveHelper(msgToInsert, false) - if err != nil { - jww.ERROR.Printf("Failed to receive Message: %+v", err) + select { + case uuid := <-uuidChan: + return uuid + case <-time.After(indexedDb.ResponseTimeout): + jww.ERROR.Printf("Timed out after %s waiting for response from the "+ + "worker about Receive", indexedDb.ResponseTimeout) } - go w.receivedMessageCB(uuid, pubKey, false) - return uuid + return 0 } func (w *wasmModel) ReceiveText(messageID message.ID, nickname, text string, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, timestamp time.Time, round rounds.Round, status dm.Status) uint64 { - parentErr := errors.New("failed to ReceiveText") - - // If there is no extant Conversation, create one. - _, err := indexedDb.Get( - w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) - if err != nil { - if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { - err = w.joinConversation(nickname, pubKey, dmToken, codeset) - if err != nil { - jww.ERROR.Printf("%+v", err) - } - } else { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to get Conversation: %+v", err)) - } + msg := TransferMessage{ + MessageID: messageID, + Nickname: nickname, + Text: []byte(text), + PubKey: pubKey, + DmToken: dmToken, + Codeset: codeset, + Timestamp: timestamp, + Round: round, + Status: status, + } + + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal payload for TransferMessage: %+v", err) return 0 - } else { - jww.DEBUG.Printf("Conversation with %s already joined", nickname) } - // Handle encryption, if it is present - textBytes := []byte(text) - if w.cipher != nil { - textBytes, err = w.cipher.Encrypt(textBytes) + uuidChan := make(chan uint64) + w.wh.SendMessage(indexedDb.ReceiveTextTag, data, func(data []byte) { + var uuid uint64 + err = json.Unmarshal(data, &uuid) if err != nil { - jww.ERROR.Printf("Failed to encrypt Message: %+v", err) - return 0 + jww.ERROR.Printf( + "Could not JSON unmarshal response to ReceiveText: %+v", err) + uuidChan <- 0 } - } + uuidChan <- uuid + }) - msgToInsert := buildMessage(messageID.Bytes(), nil, textBytes, - pubKey, timestamp, round.ID, dm.TextType, status) - - uuid, err := w.receiveHelper(msgToInsert, false) - if err != nil { - jww.ERROR.Printf("Failed to receive Message: %+v", err) + select { + case uuid := <-uuidChan: + return uuid + case <-time.After(indexedDb.ResponseTimeout): + jww.ERROR.Printf("Timed out after %s waiting for response from the "+ + "worker about ReceiveText", indexedDb.ResponseTimeout) } - go w.receivedMessageCB(uuid, pubKey, false) - return uuid + return 0 } func (w *wasmModel) ReceiveReply(messageID, reactionTo message.ID, nickname, text string, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, timestamp time.Time, round rounds.Round, status dm.Status) uint64 { - parentErr := errors.New("failed to ReceiveReply") - - // If there is no extant Conversation, create one. - _, err := indexedDb.Get( - w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) - if err != nil { - if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { - err = w.joinConversation(nickname, pubKey, dmToken, codeset) - if err != nil { - jww.ERROR.Printf("%+v", err) - } - } else { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to get Conversation: %+v", err)) - } + msg := TransferMessage{ + MessageID: messageID, + ReactionTo: reactionTo, + Nickname: nickname, + Text: []byte(text), + PubKey: pubKey, + DmToken: dmToken, + Codeset: codeset, + Timestamp: timestamp, + Round: round, + Status: status, + } + + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal payload for TransferMessage: %+v", err) return 0 - } else { - jww.DEBUG.Printf("Conversation with %s already joined", nickname) } - // Handle encryption, if it is present - textBytes := []byte(text) - if w.cipher != nil { - textBytes, err = w.cipher.Encrypt(textBytes) + uuidChan := make(chan uint64) + w.wh.SendMessage(indexedDb.ReceiveReplyTag, data, func(data []byte) { + var uuid uint64 + err = json.Unmarshal(data, &uuid) if err != nil { - jww.ERROR.Printf("Failed to encrypt Message: %+v", err) - return 0 + jww.ERROR.Printf( + "Could not JSON unmarshal response to ReceiveReply: %+v", err) + uuidChan <- 0 } - } - - msgToInsert := buildMessage(messageID.Bytes(), reactionTo.Marshal(), textBytes, - pubKey, timestamp, round.ID, dm.TextType, status) + uuidChan <- uuid + }) - uuid, err := w.receiveHelper(msgToInsert, false) - if err != nil { - jww.ERROR.Printf("Failed to receive Message: %+v", err) + select { + case uuid := <-uuidChan: + return uuid + case <-time.After(indexedDb.ResponseTimeout): + jww.ERROR.Printf("Timed out after %s waiting for response from the "+ + "worker about ReceiveReply", indexedDb.ResponseTimeout) } - go w.receivedMessageCB(uuid, pubKey, false) - return uuid + return 0 } -func (w *wasmModel) ReceiveReaction(messageID, _ message.ID, nickname, +func (w *wasmModel) ReceiveReaction(messageID, reactionTo message.ID, nickname, reaction string, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8, timestamp time.Time, round rounds.Round, status dm.Status) uint64 { - parentErr := errors.New("failed to ReceiveText") - - // If there is no extant Conversation, create one. - _, err := indexedDb.Get( - w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) - if err != nil { - if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { - err = w.joinConversation(nickname, pubKey, dmToken, codeset) - if err != nil { - jww.ERROR.Printf("%+v", err) - } - } else { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to get Conversation: %+v", err)) - } + msg := TransferMessage{ + MessageID: messageID, + ReactionTo: reactionTo, + Nickname: nickname, + Text: []byte(reaction), + PubKey: pubKey, + DmToken: dmToken, + Codeset: codeset, + Timestamp: timestamp, + Round: round, + Status: status, + } + + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal payload for TransferMessage: %+v", err) return 0 - } else { - jww.DEBUG.Printf("Conversation with %s already joined", nickname) } - // Handle encryption, if it is present - textBytes := []byte(reaction) - if w.cipher != nil { - textBytes, err = w.cipher.Encrypt(textBytes) + uuidChan := make(chan uint64) + w.wh.SendMessage(indexedDb.ReceiveReactionTag, data, func(data []byte) { + var uuid uint64 + err = json.Unmarshal(data, &uuid) if err != nil { - jww.ERROR.Printf("Failed to encrypt Message: %+v", err) - return 0 + jww.ERROR.Printf( + "Could not JSON unmarshal response to ReceiveReaction: %+v", err) + uuidChan <- 0 } - } - - msgToInsert := buildMessage(messageID.Bytes(), nil, textBytes, - pubKey, timestamp, round.ID, dm.ReactionType, status) + uuidChan <- uuid + }) - uuid, err := w.receiveHelper(msgToInsert, false) - if err != nil { - jww.ERROR.Printf("Failed to receive Message: %+v", err) + select { + case uuid := <-uuidChan: + return uuid + case <-time.After(indexedDb.ResponseTimeout): + jww.ERROR.Printf("Timed out after %s waiting for response from the "+ + "worker about ReceiveReaction", indexedDb.ResponseTimeout) } - go w.receivedMessageCB(uuid, pubKey, false) - return uuid + return 0 } func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID, timestamp time.Time, round rounds.Round, status dm.Status) { - parentErr := errors.New("failed to UpdateSentStatus") - - // FIXME: this is a bit of race condition without the mux. - // This should be done via the transactions (i.e., make a - // special version of receiveHelper) - w.updateMux.Lock() - defer w.updateMux.Unlock() - - // Convert messageID to the key generated by json.Marshal - key := js.ValueOf(uuid) - - // Use the key to get the existing Message - currentMsg, err := indexedDb.Get(w.db, messageStoreName, key) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Unable to get message: %+v", err)) - return - } - - // Extract the existing Message and update the Status - newMessage := &Message{} - err = json.Unmarshal([]byte(utils.JsToJson(currentMsg)), newMessage) - if err != nil { - jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, - "Could not JSON unmarshal message: %+v", err)) - return - } - - newMessage.Status = uint8(status) - if !messageID.Equals(message.ID{}) { - newMessage.MessageID = messageID.Bytes() - } - - if round.ID != 0 { - newMessage.Round = uint64(round.ID) - } - - if !timestamp.Equal(time.Time{}) { - newMessage.Timestamp = timestamp - } - - // Store the updated Message - _, err = w.receiveHelper(newMessage, true) - if err != nil { - jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error())) - } - go w.receivedMessageCB(uuid, newMessage.ConversationPubKey, true) -} - -// receiveHelper is a private helper for receiving any sort of message. -func (w *wasmModel) receiveHelper( - newMessage *Message, isUpdate bool) (uint64, error) { - // Convert to jsObject - newMessageJson, err := json.Marshal(newMessage) - if err != nil { - return 0, errors.Errorf("Unable to marshal Message: %+v", err) - } - messageObj, err := utils.JsonToJS(newMessageJson) - if err != nil { - return 0, errors.Errorf("Unable to marshal Message: %+v", err) - } - - // Unset the primaryKey for inserts so that it can be auto-populated and - // incremented - if !isUpdate { - messageObj.Delete("id") + msg := TransferMessage{ + UUID: uuid, + MessageID: messageID, + Timestamp: timestamp, + Round: round, + Status: status, } - // Store message to database - result, err := indexedDb.Put(w.db, messageStoreName, messageObj) + data, err := json.Marshal(msg) if err != nil { - return 0, errors.Errorf("Unable to put Message: %+v", err) + jww.ERROR.Printf( + "Could not JSON marshal payload for TransferMessage: %+v", err) } - // NOTE: Sometimes the insert fails to return an error but hits a duplicate - // insert, so this fallthrough returns the UUID entry in that case. - if result.IsUndefined() { - msgID := message.ID{} - copy(msgID[:], newMessage.MessageID) - uuid, errLookup := w.msgIDLookup(msgID) - if uuid != 0 && errLookup == nil { - return uuid, nil - } - return 0, errors.Errorf("uuid lookup failure: %+v", err) - } - uuid := uint64(result.Int()) - jww.DEBUG.Printf("Successfully stored message %d", uuid) - - return uuid, nil -} - -// msgIDLookup gets the UUID of the Message with the given messageID. -func (w *wasmModel) msgIDLookup(messageID message.ID) (uint64, error) { - resultObj, err := indexedDb.GetIndex(w.db, messageStoreName, - messageStoreMessageIndex, utils.CopyBytesToJS(messageID.Marshal())) - if err != nil { - return 0, err - } - - uuid := uint64(0) - if !resultObj.IsUndefined() { - uuid = uint64(resultObj.Get("id").Int()) - } - return uuid, nil + w.wh.SendMessage(indexedDb.UpdateSentStatusTag, data, nil) } diff --git a/indexedDb/dm/init.go b/indexedDb/dm/init.go index 29b80911804004539beb210dc85533afc7c98f31..827d6a9e8a920187756a2d82d77eb880f9529e4d 100644 --- a/indexedDb/dm/init.go +++ b/indexedDb/dm/init.go @@ -11,9 +11,9 @@ package channelEventModel import ( "crypto/ed25519" - "syscall/js" + "encoding/json" + "time" - "github.com/hack-pad/go-indexeddb/idb" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/v4/dm" @@ -22,15 +22,9 @@ import ( "gitlab.com/elixxir/xxdk-wasm/storage" ) -const ( - // databaseSuffix is the suffix to be appended to the name of - // the database. - databaseSuffix = "_speakeasy_dm" - - // currentVersion is the current version of the IndexDb - // runtime. Used for migration purposes. - currentVersion uint = 1 -) +// WorkerJavascriptFileURL is the URL of the script the worker will execute to +// launch the worker WASM binary. It must obey the same-origin policy. +const WorkerJavascriptFileURL = "/integrations/assets/indexedDbWorker.js" // MessageReceivedCallback is called any time a message is received or updated. // @@ -38,146 +32,134 @@ const ( type MessageReceivedCallback func( uuid uint64, pubKey ed25519.PublicKey, update bool) +// NewWASMEventModelMessage is JSON marshalled and sent to the worker for +// [NewWASMEventModel]. +type NewWASMEventModelMessage struct { + Path string `json:"path"` + EncryptionJSON string `json:"encryptionJSON"` +} + // NewWASMEventModel returns a [channels.EventModel] backed by a wasmModel. // The name should be a base64 encoding of the users public key. func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, cb MessageReceivedCallback) (dm.EventModel, error) { - databaseName := path + databaseSuffix - return newWASMModel(databaseName, encryption, cb) -} -// newWASMModel creates the given [idb.Database] and returns a wasmModel. -func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, - cb MessageReceivedCallback) (*wasmModel, error) { - // Attempt to open database object - ctx, cancel := indexedDb.NewContext() - defer cancel() - openRequest, err := idb.Global().Open(ctx, databaseName, currentVersion, - func(db *idb.Database, oldVersion, newVersion uint) error { - if oldVersion == newVersion { - jww.INFO.Printf("IndexDb version is current: v%d", - newVersion) - return nil - } - - jww.INFO.Printf("IndexDb upgrade required: v%d -> v%d", - oldVersion, newVersion) - - if oldVersion == 0 && newVersion >= 1 { - err := v1Upgrade(db) - if err != nil { - return err - } - oldVersion = 1 - } - - // if oldVersion == 1 && newVersion >= 2 { v2Upgrade(), oldVersion = 2 } - return nil - }) + // TODO: bring in URL and name from caller + wh, err := indexedDb.NewWorkerHandler( + WorkerJavascriptFileURL, "indexedDbWorker") if err != nil { return nil, err } - // Wait for database open to finish - db, err := openRequest.Await(ctx) - if err != nil { - return nil, err - } + // Register handler to manage messages for the MessageReceivedCallback + wh.RegisterHandler(indexedDb.GetMessageTag, indexedDb.InitID, false, + messageReceivedCallbackHandler(cb)) + + // Register handler to manage checking encryption status from local storage + wh.RegisterHandler(indexedDb.EncryptionStatusTag, indexedDb.InitID, false, + checkDbEncryptionStatusHandler(wh)) - // Save the encryption status to storage - encryptionStatus := encryption != nil - loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( - databaseName, encryptionStatus) + encryptionJSON, err := json.Marshal(encryption) if err != nil { return nil, err } - // Verify encryption status does not change - if encryptionStatus != loadedEncryptionStatus { - return nil, errors.New( - "Cannot load database with different encryption status.") - } else if !encryptionStatus { - jww.WARN.Printf("IndexedDb encryption disabled!") + message := NewWASMEventModelMessage{ + Path: path, + EncryptionJSON: string(encryptionJSON), } - // Attempt to ensure the database has been properly initialized - openRequest, err = idb.Global().Open(ctx, databaseName, currentVersion, - func(db *idb.Database, oldVersion, newVersion uint) error { - return nil - }) + payload, err := json.Marshal(message) if err != nil { return nil, err } - // Wait for database open to finish - db, err = openRequest.Await(ctx) - if err != nil { - return nil, err + + errChan := make(chan string) + wh.SendMessage(indexedDb.NewWASMEventModelTag, payload, func(data []byte) { + errChan <- string(data) + }) + + select { + case workerErr := <-errChan: + if workerErr != "" { + return nil, errors.New(workerErr) + } + case <-time.After(indexedDb.ResponseTimeout): + return nil, errors.Errorf("timed out after %s waiting for indexedDB "+ + "database in worker to intialize", indexedDb.ResponseTimeout) } - wrapper := &wasmModel{db: db, receivedMessageCB: cb, cipher: encryption} - return wrapper, nil + return &wasmModel{wh}, nil } -// v1Upgrade performs the v0 -> v1 database upgrade. -// -// This can never be changed without permanently breaking backwards -// compatibility. -func v1Upgrade(db *idb.Database) error { - indexOpts := idb.IndexOptions{ - Unique: false, - MultiEntry: false, - } +// MessageReceivedCallbackMessage is JSON marshalled and received from the +// worker for the [MessageReceivedCallback] callback. +type MessageReceivedCallbackMessage struct { + UUID uint64 `json:"uuid"` + PubKey ed25519.PublicKey `json:"pubKey"` + Update bool `json:"update"` +} - // Build Message ObjectStore and Indexes - messageStoreOpts := idb.ObjectStoreOptions{ - KeyPath: js.ValueOf(msgPkeyName), - AutoIncrement: true, - } - messageStore, err := db.CreateObjectStore(messageStoreName, messageStoreOpts) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStoreMessageIndex, - js.ValueOf(messageStoreMessage), - idb.IndexOptions{ - Unique: true, - MultiEntry: false, - }) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStoreConversationIndex, - js.ValueOf(messageStoreConversation), indexOpts) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStoreParentIndex, - js.ValueOf(messageStoreParent), indexOpts) - if err != nil { - return err - } - _, err = messageStore.CreateIndex(messageStoreTimestampIndex, - js.ValueOf(messageStoreTimestamp), indexOpts) - if err != nil { - return err +// messageReceivedCallbackHandler returns a handler to manage messages for the +// MessageReceivedCallback. +func messageReceivedCallbackHandler(cb MessageReceivedCallback) func(data []byte) { + return func(data []byte) { + var msg MessageReceivedCallbackMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Failed to JSON unmarshal "+ + "MessageReceivedCallback message from worker: %+v", err) + return + } + cb(msg.UUID, msg.PubKey, msg.Update) } +} - // Build Channel ObjectStore - conversationStoreOpts := idb.ObjectStoreOptions{ - KeyPath: js.ValueOf(convoPkeyName), - AutoIncrement: false, - } - _, err = db.CreateObjectStore(conversationStoreName, conversationStoreOpts) - if err != nil { - return err - } +// EncryptionStatusMessage is JSON marshalled and received from the worker when +// the database checks if it is encrypted. +type EncryptionStatusMessage struct { + DatabaseName string `json:"databaseName"` + EncryptionStatus bool `json:"encryptionStatus"` +} - // Get the database name and save it to storage - if databaseName, err := db.Name(); err != nil { - return err - } else if err = storage.StoreIndexedDb(databaseName); err != nil { - return err - } +// EncryptionStatusReply is JSON marshalled and sent to the worker is response +// to the [EncryptionStatusMessage]. +type EncryptionStatusReply struct { + EncryptionStatus bool `json:"encryptionStatus"` + Error string `json:"error"` +} - return nil +// checkDbEncryptionStatusHandler returns a handler to manage checking +// encryption status from local storage. +func checkDbEncryptionStatusHandler(wh *indexedDb.WorkerHandler) func(data []byte) { + return func(data []byte) { + // Unmarshal received message + var msg EncryptionStatusMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Failed to JSON unmarshal "+ + "EncryptionStatusMessage message from worker: %+v", err) + return + } + + // Pass message values to storage + loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( + msg.DatabaseName, msg.EncryptionStatus) + var reply EncryptionStatusReply + if err != nil { + reply.Error = err.Error() + } else { + reply.EncryptionStatus = loadedEncryptionStatus + } + + // Return response + statusData, err := json.Marshal(reply) + if err != nil { + jww.ERROR.Printf( + "Failed to JSON marshal EncryptionStatusReply: %+v", err) + return + } + + wh.SendMessage(indexedDb.EncryptionStatusTag, statusData, nil) + } } diff --git a/indexedDb/dm/model.go b/indexedDb/dm/model.go deleted file mode 100644 index bb6f34588aa2976c1689b629c6df31219de6b585..0000000000000000000000000000000000000000 --- a/indexedDb/dm/model.go +++ /dev/null @@ -1,63 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2022 xx foundation // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file. // -//////////////////////////////////////////////////////////////////////////////// - -//go:build js && wasm - -package channelEventModel - -import ( - "time" -) - -const ( - // Text representation of primary key value (keyPath). - msgPkeyName = "id" - convoPkeyName = "pub_key" - - // Text representation of the names of the various [idb.ObjectStore]. - messageStoreName = "messages" - conversationStoreName = "conversations" - - // Message index names. - messageStoreMessageIndex = "message_id_index" - messageStoreConversationIndex = "conversation_id_index" - messageStoreParentIndex = "parent_message_id_index" - messageStoreTimestampIndex = "timestamp_index" - - // Message keyPath names (must match json struct tags). - messageStoreMessage = "message_id" - messageStoreConversation = "conversation_id" - messageStoreParent = "parent_message_id" - messageStoreTimestamp = "timestamp" -) - -// Message defines the IndexedDb representation of a single Message. -// -// A Message belongs to one Conversation. -// A Message may belong to one Message (Parent). -type Message struct { - ID uint64 `json:"id"` // Matches msgPkeyName - MessageID []byte `json:"message_id"` // Index - ConversationPubKey []byte `json:"conversation_pub_key"` // Index - ParentMessageID []byte `json:"parent_message_id"` // Index - Timestamp time.Time `json:"timestamp"` // Index - Status uint8 `json:"status"` - Text []byte `json:"text"` - Type uint16 `json:"type"` - Round uint64 `json:"round"` -} - -// Conversation defines the IndexedDb representation of a single -// message exchange between two recipients. -// A Conversation has many Message. -type Conversation struct { - Pubkey []byte `json:"pub_key"` // Matches convoPkeyName - Nickname string `json:"nickname"` - Token uint32 `json:"token"` - CodesetVersion uint8 `json:"codeset_version"` - Blocked bool `json:"blocked"` -} diff --git a/indexedDb/tag.go b/indexedDb/tag.go index 205da10c49542681c8793bc0acc49e29a9054676..0fbd54c44eb090e0c0209dbc1b282fc988c6528d 100644 --- a/indexedDb/tag.go +++ b/indexedDb/tag.go @@ -10,6 +10,9 @@ package indexedDb // Tag describes how a message sent to or from the worker should be handled. type Tag string +// FIXME: This system of tagging does not work so well when using the +// WorkerHandler for more than one database. This should bed fixed. + // List of tags that can be used when sending a message or registering a handler // to receive a message. const ( @@ -28,6 +31,10 @@ const ( UpdateFromMessageIDTag Tag = "UpdateFromMessageID" GetMessageTag Tag = "GetMessage" DeleteMessageTag Tag = "DeleteMessage" + + ReceiveTag Tag = "Receive" + ReceiveTextTag Tag = "ReceiveText" + UpdateSentStatusTag Tag = "UpdateSentStatusTag" ) // deleteAfterReceiving is a list of Tags that will have their handler deleted diff --git a/indexedDb/utils.go b/indexedDb/utils.go deleted file mode 100644 index 664a1cc663ddef38016359067b8dd2e527da24aa..0000000000000000000000000000000000000000 --- a/indexedDb/utils.go +++ /dev/null @@ -1,262 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2022 xx foundation // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file. // -//////////////////////////////////////////////////////////////////////////////// - -//go:build js && wasm - -// This file contains several generic IndexedDB helper functions that -// may be useful for any IndexedDB implementations. - -package indexedDb - -import ( - "context" - "github.com/hack-pad/go-indexeddb/idb" - "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/xxdk-wasm/utils" - "syscall/js" - "time" -) - -const ( - // dbTimeout is the global timeout for operations with the storage - // [context.Context]. - dbTimeout = time.Second - - // ErrDoesNotExist is an error string for got undefined on Get operations. - ErrDoesNotExist = "result is undefined" -) - -// NewContext builds a context for indexedDb operations. -func NewContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), dbTimeout) -} - -// Get is a generic helper for getting values from the given [idb.ObjectStore]. -// Only usable by primary key. -func Get(db *idb.Database, objectStoreName string, key js.Value) (js.Value, error) { - parentErr := errors.Errorf("failed to Get %s/%s", objectStoreName, key) - - // Prepare the Transaction - txn, err := db.Transaction(idb.TransactionReadOnly, objectStoreName) - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to create Transaction: %+v", err) - } - store, err := txn.ObjectStore(objectStoreName) - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to get ObjectStore: %+v", err) - } - - // Perform the operation - getRequest, err := store.Get(key) - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to Get from ObjectStore: %+v", err) - } - - // Wait for the operation to return - ctx, cancel := NewContext() - resultObj, err := getRequest.Await(ctx) - cancel() - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to get from ObjectStore: %+v", err) - } else if resultObj.IsUndefined() { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to get from ObjectStore: %s", ErrDoesNotExist) - } - - // Process result into string - jww.DEBUG.Printf("Got from %s/%s: %s", - objectStoreName, key, utils.JsToJson(resultObj)) - return resultObj, nil -} - -// GetIndex is a generic helper for getting values from the given -// [idb.ObjectStore] using the given [idb.Index]. -func GetIndex(db *idb.Database, objectStoreName, - indexName string, key js.Value) (js.Value, error) { - parentErr := errors.Errorf("failed to GetIndex %s/%s/%s", - objectStoreName, indexName, key) - - // Prepare the Transaction - txn, err := db.Transaction(idb.TransactionReadOnly, objectStoreName) - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to create Transaction: %+v", err) - } - store, err := txn.ObjectStore(objectStoreName) - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to get ObjectStore: %+v", err) - } - idx, err := store.Index(indexName) - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to get Index: %+v", err) - } - - // Perform the operation - getRequest, err := idx.Get(key) - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to Get from ObjectStore: %+v", err) - } - - // Wait for the operation to return - ctx, cancel := NewContext() - resultObj, err := getRequest.Await(ctx) - cancel() - if err != nil { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to get from ObjectStore: %+v", err) - } else if resultObj.IsUndefined() { - return js.Undefined(), errors.WithMessagef(parentErr, - "Unable to get from ObjectStore: %s", ErrDoesNotExist) - } - - // Process result into string - jww.DEBUG.Printf("Got from %s/%s/%s: %s", - objectStoreName, indexName, key, utils.JsToJson(resultObj)) - return resultObj, nil -} - -// Put is a generic helper for putting values into the given [idb.ObjectStore]. -// Equivalent to insert if not exists else update. -func Put(db *idb.Database, objectStoreName string, value js.Value) (js.Value, error) { - // Prepare the Transaction - txn, err := db.Transaction(idb.TransactionReadWrite, objectStoreName) - if err != nil { - return js.Undefined(), errors.Errorf("Unable to create Transaction: %+v", err) - } - store, err := txn.ObjectStore(objectStoreName) - if err != nil { - return js.Undefined(), errors.Errorf("Unable to get ObjectStore: %+v", err) - } - - // Perform the operation - request, err := store.Put(value) - if err != nil { - return js.Undefined(), errors.Errorf("Unable to Put: %+v", err) - } - - // Wait for the operation to return - ctx, cancel := NewContext() - result, err := request.Await(ctx) - cancel() - if err != nil { - return js.Undefined(), errors.Errorf("Putting value failed: %+v", err) - } - jww.DEBUG.Printf("Successfully put value in %s: %s", - objectStoreName, utils.JsToJson(value)) - return result, nil -} - -// Delete is a generic helper for removing values from the given -// [idb.ObjectStore]. Only usable by primary key. -func Delete(db *idb.Database, objectStoreName string, key js.Value) error { - parentErr := errors.Errorf("failed to Delete %s/%s", objectStoreName, key) - - // Prepare the Transaction - txn, err := db.Transaction(idb.TransactionReadWrite, objectStoreName) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to create Transaction: %+v", err) - } - store, err := txn.ObjectStore(objectStoreName) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to get ObjectStore: %+v", err) - } - - // Perform the operation - _, err = store.Delete(key) - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to Delete from ObjectStore: %+v", err) - } - - // Wait for the operation to return - ctx, cancel := NewContext() - err = txn.Await(ctx) - cancel() - if err != nil { - return errors.WithMessagef(parentErr, - "Unable to Delete from ObjectStore: %+v", err) - } - jww.DEBUG.Printf("Successfully deleted value at %s/%s", - objectStoreName, utils.JsToJson(key)) - return nil -} - -// DeleteIndex is a generic helper for removing values from the -// given [idb.ObjectStore] using the given [idb.Index]. Requires passing -// in the name of the primary key for the store. -func DeleteIndex(db *idb.Database, objectStoreName, - indexName, pkeyName string, key js.Value) error { - parentErr := errors.Errorf("failed to DeleteIndex %s/%s", objectStoreName, key) - - value, err := GetIndex(db, objectStoreName, indexName, key) - if err != nil { - return errors.WithMessagef(parentErr, "%+v", err) - } - - err = Delete(db, objectStoreName, value.Get(pkeyName)) - if err != nil { - return errors.WithMessagef(parentErr, "%+v", err) - } - - jww.DEBUG.Printf("Successfully deleted value at %s/%s/%s", - objectStoreName, indexName, utils.JsToJson(key)) - return nil -} - -// Dump returns the given [idb.ObjectStore] contents to string slice for -// testing and debugging purposes. -func Dump(db *idb.Database, objectStoreName string) ([]string, error) { - parentErr := errors.Errorf("failed to Dump %s", objectStoreName) - - txn, err := db.Transaction(idb.TransactionReadOnly, objectStoreName) - if err != nil { - return nil, errors.WithMessagef(parentErr, - "Unable to create Transaction: %+v", err) - } - store, err := txn.ObjectStore(objectStoreName) - if err != nil { - return nil, errors.WithMessagef(parentErr, - "Unable to get ObjectStore: %+v", err) - } - cursorRequest, err := store.OpenCursor(idb.CursorNext) - if err != nil { - return nil, errors.WithMessagef(parentErr, - "Unable to open Cursor: %+v", err) - } - - // Run the query - jww.DEBUG.Printf("%s values:", objectStoreName) - results := make([]string, 0) - ctx, cancel := NewContext() - err = cursorRequest.Iter(ctx, - func(cursor *idb.CursorWithValue) error { - value, err := cursor.Value() - if err != nil { - return err - } - valueStr := utils.JsToJson(value) - results = append(results, valueStr) - jww.DEBUG.Printf("- %v", valueStr) - return nil - }) - cancel() - if err != nil { - return nil, errors.WithMessagef(parentErr, - "Unable to dump ObjectStore: %+v", err) - } - return results, nil -} diff --git a/indexedDb/utils_test.go b/indexedDb/utils_test.go deleted file mode 100644 index 1c657ebd5dbb4607c977323dc8883042989f05f9..0000000000000000000000000000000000000000 --- a/indexedDb/utils_test.go +++ /dev/null @@ -1,80 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2022 xx foundation // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file. // -//////////////////////////////////////////////////////////////////////////////// - -//go:build js && wasm - -package indexedDb - -import ( - "github.com/hack-pad/go-indexeddb/idb" - "strings" - "syscall/js" - "testing" -) - -// Error path: Tests that Get returns an error when trying to get a message that -// does not exist. -func TestGet_NoMessageError(t *testing.T) { - db := newTestDB("messages", "index", t) - - _, err := Get(db, "messages", js.ValueOf(5)) - if err == nil || !strings.Contains(err.Error(), "undefined") { - t.Errorf("Did not get expected error when getting a message that "+ - "does not exist: %+v", err) - } -} - -// Error path: Tests that GetIndex returns an error when trying to get a message -// that does not exist. -func TestGetIndex_NoMessageError(t *testing.T) { - db := newTestDB("messages", "index", t) - - _, err := GetIndex(db, "messages", "index", js.ValueOf(5)) - if err == nil || !strings.Contains(err.Error(), "undefined") { - t.Errorf("Did not get expected error when getting a message that "+ - "does not exist: %+v", err) - } -} - -// newTestDB creates a new idb.Database for testing. -func newTestDB(name, index string, t *testing.T) *idb.Database { - // Attempt to open database object - ctx, cancel := NewContext() - defer cancel() - openRequest, err := idb.Global().Open(ctx, "databaseName", 0, - func(db *idb.Database, _ uint, _ uint) error { - storeOpts := idb.ObjectStoreOptions{ - KeyPath: js.ValueOf("id"), - AutoIncrement: true, - } - - // Build Message ObjectStore and Indexes - messageStore, err := db.CreateObjectStore(name, storeOpts) - if err != nil { - return err - } - - _, err = messageStore.CreateIndex( - index, js.ValueOf("id"), idb.IndexOptions{}) - if err != nil { - return err - } - - return nil - }) - if err != nil { - t.Fatal(err) - } - - // Wait for database open to finish - db, err := openRequest.Await(ctx) - if err != nil { - t.Fatal(err) - } - - return db -} diff --git a/indexedDbWorker/channels/implementation.go b/indexedDbWorker/channels/implementation.go index 8fd7fba2a57fa7c1ba253f813e38e951cab3e888..96e4351964a024155041a92892d2b6328c932c4e 100644 --- a/indexedDbWorker/channels/implementation.go +++ b/indexedDbWorker/channels/implementation.go @@ -13,13 +13,12 @@ import ( "crypto/ed25519" "encoding/base64" "encoding/json" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" "strings" "sync" "syscall/js" "time" - "gitlab.com/elixxir/xxdk-wasm/indexedDb" - "github.com/hack-pad/go-indexeddb/idb" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" @@ -68,7 +67,7 @@ func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) { return } - _, err = indexedDb.Put(w.db, channelsStoreName, channelObj) + _, err = indexedDbWorker.Put(w.db, channelsStoreName, channelObj) if err != nil { jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, "Unable to put Channel: %+v", err)) @@ -80,7 +79,7 @@ func (w *wasmModel) LeaveChannel(channelID *id.ID) { parentErr := errors.New("failed to LeaveChannel") // Delete the channel from storage - err := indexedDb.Delete(w.db, channelsStoreName, + err := indexedDbWorker.Delete(w.db, channelsStoreName, js.ValueOf(channelID.String())) if err != nil { jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, @@ -127,7 +126,7 @@ func (w *wasmModel) deleteMsgByChannel(channelID *id.ID) error { if err != nil { return errors.WithMessagef(parentErr, "Unable to open Cursor: %+v", err) } - ctx, cancel := indexedDb.NewContext() + ctx, cancel := indexedDbWorker.NewContext() err = cursorRequest.Iter(ctx, func(cursor *idb.CursorWithValue) error { _, err := cursor.Delete() @@ -267,7 +266,7 @@ func (w *wasmModel) UpdateFromUUID(uuid uint64, messageID *message.ID, key := js.ValueOf(uuid) // Use the key to get the existing Message - currentMsg, err := indexedDb.Get(w.db, messageStoreName, key) + currentMsg, err := indexedDbWorker.Get(w.db, messageStoreName, key) if err != nil { jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, "Unable to get message: %+v", err)) @@ -303,7 +302,7 @@ func (w *wasmModel) UpdateFromMessageID(messageID message.ID, defer w.updateMux.Unlock() msgIDStr := base64.StdEncoding.EncodeToString(messageID.Marshal()) - currentMsgObj, err := indexedDb.GetIndex(w.db, messageStoreName, + currentMsgObj, err := indexedDbWorker.GetIndex(w.db, messageStoreName, messageStoreMessageIndex, js.ValueOf(msgIDStr)) if err != nil { jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, @@ -418,7 +417,7 @@ func (w *wasmModel) receiveHelper( } // Store message to database - result, err := indexedDb.Put(w.db, messageStoreName, messageObj) + result, err := indexedDbWorker.Put(w.db, messageStoreName, messageObj) if err != nil && !strings.Contains(err.Error(), "at least one key does not satisfy the uniqueness requirements") { // Only return non-unique constraint errors so that the case @@ -489,14 +488,14 @@ func (w *wasmModel) GetMessage( // DeleteMessage removes a message with the given messageID from storage. func (w *wasmModel) DeleteMessage(messageID message.ID) error { msgId := js.ValueOf(base64.StdEncoding.EncodeToString(messageID.Bytes())) - return indexedDb.DeleteIndex(w.db, messageStoreName, + return indexedDbWorker.DeleteIndex(w.db, messageStoreName, messageStoreMessageIndex, pkeyName, msgId) } // msgIDLookup gets the UUID of the Message with the given messageID. func (w *wasmModel) msgIDLookup(messageID message.ID) (*Message, error) { msgIDStr := js.ValueOf(base64.StdEncoding.EncodeToString(messageID.Bytes())) - resultObj, err := indexedDb.GetIndex(w.db, messageStoreName, + resultObj, err := indexedDbWorker.GetIndex(w.db, messageStoreName, messageStoreMessageIndex, msgIDStr) if err != nil { return nil, err diff --git a/indexedDbWorker/channels/implementation_test.go b/indexedDbWorker/channels/implementation_test.go index 0d035960ad9da8d5ac8baed2aadfe0fa54dea0e6..1a78e4702848e71d5b50b132dfacf018836360f9 100644 --- a/indexedDbWorker/channels/implementation_test.go +++ b/indexedDbWorker/channels/implementation_test.go @@ -14,7 +14,7 @@ import ( "fmt" "github.com/hack-pad/go-indexeddb/idb" "gitlab.com/elixxir/crypto/message" - "gitlab.com/elixxir/xxdk-wasm/indexedDb" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" "gitlab.com/elixxir/xxdk-wasm/storage" "gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/primitives/netTime" @@ -107,7 +107,7 @@ func TestWasmModel_DeleteMessage(t *testing.T) { } // Check the resulting status - results, err := indexedDb.Dump(eventModel.db, messageStoreName) + results, err := indexedDbWorker.Dump(eventModel.db, messageStoreName) if err != nil { t.Fatalf("%+v", err) } @@ -122,7 +122,7 @@ func TestWasmModel_DeleteMessage(t *testing.T) { } // Check the resulting status - results, err = indexedDb.Dump(eventModel.db, messageStoreName) + results, err = indexedDbWorker.Dump(eventModel.db, messageStoreName) if err != nil { t.Fatalf("%+v", err) } @@ -149,7 +149,7 @@ func Test_wasmModel_UpdateSentStatus(t *testing.T) { testMsgId := message.DeriveChannelMessageID( &id.ID{1}, 0, []byte(testString)) eventModel, err := newWASMModel( - testString, c, dummyCallback,dummyStoreEncryptionStatus) + testString, c, dummyCallback, dummyStoreEncryptionStatus) if err != nil { t.Fatalf("%+v", err) } @@ -164,7 +164,7 @@ func Test_wasmModel_UpdateSentStatus(t *testing.T) { } // Ensure one message is stored - results, err := indexedDb.Dump(eventModel.db, messageStoreName) + results, err := indexedDbWorker.Dump(eventModel.db, messageStoreName) if err != nil { t.Fatalf("%+v", err) } @@ -178,7 +178,7 @@ func Test_wasmModel_UpdateSentStatus(t *testing.T) { uuid, nil, nil, nil, nil, nil, &expectedStatus) // Check the resulting status - results, err = indexedDb.Dump(eventModel.db, messageStoreName) + results, err = indexedDbWorker.Dump(eventModel.db, messageStoreName) if err != nil { t.Fatalf("%+v", err) } @@ -236,7 +236,7 @@ func Test_wasmModel_JoinChannel_LeaveChannel(t *testing.T) { } eventModel.JoinChannel(testChannel) eventModel.JoinChannel(testChannel2) - results, err := indexedDb.Dump(eventModel.db, channelsStoreName) + results, err := indexedDbWorker.Dump(eventModel.db, channelsStoreName) if err != nil { t.Fatalf("%+v", err) } @@ -244,7 +244,7 @@ func Test_wasmModel_JoinChannel_LeaveChannel(t *testing.T) { t.Fatalf("Expected 2 channels to exist") } eventModel.LeaveChannel(testChannel.ReceptionID) - results, err = indexedDb.Dump(eventModel.db, channelsStoreName) + results, err = indexedDbWorker.Dump(eventModel.db, channelsStoreName) if err != nil { t.Fatalf("%+v", err) } @@ -396,7 +396,7 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) { } // Check pre-results - result, err := indexedDb.Dump(eventModel.db, messageStoreName) + result, err := indexedDbWorker.Dump(eventModel.db, messageStoreName) if err != nil { t.Fatalf("%+v", err) } @@ -411,7 +411,7 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) { } // Check final results - result, err = indexedDb.Dump(eventModel.db, messageStoreName) + result, err = indexedDbWorker.Dump(eventModel.db, messageStoreName) if err != nil { t.Fatalf("%+v", err) } diff --git a/indexedDbWorker/channels/init.go b/indexedDbWorker/channels/init.go index b6d350e48a0643063523b561388e23e0b7e3ff98..a1eba82bae80dd8ad035669bd86def850b8dfb88 100644 --- a/indexedDbWorker/channels/init.go +++ b/indexedDbWorker/channels/init.go @@ -16,6 +16,7 @@ import ( "gitlab.com/elixxir/client/v4/channels" cryptoChannel "gitlab.com/elixxir/crypto/channel" "gitlab.com/elixxir/xxdk-wasm/indexedDb" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" "gitlab.com/xx_network/primitives/id" "syscall/js" "time" @@ -55,7 +56,7 @@ func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, cb MessageReceivedCallback, storeEncryptionStatus storeEncryptionStatusFn) ( *wasmModel, error) { // Attempt to open database object - ctx, cancel := indexedDb.NewContext() + ctx, cancel := indexedDbWorker.NewContext() defer cancel() openRequest, err := idb.Global().Open(ctx, databaseName, currentVersion, func(db *idb.Database, oldVersion, newVersion uint) error { @@ -91,10 +92,10 @@ func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, // Save the encryption status to storage encryptionStatus := encryption != nil - loadedEncryptionStatus, err2 := + loadedEncryptionStatus, err := storeEncryptionStatus(databaseName, encryptionStatus) - if err2 != nil { - return nil, err2 + if err != nil { + return nil, err } // Verify encryption status does not change @@ -203,7 +204,7 @@ func (w *wasmModel) hackTestDb() error { if helper != nil { return helper } - result, err := indexedDb.Get(w.db, messageStoreName, js.ValueOf(msgId)) + result, err := indexedDbWorker.Get(w.db, messageStoreName, js.ValueOf(msgId)) if err != nil { return err } diff --git a/indexedDbWorker/dm/handlers.go b/indexedDbWorker/dm/handlers.go new file mode 100644 index 0000000000000000000000000000000000000000..7ecebd59d91131493d3ac7f4242b73b71acaed74 --- /dev/null +++ b/indexedDbWorker/dm/handlers.go @@ -0,0 +1,251 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package main + +import ( + "crypto/ed25519" + "encoding/json" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/dm" + cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/xxdk-wasm/indexedDb" + mChannels "gitlab.com/elixxir/xxdk-wasm/indexedDb/channels" + mDm "gitlab.com/elixxir/xxdk-wasm/indexedDb/dm" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" + "gitlab.com/xx_network/crypto/csprng" + "time" +) + +// manager handles the event model and the message handler, which is used to +// send information between the event model and the main thread. +type manager struct { + mh *indexedDbWorker.MessageHandler + model dm.EventModel +} + +// RegisterHandlers registers all the reception handlers to manage messages from +// the main thread for the channels.EventModel. +func (m *manager) RegisterHandlers() { + + m.mh.RegisterHandler(indexedDb.NewWASMEventModelTag, m.newWASMEventModelHandler) + m.mh.RegisterHandler(indexedDb.ReceiveTag, m.receiveHandler) + m.mh.RegisterHandler(indexedDb.ReceiveTextTag, m.receiveTextHandler) + m.mh.RegisterHandler(indexedDb.ReceiveReplyTag, m.receiveReplyHandler) + m.mh.RegisterHandler(indexedDb.ReceiveReactionTag, m.receiveReactionHandler) + m.mh.RegisterHandler(indexedDb.UpdateSentStatusTag, m.updateSentStatusHandler) +} + +// newWASMEventModelHandler is the handler for NewWASMEventModel. +func (m *manager) newWASMEventModelHandler(data []byte) []byte { + var msg mChannels.NewWASMEventModelMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal NewWASMEventModelMessage "+ + "from NewWASMEventModel in main thread: %+v", err) + return []byte{} + } + + // Create new encryption cipher + rng := fastRNG.NewStreamGenerator(12, 1024, csprng.NewSystemRNG) + encryption, err := cryptoChannel.NewCipherFromJSON( + []byte(msg.EncryptionJSON), rng.GetStream()) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal channel cipher from "+ + "main thread: %+v", err) + return []byte{} + } + + m.model, err = NewWASMEventModel(msg.Path, encryption, + m.messageReceivedCallback, m.storeEncryptionStatus) + if err != nil { + return []byte(err.Error()) + } + return []byte{} +} + +// messageReceivedCallback sends calls to the MessageReceivedCallback in the +// main thread. +// +// messageReceivedCallback adhere to the MessageReceivedCallback type. +func (m *manager) messageReceivedCallback( + uuid uint64, pubKey ed25519.PublicKey, update bool) { + // Package parameters for sending + msg := &mDm.MessageReceivedCallbackMessage{ + UUID: uuid, + PubKey: pubKey, + Update: update, + } + data, err := json.Marshal(msg) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal MessageReceivedCallbackMessage: %+v", err) + return + } + + // Send it to the main thread + m.mh.SendResponse(indexedDb.GetMessageTag, indexedDb.InitID, data) +} + +// storeEncryptionStatus augments the functionality of +// storage.StoreIndexedDbEncryptionStatus. It takes the database name and +// encryption status +// +// storeEncryptionStatus adheres to the storeEncryptionStatusFn type. +func (m *manager) storeEncryptionStatus( + databaseName string, encryption bool) (bool, error) { + // Package parameters for sending + msg := &mChannels.EncryptionStatusMessage{ + DatabaseName: databaseName, + EncryptionStatus: encryption, + } + data, err := json.Marshal(msg) + if err != nil { + return false, err + } + + // Register response handler with channel that will wait for the response + responseChan := make(chan []byte) + m.mh.RegisterHandler(indexedDb.EncryptionStatusTag, + func(data []byte) []byte { + responseChan <- data + return nil + }) + + // Send encryption status to main thread + m.mh.SendResponse(indexedDb.EncryptionStatusTag, indexedDb.InitID, data) + + // Wait for response + var response mChannels.EncryptionStatusReply + select { + case responseData := <-responseChan: + if err = json.Unmarshal(responseData, &response); err != nil { + return false, err + } + case <-time.After(indexedDb.ResponseTimeout): + return false, errors.Errorf("timed out after %s waiting for "+ + "response about the database encryption status from local "+ + "storage in the main thread", indexedDb.ResponseTimeout) + } + + // If the response contain an error, return it + if response.Error != "" { + return false, errors.New(response.Error) + } + + // Return the encryption status + return response.EncryptionStatus, nil +} + +// receiveHandler is the handler for wasmModel.Receive. +func (m *manager) receiveHandler(data []byte) []byte { + var msg mDm.TransferMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal dm.TransferMessage from "+ + "Receive in main thread: %+v", err) + return nil + } + + uuid := m.model.Receive( + msg.MessageID, msg.Nickname, msg.Text, msg.PubKey, msg.DmToken, + msg.Codeset, msg.Timestamp, msg.Round, msg.MType, msg.Status) + + uuidData, err := json.Marshal(uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from Receive: %+v", err) + return nil + } + return uuidData +} + +// receiveTextHandler is the handler for wasmModel.ReceiveText. +func (m *manager) receiveTextHandler(data []byte) []byte { + var msg mDm.TransferMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal dm.TransferMessage from "+ + "ReceiveText in main thread: %+v", err) + return nil + } + + uuid := m.model.ReceiveText( + msg.MessageID, msg.Nickname, string(msg.Text), msg.PubKey, msg.DmToken, + msg.Codeset, msg.Timestamp, msg.Round, msg.Status) + + uuidData, err := json.Marshal(uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from ReceiveText: %+v", err) + return nil + } + return uuidData +} + +// receiveReplyHandler is the handler for wasmModel.ReceiveReply. +func (m *manager) receiveReplyHandler(data []byte) []byte { + var msg mDm.TransferMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal dm.TransferMessage from "+ + "ReceiveReply in main thread: %+v", err) + return nil + } + + uuid := m.model.ReceiveReply(msg.MessageID, msg.ReactionTo, msg.Nickname, + string(msg.Text), msg.PubKey, msg.DmToken, msg.Codeset, msg.Timestamp, + msg.Round, msg.Status) + + uuidData, err := json.Marshal(uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from ReceiveReply: %+v", err) + return nil + } + return uuidData +} + +// receiveReactionHandler is the handler for wasmModel.ReceiveReaction. +func (m *manager) receiveReactionHandler(data []byte) []byte { + var msg mDm.TransferMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal dm.TransferMessage from "+ + "ReceiveReaction in main thread: %+v", err) + return nil + } + + uuid := m.model.ReceiveReaction(msg.MessageID, msg.ReactionTo, msg.Nickname, + string(msg.Text), msg.PubKey, msg.DmToken, msg.Codeset, msg.Timestamp, + msg.Round, msg.Status) + + uuidData, err := json.Marshal(uuid) + if err != nil { + jww.ERROR.Printf( + "Could not JSON marshal UUID from ReceiveReaction: %+v", err) + return nil + } + return uuidData +} + +// updateSentStatusHandler is the handler for wasmModel.UpdateSentStatus. +func (m *manager) updateSentStatusHandler(data []byte) []byte { + var msg mDm.TransferMessage + err := json.Unmarshal(data, &msg) + if err != nil { + jww.ERROR.Printf("Could not JSON unmarshal dm.TransferMessage from "+ + "UpdateSentStatus in main thread: %+v", err) + return nil + } + + m.model.UpdateSentStatus( + msg.UUID, msg.MessageID, msg.Timestamp, msg.Round, msg.Status) + return nil +} diff --git a/indexedDbWorker/dm/implementation.go b/indexedDbWorker/dm/implementation.go index aedecb876c7e4b3a42cc6cb0897e5469027fcc01..9c8f5a5be24b76fa344c3100371b1c660327dade 100644 --- a/indexedDbWorker/dm/implementation.go +++ b/indexedDbWorker/dm/implementation.go @@ -7,11 +7,12 @@ //go:build js && wasm -package channelEventModel +package main import ( "crypto/ed25519" "encoding/json" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" "strings" "sync" "syscall/js" @@ -21,7 +22,6 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/v4/cmix/rounds" "gitlab.com/elixxir/client/v4/dm" - "gitlab.com/elixxir/xxdk-wasm/indexedDb" "gitlab.com/elixxir/xxdk-wasm/utils" "gitlab.com/xx_network/primitives/id" @@ -65,7 +65,7 @@ func (w *wasmModel) joinConversation(nickname string, "Unable to marshal Conversation: %+v", err) } - _, err = indexedDb.Put(w.db, conversationStoreName, convoObj) + _, err = indexedDbWorker.Put(w.db, conversationStoreName, convoObj) if err != nil { return errors.WithMessagef(parentErr, "Unable to put Conversation: %+v", err) @@ -100,10 +100,10 @@ func (w *wasmModel) Receive(messageID message.ID, nickname string, text []byte, parentErr := errors.New("failed to Receive") // If there is no extant Conversation, create one. - _, err := indexedDb.Get( + _, err := indexedDbWorker.Get( w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) if err != nil { - if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { + if strings.Contains(err.Error(), indexedDbWorker.ErrDoesNotExist) { err = w.joinConversation(nickname, pubKey, dmToken, codeset) if err != nil { jww.ERROR.Printf("%+v", err) @@ -143,10 +143,10 @@ func (w *wasmModel) ReceiveText(messageID message.ID, nickname, text string, parentErr := errors.New("failed to ReceiveText") // If there is no extant Conversation, create one. - _, err := indexedDb.Get( + _, err := indexedDbWorker.Get( w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) if err != nil { - if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { + if strings.Contains(err.Error(), indexedDbWorker.ErrDoesNotExist) { err = w.joinConversation(nickname, pubKey, dmToken, codeset) if err != nil { jww.ERROR.Printf("%+v", err) @@ -188,10 +188,10 @@ func (w *wasmModel) ReceiveReply(messageID, reactionTo message.ID, nickname, parentErr := errors.New("failed to ReceiveReply") // If there is no extant Conversation, create one. - _, err := indexedDb.Get( + _, err := indexedDbWorker.Get( w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) if err != nil { - if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { + if strings.Contains(err.Error(), indexedDbWorker.ErrDoesNotExist) { err = w.joinConversation(nickname, pubKey, dmToken, codeset) if err != nil { jww.ERROR.Printf("%+v", err) @@ -233,10 +233,10 @@ func (w *wasmModel) ReceiveReaction(messageID, _ message.ID, nickname, parentErr := errors.New("failed to ReceiveText") // If there is no extant Conversation, create one. - _, err := indexedDb.Get( + _, err := indexedDbWorker.Get( w.db, conversationStoreName, utils.CopyBytesToJS(pubKey)) if err != nil { - if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) { + if strings.Contains(err.Error(), indexedDbWorker.ErrDoesNotExist) { err = w.joinConversation(nickname, pubKey, dmToken, codeset) if err != nil { jww.ERROR.Printf("%+v", err) @@ -286,7 +286,7 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID, key := js.ValueOf(uuid) // Use the key to get the existing Message - currentMsg, err := indexedDb.Get(w.db, messageStoreName, key) + currentMsg, err := indexedDbWorker.Get(w.db, messageStoreName, key) if err != nil { jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr, "Unable to get message: %+v", err)) @@ -343,7 +343,7 @@ func (w *wasmModel) receiveHelper( } // Store message to database - result, err := indexedDb.Put(w.db, messageStoreName, messageObj) + result, err := indexedDbWorker.Put(w.db, messageStoreName, messageObj) if err != nil { return 0, errors.Errorf("Unable to put Message: %+v", err) } @@ -367,7 +367,7 @@ func (w *wasmModel) receiveHelper( // msgIDLookup gets the UUID of the Message with the given messageID. func (w *wasmModel) msgIDLookup(messageID message.ID) (uint64, error) { - resultObj, err := indexedDb.GetIndex(w.db, messageStoreName, + resultObj, err := indexedDbWorker.GetIndex(w.db, messageStoreName, messageStoreMessageIndex, utils.CopyBytesToJS(messageID.Marshal())) if err != nil { return 0, err diff --git a/indexedDbWorker/dm/init.go b/indexedDbWorker/dm/init.go index 29b80911804004539beb210dc85533afc7c98f31..184f81311a20f4c0e5f0cfe7a802cd8df4d1bed3 100644 --- a/indexedDbWorker/dm/init.go +++ b/indexedDbWorker/dm/init.go @@ -7,19 +7,20 @@ //go:build js && wasm -package channelEventModel +package main import ( "crypto/ed25519" + "gitlab.com/elixxir/xxdk-wasm/indexedDb" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" "syscall/js" + "time" "github.com/hack-pad/go-indexeddb/idb" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/v4/dm" cryptoChannel "gitlab.com/elixxir/crypto/channel" - "gitlab.com/elixxir/xxdk-wasm/indexedDb" - "gitlab.com/elixxir/xxdk-wasm/storage" ) const ( @@ -41,16 +42,23 @@ type MessageReceivedCallback func( // NewWASMEventModel returns a [channels.EventModel] backed by a wasmModel. // The name should be a base64 encoding of the users public key. func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, - cb MessageReceivedCallback) (dm.EventModel, error) { + cb MessageReceivedCallback, storeEncryptionStatus storeEncryptionStatusFn) ( + dm.EventModel, error) { databaseName := path + databaseSuffix - return newWASMModel(databaseName, encryption, cb) + return newWASMModel(databaseName, encryption, cb, storeEncryptionStatus) } +// storeEncryptionStatusFn matches storage.StoreIndexedDbEncryptionStatus so +// that the data can be sent between the worker and main thread. +type storeEncryptionStatusFn func( + databaseName string, encryptionStatus bool) (bool, error) + // newWASMModel creates the given [idb.Database] and returns a wasmModel. func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, - cb MessageReceivedCallback) (*wasmModel, error) { + cb MessageReceivedCallback, storeEncryptionStatus storeEncryptionStatusFn) ( + *wasmModel, error) { // Attempt to open database object - ctx, cancel := indexedDb.NewContext() + ctx, cancel := indexedDbWorker.NewContext() defer cancel() openRequest, err := idb.Global().Open(ctx, databaseName, currentVersion, func(db *idb.Database, oldVersion, newVersion uint) error { @@ -86,8 +94,8 @@ func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, // Save the encryption status to storage encryptionStatus := encryption != nil - loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( - databaseName, encryptionStatus) + loadedEncryptionStatus, err := + storeEncryptionStatus(databaseName, encryptionStatus) if err != nil { return nil, err } @@ -173,11 +181,50 @@ func v1Upgrade(db *idb.Database) error { } // Get the database name and save it to storage - if databaseName, err := db.Name(); err != nil { - return err - } else if err = storage.StoreIndexedDb(databaseName); err != nil { + if databaseName, err2 := db.Name(); err2 != nil { + return err2 + } else if err = storeDatabaseName(databaseName); err != nil { return err } return nil } + +// storeDatabaseName sends the database name to storage.StoreIndexedDb in the +// main thread to be stored in localstorage and waits for the error to be +// returned. +// +// The function specified below is a placeholder until set by +// registerDatabaseNameStore. registerDatabaseNameStore must be called before +// storeDatabaseName. +var storeDatabaseName = func(databaseName string) error { return nil } + +// RegisterDatabaseNameStore sets storeDatabaseName to send the database to +// storage.StoreIndexedDb in the main thread when called and registers a handler +// to listen for the response. +func RegisterDatabaseNameStore(m *manager) { + storeDatabaseNameResponseChan := make(chan []byte) + // Register handler + m.mh.RegisterHandler(indexedDb.StoreDatabaseNameTag, func(data []byte) []byte { + storeDatabaseNameResponseChan <- data + return nil + }) + + storeDatabaseName = func(databaseName string) error { + m.mh.SendResponse(indexedDb.StoreDatabaseNameTag, indexedDb.InitID, + []byte(databaseName)) + + // Wait for response + select { + case response := <-storeDatabaseNameResponseChan: + if len(response) > 0 { + return errors.New(string(response)) + } + case <-time.After(indexedDb.ResponseTimeout): + return errors.Errorf("timed out after %s waiting for "+ + "response about storing the database name in local "+ + "storage in the main thread", indexedDb.ResponseTimeout) + } + return nil + } +} diff --git a/indexedDbWorker/dm/main.go b/indexedDbWorker/dm/main.go new file mode 100644 index 0000000000000000000000000000000000000000..5610d4d69dfe4c0a3461be5182a0f3018502bd12 --- /dev/null +++ b/indexedDbWorker/dm/main.go @@ -0,0 +1,23 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package main + +import ( + "fmt" + "gitlab.com/elixxir/xxdk-wasm/indexedDbWorker" +) + +func main() { + fmt.Println("Starting xxDK WebAssembly Database Worker.") + + m := &manager{mh: indexedDbWorker.NewMessageHandler()} + m.RegisterHandlers() + RegisterDatabaseNameStore(m) + m.mh.SignalReady() + <-make(chan bool) +} diff --git a/indexedDbWorker/dm/model.go b/indexedDbWorker/dm/model.go index bb6f34588aa2976c1689b629c6df31219de6b585..9b0b99e85bd5c97c389a1ac015819026b1d46b94 100644 --- a/indexedDbWorker/dm/model.go +++ b/indexedDbWorker/dm/model.go @@ -7,7 +7,7 @@ //go:build js && wasm -package channelEventModel +package main import ( "time"