Skip to content
Snippets Groups Projects
Commit b2023935 authored by Josh Brooks's avatar Josh Brooks
Browse files

Add EventModel wrapper to bindings

parent 74213037
No related branches found
No related tags found
4 merge requests!510Release,!419rewrote the health tracker to both consider if there are waiting rounds and...,!367Xx 4150/channel bindings,!340Project/channels
......@@ -103,7 +103,7 @@ func (cm *ChannelsManager) GetID() int {
// - e2eID - The tracked e2e object ID. This can be retrieved using [E2e.GetID].
// - udID - The tracked UD object ID. This can be retrieved using
// [UserDiscovery.GetID].
func NewChannelsManager(e2eID, udID int) (*ChannelsManager, error) {
func NewChannelsManager(e2eID, udID, eventModelId int) (*ChannelsManager, error) {
// Get user from singleton
user, err := e2eTrackerSingleton.get(e2eID)
if err != nil {
......@@ -115,14 +115,19 @@ func NewChannelsManager(e2eID, udID int) (*ChannelsManager, error) {
return nil, err
}
eventModel, err := eventModelTrackerSingleton.get(eventModelId)
if err != nil {
return nil, err
}
nameService, err := udMan.api.StartChannelNameService()
if err != nil {
return nil, err
}
// fixme: there is nothing adhering to event model
// Construct new channels manager
m := channels.NewManager(user.api.GetStorage().GetKV(), user.api.GetCmix(),
user.api.GetRng(), nameService, nil)
user.api.GetRng(), nameService, eventModel.api)
// Add channel to singleton and return
return channelManagerTrackerSingleton.make(m), nil
......@@ -132,8 +137,7 @@ func NewChannelsManager(e2eID, udID int) (*ChannelsManager, error) {
// been joined.
//
// Parameters:
// - channelJson - A JSON encoded [ChannelDef]. This may be retrieved from
// [Channel.Get], for example..
// - channelJson - A JSON encoded [ChannelDef].
func (cm *ChannelsManager) JoinChannel(channelJson []byte) error {
// Unmarshal channel definition
def := ChannelDef{}
......@@ -191,6 +195,7 @@ func (cm *ChannelsManager) GetChannels() ([]byte, error) {
//
// Returns:
// - []byte - A JSON encoded channel ID ([id.ID]).
//
// JSON Example:
// "dGVzdAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD"
func (cm *ChannelsManager) GetChannelId(channelJson []byte) ([]byte, error) {
......@@ -284,17 +289,13 @@ func (cm *ChannelsManager) ReplayChannel(marshalledChanId []byte) error {
// JSON Example:
// {
// "MessageId": "0kitNxoFdsF4q1VMSI/xPzfCnGB2l+ln2+7CTHjHbJw=",
// "RoundId": {
// "Rounds": [
// 123
// ]
// },
// "Rounds":[1,5,9],
// "EphId": 0
// }
type ChannelSendReport struct {
MessageId []byte
RoundId RoundsList
EphId int64
RoundsList
EphId int64
}
// SendGeneric is used to send a raw message over a channel. In general, it
......@@ -571,9 +572,9 @@ func constructChannelSendReport(channelMessageId cryptoChannel.MessageID,
roundId id.Round, ephId ephemeral.Id) ([]byte, error) {
// Construct send report
chanSendReport := ChannelSendReport{
MessageId: channelMessageId.Bytes(),
RoundId: makeRoundsList(roundId),
EphId: ephId.Int64(),
MessageId: channelMessageId.Bytes(),
RoundsList: makeRoundsList(roundId),
EphId: ephId.Int64(),
}
// Marshal send report
......@@ -584,7 +585,7 @@ func constructChannelSendReport(channelMessageId cryptoChannel.MessageID,
// Channel Receiving Logic & Callback Registration //
////////////////////////////////////////////////////////////////////////////////
// ReceivedChannelMessage is a report structure returned via the
// ReceivedChannelMessageReport is a report structure returned via the
// ChannelMessageReceptionCallback. This report gives the context
// for the channel the message was sent to and the message itself.
// This is returned via the callback as JSON marshalled bytes.
......@@ -593,26 +594,24 @@ func constructChannelSendReport(channelMessageId cryptoChannel.MessageID,
// {
// "ChannelId": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
// "MessageId": "3S6DiVjWH9mLmjy1oaam/3x45bJQzOW6u2KgeUn59wA=",
// "ReplyTo":"cxMyGUFJ+Ff1Xp2X+XkIpOnNAQEZmv8SNP5eYH4tCik=",
// "MessageType": 42,
// "SenderUsername": "hunter2",
// "Content": "YmFuX2JhZFVTZXI=",
// "Timestamp": 1662502150335283000,
// "Lease": 25,
// "RoundIds": {
// "Rounds": [
// 11420
// ]
// }
// "Rounds": [ 1, 4, 9],
// }
type ReceivedChannelMessage struct {
type ReceivedChannelMessageReport struct {
ChannelId []byte
MessageId []byte
ReplyTo []byte
MessageType int
SenderUsername string
Content []byte
Timestamp int64
Lease int64
RoundIds RoundsList
RoundsList
}
// ChannelMessageReceptionCallback is the callback that returns the
......@@ -642,7 +641,7 @@ func (cm *ChannelsManager) RegisterReceiveHandler(messageType int,
content []byte, timestamp time.Time, lease time.Duration,
round rounds.Round) {
rcm := ReceivedChannelMessage{
rcm := ReceivedChannelMessageReport{
ChannelId: channelID.Marshal(),
MessageId: messageID.Bytes(),
MessageType: int(messageType),
......@@ -650,7 +649,7 @@ func (cm *ChannelsManager) RegisterReceiveHandler(messageType int,
Content: content,
Timestamp: timestamp.UnixNano(),
Lease: int64(lease),
RoundIds: makeRoundsList(round.ID),
RoundsList: makeRoundsList(round.ID),
}
listenerCb.Callback(json.Marshal(rcm))
......
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package bindings
import (
"encoding/json"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/channels"
"gitlab.com/elixxir/client/cmix/rounds"
cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast"
cryptoChannel "gitlab.com/elixxir/crypto/channel"
"gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id"
"sync"
"time"
)
////////////////////////////////////////////////////////////////////////////////
// Singleton Tracker //
////////////////////////////////////////////////////////////////////////////////
// eventModelTrackerSingleton is used to track EventModel objects
// so that they can be referenced by ID back over the bindings.
var eventModelTrackerSingleton = &eventModelTracker{
tracked: make(map[int]*EventModel),
count: 0,
}
// eventModelTracker is a singleton used to keep track of extant
// EventModel objects, preventing race conditions created by passing it
// over the bindings.
type eventModelTracker struct {
tracked map[int]*EventModel
count int
mux sync.RWMutex
}
// make create an EventModel from an [channels.EventModel], assigns it a
// unique ID, and adds it to the eventModelTracker.
func (emt *eventModelTracker) make(eventModel channels.EventModel) *EventModel {
emt.mux.Lock()
defer emt.mux.Unlock()
id := emt.count
emt.count++
emt.tracked[id] = &EventModel{
api: eventModel,
id: id,
}
return emt.tracked[id]
}
// get an EventModel from the eventModelTracker given its ID.
func (emt *eventModelTracker) get(id int) (*EventModel, error) {
emt.mux.RLock()
defer emt.mux.RUnlock()
c, exist := emt.tracked[id]
if !exist {
return nil, errors.Errorf(
"Cannot get EventModel for ID %d, does not exist", id)
}
return c, nil
}
// delete removes a EventModel from the eventModelTracker.
func (emt *eventModelTracker) delete(id int) {
emt.mux.Lock()
defer emt.mux.Unlock()
delete(emt.tracked, id)
}
////////////////////////////////////////////////////////////////////////////////
// Basic EventModel API //
////////////////////////////////////////////////////////////////////////////////
type EventModel struct {
api channels.EventModel
id int
}
// NewEventModel IS CURRENTLY UNIMPLEMENTED.
func NewEventModel() *EventModel {
return eventModelTrackerSingleton.make(nil)
}
// JoinChannel is called whenever a channel is joined locally.
//
// Parameters:
// - channelJson - A JSON encoded [ChannelDef].
func (e *EventModel) JoinChannel(channelJson []byte) {
// Unmarshal channel definition
def := ChannelDef{}
err := json.Unmarshal(channelJson, &def)
if err != nil {
jww.ERROR.Printf("Could not parse channel JSON: %+v", err)
return
}
// Construct ID using the embedded cryptographic information
channelId, err := cryptoBroadcast.NewChannelID(def.Name, def.Description,
def.Salt, def.PubKey)
if err != nil {
jww.ERROR.Printf("Could not construct channel ID: %+v", err)
return
}
// Construct public key into object
rsaPubKey, err := rsa.LoadPublicKeyFromPem(def.PubKey)
if err != nil {
jww.ERROR.Printf("Could not read public key: %+v", err)
return
}
// Construct cryptographic channel object
channel := &cryptoBroadcast.Channel{
ReceptionID: channelId,
Name: def.Name,
Description: def.Description,
Salt: def.Salt,
RsaPubKey: rsaPubKey,
}
e.api.JoinChannel(channel)
return
}
// LeaveChannel is called whenever a channel is left locally.
//
// Parameters:
// - []byte - A JSON marshalled channel ID ([id.ID]). This may be retrieved
// using ChannelsManager.GetChannelId.
func (e *EventModel) LeaveChannel(marshalledChanId []byte) {
// Unmarshal channel ID
channelId, err := id.Unmarshal(marshalledChanId)
if err != nil {
jww.ERROR.Printf("Could not parse channel ID (%s): %+v",
marshalledChanId, err)
return
}
e.api.LeaveChannel(channelId)
return
}
// ReceiveMessage is called whenever a message is received on a given channel
// It may be called multiple times on the same message, it is incumbent on
// the user of the API to filter such called by message ID.
//
// Parameters:
// - reportJson - A JSON marshalled ReceivedChannelMessageReport.
func (e *EventModel) ReceiveMessage(reportJson []byte) {
report, err := parseChannelMessageReport(reportJson)
if err != nil {
jww.ERROR.Printf("%+v", err)
return
}
// fixme: the internal API should accept an object, probably
// just use receivedChannelMessageReport in the channels package
e.api.ReceiveMessage(report.ChannelID, report.MessageID,
report.SenderUsername, report.Content, report.Timestamp,
report.Lease, report.Round)
return
}
// ReceiveReply is called whenever a message is received which is a reply
// on a given channel. It may be called multiple times on the same message,
// it is incumbent on the user of the API to filter such called by message ID
// Messages may arrive our of order, so a reply in theory can arrive before
// the initial message, as a result it may be important to buffer replies.
//
// Parameters:
// - reportJson - A JSON marshalled ReceivedChannelMessageReport.
func (e *EventModel) ReceiveReply(reportJson []byte) {
report, err := parseChannelMessageReport(reportJson)
if err != nil {
jww.ERROR.Printf("%+v", err)
return
}
// fixme: the internal API should accept an object, probably
// just use receivedChannelMessageReport in the channels package. This i
e.api.ReceiveReply(report.ChannelID, report.MessageID, report.ReplyTo,
report.SenderUsername, report.Content, report.Timestamp,
report.Lease, report.Round)
}
// ReceiveReaction is called whenever a Content to a message is received
// on a given channel. It may be called multiple times on the same Content,
// it is incumbent on the user of the API to filter such called by message ID
// Messages may arrive our of order, so a reply in theory can arrive before
// the initial message, as a result it may be important to buffer reactions.
//
// Parameters:
// - reportJson - A JSON marshalled ReceivedChannelMessageReport.
func (e *EventModel) ReceiveReaction(reportJson []byte) {
report, err := parseChannelMessageReport(reportJson)
if err != nil {
jww.ERROR.Printf("%+v", err)
return
}
// fixme: the internal API should accept an object, probably
// just move receivedChannelMessageReport to the channels package and export it.
e.api.ReceiveReaction(report.ChannelID, report.MessageID, report.ReplyTo,
report.SenderUsername, report.Content, report.Timestamp, report.Lease,
report.Round)
}
// receivedChannelMessageReport is the Golang representation of
// a channel message report.
type receivedChannelMessageReport struct {
// Channel ID is the message of the channel this message was received on.
ChannelID *id.ID
// MessageID is the ID of the channel message received.
MessageID cryptoChannel.MessageID
// ReplyTo is overloaded to be a reply or react to,
// depending on the context of the received message
// (EventModel.ReceiveReaction or EventModel.ReceiveReply).
ReplyTo cryptoChannel.MessageID
// SenderUsername is the username of the sender.
SenderUsername string
// Content is the payload of the message. This is overloaded with
// reaction in the [EventModel.ReceiveReaction]. This may
// either be text or an emoji.
Content string
// The timestamp of the message.
Timestamp time.Time
// The duration of this channel message.
Lease time.Duration
// The round this message was sent on.
Round rounds.Round
}
// parseChannelMessageReport converts the JSON representation of
// a ReceivedChannelMessageReport into the Golang representation,
// receivedChannelMessageReport.
func parseChannelMessageReport(reportJson []byte) (
receivedChannelMessageReport, error) {
// Unmarshal message report
messageReport := ReceivedChannelMessageReport{}
err := json.Unmarshal(reportJson, &messageReport)
if err != nil {
return receivedChannelMessageReport{},
errors.Errorf("Could not parse received message report (%s): %+v",
reportJson, err)
}
// Unmarshal channel ID
chanId, err := id.Unmarshal(messageReport.ChannelId)
if err != nil {
return receivedChannelMessageReport{},
errors.Errorf("Could not parse channel ID (%s): %+v",
messageReport.ChannelId, err)
}
// Unmarshal message ID
msgId := cryptoChannel.MessageID{}
copy(msgId[:], messageReport.MessageId)
// Unmarshal reply to/react to message ID
replyTo := cryptoChannel.MessageID{}
copy(replyTo[:], messageReport.ReplyTo)
// Construct Round
rnd := rounds.Round{ID: id.Round(messageReport.Rounds[0])}
return receivedChannelMessageReport{
ChannelID: chanId,
MessageID: msgId,
ReplyTo: replyTo,
SenderUsername: messageReport.SenderUsername,
Content: string(messageReport.Content),
Timestamp: time.Unix(0, messageReport.Timestamp),
Lease: time.Duration(messageReport.Lease),
Round: rnd,
}, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment