Skip to content
Snippets Groups Projects
Select Git revision
  • 1245f8e6c38a31988b756ce221ae47b6c04247b5
  • release default
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • master protected
  • XX-4688/DbEncoding
  • hotfix/update
  • @XX-4682/Files
  • hotfix/XX-4655
  • dev protected
  • project/HavenNotifications
  • XX-4602/SilentMessageType
  • jono/npmTest
  • wasmTest2
  • XX-4461/FileUpload
  • XX-4505/blockuser
  • XX-4441
  • Jakub/Emoji-CI-Test
  • testing/websockets
  • fastReg
  • fast-registration
  • NewHostPool
  • v0.3.22
  • v0.3.21
  • v0.3.20
  • v0.3.18
  • v0.3.17
  • v0.3.16
  • v0.3.15
  • v0.3.14
  • v0.3.13
  • v0.3.12
  • v0.3.11
  • v0.3.10
  • v0.3.9
  • v0.3.8
  • v0.3.7
  • v0.3.6
  • v0.3.5
  • v0.3.4
  • 812b395df518ce096d01d5292596ca26f8fe92d9c4487ddfa515e190a51aa1a1
  • 76ba08e2dfa1798412a265404fa271840b52c035869111fce8e8cdb23a036a5a
41 results

implementation.go

Blame
  • Jono Wenger's avatar
    Jono Wenger authored
    79b49116
    History
    implementation.go 16.26 KiB
    ////////////////////////////////////////////////////////////////////////////////
    // Copyright © 2022 xx foundation                                             //
    //                                                                            //
    // Use of this source code is governed by a license that can be found in the  //
    // LICENSE file.                                                              //
    ////////////////////////////////////////////////////////////////////////////////
    
    //go:build js && wasm
    
    package main
    
    import (
    	"crypto/ed25519"
    	"encoding/json"
    	"strconv"
    	"strings"
    	"syscall/js"
    	"time"
    
    	"github.com/hack-pad/go-indexeddb/idb"
    	"github.com/pkg/errors"
    	jww "github.com/spf13/jwalterweatherman"
    
    	"gitlab.com/elixxir/client/v4/channels"
    	"gitlab.com/elixxir/client/v4/cmix/rounds"
    	cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast"
    	cryptoChannel "gitlab.com/elixxir/crypto/channel"
    	"gitlab.com/elixxir/crypto/message"
    	"gitlab.com/elixxir/xxdk-wasm/indexedDb/impl"
    	wChannels "gitlab.com/elixxir/xxdk-wasm/indexedDb/worker/channels"
    	"gitlab.com/elixxir/xxdk-wasm/utils"
    	"gitlab.com/xx_network/primitives/id"
    )
    
    // wasmModel implements [channels.EventModel] interface, which uses the channels
    // system passed an object that adheres to in order to get events on the
    // channel.
    // NOTE: This model is NOT thread safe - it is the responsibility of the
    // caller to ensure that its methods are called sequentially.
    type wasmModel struct {
    	db                *idb.Database
    	cipher            cryptoChannel.Cipher
    	receivedMessageCB wChannels.MessageReceivedCallback
    	deletedMessageCB  wChannels.DeletedMessageCallback
    	mutedUserCB       wChannels.MutedUserCallback
    }
    
    // JoinChannel is called whenever a channel is joined locally.
    func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) {
    	parentErr := errors.New("failed to JoinChannel")
    
    	// Build object
    	newChannel := Channel{
    		ID:          channel.ReceptionID.Marshal(),
    		Name:        channel.Name,
    		Description: channel.Description,
    	}
    
    	// Convert to jsObject
    	newChannelJson, err := json.Marshal(&newChannel)
    	if err != nil {
    		jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
    			"Unable to marshal Channel: %+v", err))
    		return
    	}
    	channelObj, err := utils.JsonToJS(newChannelJson)
    	if err != nil {
    		jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
    			"Unable to marshal Channel: %+v", err))
    		return
    	}
    
    	_, err = impl.Put(w.db, channelStoreName, channelObj)
    	if err != nil {
    		jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
    			"Unable to put Channel: %+v", err))
    	}
    }
    
    // LeaveChannel is called whenever a channel is left locally.
    func (w *wasmModel) LeaveChannel(channelID *id.ID) {
    	parentErr := errors.New("failed to LeaveChannel")
    
    	// Delete the channel from storage
    	err := impl.Delete(w.db, channelStoreName, js.ValueOf(channelID.String()))
    	if err != nil {
    		jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
    			"Unable to delete Channel: %+v", err))
    		return
    	}
    
    	// Clean up lingering data
    	err = w.deleteMsgByChannel(channelID)
    	if err != nil {
    		jww.ERROR.Printf("%+v", errors.WithMessagef(parentErr,
    			"Deleting Channel's Message data failed: %+v", err))
    		return
    	}
    	jww.DEBUG.Printf("Successfully deleted channel: %s", channelID)
    }
    
    // deleteMsgByChannel is a private helper that uses messageStoreChannelIndex
    // to delete all Message with the given Channel ID.
    func (w *wasmModel) deleteMsgByChannel(channelID *id.ID) error {
    	parentErr := errors.New("failed to deleteMsgByChannel")
    
    	// Prepare the Transaction
    	txn, err := w.db.Transaction(idb.TransactionReadWrite, messageStoreName)
    	if err != nil {
    		return errors.WithMessagef(parentErr,
    			"Unable to create Transaction: %+v", err)
    	}
    	store, err := txn.ObjectStore(messageStoreName)
    	if err != nil {
    		return errors.WithMessagef(parentErr,
    			"Unable to get ObjectStore: %+v", err)
    	}
    	index, err := store.Index(messageStoreChannelIndex)
    	if err != nil {
    		return errors.WithMessagef(parentErr,
    			"Unable to get Index: %+v", err)
    	}
    
    	// Perform the operation
    	keyRange, err := idb.NewKeyRangeOnly(impl.EncodeBytes(channelID.Marshal()))
    	cursorRequest, err := index.OpenCursorRange(keyRange, idb.CursorNext)
    	if err != nil {
    		return errors.WithMessagef(parentErr, "Unable to open Cursor: %+v", err)
    	}
    	ctx, cancel := impl.NewContext()
    	err = cursorRequest.Iter(ctx,
    		func(cursor *idb.CursorWithValue) error {
    			_, err := cursor.Delete()
    			return err
    		})
    	cancel()
    	if err != nil {
    		return errors.WithMessagef(parentErr,
    			"Unable to delete Message data: %+v", err)
    	}
    	return nil
    }
    
    // ReceiveMessage is called whenever a message is received on a given channel.
    //
    // It may be called multiple times on the same message; it is incumbent on the
    // user of the API to filter such called by message ID.
    func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID message.ID,
    	nickname, text string, pubKey ed25519.PublicKey, dmToken uint32,
    	codeset uint8, timestamp time.Time, lease time.Duration, round rounds.Round,
    	mType channels.MessageType, status channels.SentStatus, hidden bool) uint64 {
    	textBytes := []byte(text)
    	var err error
    
    	// Handle encryption, if it is present
    	if w.cipher != nil {
    		textBytes, err = w.cipher.Encrypt([]byte(text))
    		if err != nil {
    			jww.ERROR.Printf("Failed to encrypt Message: %+v", err)
    			return 0
    		}
    	}
    
    	msgToInsert := buildMessage(
    		channelID.Marshal(), messageID.Bytes(), nil, nickname,
    		textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType,
    		false, hidden, status)
    
    	uuid, err := w.upsertMessage(msgToInsert)
    	if err != nil {
    		jww.ERROR.Printf("Failed to receive Message: %+v", err)
    		return 0
    	}
    
    	go w.receivedMessageCB(uuid, channelID, false)
    	return uuid
    }
    
    // ReceiveReply is called whenever a message is received that is a reply on a
    // given channel. It may be called multiple times on the same message; it is
    // incumbent on the user of the API to filter such called by message ID.
    //
    // Messages may arrive our of order, so a reply, in theory, can arrive before
    // the initial message. As a result, it may be important to buffer replies.
    func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID,
    	replyTo message.ID, nickname, text string, pubKey ed25519.PublicKey,
    	dmToken uint32, codeset uint8, timestamp time.Time, lease time.Duration,
    	round rounds.Round, mType channels.MessageType, status channels.SentStatus,
    	hidden bool) uint64 {
    	textBytes := []byte(text)
    	var err error
    
    	// Handle encryption, if it is present
    	if w.cipher != nil {
    		textBytes, err = w.cipher.Encrypt([]byte(text))
    		if err != nil {
    			jww.ERROR.Printf("Failed to encrypt Message: %+v", err)
    			return 0
    		}
    	}
    
    	msgToInsert := buildMessage(channelID.Marshal(), messageID.Bytes(),
    		replyTo.Bytes(), nickname, textBytes, pubKey, dmToken, codeset,
    		timestamp, lease, round.ID, mType, hidden, false, status)
    
    	uuid, err := w.upsertMessage(msgToInsert)
    	if err != nil {
    		jww.ERROR.Printf("Failed to receive reply: %+v", err)
    		return 0
    	}
    	go w.receivedMessageCB(uuid, channelID, false)
    	return uuid
    }
    
    // ReceiveReaction is called whenever a reaction to a message is received on a
    // given channel. It may be called multiple times on the same reaction; it is
    // incumbent on the user of the API to filter such called by message ID.
    //
    // Messages may arrive our of order, so a reply, in theory, can arrive before
    // the initial message. As a result, it may be important to buffer reactions.
    func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID,
    	reactionTo message.ID, nickname, reaction string, pubKey ed25519.PublicKey,
    	dmToken uint32, codeset uint8, timestamp time.Time, lease time.Duration,
    	round rounds.Round, mType channels.MessageType, status channels.SentStatus,
    	hidden bool) uint64 {
    	textBytes := []byte(reaction)
    	var err error
    
    	// Handle encryption, if it is present
    	if w.cipher != nil {
    		textBytes, err = w.cipher.Encrypt([]byte(reaction))
    		if err != nil {
    			jww.ERROR.Printf("Failed to encrypt Message: %+v", err)
    			return 0
    		}
    	}
    
    	msgToInsert := buildMessage(
    		channelID.Marshal(), messageID.Bytes(), reactionTo.Bytes(), nickname,
    		textBytes, pubKey, dmToken, codeset, timestamp, lease, round.ID, mType,
    		false, hidden, status)
    
    	uuid, err := w.upsertMessage(msgToInsert)
    	if err != nil {
    		jww.ERROR.Printf("Failed to receive reaction: %+v", err)
    		return 0
    	}
    	go w.receivedMessageCB(uuid, channelID, false)
    	return uuid
    }
    
    // UpdateFromUUID is called whenever a message at the UUID is modified.
    //
    // messageID, timestamp, round, pinned, and hidden are all nillable and may be
    // updated based upon the UUID at a later date. If a nil value is passed, then
    // make no update.
    //
    // Returns an error if the message cannot be updated. It must return
    // channels.NoMessageErr if the message does not exist.
    func (w *wasmModel) UpdateFromUUID(uuid uint64, messageID *message.ID,
    	timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
    	status *channels.SentStatus) error {
    	parentErr := "failed to UpdateFromUUID"
    
    	// Convert messageID to the key generated by json.Marshal
    	key := js.ValueOf(uuid)
    
    	// Use the key to get the existing Message
    	msgObj, err := impl.Get(w.db, messageStoreName, key)
    	if err != nil {
    		if strings.Contains(err.Error(), impl.ErrDoesNotExist) {
    			return errors.WithMessage(channels.NoMessageErr, parentErr)
    		}
    		return errors.WithMessage(err, parentErr)
    	}
    
    	currentMsg, err := valueToMessage(msgObj)
    	if err != nil {
    		return errors.WithMessagef(err,
    			"%s Failed to marshal Message", parentErr)
    	}
    
    	_, err = w.updateMessage(currentMsg, messageID, timestamp,
    		round, pinned, hidden, status)
    	if err != nil {
    		return errors.WithMessage(err, parentErr)
    	}
    	return nil
    }
    
    // UpdateFromMessageID is called whenever a message with the message ID is
    // modified.
    //
    // The API needs to return the UUID of the modified message that can be
    // referenced at a later time.
    //
    // timestamp, round, pinned, and hidden are all nillable and may be updated
    // based upon the UUID at a later date. If a nil value is passed, then make
    // no update.
    //
    // Returns an error if the message cannot be updated. It must return
    // channels.NoMessageErr if the message does not exist.
    func (w *wasmModel) UpdateFromMessageID(messageID message.ID,
    	timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
    	status *channels.SentStatus) (uint64, error) {
    	parentErr := "failed to UpdateFromMessageID"
    
    	msgObj, err := impl.GetIndex(w.db, messageStoreName,
    		messageStoreMessageIndex, impl.EncodeBytes(messageID.Marshal()))
    	if err != nil {
    		if strings.Contains(err.Error(), impl.ErrDoesNotExist) {
    			return 0, errors.WithMessage(channels.NoMessageErr, parentErr)
    		}
    		return 0, errors.WithMessage(err, parentErr)
    	}
    
    	currentMsg, err := valueToMessage(msgObj)
    	if err != nil {
    		return 0, errors.WithMessagef(err,
    			"%s Failed to marshal Message", parentErr)
    	}
    
    	uuid, err := w.updateMessage(currentMsg, &messageID, timestamp,
    		round, pinned, hidden, status)
    	if err != nil {
    		return 0, errors.WithMessage(err, parentErr)
    	}
    	return uuid, nil
    }
    
    // buildMessage is a private helper that converts typical [channels.EventModel]
    // inputs into a basic Message structure for insertion into storage.
    //
    // NOTE: ID is not set inside this function because we want to use the
    // autoincrement key by default. If you are trying to overwrite an existing
    // message, then you need to set it manually yourself.
    func buildMessage(channelID, messageID, parentID []byte, nickname string,
    	text []byte, pubKey ed25519.PublicKey, dmToken uint32, codeset uint8,
    	timestamp time.Time, lease time.Duration, round id.Round,
    	mType channels.MessageType, pinned, hidden bool,
    	status channels.SentStatus) *Message {
    	return &Message{
    		MessageID:       messageID,
    		Nickname:        nickname,
    		ChannelID:       channelID,
    		ParentMessageID: parentID,
    		Timestamp:       timestamp,
    		Lease:           strconv.FormatInt(int64(lease), 10),
    		Status:          uint8(status),
    		Hidden:          hidden,
    		Pinned:          pinned,
    		Text:            text,
    		Type:            uint16(mType),
    		Round:           uint64(round),
    		// User Identity Info
    		Pubkey:         pubKey,
    		DmToken:        dmToken,
    		CodesetVersion: codeset,
    	}
    }
    
    // updateMessage is a helper for updating a stored message.
    func (w *wasmModel) updateMessage(currentMsg *Message, messageID *message.ID,
    	timestamp *time.Time, round *rounds.Round, pinned, hidden *bool,
    	status *channels.SentStatus) (uint64, error) {
    
    	if status != nil {
    		currentMsg.Status = uint8(*status)
    	}
    	if messageID != nil {
    		currentMsg.MessageID = messageID.Bytes()
    	}
    
    	if round != nil {
    		currentMsg.Round = uint64(round.ID)
    	}
    
    	if timestamp != nil {
    		currentMsg.Timestamp = *timestamp
    	}
    
    	if pinned != nil {
    		currentMsg.Pinned = *pinned
    	}
    
    	if hidden != nil {
    		currentMsg.Hidden = *hidden
    	}
    
    	// Store the updated Message
    	uuid, err := w.upsertMessage(currentMsg)
    	if err != nil {
    		return 0, err
    	}
    	channelID := &id.ID{}
    	copy(channelID[:], currentMsg.ChannelID)
    	go w.receivedMessageCB(uuid, channelID, true)
    
    	return uuid, nil
    }
    
    // upsertMessage is a helper function that will update an existing record
    // if Message.ID is specified. Otherwise, it will perform an insert.
    func (w *wasmModel) upsertMessage(msg *Message) (uint64, error) {
    	// Convert to jsObject
    	newMessageJson, err := json.Marshal(msg)
    	if err != nil {
    		return 0, errors.Errorf("Unable to marshal Message: %+v", err)
    	}
    	messageObj, err := utils.JsonToJS(newMessageJson)
    	if err != nil {
    		return 0, errors.Errorf("Unable to marshal Message: %+v", err)
    	}
    
    	// Store message to database
    	msgIdObj, err := impl.Put(w.db, messageStoreName, messageObj)
    	if err != nil {
    		return 0, errors.Errorf("Unable to put Message: %+v\n%s",
    			err, newMessageJson)
    	}
    
    	uuid := msgIdObj.Int()
    	jww.DEBUG.Printf("Successfully stored message %d", uuid)
    	return uint64(uuid), nil
    }
    
    // GetMessage returns the message with the given [channel.MessageID].
    func (w *wasmModel) GetMessage(
    	messageID message.ID) (channels.ModelMessage, error) {
    	msgIDStr := impl.EncodeBytes(messageID.Marshal())
    
    	resultObj, err := impl.GetIndex(w.db, messageStoreName,
    		messageStoreMessageIndex, msgIDStr)
    	if err != nil {
    		return channels.ModelMessage{}, err
    	}
    
    	lookupResult, err := valueToMessage(resultObj)
    	if err != nil {
    		return channels.ModelMessage{}, err
    	}
    
    	var channelId *id.ID
    	if lookupResult.ChannelID != nil {
    		channelId, err = id.Unmarshal(lookupResult.ChannelID)
    		if err != nil {
    			return channels.ModelMessage{}, err
    		}
    	}
    
    	var parentMsgId message.ID
    	if lookupResult.ParentMessageID != nil {
    		parentMsgId, err = message.UnmarshalID(lookupResult.ParentMessageID)
    		if err != nil {
    			return channels.ModelMessage{}, err
    		}
    	}
    
    	lease := time.Duration(0)
    	if len(lookupResult.Lease) > 0 {
    		leaseInt, err := strconv.ParseInt(lookupResult.Lease, 10, 64)
    		if err != nil {
    			return channels.ModelMessage{}, err
    		}
    		lease = time.Duration(leaseInt)
    	}
    
    	return channels.ModelMessage{
    		UUID:            lookupResult.ID,
    		Nickname:        lookupResult.Nickname,
    		MessageID:       messageID,
    		ChannelID:       channelId,
    		ParentMessageID: parentMsgId,
    		Timestamp:       lookupResult.Timestamp,
    		Lease:           lease,
    		Status:          channels.SentStatus(lookupResult.Status),
    		Hidden:          lookupResult.Hidden,
    		Pinned:          lookupResult.Pinned,
    		Content:         lookupResult.Text,
    		Type:            channels.MessageType(lookupResult.Type),
    		Round:           id.Round(lookupResult.Round),
    		PubKey:          lookupResult.Pubkey,
    		CodesetVersion:  lookupResult.CodesetVersion,
    	}, nil
    }
    
    // DeleteMessage removes a message with the given messageID from storage.
    func (w *wasmModel) DeleteMessage(messageID message.ID) error {
    	err := impl.DeleteIndex(w.db, messageStoreName,
    		messageStoreMessageIndex, pkeyName, impl.EncodeBytes(messageID.Marshal()))
    	if err != nil {
    		return err
    	}
    
    	go w.deletedMessageCB(messageID)
    
    	return nil
    }
    
    // MuteUser is called whenever a user is muted or unmuted.
    func (w *wasmModel) MuteUser(
    	channelID *id.ID, pubKey ed25519.PublicKey, unmute bool) {
    	go w.mutedUserCB(channelID, pubKey, unmute)
    }
    
    // valueToMessage is a helper for converting js.Value to Message.
    func valueToMessage(msgObj js.Value) (*Message, error) {
    	resultMsg := &Message{}
    	return resultMsg, json.Unmarshal([]byte(utils.JsToJson(msgObj)), resultMsg)
    }
    
    // valueToFile is a helper for converting js.Value to File.
    func valueToFile(fileObj js.Value) (*File, error) {
    	resultFile := &File{}
    	return resultFile, json.Unmarshal([]byte(utils.JsToJson(fileObj)), resultFile)
    }