Skip to content
Snippets Groups Projects
Commit 7ff1ea08 authored by Jake Taylor's avatar Jake Taylor
Browse files

finish implementation of EventModel

parent 59086601
No related branches found
No related tags found
2 merge requests!60Revert "Fail a test to be sure it works",!4Xx 4114/index db
...@@ -25,6 +25,9 @@ import ( ...@@ -25,6 +25,9 @@ import (
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
// dbTimeout is the global timeout for operations with the storage context.Contact
const dbTimeout = time.Second
// jsObject is the Golang type translation for a JavaScript object // jsObject is the Golang type translation for a JavaScript object
type jsObject map[string]interface{} type jsObject map[string]interface{}
...@@ -34,6 +37,12 @@ type wasmModel struct { ...@@ -34,6 +37,12 @@ type wasmModel struct {
db *idb.Database db *idb.Database
} }
// newContext builds a context for database operations
func newContext() (context.Context, context.CancelFunc) {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
return ctx, cancel
}
// JoinChannel is called whenever a channel is joined locally. // JoinChannel is called whenever a channel is joined locally.
func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) { func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) {
parentErr := errors.New("failed to JoinChannel") parentErr := errors.New("failed to JoinChannel")
...@@ -61,7 +70,8 @@ func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) { ...@@ -61,7 +70,8 @@ func (w *wasmModel) JoinChannel(channel *cryptoBroadcast.Channel) {
} }
// Prepare the Transaction // Prepare the Transaction
ctx := context.Background() ctx, cancel := newContext()
defer cancel()
txn, err := w.db.Transaction(idb.TransactionReadWrite, channelsStoreName) txn, err := w.db.Transaction(idb.TransactionReadWrite, channelsStoreName)
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", errors.Wrapf(parentErr, jww.ERROR.Printf("%+v", errors.Wrapf(parentErr,
...@@ -97,7 +107,8 @@ func (w *wasmModel) LeaveChannel(channelID *id.ID) { ...@@ -97,7 +107,8 @@ func (w *wasmModel) LeaveChannel(channelID *id.ID) {
parentErr := errors.New("failed to LeaveChannel") parentErr := errors.New("failed to LeaveChannel")
// Prepare the Transaction // Prepare the Transaction
ctx := context.Background() ctx, cancel := newContext()
defer cancel()
txn, err := w.db.Transaction(idb.TransactionReadWrite, channelsStoreName) txn, err := w.db.Transaction(idb.TransactionReadWrite, channelsStoreName)
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", errors.Wrapf(parentErr, jww.ERROR.Printf("%+v", errors.Wrapf(parentErr,
...@@ -128,70 +139,16 @@ func (w *wasmModel) LeaveChannel(channelID *id.ID) { ...@@ -128,70 +139,16 @@ func (w *wasmModel) LeaveChannel(channelID *id.ID) {
} }
} }
// receiveHelper is a private helper for receiving any sort of message
func (w *wasmModel) receiveHelper(channelID []byte, messageID []byte,
parentId []byte, senderUsername string,
text string, timestamp time.Time, lease time.Duration) error {
// Build object
newMessage := Message{
Id: messageID,
SenderUsername: senderUsername,
ChannelId: channelID,
ParentMessageId: parentId,
Timestamp: timestamp,
Lease: lease,
Status: uint8(channels.Delivered),
Hidden: false,
Pinned: false,
Text: text,
}
// Convert to jsObject
newMessageJson, err := json.Marshal(&newMessage)
if err != nil {
return errors.Errorf("Unable to marshal Message: %+v", err)
}
var messageObj *jsObject
err = json.Unmarshal(newMessageJson, messageObj)
if err != nil {
return errors.Errorf("Unable to unmarshal Message: %+v", err)
}
// Prepare the Transaction
ctx := context.Background()
txn, err := w.db.Transaction(idb.TransactionReadWrite, messageStoreName)
if err != nil {
return errors.Errorf("Unable to create Transaction: %+v", err)
}
store, err := txn.ObjectStore(messageStoreName)
if err != nil {
return errors.Errorf("Unable to get ObjectStore: %+v", err)
}
// Perform the operation
_, err = store.Add(js.ValueOf(*messageObj))
if err != nil {
return errors.Errorf("Unable to Add Message: %+v", err)
}
// Wait for the operation to return
err = txn.Await(ctx)
if err != nil {
return errors.Errorf("Adding Message failed: %+v", err)
}
return nil
}
// ReceiveMessage is called whenever a message is received on a given channel // ReceiveMessage is called whenever a message is received on a given channel
// It may be called multiple times on the same message, it is incumbent on // It may be called multiple times on the same message, it is incumbent on
// the user of the API to filter such called by message ID. // the user of the API to filter such called by message ID.
func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID cryptoChannel.MessageID, func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID cryptoChannel.MessageID,
senderUsername string, text string, timestamp time.Time, lease time.Duration, senderUsername string, text string, timestamp time.Time, lease time.Duration,
round rounds.Round) { _ rounds.Round) {
parentErr := errors.New("failed to ReceiveMessage") parentErr := errors.New("failed to ReceiveMessage")
err := w.receiveHelper(channelID.Marshal(), messageID.Bytes(), nil, err := w.receiveHelper(buildMessage(channelID.Marshal(), messageID.Bytes(), nil,
senderUsername, text, timestamp, lease) senderUsername, channels.Delivered, text, timestamp, lease))
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error())) jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
} }
...@@ -204,12 +161,11 @@ func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID cryptoChannel.Mes ...@@ -204,12 +161,11 @@ func (w *wasmModel) ReceiveMessage(channelID *id.ID, messageID cryptoChannel.Mes
// the initial message, as a result it may be important to buffer replies. // the initial message, as a result it may be important to buffer replies.
func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID cryptoChannel.MessageID, func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID cryptoChannel.MessageID,
replyTo cryptoChannel.MessageID, senderUsername string, replyTo cryptoChannel.MessageID, senderUsername string,
text string, timestamp time.Time, lease time.Duration, text string, timestamp time.Time, lease time.Duration, _ rounds.Round) {
round rounds.Round) {
parentErr := errors.New("failed to ReceiveReply") parentErr := errors.New("failed to ReceiveReply")
err := w.receiveHelper(channelID.Marshal(), messageID.Bytes(), err := w.receiveHelper(buildMessage(channelID.Marshal(), messageID.Bytes(),
replyTo.Bytes(), senderUsername, text, timestamp, lease) replyTo.Bytes(), senderUsername, channels.Delivered, text, timestamp, lease))
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error())) jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
} }
...@@ -222,12 +178,11 @@ func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID cryptoChannel.Messa ...@@ -222,12 +178,11 @@ func (w *wasmModel) ReceiveReply(channelID *id.ID, messageID cryptoChannel.Messa
// the initial message, as a result it may be important to buffer reactions. // the initial message, as a result it may be important to buffer reactions.
func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID cryptoChannel.MessageID, func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID cryptoChannel.MessageID,
reactionTo cryptoChannel.MessageID, senderUsername string, reactionTo cryptoChannel.MessageID, senderUsername string,
reaction string, timestamp time.Time, lease time.Duration, reaction string, timestamp time.Time, lease time.Duration, _ rounds.Round) {
round rounds.Round) {
parentErr := errors.New("failed to ReceiveReaction") parentErr := errors.New("failed to ReceiveReaction")
err := w.receiveHelper(channelID.Marshal(), messageID.Bytes(), err := w.receiveHelper(buildMessage(channelID.Marshal(), messageID.Bytes(),
reactionTo.Bytes(), senderUsername, reaction, timestamp, lease) reactionTo.Bytes(), senderUsername, channels.Delivered, reaction, timestamp, lease))
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error())) jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
} }
...@@ -237,30 +192,115 @@ func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID cryptoChannel.Me ...@@ -237,30 +192,115 @@ func (w *wasmModel) ReceiveReaction(channelID *id.ID, messageID cryptoChannel.Me
//designated as "sent" and that delivery is unknown. //designated as "sent" and that delivery is unknown.
func (w *wasmModel) MessageSent(channelID *id.ID, messageID cryptoChannel.MessageID, func (w *wasmModel) MessageSent(channelID *id.ID, messageID cryptoChannel.MessageID,
myUsername string, text string, timestamp time.Time, myUsername string, text string, timestamp time.Time,
lease time.Duration, round rounds.Round) { lease time.Duration, _ rounds.Round) {
parentErr := errors.New("failed to MessageSent")
err := w.receiveHelper(buildMessage(channelID.Marshal(), messageID.Bytes(),
nil, myUsername, channels.Sent, text, timestamp, lease))
if err != nil {
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
}
} }
// ReplySent is called whenever the user sends a reply. It should be // ReplySent is called whenever the user sends a reply. It should be
// designated as "sent" and that delivery is unknown. // designated as "sent" and that delivery is unknown.
func (w *wasmModel) ReplySent(channelID *id.ID, messageID cryptoChannel.MessageID, func (w *wasmModel) ReplySent(channelID *id.ID, messageID cryptoChannel.MessageID,
replyTo cryptoChannel.MessageID, myUsername string, text string, replyTo cryptoChannel.MessageID, myUsername string, text string,
timestamp time.Time, lease time.Duration, round rounds.Round) { timestamp time.Time, lease time.Duration, _ rounds.Round) {
parentErr := errors.New("failed to ReplySent")
err := w.receiveHelper(buildMessage(channelID.Marshal(), messageID.Bytes(),
replyTo.Bytes(), myUsername, channels.Sent, text, timestamp, lease))
if err != nil {
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
}
} }
// ReactionSent is called whenever the user sends a reply. It should be // ReactionSent is called whenever the user sends a reply. It should be
// designated as "sent" and that delivery is unknown. // designated as "sent" and that delivery is unknown.
func (w *wasmModel) ReactionSent(channelID *id.ID, messageID cryptoChannel.MessageID, func (w *wasmModel) ReactionSent(channelID *id.ID, messageID cryptoChannel.MessageID,
reactionTo cryptoChannel.MessageID, senderUsername string, reactionTo cryptoChannel.MessageID, myUsername string,
reaction string, timestamp time.Time, lease time.Duration, reaction string, timestamp time.Time, lease time.Duration, _ rounds.Round) {
round rounds.Round) { parentErr := errors.New("failed to ReactionSent")
err := w.receiveHelper(buildMessage(channelID.Marshal(), messageID.Bytes(),
reactionTo.Bytes(), myUsername, channels.Sent, reaction, timestamp, lease))
if err != nil {
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
}
} }
// UpdateSentStatus is called whenever the sent status of a message // UpdateSentStatus is called whenever the SentStatus of a message
// has changed // has changed
func (w *wasmModel) UpdateSentStatus(messageID cryptoChannel.MessageID, func (w *wasmModel) UpdateSentStatus(messageID cryptoChannel.MessageID,
status channels.SentStatus) { status channels.SentStatus) {
parentErr := errors.New("failed to UpdateSentStatus")
newMessage := &Message{
Id: messageID.Bytes(),
Status: uint8(status),
}
err := w.receiveHelper(newMessage)
if err != nil {
jww.ERROR.Printf("%+v", errors.Wrap(parentErr, err.Error()))
}
}
// buildMessage is a private helper that converts typical [channels.EventModel]
// inputs into a basic Message structure for insertion into storage
func buildMessage(channelID []byte, messageID []byte,
parentId []byte, senderUsername string, status channels.SentStatus,
text string, timestamp time.Time, lease time.Duration) *Message {
return &Message{
Id: messageID,
SenderUsername: senderUsername,
ChannelId: channelID,
ParentMessageId: parentId,
Timestamp: timestamp,
Lease: lease,
Status: uint8(status),
Hidden: false,
Pinned: false,
Text: text,
}
}
// receiveHelper is a private helper for receiving any sort of message
func (w *wasmModel) receiveHelper(newMessage *Message) error {
// Convert to jsObject
newMessageJson, err := json.Marshal(newMessage)
if err != nil {
return errors.Errorf("Unable to marshal Message: %+v", err)
}
var messageObj *jsObject
err = json.Unmarshal(newMessageJson, messageObj)
if err != nil {
return errors.Errorf("Unable to unmarshal Message: %+v", err)
}
// Prepare the Transaction
ctx, cancel := newContext()
defer cancel()
txn, err := w.db.Transaction(idb.TransactionReadWrite, messageStoreName)
if err != nil {
return errors.Errorf("Unable to create Transaction: %+v", err)
}
store, err := txn.ObjectStore(messageStoreName)
if err != nil {
return errors.Errorf("Unable to get ObjectStore: %+v", err)
}
// Perform the upsert (put) operation
_, err = store.Put(js.ValueOf(*messageObj))
if err != nil {
return errors.Errorf("Unable to upsert Message: %+v", err)
}
// Wait for the operation to return
err = txn.Await(ctx)
if err != nil {
return errors.Errorf("Upserting Message failed: %+v", err)
}
return nil
} }
...@@ -96,10 +96,10 @@ func v1Upgrade(db *idb.Database) error { ...@@ -96,10 +96,10 @@ func v1Upgrade(db *idb.Database) error {
} }
// Build User ObjectStore // Build User ObjectStore
_, err = db.CreateObjectStore(userStoreName, storeOpts) //_, err = db.CreateObjectStore(userStoreName, storeOpts)
if err != nil { //if err != nil {
return err // return err
} //}
// Build Channel ObjectStore // Build Channel ObjectStore
_, err = db.CreateObjectStore(channelsStoreName, storeOpts) _, err = db.CreateObjectStore(channelsStoreName, storeOpts)
......
...@@ -19,7 +19,7 @@ const ( ...@@ -19,7 +19,7 @@ const (
// Text representation of the names of the various [idb.ObjectStore]. // Text representation of the names of the various [idb.ObjectStore].
messageStoreName = "messages" messageStoreName = "messages"
userStoreName = "users" //userStoreName = "users"
channelsStoreName = "channels" channelsStoreName = "channels"
// Message index names. // Message index names.
...@@ -54,10 +54,10 @@ type Message struct { ...@@ -54,10 +54,10 @@ type Message struct {
// User defines the IndexedDb representation of a single User. // User defines the IndexedDb representation of a single User.
// A User has many Message. // A User has many Message.
type User struct { //type User struct {
Id []byte `json:"id"` // Matches pkeyName // Id []byte `json:"id"` // Matches pkeyName
Username string `json:"username"` // Username string `json:"username"`
} //}
// Channel defines the IndexedDb representation of a single Channel // Channel defines the IndexedDb representation of a single Channel
// A Channel has many Message. // A Channel has many Message.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment