Skip to content
Snippets Groups Projects

fix for latest client release

Merged Jake Taylor requested to merge release into master
Compare and Show latest version
9 files
+ 409
45
Compare changes
  • Side-by-side
  • Inline
Files
9
@@ -7,19 +7,16 @@
//go:build js && wasm
package channels
package main
import (
"crypto/ed25519"
"encoding/base64"
"encoding/json"
"strconv"
"strings"
"sync"
"syscall/js"
"time"
"gitlab.com/elixxir/xxdk-wasm/indexedDb"
"github.com/hack-pad/go-indexeddb/idb"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
@@ -29,6 +26,8 @@ import (
cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast"
cryptoChannel "gitlab.com/elixxir/crypto/channel"
"gitlab.com/elixxir/crypto/message"
"gitlab.com/elixxir/xxdk-wasm/indexedDb/impl"
wChannels "gitlab.com/elixxir/xxdk-wasm/indexedDb/worker/channels"
"gitlab.com/elixxir/xxdk-wasm/utils"
"gitlab.com/xx_network/primitives/id"
)
@@ -36,13 +35,14 @@ import (
// wasmModel implements [channels.EventModel] interface, which uses the channels
// system passed an object that adheres to in order to get events on the
// channel.
// NOTE: This model is NOT thread safe - it is the responsibility of the
// caller to ensure that its methods are called sequentially.
type wasmModel struct {
db *idb.Database
cipher cryptoChannel.Cipher
receivedMessageCB MessageReceivedCallback
deletedMessageCB DeletedMessageCallback
mutedUserCB MutedUserCallback
updateMux sync.Mutex
receivedMessageCB wChannels.MessageReceivedCallback
deletedMessageCB wChannels.DeletedMessageCallback
mutedUserCB wChannels.MutedUserCallback
}
// JoinChannel is called whenever a channel is joined locally.
@@ -70,7 +70,7 @@ func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) {
return
}
_, err = indexedDb.Put(w.db, channelsStoreName, channelObj)
_, err = impl.Put(w.db, channelStoreName, channelObj)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to put Channel: %+v", err))
@@ -82,8 +82,7 @@ func (w *wasmModel) LeaveChannel(channelID *id.ID) {
parentErr := errors.New("failed to LeaveChannel")
// Delete the channel from storage
err := indexedDb.Delete(w.db, channelsStoreName,
js.ValueOf(channelID.String()))
err := impl.Delete(w.db, channelStoreName, js.ValueOf(channelID.String()))
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to delete Channel: %+v", err))
@@ -122,20 +121,19 @@ func (w *wasmModel) deleteMsgByChannel(channelID *id.ID) error {
"Unable to get Index: %+v", err)
}
// Perform the operation
channelIdStr := base64.StdEncoding.EncodeToString(channelID.Marshal())
keyRange, err := idb.NewKeyRangeOnly(js.ValueOf(channelIdStr))
// Set up the operation
keyRange, err := idb.NewKeyRangeOnly(impl.EncodeBytes(channelID.Marshal()))
cursorRequest, err := index.OpenCursorRange(keyRange, idb.CursorNext)
if err != nil {
return errors.WithMessagef(parentErr, "Unable to open Cursor: %+v", err)
}
ctx, cancel := indexedDb.NewContext()
err = cursorRequest.Iter(ctx,
// Perform the operation
err = impl.SendCursorRequest(cursorRequest,
func(cursor *idb.CursorWithValue) error {
_, err := cursor.Delete()
return err
})
cancel()
if err != nil {
return errors.WithMessagef(parentErr,
"Unable to delete Message data: %+v", err)
@@ -168,9 +166,10 @@ func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID message.ID,
textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType,
false, hidden, status)
uuid, err := w.receiveHelper(msgToInsert, false)
uuid, err := w.upsertMessage(msgToInsert)
if err != nil {
jww.ERROR.Printf("Failed to receive Message: %+v", err)
return 0
}
go w.receivedMessageCB(uuid, channelID, false)
@@ -204,10 +203,10 @@ func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID,
replyTo.Bytes(), nickname, textBytes, pubKey, dmToken, codeset,
timestamp, lease, round.ID, mType, hidden, false, status)
uuid, err := w.receiveHelper(msgToInsert, false)
uuid, err := w.upsertMessage(msgToInsert)
if err != nil {
jww.ERROR.Printf("Failed to receive reply: %+v", err)
return 0
}
go w.receivedMessageCB(uuid, channelID, false)
return uuid
@@ -241,9 +240,10 @@ func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID,
textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType,
false, hidden, status)
uuid, err := w.receiveHelper(msgToInsert, false)
uuid, err := w.upsertMessage(msgToInsert)
if err != nil {
jww.ERROR.Printf("Failed to receive reaction: %+v", err)
return 0
}
go w.receivedMessageCB(uuid, channelID, false)
return uuid
@@ -254,34 +254,38 @@ func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID,
// messageID, timestamp, round, pinned, and hidden are all nillable and may be
// updated based upon the UUID at a later date. If a nil value is passed, then
// make no update.
//
// Returns an error if the message cannot be updated. It must return
// channels.NoMessageErr if the message does not exist.
func (w *wasmModel) UpdateFromUUID(uuid uint64, messageID *message.ID,
timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
status *channels.SentStatus) {
parentErr := errors.New("failed to UpdateFromUUID")
// FIXME: this is a bit of race condition without the mux.
// This should be done via the transactions (i.e., make a
// special version of receiveHelper)
w.updateMux.Lock()
defer w.updateMux.Unlock()
status *channels.SentStatus) error {
parentErr := "failed to UpdateFromUUID"
// Convert messageID to the key generated by json.Marshal
key := js.ValueOf(uuid)
// Use the key to get the existing Message
currentMsg, err := indexedDb.Get(w.db, messageStoreName, key)
msgObj, err := impl.Get(w.db, messageStoreName, key)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to get message: %+v", err))
return
if strings.Contains(err.Error(), impl.ErrDoesNotExist) {
return errors.WithMessage(channels.NoMessageErr, parentErr)
}
return errors.WithMessage(err, parentErr)
}
currentMsg, err := valueToMessage(msgObj)
if err != nil {
return errors.WithMessagef(err,
"%s Failed to marshal Message", parentErr)
}
_, err = w.updateMessage(utils.JsToJson(currentMsg), messageID, timestamp,
_, err = w.updateMessage(currentMsg, messageID, timestamp,
round, pinned, hidden, status)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to updateMessage: %+v", err))
return errors.WithMessage(err, parentErr)
}
return nil
}
// UpdateFromMessageID is called whenever a message with the message ID is
@@ -293,79 +297,34 @@ func (w *wasmModel) UpdateFromUUID(uuid uint64, messageID *message.ID,
// timestamp, round, pinned, and hidden are all nillable and may be updated
// based upon the UUID at a later date. If a nil value is passed, then make
// no update.
//
// Returns an error if the message cannot be updated. It must return
// channels.NoMessageErr if the message does not exist.
func (w *wasmModel) UpdateFromMessageID(messageID message.ID,
timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
status *channels.SentStatus) uint64 {
parentErr := errors.New("failed to UpdateFromMessageID")
// FIXME: this is a bit of race condition without the mux.
// This should be done via the transactions (i.e., make a
// special version of receiveHelper)
w.updateMux.Lock()
defer w.updateMux.Unlock()
msgIDStr := base64.StdEncoding.EncodeToString(messageID.Marshal())
currentMsgObj, err := indexedDb.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, js.ValueOf(msgIDStr))
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Failed to get message by index: %+v", err))
return 0
}
currentMsg := utils.JsToJson(currentMsgObj)
uuid, err := w.updateMessage(currentMsg, &messageID, timestamp,
round, pinned, hidden, status)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to updateMessage: %+v", err))
}
return uuid
}
// updateMessage is a helper for updating a stored message.
func (w *wasmModel) updateMessage(currentMsgJson string, messageID *message.ID,
timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
status *channels.SentStatus) (uint64, error) {
parentErr := "failed to UpdateFromMessageID"
newMessage := &Message{}
err := json.Unmarshal([]byte(currentMsgJson), newMessage)
msgObj, err := impl.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, impl.EncodeBytes(messageID.Marshal()))
if err != nil {
return 0, err
}
if status != nil {
newMessage.Status = uint8(*status)
}
if messageID != nil {
newMessage.MessageID = messageID.Bytes()
}
if round != nil {
newMessage.Round = uint64(round.ID)
}
if timestamp != nil {
newMessage.Timestamp = *timestamp
}
if pinned != nil {
newMessage.Pinned = *pinned
if strings.Contains(err.Error(), impl.ErrDoesNotExist) {
return 0, errors.WithMessage(channels.NoMessageErr, parentErr)
}
return 0, errors.WithMessage(err, parentErr)
}
if hidden != nil {
newMessage.Hidden = *hidden
currentMsg, err := valueToMessage(msgObj)
if err != nil {
return 0, errors.WithMessagef(err,
"%s Failed to marshal Message", parentErr)
}
// Store the updated Message
uuid, err := w.receiveHelper(newMessage, true)
uuid, err := w.updateMessage(currentMsg, &messageID, timestamp,
round, pinned, hidden, status)
if err != nil {
return 0, err
return 0, errors.WithMessage(err, parentErr)
}
channelID := &id.ID{}
copy(channelID[:], newMessage.ChannelID)
go w.receivedMessageCB(uuid, channelID, true)
return uuid, nil
}
@@ -386,7 +345,7 @@ func buildMessage(channelID, messageID, parentID []byte, nickname string,
ChannelID: channelID,
ParentMessageID: parentID,
Timestamp: timestamp,
Lease: lease,
Lease: strconv.FormatInt(int64(lease), 10),
Status: uint8(status),
Hidden: hidden,
Pinned: pinned,
@@ -400,11 +359,51 @@ func buildMessage(channelID, messageID, parentID []byte, nickname string,
}
}
// receiveHelper is a private helper for receiving any sort of message.
func (w *wasmModel) receiveHelper(
newMessage *Message, isUpdate bool) (uint64, error) {
// updateMessage is a helper for updating a stored message.
func (w *wasmModel) updateMessage(currentMsg *Message, messageID *message.ID,
timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
status *channels.SentStatus) (uint64, error) {
if status != nil {
currentMsg.Status = uint8(*status)
}
if messageID != nil {
currentMsg.MessageID = messageID.Bytes()
}
if round != nil {
currentMsg.Round = uint64(round.ID)
}
if timestamp != nil {
currentMsg.Timestamp = *timestamp
}
if pinned != nil {
currentMsg.Pinned = *pinned
}
if hidden != nil {
currentMsg.Hidden = *hidden
}
// Store the updated Message
uuid, err := w.upsertMessage(currentMsg)
if err != nil {
return 0, err
}
channelID := &id.ID{}
copy(channelID[:], currentMsg.ChannelID)
go w.receivedMessageCB(uuid, channelID, true)
return uuid, nil
}
// upsertMessage is a helper function that will update an existing record
// if Message.ID is specified. Otherwise, it will perform an insert.
func (w *wasmModel) upsertMessage(msg *Message) (uint64, error) {
// Convert to jsObject
newMessageJson, err := json.Marshal(newMessage)
newMessageJson, err := json.Marshal(msg)
if err != nil {
return 0, errors.Errorf("Unable to marshal Message: %+v", err)
}
@@ -413,42 +412,30 @@ func (w *wasmModel) receiveHelper(
return 0, errors.Errorf("Unable to marshal Message: %+v", err)
}
// Unset the primaryKey for inserts so that it can be auto-populated and
// incremented
if !isUpdate {
messageObj.Delete("id")
}
// Store message to database
result, err := indexedDb.Put(w.db, messageStoreName, messageObj)
if err != nil && !strings.Contains(err.Error(),
"at least one key does not satisfy the uniqueness requirements") {
// Only return non-unique constraint errors so that the case
// below this one can be hit and handle duplicate entries properly.
return 0, errors.Errorf("Unable to put Message: %+v", err)
}
// NOTE: Sometimes the insert fails to return an error but hits a duplicate
// insert, so this fallthrough returns the UUID entry in that case.
if result.IsUndefined() {
msgID := message.ID{}
copy(msgID[:], newMessage.MessageID)
msg, errLookup := w.msgIDLookup(msgID)
if errLookup == nil && msg.ID != 0 {
return msg.ID, nil
}
return 0, errors.Errorf("uuid lookup failure: %+v", err)
msgIdObj, err := impl.Put(w.db, messageStoreName, messageObj)
if err != nil {
return 0, errors.Errorf("Unable to put Message: %+v\n%s",
err, newMessageJson)
}
uuid := uint64(result.Int())
jww.DEBUG.Printf("Successfully stored message %d", uuid)
return uuid, nil
uuid := msgIdObj.Int()
jww.DEBUG.Printf("Successfully stored message %d", uuid)
return uint64(uuid), nil
}
// GetMessage returns the message with the given [channel.MessageID].
func (w *wasmModel) GetMessage(
messageID message.ID) (channels.ModelMessage, error) {
lookupResult, err := w.msgIDLookup(messageID)
msgIDStr := impl.EncodeBytes(messageID.Marshal())
resultObj, err := impl.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, msgIDStr)
if err != nil {
return channels.ModelMessage{}, err
}
lookupResult, err := valueToMessage(resultObj)
if err != nil {
return channels.ModelMessage{}, err
}
@@ -469,6 +456,15 @@ func (w *wasmModel) GetMessage(
}
}
lease := time.Duration(0)
if len(lookupResult.Lease) > 0 {
leaseInt, err := strconv.ParseInt(lookupResult.Lease, 10, 64)
if err != nil {
return channels.ModelMessage{}, err
}
lease = time.Duration(leaseInt)
}
return channels.ModelMessage{
UUID: lookupResult.ID,
Nickname: lookupResult.Nickname,
@@ -476,7 +472,7 @@ func (w *wasmModel) GetMessage(
ChannelID: channelId,
ParentMessageID: parentMsgId,
Timestamp: lookupResult.Timestamp,
Lease: lookupResult.Lease,
Lease: lease,
Status: channels.SentStatus(lookupResult.Status),
Hidden: lookupResult.Hidden,
Pinned: lookupResult.Pinned,
@@ -490,10 +486,8 @@ func (w *wasmModel) GetMessage(
// DeleteMessage removes a message with the given messageID from storage.
func (w *wasmModel) DeleteMessage(messageID message.ID) error {
msgId := js.ValueOf(base64.StdEncoding.EncodeToString(messageID.Bytes()))
err := indexedDb.DeleteIndex(w.db, messageStoreName,
messageStoreMessageIndex, pkeyName, msgId)
err := impl.DeleteIndex(w.db, messageStoreName,
messageStoreMessageIndex, pkeyName, impl.EncodeBytes(messageID.Marshal()))
if err != nil {
return err
}
@@ -509,22 +503,14 @@ func (w *wasmModel) MuteUser(
go w.mutedUserCB(channelID, pubKey, unmute)
}
// msgIDLookup gets the UUID of the Message with the given messageID.
func (w *wasmModel) msgIDLookup(messageID message.ID) (*Message, error) {
msgIDStr := js.ValueOf(base64.StdEncoding.EncodeToString(messageID.Bytes()))
resultObj, err := indexedDb.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, msgIDStr)
if err != nil {
return nil, err
} else if resultObj.IsUndefined() {
return nil, errors.Errorf("no message for %s found", msgIDStr)
}
// Process result into string
// valueToMessage is a helper for converting js.Value to Message.
func valueToMessage(msgObj js.Value) (*Message, error) {
resultMsg := &Message{}
err = json.Unmarshal([]byte(utils.JsToJson(resultObj)), resultMsg)
if err != nil {
return nil, err
}
return resultMsg, nil
return resultMsg, json.Unmarshal([]byte(utils.JsToJson(msgObj)), resultMsg)
}
// valueToFile is a helper for converting js.Value to File.
func valueToFile(fileObj js.Value) (*File, error) {
resultFile := &File{}
return resultFile, json.Unmarshal([]byte(utils.JsToJson(fileObj)), resultFile)
}
Loading