Skip to content
Snippets Groups Projects

fix for latest client release

Merged Jake Taylor requested to merge release into master
4 files
+ 22
2
Compare changes
  • Side-by-side
  • Inline
Files
4
@@ -7,51 +7,51 @@
//go:build js && wasm
package channelEventModel
package main
import (
"bytes"
"crypto/ed25519"
"encoding/json"
"strings"
"sync"
"syscall/js"
"time"
"github.com/hack-pad/go-indexeddb/idb"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/v4/cmix/rounds"
"gitlab.com/elixxir/client/v4/dm"
"gitlab.com/elixxir/xxdk-wasm/indexedDb"
"gitlab.com/elixxir/xxdk-wasm/utils"
"gitlab.com/xx_network/primitives/id"
"github.com/hack-pad/go-indexeddb/idb"
cryptoChannel "gitlab.com/elixxir/crypto/channel"
"gitlab.com/elixxir/crypto/message"
"gitlab.com/elixxir/xxdk-wasm/indexedDb/impl"
"gitlab.com/elixxir/xxdk-wasm/utils"
"gitlab.com/xx_network/primitives/id"
)
// wasmModel implements [dm.Receiver] interface, which uses the channels
// system passed an object that adheres to in order to get events on the
// channel.
// wasmModel implements dm.EventModel interface backed by IndexedDb.
// NOTE: This model is NOT thread safe - it is the responsibility of the
// caller to ensure that its methods are called sequentially.
type wasmModel struct {
db *idb.Database
cipher cryptoChannel.Cipher
receivedMessageCB MessageReceivedCallback
updateMux sync.Mutex
}
// joinConversation is used for joining new conversations.
func (w *wasmModel) joinConversation(nickname string,
pubKey ed25519.PublicKey, dmToken uint32, codeset uint8) error {
parentErr := errors.New("failed to joinConversation")
// upsertConversation is used for joining or updating a Conversation.
func (w *wasmModel) upsertConversation(nickname string,
pubKey ed25519.PublicKey, partnerToken uint32, codeset uint8,
blocked bool) error {
parentErr := errors.New("[DM indexedDB] failed to upsertConversation")
// Build object
newConvo := Conversation{
Pubkey: pubKey,
Nickname: nickname,
Token: dmToken,
Token: partnerToken,
CodesetVersion: codeset,
Blocked: false,
Blocked: blocked,
}
// Convert to jsObject
@@ -66,7 +66,7 @@ func (w *wasmModel) joinConversation(nickname string,
"Unable to marshal Conversation: %+v", err)
}
_, err = indexedDb.Put(w.db, conversationStoreName, convoObj)
_, err = impl.Put(w.db, conversationStoreName, convoObj)
if err != nil {
return errors.WithMessagef(parentErr,
"Unable to put Conversation: %+v", err)
@@ -74,21 +74,23 @@ func (w *wasmModel) joinConversation(nickname string,
return nil
}
// buildMessage is a private helper that converts typical [dm.Receiver]
// inputs into a basic Message structure for insertion into storage.
// buildMessage is a private helper that converts typical dm.EventModel inputs
// into a basic Message structure for insertion into storage.
//
// NOTE: ID is not set inside this function because we want to use the
// autoincrement key by default. If you are trying to overwrite an existing
// message, then you need to set it manually yourself.
func buildMessage(messageID, parentID []byte, text []byte,
pubKey ed25519.PublicKey, timestamp time.Time, round id.Round,
mType dm.MessageType, status dm.Status) *Message {
func buildMessage(messageID, parentID, text []byte, partnerKey,
senderKey ed25519.PublicKey, timestamp time.Time, round id.Round,
mType dm.MessageType, codeset uint8, status dm.Status) *Message {
return &Message{
MessageID: messageID,
ConversationPubKey: pubKey,
ConversationPubKey: partnerKey[:],
ParentMessageID: parentID,
Timestamp: timestamp,
SenderPubKey: senderKey[:],
Status: uint8(status),
CodesetVersion: codeset,
Text: text,
Type: uint16(mType),
Round: uint64(round),
@@ -96,199 +98,79 @@ func buildMessage(messageID, parentID []byte, text []byte,
}
func (w *wasmModel) Receive(messageID message.ID, nickname string, text []byte,
pubKey ed25519.PublicKey, dmToken uint32, codeset uint8,
timestamp time.Time, round rounds.Round, mType dm.MessageType,
status dm.Status) uint64 {
parentErr := errors.New("failed to Receive")
// If there is no extant Conversation, create one.
_, err := indexedDb.Get(
w.db, conversationStoreName, utils.CopyBytesToJS(pubKey))
if err != nil {
if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) {
err = w.joinConversation(nickname, pubKey, dmToken, codeset)
if err != nil {
jww.ERROR.Printf("%+v", err)
}
} else {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to get Conversation: %+v", err))
}
return 0
} else {
jww.DEBUG.Printf("Conversation with %s already joined", nickname)
}
partnerKey, senderKey ed25519.PublicKey, dmToken uint32, codeset uint8, timestamp time.Time,
round rounds.Round, mType dm.MessageType, status dm.Status) uint64 {
parentErr := "[DM indexedDB] failed to Receive"
jww.TRACE.Printf("[DM indexedDB] Receive(%s)", messageID)
// Handle encryption, if it is present
if w.cipher != nil {
text, err = w.cipher.Encrypt(text)
if err != nil {
jww.ERROR.Printf("Failed to encrypt Message: %+v", err)
return 0
}
}
msgToInsert := buildMessage(messageID.Bytes(), nil, text,
pubKey, timestamp, round.ID, mType, status)
uuid, err := w.receiveHelper(msgToInsert, false)
uuid, err := w.receiveWrapper(messageID, nil, nickname, string(text),
partnerKey, senderKey, dmToken, codeset, timestamp, round, mType, status)
if err != nil {
jww.ERROR.Printf("Failed to receive Message: %+v", err)
jww.ERROR.Printf("%+v", errors.WithMessagef(err, parentErr))
return 0
}
go w.receivedMessageCB(uuid, pubKey, false)
return uuid
}
func (w *wasmModel) ReceiveText(messageID message.ID, nickname, text string,
pubKey ed25519.PublicKey, dmToken uint32, codeset uint8,
partnerKey, senderKey ed25519.PublicKey, dmToken uint32, codeset uint8,
timestamp time.Time, round rounds.Round, status dm.Status) uint64 {
parentErr := errors.New("failed to ReceiveText")
parentErr := "[DM indexedDB] failed to ReceiveText"
jww.TRACE.Printf("[DM indexedDB] ReceiveText(%s)", messageID)
// If there is no extant Conversation, create one.
_, err := indexedDb.Get(
w.db, conversationStoreName, utils.CopyBytesToJS(pubKey))
uuid, err := w.receiveWrapper(messageID, nil, nickname, text,
partnerKey, senderKey, dmToken, codeset, timestamp, round,
dm.TextType, status)
if err != nil {
if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) {
err = w.joinConversation(nickname, pubKey, dmToken, codeset)
if err != nil {
jww.ERROR.Printf("%+v", err)
}
} else {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to get Conversation: %+v", err))
}
jww.ERROR.Printf("%+v", errors.WithMessagef(err, parentErr))
return 0
} else {
jww.DEBUG.Printf("Conversation with %s already joined", nickname)
}
// Handle encryption, if it is present
textBytes := []byte(text)
if w.cipher != nil {
textBytes, err = w.cipher.Encrypt(textBytes)
if err != nil {
jww.ERROR.Printf("Failed to encrypt Message: %+v", err)
return 0
}
}
msgToInsert := buildMessage(messageID.Bytes(), nil, textBytes,
pubKey, timestamp, round.ID, dm.TextType, status)
uuid, err := w.receiveHelper(msgToInsert, false)
if err != nil {
jww.ERROR.Printf("Failed to receive Message: %+v", err)
}
go w.receivedMessageCB(uuid, pubKey, false)
return uuid
}
func (w *wasmModel) ReceiveReply(messageID, reactionTo message.ID, nickname,
text string, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8,
text string, partnerKey, senderKey ed25519.PublicKey, dmToken uint32, codeset uint8,
timestamp time.Time, round rounds.Round, status dm.Status) uint64 {
parentErr := errors.New("failed to ReceiveReply")
parentErr := "[DM indexedDB] failed to ReceiveReply"
jww.TRACE.Printf("[DM indexedDB] ReceiveReply(%s)", messageID)
// If there is no extant Conversation, create one.
_, err := indexedDb.Get(
w.db, conversationStoreName, utils.CopyBytesToJS(pubKey))
uuid, err := w.receiveWrapper(messageID, &reactionTo, nickname, text,
partnerKey, senderKey, dmToken, codeset, timestamp, round,
dm.ReplyType, status)
if err != nil {
if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) {
err = w.joinConversation(nickname, pubKey, dmToken, codeset)
if err != nil {
jww.ERROR.Printf("%+v", err)
}
} else {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to get Conversation: %+v", err))
}
jww.ERROR.Printf("%+v", errors.WithMessagef(err, parentErr))
return 0
} else {
jww.DEBUG.Printf("Conversation with %s already joined", nickname)
}
// Handle encryption, if it is present
textBytes := []byte(text)
if w.cipher != nil {
textBytes, err = w.cipher.Encrypt(textBytes)
if err != nil {
jww.ERROR.Printf("Failed to encrypt Message: %+v", err)
return 0
}
}
msgToInsert := buildMessage(messageID.Bytes(), reactionTo.Marshal(), textBytes,
pubKey, timestamp, round.ID, dm.TextType, status)
uuid, err := w.receiveHelper(msgToInsert, false)
if err != nil {
jww.ERROR.Printf("Failed to receive Message: %+v", err)
}
go w.receivedMessageCB(uuid, pubKey, false)
return uuid
}
func (w *wasmModel) ReceiveReaction(messageID, _ message.ID, nickname,
reaction string, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8,
func (w *wasmModel) ReceiveReaction(messageID, reactionTo message.ID, nickname,
reaction string, partnerKey, senderKey ed25519.PublicKey, dmToken uint32, codeset uint8,
timestamp time.Time, round rounds.Round, status dm.Status) uint64 {
parentErr := errors.New("failed to ReceiveText")
parentErr := "[DM indexedDB] failed to ReceiveReaction"
jww.TRACE.Printf("[DM indexedDB] ReceiveReaction(%s)", messageID)
// If there is no extant Conversation, create one.
_, err := indexedDb.Get(
w.db, conversationStoreName, utils.CopyBytesToJS(pubKey))
uuid, err := w.receiveWrapper(messageID, &reactionTo, nickname, reaction,
partnerKey, senderKey, dmToken, codeset, timestamp, round,
dm.ReactionType, status)
if err != nil {
if strings.Contains(err.Error(), indexedDb.ErrDoesNotExist) {
err = w.joinConversation(nickname, pubKey, dmToken, codeset)
if err != nil {
jww.ERROR.Printf("%+v", err)
}
} else {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to get Conversation: %+v", err))
}
jww.ERROR.Printf("%+v", errors.WithMessagef(err, parentErr))
return 0
} else {
jww.DEBUG.Printf("Conversation with %s already joined", nickname)
}
// Handle encryption, if it is present
textBytes := []byte(reaction)
if w.cipher != nil {
textBytes, err = w.cipher.Encrypt(textBytes)
if err != nil {
jww.ERROR.Printf("Failed to encrypt Message: %+v", err)
return 0
}
}
msgToInsert := buildMessage(messageID.Bytes(), nil, textBytes,
pubKey, timestamp, round.ID, dm.ReactionType, status)
uuid, err := w.receiveHelper(msgToInsert, false)
if err != nil {
jww.ERROR.Printf("Failed to receive Message: %+v", err)
}
go w.receivedMessageCB(uuid, pubKey, false)
return uuid
}
func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID,
timestamp time.Time, round rounds.Round, status dm.Status) {
parentErr := errors.New("failed to UpdateSentStatus")
// FIXME: this is a bit of race condition without the mux.
// This should be done via the transactions (i.e., make a
// special version of receiveHelper)
w.updateMux.Lock()
defer w.updateMux.Unlock()
parentErr := errors.New("[DM indexedDB] failed to UpdateSentStatus")
jww.TRACE.Printf(
"[DM indexedDB] UpdateSentStatus(%d, %s, ...)", uuid, messageID)
// Convert messageID to the key generated by json.Marshal
key := js.ValueOf(uuid)
// Use the key to get the existing Message
currentMsg, err := indexedDb.Get(w.db, messageStoreName, key)
currentMsg, err := impl.Get(w.db, messageStoreName, key)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
"Unable to get message: %+v", err))
@@ -318,18 +200,113 @@ func (w *wasmModel) UpdateSentStatus(uuid uint64, messageID message.ID,
}
// Store the updated Message
_, err = w.receiveHelper(newMessage, true)
_, err = w.upsertMessage(newMessage)
if err != nil {
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
return
}
go w.receivedMessageCB(uuid, newMessage.ConversationPubKey, true)
jww.TRACE.Printf("[DM indexedDB] Calling ReceiveMessageCB(%v, %v, t, f)",
uuid, newMessage.ConversationPubKey)
go w.receivedMessageCB(uuid, newMessage.ConversationPubKey,
true, false)
}
// receiveHelper is a private helper for receiving any sort of message.
func (w *wasmModel) receiveHelper(
newMessage *Message, isUpdate bool) (uint64, error) {
// receiveWrapper is a higher-level wrapper of upsertMessage.
func (w *wasmModel) receiveWrapper(messageID message.ID, parentID *message.ID, nickname,
data string, partnerKey, senderKey ed25519.PublicKey, partnerToken uint32, codeset uint8,
timestamp time.Time, round rounds.Round, mType dm.MessageType, status dm.Status) (uint64, error) {
// Keep track of whether a Conversation was altered
var convoToUpdate *Conversation
// Determine whether Conversation needs to be created
result, err := w.getConversation(partnerKey)
if err != nil {
if !strings.Contains(err.Error(), impl.ErrDoesNotExist) {
return 0, err
} else {
// If there is no extant Conversation, create one.
jww.DEBUG.Printf(
"[DM indexedDB] Joining conversation with %s", nickname)
convoToUpdate = &Conversation{
Pubkey: partnerKey,
Nickname: nickname,
Token: partnerToken,
CodesetVersion: codeset,
Blocked: false,
}
}
} else {
jww.DEBUG.Printf(
"[DM indexedDB] Conversation with %s already joined", nickname)
// Update Conversation if nickname was altered
isFromPartner := bytes.Equal(result.Pubkey, senderKey)
nicknameChanged := result.Nickname != nickname
if isFromPartner && nicknameChanged {
jww.DEBUG.Printf(
"[DM indexedDB] Updating from nickname %s to %s",
result.Nickname, nickname)
convoToUpdate = result
convoToUpdate.Nickname = nickname
}
// Fix conversation if dmToken is altered
dmTokenChanged := result.Token != partnerToken
if isFromPartner && dmTokenChanged {
jww.WARN.Printf(
"[DM indexedDB] Updating from dmToken %d to %d",
result.Token, partnerToken)
convoToUpdate = result
convoToUpdate.Token = partnerToken
}
}
// Update the conversation in storage, if needed
conversationUpdated := convoToUpdate != nil
if conversationUpdated {
err = w.upsertConversation(convoToUpdate.Nickname, convoToUpdate.Pubkey,
convoToUpdate.Token, convoToUpdate.CodesetVersion, convoToUpdate.Blocked)
if err != nil {
return 0, err
}
}
// Handle encryption, if it is present
textBytes := []byte(data)
if w.cipher != nil {
textBytes, err = w.cipher.Encrypt(textBytes)
if err != nil {
return 0, err
}
}
var parentIdBytes []byte
if parentID != nil {
parentIdBytes = parentID.Marshal()
}
msgToInsert := buildMessage(messageID.Bytes(), parentIdBytes, textBytes,
partnerKey, senderKey, timestamp, round.ID, mType, codeset, status)
uuid, err := w.upsertMessage(msgToInsert)
if err != nil {
return 0, err
}
jww.TRACE.Printf("[DM indexedDB] Calling ReceiveMessageCB(%v, %v, f, %t)",
uuid, partnerKey, conversationUpdated)
go w.receivedMessageCB(uuid, partnerKey, false, conversationUpdated)
return uuid, nil
}
// upsertMessage is a helper function that will update an existing record
// if Message.ID is specified. Otherwise, it will perform an insert.
func (w *wasmModel) upsertMessage(msg *Message) (uint64, error) {
// Convert to jsObject
newMessageJson, err := json.Marshal(newMessage)
newMessageJson, err := json.Marshal(msg)
if err != nil {
return 0, errors.Errorf("Unable to marshal Message: %+v", err)
}
@@ -338,46 +315,108 @@ func (w *wasmModel) receiveHelper(
return 0, errors.Errorf("Unable to marshal Message: %+v", err)
}
// Unset the primaryKey for inserts so that it can be auto-populated and
// incremented
if !isUpdate {
messageObj.Delete("id")
// Store message to database
msgIdObj, err := impl.Put(w.db, messageStoreName, messageObj)
if err != nil {
return 0, errors.Errorf("Unable to put Message: %+v\n%s",
err, newMessageJson)
}
// Store message to database
result, err := indexedDb.Put(w.db, messageStoreName, messageObj)
uuid := msgIdObj.Int()
jww.DEBUG.Printf("[DM indexedDB] Successfully stored message %d", uuid)
return uint64(uuid), nil
}
// BlockSender silences messages sent by the indicated sender
// public key.
func (w *wasmModel) BlockSender(senderPubKey ed25519.PublicKey) {
parentErr := "failed to BlockSender"
err := w.setBlocked(senderPubKey, true)
if err != nil {
return 0, errors.Errorf("Unable to put Message: %+v", err)
jww.ERROR.Printf("%+v", errors.WithMessage(err, parentErr))
}
}
// NOTE: Sometimes the insert fails to return an error but hits a duplicate
// insert, so this fallthrough returns the UUID entry in that case.
if result.IsUndefined() {
msgID := message.ID{}
copy(msgID[:], newMessage.MessageID)
uuid, errLookup := w.msgIDLookup(msgID)
if uuid != 0 && errLookup == nil {
return uuid, nil
}
return 0, errors.Errorf("uuid lookup failure: %+v", err)
// UnblockSender allows messages sent by the indicated sender
// public key.
func (w *wasmModel) UnblockSender(senderPubKey ed25519.PublicKey) {
parentErr := "failed to UnblockSender"
err := w.setBlocked(senderPubKey, false)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessage(err, parentErr))
}
uuid := uint64(result.Int())
jww.DEBUG.Printf("Successfully stored message %d", uuid)
}
return uuid, nil
// setBlocked is a helper for blocking/unblocking a given Conversation.
func (w *wasmModel) setBlocked(senderPubKey ed25519.PublicKey, isBlocked bool) error {
// Get current Conversation and set blocked
resultConvo, err := w.getConversation(senderPubKey)
if err != nil {
return err
}
return w.upsertConversation(resultConvo.Nickname, resultConvo.Pubkey,
resultConvo.Token, resultConvo.CodesetVersion, isBlocked)
}
// msgIDLookup gets the UUID of the Message with the given messageID.
func (w *wasmModel) msgIDLookup(messageID message.ID) (uint64, error) {
resultObj, err := indexedDb.GetIndex(w.db, messageStoreName,
messageStoreMessageIndex, utils.CopyBytesToJS(messageID.Marshal()))
// GetConversation returns the conversation held by the model (receiver).
func (w *wasmModel) GetConversation(senderPubKey ed25519.PublicKey) *dm.ModelConversation {
parentErr := "failed to GetConversation"
resultConvo, err := w.getConversation(senderPubKey)
if err != nil {
return 0, err
jww.ERROR.Printf("%+v", errors.WithMessage(err, parentErr))
return nil
}
uuid := uint64(0)
if !resultObj.IsUndefined() {
uuid = uint64(resultObj.Get("id").Int())
return &dm.ModelConversation{
Pubkey: resultConvo.Pubkey,
Nickname: resultConvo.Nickname,
Token: resultConvo.Token,
CodesetVersion: resultConvo.CodesetVersion,
Blocked: resultConvo.Blocked,
}
return uuid, nil
}
// getConversation is a helper that returns the Conversation with the given senderPubKey.
func (w *wasmModel) getConversation(senderPubKey ed25519.PublicKey) (*Conversation, error) {
resultObj, err := impl.Get(w.db, conversationStoreName, impl.EncodeBytes(senderPubKey))
if err != nil {
return nil, err
}
resultConvo := &Conversation{}
err = json.Unmarshal([]byte(utils.JsToJson(resultObj)), resultConvo)
if err != nil {
return nil, err
}
return resultConvo, nil
}
// GetConversations returns any conversations held by the model (receiver).
func (w *wasmModel) GetConversations() []dm.ModelConversation {
parentErr := "failed to GetConversations"
results, err := impl.GetAll(w.db, conversationStoreName)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessage(err, parentErr))
return nil
}
conversations := make([]dm.ModelConversation, len(results))
for i := range results {
resultConvo := &Conversation{}
err = json.Unmarshal([]byte(utils.JsToJson(results[i])), resultConvo)
if err != nil {
jww.ERROR.Printf("%+v", errors.WithMessage(err, parentErr))
return nil
}
conversations[i] = dm.ModelConversation{
Pubkey: resultConvo.Pubkey,
Nickname: resultConvo.Nickname,
Token: resultConvo.Token,
CodesetVersion: resultConvo.CodesetVersion,
Blocked: resultConvo.Blocked,
}
}
return conversations
}
Loading