From f86d7bf5c6c41f4f9119d4b3836121679af3009d Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Mon, 9 Jan 2023 13:34:04 -0800 Subject: [PATCH] Separate replies and sending of new messages to inherently know when to delete a handler and rename handlers to callbacks --- indexedDb/channels/handlers.go | 13 ++- indexedDb/dm/handlers.go | 12 ++- indexedDbWorker/channels/init.go | 12 +-- indexedDbWorker/dm/init.go | 12 +-- worker/manager.go | 132 ++++++++++++++++--------------- worker/manager_test.go | 4 +- worker/message.go | 17 ++++ worker/tag.go | 21 ----- worker/thread.go | 82 ++++++++++++------- 9 files changed, 162 insertions(+), 143 deletions(-) create mode 100644 worker/message.go diff --git a/indexedDb/channels/handlers.go b/indexedDb/channels/handlers.go index 9d8a0d49..9c13ed77 100644 --- a/indexedDb/channels/handlers.go +++ b/indexedDb/channels/handlers.go @@ -97,8 +97,7 @@ func (m *manager) messageReceivedCallback( } // Send it to the main thread - m.mh.SendResponse( - worker.MessageReceivedCallbackTag, worker.InitID, data) + m.mh.SendMessage(worker.MessageReceivedCallbackTag, data) } // storeDatabaseName sends the database name to the main thread and waits for @@ -108,15 +107,14 @@ func (m *manager) messageReceivedCallback( func (m *manager) storeDatabaseName(databaseName string) error { // Register response handler with channel that will wait for the response responseChan := make(chan []byte) - m.mh.RegisterHandler(worker.StoreDatabaseNameTag, + m.mh.RegisterCallback(worker.StoreDatabaseNameTag, func(data []byte) ([]byte, error) { responseChan <- data return nil, nil }) // Send encryption status to main thread - m.mh.SendResponse( - worker.StoreDatabaseNameTag, worker.InitID, []byte(databaseName)) + m.mh.SendMessage(worker.StoreDatabaseNameTag, []byte(databaseName)) // Wait for response select { @@ -153,15 +151,14 @@ func (m *manager) storeEncryptionStatus( // Register response handler with channel that will wait for the response responseChan := make(chan []byte) - m.mh.RegisterHandler(worker.EncryptionStatusTag, + m.mh.RegisterCallback(worker.EncryptionStatusTag, func(data []byte) ([]byte, error) { responseChan <- data return nil, nil }) // Send encryption status to main thread - m.mh.SendResponse( - worker.EncryptionStatusTag, worker.InitID, data) + m.mh.SendMessage(worker.EncryptionStatusTag, data) // Wait for response var response mChannels.EncryptionStatusReply diff --git a/indexedDb/dm/handlers.go b/indexedDb/dm/handlers.go index 60c6b9f3..4e2607ee 100644 --- a/indexedDb/dm/handlers.go +++ b/indexedDb/dm/handlers.go @@ -91,7 +91,7 @@ func (m *manager) messageReceivedCallback( } // Send it to the main thread - m.mh.SendResponse(worker.MessageReceivedCallbackTag, worker.InitID, data) + m.mh.SendMessage(worker.MessageReceivedCallbackTag, data) } // storeDatabaseName sends the database name to the main thread and waits for @@ -101,15 +101,14 @@ func (m *manager) messageReceivedCallback( func (m *manager) storeDatabaseName(databaseName string) error { // Register response handler with channel that will wait for the response responseChan := make(chan []byte) - m.mh.RegisterHandler(worker.StoreDatabaseNameTag, + m.mh.RegisterCallback(worker.StoreDatabaseNameTag, func(data []byte) ([]byte, error) { responseChan <- data return nil, nil }) // Send encryption status to main thread - m.mh.SendResponse( - worker.StoreDatabaseNameTag, worker.InitID, []byte(databaseName)) + m.mh.SendMessage(worker.StoreDatabaseNameTag, []byte(databaseName)) // Wait for response select { @@ -146,15 +145,14 @@ func (m *manager) storeEncryptionStatus( // Register response handler with channel that will wait for the response responseChan := make(chan []byte) - m.mh.RegisterHandler(worker.EncryptionStatusTag, + m.mh.RegisterCallback(worker.EncryptionStatusTag, func(data []byte) ([]byte, error) { responseChan <- data return nil, nil }) // Send encryption status to main thread - m.mh.SendResponse( - worker.EncryptionStatusTag, worker.InitID, data) + m.mh.SendMessage(worker.EncryptionStatusTag, data) // Wait for response var response mChannels.EncryptionStatusReply diff --git a/indexedDbWorker/channels/init.go b/indexedDbWorker/channels/init.go index 78ffc47d..653ba644 100644 --- a/indexedDbWorker/channels/init.go +++ b/indexedDbWorker/channels/init.go @@ -63,16 +63,16 @@ func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, } // Register handler to manage messages for the MessageReceivedCallback - wm.RegisterHandler(worker.MessageReceivedCallbackTag, - worker.InitID, false, messageReceivedCallbackHandler(cb)) + wm.RegisterCallback( + worker.MessageReceivedCallbackTag, messageReceivedCallbackHandler(cb)) // Register handler to manage checking encryption status from local storage - wm.RegisterHandler(worker.EncryptionStatusTag, - worker.InitID, false, checkDbEncryptionStatusHandler(wm)) + wm.RegisterCallback( + worker.EncryptionStatusTag, checkDbEncryptionStatusHandler(wm)) // Register handler to manage the storage of the database name - wm.RegisterHandler(worker.StoreDatabaseNameTag, - worker.InitID, false, storeDatabaseNameHandler(wm)) + wm.RegisterCallback( + worker.StoreDatabaseNameTag, storeDatabaseNameHandler(wm)) encryptionJSON, err := json.Marshal(encryption) if err != nil { diff --git a/indexedDbWorker/dm/init.go b/indexedDbWorker/dm/init.go index 378b6a0c..4290a580 100644 --- a/indexedDbWorker/dm/init.go +++ b/indexedDbWorker/dm/init.go @@ -53,16 +53,16 @@ func NewWASMEventModel(path string, encryption cryptoChannel.Cipher, } // Register handler to manage messages for the MessageReceivedCallback - wh.RegisterHandler(worker.MessageReceivedCallbackTag, worker.InitID, false, - messageReceivedCallbackHandler(cb)) + wh.RegisterCallback( + worker.MessageReceivedCallbackTag, messageReceivedCallbackHandler(cb)) // Register handler to manage checking encryption status from local storage - wh.RegisterHandler(worker.EncryptionStatusTag, worker.InitID, false, - checkDbEncryptionStatusHandler(wh)) + wh.RegisterCallback( + worker.EncryptionStatusTag, checkDbEncryptionStatusHandler(wh)) // Register handler to manage the storage of the database name - wh.RegisterHandler(worker.StoreDatabaseNameTag, worker.InitID, false, - storeDatabaseNameHandler(wh)) + wh.RegisterCallback( + worker.StoreDatabaseNameTag, storeDatabaseNameHandler(wh)) encryptionJSON, err := json.Marshal(encryption) if err != nil { diff --git a/worker/manager.go b/worker/manager.go index 26c8a85e..bd648c57 100644 --- a/worker/manager.go +++ b/worker/manager.go @@ -25,8 +25,8 @@ import ( // 3. Get path to JS file from bindings // 4. Add tests for manager.go and thread.go -// InitID is the ID for the first item in the handler list. If the list only -// contains one handler, then this is the ID of that handler. If the list has +// InitID is the ID for the first item in the callback list. If the list only +// contains one callback, then this is the ID of that callback. If the list has // autogenerated unique IDs, this is the initial ID to start at. const InitID = uint64(0) @@ -41,8 +41,8 @@ const ( ResponseTimeout = 8 * time.Second ) -// RHandlerFn is the function that handles incoming data from the worker. -type RHandlerFn func(data []byte) +// ReceptionCallback is the function that handles incoming data from the worker. +type ReceptionCallback func(data []byte) // Manager manages the handling of messages received from the worker. type Manager struct { @@ -50,17 +50,17 @@ type Manager struct { // Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker worker js.Value - // handlers are a list of handlers that handle a specific message received - // from the worker. Each handler is keyed on a tag specifying how the - // received message should be handled. If the message is a reply to a - // message sent to the worker, then the handler is also keyed on a unique + // callbacks are a list of ReceptionCallback that handle a specific message + // received from the worker. Each callback is keyed on a tag specifying how + // the received message should be handled. If the message is a reply to a + // message sent to the worker, then the callback is also keyed on a unique // ID. If the message is not a reply, then it appears on InitID. - handlers map[Tag]map[uint64]RHandlerFn + callbacks map[Tag]map[uint64]ReceptionCallback - // handlerIDs is a list of the newest ID to assign to each handler when + // responseIDs is a list of the newest ID to assign to each callback when // registered. The IDs are used to connect a reply from the worker to the // original message sent by the main thread. - handlerIDs map[Tag]uint64 + responseIDs map[Tag]uint64 // name describes the worker. It is used for debugging and logging purposes. name string @@ -68,14 +68,6 @@ type Manager struct { mux sync.Mutex } -// Message is the outer message that contains the contents of each message sent -// to the worker. It is transmitted as JSON. -type Message struct { - Tag Tag `json:"tag"` - ID uint64 `json:"id"` - Data []byte `json:"data"` -} - // NewManager generates a new Manager. This functions will only return once // communication with the worker has been established. func NewManager(aURL, name string) (*Manager, error) { @@ -83,21 +75,20 @@ func NewManager(aURL, name string) (*Manager, error) { opts := newWorkerOptions("", "", name) m := &Manager{ - worker: js.Global().Get("Worker").New(aURL, opts), - handlers: make(map[Tag]map[uint64]RHandlerFn), - handlerIDs: make(map[Tag]uint64), - name: name, + worker: js.Global().Get("Worker").New(aURL, opts), + callbacks: make(map[Tag]map[uint64]ReceptionCallback), + responseIDs: make(map[Tag]uint64), + name: name, } // Register listeners on the Javascript worker object that receive messages // and errors from the worker m.addEventListeners() - // Register a handler that will receive initial message from worker + // Register a callback that will receive initial message from worker // indicating that it is ready ready := make(chan struct{}) - m.RegisterHandler( - ReadyTag, InitID, false, func([]byte) { ready <- struct{}{} }) + m.RegisterCallback(ReadyTag, func([]byte) { ready <- struct{}{} }) // Wait for the ready signal from the worker select { @@ -112,13 +103,13 @@ func NewManager(aURL, name string) (*Manager, error) { } // SendMessage sends a message to the worker with the given tag. If a reception -// handler is specified, then the message is given a unique ID to handle the -// reply. Set receptionHandler to nil if no reply is expected. +// callback is specified, then the message is given a unique ID to handle the +// reply. Set receptionCB to nil if no reply is expected. func (m *Manager) SendMessage( - tag Tag, data []byte, receptionHandler RHandlerFn) { + tag Tag, data []byte, receptionCB ReceptionCallback) { var id uint64 - if receptionHandler != nil { - id = m.RegisterHandler(tag, 0, true, receptionHandler) + if receptionCB != nil { + id = m.registerReplyCallback(tag, receptionCB) } jww.DEBUG.Printf("[WW] [%s] Main sending message for %q and ID %d with "+ @@ -149,62 +140,75 @@ func (m *Manager) receiveMessage(data []byte) error { jww.DEBUG.Printf("[WW] [%s] Main received message for %q and ID %d with "+ "data: %s", m.name, msg.Tag, msg.ID, msg.Data) - handler, err := m.getHandler(msg.Tag, msg.ID) + callback, err := m.getCallback(msg.Tag, msg.ID, msg.DeleteCB) if err != nil { return err } - go handler(msg.Data) + go callback(msg.Data) return nil } -// getHandler returns the handler with the given ID or returns an error if no -// handler is found. The handler is deleted from the map if specified in -// deleteAfterReceiving. This function is thread safe. -func (m *Manager) getHandler(tag Tag, id uint64) (RHandlerFn, error) { +// getCallback returns the callback for the given ID or returns an error if no +// callback is found. The callback is deleted from the map if specified in the +// message. This function is thread safe. +func (m *Manager) getCallback( + tag Tag, id uint64, deleteCB bool) (ReceptionCallback, error) { m.mux.Lock() defer m.mux.Unlock() - handlers, exists := m.handlers[tag] + callbacks, exists := m.callbacks[tag] if !exists { - return nil, errors.Errorf("no handlers found for tag %q", tag) + return nil, errors.Errorf("no callbacks found for tag %q", tag) } - handler, exists := handlers[id] + callback, exists := callbacks[id] if !exists { - return nil, errors.Errorf("no %q handler found for ID %d", tag, id) + return nil, errors.Errorf("no %q callback found for ID %d", tag, id) } - if _, exists = deleteAfterReceiving[tag]; exists { - delete(m.handlers[tag], id) - if len(m.handlers[tag]) == 0 { - delete(m.handlers, tag) + if deleteCB { + delete(m.callbacks[tag], id) + if len(m.callbacks[tag]) == 0 { + delete(m.callbacks, tag) } } - return handler, nil + return callback, nil } -// RegisterHandler registers the handler for the given tag and ID unless autoID -// is true, in which case a unique ID is used. Returns the ID that was -// registered. If a previous handler was registered, it is overwritten. -// This function is thread safe. -func (m *Manager) RegisterHandler( - tag Tag, id uint64, autoID bool, handler RHandlerFn) uint64 { +// RegisterCallback registers the reception callback for the given tag. If a +// previous callback was registered, it is overwritten. This function is thread +// safe. +func (m *Manager) RegisterCallback(tag Tag, receptionCB ReceptionCallback) { m.mux.Lock() defer m.mux.Unlock() - if autoID { - id = m.getNextID(tag) - } + id := InitID + + jww.DEBUG.Printf("[WW] [%s] Main registering callback for tag %q and ID "+ + "%d (autoID: %t)", m.name, tag, id) + + m.callbacks[tag] = map[uint64]ReceptionCallback{id: receptionCB} +} + +// RegisterCallback registers the reception callback for the given tag and a new +// unique ID used to associate the reply to the callback. Returns the ID that +// was registered. If a previous callback was registered, it is overwritten. +// This function is thread safe. +func (m *Manager) registerReplyCallback( + tag Tag, receptionCB ReceptionCallback) uint64 { + m.mux.Lock() + defer m.mux.Unlock() + id := m.getNextID(tag) - jww.DEBUG.Printf("[WW] [%s] Main registering handler for tag %q and ID %d "+ - "(autoID: %t)", m.name, tag, id, autoID) + jww.DEBUG.Printf("[WW] [%s] Main registering callback for tag %q and ID %d", + m.name, tag, id) - if _, exists := m.handlers[tag]; !exists { - m.handlers[tag] = make(map[uint64]RHandlerFn) + if _, exists := m.callbacks[tag]; !exists { + m.callbacks[tag] = make(map[uint64]ReceptionCallback) } - m.handlers[tag][id] = handler + m.callbacks[tag][id] = receptionCB return id } @@ -212,12 +216,12 @@ func (m *Manager) RegisterHandler( // getNextID returns the next unique ID for the given tag. This function is not // thread-safe. func (m *Manager) getNextID(tag Tag) uint64 { - if _, exists := m.handlerIDs[tag]; !exists { - m.handlerIDs[tag] = InitID + if _, exists := m.responseIDs[tag]; !exists { + m.responseIDs[tag] = InitID } - id := m.handlerIDs[tag] - m.handlerIDs[tag]++ + id := m.responseIDs[tag] + m.responseIDs[tag]++ return id } diff --git a/worker/manager_test.go b/worker/manager_test.go index ebee8ceb..6d9d8530 100644 --- a/worker/manager_test.go +++ b/worker/manager_test.go @@ -31,8 +31,8 @@ func TestManager_RegisterHandler(t *testing.T) { // Tests that Manager.getNextID returns the expected ID for various Tags. func TestManager_getNextID(t *testing.T) { m := &Manager{ - handlers: make(map[Tag]map[uint64]RHandlerFn), - handlerIDs: make(map[Tag]uint64), + callbacks: make(map[Tag]map[uint64]ReceptionCallback), + responseIDs: make(map[Tag]uint64), } for _, tag := range []Tag{ diff --git a/worker/message.go b/worker/message.go new file mode 100644 index 00000000..a4adfcca --- /dev/null +++ b/worker/message.go @@ -0,0 +1,17 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package worker + +// Message is the outer message that contains the contents of each message sent +// to the worker. It is transmitted as JSON. +type Message struct { + Tag Tag `json:"tag"` + ID uint64 `json:"id"` + DeleteCB bool `json:"deleteCB"` + Data []byte `json:"data"` +} diff --git a/worker/tag.go b/worker/tag.go index 9375d20c..0763da0c 100644 --- a/worker/tag.go +++ b/worker/tag.go @@ -39,24 +39,3 @@ const ( ReceiveTextTag Tag = "ReceiveText" UpdateSentStatusTag Tag = "UpdateSentStatusTag" ) - -// deleteAfterReceiving is a list of Tags that will have their handler deleted -// after a message is received. This is mainly used for responses where the -// handler will only handle it once and never again. -var deleteAfterReceiving = map[Tag]struct{}{ - ReadyTag: {}, - NewWASMEventModelTag: {}, - EncryptionStatusTag: {}, - JoinChannelTag: {}, - LeaveChannelTag: {}, - ReceiveMessageTag: {}, - ReceiveReplyTag: {}, - ReceiveReactionTag: {}, - UpdateFromUUIDTag: {}, - UpdateFromMessageIDTag: {}, - GetMessageTag: {}, - DeleteMessageTag: {}, - ReceiveTag: {}, - ReceiveTextTag: {}, - UpdateSentStatusTag: {}, -} diff --git a/worker/thread.go b/worker/thread.go index 3ed8ba33..00969db8 100644 --- a/worker/thread.go +++ b/worker/thread.go @@ -18,8 +18,9 @@ import ( "syscall/js" ) -// HandlerFn is the function that handles incoming data from the main thread. -type HandlerFn func(data []byte) ([]byte, error) +// ThreadReceptionCallback is the function that handles incoming data from the +// main thread. +type ThreadReceptionCallback func(data []byte) ([]byte, error) // ThreadManager queues incoming messages from the main thread and handles them // based on their tag. @@ -27,9 +28,9 @@ type ThreadManager struct { // messages is a list of queued messages sent from the main thread. messages chan js.Value - // handlers is a list of functions to handle messages that come from the - // main thread keyed on the handler tag. - handlers map[Tag]HandlerFn + // callbacks is a list of callbacks to handle messages that come from the + // main thread keyed on the callback tag. + callbacks map[Tag]ThreadReceptionCallback // name describes the worker. It is used for debugging and logging purposes. name string @@ -40,9 +41,9 @@ type ThreadManager struct { // NewThreadManager initialises a new ThreadManager. func NewThreadManager(name string) *ThreadManager { mh := &ThreadManager{ - messages: make(chan js.Value, 100), - handlers: make(map[Tag]HandlerFn), - name: name, + messages: make(chan js.Value, 100), + callbacks: make(map[Tag]ThreadReceptionCallback), + name: name, } mh.addEventListeners() @@ -54,18 +55,39 @@ func NewThreadManager(name string) *ThreadManager { // ready. Once the main thread receives this, it will initiate communication. // Therefore, this should only be run once all listeners are ready. func (tm *ThreadManager) SignalReady() { - tm.SendResponse(ReadyTag, InitID, nil) + tm.SendMessage(ReadyTag, nil) } -// SendResponse sends a reply to the main thread with the given tag and ID, -func (tm *ThreadManager) SendResponse( +// SendMessage sends a message to the main thread for the given tag. +func (tm *ThreadManager) SendMessage(tag Tag, data []byte) { + msg := Message{ + Tag: tag, + ID: InitID, + DeleteCB: false, + Data: data, + } + jww.DEBUG.Printf("[WW] [%s] Worker sending message for %q with data: %s", + tm.name, tag, data) + + payload, err := json.Marshal(msg) + if err != nil { + jww.FATAL.Panicf("[WW] [%s] Worker failed to marshal %T for %q going "+ + "to main: %+v", tm.name, msg, tag, err) + } + + go tm.postMessage(string(payload)) +} + +// sendResponse sends a reply to the main thread with the given tag and ID. +func (tm *ThreadManager) sendResponse( tag Tag, id uint64, data []byte) { msg := Message{ - Tag: tag, - ID: id, - Data: data, + Tag: tag, + ID: id, + DeleteCB: true, + Data: data, } - jww.DEBUG.Printf("[WW] [%s] Worker sending message for %q and ID %d with "+ + jww.DEBUG.Printf("[WW] [%s] Worker sending reply for %q and ID %d with "+ "data: %s", tm.name, tag, id, data) payload, err := json.Marshal(msg) @@ -79,7 +101,7 @@ func (tm *ThreadManager) SendResponse( // receiveMessage is registered with the Javascript event listener and is called // everytime a message from the main thread is received. If the registered -// handler returns a response, it is sent to the main thread. +// callback returns a response, it is sent to the main thread. func (tm *ThreadManager) receiveMessage(data []byte) error { var msg Message err := json.Unmarshal(data, &msg) @@ -90,36 +112,38 @@ func (tm *ThreadManager) receiveMessage(data []byte) error { "data: %s", tm.name, msg.Tag, msg.ID, msg.Data) tm.mux.Lock() - handler, exists := tm.handlers[msg.Tag] + callback, exists := tm.callbacks[msg.Tag] tm.mux.Unlock() if !exists { - return errors.Errorf("no handler found for tag %q", msg.Tag) + return errors.Errorf("no callback found for tag %q", msg.Tag) } - // Call handler and register response with its return + // Call callback and register response with its return go func() { - response, err2 := handler(msg.Data) + response, err2 := callback(msg.Data) if err2 != nil { - jww.ERROR.Printf("[WW] [%s] Handler for for %q and ID %d returned "+ - "an error: %+v", tm.name, msg.Tag, msg.ID, err) + jww.ERROR.Printf("[WW] [%s] Callback for for %q and ID %d "+ + "returned an error: %+v", tm.name, msg.Tag, msg.ID, err) } if response != nil { - tm.SendResponse(msg.Tag, msg.ID, response) + tm.sendResponse(msg.Tag, msg.ID, response) } }() return nil } -// RegisterHandler registers the handler with the given tag overwriting any -// previous registered handler with the same tag. This function is thread safe. +// RegisterCallback registers the callback with the given tag overwriting any +// previous registered callbacks with the same tag. This function is thread +// safe. // -// If the handler returns anything but nil, it will be returned as a response. -func (tm *ThreadManager) RegisterHandler(tag Tag, handler HandlerFn) { +// If the callback returns anything but nil, it will be returned as a response. +func (tm *ThreadManager) RegisterCallback( + tag Tag, receptionCallback ThreadReceptionCallback) { jww.DEBUG.Printf( - "[WW] [%s] Worker registering handler for tag %q", tm.name, tag) + "[WW] [%s] Worker registering callback for tag %q", tm.name, tag) tm.mux.Lock() - tm.handlers[tag] = handler + tm.callbacks[tag] = receptionCallback tm.mux.Unlock() } -- GitLab