diff --git a/indexedDb/impl/channels/callbacks.go b/indexedDb/impl/channels/callbacks.go index 68cc92d71f962ab4adc5d734f598561c95b006ea..79fba1cc290097701747c5403a7ebf364369ca46 100644 --- a/indexedDb/impl/channels/callbacks.go +++ b/indexedDb/impl/channels/callbacks.go @@ -71,13 +71,13 @@ func (m *manager) newWASMEventModelCB(data []byte) ([]byte, error) { "failed to JSON unmarshal Cipher from main thread: %+v", err) } - m.model, err = NewWASMEventModel(msg.Path, encryption, - m.messageReceivedCallback, m.deletedMessageCallback, m.mutedUserCallback, - m.storeDatabaseName, m.storeEncryptionStatus) + m.model, err = NewWASMEventModel(msg.DatabaseName, encryption, + m.messageReceivedCallback, m.deletedMessageCallback, m.mutedUserCallback) if err != nil { return []byte(err.Error()), nil } - return []byte{}, nil + + return nil, nil } // messageReceivedCallback sends calls to the channels.MessageReceivedCallback @@ -132,88 +132,6 @@ func (m *manager) mutedUserCallback( m.mh.SendMessage(wChannels.MutedUserCallbackTag, data) } -// storeDatabaseName sends the database name to the main thread and waits for -// the response. This function mocks the behavior of storage.StoreIndexedDb. -// -// storeDatabaseName adheres to the storeDatabaseNameFn type. -func (m *manager) storeDatabaseName(databaseName string) error { - // Register response callback with channel that will wait for the response - responseChan := make(chan []byte) - m.mh.RegisterCallback(wChannels.StoreDatabaseNameTag, - func(data []byte) ([]byte, error) { - responseChan <- data - return nil, nil - }) - - // Send encryption status to main thread - m.mh.SendMessage(wChannels.StoreDatabaseNameTag, []byte(databaseName)) - - // Wait for response - select { - case response := <-responseChan: - if len(response) > 0 { - return errors.New(string(response)) - } - case <-time.After(worker.ResponseTimeout): - return errors.Errorf("[WW] Timed out after %s waiting for response "+ - "about storing the database name in local storage in the main "+ - "thread", worker.ResponseTimeout) - } - - return nil -} - -// storeEncryptionStatus sends the database name and encryption status to the -// main thread and waits for the response. If the value has not been previously -// saved, it returns the saves encryption status. This function mocks the -// behavior of storage.StoreIndexedDbEncryptionStatus. -// -// storeEncryptionStatus adheres to the storeEncryptionStatusFn type. -func (m *manager) storeEncryptionStatus( - databaseName string, encryption bool) (bool, error) { - // Package parameters for sending - msg := &wChannels.EncryptionStatusMessage{ - DatabaseName: databaseName, - EncryptionStatus: encryption, - } - data, err := json.Marshal(msg) - if err != nil { - return false, err - } - - // Register response callback with channel that will wait for the response - responseChan := make(chan []byte) - m.mh.RegisterCallback(wChannels.EncryptionStatusTag, - func(data []byte) ([]byte, error) { - responseChan <- data - return nil, nil - }) - - // Send encryption status to main thread - m.mh.SendMessage(wChannels.EncryptionStatusTag, data) - - // Wait for response - var response wChannels.EncryptionStatusReply - select { - case responseData := <-responseChan: - if err = json.Unmarshal(responseData, &response); err != nil { - return false, err - } - case <-time.After(worker.ResponseTimeout): - return false, errors.Errorf("timed out after %s waiting for "+ - "response about the database encryption status from local "+ - "storage in the main thread", worker.ResponseTimeout) - } - - // If the response contain an error, return it - if response.Error != "" { - return false, errors.New(response.Error) - } - - // Return the encryption status - return response.EncryptionStatus, nil -} - // joinChannelCB is the callback for wasmModel.JoinChannel. Always returns nil; // meaning, no response is supplied (or expected). func (m *manager) joinChannelCB(data []byte) ([]byte, error) { @@ -258,7 +176,7 @@ func (m *manager) receiveMessageCB(data []byte) ([]byte, error) { uuidData, err := json.Marshal(uuid) if err != nil { - return zeroUUID, errors.Errorf("failed to JSON marshal UUID : %+v", err) + return zeroUUID, errors.Errorf("failed to JSON marshal UUID: %+v", err) } return uuidData, nil } @@ -280,7 +198,7 @@ func (m *manager) receiveReplyCB(data []byte) ([]byte, error) { uuidData, err := json.Marshal(uuid) if err != nil { - return zeroUUID, errors.Errorf("failed to JSON marshal UUID : %+v", err) + return zeroUUID, errors.Errorf("failed to JSON marshal UUID: %+v", err) } return uuidData, nil } @@ -302,7 +220,7 @@ func (m *manager) receiveReactionCB(data []byte) ([]byte, error) { uuidData, err := json.Marshal(uuid) if err != nil { - return zeroUUID, errors.Errorf("failed to JSON marshal UUID : %+v", err) + return zeroUUID, errors.Errorf("failed to JSON marshal UUID: %+v", err) } return uuidData, nil } @@ -379,7 +297,7 @@ func (m *manager) updateFromMessageIDCB(data []byte) ([]byte, error) { uuidData, err := json.Marshal(uuid) if err != nil { - return nil, errors.Errorf("failed to JSON marshal UUID : %+v", err) + return nil, errors.Errorf("failed to JSON marshal UUID: %+v", err) } return uuidData, nil diff --git a/indexedDb/impl/channels/implementation_test.go b/indexedDb/impl/channels/implementation_test.go index 67aaa6514777d2419414d800041bc84c6d6bec53..9577217901a13075bcd6b5bb3848d69d8577e81c 100644 --- a/indexedDb/impl/channels/implementation_test.go +++ b/indexedDb/impl/channels/implementation_test.go @@ -41,10 +41,6 @@ func TestMain(m *testing.M) { func dummyReceivedMessageCB(uint64, *id.ID, bool) {} func dummyDeletedMessageCB(message.ID) {} func dummyMutedUserCB(*id.ID, ed25519.PublicKey, bool) {} -func dummyStoreDatabaseName(string) error { return nil } -func dummyStoreEncryptionStatus(_ string, encryptionStatus bool) (bool, error) { - return encryptionStatus, nil -} // Happy path, insert message and look it up func TestWasmModel_GetMessage(t *testing.T) { @@ -64,8 +60,7 @@ func TestWasmModel_GetMessage(t *testing.T) { testMsgId := message.DeriveChannelMessageID(&id.ID{1}, 0, []byte(testString)) eventModel, err := newWASMModel(testString, c, - dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, - dummyStoreDatabaseName, dummyStoreEncryptionStatus) + dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB) if err != nil { t.Fatal(err) } @@ -96,8 +91,7 @@ func TestWasmModel_DeleteMessage(t *testing.T) { testString := "TestWasmModel_DeleteMessage" testMsgId := message.DeriveChannelMessageID(&id.ID{1}, 0, []byte(testString)) eventModel, err := newWASMModel(testString, nil, dummyReceivedMessageCB, - dummyDeletedMessageCB, dummyMutedUserCB, dummyStoreDatabaseName, - dummyStoreEncryptionStatus) + dummyDeletedMessageCB, dummyMutedUserCB) if err != nil { t.Fatal(err) } @@ -154,8 +148,7 @@ func Test_wasmModel_UpdateSentStatus(t *testing.T) { testMsgId := message.DeriveChannelMessageID( &id.ID{1}, 0, []byte(testString)) eventModel, err2 := newWASMModel(testString, c, - dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, - dummyStoreDatabaseName, dummyStoreEncryptionStatus) + dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB) if err2 != nil { t.Fatal(err) } @@ -223,8 +216,7 @@ func Test_wasmModel_JoinChannel_LeaveChannel(t *testing.T) { t.Run("Test_wasmModel_JoinChannel_LeaveChannel"+cs, func(t *testing.T) { storage.GetLocalStorage().Clear() eventModel, err2 := newWASMModel("test", c, dummyReceivedMessageCB, - dummyDeletedMessageCB, dummyMutedUserCB, dummyStoreDatabaseName, - dummyStoreEncryptionStatus) + dummyDeletedMessageCB, dummyMutedUserCB) if err2 != nil { t.Fatal(err2) } @@ -278,8 +270,7 @@ func Test_wasmModel_UUIDTest(t *testing.T) { storage.GetLocalStorage().Clear() testString := "testHello" + cs eventModel, err2 := newWASMModel(testString, c, - dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, - dummyStoreDatabaseName, dummyStoreEncryptionStatus) + dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB) if err2 != nil { t.Fatal(err2) } @@ -326,8 +317,7 @@ func Test_wasmModel_DuplicateReceives(t *testing.T) { t.Run(testString, func(t *testing.T) { storage.GetLocalStorage().Clear() eventModel, err := newWASMModel(testString, c, - dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, - dummyStoreDatabaseName, dummyStoreEncryptionStatus) + dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB) if err != nil { t.Fatal(err) } @@ -376,8 +366,7 @@ func Test_wasmModel_deleteMsgByChannel(t *testing.T) { totalMessages := 10 expectedMessages := 5 eventModel, err := newWASMModel(testString, c, - dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, - dummyStoreDatabaseName, dummyStoreEncryptionStatus) + dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB) if err != nil { t.Fatal(err) } @@ -448,8 +437,7 @@ func TestWasmModel_receiveHelper_UniqueIndex(t *testing.T) { storage.GetLocalStorage().Clear() testString := fmt.Sprintf("test_receiveHelper_UniqueIndex_%d", i) eventModel, err := newWASMModel(testString, c, - dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB, - dummyStoreDatabaseName, dummyStoreEncryptionStatus) + dummyReceivedMessageCB, dummyDeletedMessageCB, dummyMutedUserCB) if err != nil { t.Fatal(err) } diff --git a/indexedDb/impl/channels/init.go b/indexedDb/impl/channels/init.go index 45107fa2c437b82bce04a50fe71eebeaac0625ee..9a8b940898ac30e81fa755e9d4053a21220ef66b 100644 --- a/indexedDb/impl/channels/init.go +++ b/indexedDb/impl/channels/init.go @@ -13,7 +13,6 @@ import ( "syscall/js" "github.com/hack-pad/go-indexeddb/idb" - "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/v4/channels" @@ -22,45 +21,26 @@ import ( wChannels "gitlab.com/elixxir/xxdk-wasm/indexedDb/worker/channels" ) -const ( - // databaseSuffix is the suffix to be appended to the name of - // the database. - databaseSuffix = "_speakeasy" - - // currentVersion is the current version of the IndexDb - // runtime. Used for migration purposes. - currentVersion uint = 1 -) - -// storeDatabaseNameFn matches storage.StoreIndexedDb so that the data can be -// sent between the worker and main thread. -type storeDatabaseNameFn func(databaseName string) error - -// storeEncryptionStatusFn matches storage.StoreIndexedDbEncryptionStatus so -// that the data can be sent between the worker and main thread. -type storeEncryptionStatusFn func( - databaseName string, encryptionStatus bool) (bool, error) +// currentVersion is the current version of the IndexedDb runtime. Used for +// migration purposes. +const currentVersion uint = 1 // NewWASMEventModel returns a [channels.EventModel] backed by a wasmModel. -// The name should be a base64 encoding of the users public key. -func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, +// The name should be a base64 encoding of the users public key. Returns the +// EventModel based on IndexedDb and the database name as reported by IndexedDb. +func NewWASMEventModel(databaseName string, encryption cryptoChannel.Cipher, messageReceivedCB wChannels.MessageReceivedCallback, deletedMessageCB wChannels.DeletedMessageCallback, - mutedUserCB wChannels.MutedUserCallback, - storeDatabaseName storeDatabaseNameFn, - storeEncryptionStatus storeEncryptionStatusFn) (channels.EventModel, error) { - databaseName := path + databaseSuffix + mutedUserCB wChannels.MutedUserCallback) (channels.EventModel, error) { return newWASMModel(databaseName, encryption, messageReceivedCB, - deletedMessageCB, mutedUserCB, storeDatabaseName, storeEncryptionStatus) + deletedMessageCB, mutedUserCB) } // newWASMModel creates the given [idb.Database] and returns a wasmModel. func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, messageReceivedCB wChannels.MessageReceivedCallback, deletedMessageCB wChannels.DeletedMessageCallback, - mutedUserCB wChannels.MutedUserCallback, - storeDatabaseName storeDatabaseNameFn, - storeEncryptionStatus storeEncryptionStatusFn) (*wasmModel, error) { + mutedUserCB wChannels.MutedUserCallback) (*wasmModel, error) { // Attempt to open database object ctx, cancel := impl.NewContext() defer cancel() @@ -95,29 +75,6 @@ func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, return nil, err } - // Get the database name and save it to storage - if dbName, err2 := db.Name(); err2 != nil { - return nil, err2 - } else if err = storeDatabaseName(dbName); err != nil { - return nil, err - } - - // Save the encryption status to storage - encryptionStatus := encryption != nil - loadedEncryptionStatus, err := - storeEncryptionStatus(databaseName, encryptionStatus) - if err != nil { - return nil, err - } - - // Verify encryption status does not change - if encryptionStatus != loadedEncryptionStatus { - return nil, errors.New( - "Cannot load database with different encryption status.") - } else if !encryptionStatus { - jww.WARN.Printf("IndexedDb encryption disabled!") - } - wrapper := &wasmModel{ db: db, cipher: encryption, diff --git a/indexedDb/impl/dm/callbacks.go b/indexedDb/impl/dm/callbacks.go index b477df50c01c0c9dda3195be7f9706d8070cd930..a2deba1774d8c3936340a86170349def554ad705 100644 --- a/indexedDb/impl/dm/callbacks.go +++ b/indexedDb/impl/dm/callbacks.go @@ -12,10 +12,10 @@ package main import ( "crypto/ed25519" "encoding/json" - "time" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/dm" cryptoChannel "gitlab.com/elixxir/crypto/channel" "gitlab.com/elixxir/crypto/fastRNG" @@ -68,12 +68,13 @@ func (m *manager) newWASMEventModelCB(data []byte) ([]byte, error) { "cipher from main thread: %+v", err) } - m.model, err = NewWASMEventModel(msg.Path, encryption, - m.messageReceivedCallback, m.storeDatabaseName, m.storeEncryptionStatus) + m.model, err = NewWASMEventModel( + msg.DatabaseName, encryption, m.messageReceivedCallback) if err != nil { return []byte(err.Error()), nil } - return []byte{}, nil + + return nil, nil } // messageReceivedCallback sends calls to the MessageReceivedCallback in the @@ -100,88 +101,6 @@ func (m *manager) messageReceivedCallback(uuid uint64, pubKey ed25519.PublicKey, m.mh.SendMessage(wDm.MessageReceivedCallbackTag, data) } -// storeDatabaseName sends the database name to the main thread and waits for -// the response. This function mocks the behavior of storage.StoreIndexedDb. -// -// storeDatabaseName adheres to the storeDatabaseNameFn type. -func (m *manager) storeDatabaseName(databaseName string) error { - // Register response callback with channel that will wait for the response - responseChan := make(chan []byte) - m.mh.RegisterCallback(wDm.StoreDatabaseNameTag, - func(data []byte) ([]byte, error) { - responseChan <- data - return nil, nil - }) - - // Send encryption status to main thread - m.mh.SendMessage(wDm.StoreDatabaseNameTag, []byte(databaseName)) - - // Wait for response - select { - case response := <-responseChan: - if len(response) > 0 { - return errors.New(string(response)) - } - case <-time.After(worker.ResponseTimeout): - return errors.Errorf("[WW] Timed out after %s waiting for response "+ - "about storing the database name in local storage in the main "+ - "thread", worker.ResponseTimeout) - } - - return nil -} - -// storeEncryptionStatus sends the database name and encryption status to the -// main thread and waits for the response. If the value has not been previously -// saved, it returns the saves encryption status. This function mocks the -// behavior of storage.StoreIndexedDbEncryptionStatus. -// -// storeEncryptionStatus adheres to the storeEncryptionStatusFn type. -func (m *manager) storeEncryptionStatus( - databaseName string, encryption bool) (bool, error) { - // Package parameters for sending - msg := &wDm.EncryptionStatusMessage{ - DatabaseName: databaseName, - EncryptionStatus: encryption, - } - data, err := json.Marshal(msg) - if err != nil { - return false, err - } - - // Register response callback with channel that will wait for the response - responseChan := make(chan []byte) - m.mh.RegisterCallback(wDm.EncryptionStatusTag, - func(data []byte) ([]byte, error) { - responseChan <- data - return nil, nil - }) - - // Send encryption status to main thread - m.mh.SendMessage(wDm.EncryptionStatusTag, data) - - // Wait for response - var response wDm.EncryptionStatusReply - select { - case responseData := <-responseChan: - if err = json.Unmarshal(responseData, &response); err != nil { - return false, err - } - case <-time.After(worker.ResponseTimeout): - return false, errors.Errorf("timed out after %s waiting for "+ - "response about the database encryption status from local "+ - "storage in the main thread", worker.ResponseTimeout) - } - - // If the response contain an error, return it - if response.Error != "" { - return false, errors.New(response.Error) - } - - // Return the encryption status - return response.EncryptionStatus, nil -} - // receiveCB is the callback for wasmModel.Receive. Returns a UUID of 0 on error // or the JSON marshalled UUID (uint64) on success. func (m *manager) receiveCB(data []byte) ([]byte, error) { @@ -198,7 +117,7 @@ func (m *manager) receiveCB(data []byte) ([]byte, error) { uuidData, err := json.Marshal(uuid) if err != nil { - return zeroUUID, errors.Errorf("failed to JSON marshal UUID : %+v", err) + return zeroUUID, errors.Errorf("failed to JSON marshal UUID: %+v", err) } return uuidData, nil } @@ -219,7 +138,7 @@ func (m *manager) receiveTextCB(data []byte) ([]byte, error) { uuidData, err := json.Marshal(uuid) if err != nil { - return []byte{}, errors.Errorf("failed to JSON marshal UUID : %+v", err) + return []byte{}, errors.Errorf("failed to JSON marshal UUID: %+v", err) } return uuidData, nil @@ -241,7 +160,7 @@ func (m *manager) receiveReplyCB(data []byte) ([]byte, error) { uuidData, err := json.Marshal(uuid) if err != nil { - return zeroUUID, errors.Errorf("failed to JSON marshal UUID : %+v", err) + return zeroUUID, errors.Errorf("failed to JSON marshal UUID: %+v", err) } return uuidData, nil @@ -263,7 +182,7 @@ func (m *manager) receiveReactionCB(data []byte) ([]byte, error) { uuidData, err := json.Marshal(uuid) if err != nil { - return zeroUUID, errors.Errorf("failed to JSON marshal UUID : %+v", err) + return zeroUUID, errors.Errorf("failed to JSON marshal UUID: %+v", err) } return uuidData, nil diff --git a/indexedDb/impl/dm/implementation_test.go b/indexedDb/impl/dm/implementation_test.go index 5936c83ba3b6537c350cec3a852f489a3911e913..409c6239b84787d3696e62fcb4f7f8e9092b9c2e 100644 --- a/indexedDb/impl/dm/implementation_test.go +++ b/indexedDb/impl/dm/implementation_test.go @@ -11,16 +11,13 @@ package main import ( "crypto/ed25519" - jww "github.com/spf13/jwalterweatherman" "os" "testing" + + jww "github.com/spf13/jwalterweatherman" ) func dummyReceivedMessageCB(uint64, ed25519.PublicKey, bool, bool) {} -func dummyStoreDatabaseName(string) error { return nil } -func dummyStoreEncryptionStatus(_ string, encryptionStatus bool) (bool, error) { - return encryptionStatus, nil -} func TestMain(m *testing.M) { jww.SetStdoutThreshold(jww.LevelDebug) @@ -29,8 +26,7 @@ func TestMain(m *testing.M) { // Test happy path toggling between blocked/unblocked in a Conversation. func TestWasmModel_BlockSender(t *testing.T) { - m, err := newWASMModel("test", nil, - dummyReceivedMessageCB, dummyStoreDatabaseName, dummyStoreEncryptionStatus) + m, err := newWASMModel("test", nil, dummyReceivedMessageCB) if err != nil { t.Fatal(err.Error()) } diff --git a/indexedDb/impl/dm/init.go b/indexedDb/impl/dm/init.go index 98c9e85a8278e9ed67bce4a960df99431819648d..b9bad462303b8000c33713f08170f69ac64f39c2 100644 --- a/indexedDb/impl/dm/init.go +++ b/indexedDb/impl/dm/init.go @@ -11,25 +11,19 @@ package main import ( "crypto/ed25519" - "gitlab.com/elixxir/xxdk-wasm/indexedDb/impl" "syscall/js" "github.com/hack-pad/go-indexeddb/idb" - "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/dm" cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/xxdk-wasm/indexedDb/impl" ) -const ( - // databaseSuffix is the suffix to be appended to the name of - // the database. - databaseSuffix = "_speakeasy_dm" - - // currentVersion is the current version of the IndexDb - // runtime. Used for migration purposes. - currentVersion uint = 1 -) +// currentVersion is the current version of the IndexedDb runtime. Used for +// migration purposes. +const currentVersion uint = 1 // MessageReceivedCallback is called any time a message is received or updated. // @@ -38,29 +32,17 @@ const ( type MessageReceivedCallback func( uuid uint64, pubKey ed25519.PublicKey, messageUpdate, conversationUpdate bool) -// storeDatabaseNameFn matches storage.StoreIndexedDb so that the data can be -// sent between the worker and main thread. -type storeDatabaseNameFn func(databaseName string) error - -// storeEncryptionStatusFn matches storage.StoreIndexedDbEncryptionStatus so -// that the data can be sent between the worker and main thread. -type storeEncryptionStatusFn func( - databaseName string, encryptionStatus bool) (bool, error) - // NewWASMEventModel returns a [channels.EventModel] backed by a wasmModel. -// The name should be a base64 encoding of the users public key. -func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, - cb MessageReceivedCallback, storeDatabaseName storeDatabaseNameFn, - storeEncryptionStatus storeEncryptionStatusFn) (dm.EventModel, error) { - databaseName := path + databaseSuffix - return newWASMModel( - databaseName, encryption, cb, storeDatabaseName, storeEncryptionStatus) +// The name should be a base64 encoding of the users public key. Returns the +// EventModel based on IndexedDb and the database name as reported by IndexedDb. +func NewWASMEventModel(databaseName string, encryption cryptoChannel.Cipher, + cb MessageReceivedCallback) (dm.EventModel, error) { + return newWASMModel(databaseName, encryption, cb) } // newWASMModel creates the given [idb.Database] and returns a wasmModel. func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, - cb MessageReceivedCallback, storeDatabaseName storeDatabaseNameFn, - storeEncryptionStatus storeEncryptionStatusFn) (*wasmModel, error) { + cb MessageReceivedCallback) (*wasmModel, error) { // Attempt to open database object ctx, cancel := impl.NewContext() defer cancel() @@ -95,29 +77,6 @@ func newWASMModel(databaseName string, encryption cryptoChannel.Cipher, return nil, err } - // Get the database name and save it to storage - if dbName, err2 := db.Name(); err2 != nil { - return nil, err2 - } else if err = storeDatabaseName(dbName); err != nil { - return nil, err - } - - // Save the encryption status to storage - encryptionStatus := encryption != nil - loadedEncryptionStatus, err := - storeEncryptionStatus(databaseName, encryptionStatus) - if err != nil { - return nil, err - } - - // Verify encryption status does not change - if encryptionStatus != loadedEncryptionStatus { - return nil, errors.New( - "Cannot load database with different encryption status.") - } else if !encryptionStatus { - jww.WARN.Printf("IndexedDb encryption disabled!") - } - wrapper := &wasmModel{db: db, receivedMessageCB: cb, cipher: encryption} return wrapper, nil diff --git a/indexedDb/worker/channels/init.go b/indexedDb/worker/channels/init.go index 04b7a8f4392ac78051bd9cce001be45a98477dca..0fa2fd566c50625af3d623eb3383a04bb035134d 100644 --- a/indexedDb/worker/channels/init.go +++ b/indexedDb/worker/channels/init.go @@ -25,6 +25,9 @@ import ( "gitlab.com/xx_network/primitives/id" ) +// databaseSuffix is the suffix to be appended to the name of the database. +const databaseSuffix = "_speakeasy" + // MessageReceivedCallback is called any time a message is received or updated. // // update is true if the row is old and was edited. @@ -55,7 +58,7 @@ func NewWASMEventModelBuilder(wasmJsPath string, // NewWASMEventModelMessage is JSON marshalled and sent to the worker for // [NewWASMEventModel]. type NewWASMEventModelMessage struct { - Path string `json:"path"` + DatabaseName string `json:"databaseName"` EncryptionJSON string `json:"encryptionJSON"` } @@ -65,6 +68,7 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher, messageReceivedCB MessageReceivedCallback, deletedMessageCB DeletedMessageCallback, mutedUserCB MutedUserCallback) ( channels.EventModel, error) { + databaseName := path + databaseSuffix wm, err := worker.NewManager(wasmJsPath, "channelsIndexedDb", true) if err != nil { @@ -83,11 +87,18 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher, wm.RegisterCallback(MutedUserCallbackTag, mutedUserCallbackHandler(mutedUserCB)) - // Register handler to manage checking encryption status from local storage - wm.RegisterCallback(EncryptionStatusTag, checkDbEncryptionStatusHandler(wm)) + // Store the database name + err = storage.StoreIndexedDb(databaseName) + if err != nil { + return nil, err + } - // Register handler to manage the storage of the database name - wm.RegisterCallback(StoreDatabaseNameTag, storeDatabaseNameHandler(wm)) + // Check that the encryption status + encryptionStatus := encryption != nil + err = checkDbEncryptionStatus(databaseName, encryptionStatus) + if err != nil { + return nil, err + } encryptionJSON, err := json.Marshal(encryption) if err != nil { @@ -95,7 +106,7 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher, } msg := NewWASMEventModelMessage{ - Path: path, + DatabaseName: databaseName, EncryptionJSON: string(encryptionJSON), } @@ -104,14 +115,14 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher, return nil, err } - errChan := make(chan string) + dataChan := make(chan []byte) wm.SendMessage(NewWASMEventModelTag, payload, - func(data []byte) { errChan <- string(data) }) + func(data []byte) { dataChan <- data }) select { - case workerErr := <-errChan: - if workerErr != "" { - return nil, errors.New(workerErr) + case data := <-dataChan: + if data != nil { + return nil, errors.New(string(data)) } case <-time.After(worker.ResponseTimeout): return nil, errors.Errorf("timed out after %s waiting for indexedDB "+ @@ -189,54 +200,24 @@ type EncryptionStatusReply struct { Error string `json:"error"` } -// checkDbEncryptionStatusHandler returns a handler to manage checking -// encryption status from local storage. -func checkDbEncryptionStatusHandler( - wh *worker.Manager) func(data []byte) { - return func(data []byte) { - // Unmarshal received message - var msg EncryptionStatusMessage - err := json.Unmarshal(data, &msg) - if err != nil { - jww.ERROR.Printf("Failed to JSON unmarshal "+ - "EncryptionStatusMessage message from worker: %+v", err) - return - } +// checkDbEncryptionStatus returns an error if the encryption status provided +// does not match the stored status for this database name. +func checkDbEncryptionStatus(databaseName string, encryptionStatus bool) error { - // Pass message values to storage - loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( - msg.DatabaseName, msg.EncryptionStatus) - var reply EncryptionStatusReply - if err != nil { - reply.Error = err.Error() - } else { - reply.EncryptionStatus = loadedEncryptionStatus - } - - // Return response - statusData, err := json.Marshal(reply) - if err != nil { - jww.ERROR.Printf( - "Failed to JSON marshal EncryptionStatusReply: %+v", err) - return - } - - wh.SendMessage(EncryptionStatusTag, statusData, nil) + // Pass message values to storage + loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( + databaseName, encryptionStatus) + if err != nil { + return err } -} - -// storeDatabaseNameHandler returns a handler that stores the database name to -// storage when it is received from the worker. -func storeDatabaseNameHandler( - wh *worker.Manager) func(data []byte) { - return func(data []byte) { - var returnData []byte - // Get the database name and save it to storage - if err := storage.StoreIndexedDb(string(data)); err != nil { - returnData = []byte(err.Error()) - } - - wh.SendMessage(StoreDatabaseNameTag, returnData, nil) + // Verify encryption status does not change + if encryptionStatus != loadedEncryptionStatus { + return errors.New( + "cannot load database with different encryption status") + } else if !encryptionStatus { + jww.WARN.Printf("IndexedDb encryption disabled!") } + + return nil } diff --git a/indexedDb/worker/channels/tags.go b/indexedDb/worker/channels/tags.go index d2ae61a9f6b65a9bf2a02483806e89fbc14edb9d..d3555e549163c18b772cc86f965a2f7aeca7a827 100644 --- a/indexedDb/worker/channels/tags.go +++ b/indexedDb/worker/channels/tags.go @@ -18,8 +18,6 @@ const ( MessageReceivedCallbackTag worker.Tag = "MessageReceivedCallback" DeletedMessageCallbackTag worker.Tag = "DeletedMessageCallback" MutedUserCallbackTag worker.Tag = "MutedUserCallback" - EncryptionStatusTag worker.Tag = "EncryptionStatus" - StoreDatabaseNameTag worker.Tag = "StoreDatabaseName" JoinChannelTag worker.Tag = "JoinChannel" LeaveChannelTag worker.Tag = "LeaveChannel" diff --git a/indexedDb/worker/dm/init.go b/indexedDb/worker/dm/init.go index 4bf441a73448d45404be8a371855b63817f21de3..d8e905bed513f4bb44d0920ee18d75be7e3877e2 100644 --- a/indexedDb/worker/dm/init.go +++ b/indexedDb/worker/dm/init.go @@ -23,6 +23,9 @@ import ( "gitlab.com/elixxir/xxdk-wasm/worker" ) +// databaseSuffix is the suffix to be appended to the name of the database. +const databaseSuffix = "_speakeasy_dm" + // MessageReceivedCallback is called any time a message is received or updated. // // messageUpdate is true if the Message already exists and was edited. @@ -33,7 +36,7 @@ type MessageReceivedCallback func(uuid uint64, pubKey ed25519.PublicKey, // NewWASMEventModelMessage is JSON marshalled and sent to the worker for // [NewWASMEventModel]. type NewWASMEventModelMessage struct { - Path string `json:"path"` + DatabaseName string `json:"databaseName"` EncryptionJSON string `json:"encryptionJSON"` } @@ -41,6 +44,7 @@ type NewWASMEventModelMessage struct { // The name should be a base64 encoding of the users public key. func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher, cb MessageReceivedCallback) (dm.EventModel, error) { + databaseName := path + databaseSuffix wh, err := worker.NewManager(wasmJsPath, "dmIndexedDb", true) if err != nil { @@ -51,11 +55,18 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher, wh.RegisterCallback( MessageReceivedCallbackTag, messageReceivedCallbackHandler(cb)) - // Register handler to manage checking encryption status from local storage - wh.RegisterCallback(EncryptionStatusTag, checkDbEncryptionStatusHandler(wh)) + // Store the database name + err = storage.StoreIndexedDb(databaseName) + if err != nil { + return nil, err + } - // Register handler to manage the storage of the database name - wh.RegisterCallback(StoreDatabaseNameTag, storeDatabaseNameHandler(wh)) + // Check that the encryption status + encryptionStatus := encryption != nil + err = checkDbEncryptionStatus(databaseName, encryptionStatus) + if err != nil { + return nil, err + } encryptionJSON, err := json.Marshal(encryption) if err != nil { @@ -63,7 +74,7 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher, } msg := NewWASMEventModelMessage{ - Path: path, + DatabaseName: databaseName, EncryptionJSON: string(encryptionJSON), } @@ -72,14 +83,14 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher, return nil, err } - errChan := make(chan string) + dataChan := make(chan []byte) wh.SendMessage(NewWASMEventModelTag, payload, - func(data []byte) { errChan <- string(data) }) + func(data []byte) { dataChan <- data }) select { - case workerErr := <-errChan: - if workerErr != "" { - return nil, errors.New(workerErr) + case data := <-dataChan: + if data != nil { + return nil, errors.New(string(data)) } case <-time.After(worker.ResponseTimeout): return nil, errors.Errorf("timed out after %s waiting for indexedDB "+ @@ -113,66 +124,23 @@ func messageReceivedCallbackHandler(cb MessageReceivedCallback) func(data []byte } } -// EncryptionStatusMessage is JSON marshalled and received from the worker when -// the database checks if it is encrypted. -type EncryptionStatusMessage struct { - DatabaseName string `json:"databaseName"` - EncryptionStatus bool `json:"encryptionStatus"` -} - -// EncryptionStatusReply is JSON marshalled and sent to the worker is response -// to the [EncryptionStatusMessage]. -type EncryptionStatusReply struct { - EncryptionStatus bool `json:"encryptionStatus"` - Error string `json:"error"` -} - -// checkDbEncryptionStatusHandler returns a handler to manage checking -// encryption status from local storage. -func checkDbEncryptionStatusHandler(wh *worker.Manager) func(data []byte) { - return func(data []byte) { - // Unmarshal received message - var msg EncryptionStatusMessage - err := json.Unmarshal(data, &msg) - if err != nil { - jww.ERROR.Printf("Failed to JSON unmarshal "+ - "EncryptionStatusMessage message from worker: %+v", err) - return - } - - // Pass message values to storage - loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( - msg.DatabaseName, msg.EncryptionStatus) - var reply EncryptionStatusReply - if err != nil { - reply.Error = err.Error() - } else { - reply.EncryptionStatus = loadedEncryptionStatus - } - - // Return response - statusData, err := json.Marshal(reply) - if err != nil { - jww.ERROR.Printf( - "Failed to JSON marshal EncryptionStatusReply: %+v", err) - return - } - - wh.SendMessage(EncryptionStatusTag, statusData, nil) +// checkDbEncryptionStatus returns an error if the encryption status provided +// does not match the stored status for this database name. +func checkDbEncryptionStatus(databaseName string, encryptionStatus bool) error { + // Pass message values to storage + loadedEncryptionStatus, err := storage.StoreIndexedDbEncryptionStatus( + databaseName, encryptionStatus) + if err != nil { + return err } -} - -// storeDatabaseNameHandler returns a handler that stores the database name to -// storage when it is received from the worker. -func storeDatabaseNameHandler(wh *worker.Manager) func(data []byte) { - return func(data []byte) { - var returnData []byte - - // Get the database name and save it to storage - if err := storage.StoreIndexedDb(string(data)); err != nil { - returnData = []byte(err.Error()) - } - wh.SendMessage(StoreDatabaseNameTag, returnData, nil) + // Verify encryption status does not change + if encryptionStatus != loadedEncryptionStatus { + return errors.New( + "cannot load database with different encryption status") + } else if !encryptionStatus { + jww.WARN.Printf("IndexedDb encryption disabled!") } + + return nil } diff --git a/indexedDb/worker/dm/tags.go b/indexedDb/worker/dm/tags.go index 9a3e710b870f5f2747fd0b7baeb428c826b9f2d1..b71762421a15003123071954312e290e5677a24e 100644 --- a/indexedDb/worker/dm/tags.go +++ b/indexedDb/worker/dm/tags.go @@ -16,8 +16,6 @@ import "gitlab.com/elixxir/xxdk-wasm/worker" const ( NewWASMEventModelTag worker.Tag = "NewWASMEventModel" MessageReceivedCallbackTag worker.Tag = "MessageReceivedCallback" - EncryptionStatusTag worker.Tag = "EncryptionStatus" - StoreDatabaseNameTag worker.Tag = "StoreDatabaseName" ReceiveReplyTag worker.Tag = "ReceiveReply" ReceiveReactionTag worker.Tag = "ReceiveReaction" diff --git a/logging/logger.go b/logging/logger.go index b3e5f728d803e113c1952bae62384d668f7752fc..03bd1cf4abebde1e4645bc87d31089ed9bf1c5d3 100644 --- a/logging/logger.go +++ b/logging/logger.go @@ -251,7 +251,7 @@ func (l *Logger) StopLogging() { switch l.getMode() { case workerMode: - go l.wm.Terminate() + l.wm.Stop() jww.DEBUG.Printf("[LOG] Terminated log worker.") case fileMode: jww.DEBUG.Printf("[LOG] Reset circular buffer.") diff --git a/worker/manager.go b/worker/manager.go index 1a28ac2f99e803b902cdac1e1696ec031d42ca48..c38438facdf8b502d8b2fc00ba0fe4855b5cfaf8 100644 --- a/worker/manager.go +++ b/worker/manager.go @@ -11,12 +11,14 @@ package worker import ( "encoding/json" - "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/xxdk-wasm/utils" "sync" "syscall/js" "time" + + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + + "gitlab.com/elixxir/xxdk-wasm/utils" ) // initID is the ID for the first item in the callback list. If the list only @@ -35,6 +37,10 @@ const ( ResponseTimeout = 30 * time.Second ) +// receiveQueueChanSize is the size of the channel that received messages are +// put on. +const receiveQueueChanSize = 100 + // ReceptionCallback is the function that handles incoming data from the worker. type ReceptionCallback func(data []byte) @@ -56,6 +62,13 @@ type Manager struct { // original message sent by the main thread. responseIDs map[Tag]uint64 + // receiveQueue is the channel that all received messages are queued on + // while they wait to be processed. + receiveQueue chan []byte + + // quit, when triggered, stops the thread that processes received messages. + quit chan struct{} + // name describes the worker. It is used for debugging and logging purposes. name string @@ -76,10 +89,15 @@ func NewManager(aURL, name string, messageLogging bool) (*Manager, error) { worker: js.Global().Get("Worker").New(aURL, opts), callbacks: make(map[Tag]map[uint64]ReceptionCallback), responseIDs: make(map[Tag]uint64), + receiveQueue: make(chan []byte, receiveQueueChanSize), + quit: make(chan struct{}), name: name, messageLogging: messageLogging, } + // Start thread to process responses from worker + go m.processThread() + // Register listeners on the Javascript worker object that receive messages // and errors from the worker m.addEventListeners() @@ -101,6 +119,35 @@ func NewManager(aURL, name string, messageLogging bool) (*Manager, error) { return m, nil } +// Stop closes the worker manager and terminates the worker. +func (m *Manager) Stop() { + // Stop processThread + select { + case m.quit <- struct{}{}: + } + + // Terminate the worker + go m.terminate() +} + +// processThread processes received messages sequentially. +func (m *Manager) processThread() { + jww.INFO.Printf("[WW] [%s] Starting process thread.", m.name) + for { + select { + case <-m.quit: + jww.INFO.Printf("[WW] [%s] Quitting process thread.", m.name) + return + case message := <-m.receiveQueue: + err := m.processReceivedMessage(message) + if err != nil { + jww.ERROR.Printf("[WW] [%s] Failed to process received "+ + "message from worker: %+v", m.name, err) + } + } + } +} + // SendMessage sends a message to the worker with the given tag. If a reception // callback is specified, then the message is given a unique ID to handle the // reply. Set receptionCB to nil if no reply is expected. @@ -132,7 +179,14 @@ func (m *Manager) SendMessage( // receiveMessage is registered with the Javascript event listener and is called // every time a new message from the worker is received. -func (m *Manager) receiveMessage(data []byte) error { +func (m *Manager) receiveMessage(data []byte) { + m.receiveQueue <- data +} + +// processReceivedMessage processes the message received from the worker and +// calls the associated callback. This functions blocks until the callback +// returns. +func (m *Manager) processReceivedMessage(data []byte) error { var msg Message err := json.Unmarshal(data, &msg) if err != nil { @@ -149,7 +203,7 @@ func (m *Manager) receiveMessage(data []byte) error { return err } - go callback(msg.Data) + callback(msg.Data) return nil } @@ -249,11 +303,7 @@ func (m *Manager) addEventListeners() { // occurs when a message is received from the worker. // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/message_event messageEvent := js.FuncOf(func(_ js.Value, args []js.Value) any { - err := m.receiveMessage([]byte(args[0].Get("data").String())) - if err != nil { - jww.ERROR.Printf("[WW] [%s] Failed to receive message from "+ - "worker: %+v", m.name, err) - } + m.receiveMessage([]byte(args[0].Get("data").String())) return nil }) @@ -302,11 +352,11 @@ func (m *Manager) postMessage(msg any) { m.worker.Call("postMessage", msg) } -// Terminate immediately terminates the Worker. This does not offer the worker +// terminate immediately terminates the Worker. This does not offer the worker // an opportunity to finish its operations; it is stopped at once. // // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/terminate -func (m *Manager) Terminate() { +func (m *Manager) terminate() { m.worker.Call("terminate") } diff --git a/worker/manager_test.go b/worker/manager_test.go index 2eadaff2edeaf66a4071f387f608a602c6967b8d..49a395a9c45a33f25892cc1efc86e263ccf06063 100644 --- a/worker/manager_test.go +++ b/worker/manager_test.go @@ -16,8 +16,8 @@ import ( "time" ) -// Tests Manager.receiveMessage calls the expected callback. -func TestManager_receiveMessage(t *testing.T) { +// Tests Manager.processReceivedMessage calls the expected callback. +func TestManager_processReceivedMessage(t *testing.T) { m := &Manager{callbacks: make(map[Tag]map[uint64]ReceptionCallback)} msg := Message{Tag: readyTag, ID: 5} @@ -38,7 +38,7 @@ func TestManager_receiveMessage(t *testing.T) { } }() - err = m.receiveMessage(data) + err = m.processReceivedMessage(data) if err != nil { t.Errorf("Failed to receive message: %+v", err) } @@ -92,7 +92,7 @@ func TestManager_getCallback(t *testing.T) { } // Tests that Manager.RegisterCallback registers a callback that is then called -// by Manager.receiveMessage. +// by Manager.processReceivedMessage. func TestManager_RegisterCallback(t *testing.T) { m := &Manager{callbacks: make(map[Tag]map[uint64]ReceptionCallback)} @@ -114,14 +114,14 @@ func TestManager_RegisterCallback(t *testing.T) { } }() - err = m.receiveMessage(data) + err = m.processReceivedMessage(data) if err != nil { t.Errorf("Failed to receive message: %+v", err) } } // Tests that Manager.registerReplyCallback registers a callback that is then -// called by Manager.receiveMessage. +// called by Manager.processReceivedMessage. func TestManager_registerReplyCallback(t *testing.T) { m := &Manager{ callbacks: make(map[Tag]map[uint64]ReceptionCallback), @@ -147,7 +147,7 @@ func TestManager_registerReplyCallback(t *testing.T) { } }() - err = m.receiveMessage(data) + err = m.processReceivedMessage(data) if err != nil { t.Errorf("Failed to receive message: %+v", err) } diff --git a/worker/thread.go b/worker/thread.go index df824d045f4337c6384eac389f692b15bec200a8..07591fc7d8f7a905f521b98215096b2d8d5ba044 100644 --- a/worker/thread.go +++ b/worker/thread.go @@ -11,11 +11,13 @@ package worker import ( "encoding/json" + "sync" + "syscall/js" + "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/xxdk-wasm/utils" - "sync" - "syscall/js" ) // ThreadReceptionCallback is the function that handles incoming data from the @@ -32,6 +34,13 @@ type ThreadManager struct { // main thread keyed on the callback tag. callbacks map[Tag]ThreadReceptionCallback + // receiveQueue is the channel that all received messages are queued on + // while they wait to be processed. + receiveQueue chan []byte + + // quit, when triggered, stops the thread that processes received messages. + quit chan struct{} + // name describes the worker. It is used for debugging and logging purposes. name string @@ -44,16 +53,52 @@ type ThreadManager struct { // NewThreadManager initialises a new ThreadManager. func NewThreadManager(name string, messageLogging bool) *ThreadManager { - mh := &ThreadManager{ + tm := &ThreadManager{ messages: make(chan js.Value, 100), callbacks: make(map[Tag]ThreadReceptionCallback), + receiveQueue: make(chan []byte, receiveQueueChanSize), + quit: make(chan struct{}), name: name, messageLogging: messageLogging, } + // Start thread to process messages from the main thread + go tm.processThread() - mh.addEventListeners() + tm.addEventListeners() - return mh + return tm +} + +// Stop closes the thread manager and stops the worker. +func (tm *ThreadManager) Stop() { + // Stop processThread + select { + case tm.quit <- struct{}{}: + } + + // Terminate the worker + go tm.close() +} + +// processThread processes received messages sequentially. +func (tm *ThreadManager) processThread() { + jww.INFO.Printf("[WW] [%s] Starting worker process thread.", tm.name) + for { + select { + case <-tm.quit: + jww.INFO.Printf("[WW] [%s] Quitting worker process thread.", tm.name) + return + case message := <-tm.receiveQueue: + if tm.messageLogging { + jww.INFO.Printf("[WW] Worker processors received message: %q", message) + } + err := tm.processReceivedMessage(message) + if err != nil { + jww.ERROR.Printf("[WW] [%s] Failed to receive message from "+ + "main thread: %+v", tm.name, err) + } + } + } } // SignalReady sends a signal to the main thread indicating that the worker is @@ -87,8 +132,7 @@ func (tm *ThreadManager) SendMessage(tag Tag, data []byte) { } // sendResponse sends a reply to the main thread with the given tag and ID. -func (tm *ThreadManager) sendResponse( - tag Tag, id uint64, data []byte) { +func (tm *ThreadManager) sendResponse(tag Tag, id uint64, data []byte) error { msg := Message{ Tag: tag, ID: id, @@ -103,17 +147,26 @@ func (tm *ThreadManager) sendResponse( payload, err := json.Marshal(msg) if err != nil { - jww.FATAL.Panicf("[WW] [%s] Worker failed to marshal %T for %q and ID "+ - "%d going to main: %+v", tm.name, msg, tag, id, err) + return errors.Errorf("worker failed to marshal %T for %q and ID "+ + "%d going to main: %+v", msg, tag, id, err) } go tm.postMessage(string(payload)) + + return nil } // receiveMessage is registered with the Javascript event listener and is called -// everytime a message from the main thread is received. If the registered -// callback returns a response, it is sent to the main thread. -func (tm *ThreadManager) receiveMessage(data []byte) error { +// every time a new message from the main thread is received. +func (tm *ThreadManager) receiveMessage(data []byte) { + tm.receiveQueue <- data +} + +// processReceivedMessage processes the message received from the main thread +// and calls the associated callback. If the registered callback returns a +// response, it is sent to the main thread. This functions blocks until the +// callback returns. +func (tm *ThreadManager) processReceivedMessage(data []byte) error { var msg Message err := json.Unmarshal(data, &msg) if err != nil { @@ -133,16 +186,14 @@ func (tm *ThreadManager) receiveMessage(data []byte) error { } // Call callback and register response with its return - go func() { - response, err2 := callback(msg.Data) - if err2 != nil { - jww.ERROR.Printf("[WW] [%s] Callback for for %q and ID %d "+ - "returned an error: %+v", tm.name, msg.Tag, msg.ID, err) - } - if response != nil { - tm.sendResponse(msg.Tag, msg.ID, response) - } - }() + response, err := callback(msg.Data) + if err != nil { + return errors.Errorf("callback for %q and ID %d returned an error: %+v", + msg.Tag, msg.ID, err) + } + if response != nil { + return tm.sendResponse(msg.Tag, msg.ID, response) + } return nil } @@ -173,11 +224,7 @@ func (tm *ThreadManager) addEventListeners() { // occurs when a message is received from the main thread. // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/message_event messageEvent := js.FuncOf(func(_ js.Value, args []js.Value) any { - err := tm.receiveMessage([]byte(args[0].Get("data").String())) - if err != nil { - jww.ERROR.Printf("[WW] [%s] Failed to receive message from "+ - "main thread: %+v", tm.name, err) - } + tm.receiveMessage([]byte(args[0].Get("data").String())) return nil }) @@ -218,3 +265,15 @@ func (tm *ThreadManager) addEventListeners() { func (tm *ThreadManager) postMessage(aMessage any) { js.Global().Call("postMessage", aMessage) } + +// close discards any tasks queued in the worker's event loop, effectively +// closing this particular scope. +// +// aMessage must be a js.Value or a primitive type that can be converted via +// js.ValueOf. The Javascript object must be "any value or JavaScript object +// handled by the structured clone algorithm". See the doc for more information. +// +// Doc: https://developer.mozilla.org/en-US/docs/Web/API/DedicatedWorkerGlobalScope/close +func (tm *ThreadManager) close() { + js.Global().Call("close") +} diff --git a/worker/thread_test.go b/worker/thread_test.go index 8f7d09cd931f5e28002ea9cb22b764df834c8062..ada6de8fc00916699b45eaee1483830a277b9fc9 100644 --- a/worker/thread_test.go +++ b/worker/thread_test.go @@ -15,8 +15,8 @@ import ( "time" ) -// Tests that ThreadManager.receiveMessage calls the expected callback. -func TestThreadManager_receiveMessage(t *testing.T) { +// Tests that ThreadManager.processReceivedMessage calls the expected callback. +func TestThreadManager_processReceivedMessage(t *testing.T) { tm := &ThreadManager{callbacks: make(map[Tag]ThreadReceptionCallback)} msg := Message{Tag: readyTag, ID: 5} @@ -37,14 +37,14 @@ func TestThreadManager_receiveMessage(t *testing.T) { } }() - err = tm.receiveMessage(data) + err = tm.processReceivedMessage(data) if err != nil { t.Errorf("Failed to receive message: %+v", err) } } // Tests that ThreadManager.RegisterCallback registers a callback that is then -// called by ThreadManager.receiveMessage. +// called by ThreadManager.processReceivedMessage. func TestThreadManager_RegisterCallback(t *testing.T) { tm := &ThreadManager{callbacks: make(map[Tag]ThreadReceptionCallback)} @@ -66,7 +66,7 @@ func TestThreadManager_RegisterCallback(t *testing.T) { } }() - err = tm.receiveMessage(data) + err = tm.processReceivedMessage(data) if err != nil { t.Errorf("Failed to receive message: %+v", err) }