Skip to content
Snippets Groups Projects
Commit 09e0827f authored by Jono Wenger's avatar Jono Wenger
Browse files

Update worker

parent fd2b3c4a
Branches
Tags
2 merge requests!67fix for latest client release,!52XX-4382 / Move indexedDb databases to web workers
......@@ -31,8 +31,8 @@ build:
- go mod vendor -v
- mkdir -p release
- GOOS=js GOARCH=wasm go build -ldflags '-w -s' -o release/xxdk.wasm main.go
- GOOS=js GOARCH=wasm go build -mod vendor -ldflags '-w -s' -trimpath -o release/xxdk-channelsIndexedDkWorker.wasm ./indexedDb/impl/channels/...
- GOOS=js GOARCH=wasm go build -mod vendor -ldflags '-w -s' -trimpath -o release/xxdk-dmIndexedDkWorker.wasm ./indexedDb/impl/dm/...
- GOOS=js GOARCH=wasm go build -ldflags '-w -s' -trimpath -o release/xxdk-channelsIndexedDkWorker.wasm ./indexedDb/impl/channels/...
- GOOS=js GOARCH=wasm go build -ldflags '-w -s' -trimpath -o release/xxdk-dmIndexedDkWorker.wasm ./indexedDb/impl/dm/...
- cp wasm_exec.js release/
- cp indexedDb/impl/channels/channelsIndexedDbWorker.js release/
- cp indexedDb/impl/dm/dmIndexedDbWorker.js release/
......
......@@ -32,7 +32,7 @@ func main() {
js.Global().Set("LogToFile", js.FuncOf(wasm.LogToFile))
js.Global().Set("RegisterLogWriter", js.FuncOf(wasm.RegisterLogWriter))
m := &manager{mh: worker.NewThreadManager("ChannelsIndexedDbWorker")}
m := &manager{mh: worker.NewThreadManager("ChannelsIndexedDbWorker", true)}
m.registerCallbacks()
m.mh.SignalReady()
<-make(chan bool)
......
......@@ -32,7 +32,7 @@ func main() {
js.Global().Set("LogToFile", js.FuncOf(wasm.LogToFile))
js.Global().Set("RegisterLogWriter", js.FuncOf(wasm.RegisterLogWriter))
m := &manager{mh: worker.NewThreadManager("DmIndexedDbWorker")}
m := &manager{mh: worker.NewThreadManager("DmIndexedDbWorker", true)}
m.registerCallbacks()
m.mh.SignalReady()
<-make(chan bool)
......
......@@ -66,7 +66,7 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher,
deletedMessageCB DeletedMessageCallback, mutedUserCB MutedUserCallback) (
channels.EventModel, error) {
wm, err := worker.NewManager(wasmJsPath, "channelsIndexedDb")
wm, err := worker.NewManager(wasmJsPath, "channelsIndexedDb", true)
if err != nil {
return nil, err
}
......
......@@ -41,7 +41,7 @@ type NewWASMEventModelMessage struct {
func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher,
cb MessageReceivedCallback) (dm.EventModel, error) {
wh, err := worker.NewManager(wasmJsPath, "dmIndexedDb")
wh, err := worker.NewManager(wasmJsPath, "dmIndexedDb", true)
if err != nil {
return nil, err
}
......
......@@ -15,7 +15,6 @@ import (
"syscall/js"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/v4/bindings"
"gitlab.com/elixxir/xxdk-wasm/storage"
"gitlab.com/elixxir/xxdk-wasm/utils"
"gitlab.com/elixxir/xxdk-wasm/wasm"
......@@ -37,7 +36,6 @@ func init() {
func main() {
fmt.Println("Starting xxDK WebAssembly bindings.")
fmt.Printf("Client version %s\n", bindings.GetVersion())
// storage/password.go
js.Global().Set("GetOrInitPassword", js.FuncOf(storage.GetOrInitPassword))
......
......@@ -49,3 +49,14 @@ func JsonToJS(inputJson []byte) (js.Value, error) {
return js.ValueOf(jsObj), nil
}
// JsErrorToJson converts the Javascript error to JSON. This should be used for
// all Javascript error objects instead of JsonToJS.
func JsErrorToJson(value js.Value) string {
if value.IsUndefined() {
return "null"
}
properties := Object.Call("getOwnPropertyNames", value)
return JSON.Call("stringify", value, properties).String()
}
......@@ -241,3 +241,65 @@ func TestJsonToJSJsToJson(t *testing.T) {
"\nexpected: %s\nreceived: %s", jsonData, jsJson)
}
}
// Tests that JsErrorToJson can convert a Javascript object to JSON that matches
// the output of json.Marshal on the Go version of the same object.
func TestJsErrorToJson(t *testing.T) {
testObj := map[string]any{
"nil": nil,
"bool": true,
"int": 1,
"float": 1.5,
"string": "I am string",
"array": []any{1, 2, 3},
"object": map[string]any{"int": 5},
}
expected, err := json.Marshal(testObj)
if err != nil {
t.Errorf("Failed to JSON marshal test object: %+v", err)
}
jsJson := JsErrorToJson(js.ValueOf(testObj))
// Javascript does not return the JSON object fields sorted so the letters
// of each Javascript string are sorted and compared
er := []rune(string(expected))
sort.SliceStable(er, func(i, j int) bool { return er[i] < er[j] })
jj := []rune(jsJson)
sort.SliceStable(jj, func(i, j int) bool { return jj[i] < jj[j] })
if string(er) != string(jj) {
t.Errorf("Recieved incorrect JSON from Javascript object."+
"\nexpected: %s\nreceived: %s", expected, jsJson)
}
}
// Tests that JsErrorToJson return a null object when the Javascript object is
// undefined.
func TestJsErrorToJson_Undefined(t *testing.T) {
expected, err := json.Marshal(nil)
if err != nil {
t.Errorf("Failed to JSON marshal test object: %+v", err)
}
jsJson := JsErrorToJson(js.Undefined())
if string(expected) != jsJson {
t.Errorf("Recieved incorrect JSON from Javascript object."+
"\nexpected: %s\nreceived: %s", expected, jsJson)
}
}
// Tests that JsErrorToJson returns a JSON object containing the original error
// string.
func TestJsErrorToJson_ErrorObject(t *testing.T) {
expected := "An error"
jsErr := Error.New(expected)
jsJson := JsErrorToJson(jsErr)
if !strings.Contains(jsJson, expected) {
t.Errorf("Recieved incorrect JSON from Javascript error."+
"\nexpected: %s\nreceived: %s", expected, jsJson)
}
}
......@@ -19,9 +19,6 @@ import (
"time"
)
// TODO:
// 1. Add tests for manager.go and thread.go
// initID is the ID for the first item in the callback list. If the list only
// contains one callback, then this is the ID of that callback. If the list has
// autogenerated unique IDs, this is the initial ID to start at.
......@@ -31,11 +28,11 @@ const initID = uint64(0)
const (
// workerInitialConnectionTimeout is the time to wait to receive initial
// contact from a new worker before timing out.
workerInitialConnectionTimeout = 16 * time.Second
workerInitialConnectionTimeout = 90 * time.Second
// ResponseTimeout is the general time to wait after sending a message to
// receive a response before timing out.
ResponseTimeout = 8 * time.Second
ResponseTimeout = 30 * time.Second
)
// ReceptionCallback is the function that handles incoming data from the worker.
......@@ -62,12 +59,16 @@ type Manager struct {
// name describes the worker. It is used for debugging and logging purposes.
name string
// messageLogging determines if debug message logs should be printed every
// time a message is sent/received to/from the worker.
messageLogging bool
mux sync.Mutex
}
// NewManager generates a new Manager. This functions will only return once
// communication with the worker has been established.
func NewManager(aURL, name string) (*Manager, error) {
func NewManager(aURL, name string, messageLogging bool) (*Manager, error) {
// Create new worker options with the given name
opts := newWorkerOptions("", "", name)
......@@ -76,6 +77,7 @@ func NewManager(aURL, name string) (*Manager, error) {
callbacks: make(map[Tag]map[uint64]ReceptionCallback),
responseIDs: make(map[Tag]uint64),
name: name,
messageLogging: messageLogging,
}
// Register listeners on the Javascript worker object that receive messages
......@@ -109,10 +111,12 @@ func (m *Manager) SendMessage(
id = m.registerReplyCallback(tag, receptionCB)
}
jww.DEBUG.Printf("[WW] [%s] Main sending message for %q and ID %d with "+
"data: %s", m.name, tag, id, data)
if m.messageLogging {
jww.DEBUG.Printf("[WW] [%s] Main sending message for %q and ID %d "+
"with data: %s", m.name, tag, id, data)
}
msg := message{
msg := Message{
Tag: tag,
ID: id,
Data: data,
......@@ -129,13 +133,16 @@ func (m *Manager) SendMessage(
// receiveMessage is registered with the Javascript event listener and is called
// every time a new message from the worker is received.
func (m *Manager) receiveMessage(data []byte) error {
var msg message
var msg Message
err := json.Unmarshal(data, &msg)
if err != nil {
return err
}
jww.DEBUG.Printf("[WW] [%s] Main received message for %q and ID %d with "+
"data: %s", m.name, msg.Tag, msg.ID, msg.Data)
if m.messageLogging {
jww.DEBUG.Printf("[WW] [%s] Main received message for %q and ID %d "+
"with data: %s", m.name, msg.Tag, msg.ID, msg.Data)
}
callback, err := m.getCallback(msg.Tag, msg.ID, msg.DeleteCB)
if err != nil {
......@@ -222,6 +229,14 @@ func (m *Manager) getNextID(tag Tag) uint64 {
return id
}
// GetWorker returns the web worker object. This returned so the worker object
// can be returned to the Javascript layer for it to communicate with the worker
// thread.
func (m *Manager) GetWorker() js.Value { return m.worker }
// Name returns the name of the web worker object.
func (m *Manager) Name() string { return m.name }
////////////////////////////////////////////////////////////////////////////////
// Javascript Call Wrappers //
////////////////////////////////////////////////////////////////////////////////
......@@ -242,20 +257,31 @@ func (m *Manager) addEventListeners() {
return nil
})
// Create listener for when an error event is fired on the worker. This
// occurs when an error occurs in the worker.
// Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/error_event
errorEvent := js.FuncOf(func(_ js.Value, args []js.Value) any {
event := args[0]
jww.ERROR.Printf("[WW] [%s] Main received error event: %s",
m.name, utils.JsErrorToJson(event))
return nil
})
// Create listener for when a messageerror event is fired on the worker.
// This occurs when it receives a message that cannot be deserialized.
// Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/messageerror_event
messageError := js.FuncOf(func(_ js.Value, args []js.Value) any {
messageerrorEvent := js.FuncOf(func(_ js.Value, args []js.Value) any {
event := args[0]
jww.ERROR.Printf("[WW] [%s] Main received error message from worker: %s",
m.name, utils.JsToJson(event))
jww.ERROR.Printf("[WW] [%s] Main received message error event: %s",
m.name, utils.JsErrorToJson(event))
return nil
})
// Register each event listener on the worker using addEventListener
// Doc: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget/addEventListener
m.worker.Call("addEventListener", "message", messageEvent)
m.worker.Call("addEventListener", "messageerror", messageError)
m.worker.Call("addEventListener", "error", errorEvent)
m.worker.Call("addEventListener", "messageerror", messageerrorEvent)
}
// postMessage sends a message to the worker.
......@@ -290,7 +316,7 @@ func (m *Manager) Terminate() {
// Each property is optional; leave a property empty to use the defaults (as
// documented). The available properties are:
// - type - The type of worker to create. The value can be either "classic" or
// "module". If not specified, the default used is classic.
// "module". If not specified, the default used is "classic".
// - credentials - The type of credentials to use for the worker. The value
// can be "omit", "same-origin", or "include". If it is not specified, or if
// the type is "classic", then the default used is "omit" (no credentials
......
......@@ -10,22 +10,147 @@
package worker
import (
"encoding/json"
"reflect"
"testing"
"time"
)
func TestNewManager(t *testing.T) {
// Tests Manager.receiveMessage calls the expected callback.
func TestManager_receiveMessage(t *testing.T) {
m := &Manager{callbacks: make(map[Tag]map[uint64]ReceptionCallback)}
msg := Message{Tag: readyTag, ID: 5}
cbChan := make(chan struct{})
cb := func([]byte) { cbChan <- struct{}{} }
m.callbacks[msg.Tag] = map[uint64]ReceptionCallback{msg.ID: cb}
data, err := json.Marshal(msg)
if err != nil {
t.Fatalf("Failed to JSON marshal Message: %+v", err)
}
func TestManager_SendMessage(t *testing.T) {
go func() {
select {
case <-cbChan:
case <-time.After(10 * time.Millisecond):
t.Error("Timed out waiting for callback to be called.")
}
}()
func TestManager_receiveMessage(t *testing.T) {
err = m.receiveMessage(data)
if err != nil {
t.Errorf("Failed to receive message: %+v", err)
}
}
// Tests Manager.getCallback returns the expected callback and deletes only the
// given callback when deleteCB is true.
func TestManager_getCallback(t *testing.T) {
m := &Manager{callbacks: make(map[Tag]map[uint64]ReceptionCallback)}
// Add new callback and check that it is returned by getCallback
tag, id1 := readyTag, uint64(5)
cb := func([]byte) {}
m.callbacks[tag] = map[uint64]ReceptionCallback{id1: cb}
received, err := m.getCallback(tag, id1, false)
if err != nil {
t.Errorf("getCallback error for tag %q and ID %d: %+v", tag, id1, err)
}
if reflect.ValueOf(cb).Pointer() != reflect.ValueOf(received).Pointer() {
t.Errorf("Wrong callback.\nexpected: %p\nreceived: %p", cb, received)
}
// Add new callback under the same tag but with deleteCB set to true and
// check that it is returned by getCallback and that it was deleted from the
// map while id1 was not
id2 := uint64(56)
cb = func([]byte) {}
m.callbacks[tag][id2] = cb
received, err = m.getCallback(tag, id2, true)
if err != nil {
t.Errorf("getCallback error for tag %q and ID %d: %+v", tag, id2, err)
}
if reflect.ValueOf(cb).Pointer() != reflect.ValueOf(received).Pointer() {
t.Errorf("Wrong callback.\nexpected: %p\nreceived: %p", cb, received)
}
received, err = m.getCallback(tag, id1, false)
if err != nil {
t.Errorf("getCallback error for tag %q and ID %d: %+v", tag, id1, err)
}
received, err = m.getCallback(tag, id2, true)
if err == nil {
t.Errorf("getCallback did not get error when trying to get deleted "+
"callback for tag %q and ID %d", tag, id2)
}
}
// Tests that Manager.RegisterCallback registers a callback that is then called
// by Manager.receiveMessage.
func TestManager_RegisterCallback(t *testing.T) {
m := &Manager{callbacks: make(map[Tag]map[uint64]ReceptionCallback)}
msg := Message{Tag: readyTag, ID: initID}
cbChan := make(chan struct{})
cb := func([]byte) { cbChan <- struct{}{} }
m.RegisterCallback(msg.Tag, cb)
data, err := json.Marshal(msg)
if err != nil {
t.Fatalf("Failed to JSON marshal Message: %+v", err)
}
go func() {
select {
case <-cbChan:
case <-time.After(10 * time.Millisecond):
t.Error("Timed out waiting for callback to be called.")
}
}()
err = m.receiveMessage(data)
if err != nil {
t.Errorf("Failed to receive message: %+v", err)
}
}
func TestManager_getHandler(t *testing.T) {
// Tests that Manager.registerReplyCallback registers a callback that is then
// called by Manager.receiveMessage.
func TestManager_registerReplyCallback(t *testing.T) {
m := &Manager{
callbacks: make(map[Tag]map[uint64]ReceptionCallback),
responseIDs: make(map[Tag]uint64),
}
func TestManager_RegisterHandler(t *testing.T) {
msg := Message{Tag: readyTag, ID: 5}
cbChan := make(chan struct{})
cb := func([]byte) { cbChan <- struct{}{} }
m.registerReplyCallback(msg.Tag, cb)
m.callbacks[msg.Tag] = map[uint64]ReceptionCallback{msg.ID: cb}
data, err := json.Marshal(msg)
if err != nil {
t.Fatalf("Failed to JSON marshal Message: %+v", err)
}
go func() {
select {
case <-cbChan:
case <-time.After(10 * time.Millisecond):
t.Error("Timed out waiting for callback to be called.")
}
}()
err = m.receiveMessage(data)
if err != nil {
t.Errorf("Failed to receive message: %+v", err)
}
}
// Tests that Manager.getNextID returns the expected ID for various Tags.
......@@ -56,11 +181,32 @@ func TestManager_getNextID(t *testing.T) {
// Javascript Call Wrappers //
////////////////////////////////////////////////////////////////////////////////
func TestManager_addEventListeners(t *testing.T) {
// Tests that newWorkerOptions returns a Javascript object with the expected
// type, credentials, and name fields.
func Test_newWorkerOptions(t *testing.T) {
for i, workerType := range []string{"classic", "module"} {
for j, credentials := range []string{"omit", "same-origin", "include"} {
for k, name := range []string{"name1", "name2", "name3"} {
opts := newWorkerOptions(workerType, credentials, name)
v := opts.Get("type").String()
if v != workerType {
t.Errorf("Unexpected type (%d, %d, %d)."+
"\nexpected: %s\nreceived: %s", i, j, k, workerType, v)
}
func TestManager_postMessage(t *testing.T) {
v = opts.Get("credentials").String()
if v != credentials {
t.Errorf("Unexpected credentials (%d, %d, %d)."+
"\nexpected: %s\nreceived: %s", i, j, k, credentials, v)
}
func Test_newWorkerOptions(t *testing.T) {
v = opts.Get("name").String()
if v != name {
t.Errorf("Unexpected name (%d, %d, %d)."+
"\nexpected: %s\nreceived: %s", i, j, k, name, v)
}
}
}
}
}
......@@ -9,9 +9,9 @@
package worker
// message is the outer message that contains the contents of each message sent
// Message is the outer message that contains the contents of each message sent
// to the worker. It is transmitted as JSON.
type message struct {
type Message struct {
Tag Tag `json:"tag"`
ID uint64 `json:"id"`
DeleteCB bool `json:"deleteCB"`
......
......@@ -35,15 +35,20 @@ type ThreadManager struct {
// name describes the worker. It is used for debugging and logging purposes.
name string
// messageLogging determines if debug message logs should be printed every
// time a message is sent/received to/from the worker.
messageLogging bool
mux sync.Mutex
}
// NewThreadManager initialises a new ThreadManager.
func NewThreadManager(name string) *ThreadManager {
func NewThreadManager(name string, messageLogging bool) *ThreadManager {
mh := &ThreadManager{
messages: make(chan js.Value, 100),
callbacks: make(map[Tag]ThreadReceptionCallback),
name: name,
messageLogging: messageLogging,
}
mh.addEventListeners()
......@@ -60,14 +65,17 @@ func (tm *ThreadManager) SignalReady() {
// SendMessage sends a message to the main thread for the given tag.
func (tm *ThreadManager) SendMessage(tag Tag, data []byte) {
msg := message{
msg := Message{
Tag: tag,
ID: initID,
DeleteCB: false,
Data: data,
}
if tm.messageLogging {
jww.DEBUG.Printf("[WW] [%s] Worker sending message for %q with data: %s",
tm.name, tag, data)
}
payload, err := json.Marshal(msg)
if err != nil {
......@@ -81,14 +89,17 @@ func (tm *ThreadManager) SendMessage(tag Tag, data []byte) {
// sendResponse sends a reply to the main thread with the given tag and ID.
func (tm *ThreadManager) sendResponse(
tag Tag, id uint64, data []byte) {
msg := message{
msg := Message{
Tag: tag,
ID: id,
DeleteCB: true,
Data: data,
}
jww.DEBUG.Printf("[WW] [%s] Worker sending reply for %q and ID %d with "+
"data: %s", tm.name, tag, id, data)
if tm.messageLogging {
jww.DEBUG.Printf("[WW] [%s] Worker sending reply for %q and ID %d "+
"with data: %s", tm.name, tag, id, data)
}
payload, err := json.Marshal(msg)
if err != nil {
......@@ -103,13 +114,16 @@ func (tm *ThreadManager) sendResponse(
// everytime a message from the main thread is received. If the registered
// callback returns a response, it is sent to the main thread.
func (tm *ThreadManager) receiveMessage(data []byte) error {
var msg message
var msg Message
err := json.Unmarshal(data, &msg)
if err != nil {
return err
}
jww.DEBUG.Printf("[WW] [%s] Worker received message for %q and ID %d with "+
"data: %s", tm.name, msg.Tag, msg.ID, msg.Data)
if tm.messageLogging {
jww.DEBUG.Printf("[WW] [%s] Worker received message for %q and ID %d "+
"with data: %s", tm.name, msg.Tag, msg.ID, msg.Data)
}
tm.mux.Lock()
callback, exists := tm.callbacks[msg.Tag]
......@@ -167,20 +181,31 @@ func (tm *ThreadManager) addEventListeners() {
return nil
})
// Create listener for when an error event is fired on the worker. This
// occurs when an error occurs in the worker.
// Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/error_event
errorEvent := js.FuncOf(func(_ js.Value, args []js.Value) any {
event := args[0]
jww.ERROR.Printf("[WW] [%s] Worker received error event: %s",
tm.name, utils.JsErrorToJson(event))
return nil
})
// Create listener for when a messageerror event is fired on the worker.
// This occurs when it receives a message that cannot be deserialized.
// Doc: https://developer.mozilla.org/en-US/docs/Web/API/Worker/messageerror_event
messageError := js.FuncOf(func(_ js.Value, args []js.Value) any {
messageerrorEvent := js.FuncOf(func(_ js.Value, args []js.Value) any {
event := args[0]
jww.ERROR.Printf("[WW] [%s] Worker received error message from main "+
"thread: %s", tm.name, utils.JsToJson(event))
jww.ERROR.Printf("[WW] [%s] Worker received message error event: %s",
tm.name, utils.JsErrorToJson(event))
return nil
})
// Register each event listener on the worker using addEventListener
// Doc: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget/addEventListener
js.Global().Call("addEventListener", "message", messageEvent)
js.Global().Call("addEventListener", "messageerror", messageError)
js.Global().Call("addEventListener", "error", errorEvent)
js.Global().Call("addEventListener", "messageerror", messageerrorEvent)
}
// postMessage sends a message from this worker to the main WASM thread.
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package worker
import (
"encoding/json"
"testing"
"time"
)
// Tests that ThreadManager.receiveMessage calls the expected callback.
func TestThreadManager_receiveMessage(t *testing.T) {
tm := &ThreadManager{callbacks: make(map[Tag]ThreadReceptionCallback)}
msg := Message{Tag: readyTag, ID: 5}
cbChan := make(chan struct{})
cb := func([]byte) ([]byte, error) { cbChan <- struct{}{}; return nil, nil }
tm.callbacks[msg.Tag] = cb
data, err := json.Marshal(msg)
if err != nil {
t.Fatalf("Failed to JSON marshal Message: %+v", err)
}
go func() {
select {
case <-cbChan:
case <-time.After(10 * time.Millisecond):
t.Error("Timed out waiting for callback to be called.")
}
}()
err = tm.receiveMessage(data)
if err != nil {
t.Errorf("Failed to receive message: %+v", err)
}
}
// Tests that ThreadManager.RegisterCallback registers a callback that is then
// called by ThreadManager.receiveMessage.
func TestThreadManager_RegisterCallback(t *testing.T) {
tm := &ThreadManager{callbacks: make(map[Tag]ThreadReceptionCallback)}
msg := Message{Tag: readyTag, ID: 5}
cbChan := make(chan struct{})
cb := func([]byte) ([]byte, error) { cbChan <- struct{}{}; return nil, nil }
tm.RegisterCallback(msg.Tag, cb)
data, err := json.Marshal(msg)
if err != nil {
t.Fatalf("Failed to JSON marshal Message: %+v", err)
}
go func() {
select {
case <-cbChan:
case <-time.After(10 * time.Millisecond):
t.Error("Timed out waiting for callback to be called.")
}
}()
err = tm.receiveMessage(data)
if err != nil {
t.Errorf("Failed to receive message: %+v", err)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment