Skip to content
Snippets Groups Projects
Commit 1b813f5d authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

converted switchboard

parent 8304e368
No related branches found
No related tags found
3 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast
......@@ -9,14 +9,14 @@ package parse
import (
"encoding/binary"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/catalog"
"time"
)
// Sizes of message parts, in bytes.
const (
numPartsLen = 1
typeLen = message.TypeLen
typeLen = catalog.MessageTypeLen
timestampLen = 8
firstPartVerLen = 1
firstHeaderLen = headerLen + numPartsLen + typeLen + timestampLen + firstPartVerLen
......@@ -35,7 +35,7 @@ type firstMessagePart struct {
// newFirstMessagePart creates a new firstMessagePart for the passed in
// contents. Does no length checks.
func newFirstMessagePart(mt message.Type, id uint32, numParts uint8,
func newFirstMessagePart(mt catalog.MessageType, id uint32, numParts uint8,
timestamp time.Time, contents []byte) firstMessagePart {
// Create the message structure
......@@ -106,8 +106,8 @@ func firstMessagePartFromBytesVer0(data []byte) firstMessagePart {
}
// GetType returns the message type.
func (m firstMessagePart) GetType() message.Type {
return message.Type(binary.BigEndian.Uint32(m.Type))
func (m firstMessagePart) GetType() catalog.MessageType {
return catalog.MessageType(binary.BigEndian.Uint32(m.Type))
}
// GetNumParts returns the number of message parts.
......
......@@ -9,7 +9,7 @@ package parse
import (
"bytes"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/catalog"
"reflect"
"testing"
"time"
......@@ -36,7 +36,7 @@ var efmp = firstMessagePart{
// Test that newFirstMessagePart returns a correctly made firstMessagePart
func TestNewFirstMessagePart(t *testing.T) {
fmp := newFirstMessagePart(
message.XxMessage,
catalog.XxMessage,
1077,
2,
time.Unix(1609786229, 0).UTC(),
......@@ -67,8 +67,8 @@ func TestFirstMessagePartFromBytes(t *testing.T) {
// Test that GetType returns the correct type for a firstMessagePart
func TestFirstMessagePart_GetType(t *testing.T) {
if efmp.GetType() != message.XxMessage {
t.Errorf("Got %v, expected %v", efmp.GetType(), message.XxMessage)
if efmp.GetType() != catalog.XxMessage {
t.Errorf("Got %v, expected %v", efmp.GetType(), catalog.XxMessage)
}
}
......
......@@ -9,7 +9,8 @@ package parse
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/e2e/receive"
"gitlab.com/elixxir/client/storage"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime"
......@@ -39,7 +40,7 @@ func NewPartitioner(messageSize int, session *storage.Session) Partitioner {
return p
}
func (p Partitioner) Partition(recipient *id.ID, mt message.Type,
func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType,
timestamp time.Time, payload []byte) ([][]byte, uint64, error) {
if len(payload) > p.maxSize {
......@@ -69,8 +70,8 @@ func (p Partitioner) Partition(recipient *id.ID, mt message.Type,
return parts, fullMessageID, nil
}
func (p Partitioner) HandlePartition(sender *id.ID, _ message.EncryptionType,
contents []byte, relationshipFingerprint []byte) (message.Receive, bool) {
func (p Partitioner) HandlePartition(sender *id.ID,
contents []byte, relationshipFingerprint []byte) (receive.Message, bool) {
if isFirst(contents) {
// If it is the first message in a set, then handle it as so
......
......@@ -5,15 +5,15 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/xx_network/primitives/id"
)
// ID to respond to any message type
const AnyType = message.NoType
const AnyType = catalog.NoType
//ID to respond to any user
func AnyUser() *id.ID {
......
......@@ -5,7 +5,7 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"gitlab.com/xx_network/primitives/id"
......
......@@ -5,7 +5,7 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"github.com/golang-collections/collections/set"
......
......@@ -5,7 +5,7 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"github.com/golang-collections/collections/set"
......
......@@ -5,15 +5,15 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"github.com/golang-collections/collections/set"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/catalog"
)
type byType struct {
list map[message.Type]*set.Set
list map[catalog.MessageType]*set.Set
generic *set.Set
}
......@@ -21,7 +21,7 @@ type byType struct {
// registers an AnyType as generic
func newByType() *byType {
bt := &byType{
list: make(map[message.Type]*set.Set),
list: make(map[catalog.MessageType]*set.Set),
generic: set.New(),
}
......@@ -34,7 +34,7 @@ func newByType() *byType {
// returns a set associated with the passed messageType unioned with the
// generic return
func (bt *byType) Get(messageType message.Type) *set.Set {
func (bt *byType) Get(messageType catalog.MessageType) *set.Set {
lookup, ok := bt.list[messageType]
if !ok {
return bt.generic
......@@ -45,7 +45,7 @@ func (bt *byType) Get(messageType message.Type) *set.Set {
// adds a listener to a set for the given messageType. Creates a new set to add
// it to if the set does not exist
func (bt *byType) Add(messageType message.Type, r Listener) *set.Set {
func (bt *byType) Add(messageType catalog.MessageType, r Listener) *set.Set {
s, ok := bt.list[messageType]
if !ok {
s = set.New(r)
......@@ -59,7 +59,7 @@ func (bt *byType) Add(messageType message.Type, r Listener) *set.Set {
// Removes the passed listener from the set for messageType and
// deletes the set if it is empty and the type is not AnyType
func (bt *byType) Remove(mt message.Type, l Listener) {
func (bt *byType) Remove(mt catalog.MessageType, l Listener) {
s, ok := bt.list[mt]
if ok {
s.Remove(l)
......
......@@ -5,11 +5,11 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"github.com/golang-collections/collections/set"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/catalog"
"testing"
)
......@@ -43,7 +43,7 @@ func TestByType_Get_Empty(t *testing.T) {
func TestByType_Get_Selected(t *testing.T) {
nbt := newByType()
m := message.Type(42)
m := catalog.MessageType(42)
set1 := set.New(0)
......@@ -63,7 +63,7 @@ func TestByType_Get_Selected(t *testing.T) {
func TestByType_Get_Generic(t *testing.T) {
nbt := newByType()
m := message.Type(42)
m := catalog.MessageType(42)
nbt.generic.Insert(0)
......@@ -81,7 +81,7 @@ func TestByType_Get_Generic(t *testing.T) {
func TestByType_Get_GenericSelected(t *testing.T) {
nbt := newByType()
m := message.Type(42)
m := catalog.MessageType(42)
nbt.generic.Insert(1)
......@@ -106,7 +106,7 @@ func TestByType_Get_GenericSelected(t *testing.T) {
func TestByType_Add_New(t *testing.T) {
nbt := newByType()
m := message.Type(42)
m := catalog.MessageType(42)
l := &funcListener{}
......@@ -128,7 +128,7 @@ func TestByType_Add_New(t *testing.T) {
func TestByType_Add_Old(t *testing.T) {
nbt := newByType()
m := message.Type(42)
m := catalog.MessageType(42)
l1 := &funcListener{}
l2 := &funcListener{}
......@@ -179,7 +179,7 @@ func TestByType_Add_Generic(t *testing.T) {
func TestByType_Remove_SingleInSet(t *testing.T) {
nbt := newByType()
m := message.Type(42)
m := catalog.MessageType(42)
l1 := &funcListener{}
......
......@@ -5,47 +5,47 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/xx_network/primitives/id"
)
//interface for a listener adhere to
//Listener interface for a listener adhere to
type Listener interface {
// the Hear function is called to exercise the listener, passing in the
// data as an item
Hear(item message.Receive)
Hear(item Message)
// Returns a name, used for debugging
Name() string
}
// This function type defines callbacks that get passed when the listener is
// ListenerFunc This function type defines callbacks that get passed when the listener is
// listened to. It will always be called in its own goroutine. It may be called
// multiple times simultaneously
type ListenerFunc func(item message.Receive)
type ListenerFunc func(item Message)
// id object returned when a listener is created and is used to delete it from
// ListenerID id object returned when a listener is created and is used to delete it from
// the system
type ListenerID struct {
userID *id.ID
messageType message.Type
messageType catalog.MessageType
listener Listener
}
//getter for userID
// GetUserID getter for userID
func (lid ListenerID) GetUserID() *id.ID {
return lid.userID
}
//getter for message type
func (lid ListenerID) GetMessageType() message.Type {
// GetMessageType getter for message type
func (lid ListenerID) GetMessageType() catalog.MessageType {
return lid.messageType
}
//getter for name
// GetName getter for name
func (lid ListenerID) GetName() string {
return lid.listener.Name()
}
......@@ -58,7 +58,7 @@ type funcListener struct {
name string
}
// creates a new FuncListener Adhereing to the listener interface out of the
// newFuncListener creates a new FuncListener Adhereing to the listener interface out of the
// passed function and name, returns a pointer to the result
func newFuncListener(listener ListenerFunc, name string) *funcListener {
return &funcListener{
......@@ -69,7 +69,7 @@ func newFuncListener(listener ListenerFunc, name string) *funcListener {
// Adheres to the Hear function of the listener interface, calls the internal
// function with the passed item
func (fl *funcListener) Hear(item message.Receive) {
func (fl *funcListener) Hear(item Message) {
fl.listener(item)
}
......@@ -81,13 +81,13 @@ func (fl *funcListener) Name() string {
//listener based off of a channel
type chanListener struct {
listener chan message.Receive
listener chan Message
name string
}
// creates a new ChanListener Adhereing to the listener interface out of the
// passed channel and name, returns a pointer to the result
func newChanListener(listener chan message.Receive, name string) *chanListener {
func newChanListener(listener chan Message, name string) *chanListener {
return &chanListener{
listener: listener,
name: name,
......@@ -97,7 +97,7 @@ func newChanListener(listener chan message.Receive, name string) *chanListener {
// Adheres to the Hear function of the listener interface, calls the passed the
// heard item across the channel. Drops the item if it cannot put it into the
// channel immediately
func (cl *chanListener) Hear(item message.Receive) {
func (cl *chanListener) Hear(item Message) {
select {
case cl.listener <- item:
default:
......
......@@ -5,10 +5,9 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/xx_network/primitives/id"
"reflect"
"testing"
......@@ -64,7 +63,7 @@ func TestListenerID_GetName(t *testing.T) {
//tests new function listener creates the funcListener properly
func TestNewFuncListener(t *testing.T) {
f := func(item message.Receive) {}
f := func(item Message) {}
name := "test"
listener := newFuncListener(f, name)
......@@ -79,15 +78,15 @@ func TestNewFuncListener(t *testing.T) {
//tests FuncListener Hear works
func TestFuncListener_Hear(t *testing.T) {
m := message.Receive{
m := Message{
Payload: []byte{0, 1, 2, 3},
Sender: id.NewIdFromUInt(42, id.User, t),
MessageType: 69,
}
heard := make(chan message.Receive, 1)
heard := make(chan Message, 1)
f := func(item message.Receive) {
f := func(item Message) {
heard <- item
}
......@@ -117,7 +116,7 @@ func TestFuncListener_Name(t *testing.T) {
//tests new chan listener creates the chanListener properly
func TestNewChanListener(t *testing.T) {
c := make(chan message.Receive)
c := make(chan Message)
name := "test"
listener := newChanListener(c, name)
......@@ -132,13 +131,13 @@ func TestNewChanListener(t *testing.T) {
//tests ChanListener Hear works
func TestChanListener_Hear(t *testing.T) {
m := message.Receive{
m := Message{
Payload: []byte{0, 1, 2, 3},
Sender: id.NewIdFromUInt(42, id.User, t),
MessageType: 69,
}
heard := make(chan message.Receive, 1)
heard := make(chan Message, 1)
listener := newChanListener(heard, "test")
......
package switchboard
package receive
import (
"gitlab.com/elixxir/client/catalog"
"gitlab.com/elixxir/client/network/historical"
"gitlab.com/elixxir/crypto/e2e"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"time"
)
type Receive struct {
type Message struct {
MessageType catalog.MessageType
ID e2e.MessageID
Payload []byte
......@@ -16,8 +17,9 @@ type Receive struct {
Sender *id.ID
RecipientID *id.ID
EphemeralID ephemeral.Id
RoundId id.Round
RoundTimestamp time.Time
Timestamp time.Time // Message timestamp of when the user sent
Encrypted bool
Round historical.Round
}
......@@ -5,12 +5,12 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"github.com/golang-collections/collections/set"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/catalog"
"gitlab.com/xx_network/primitives/id"
"sync"
)
......@@ -38,13 +38,13 @@ func New() *Switchboard {
// user: 0 for all, or any user ID to listen for messages from a particular
// user. 0 can be id.ZeroUser or id.ZeroID
// messageType: 0 for all, or any message type to listen for messages of that
// type. 0 can be switchboard.AnyType
// type. 0 can be Message.AnyType
// newListener: something implementing the Listener interface. Do not
// pass nil to this.
//
// If a message matches multiple listeners, all of them will hear the message.
func (sw *Switchboard) RegisterListener(user *id.ID, messageType message.Type,
newListener Listener) ListenerID {
func (sw *Switchboard) RegisterListener(user *id.ID,
messageType catalog.MessageType, newListener Listener) ListenerID {
// check the input data is valid
if user == nil {
......@@ -80,13 +80,13 @@ func (sw *Switchboard) RegisterListener(user *id.ID, messageType message.Type,
// user: 0 for all, or any user ID to listen for messages from a particular
// user. 0 can be id.ZeroUser or id.ZeroID
// messageType: 0 for all, or any message type to listen for messages of that
// type. 0 can be switchboard.AnyType
// type. 0 can be Message.AnyType
// newListener: a function implementing the ListenerFunc function type.
// Do not pass nil to this.
//
// If a message matches multiple listeners, all of them will hear the message.
func (sw *Switchboard) RegisterFunc(name string, user *id.ID,
messageType message.Type, newListener ListenerFunc) ListenerID {
messageType catalog.MessageType, newListener ListenerFunc) ListenerID {
// check that the input data is valid
if newListener == nil {
jww.FATAL.Panicf("cannot register function listener '%s' "+
......@@ -109,13 +109,13 @@ func (sw *Switchboard) RegisterFunc(name string, user *id.ID,
// user: 0 for all, or any user ID to listen for messages from a particular
// user. 0 can be id.ZeroUser or id.ZeroID
// messageType: 0 for all, or any message type to listen for messages of that
// type. 0 can be switchboard.AnyType
// type. 0 can be Message.AnyType
// newListener: an item channel.
// Do not pass nil to this.
//
// If a message matches multiple listeners, all of them will hear the message.
func (sw *Switchboard) RegisterChannel(name string, user *id.ID,
messageType message.Type, newListener chan message.Receive) ListenerID {
messageType catalog.MessageType, newListener chan Message) ListenerID {
// check that the input data is valid
if newListener == nil {
jww.FATAL.Panicf("cannot register channel listener '%s' with"+
......@@ -131,7 +131,7 @@ func (sw *Switchboard) RegisterChannel(name string, user *id.ID,
// Speak broadcasts a message to the appropriate listeners.
// each is spoken to in their own goroutine
func (sw *Switchboard) Speak(item message.Receive) {
func (sw *Switchboard) Speak(item Message) {
sw.mux.RLock()
defer sw.mux.RUnlock()
......@@ -166,7 +166,7 @@ func (sw *Switchboard) Unregister(listenerID ListenerID) {
// finds all listeners who match the items sender or ID, or have those fields
// as generic
func (sw *Switchboard) matchListeners(item message.Receive) *set.Set {
func (sw *Switchboard) matchListeners(item Message) *set.Set {
idSet := sw.id.Get(item.Sender)
typeSet := sw.messageType.Get(item.MessageType)
return idSet.Intersection(typeSet)
......
......@@ -5,7 +5,7 @@
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package switchboard
package receive
import (
"gitlab.com/elixxir/client/interfaces/message"
......
......@@ -24,7 +24,7 @@ type Switchboard interface {
// user: 0 for all, or any user ID to listen for messages from a particular
// user. 0 can be id.ZeroUser or id.ZeroID
// messageType: 0 for all, or any message type to listen for messages of
// that type. 0 can be switchboard.AnyType
// that type. 0 can be Receive.AnyType
// newListener: something implementing the Listener interface. Do not
// pass nil to this.
//
......@@ -42,7 +42,7 @@ type Switchboard interface {
// user: 0 for all, or any user ID to listen for messages from a particular
// user. 0 can be id.ZeroUser or id.ZeroID
// messageType: 0 for all, or any message type to listen for messages of
// that type. 0 can be switchboard.AnyType
// that type. 0 can be Receive.AnyType
// newListener: a function implementing the ListenerFunc function type.
// Do not pass nil to this.
//
......@@ -60,7 +60,7 @@ type Switchboard interface {
// user: 0 for all, or any user ID to listen for messages from a particular
// user. 0 can be id.ZeroUser or id.ZeroID
// messageType: 0 for all, or any message type to listen for messages of
// that type. 0 can be switchboard.AnyType
// that type. 0 can be Receive.AnyType
// newListener: an item channel.
// Do not pass nil to this.
//
......
package historical
import (
jww "github.com/spf13/jwalterweatherman"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/comms/connect"
......@@ -99,3 +100,27 @@ func MakeRound(ri *pb.RoundInfo) Round {
Raw: ri,
}
}
// GetEndTimestamp Returns the timestamp of the last known event,
// which is generally the state unless in queued, which stores the next event
func (r Round) GetEndTimestamp() time.Time {
switch r.State {
case states.PENDING:
return r.Timestamps[states.PENDING]
case states.PRECOMPUTING:
return r.Timestamps[states.PRECOMPUTING]
case states.STANDBY:
return r.Timestamps[states.STANDBY]
case states.QUEUED:
return r.Timestamps[states.STANDBY]
case states.COMPLETED:
return r.Timestamps[states.COMPLETED]
case states.FAILED:
return r.Timestamps[states.FAILED]
default:
jww.FATAL.Panicf("Could not get final timestamp of round, "+
"invalid state: %s", r.State)
}
//unreachable
return time.Time{}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment