Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • elixxir/xxdk-wasm
1 result
Select Git revision
Show changes
Showing
with 1160 additions and 794 deletions
......@@ -7,56 +7,60 @@
//go:build js && wasm
package utils
package main
import (
"encoding/json"
"github.com/hack-pad/go-indexeddb/idb"
"github.com/pkg/errors"
"gitlab.com/elixxir/wasm-utils/utils"
"gitlab.com/elixxir/xxdk-wasm/indexedDb/impl"
"syscall/js"
)
// CopyBytesToGo copies the [Uint8Array] stored in the [js.Value] to []byte.
// This is a wrapper for [js.CopyBytesToGo] to make it more convenient.
func CopyBytesToGo(src js.Value) []byte {
b := make([]byte, src.Length())
js.CopyBytesToGo(b, src)
return b
// stateModel implements [ClientState] interface backed by IndexedDb.
// NOTE: This model is NOT thread safe - it is the responsibility of the
// caller to ensure that its methods are called sequentially.
type stateModel struct {
db *idb.Database
}
// CopyBytesToJS copies the []byte to a [Uint8Array] stored in a [js.Value].
// This is a wrapper for [js.CopyBytesToJS] to make it more convenient.
func CopyBytesToJS(src []byte) js.Value {
dst := Uint8Array.New(len(src))
js.CopyBytesToJS(dst, src)
return dst
func (s *stateModel) Get(key string) ([]byte, error) {
result, err := impl.Get(s.db, stateStoreName, js.ValueOf(key))
if err != nil {
return nil, err
}
// JsToJson converts the Javascript value to JSON.
func JsToJson(value js.Value) string {
if value.IsUndefined() {
return "null"
stateObj := &State{}
err = json.Unmarshal([]byte(utils.JsToJson(result)), stateObj)
if err != nil {
return nil, err
}
return JSON.Call("stringify", value).String()
return stateObj.Value, err
}
// JsonToJS converts a JSON bytes input to a [js.Value] of the object subtype.
func JsonToJS(inputJson []byte) (js.Value, error) {
var jsObj map[string]any
err := json.Unmarshal(inputJson, &jsObj)
if err != nil {
return js.ValueOf(nil), err
func (s *stateModel) Set(key string, value []byte) error {
state := &State{
Id: key,
Value: value,
}
return js.ValueOf(jsObj), nil
// Convert to jsObject
newStateJSON, err := json.Marshal(state)
if err != nil {
return errors.Errorf("Unable to marshal State: %+v", err)
}
// JsErrorToJson converts the Javascript error to JSON. This should be used for
// all Javascript error objects instead of JsonToJS.
func JsErrorToJson(value js.Value) string {
if value.IsUndefined() {
return "null"
stateObj, err := utils.JsonToJS(newStateJSON)
if err != nil {
return errors.Errorf("Unable to marshal State: %+v", err)
}
properties := Object.Call("getOwnPropertyNames", value)
return JSON.Call("stringify", value, properties).String()
// Store State to database
_, err = impl.Put(s.db, stateStoreName, stateObj)
if err != nil {
return errors.Errorf("Unable to put State: %+v\n%s",
err, newStateJSON)
}
return nil
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package main
import (
"github.com/hack-pad/go-indexeddb/idb"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/v4/storage/utility"
"gitlab.com/elixxir/xxdk-wasm/indexedDb/impl"
"syscall/js"
)
// currentVersion is the current version of the IndexedDb runtime. Used for
// migration purposes.
const currentVersion uint = 1
// NewState returns a [utility.WebState] backed by IndexedDb.
// The name should be a base64 encoding of the users public key.
func NewState(databaseName string) (utility.WebState, error) {
return newState(databaseName)
}
// newState creates the given [idb.Database] and returns a stateModel.
func newState(databaseName string) (*stateModel, error) {
// Attempt to open database object
ctx, cancel := impl.NewContext()
defer cancel()
openRequest, err := idb.Global().Open(ctx, databaseName, currentVersion,
func(db *idb.Database, oldVersion, newVersion uint) error {
if oldVersion == newVersion {
jww.INFO.Printf("IndexDb version for %s is current: v%d",
databaseName, newVersion)
return nil
}
jww.INFO.Printf("IndexDb upgrade required for %s: v%d -> v%d",
databaseName, oldVersion, newVersion)
if oldVersion == 0 && newVersion >= 1 {
err := v1Upgrade(db)
if err != nil {
return err
}
oldVersion = 1
}
// if oldVersion == 1 && newVersion >= 2 { v2Upgrade(), oldVersion = 2 }
return nil
})
if err != nil {
return nil, err
}
// Wait for database open to finish
db, err := openRequest.Await(ctx)
if err != nil {
return nil, err
} else if ctx.Err() != nil {
return nil, ctx.Err()
}
wrapper := &stateModel{db: db}
return wrapper, nil
}
// v1Upgrade performs the v0 -> v1 database upgrade.
//
// This can never be changed without permanently breaking backwards
// compatibility.
func v1Upgrade(db *idb.Database) error {
storeOpts := idb.ObjectStoreOptions{
KeyPath: js.ValueOf(pkeyName),
AutoIncrement: false,
}
_, err := db.CreateObjectStore(stateStoreName, storeOpts)
return err
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package main
import (
"fmt"
"os"
"syscall/js"
"github.com/spf13/cobra"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/xxdk-wasm/logging"
"gitlab.com/elixxir/xxdk-wasm/worker"
)
// SEMVER is the current semantic version of the xxDK web worker.
const SEMVER = "0.1.0"
func main() {
// Set to os.Args because the default is os.Args[1:] and in WASM, args start
// at 0, not 1.
channelsCmd.SetArgs(os.Args)
err := channelsCmd.Execute()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
var channelsCmd = &cobra.Command{
Use: "stateIndexedDbWorker",
Short: "IndexedDb database for state.",
Example: "const go = new Go();\ngo.argv = [\"--logLevel=1\"]",
Run: func(cmd *cobra.Command, args []string) {
// Start logger first to capture all logging events
err := logging.EnableLogging(logLevel, -1, 0, "", "")
if err != nil {
fmt.Printf("Failed to intialize logging: %+v", err)
os.Exit(1)
}
jww.INFO.Printf("xxDK state web worker version: v%s", SEMVER)
jww.INFO.Print("[WW] Starting xxDK WebAssembly State Database Worker.")
m := &manager{
wtm: worker.NewThreadManager("StateIndexedDbWorker", true),
}
m.registerCallbacks()
m.wtm.SignalReady()
// Indicate to the Javascript caller that the WASM is ready by resolving
// a promise created by the caller.
js.Global().Get("onWasmInitialized").Invoke()
<-make(chan bool)
fmt.Println("[WW] Closing xxDK WebAssembly State Database Worker.")
os.Exit(0)
},
}
var (
logLevel jww.Threshold
)
func init() {
// Initialize all startup flags
channelsCmd.Flags().IntVarP((*int)(&logLevel), "logLevel", "l", 2,
"Sets the log level output when outputting to the Javascript console. "+
"0 = TRACE, 1 = DEBUG, 2 = INFO, 3 = WARN, 4 = ERROR, "+
"5 = CRITICAL, 6 = FATAL, -1 = disabled.")
}
......@@ -7,36 +7,21 @@
//go:build js && wasm
package utils
package main
import (
"fmt"
"github.com/pkg/errors"
"testing"
)
// Tests that TestJsError returns a Javascript Error object with the expected
// message.
func TestJsError(t *testing.T) {
err := errors.New("test error")
expectedErr := err.Error()
jsError := JsError(err).Get("message").String()
const (
// Text representation of primary key value (keyPath).
pkeyName = "id"
if jsError != expectedErr {
t.Errorf("Failed to get expected error message."+
"\nexpected: %s\nreceived: %s", expectedErr, jsError)
}
}
// Text representation of the names of the various [idb.ObjectStore].
stateStoreName = "states"
)
// Tests that TestJsTrace returns a Javascript Error object with the expected
// message and stack trace.
func TestJsTrace(t *testing.T) {
err := errors.New("test error")
expectedErr := fmt.Sprintf("%+v", err)
jsError := JsTrace(err).Get("message").String()
// State defines the IndexedDb representation of a single KV data store.
type State struct {
// Id is a unique identifier for a given State.
Id string `json:"id"` // Matches pkeyName
if jsError != expectedErr {
t.Errorf("Failed to get expected error message."+
"\nexpected: %s\nreceived: %s", expectedErr, jsError)
}
// Value stores the data contents of the State.
Value []byte `json:"value"`
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
importScripts('wasm_exec.js');
const isReady = new Promise((resolve) => {
self.onWasmInitialized = resolve;
});
const go = new Go();
const binPath = 'xxdk-stateIndexedDkWorker.wasm'
WebAssembly.instantiateStreaming(fetch(binPath), go.importObject).then(async (result) => {
go.run(result.instance);
await isReady;
}).catch((err) => {
console.error(err);
});
\ No newline at end of file
......@@ -18,7 +18,7 @@ import (
"github.com/hack-pad/go-indexeddb/idb"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/xxdk-wasm/utils"
"gitlab.com/elixxir/wasm-utils/utils"
"syscall/js"
"time"
)
......@@ -30,9 +30,6 @@ const (
// ErrDoesNotExist is an error string for got undefined on Get operations.
ErrDoesNotExist = "result is undefined"
// ErrUniqueConstraint is an error string for failed uniqueness inserts.
ErrUniqueConstraint = "at least one key does not satisfy the uniqueness requirements"
)
// NewContext builds a context for indexedDb operations.
......@@ -45,6 +42,31 @@ func EncodeBytes(input []byte) js.Value {
return js.ValueOf(base64.StdEncoding.EncodeToString(input))
}
// SendRequest is a wrapper for the request.Await() method providing a timeout.
func SendRequest(request *idb.Request) (js.Value, error) {
ctx, cancel := NewContext()
defer cancel()
result, err := request.Await(ctx)
if err != nil {
return js.Undefined(), err
} else if ctx.Err() != nil {
return js.Undefined(), ctx.Err()
}
return result, nil
}
// SendCursorRequest is a wrapper for the cursorRequest.Await() method providing a timeout.
func SendCursorRequest(cur *idb.CursorWithValueRequest,
iterFunc func(cursor *idb.CursorWithValue) error) error {
ctx, cancel := NewContext()
defer cancel()
err := cur.Iter(ctx, iterFunc)
if ctx.Err() != nil {
return ctx.Err()
}
return err
}
// Get is a generic helper for getting values from the given [idb.ObjectStore].
// Only usable by primary key.
func Get(db *idb.Database, objectStoreName string, key js.Value) (js.Value, error) {
......@@ -62,17 +84,15 @@ func Get(db *idb.Database, objectStoreName string, key js.Value) (js.Value, erro
"Unable to get ObjectStore: %+v", err)
}
// Perform the operation
// Set up the operation
getRequest, err := store.Get(key)
if err != nil {
return js.Undefined(), errors.WithMessagef(parentErr,
"Unable to Get from ObjectStore: %+v", err)
}
// Wait for the operation to return
ctx, cancel := NewContext()
resultObj, err := getRequest.Await(ctx)
cancel()
// Perform the operation
resultObj, err := SendRequest(getRequest)
if err != nil {
return js.Undefined(), errors.WithMessagef(parentErr,
"Unable to get from ObjectStore: %+v", err)
......@@ -103,14 +123,15 @@ func GetAll(db *idb.Database, objectStoreName string) ([]js.Value, error) {
"Unable to get ObjectStore: %+v", err)
}
// Perform the operation
result := make([]js.Value, 0)
// Set up the operation
cursorRequest, err := store.OpenCursor(idb.CursorNext)
if err != nil {
return nil, errors.WithMessagef(parentErr, "Unable to open Cursor: %+v", err)
}
ctx, cancel := NewContext()
err = cursorRequest.Iter(ctx,
result := make([]js.Value, 0)
// Perform the operation
err = SendCursorRequest(cursorRequest,
func(cursor *idb.CursorWithValue) error {
row, err := cursor.Value()
if err != nil {
......@@ -119,7 +140,6 @@ func GetAll(db *idb.Database, objectStoreName string) ([]js.Value, error) {
result = append(result, row)
return nil
})
cancel()
if err != nil {
return nil, errors.WithMessagef(parentErr, err.Error())
}
......@@ -150,17 +170,15 @@ func GetIndex(db *idb.Database, objectStoreName,
"Unable to get Index: %+v", err)
}
// Perform the operation
// Set up the operation
getRequest, err := idx.Get(key)
if err != nil {
return js.Undefined(), errors.WithMessagef(parentErr,
"Unable to Get from ObjectStore: %+v", err)
}
// Wait for the operation to return
ctx, cancel := NewContext()
resultObj, err := getRequest.Await(ctx)
cancel()
// Perform the operation
resultObj, err := SendRequest(getRequest)
if err != nil {
return js.Undefined(), errors.WithMessagef(parentErr,
"Unable to get from ObjectStore: %+v", err)
......@@ -189,23 +207,21 @@ func Put(db *idb.Database, objectStoreName string, value js.Value) (js.Value, er
return js.Undefined(), errors.Errorf("Unable to get ObjectStore: %+v", err)
}
// Perform the operation
// Set up the operation
request, err := store.Put(value)
if err != nil {
return js.Undefined(), errors.Errorf("Unable to Put: %+v", err)
}
// Wait for the operation to return
ctx, cancel := NewContext()
result, err := request.Await(ctx)
cancel()
// Perform the operation
resultObj, err := SendRequest(request)
if err != nil {
return js.Undefined(), errors.Errorf("Putting value failed: %+v\n%s",
err, utils.JsToJson(value))
}
jww.DEBUG.Printf("Successfully put value in %s: %s",
objectStoreName, utils.JsToJson(value))
return result, nil
return resultObj, nil
}
// Delete is a generic helper for removing values from the given
......@@ -226,16 +242,14 @@ func Delete(db *idb.Database, objectStoreName string, key js.Value) error {
}
// Perform the operation
_, err = store.Delete(key)
deleteRequest, err := store.Delete(key)
if err != nil {
return errors.WithMessagef(parentErr,
"Unable to Delete from ObjectStore: %+v", err)
}
// Wait for the operation to return
ctx, cancel := NewContext()
err = txn.Await(ctx)
cancel()
// Perform the operation
_, err = SendRequest(deleteRequest.Request)
if err != nil {
return errors.WithMessagef(parentErr,
"Unable to Delete from ObjectStore: %+v", err)
......@@ -282,17 +296,18 @@ func Dump(db *idb.Database, objectStoreName string) ([]string, error) {
return nil, errors.WithMessagef(parentErr,
"Unable to get ObjectStore: %+v", err)
}
// Set up the operation
cursorRequest, err := store.OpenCursor(idb.CursorNext)
if err != nil {
return nil, errors.WithMessagef(parentErr,
"Unable to open Cursor: %+v", err)
}
// Run the query
jww.DEBUG.Printf("%s values:", objectStoreName)
results := make([]string, 0)
ctx, cancel := NewContext()
err = cursorRequest.Iter(ctx,
// Perform the operation
err = SendCursorRequest(cursorRequest,
func(cursor *idb.CursorWithValue) error {
value, err := cursor.Value()
if err != nil {
......@@ -303,7 +318,6 @@ func Dump(db *idb.Database, objectStoreName string) ([]string, error) {
jww.DEBUG.Printf("- %v", valueStr)
return nil
})
cancel()
if err != nil {
return nil, errors.WithMessagef(parentErr,
"Unable to dump ObjectStore: %+v", err)
......
......@@ -11,9 +11,11 @@ package impl
import (
"github.com/hack-pad/go-indexeddb/idb"
jww "github.com/spf13/jwalterweatherman"
"strings"
"syscall/js"
"testing"
"time"
)
// Error path: Tests that Get returns an error when trying to get a message that
......@@ -92,3 +94,52 @@ func newTestDB(name, index string, t *testing.T) *idb.Database {
return db
}
// TestBenchmark ensures IndexedDb can take at least n operations per second.
func TestBenchmark(t *testing.T) {
jww.SetStdoutThreshold(jww.LevelInfo)
benchmarkDb(50, t)
}
// benchmarkDb sends n operations to IndexedDb and prints errors.
func benchmarkDb(n int, t *testing.T) {
jww.INFO.Printf("Benchmarking IndexedDb: %d total.", n)
objectStoreName := "test"
testValue := js.ValueOf(make(map[string]interface{}))
db := newTestDB(objectStoreName, "index", t)
type metric struct {
didSucceed bool
duration time.Duration
}
done := make(chan metric)
// Spawn n operations at the same time
startTime := time.Now()
for i := 0; i < n; i++ {
go func() {
opStart := time.Now()
_, err := Put(db, objectStoreName, testValue)
done <- metric{
didSucceed: err == nil,
duration: time.Since(opStart),
}
}()
}
// Wait for all to complete
didSucceed := true
for i := 0; i < n; i++ {
result := <-done
if !result.didSucceed {
didSucceed = false
}
jww.DEBUG.Printf("Operation time: %s", result.duration)
}
timeElapsed := time.Since(startTime)
jww.INFO.Printf("Benchmarking complete. Succeeded: %t\n"+
"Took %s, Average of %s.",
didSucceed, timeElapsed, timeElapsed/time.Duration(n))
}
......@@ -452,8 +452,8 @@ func (w *wasmModel) DeleteMessage(messageID message.ID) error {
// MuteUserMessage is JSON marshalled and sent to the worker for
// [wasmModel.MuteUser].
type MuteUserMessage struct {
ChannelID *id.ID `json:"channelID"`
PubKey ed25519.PublicKey `json:"pubKey"`
ChannelID []byte `json:"channelID"`
PubKey []byte `json:"pubKey"`
Unmute bool `json:"unmute"`
}
......@@ -461,7 +461,7 @@ type MuteUserMessage struct {
func (w *wasmModel) MuteUser(
channelID *id.ID, pubKey ed25519.PublicKey, unmute bool) {
msg := MuteUserMessage{
ChannelID: channelID,
ChannelID: channelID.Marshal(),
PubKey: pubKey,
Unmute: unmute,
}
......
......@@ -10,47 +10,37 @@
package channels
import (
"crypto/ed25519"
"encoding/json"
"github.com/pkg/errors"
"time"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/v4/bindings"
"gitlab.com/elixxir/client/v4/channels"
cryptoChannel "gitlab.com/elixxir/crypto/channel"
"gitlab.com/elixxir/crypto/message"
"gitlab.com/elixxir/xxdk-wasm/storage"
"gitlab.com/elixxir/xxdk-wasm/worker"
"gitlab.com/xx_network/primitives/id"
)
// databaseSuffix is the suffix to be appended to the name of the database.
const databaseSuffix = "_speakeasy"
// MessageReceivedCallback is called any time a message is received or updated.
//
// update is true if the row is old and was edited.
type MessageReceivedCallback func(uuid uint64, channelID *id.ID, update bool)
// DeletedMessageCallback is called any time a message is deleted.
type DeletedMessageCallback func(messageID message.ID)
// MutedUserCallback is called any time a user is muted or unmuted. unmute is
// true if the user has been unmuted and false if they have been muted.
type MutedUserCallback func(
channelID *id.ID, pubKey ed25519.PublicKey, unmute bool)
// eventUpdateCallback is the [bindings.ChannelUICallback] callback function
// it has a type ([bindings.NickNameUpdate] to [bindings.MessageDeleted]
// and json data that is the callback information.
type eventUpdateCallback func(eventType int64, jsonData []byte)
// NewWASMEventModelBuilder returns an EventModelBuilder which allows
// the channel manager to define the path but the callback is the same
// across the board.
func NewWASMEventModelBuilder(wasmJsPath string,
encryption cryptoChannel.Cipher, messageReceivedCB MessageReceivedCallback,
deletedMessageCB DeletedMessageCallback,
mutedUserCB MutedUserCallback) channels.EventModelBuilder {
encryption cryptoChannel.Cipher,
channelCbs bindings.ChannelUICallbacks) channels.EventModelBuilder {
fn := func(path string) (channels.EventModel, error) {
return NewWASMEventModel(path, wasmJsPath, encryption,
messageReceivedCB, deletedMessageCB, mutedUserCB)
channelCbs)
}
return fn
}
......@@ -65,8 +55,7 @@ type NewWASMEventModelMessage struct {
// NewWASMEventModel returns a [channels.EventModel] backed by a wasmModel.
// The name should be a base64 encoding of the users public key.
func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher,
messageReceivedCB MessageReceivedCallback,
deletedMessageCB DeletedMessageCallback, mutedUserCB MutedUserCallback) (
channelCbs bindings.ChannelUICallbacks) (
channels.EventModel, error) {
databaseName := path + databaseSuffix
......@@ -75,17 +64,9 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher,
return nil, err
}
// Register handler to manage messages for the MessageReceivedCallback
wm.RegisterCallback(MessageReceivedCallbackTag,
messageReceivedCallbackHandler(messageReceivedCB))
// Register handler to manage messages for the DeletedMessageCallback
wm.RegisterCallback(DeletedMessageCallbackTag,
deletedMessageCallbackHandler(deletedMessageCB))
// Register handler to manage messages for the MutedUserCallback
wm.RegisterCallback(MutedUserCallbackTag,
mutedUserCallbackHandler(mutedUserCB))
// Register handler to manage messages for the EventUpdate
wm.RegisterCallback(EventUpdateCallbackTag,
messageReceivedCallbackHandler(channelCbs.EventUpdate))
// Store the database name
err = storage.StoreIndexedDb(databaseName)
......@@ -132,49 +113,18 @@ func NewWASMEventModel(path, wasmJsPath string, encryption cryptoChannel.Cipher,
return &wasmModel{wm}, nil
}
// MessageReceivedCallbackMessage is JSON marshalled and received from the
// worker for the [MessageReceivedCallback] callback.
type MessageReceivedCallbackMessage struct {
UUID uint64 `json:"uuid"`
ChannelID *id.ID `json:"channelID"`
Update bool `json:"update"`
// EventUpdateCallbackMessage is JSON marshalled and received from the worker
// for the [EventUpdate] callback.
type EventUpdateCallbackMessage struct {
EventType int64 `json:"eventType"`
JsonData []byte `json:"jsonData"`
}
// messageReceivedCallbackHandler returns a handler to manage messages for the
// MessageReceivedCallback.
func messageReceivedCallbackHandler(cb MessageReceivedCallback) func(data []byte) {
return func(data []byte) {
var msg MessageReceivedCallbackMessage
err := json.Unmarshal(data, &msg)
if err != nil {
jww.ERROR.Printf(
"Failed to JSON unmarshal %T from worker: %+v", msg, err)
return
}
cb(msg.UUID, msg.ChannelID, msg.Update)
}
}
// deletedMessageCallbackHandler returns a handler to manage messages for the
// DeletedMessageCallback.
func deletedMessageCallbackHandler(cb DeletedMessageCallback) func(data []byte) {
return func(data []byte) {
messageID, err := message.UnmarshalID(data)
if err != nil {
jww.ERROR.Printf(
"Failed to JSON unmarshal message ID from worker: %+v", err)
}
cb(messageID)
}
}
// mutedUserCallbackHandler returns a handler to manage messages for the
// MutedUserCallback.
func mutedUserCallbackHandler(cb MutedUserCallback) func(data []byte) {
func messageReceivedCallbackHandler(cb eventUpdateCallback) func(data []byte) {
return func(data []byte) {
var msg MuteUserMessage
var msg EventUpdateCallbackMessage
err := json.Unmarshal(data, &msg)
if err != nil {
jww.ERROR.Printf(
......@@ -182,7 +132,7 @@ func mutedUserCallbackHandler(cb MutedUserCallback) func(data []byte) {
return
}
cb(msg.ChannelID, msg.PubKey, msg.Unmute)
cb(msg.EventType, msg.JsonData)
}
}
......
......@@ -15,9 +15,7 @@ import "gitlab.com/elixxir/xxdk-wasm/worker"
// to receive a message.
const (
NewWASMEventModelTag worker.Tag = "NewWASMEventModel"
MessageReceivedCallbackTag worker.Tag = "MessageReceivedCallback"
DeletedMessageCallbackTag worker.Tag = "DeletedMessageCallback"
MutedUserCallbackTag worker.Tag = "MutedUserCallback"
EventUpdateCallbackTag worker.Tag = "EventUpdateCallback"
JoinChannelTag worker.Tag = "JoinChannel"
LeaveChannelTag worker.Tag = "LeaveChannel"
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package dm
import (
"encoding/json"
"github.com/pkg/errors"
"time"
"gitlab.com/elixxir/xxdk-wasm/worker"
)
type wasmModel struct {
wh *worker.Manager
}
// TransferMessage is JSON marshalled and sent to the worker.
type TransferMessage struct {
Key string `json:"key"`
Value []byte `json:"value"`
Error string `json:"error"`
}
func (w *wasmModel) Set(key string, value []byte) error {
msg := TransferMessage{
Key: key,
Value: value,
}
data, err := json.Marshal(msg)
if err != nil {
return errors.Errorf(
"Could not JSON marshal payload for TransferMessage: %+v", err)
}
resultChan := make(chan []byte)
w.wh.SendMessage(SetTag, data,
func(data []byte) {
resultChan <- data
})
select {
case result := <-resultChan:
return errors.New(string(result))
case <-time.After(worker.ResponseTimeout):
return errors.Errorf("Timed out after %s waiting for response from the "+
"worker about Get", worker.ResponseTimeout)
}
}
func (w *wasmModel) Get(key string) ([]byte, error) {
resultChan := make(chan []byte)
w.wh.SendMessage(GetTag, []byte(key),
func(data []byte) {
resultChan <- data
})
select {
case result := <-resultChan:
var msg TransferMessage
err := json.Unmarshal(result, &msg)
if err != nil {
return nil, errors.Errorf(
"failed to JSON unmarshal %T from main thread: %+v", msg, err)
}
if len(msg.Error) > 0 {
return nil, errors.New(msg.Error)
}
return msg.Value, nil
case <-time.After(worker.ResponseTimeout):
return nil, errors.Errorf("Timed out after %s waiting for response from the "+
"worker about Get", worker.ResponseTimeout)
}
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package dm
import (
"encoding/json"
"time"
"github.com/pkg/errors"
"gitlab.com/elixxir/client/v4/storage/utility"
"gitlab.com/elixxir/xxdk-wasm/storage"
"gitlab.com/elixxir/xxdk-wasm/worker"
)
// databaseSuffix is the suffix to be appended to the name of the database.
const databaseSuffix = "_speakeasy_state"
// NewStateMessage is JSON marshalled and sent to the worker for
// [NewState].
type NewStateMessage struct {
DatabaseName string `json:"databaseName"`
}
// NewState returns a [utility.WebState] backed by indexeddb.
// The name should be a base64 encoding of the users public key.
func NewState(path, wasmJsPath string) (utility.WebState, error) {
databaseName := path + databaseSuffix
wh, err := worker.NewManager(wasmJsPath, "stateIndexedDb", true)
if err != nil {
return nil, err
}
// Store the database name
err = storage.StoreIndexedDb(databaseName)
if err != nil {
return nil, err
}
msg := NewStateMessage{
DatabaseName: databaseName,
}
payload, err := json.Marshal(msg)
if err != nil {
return nil, err
}
dataChan := make(chan []byte)
wh.SendMessage(NewStateTag, payload,
func(data []byte) { dataChan <- data })
select {
case data := <-dataChan:
if len(data) > 0 {
return nil, errors.New(string(data))
}
case <-time.After(worker.ResponseTimeout):
return nil, errors.Errorf("timed out after %s waiting for indexedDB "+
"database in worker to initialize", worker.ResponseTimeout)
}
return &wasmModel{wh}, nil
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package dm
import "gitlab.com/elixxir/xxdk-wasm/worker"
// List of tags that can be used when sending a message or registering a handler
// to receive a message.
const (
NewStateTag worker.Tag = "NewState"
SetTag worker.Tag = "Set"
GetTag worker.Tag = "Get"
)
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package logging
import (
"io"
"math"
"github.com/armon/circbuf"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/xxdk-wasm/worker"
)
// fileLogger manages the recording of jwalterweatherman logs to the local
// in-memory file buffer.
type fileLogger struct {
threshold jww.Threshold
cb *circbuf.Buffer
}
// newFileLogger starts logging to a local, in-memory log file at the specified
// threshold. Returns a [fileLogger] that can be used to get the log file.
func newFileLogger(threshold jww.Threshold, maxLogFileSize int) (*fileLogger, error) {
b, err := circbuf.NewBuffer(int64(maxLogFileSize))
if err != nil {
return nil, errors.Wrap(err, "could not create new circular buffer")
}
fl := &fileLogger{
threshold: threshold,
cb: b,
}
jww.FEEDBACK.Printf("[LOG] Outputting log to file of max size %d at level %s",
b.Size(), fl.threshold)
logger = fl
return fl, nil
}
// Write adheres to the io.Writer interface and writes log entries to the
// buffer.
func (fl *fileLogger) Write(p []byte) (n int, err error) {
return fl.cb.Write(p)
}
// Listen adheres to the [jwalterweatherman.LogListener] type and returns the
// log writer when the threshold is within the set threshold limit.
func (fl *fileLogger) Listen(threshold jww.Threshold) io.Writer {
if threshold < fl.threshold {
return nil
}
return fl
}
// StopLogging stops log message writes. Once logging is stopped, it cannot be
// resumed and the log file cannot be recovered.
func (fl *fileLogger) StopLogging() {
fl.threshold = math.MaxInt
fl.cb.Reset()
}
// GetFile returns the entire log file.
func (fl *fileLogger) GetFile() []byte {
return fl.cb.Bytes()
}
// Threshold returns the log level threshold used in the file.
func (fl *fileLogger) Threshold() jww.Threshold {
return fl.threshold
}
// MaxSize returns the max size, in bytes, that the log file is allowed to be.
func (fl *fileLogger) MaxSize() int {
return int(fl.cb.Size())
}
// Size returns the current size, in bytes, written to the log file.
func (fl *fileLogger) Size() int {
return int(fl.cb.TotalWritten())
}
// Worker returns nil.
func (fl *fileLogger) Worker() *worker.Manager {
return nil
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package logging
import (
"bytes"
"github.com/armon/circbuf"
jww "github.com/spf13/jwalterweatherman"
"math/rand"
"reflect"
"testing"
)
func Test_newFileLogger(t *testing.T) {
expected := &fileLogger{
threshold: jww.LevelError,
}
expected.cb, _ = circbuf.NewBuffer(512)
fl, err := newFileLogger(expected.threshold, int(expected.cb.Size()))
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
if !reflect.DeepEqual(expected, fl) {
t.Errorf("Unexpected new fileLogger.\nexpected: %+v\nreceived: %+v",
expected, fl)
}
if !reflect.DeepEqual(logger, fl) {
t.Errorf("Failed to set logger.\nexpected: %+v\nreceived: %+v",
logger, fl)
}
}
// Tests that fileLogger.Write writes the expected data to the buffer and that
// when the max file size is reached, old data is replaced.
func Test_fileLogger_Write(t *testing.T) {
rng := rand.New(rand.NewSource(3424))
fl, err := newFileLogger(jww.LevelError, 512)
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
expected := make([]byte, fl.MaxSize())
rng.Read(expected)
n, err := fl.Write(expected)
if err != nil {
t.Fatalf("Failed to write: %+v", err)
} else if n != len(expected) {
t.Fatalf("Did not write expected length.\nexpected: %d\nreceived: %d",
len(expected), n)
}
if !bytes.Equal(fl.cb.Bytes(), expected) {
t.Fatalf("Incorrect bytes in buffer.\nexpected: %v\nreceived: %v",
expected, fl.cb.Bytes())
}
// Check that the data is overwritten
rng.Read(expected)
n, err = fl.Write(expected)
if err != nil {
t.Fatalf("Failed to write: %+v", err)
} else if n != len(expected) {
t.Fatalf("Did not write expected length.\nexpected: %d\nreceived: %d",
len(expected), n)
}
if !bytes.Equal(fl.cb.Bytes(), expected) {
t.Fatalf("Incorrect bytes in buffer.\nexpected: %v\nreceived: %v",
expected, fl.cb.Bytes())
}
}
// Tests that fileLogger.Listen only returns an io.Writer for valid thresholds.
func Test_fileLogger_Listen(t *testing.T) {
th := jww.LevelError
fl, err := newFileLogger(th, 512)
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
thresholds := []jww.Threshold{-1, jww.LevelTrace, jww.LevelDebug,
jww.LevelFatal, jww.LevelWarn, jww.LevelError, jww.LevelCritical,
jww.LevelFatal}
for _, threshold := range thresholds {
w := fl.Listen(threshold)
if threshold < th {
if w != nil {
t.Errorf("Did not receive nil io.Writer for level %s: %+v",
threshold, w)
}
} else if w == nil {
t.Errorf("Received nil io.Writer for level %s", threshold)
}
}
}
// Tests that fileLogger.Listen always returns nil after fileLogger.StopLogging
// is called.
func Test_fileLogger_StopLogging(t *testing.T) {
fl, err := newFileLogger(jww.LevelError, 512)
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
fl.StopLogging()
if w := fl.Listen(jww.LevelFatal); w != nil {
t.Errorf("Listen returned non-nil io.Writer when logging should have "+
"been stopped: %+v", w)
}
file := fl.GetFile()
if !bytes.Equal([]byte{}, file) {
t.Errorf("Did not receice empty file: %+v", file)
}
}
// Tests that fileLogger.GetFile returns the expected file.
func Test_fileLogger_GetFile(t *testing.T) {
rng := rand.New(rand.NewSource(9863))
fl, err := newFileLogger(jww.LevelError, 512)
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
var expected []byte
for i := 0; i < 5; i++ {
p := make([]byte, rng.Intn(64))
rng.Read(p)
expected = append(expected, p...)
if _, err = fl.Write(p); err != nil {
t.Errorf("Write %d failed: %+v", i, err)
}
}
file := fl.GetFile()
if !bytes.Equal(expected, file) {
t.Errorf("Unexpected file.\nexpected: %v\nreceived: %v", expected, file)
}
}
// Tests that fileLogger.Threshold returns the expected threshold.
func Test_fileLogger_Threshold(t *testing.T) {
thresholds := []jww.Threshold{-1, jww.LevelTrace, jww.LevelDebug,
jww.LevelFatal, jww.LevelWarn, jww.LevelError, jww.LevelCritical,
jww.LevelFatal}
for _, threshold := range thresholds {
fl, err := newFileLogger(threshold, 512)
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
if fl.Threshold() != threshold {
t.Errorf("Incorrect threshold.\nexpected: %s (%d)\nreceived: %s (%d)",
threshold, threshold, fl.Threshold(), fl.Threshold())
}
}
}
// Unit test of fileLogger.MaxSize.
func Test_fileLogger_MaxSize(t *testing.T) {
maxSize := 512
fl, err := newFileLogger(jww.LevelError, maxSize)
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
if fl.MaxSize() != maxSize {
t.Errorf("Incorrect max size.\nexpected: %d\nreceived: %d",
maxSize, fl.MaxSize())
}
}
// Unit test of fileLogger.Size.
func Test_fileLogger_Size(t *testing.T) {
rng := rand.New(rand.NewSource(9863))
fl, err := newFileLogger(jww.LevelError, 512)
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
var expected []byte
for i := 0; i < 5; i++ {
p := make([]byte, rng.Intn(64))
rng.Read(p)
expected = append(expected, p...)
if _, err = fl.Write(p); err != nil {
t.Errorf("Write %d failed: %+v", i, err)
}
size := fl.Size()
if size != len(expected) {
t.Errorf("Incorrect size (%d).\nexpected: %d\nreceived: %d",
i, len(expected), size)
}
}
file := fl.GetFile()
if !bytes.Equal(expected, file) {
t.Errorf("Unexpected file.\nexpected: %v\nreceived: %v", expected, file)
}
}
// Tests that fileLogger.Worker always returns nil.
func Test_fileLogger_Worker(t *testing.T) {
fl, err := newFileLogger(jww.LevelError, 512)
if err != nil {
t.Fatalf("Failed to make new fileLogger: %+v", err)
}
w := fl.Worker()
if w != nil {
t.Errorf("Did not get nil worker: %+v", w)
}
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package logging
import (
"fmt"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/xxdk-wasm/utils"
"log"
"syscall/js"
)
// LogLevel sets level of logging. All logs at the set level and below will be
// displayed (e.g., when log level is ERROR, only ERROR, CRITICAL, and FATAL
// messages will be printed).
//
// The default log level without updates is INFO.
func LogLevel(threshold jww.Threshold) error {
if threshold < jww.LevelTrace || threshold > jww.LevelFatal {
return errors.Errorf("log level is not valid: log level: %d", threshold)
}
jww.SetLogThreshold(threshold)
jww.SetFlags(log.LstdFlags | log.Lmicroseconds)
ll := NewJsConsoleLogListener(threshold)
AddLogListener(ll.Listen)
jww.SetStdoutThreshold(jww.LevelFatal + 1)
msg := fmt.Sprintf("Log level set to: %s", threshold)
switch threshold {
case jww.LevelTrace:
fallthrough
case jww.LevelDebug:
fallthrough
case jww.LevelInfo:
jww.INFO.Print(msg)
case jww.LevelWarn:
jww.WARN.Print(msg)
case jww.LevelError:
jww.ERROR.Print(msg)
case jww.LevelCritical:
jww.CRITICAL.Print(msg)
case jww.LevelFatal:
jww.FATAL.Print(msg)
}
return nil
}
// LogLevelJS sets level of logging. All logs at the set level and below will be
// displayed (e.g., when log level is ERROR, only ERROR, CRITICAL, and FATAL
// messages will be printed).
//
// Log level options:
//
// TRACE - 0
// DEBUG - 1
// INFO - 2
// WARN - 3
// ERROR - 4
// CRITICAL - 5
// FATAL - 6
//
// The default log level without updates is INFO.
//
// Parameters:
// - args[0] - Log level (int).
//
// Returns:
// - Throws TypeError if the log level is invalid.
func LogLevelJS(_ js.Value, args []js.Value) any {
threshold := jww.Threshold(args[0].Int())
err := LogLevel(threshold)
if err != nil {
utils.Throw(utils.TypeError, err)
return nil
}
return nil
}
......@@ -10,29 +10,13 @@
package logging
import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/armon/circbuf"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/xxdk-wasm/utils"
"gitlab.com/elixxir/xxdk-wasm/worker"
"io"
"strconv"
"sync/atomic"
"syscall/js"
"time"
)
const (
// DefaultInitThreshold is the log threshold used for the initial log before
// any logging options is set.
DefaultInitThreshold = jww.LevelTrace
jww "github.com/spf13/jwalterweatherman"
// logListenerChanSize is the size of the listener channel that stores log
// messages before they are written.
logListenerChanSize = 3000
"gitlab.com/elixxir/wasm-utils/utils"
"gitlab.com/elixxir/xxdk-wasm/worker"
)
// List of tags that can be used when sending a message or registering a handler
......@@ -46,346 +30,79 @@ const (
)
// logger is the global that all jwalterweatherman logging is sent to.
var logger *Logger
// Logger manages the recording of jwalterweatherman logs. It can write logs to
// a local, in-memory buffer or to an external worker.
type Logger struct {
threshold jww.Threshold
maxLogFileSize int
logListenerID uint64
listenChan chan []byte
mode atomic.Uint32
processQuit chan struct{}
cb *circbuf.Buffer
wm *worker.Manager
}
// InitLogger initializes the logger. Include this in the init function in main.
func InitLogger() *Logger {
logger = NewLogger()
return logger
}
var logger Logger
// GetLogger returns the Logger object, used to manager where logging is
// recorded.
func GetLogger() *Logger {
func GetLogger() Logger {
return logger
}
// NewLogger creates a new Logger that begins storing the first
// DefaultInitThreshold log entries. If either the log file or log worker is
// enabled, then these logs are redirected to the set destination. If the
// channel fills up with no log recorder enabled, then the listener is disabled.
func NewLogger() *Logger {
lf := newLogger()
// Add the log listener
lf.logListenerID = AddLogListener(lf.Listen)
type Logger interface {
// StopLogging stops log message writes. Once logging is stopped, it cannot
// be resumed and the log file cannot be recovered.
StopLogging()
jww.INFO.Printf("[LOG] Enabled initial log file listener in %s with ID %d "+
"at threshold %s that can store %d entries",
lf.getMode(), lf.logListenerID, lf.Threshold(), cap(lf.listenChan))
return lf
}
// GetFile returns the entire log file.
GetFile() []byte
// newLogger initialises a Logger without adding it as a log listener.
func newLogger() *Logger {
lf := &Logger{
threshold: DefaultInitThreshold,
listenChan: make(chan []byte, logListenerChanSize),
mode: atomic.Uint32{},
processQuit: make(chan struct{}),
}
lf.setMode(initMode)
// Threshold returns the log level threshold used in the file.
Threshold() jww.Threshold
return lf
}
// MaxSize returns the maximum size, in bytes, of the log file before it
// rolls over and starts overwriting the oldest entries
MaxSize() int
// LogToFile starts logging to a local, in-memory log file.
func (l *Logger) LogToFile(threshold jww.Threshold, maxLogFileSize int) error {
err := l.prepare(threshold, maxLogFileSize, fileMode)
if err != nil {
return err
}
// Size returns the number of bytes written to the log file.
Size() int
b, err := circbuf.NewBuffer(int64(maxLogFileSize))
if err != nil {
return err
}
l.cb = b
sendLog := func(p []byte) {
if n, err2 := l.cb.Write(p); err2 != nil {
jww.ERROR.Printf(
"[LOG] Error writing log to circular buffer: %+v", err2)
} else if n != len(p) {
jww.ERROR.Printf(
"[LOG] Wrote %d bytes when %d bytes expected", n, len(p))
// Worker returns the manager for the Javascript Worker object. If the
// worker has not been initialized, it returns nil.
Worker() *worker.Manager
}
}
go l.processLog(workerMode, sendLog, l.processQuit)
return nil
}
// EnableLogging enables logging to the Javascript console and to a local or
// worker file buffer. This must be called only once at initialisation.
func EnableLogging(logLevel, fileLogLevel jww.Threshold, maxLogFileSizeMB int,
workerScriptURL, workerName string) error {
// LogToFileWorker starts a new worker that begins listening for logs and
// writing them to file. This function blocks until the worker has started.
func (l *Logger) LogToFileWorker(threshold jww.Threshold, maxLogFileSize int,
wasmJsPath, workerName string) error {
err := l.prepare(threshold, maxLogFileSize, workerMode)
if err != nil {
return err
var listeners []jww.LogListener
if logLevel > -1 {
// Overwrites setting the log level to INFO done in bindings so that the
// Javascript console can be used
ll := NewJsConsoleLogListener(logLevel)
listeners = append(listeners, ll.Listen)
jww.SetStdoutThreshold(jww.LevelFatal + 1)
jww.FEEDBACK.Printf("[LOG] Log level for console set to %s", logLevel)
} else {
jww.FEEDBACK.Print("[LOG] Disabling logging to console.")
}
// Create new worker manager, which will start the worker and wait until
// communication has been established
wm, err := worker.NewManager(wasmJsPath, workerName, false)
if fileLogLevel > -1 {
maxLogFileSize := maxLogFileSizeMB * 1_000_000
if workerScriptURL == "" {
fl, err := newFileLogger(fileLogLevel, maxLogFileSize)
if err != nil {
return err
return errors.Wrap(err, "could not initialize logging to file")
}
l.wm = wm
// Register the callback used by the Javascript to request the log file.
// This prevents an error print when GetFileExtTag is not registered.
l.wm.RegisterCallback(GetFileExtTag, func([]byte) {
jww.DEBUG.Print("[LOG] Received file requested from external " +
"Javascript. Ignoring file.")
})
data, err := json.Marshal(l.maxLogFileSize)
if err != nil {
return err
}
// Send message to initialize the log file listener
errChan := make(chan error)
l.wm.SendMessage(NewLogFileTag, data, func(data []byte) {
if len(data) > 0 {
errChan <- errors.New(string(data))
listeners = append(listeners, fl.Listen)
} else {
errChan <- nil
}
})
// Wait for worker to respond
select {
case err = <-errChan:
wl, err := newWorkerLogger(
fileLogLevel, maxLogFileSize, workerScriptURL, workerName)
if err != nil {
return err
}
case <-time.After(worker.ResponseTimeout):
return errors.Errorf("timed out after %s waiting for new log "+
"file in worker to initialize", worker.ResponseTimeout)
}
jww.INFO.Printf("[LOG] Initialized log to file web worker %s.", workerName)
sendLog := func(p []byte) { l.wm.SendMessage(WriteLogTag, p, nil) }
go l.processLog(workerMode, sendLog, l.processQuit)
return nil
}
// processLog processes the log messages sent to the listener channel and sends
// them to the appropriate recorder.
func (l *Logger) processLog(m mode, sendLog func(p []byte), quit chan struct{}) {
jww.INFO.Printf("[LOG] Starting log file processing thread in %s.", m)
for {
select {
case <-quit:
jww.INFO.Printf("[LOG] Stopping log file processing thread.")
return
case p := <-l.listenChan:
go sendLog(p)
}
}
}
// prepare sets the threshold, maxLogFileSize, and mode of the logger and
// prints a log message indicating this information.
func (l *Logger) prepare(
threshold jww.Threshold, maxLogFileSize int, m mode) error {
if m := l.getMode(); m != initMode {
return errors.Errorf("log already set to %s", m)
} else if threshold < jww.LevelTrace || threshold > jww.LevelFatal {
return errors.Errorf("log level of %d is invalid", threshold)
}
l.threshold = threshold
l.maxLogFileSize = maxLogFileSize
l.setMode(m)
msg := fmt.Sprintf("[LOG] Outputting log to file in %s of max size %d "+
"with level %s", m, l.MaxSize(), l.Threshold())
switch l.Threshold() {
case jww.LevelTrace:
fallthrough
case jww.LevelDebug:
fallthrough
case jww.LevelInfo:
jww.INFO.Print(msg)
case jww.LevelWarn:
jww.WARN.Print(msg)
case jww.LevelError:
jww.ERROR.Print(msg)
case jww.LevelCritical:
jww.CRITICAL.Print(msg)
case jww.LevelFatal:
jww.FATAL.Print(msg)
}
return nil
}
// StopLogging stops the logging of log messages and disables the log listener.
// If the log worker is running, it is terminated. Once logging is stopped, it
// cannot be resumed the log file cannot be recovered.
func (l *Logger) StopLogging() {
jww.DEBUG.Printf("[LOG] Removing log listener with ID %d", l.logListenerID)
RemoveLogListener(l.logListenerID)
switch l.getMode() {
case workerMode:
l.wm.Stop()
jww.DEBUG.Printf("[LOG] Terminated log worker.")
case fileMode:
jww.DEBUG.Printf("[LOG] Reset circular buffer.")
l.cb.Reset()
}
select {
case l.processQuit <- struct{}{}:
jww.DEBUG.Printf("[LOG] Sent quit channel to log process.")
default:
jww.DEBUG.Printf("[LOG] Failed to stop log processes.")
}
}
// GetFile returns the entire log file.
//
// If the log file is listening locally, it returns it from the local buffer. If
// it is listening from the worker, it blocks until the file is returned.
func (l *Logger) GetFile() []byte {
switch l.getMode() {
case fileMode:
return l.cb.Bytes()
case workerMode:
fileChan := make(chan []byte)
l.wm.SendMessage(GetFileTag, nil, func(data []byte) { fileChan <- data })
select {
case file := <-fileChan:
return file
case <-time.After(worker.ResponseTimeout):
jww.FATAL.Panicf("[LOG] Timed out after %s waiting for log "+
"file from worker", worker.ResponseTimeout)
return nil
}
default:
return nil
}
}
// Threshold returns the log level threshold used in the file.
func (l *Logger) Threshold() jww.Threshold {
return l.threshold
return errors.Wrap(err, "could not initialize logging to worker file")
}
// MaxSize returns the max size, in bytes, that the log file is allowed to be.
func (l *Logger) MaxSize() int {
return l.maxLogFileSize
listeners = append(listeners, wl.Listen)
}
// Size returns the current size, in bytes, written to the log file.
//
// If the log file is listening locally, it returns it from the local buffer. If
// it is listening from the worker, it blocks until the size is returned.
func (l *Logger) Size() int {
switch l.getMode() {
case fileMode:
return int(l.cb.Size())
case workerMode:
sizeChan := make(chan []byte)
l.wm.SendMessage(SizeTag, nil, func(data []byte) { sizeChan <- data })
select {
case data := <-sizeChan:
return int(jww.Threshold(binary.LittleEndian.Uint64(data)))
case <-time.After(worker.ResponseTimeout):
jww.FATAL.Panicf("[LOG] Timed out after %s waiting for log "+
"file size from worker", worker.ResponseTimeout)
return 0
}
default:
return 0
js.Global().Set("GetLogger", js.FuncOf(GetLoggerJS))
}
}
////////////////////////////////////////////////////////////////////////////////
// JWW Listener //
////////////////////////////////////////////////////////////////////////////////
jww.SetLogListeners(listeners...)
// Listen is called for every logging event. This function adheres to the
// [jwalterweatherman.LogListener] type.
func (l *Logger) Listen(t jww.Threshold) io.Writer {
if t < l.threshold {
return nil
}
return l
}
// Write sends the bytes to the listener channel. It always returns the length
// of p and a nil error. This function adheres to the io.Writer interface.
func (l *Logger) Write(p []byte) (n int, err error) {
select {
case l.listenChan <- append([]byte{}, p...):
default:
jww.ERROR.Printf(
"[LOG] Logger channel filled. Log file recording stopping.")
l.StopLogging()
return 0, errors.Errorf(
"Logger channel filled. Log file recording stopping.")
}
return len(p), nil
}
////////////////////////////////////////////////////////////////////////////////
// Log File Mode //
////////////////////////////////////////////////////////////////////////////////
// mode represents the state of the Logger.
type mode uint32
const (
initMode mode = iota
fileMode
workerMode
)
func (l *Logger) setMode(m mode) { l.mode.Store(uint32(m)) }
func (l *Logger) getMode() mode { return mode(l.mode.Load()) }
// String returns a human-readable representation of the mode for logging and
// debugging. This function adheres to the fmt.Stringer interface.
func (m mode) String() string {
switch m {
case initMode:
return "uninitialized mode"
case fileMode:
return "file mode"
case workerMode:
return "worker mode"
default:
return "invalid mode: " + strconv.Itoa(int(m))
}
}
////////////////////////////////////////////////////////////////////////////////
// Javascript Bindings //
////////////////////////////////////////////////////////////////////////////////
......@@ -396,142 +113,98 @@ func (m mode) String() string {
// Returns:
// - A Javascript representation of the [Logger] object.
func GetLoggerJS(js.Value, []js.Value) any {
return newLoggerJS(GetLogger())
// l := GetLogger()
// if l != nil {
// return newLoggerJS(LoggerJS{GetLogger()})
// }
// return js.Null()
return newLoggerJS(LoggerJS{GetLogger()})
}
type LoggerJS struct {
api Logger
}
// newLoggerJS creates a new Javascript compatible object (map[string]any) that
// matches the [Logger] structure.
func newLoggerJS(lfw *Logger) map[string]any {
func newLoggerJS(l LoggerJS) map[string]any {
logFileWorker := map[string]any{
"LogToFile": js.FuncOf(lfw.LogToFileJS),
"LogToFileWorker": js.FuncOf(lfw.LogToFileWorkerJS),
"StopLogging": js.FuncOf(lfw.StopLoggingJS),
"GetFile": js.FuncOf(lfw.GetFileJS),
"Threshold": js.FuncOf(lfw.ThresholdJS),
"MaxSize": js.FuncOf(lfw.MaxSizeJS),
"Size": js.FuncOf(lfw.SizeJS),
"Worker": js.FuncOf(lfw.WorkerJS),
"StopLogging": js.FuncOf(l.StopLogging),
"GetFile": js.FuncOf(l.GetFile),
"Threshold": js.FuncOf(l.Threshold),
"MaxSize": js.FuncOf(l.MaxSize),
"Size": js.FuncOf(l.Size),
"Worker": js.FuncOf(l.Worker),
}
return logFileWorker
}
// LogToFileJS starts logging to a local, in-memory log file.
//
// Parameters:
// - args[0] - Log level (int).
// - args[1] - Max log file size, in bytes (int).
//
// Returns:
// - Throws a TypeError if starting the log file fails.
func (l *Logger) LogToFileJS(_ js.Value, args []js.Value) any {
threshold := jww.Threshold(args[0].Int())
maxLogFileSize := args[1].Int()
err := l.LogToFile(threshold, maxLogFileSize)
if err != nil {
utils.Throw(utils.TypeError, err)
return nil
}
return nil
}
// LogToFileWorkerJS starts a new worker that begins listening for logs and
// writing them to file. This function blocks until the worker has started.
//
// Parameters:
// - args[0] - Log level (int).
// - args[1] - Max log file size, in bytes (int).
// - args[2] - Path to Javascript start file for the worker WASM (string).
// - args[3] - Name of the worker (used in logs) (string).
//
// Returns a promise:
// - Resolves to nothing on success (void).
// - Rejected with an error if starting the worker fails.
func (l *Logger) LogToFileWorkerJS(_ js.Value, args []js.Value) any {
threshold := jww.Threshold(args[0].Int())
maxLogFileSize := args[1].Int()
wasmJsPath := args[2].String()
workerName := args[3].String()
promiseFn := func(resolve, reject func(args ...any) js.Value) {
err := l.LogToFileWorker(
threshold, maxLogFileSize, wasmJsPath, workerName)
if err != nil {
reject(utils.JsTrace(err))
} else {
resolve()
}
}
return utils.CreatePromise(promiseFn)
}
// StopLoggingJS stops the logging of log messages and disables the log
// StopLogging stops the logging of log messages and disables the log
// listener. If the log worker is running, it is terminated. Once logging is
// stopped, it cannot be resumed the log file cannot be recovered.
func (l *Logger) StopLoggingJS(js.Value, []js.Value) any {
l.StopLogging()
func (l *LoggerJS) StopLogging(js.Value, []js.Value) any {
l.api.StopLogging()
return nil
}
// GetFileJS returns the entire log file.
// GetFile returns the entire log file.
//
// If the log file is listening locally, it returns it from the local buffer. If
// it is listening from the worker, it blocks until the file is returned.
//
// Returns a promise:
// - Resolves to the log file contents (string).
func (l *Logger) GetFileJS(js.Value, []js.Value) any {
func (l *LoggerJS) GetFile(js.Value, []js.Value) any {
promiseFn := func(resolve, _ func(args ...any) js.Value) {
resolve(string(l.GetFile()))
resolve(string(l.api.GetFile()))
}
return utils.CreatePromise(promiseFn)
}
// ThresholdJS returns the log level threshold used in the file.
// Threshold returns the log level threshold used in the file.
//
// Returns:
// - Log level (int).
func (l *Logger) ThresholdJS(js.Value, []js.Value) any {
return int(l.Threshold())
func (l *LoggerJS) Threshold(js.Value, []js.Value) any {
return int(l.api.Threshold())
}
// MaxSizeJS returns the max size, in bytes, that the log file is allowed to be.
// MaxSize returns the max size, in bytes, that the log file is allowed to be.
//
// Returns:
// - Max file size (int).
func (l *Logger) MaxSizeJS(js.Value, []js.Value) any {
return l.MaxSize()
func (l *LoggerJS) MaxSize(js.Value, []js.Value) any {
return l.api.MaxSize()
}
// SizeJS returns the current size, in bytes, written to the log file.
// Size returns the current size, in bytes, written to the log file.
//
// If the log file is listening locally, it returns it from the local buffer. If
// it is listening from the worker, it blocks until the size is returned.
//
// Returns a promise:
// - Resolves to the current file size (int).
func (l *Logger) SizeJS(js.Value, []js.Value) any {
func (l *LoggerJS) Size(js.Value, []js.Value) any {
promiseFn := func(resolve, _ func(args ...any) js.Value) {
resolve(l.Size())
resolve(l.api.Size())
}
return utils.CreatePromise(promiseFn)
}
// WorkerJS returns the web worker object.
// Worker returns the web worker object.
//
// Returns:
// - Javascript worker object. If the worker has not been initialized, it
// returns null.
func (l *Logger) WorkerJS(js.Value, []js.Value) any {
if l.getMode() == workerMode {
return l.wm.GetWorker()
func (l *LoggerJS) Worker(js.Value, []js.Value) any {
wm := l.api.Worker()
if wm == nil {
return js.Null()
}
return js.Null()
return wm.GetWorker()
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package logging
import (
"bytes"
"fmt"
jww "github.com/spf13/jwalterweatherman"
"testing"
)
// Tests InitLogger
func TestInitLogger(t *testing.T) {
}
// Tests GetLogger
func TestGetLogger(t *testing.T) {
}
// Tests NewLogger
func TestNewLogger(t *testing.T) {
}
// Tests Logger.LogToFile
func TestLogger_LogToFile(t *testing.T) {
jww.SetStdoutThreshold(jww.LevelTrace)
l := NewLogger()
err := l.LogToFile(jww.LevelTrace, 50000000)
if err != nil {
t.Fatalf("Failed to LogToFile: %+v", err)
}
jww.INFO.Printf("test")
file := l.cb.Bytes()
fmt.Printf("file:----------------------------\n%s\n---------------------------------\n", file)
}
// Tests Logger.LogToFileWorker
func TestLogger_LogToFileWorker(t *testing.T) {
}
// Tests Logger.processLog
func TestLogger_processLog(t *testing.T) {
}
// Tests Logger.prepare
func TestLogger_prepare(t *testing.T) {
}
// Tests Logger.StopLogging
func TestLogger_StopLogging(t *testing.T) {
}
// Tests Logger.GetFile
func TestLogger_GetFile(t *testing.T) {
}
// Tests Logger.Threshold
func TestLogger_Threshold(t *testing.T) {
}
// Tests Logger.MaxSize
func TestLogger_MaxSize(t *testing.T) {
}
// Tests Logger.Size
func TestLogger_Size(t *testing.T) {
}
// Tests Logger.Listen
func TestLogger_Listen(t *testing.T) {
// l := newLogger()
}
// Tests that Logger.Write can fill the listenChan channel completely and that
// all messages are received in the order they were added.
func TestLogger_Write(t *testing.T) {
l := newLogger()
expectedLogs := make([][]byte, logListenerChanSize)
for i := range expectedLogs {
p := []byte(
fmt.Sprintf("Log message %d of %d.", i+1, logListenerChanSize))
expectedLogs[i] = p
n, err := l.Listen(jww.LevelError).Write(p)
if err != nil {
t.Errorf("Received impossible error (%d): %+v", i, err)
} else if n != len(p) {
t.Errorf("Received incorrect bytes written (%d)."+
"\nexpected: %d\nreceived: %d", i, len(p), n)
}
}
for i, expected := range expectedLogs {
select {
case received := <-l.listenChan:
if !bytes.Equal(expected, received) {
t.Errorf("Received unexpected meessage (%d)."+
"\nexpected: %q\nreceived: %q", i, expected, received)
}
default:
t.Errorf("Failed to read from channel.")
}
}
}
// Error path: Tests that Logger.Write returns an error when the listener
// channel is full.
func TestLogger_Write_ChannelFilledError(t *testing.T) {
l := newLogger()
expectedLogs := make([][]byte, logListenerChanSize)
for i := range expectedLogs {
p := []byte(
fmt.Sprintf("Log message %d of %d.", i+1, logListenerChanSize))
expectedLogs[i] = p
n, err := l.Listen(jww.LevelError).Write(p)
if err != nil {
t.Errorf("Received impossible error (%d): %+v", i, err)
} else if n != len(p) {
t.Errorf("Received incorrect bytes written (%d)."+
"\nexpected: %d\nreceived: %d", i, len(p), n)
}
}
_, err := l.Write([]byte("test"))
if err == nil {
t.Error("Failed to receive error when the chanel should be full.")
}
}
// Tests that Logger.getMode gets the same value set with Logger.setMode.
func TestLogger_setMode_getMode(t *testing.T) {
l := newLogger()
for i, m := range []mode{initMode, fileMode, workerMode, 12} {
l.setMode(m)
received := l.getMode()
if m != received {
t.Errorf("Received wrong mode (%d).\nexpected: %s\nreceived: %s",
i, m, received)
}
}
}
// Unit test of mode.String.
func Test_mode_String(t *testing.T) {
for m, expected := range map[mode]string{
initMode: "uninitialized mode",
fileMode: "file mode",
workerMode: "worker mode",
12: "invalid mode: 12",
} {
s := m.String()
if s != expected {
t.Errorf("Wrong string for mode %d.\nexpected: %s\nreceived: %s",
m, expected, s)
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
//go:build js && wasm
package logging
import (
"encoding/binary"
"encoding/json"
"io"
"math"
"time"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/xxdk-wasm/worker"
)
// TODO: add ability to import worker so that multiple threads can send logs: https://stackoverflow.com/questions/8343781/how-to-do-worker-to-worker-communication
// workerLogger manages the recording of jwalterweatherman logs to the in-memory
// file buffer in a remote Worker thread.
type workerLogger struct {
threshold jww.Threshold
maxLogFileSize int
wm *worker.Manager
}
// newWorkerLogger starts logging to an in-memory log file in a remote Worker
// at the specified threshold. Returns a [workerLogger] that can be used to get
// the log file.
func newWorkerLogger(threshold jww.Threshold, maxLogFileSize int,
wasmJsPath, workerName string) (*workerLogger, error) {
// Create new worker manager, which will start the worker and wait until
// communication has been established
wm, err := worker.NewManager(wasmJsPath, workerName, false)
if err != nil {
return nil, err
}
wl := &workerLogger{
threshold: threshold,
maxLogFileSize: maxLogFileSize,
wm: wm,
}
// Register the callback used by the Javascript to request the log file.
// This prevents an error print when GetFileExtTag is not registered.
wl.wm.RegisterCallback(GetFileExtTag, func([]byte) {
jww.DEBUG.Print("[LOG] Received file requested from external " +
"Javascript. Ignoring file.")
})
data, err := json.Marshal(wl.maxLogFileSize)
if err != nil {
return nil, err
}
// Send message to initialize the log file listener
errChan := make(chan error)
wl.wm.SendMessage(NewLogFileTag, data, func(data []byte) {
if len(data) > 0 {
errChan <- errors.New(string(data))
} else {
errChan <- nil
}
})
// Wait for worker to respond
select {
case err = <-errChan:
if err != nil {
return nil, err
}
case <-time.After(worker.ResponseTimeout):
return nil, errors.Errorf("timed out after %s waiting for new log "+
"file in worker to initialize", worker.ResponseTimeout)
}
jww.FEEDBACK.Printf("[LOG] Outputting log to file of max size %d at level "+
"%s using web worker %s", wl.maxLogFileSize, wl.threshold, workerName)
logger = wl
return wl, nil
}
// Write adheres to the io.Writer interface and sends the log entries to the
// worker to be added to the file buffer. Always returns the length of p and
// nil. All errors are printed to the log.
func (wl *workerLogger) Write(p []byte) (n int, err error) {
wl.wm.SendMessage(WriteLogTag, p, nil)
return len(p), nil
}
// Listen adheres to the [jwalterweatherman.LogListener] type and returns the
// log writer when the threshold is within the set threshold limit.
func (wl *workerLogger) Listen(threshold jww.Threshold) io.Writer {
if threshold < wl.threshold {
return nil
}
return wl
}
// StopLogging stops log message writes and terminates the worker. Once logging
// is stopped, it cannot be resumed and the log file cannot be recovered.
func (wl *workerLogger) StopLogging() {
wl.threshold = math.MaxInt
wl.wm.Stop()
jww.DEBUG.Printf("[LOG] Terminated log worker.")
}
// GetFile returns the entire log file.
func (wl *workerLogger) GetFile() []byte {
fileChan := make(chan []byte)
wl.wm.SendMessage(GetFileTag, nil, func(data []byte) { fileChan <- data })
select {
case file := <-fileChan:
return file
case <-time.After(worker.ResponseTimeout):
jww.FATAL.Panicf("[LOG] Timed out after %s waiting for log "+
"file from worker", worker.ResponseTimeout)
return nil
}
}
// Threshold returns the log level threshold used in the file.
func (wl *workerLogger) Threshold() jww.Threshold {
return wl.threshold
}
// MaxSize returns the max size, in bytes, that the log file is allowed to be.
func (wl *workerLogger) MaxSize() int {
return wl.maxLogFileSize
}
// Size returns the number of bytes written to the log file.
func (wl *workerLogger) Size() int {
sizeChan := make(chan []byte)
wl.wm.SendMessage(SizeTag, nil, func(data []byte) { sizeChan <- data })
select {
case data := <-sizeChan:
return int(binary.LittleEndian.Uint64(data))
case <-time.After(worker.ResponseTimeout):
jww.FATAL.Panicf("[LOG] Timed out after %s waiting for log "+
"file size from worker", worker.ResponseTimeout)
return 0
}
}
// Worker returns the manager for the Javascript Worker object.
func (wl *workerLogger) Worker() *worker.Manager {
return wl.wm
}
......@@ -7,11 +7,15 @@
importScripts('wasm_exec.js');
const isReady = new Promise((resolve) => {
self.onWasmInitialized = resolve;
});
const go = new Go();
const binPath = 'xxdk-logFileWorker.wasm'
WebAssembly.instantiateStreaming(fetch(binPath), go.importObject).then((result) => {
WebAssembly.instantiateStreaming(fetch(binPath), go.importObject).then(async (result) => {
go.run(result.instance);
LogLevel(1);
await isReady;
}).catch((err) => {
console.error(err);
});
\ No newline at end of file