Skip to content
Snippets Groups Projects
Commit fd700be7 authored by David Stainton's avatar David Stainton
Browse files

Merge remote-tracking branch 'origin/channelsImpl' into XX-4055/ChannelIdentityTracking

parents 19f1732c 72765d5c
No related branches found
No related tags found
6 merge requests!510Release,!419rewrote the health tracker to both consider if there are waiting rounds and...,!371[Channel RSAtoPrivate] Implement Reverse Asymmetric in Client/Broadcast,!354Channels impl,!340Project/channels,!338Xx 4055/channel identity tracking
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
// versions: // versions:
// protoc-gen-go v1.26.0 // protoc-gen-go v1.26.0
// protoc (unknown) // protoc (unknown)
// source: messages.proto // source: channelMessages.proto
package channels package channels
...@@ -49,7 +49,7 @@ type ChannelMessage struct { ...@@ -49,7 +49,7 @@ type ChannelMessage struct {
func (x *ChannelMessage) Reset() { func (x *ChannelMessage) Reset() {
*x = ChannelMessage{} *x = ChannelMessage{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_messages_proto_msgTypes[0] mi := &file_channelMessages_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
...@@ -62,7 +62,7 @@ func (x *ChannelMessage) String() string { ...@@ -62,7 +62,7 @@ func (x *ChannelMessage) String() string {
func (*ChannelMessage) ProtoMessage() {} func (*ChannelMessage) ProtoMessage() {}
func (x *ChannelMessage) ProtoReflect() protoreflect.Message { func (x *ChannelMessage) ProtoReflect() protoreflect.Message {
mi := &file_messages_proto_msgTypes[0] mi := &file_channelMessages_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
...@@ -75,7 +75,7 @@ func (x *ChannelMessage) ProtoReflect() protoreflect.Message { ...@@ -75,7 +75,7 @@ func (x *ChannelMessage) ProtoReflect() protoreflect.Message {
// Deprecated: Use ChannelMessage.ProtoReflect.Descriptor instead. // Deprecated: Use ChannelMessage.ProtoReflect.Descriptor instead.
func (*ChannelMessage) Descriptor() ([]byte, []int) { func (*ChannelMessage) Descriptor() ([]byte, []int) {
return file_messages_proto_rawDescGZIP(), []int{0} return file_channelMessages_proto_rawDescGZIP(), []int{0}
} }
func (x *ChannelMessage) GetLease() int64 { func (x *ChannelMessage) GetLease() int64 {
...@@ -139,7 +139,7 @@ type UserMessage struct { ...@@ -139,7 +139,7 @@ type UserMessage struct {
func (x *UserMessage) Reset() { func (x *UserMessage) Reset() {
*x = UserMessage{} *x = UserMessage{}
if protoimpl.UnsafeEnabled { if protoimpl.UnsafeEnabled {
mi := &file_messages_proto_msgTypes[1] mi := &file_channelMessages_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
} }
...@@ -152,7 +152,7 @@ func (x *UserMessage) String() string { ...@@ -152,7 +152,7 @@ func (x *UserMessage) String() string {
func (*UserMessage) ProtoMessage() {} func (*UserMessage) ProtoMessage() {}
func (x *UserMessage) ProtoReflect() protoreflect.Message { func (x *UserMessage) ProtoReflect() protoreflect.Message {
mi := &file_messages_proto_msgTypes[1] mi := &file_channelMessages_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
...@@ -165,7 +165,7 @@ func (x *UserMessage) ProtoReflect() protoreflect.Message { ...@@ -165,7 +165,7 @@ func (x *UserMessage) ProtoReflect() protoreflect.Message {
// Deprecated: Use UserMessage.ProtoReflect.Descriptor instead. // Deprecated: Use UserMessage.ProtoReflect.Descriptor instead.
func (*UserMessage) Descriptor() ([]byte, []int) { func (*UserMessage) Descriptor() ([]byte, []int) {
return file_messages_proto_rawDescGZIP(), []int{1} return file_channelMessages_proto_rawDescGZIP(), []int{1}
} }
func (x *UserMessage) GetMessage() []byte { func (x *UserMessage) GetMessage() []byte {
...@@ -210,54 +210,55 @@ func (x *UserMessage) GetUsernameLease() int64 { ...@@ -210,54 +210,55 @@ func (x *UserMessage) GetUsernameLease() int64 {
return 0 return 0
} }
var File_messages_proto protoreflect.FileDescriptor var File_channelMessages_proto protoreflect.FileDescriptor
var file_messages_proto_rawDesc = []byte{ var file_channelMessages_proto_rawDesc = []byte{
0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x0a, 0x15, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x12, 0x05, 0x70, 0x61, 0x72, 0x73, 0x65, 0x22, 0x7c, 0x0a, 0x0e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x70, 0x61, 0x72, 0x73, 0x65, 0x22, 0x7c,
0x65, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x4c, 0x65, 0x61, 0x0a, 0x0e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x12, 0x12, 0x14, 0x0a, 0x05, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52,
0x18, 0x0a, 0x07, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x05, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x49,
0x52, 0x07, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x44, 0x12, 0x20, 0x0a, 0x0b, 0x50, 0x61, 0x79, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x44,
0x6c, 0x6f, 0x61, 0x64, 0x54, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x12, 0x20, 0x0a, 0x0b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x54, 0x79, 0x70, 0x65, 0x18,
0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x54, 0x79,
0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0xdd, 0x01, 0x0a, 0x0b, 0x55, 0x73, 0x65, 0x72, 0x4d, 0x65, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0xdd, 0x01, 0x0a,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x0b, 0x55, 0x73, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d,
0x30, 0x0a, 0x13, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x69, 0x67, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x13, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61,
0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x13, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20,
0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x01, 0x28, 0x0c, 0x52, 0x13, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53,
0x65, 0x12, 0x1c, 0x0a, 0x09, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x03, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x53, 0x69, 0x67, 0x6e,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67,
0x1a, 0x0a, 0x08, 0x55, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x55, 0x73, 0x65, 0x72, 0x6e, 0x61,
0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x22, 0x0a, 0x0c, 0x45, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61,
0x43, 0x43, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x6d, 0x65, 0x12, 0x22, 0x0a, 0x0c, 0x45, 0x43, 0x43, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x4b,
0x0c, 0x52, 0x0c, 0x65, 0x43, 0x43, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x12, 0x65, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x65, 0x43, 0x43, 0x50, 0x75, 0x62,
0x24, 0x0a, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x12, 0x24, 0x0a, 0x0d, 0x55, 0x73, 0x65, 0x72, 0x6e, 0x61,
0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x6d, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x75,
0x4c, 0x65, 0x61, 0x73, 0x65, 0x42, 0x0b, 0x5a, 0x09, 0x2f, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x42, 0x0b, 0x5a, 0x09,
0x6c, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x2f, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
} }
var ( var (
file_messages_proto_rawDescOnce sync.Once file_channelMessages_proto_rawDescOnce sync.Once
file_messages_proto_rawDescData = file_messages_proto_rawDesc file_channelMessages_proto_rawDescData = file_channelMessages_proto_rawDesc
) )
func file_messages_proto_rawDescGZIP() []byte { func file_channelMessages_proto_rawDescGZIP() []byte {
file_messages_proto_rawDescOnce.Do(func() { file_channelMessages_proto_rawDescOnce.Do(func() {
file_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_messages_proto_rawDescData) file_channelMessages_proto_rawDescData = protoimpl.X.CompressGZIP(file_channelMessages_proto_rawDescData)
}) })
return file_messages_proto_rawDescData return file_channelMessages_proto_rawDescData
} }
var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_channelMessages_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_messages_proto_goTypes = []interface{}{ var file_channelMessages_proto_goTypes = []interface{}{
(*ChannelMessage)(nil), // 0: parse.ChannelMessage (*ChannelMessage)(nil), // 0: parse.ChannelMessage
(*UserMessage)(nil), // 1: parse.UserMessage (*UserMessage)(nil), // 1: parse.UserMessage
} }
var file_messages_proto_depIdxs = []int32{ var file_channelMessages_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type 0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type 0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension type_name
...@@ -265,13 +266,13 @@ var file_messages_proto_depIdxs = []int32{ ...@@ -265,13 +266,13 @@ var file_messages_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for field type_name 0, // [0:0] is the sub-list for field type_name
} }
func init() { file_messages_proto_init() } func init() { file_channelMessages_proto_init() }
func file_messages_proto_init() { func file_channelMessages_proto_init() {
if File_messages_proto != nil { if File_channelMessages_proto != nil {
return return
} }
if !protoimpl.UnsafeEnabled { if !protoimpl.UnsafeEnabled {
file_messages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { file_channelMessages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ChannelMessage); i { switch v := v.(*ChannelMessage); i {
case 0: case 0:
return &v.state return &v.state
...@@ -283,7 +284,7 @@ func file_messages_proto_init() { ...@@ -283,7 +284,7 @@ func file_messages_proto_init() {
return nil return nil
} }
} }
file_messages_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { file_channelMessages_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*UserMessage); i { switch v := v.(*UserMessage); i {
case 0: case 0:
return &v.state return &v.state
...@@ -300,18 +301,18 @@ func file_messages_proto_init() { ...@@ -300,18 +301,18 @@ func file_messages_proto_init() {
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{ File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_messages_proto_rawDesc, RawDescriptor: file_channelMessages_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 2, NumMessages: 2,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },
GoTypes: file_messages_proto_goTypes, GoTypes: file_channelMessages_proto_goTypes,
DependencyIndexes: file_messages_proto_depIdxs, DependencyIndexes: file_channelMessages_proto_depIdxs,
MessageInfos: file_messages_proto_msgTypes, MessageInfos: file_channelMessages_proto_msgTypes,
}.Build() }.Build()
File_messages_proto = out.File File_channelMessages_proto = out.File
file_messages_proto_rawDesc = nil file_channelMessages_proto_rawDesc = nil
file_messages_proto_goTypes = nil file_channelMessages_proto_goTypes = nil
file_messages_proto_depIdxs = nil file_channelMessages_proto_depIdxs = nil
} }
File moved
...@@ -22,18 +22,37 @@ var ( ...@@ -22,18 +22,37 @@ var (
"already been registered") "already been registered")
) )
// EventModel is an interface which an external party which uses the channels
// system passed an object which adheres to in order to get events on the channel
type EventModel interface { type EventModel interface {
// JoinChannel is called whenever a channel is joined locally
JoinChannel(channel cryptoBroadcast.Channel) JoinChannel(channel cryptoBroadcast.Channel)
// LeaveChannel is called whenever a channel is left locally
LeaveChannel(ChannelID *id.ID) LeaveChannel(ChannelID *id.ID)
// ReceiveMessage is called whenever a message is received on a given channel
// It may be called multiple times on the same message, it is incumbent on
// the user of the API to filter such called by message ID
ReceiveMessage(ChannelID *id.ID, MessageID cryptoChannel.MessageID, ReceiveMessage(ChannelID *id.ID, MessageID cryptoChannel.MessageID,
SenderUsername string, text string, SenderUsername string, text string,
timestamp time.Time, lease time.Duration, round rounds.Round) timestamp time.Time, lease time.Duration, round rounds.Round)
// ReceiveReply is called whenever a message is received which is a reply
// on a given channel. It may be called multiple times on the same message,
// it is incumbent on the user of the API to filter such called by message ID
// Messages may arrive our of order, so a reply in theory can arrive before
// the initial message, as a result it may be important to buffer replies.
ReceiveReply(ChannelID *id.ID, MessageID cryptoChannel.MessageID, ReceiveReply(ChannelID *id.ID, MessageID cryptoChannel.MessageID,
ReplyTo cryptoChannel.MessageID, SenderUsername string, ReplyTo cryptoChannel.MessageID, SenderUsername string,
text string, timestamp time.Time, lease time.Duration, text string, timestamp time.Time, lease time.Duration,
round rounds.Round) round rounds.Round)
// ReceiveReaction is called whenever a reaction to a message is received
// on a given channel. It may be called multiple times on the same reaction,
// it is incumbent on the user of the API to filter such called by message ID
// Messages may arrive our of order, so a reply in theory can arrive before
// the initial message, as a result it may be important to buffer reactions.
ReceiveReaction(ChannelID *id.ID, MessageID cryptoChannel.MessageID, ReceiveReaction(ChannelID *id.ID, MessageID cryptoChannel.MessageID,
ReactionTo cryptoChannel.MessageID, SenderUsername string, ReactionTo cryptoChannel.MessageID, SenderUsername string,
Reaction string, timestamp time.Time, lease time.Duration, Reaction string, timestamp time.Time, lease time.Duration,
...@@ -57,6 +76,8 @@ type events struct { ...@@ -57,6 +76,8 @@ type events struct {
mux sync.RWMutex mux sync.RWMutex
} }
// initEvents initializes the event model and registers default message type
// handlers
func initEvents(model EventModel) *events { func initEvents(model EventModel) *events {
e := &events{ e := &events{
model: model, model: model,
...@@ -71,6 +92,11 @@ func initEvents(model EventModel) *events { ...@@ -71,6 +92,11 @@ func initEvents(model EventModel) *events {
return e return e
} }
// RegisterReceiveHandler is used to register handlers for non default message
// types s they can be processed by modules. it is important that such modules
// sync up with the event model implementation.
// There can only be one handler per message type, and this will return an error
// on a multiple registration.
func (e *events) RegisterReceiveHandler(messageType MessageType, func (e *events) RegisterReceiveHandler(messageType MessageType,
listener MessageTypeReceiveMessage) error { listener MessageTypeReceiveMessage) error {
e.mux.Lock() e.mux.Lock()
...@@ -87,6 +113,9 @@ func (e *events) RegisterReceiveHandler(messageType MessageType, ...@@ -87,6 +113,9 @@ func (e *events) RegisterReceiveHandler(messageType MessageType,
return nil return nil
} }
// triggerEvent is an internal function which is used to trigger message
// reception on a message received from a user (symmetric encryption)
// It will call the appropriate MessageTypeHandler assuming one exists.
func (e *events) triggerEvent(chID *id.ID, umi *UserMessageInternal, func (e *events) triggerEvent(chID *id.ID, umi *UserMessageInternal,
receptionID receptionID.EphemeralIdentity, round rounds.Round) { receptionID receptionID.EphemeralIdentity, round rounds.Round) {
um := umi.GetUserMessage() um := umi.GetUserMessage()
...@@ -110,6 +139,9 @@ func (e *events) triggerEvent(chID *id.ID, umi *UserMessageInternal, ...@@ -110,6 +139,9 @@ func (e *events) triggerEvent(chID *id.ID, umi *UserMessageInternal,
return return
} }
// triggerAdminEvent is an internal function which is used to trigger message
// reception on a message received from the admin (asymmetric encryption)
// It will call the appropriate MessageTypeHandler assuming one exists.
func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage, func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage,
messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, round rounds.Round) { messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, round rounds.Round) {
messageType := MessageType(cm.PayloadType) messageType := MessageType(cm.PayloadType)
...@@ -131,6 +163,11 @@ func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage, ...@@ -131,6 +163,11 @@ func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage,
return return
} }
// receiveTextMessage is the internal function which handles the reception of
// text messages. It handles both messages and replies and calls the correct
// function on the event model.
// If the message has a reply but it is malformed, it will drop the reply and
// write to the log
func (e *events) receiveTextMessage(ChannelID *id.ID, func (e *events) receiveTextMessage(ChannelID *id.ID,
MessageID cryptoChannel.MessageID, messageType MessageType, MessageID cryptoChannel.MessageID, messageType MessageType,
SenderUsername string, Content []byte, timestamp time.Time, SenderUsername string, Content []byte, timestamp time.Time,
...@@ -165,6 +202,9 @@ func (e *events) receiveTextMessage(ChannelID *id.ID, ...@@ -165,6 +202,9 @@ func (e *events) receiveTextMessage(ChannelID *id.ID,
timestamp, lease, round) timestamp, lease, round)
} }
// receiveReaction is the internal function which handles the reception of
// Reactions.
// It does edge chaling
func (e *events) receiveReaction(ChannelID *id.ID, func (e *events) receiveReaction(ChannelID *id.ID,
MessageID cryptoChannel.MessageID, messageType MessageType, MessageID cryptoChannel.MessageID, messageType MessageType,
SenderUsername string, Content []byte, timestamp time.Time, SenderUsername string, Content []byte, timestamp time.Time,
......
...@@ -7,5 +7,5 @@ ...@@ -7,5 +7,5 @@
#/ LICENSE file // #/ LICENSE file //
#/////////////////////////////////////////////////////////////////////////////// #///////////////////////////////////////////////////////////////////////////////
protoc --go_out=paths=source_relative:. messages.proto protoc --go_out=paths=source_relative:. channelMessages.proto
protoc --go_out=paths=source_relative:. text.proto protoc --go_out=paths=source_relative:. text.proto
...@@ -68,7 +68,7 @@ func (m *manager) loadChannels() { ...@@ -68,7 +68,7 @@ func (m *manager) loadChannels() {
for i := range chList { for i := range chList {
jc, err := loadJoinedChannel(chList[i], m.kv, m.client, m.rng, m.name, jc, err := loadJoinedChannel(chList[i], m.kv, m.client, m.rng, m.name,
&m.events, m.broadcastMaker) m.events, m.broadcastMaker)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to load channel %s: %+v", jww.FATAL.Panicf("Failed to load channel %s: %+v",
chList[i], err) chList[i], err)
...@@ -95,7 +95,7 @@ func (m *manager) addChannel(channel cryptoBroadcast.Channel) error { ...@@ -95,7 +95,7 @@ func (m *manager) addChannel(channel cryptoBroadcast.Channel) error {
//Connect to listeners //Connect to listeners
err = b.RegisterListener((&userListener{ err = b.RegisterListener((&userListener{
name: m.name, name: m.name,
events: &m.events, events: m.events,
chID: channel.ReceptionID, chID: channel.ReceptionID,
}).Listen, broadcast.Symmetric) }).Listen, broadcast.Symmetric)
if err != nil { if err != nil {
...@@ -104,7 +104,7 @@ func (m *manager) addChannel(channel cryptoBroadcast.Channel) error { ...@@ -104,7 +104,7 @@ func (m *manager) addChannel(channel cryptoBroadcast.Channel) error {
err = b.RegisterListener((&adminListener{ err = b.RegisterListener((&adminListener{
name: m.name, name: m.name,
events: &m.events, events: m.events,
chID: channel.ReceptionID, chID: channel.ReceptionID,
}).Listen, broadcast.Asymmetric) }).Listen, broadcast.Asymmetric)
if err != nil { if err != nil {
......
...@@ -21,15 +21,18 @@ type manager struct { ...@@ -21,15 +21,18 @@ type manager struct {
name NameService name NameService
//Events model //Events model
events *events
// make the function used to create broadcasts be a pointer so it // make the function used to create broadcasts be a pointer so it
// can be replaced in tests // can be replaced in tests
broadcastMaker broadcast.NewBroadcastChannelFunc broadcastMaker broadcast.NewBroadcastChannelFunc
} }
// NewManager Creates a new channel.Manager.
// Prefix's teh KV with the username so multiple instances for multiple
// users will not error.
func NewManager(kv *versioned.KV, client broadcast.Client, func NewManager(kv *versioned.KV, client broadcast.Client,
rng *fastRNG.StreamGenerator, name NameService) Manager { rng *fastRNG.StreamGenerator, name NameService, model EventModel) Manager {
//prefix the kv with the username so multiple can be run //prefix the kv with the username so multiple can be run
kv = kv.Prefix(name.GetUsername()) kv = kv.Prefix(name.GetUsername())
...@@ -39,15 +42,18 @@ func NewManager(kv *versioned.KV, client broadcast.Client, ...@@ -39,15 +42,18 @@ func NewManager(kv *versioned.KV, client broadcast.Client,
client: client, client: client,
rng: rng, rng: rng,
name: name, name: name,
events: events{},
broadcastMaker: broadcast.NewBroadcastChannel, broadcastMaker: broadcast.NewBroadcastChannel,
} }
m.events = initEvents(model)
m.loadChannels() m.loadChannels()
return &m return &m
} }
// JoinChannel joins the given channel. Will fail if the channel
// has already been joined.
func (m *manager) JoinChannel(channel cryptoBroadcast.Channel) error { func (m *manager) JoinChannel(channel cryptoBroadcast.Channel) error {
err := m.addChannel(channel) err := m.addChannel(channel)
if err != nil { if err != nil {
...@@ -57,6 +63,8 @@ func (m *manager) JoinChannel(channel cryptoBroadcast.Channel) error { ...@@ -57,6 +63,8 @@ func (m *manager) JoinChannel(channel cryptoBroadcast.Channel) error {
return nil return nil
} }
// LeaveChannel leaves the given channel. It will return an error
// if the channel was not previously joined.
func (m *manager) LeaveChannel(channelId *id.ID) error { func (m *manager) LeaveChannel(channelId *id.ID) error {
err := m.removeChannel(channelId) err := m.removeChannel(channelId)
if err != nil { if err != nil {
......
...@@ -16,6 +16,12 @@ const ( ...@@ -16,6 +16,12 @@ const (
cmixChannelReactionVersion = 0 cmixChannelReactionVersion = 0
) )
// SendGeneric is used to send a raw message over a channel. In general, it
// should be wrapped in a function which defines the wire protocol
// If the final message, before being sent over the wire, is too long, this will
// return an error. Due to the underlying encoding using compression, it isn't
// possible to define the largest payload that can be sent, but
// it will always be possible to send a payload of 802 bytes at minimum
func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType, func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType,
msg []byte, validUntil time.Duration, params cmix.CMIXParams) ( msg []byte, validUntil time.Duration, params cmix.CMIXParams) (
cryptoChannel.MessageID, id.Round, ephemeral.Id, error) { cryptoChannel.MessageID, id.Round, ephemeral.Id, error) {
...@@ -62,7 +68,7 @@ func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType, ...@@ -62,7 +68,7 @@ func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType,
Signature: messageSig, Signature: messageSig,
Username: m.name.GetUsername(), Username: m.name.GetUsername(),
ECCPublicKey: m.name.GetChannelPubkey(), ECCPublicKey: m.name.GetChannelPubkey(),
UsernameLease: unameLease.Unix(), UsernameLease: unameLease.UnixNano(),
} }
//Serialize the user message //Serialize the user message
...@@ -89,6 +95,11 @@ func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType, ...@@ -89,6 +95,11 @@ func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType,
return msgId, rid, ephid, err return msgId, rid, ephid, err
} }
// SendAdminGeneric is used to send a raw message over a channel encrypted
// with admin keys, identifying it as sent by the admin. In general, it
// should be wrapped in a function which defines the wire protocol
// If the final message, before being sent over the wire, is too long, this will
// return an error. The message must be at most 510 bytes long.
func (m *manager) SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID, func (m *manager) SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID,
msg []byte, validUntil time.Duration, messageType MessageType, msg []byte, validUntil time.Duration, messageType MessageType,
params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id, params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id,
...@@ -145,6 +156,10 @@ func (m *manager) SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID, ...@@ -145,6 +156,10 @@ func (m *manager) SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID,
return msgId, rid, ephid, err return msgId, rid, ephid, err
} }
// SendMessage is used to send a formatted message over a channel.
// Due to the underlying encoding using compression, it isn't
// possible to define the largest payload that can be sent, but
// it will always be possible to send a payload of 798 bytes at minimum
func (m *manager) SendMessage(channelID *id.ID, msg string, func (m *manager) SendMessage(channelID *id.ID, msg string,
validUntil time.Duration, params cmix.CMIXParams) ( validUntil time.Duration, params cmix.CMIXParams) (
cryptoChannel.MessageID, id.Round, ephemeral.Id, error) { cryptoChannel.MessageID, id.Round, ephemeral.Id, error) {
...@@ -162,6 +177,12 @@ func (m *manager) SendMessage(channelID *id.ID, msg string, ...@@ -162,6 +177,12 @@ func (m *manager) SendMessage(channelID *id.ID, msg string,
return m.SendGeneric(channelID, Text, txtMarshaled, validUntil, params) return m.SendGeneric(channelID, Text, txtMarshaled, validUntil, params)
} }
// SendReply is used to send a formatted message over a channel.
// Due to the underlying encoding using compression, it isn't
// possible to define the largest payload that can be sent, but
// it will always be possible to send a payload of 766 bytes at minimum.
// If the message ID the reply is sent to doesnt exist, the other side will
// post the message as a normal message and not a reply.
func (m *manager) SendReply(channelID *id.ID, msg string, func (m *manager) SendReply(channelID *id.ID, msg string,
replyTo cryptoChannel.MessageID, validUntil time.Duration, replyTo cryptoChannel.MessageID, validUntil time.Duration,
params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id, params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id,
...@@ -180,8 +201,12 @@ func (m *manager) SendReply(channelID *id.ID, msg string, ...@@ -180,8 +201,12 @@ func (m *manager) SendReply(channelID *id.ID, msg string,
return m.SendGeneric(channelID, Text, txtMarshaled, validUntil, params) return m.SendGeneric(channelID, Text, txtMarshaled, validUntil, params)
} }
// SendReaction is used to send a reaction to a message over a channel.
// The reaction must be a single emoji with no other characters, and will
// be rejected otherwise.
// Clients will drop the reaction if they do not recognize the reactTo message
func (m *manager) SendReaction(channelID *id.ID, reaction string, func (m *manager) SendReaction(channelID *id.ID, reaction string,
replyTo cryptoChannel.MessageID, validUntil time.Duration, reactTo cryptoChannel.MessageID, validUntil time.Duration,
params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id, params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id,
error) { error) {
...@@ -192,7 +217,7 @@ func (m *manager) SendReaction(channelID *id.ID, reaction string, ...@@ -192,7 +217,7 @@ func (m *manager) SendReaction(channelID *id.ID, reaction string,
react := &CMIXChannelReaction{ react := &CMIXChannelReaction{
Version: cmixChannelReactionVersion, Version: cmixChannelReactionVersion,
Reaction: reaction, Reaction: reaction,
ReactionMessageID: replyTo[:], ReactionMessageID: reactTo[:],
} }
reactMarshaled, err := proto.Marshal(react) reactMarshaled, err := proto.Marshal(react)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment