Skip to content
Snippets Groups Projects
Commit 31d01459 authored by benjamin's avatar benjamin
Browse files

in progress event model change

parent 12f3c10e
No related branches found
No related tags found
4 merge requests!510Release,!419rewrote the health tracker to both consider if there are waiting rounds and...,!413Xx 4263,!340Project/channels
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
package channels package channels
import ( import (
"crypto/ed25519"
"errors" "errors"
"fmt" "fmt"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
...@@ -35,6 +36,8 @@ const ( ...@@ -35,6 +36,8 @@ const (
Failed Failed
) )
var AdminFakePubkey = ed25519.PublicKey{}
// EventModel is an interface which an external party which uses the channels // EventModel is an interface which an external party which uses the channels
// system passed an object which adheres to in order to get events on the // system passed an object which adheres to in order to get events on the
// channel. // channel.
...@@ -62,7 +65,7 @@ type EventModel interface { ...@@ -62,7 +65,7 @@ type EventModel interface {
// Message type is included in the call, it will always be Text (1) // Message type is included in the call, it will always be Text (1)
// for this call, but it may be required in downstream databases // for this call, but it may be required in downstream databases
ReceiveMessage(channelID *id.ID, messageID cryptoChannel.MessageID, ReceiveMessage(channelID *id.ID, messageID cryptoChannel.MessageID,
nickname, text string, identity cryptoChannel.Identity, nickname, text string, pubkey ed25519.PublicKey, codeset uint8,
timestamp time.Time, lease time.Duration, round rounds.Round, timestamp time.Time, lease time.Duration, round rounds.Round,
mType MessageType, status SentStatus) uint64 mType MessageType, status SentStatus) uint64
...@@ -87,7 +90,7 @@ type EventModel interface { ...@@ -87,7 +90,7 @@ type EventModel interface {
// this call, but it may be required in downstream databases // this call, but it may be required in downstream databases
ReceiveReply(channelID *id.ID, messageID cryptoChannel.MessageID, ReceiveReply(channelID *id.ID, messageID cryptoChannel.MessageID,
reactionTo cryptoChannel.MessageID, nickname, text string, reactionTo cryptoChannel.MessageID, nickname, text string,
identity cryptoChannel.Identity, timestamp time.Time, pubkey ed25519.PublicKey, codeset uint8, timestamp time.Time,
lease time.Duration, round rounds.Round, mType MessageType, lease time.Duration, round rounds.Round, mType MessageType,
status SentStatus) uint64 status SentStatus) uint64
...@@ -114,7 +117,7 @@ type EventModel interface { ...@@ -114,7 +117,7 @@ type EventModel interface {
// this call, but it may be required in downstream databases // this call, but it may be required in downstream databases
ReceiveReaction(channelID *id.ID, messageID cryptoChannel.MessageID, ReceiveReaction(channelID *id.ID, messageID cryptoChannel.MessageID,
reactionTo cryptoChannel.MessageID, nickname, reaction string, reactionTo cryptoChannel.MessageID, nickname, reaction string,
identity cryptoChannel.Identity, timestamp time.Time, ipubkey ed25519.PublicKey, codeset uint8, timestamp time.Time,
lease time.Duration, round rounds.Round, mType MessageType, lease time.Duration, round rounds.Round, mType MessageType,
status SentStatus) uint64 status SentStatus) uint64
...@@ -142,7 +145,7 @@ type EventModel interface { ...@@ -142,7 +145,7 @@ type EventModel interface {
// later // later
type MessageTypeReceiveMessage func(channelID *id.ID, type MessageTypeReceiveMessage func(channelID *id.ID,
messageID cryptoChannel.MessageID, messageType MessageType, messageID cryptoChannel.MessageID, messageType MessageType,
nickname string, content []byte, identity cryptoChannel.Identity, nickname string, content []byte, pubkey ed25519.PublicKey, codeset uint8,
timestamp time.Time, lease time.Duration, round rounds.Round, timestamp time.Time, lease time.Duration, round rounds.Round,
status SentStatus) uint64 status SentStatus) uint64
...@@ -212,24 +215,23 @@ func (e *events) triggerEvent(chID *id.ID, umi *userMessageInternal, ts time.Tim ...@@ -212,24 +215,23 @@ func (e *events) triggerEvent(chID *id.ID, umi *userMessageInternal, ts time.Tim
cm := umi.GetChannelMessage() cm := umi.GetChannelMessage()
messageType := MessageType(cm.PayloadType) messageType := MessageType(cm.PayloadType)
identity := cryptoChannel.ConstructIdentity(um.ECCPublicKey)
// Check if the type is already registered // Check if the type is already registered
e.mux.RLock() e.mux.RLock()
listener, exists := e.registered[messageType] listener, exists := e.registered[messageType]
e.mux.RUnlock() e.mux.RUnlock()
if !exists { if !exists {
errStr := fmt.Sprintf("Received message from %s on channel %s in "+ errStr := fmt.Sprintf("Received message from %x on channel %s in "+
"round %d which could not be handled due to unregistered message "+ "round %d which could not be handled due to unregistered message "+
"type %s; Contents: %v", identity.Codename, chID, round.ID, messageType, "type %s; Contents: %v", um.ECCPublicKey, chID, round.ID, messageType,
cm.Payload) cm.Payload)
jww.WARN.Printf(errStr) jww.WARN.Printf(errStr)
return 0, errors.New(errStr) return 0, errors.New(errStr)
} }
// Call the listener. This is already in an instanced event, no new thread needed. // Call the listener. This is already in an instanced event, no new thread needed.
uuid := listener(chID, umi.GetMessageID(), messageType, cm.Nickname, cm.Payload, identity, uuid := listener(chID, umi.GetMessageID(), messageType, cm.Nickname, cm.Payload,
ts, time.Duration(cm.Lease), round, status) um.ECCPublicKey, 0, ts, time.Duration(cm.Lease),
round, status)
return uuid, nil return uuid, nil
} }
...@@ -262,7 +264,7 @@ func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage, ...@@ -262,7 +264,7 @@ func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage,
// Call the listener. This is already in an instanced event, no new thread needed. // Call the listener. This is already in an instanced event, no new thread needed.
uuid := listener(chID, messageID, messageType, AdminUsername, cm.Payload, uuid := listener(chID, messageID, messageType, AdminUsername, cm.Payload,
cryptoChannel.Identity{Codename: AdminUsername}, ts, AdminFakePubkey, 0, ts,
time.Duration(cm.Lease), round, status) time.Duration(cm.Lease), round, status)
return uuid, nil return uuid, nil
} }
...@@ -275,15 +277,15 @@ func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage, ...@@ -275,15 +277,15 @@ func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage,
// write to the log. // write to the log.
func (e *events) receiveTextMessage(channelID *id.ID, func (e *events) receiveTextMessage(channelID *id.ID,
messageID cryptoChannel.MessageID, messageType MessageType, messageID cryptoChannel.MessageID, messageType MessageType,
nickname string, content []byte, identity cryptoChannel.Identity, nickname string, content []byte, pubkey ed25519.PublicKey, codeset uint8,
timestamp time.Time, lease time.Duration, round rounds.Round, timestamp time.Time, lease time.Duration, round rounds.Round,
status SentStatus) uint64 { status SentStatus) uint64 {
txt := &CMIXChannelText{} txt := &CMIXChannelText{}
if err := proto.Unmarshal(content, txt); err != nil { if err := proto.Unmarshal(content, txt); err != nil {
jww.ERROR.Printf("Failed to text unmarshal message %s from %s on "+ jww.ERROR.Printf("Failed to text unmarshal message %s from %x on "+
"channel %s, type %s, ts: %s, lease: %s, round: %d: %+v", "channel %s, type %s, ts: %s, lease: %s, round: %d: %+v",
messageID, identity.Codename, channelID, messageType, timestamp, lease, messageID, pubkey, channelID, messageType, timestamp, lease,
round.ID, err) round.ID, err)
return 0 return 0
} }
...@@ -294,20 +296,20 @@ func (e *events) receiveTextMessage(channelID *id.ID, ...@@ -294,20 +296,20 @@ func (e *events) receiveTextMessage(channelID *id.ID,
var replyTo cryptoChannel.MessageID var replyTo cryptoChannel.MessageID
copy(replyTo[:], txt.ReplyMessageID) copy(replyTo[:], txt.ReplyMessageID)
return e.model.ReceiveReply(channelID, messageID, replyTo, return e.model.ReceiveReply(channelID, messageID, replyTo,
nickname, txt.Text, identity, timestamp, lease, round, Text, status) nickname, txt.Text, pubkey, codeset, timestamp, lease, round, Text, status)
} else { } else {
jww.ERROR.Printf("Failed process reply to for message %s from %s on "+ jww.ERROR.Printf("Failed process reply to for message %s from %s on "+
"channel %s, type %s, ts: %s, lease: %s, round: %d, returning "+ "channel %s, type %s, ts: %s, lease: %s, round: %d, returning "+
"without reply", "without reply",
messageID, identity.Codename, channelID, messageType, timestamp, lease, messageID, pubkey, codeset, channelID, messageType, timestamp, lease,
round.ID) round.ID)
// Still process the message, but drop the reply because it is // Still process the message, but drop the reply because it is
// malformed // malformed
} }
} }
return e.model.ReceiveMessage(channelID, messageID, nickname, txt.Text, identity, return e.model.ReceiveMessage(channelID, messageID, nickname, txt.Text, pubkey, codeset,
timestamp, lease, round, Text, status) timestamp, lease, round, Text, status)
} }
...@@ -320,24 +322,24 @@ func (e *events) receiveTextMessage(channelID *id.ID, ...@@ -320,24 +322,24 @@ func (e *events) receiveTextMessage(channelID *id.ID,
// reaction is dropped. // reaction is dropped.
func (e *events) receiveReaction(channelID *id.ID, func (e *events) receiveReaction(channelID *id.ID,
messageID cryptoChannel.MessageID, messageType MessageType, messageID cryptoChannel.MessageID, messageType MessageType,
nickname string, content []byte, identity cryptoChannel.Identity, nickname string, content []byte, pubkey ed25519.PublicKey, codeset uint8,
timestamp time.Time, lease time.Duration, round rounds.Round, timestamp time.Time, lease time.Duration, round rounds.Round,
status SentStatus) uint64 { status SentStatus) uint64 {
react := &CMIXChannelReaction{} react := &CMIXChannelReaction{}
if err := proto.Unmarshal(content, react); err != nil { if err := proto.Unmarshal(content, react); err != nil {
jww.ERROR.Printf("Failed to text unmarshal message %s from %s on "+ jww.ERROR.Printf("Failed to text unmarshal message %s from %x on "+
"channel %s, type %s, ts: %s, lease: %s, round: %d: %+v", "channel %s, type %s, ts: %s, lease: %s, round: %d: %+v",
messageID, identity.Codename, channelID, messageType, timestamp, lease, messageID, pubkey, channelID, messageType, timestamp, lease,
round.ID, err) round.ID, err)
return 0 return 0
} }
// check that the reaction is a single emoji and ignore if it isn't // check that the reaction is a single emoji and ignore if it isn't
if err := ValidateReaction(react.Reaction); err != nil { if err := ValidateReaction(react.Reaction); err != nil {
jww.ERROR.Printf("Failed process reaction %s from %s on channel "+ jww.ERROR.Printf("Failed process reaction %s from %x on channel "+
"%s, type %s, ts: %s, lease: %s, round: %d, due to malformed "+ "%s, type %s, ts: %s, lease: %s, round: %d, due to malformed "+
"reaction (%s), ignoring reaction", "reaction (%s), ignoring reaction",
messageID, identity.Codename, channelID, messageType, timestamp, lease, messageID, pubkey, channelID, messageType, timestamp, lease,
round.ID, err) round.ID, err)
return 0 return 0
} }
...@@ -346,12 +348,12 @@ func (e *events) receiveReaction(channelID *id.ID, ...@@ -346,12 +348,12 @@ func (e *events) receiveReaction(channelID *id.ID,
var reactTo cryptoChannel.MessageID var reactTo cryptoChannel.MessageID
copy(reactTo[:], react.ReactionMessageID) copy(reactTo[:], react.ReactionMessageID)
return e.model.ReceiveReaction(channelID, messageID, reactTo, nickname, return e.model.ReceiveReaction(channelID, messageID, reactTo, nickname,
react.Reaction, identity, timestamp, lease, round, Reaction, status) react.Reaction, pubkey, codeset, timestamp, lease, round, Reaction, status)
} else { } else {
jww.ERROR.Printf("Failed process reaction %s from %s on channel "+ jww.ERROR.Printf("Failed process reaction %s from %s on channel "+
"%s, type %s, ts: %s, lease: %s, round: %d, reacting to "+ "%s, type %s, ts: %s, lease: %s, round: %d, reacting to "+
"invalid message, ignoring reaction", "invalid message, ignoring reaction",
messageID, identity.Codename, channelID, messageType, timestamp, lease, messageID, pubkey, codeset, channelID, messageType, timestamp, lease,
round.ID) round.ID)
} }
return 0 return 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment