diff --git a/api/client.go b/api/client.go index 79422ce399150762775a90041929252ae387f0b1..9b640a694737924f0897b31231b2c33fe7df4ff3 100644 --- a/api/client.go +++ b/api/client.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/auth" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/preimage" @@ -69,7 +70,7 @@ type Client struct { clientErrorChannel chan interfaces.ClientError // Event reporting in event.go - events *eventManager + events *event.eventManager // Handles the triggering and delivery of backups backup *interfaces.BackupContainer @@ -242,7 +243,7 @@ func OpenClient(storageDir string, password []byte, parameters params.Network) ( followerServices: newServices(), parameters: parameters, clientErrorChannel: make(chan interfaces.ClientError, 1000), - events: newEventManager(), + events: event.newEventManager(), backup: &interfaces.BackupContainer{}, } diff --git a/api/event.go b/api/event.go index c5fcf03a90842095990568893051afea5a7e6cee..52fa3c1708dce3a5cd074d1a4030859219be7760 100644 --- a/api/event.go +++ b/api/event.go @@ -1,114 +1,9 @@ -/////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -/////////////////////////////////////////////////////////////////////////////// - package api import ( - "fmt" - "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/stoppable" - "sync" + "gitlab.com/elixxir/client/event" ) -// ReportableEvent is used to surface events to client users. -type reportableEvent struct { - Priority int - Category string - EventType string - Details string -} - -// String stringer interace implementation -func (e reportableEvent) String() string { - return fmt.Sprintf("Event(%d, %s, %s, %s)", e.Priority, e.Category, - e.EventType, e.Details) -} - -// Holds state for the event reporting system -type eventManager struct { - eventCh chan reportableEvent - eventCbs sync.Map -} - -func newEventManager() *eventManager { - return &eventManager{ - eventCh: make(chan reportableEvent, 1000), - } -} - -// Report reports an event from the client to api users, providing a -// priority, category, eventType, and details -func (e *eventManager) Report(priority int, category, evtType, details string) { - re := reportableEvent{ - Priority: priority, - Category: category, - EventType: evtType, - Details: details, - } - select { - case e.eventCh <- re: - jww.TRACE.Printf("Event reported: %s", re) - default: - jww.ERROR.Printf("Event Queue full, unable to report: %s", re) - } -} - -// RegisterEventCallback records the given function to receive -// ReportableEvent objects. It returns the internal index -// of the callback so that it can be deleted later. -func (e *eventManager) RegisterEventCallback(name string, - myFunc interfaces.EventCallbackFunction) error { - _, existsAlready := e.eventCbs.LoadOrStore(name, myFunc) - if existsAlready { - return errors.Errorf("Key %s already exists as event callback", - name) - } - return nil -} - -// UnregisterEventCallback deletes the callback identified by the -// index. It returns an error if it fails. -func (e *eventManager) UnregisterEventCallback(name string) { - e.eventCbs.Delete(name) -} - -func (e *eventManager) eventService() (stoppable.Stoppable, error) { - stop := stoppable.NewSingle("EventReporting") - go e.reportEventsHandler(stop) - return stop, nil -} - -// reportEventsHandler reports events to every registered event callback -func (e *eventManager) reportEventsHandler(stop *stoppable.Single) { - jww.DEBUG.Print("reportEventsHandler routine started") - for { - select { - case <-stop.Quit(): - jww.DEBUG.Printf("Stopping reportEventsHandler") - stop.ToStopped() - return - case evt := <-e.eventCh: - jww.TRACE.Printf("Received event: %s", evt) - // NOTE: We could call each in a routine but decided - // against it. It's the users responsibility not to let - // the event queue explode. The API will report errors - // in the logging any time the event queue gets full. - e.eventCbs.Range(func(name, myFunc interface{}) bool { - f := myFunc.(interfaces.EventCallbackFunction) - f(evt.Priority, evt.Category, evt.EventType, - evt.Details) - return true - }) - } - } -} - // ReportEvent reports an event from the client to api users, providing a // priority, category, eventType, and details func (c *Client) ReportEvent(priority int, category, evtType, details string) { @@ -119,7 +14,7 @@ func (c *Client) ReportEvent(priority int, category, evtType, details string) { // ReportableEvent objects. It returns the internal index // of the callback so that it can be deleted later. func (c *Client) RegisterEventCallback(name string, - myFunc interfaces.EventCallbackFunction) error { + myFunc event.Callback) error { return c.events.RegisterEventCallback(name, myFunc) } diff --git a/api/utilsInterfaces_test.go b/api/utilsInterfaces_test.go index 3bb409ceea106d0881ff3186bad4e40ab8ce7b71..01d2f44b4ab200608f6f22ebb65f03df01e0afbf 100644 --- a/api/utilsInterfaces_test.go +++ b/api/utilsInterfaces_test.go @@ -7,6 +7,7 @@ package api import ( + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" @@ -86,7 +87,7 @@ type testNetworkManagerGeneric struct { type dummyEventMgr struct{} func (d *dummyEventMgr) Report(p int, a, b, c string) {} -func (t *testNetworkManagerGeneric) GetEventManager() interfaces.EventManager { +func (t *testNetworkManagerGeneric) GetEventManager() event.Manager { return &dummyEventMgr{} } diff --git a/dummy/utils_test.go b/dummy/utils_test.go index 48bff1261649af5948e6945d2f3597183a6834c6..8a0b0ab9786b7f527fb1fd767fe3d2d6c2d6640d 100644 --- a/dummy/utils_test.go +++ b/dummy/utils_test.go @@ -9,6 +9,7 @@ package dummy import ( "github.com/pkg/errors" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" @@ -142,7 +143,7 @@ func (tnm *testNetworkManager) SendManyCMIX([]message.TargetedCmixMessage, param type dummyEventMgr struct{} func (d *dummyEventMgr) Report(int, string, string, string) {} -func (tnm *testNetworkManager) GetEventManager() interfaces.EventManager { +func (tnm *testNetworkManager) GetEventManager() event.Manager { return &dummyEventMgr{} } diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000000000000000000000000000000000000..4a15a56fb8aea3968f2d39ff1592d749f8427ee5 --- /dev/null +++ b/event/event.go @@ -0,0 +1,109 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package event + +import ( + "fmt" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/stoppable" + "sync" +) + +// ReportableEvent is used to surface events to client users. +type reportableEvent struct { + Priority int + Category string + EventType string + Details string +} + +// String stringer interace implementation +func (e reportableEvent) String() string { + return fmt.Sprintf("Event(%d, %s, %s, %s)", e.Priority, e.Category, + e.EventType, e.Details) +} + +// Holds state for the event reporting system +type eventManager struct { + eventCh chan reportableEvent + eventCbs sync.Map +} + +func newEventManager() *eventManager { + return &eventManager{ + eventCh: make(chan reportableEvent, 1000), + } +} + +// Report reports an event from the client to api users, providing a +// priority, category, eventType, and details +func (e *eventManager) Report(priority int, category, evtType, details string) { + re := reportableEvent{ + Priority: priority, + Category: category, + EventType: evtType, + Details: details, + } + select { + case e.eventCh <- re: + jww.TRACE.Printf("Event reported: %s", re) + default: + jww.ERROR.Printf("Event Queue full, unable to report: %s", re) + } +} + +// RegisterEventCallback records the given function to receive +// ReportableEvent objects. It returns the internal index +// of the callback so that it can be deleted later. +func (e *eventManager) RegisterEventCallback(name string, + myFunc Callback) error { + _, existsAlready := e.eventCbs.LoadOrStore(name, myFunc) + if existsAlready { + return errors.Errorf("Key %s already exists as event callback", + name) + } + return nil +} + +// UnregisterEventCallback deletes the callback identified by the +// index. It returns an error if it fails. +func (e *eventManager) UnregisterEventCallback(name string) { + e.eventCbs.Delete(name) +} + +func (e *eventManager) eventService() (stoppable.Stoppable, error) { + stop := stoppable.NewSingle("EventReporting") + go e.reportEventsHandler(stop) + return stop, nil +} + +// reportEventsHandler reports events to every registered event callback +func (e *eventManager) reportEventsHandler(stop *stoppable.Single) { + jww.DEBUG.Print("reportEventsHandler routine started") + for { + select { + case <-stop.Quit(): + jww.DEBUG.Printf("Stopping reportEventsHandler") + stop.ToStopped() + return + case evt := <-e.eventCh: + jww.TRACE.Printf("Received event: %s", evt) + // NOTE: We could call each in a routine but decided + // against it. It's the users responsibility not to let + // the event queue explode. The API will report errors + // in the logging any time the event queue gets full. + e.eventCbs.Range(func(name, myFunc interface{}) bool { + f := myFunc.(Callback) + f(evt.Priority, evt.Category, evt.EventType, + evt.Details) + return true + }) + } + } +} diff --git a/api/event_test.go b/event/event_test.go similarity index 99% rename from api/event_test.go rename to event/event_test.go index 775d2a6e6609009be6082cb29bdd73dba0a880ee..8be16a67ee3526b619907214953b277c8cc6226f 100644 --- a/api/event_test.go +++ b/event/event_test.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package api +package event import ( "testing" diff --git a/interfaces/event.go b/event/interface.go similarity index 67% rename from interfaces/event.go rename to event/interface.go index f08ff547e637bf4b0fccde6d166d7e8cf21298d8..47e8d9795d7cc63d30b04715b92f0ec842479cf5 100644 --- a/interfaces/event.go +++ b/event/interface.go @@ -5,12 +5,12 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package interfaces +package event -// EventCallbackFunction defines the callback functions for client event reports -type EventCallbackFunction func(priority int, category, evtType, details string) +// Callback defines the callback functions for client event reports +type Callback func(priority int, category, evtType, details string) -// EventManager reporting api (used internally) -type EventManager interface { +// Manager reporting api (used internally) +type Manager interface { Report(priority int, category, evtType, details string) } diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index a982d3ba226565db9e4886132f164998560826a6..6065e0441d183908f661948dd86bcfd20d7734c4 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -13,6 +13,7 @@ import ( "github.com/cloudflare/circl/dh/sidh" "github.com/pkg/errors" "gitlab.com/elixxir/client/api" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" @@ -510,7 +511,7 @@ func (tnm *testNetworkManager) SendManyCMIX(messages []message.TargetedCmixMessa type dummyEventMgr struct{} func (d *dummyEventMgr) Report(int, string, string, string) {} -func (tnm *testNetworkManager) GetEventManager() interfaces.EventManager { +func (tnm *testNetworkManager) GetEventManager() event.Manager { return &dummyEventMgr{} } diff --git a/groupChat/utils_test.go b/groupChat/utils_test.go index 612ad97f0c6831cc01632b71217346f4ca0532eb..b19c697d80fb77ff06917593471e87a85d338521 100644 --- a/groupChat/utils_test.go +++ b/groupChat/utils_test.go @@ -10,6 +10,7 @@ package groupChat import ( "encoding/base64" "github.com/pkg/errors" + "gitlab.com/elixxir/client/event" gs "gitlab.com/elixxir/client/groupChat/groupStore" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" @@ -292,7 +293,7 @@ func (tnm *testNetworkManager) SendManyCMIX( type dummyEventMgr struct{} func (d *dummyEventMgr) Report(int, string, string, string) {} -func (tnm *testNetworkManager) GetEventManager() interfaces.EventManager { +func (tnm *testNetworkManager) GetEventManager() event.Manager { return &dummyEventMgr{} } diff --git a/interfaces/bloom.go b/interfaces/bloom.go deleted file mode 100644 index 3b5a002e0b5d9b48a7512d3f861009c7ad7dc109..0000000000000000000000000000000000000000 --- a/interfaces/bloom.go +++ /dev/null @@ -1,4 +0,0 @@ -package interfaces - -const BloomFilterSize = 648 // In Bits -const BloomFilterHashes = 10 diff --git a/interfaces/message/encryptionType.go b/interfaces/message/encryptionType.go deleted file mode 100644 index cdd86c06af973526a5589d8073664af6c75762ff..0000000000000000000000000000000000000000 --- a/interfaces/message/encryptionType.go +++ /dev/null @@ -1,26 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -/////////////////////////////////////////////////////////////////////////////// - -package message - -type EncryptionType uint8 - -const ( - None EncryptionType = 0 - E2E EncryptionType = 1 -) - -func (et EncryptionType) String() string { - switch et { - case None: - return "None" - case E2E: - return "E2E" - default: - return "Unknown" - } -} diff --git a/interfaces/message/sendMany.go b/interfaces/message/sendMany.go deleted file mode 100644 index a14dcef0a10203424e4fb21fdca342767988aef1..0000000000000000000000000000000000000000 --- a/interfaces/message/sendMany.go +++ /dev/null @@ -1,20 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -//////////////////////////////////////////////////////////////////////////////// - -package message - -import ( - "gitlab.com/elixxir/primitives/format" - "gitlab.com/xx_network/primitives/id" -) - -// TargetedCmixMessage defines a recipient target pair in a sendMany cMix -// message. -type TargetedCmixMessage struct { - Recipient *id.ID - Message format.Message -} diff --git a/interfaces/networkManager.go b/interfaces/networkManager.go index 1fa1d5d3d06772c8261146b8a5a76d8ef644fded..e39af97f8e2380b6aced6dcbd01ad7ec0d612b9e 100644 --- a/interfaces/networkManager.go +++ b/interfaces/networkManager.go @@ -5,20 +5,18 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -/*===Sending==============================================================*/ - package interfaces import ( - "gitlab.com/elixxir/client/network/gateway" + "gitlab.com/elixxir/comms/network" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/ndf" "time" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/comms/mixmessages" - "gitlab.com/elixxir/comms/network" - "gitlab.com/elixxir/crypto/e2e" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" @@ -31,69 +29,70 @@ type NetworkManager interface { // Only one follower may run at a time. Follow(report ClientErrorReport) (stoppable.Stoppable, error) + /*===Sending==============================================================*/ + // SendCMIX sends a "raw" CMIX message payload to the provided recipient. // Returns the round ID of the round the payload was sent or an error // if it fails. - SendCMIX(message format.Message, recipient *id.ID, p params.CMIX) (id.Round, ephemeral.Id, error) - SendManyCMIX(messages []message.TargetedCmixMessage, p params.CMIX) (id.Round, []ephemeral.Id, error) + SendCMIX(message format.Message, recipient *id.ID, p params.CMIX) ( + id.Round, ephemeral.Id, error) - /*===Accessors============================================================*/ - /* Accessors */ - // GetInstance returns the network instance object, which tracks the - // state of the network - GetInstance() *network.Instance - - // GetInstance returns the health tracker, which using a polling or - // event api lets you determine if network following is functioning - GetHealthTracker() HealthTracker + // SendManyCMIX sends many "raw" cMix message payloads to each of the provided + // recipients. Used to send messages in group chats. Metadata is NOT as well + // protected with this call and can leak data about yourself. Should be + // replaced with multiple uses of SendCmix in most cases. Returns the round + // ID of the round the payload was sent or an error if it fails. + // WARNING: Potentially Unsafe + SendManyCMIX(messages []message.TargetedCmixMessage, p params.CMIX) ( + id.Round, []ephemeral.Id, error) - // GetVerboseRounds returns stringification of verbose round info - GetVerboseRounds() string - - // SetPoolFilter sets the filter used to filter gateway IDs. - // allows you to disable proxying through certain gateways - SetPoolFilter(f gateway.Filter) - - /* Message Receiving */ + /*===Message Reception====================================================*/ /* Identities are all network identites which the client is currently - trying to pick up message on. Each identity has a default trigger - pickup that it will receive on, but this default is generally - low privacy and an alternative should be used in most cases. An identity must be added + trying to pick up message on. An identity must be added to receive messages, fake ones will be used to poll the network - if none are present. */ + if none are present. On creation of the network handler, the identity in + session storage will be automatically added*/ // AddIdentity adds an identity to be tracked - // and Identity is Defined by a source ID and a current EphemeralID - // In its IdentityParams, paremeters describing the properties - // of the identity as well as how long it will last are described + // If persistent is false, the identity will not be stored to disk and + // will be dropped on reload. AddIdentity(id *id.ID, validUntil time.Time, persistent bool) error // RemoveIdentity removes a currently tracked identity. RemoveIdentity(id *id.ID) - /* Fingerprints are the primary mechanisim of identifying a picked up message over - cMix. They are a unique one time use 255 bit vector generally - assoceated with a specific encryption key, but can be used for an alternative proptocol. - When registering a fingeprprint, a MessageProcessorFP is registered to handle the message. - The */ + /* Fingerprints are the primary mechanism of identifying a picked up + message over cMix. They are a unique one time use 255 bit vector generally + associated with a specific encryption key, but can be used for an + alternative protocol.When registering a fingerprint, a MessageProcessor + is registered to handle the message.*/ + + // AddFingerprint - Adds a fingerprint which will be handled by a + // specific processor for messages received by the given identity + AddFingerprint(identity *id.ID, fingerprint format.Fingerprint, + mp MessageProcessor) error - //AddFingerprint - Adds a fingerprint which will be handled by a specific processor - AddFingerprint(fingerprint format.Fingerprint, processor MessageProcessor) - RemoveFingerprint(fingerprint format.Fingerprint) - RemoveFingerprints(fingerprints []format.Fingerprint) - CheckFingerprint(fingerprint format.Fingerprint) bool + // DeleteFingerprint deletes a single fingerprint associated with the given + // identity if it exists - /* trigger - predefined hash based tags appended to all cmix messages + DeleteFingerprint(identity *id.ID, fingerprint format.Fingerprint) + // DeleteClientFingerprints deletes al fingerprint associated with the given + // identity if it exists + DeleteClientFingerprints(identity *id.ID) + + /* trigger - predefined hash based tags appended to all cMix messages which, though trial hashing, are used to determine if a message applies to this client - Triggers are used for 2 purposes - can be processed by the notifications system, - or can be used to implement custom non fingerprint processing of payloads. - I.E. key negotiation, broadcast negotiation + Triggers are used for 2 purposes - They can be processed by the + notifications system, or can be used to implement custom non fingerprint + processing of payloads. I.E. key negotiation, broadcast negotiation - A tag is appended to the message of the format tag = H(H(messageContents),preimage) - and trial hashing is used to determine if a message adheres to a tag. - WARNING: If a preiamge is known by an adversary, they can determine which messages - are for the client. + A tag is appended to the message of the format tag = H(H(messageContents), + preimage) and trial hashing is used to determine if a message adheres to a + tag. + WARNING: If a preimage is known by an adversary, they can determine which + messages are for the client on reception (which is normally hidden due to + collision between ephemeral IDs. Due to the extra overhead of trial hashing, triggers are processed after fingerprints. If a fingerprint match occurs on the message, triggers will not be handled. @@ -102,27 +101,95 @@ type NetworkManager interface { re-added before StartNetworkFollower is called. */ - // AddTrigger - Adds a trigger which can call a message - // handing function or be used for notifications. - // Multiple triggers can be registered for the same preimage. + // AddTrigger - Adds a trigger which can call a message handing function or + // be used for notifications. Multiple triggers can be registered for the + // same preimage. // preimage - the preimage which is triggered on // type - a descriptive string of the trigger. Generally used in notifications // source - a byte buffer of related data. Generally used in notifications. // Example: Sender ID - AddTrigger(trigger Trigger, response MessageProcessor) error + AddTrigger(identity *id.ID, newTrigger Trigger, response MessageProcessor) + + // DeleteTrigger - If only a single response is associated with the + // preimage, the entire preimage is removed. If there is more than one + // response, only the given response is removed if nil is passed in for + // response, all triggers for the preimage will be removed + DeleteTrigger(identity *id.ID, preimage Preimage, response MessageProcessor) error - // RemoveTrigger - If only a single response is associated with the preimage, the entire - // preimage is removed. If there is more than one response, only the given response is removed - // if nil is passed in for response, all triggers for the preimage will be removed - RemoveTrigger(preimage Preimage, response MessageProcessor) error + // DeleteClientTriggers - deletes all triggers assoseated with the given identity + DeleteClientTriggers(identity *id.ID) // TrackTriggers - Registers a callback which will get called every time triggers change. // It will receive the triggers list every time it is modified. // Will only get callbacks while the Network Follower is running. // Multiple trackTriggers can be registered - TrackTriggers(func(triggers []Trigger)) + TrackTriggers(TriggerTracker) + + /* In inProcess */ + // it is possible to receive a message over cMix before the fingerprints or + // triggers are registered. As a result, when handling fails, messages are + // put in the inProcess que for a set number of retries. + + // CheckInProgressMessages - retry processing all messages in check in + // progress messages. Call this after adding fingerprints or triggers + //while the follower is running. + CheckInProgressMessages() + + /*===Nodes================================================================*/ + /* Keys must be registed with nodes in order to send messages throug them. + this process is in general automatically handled by the Network Manager*/ + + // HasNode can be used to determine if a keying relationship exists with a + // node. + HasNode(nid *id.ID) bool + + // NumRegisteredNodes Returns the total number of nodes we have a keying + // relationship with + NumRegisteredNodes() int + + // Triggers the generation of a keying relationship with a given node + TriggerNodeRegistration(nid *id.ID) + + /*===Historical Rounds====================================================*/ + /* A complete set of round info is not kept on the client, and sometimes + the network will need to be queried to get round info. Historical rounds + is the system internal to the Network Manager to do this. + It can be used externally as well.*/ + + // LookupHistoricalRound - looks up the passed historical round on the + // network + LookupHistoricalRound(rid id.Round, callback func(info *mixmessages.RoundInfo, + success bool)) error + + /*===Sender===============================================================*/ + /* The sender handles sending comms to the network. It tracks connections to + gateways and handles proxying to gateways for targeted comms. It can be + used externally to contact gateway directly, bypassing the majority of + the network package*/ + + // SendToAny can be used to send the comm to any gateway in the network. + SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) + + // SendToPreferred sends to a specific gateway, doing so through another + // gateway as a proxy if not directly connected. + SendToPreferred(targets []*id.ID, sendFunc func(host *connect.Host, + target *id.ID, timeout time.Duration) (interface{}, error), + stop *stoppable.Single, timeout time.Duration) (interface{}, error) + + // SetGatewayFilter sets a function which will be used to filter gateways + // before connecting. + SetGatewayFilter(f func(map[id.ID]int, + *ndf.NetworkDefinition) map[id.ID]int) + + // GetHostParams - returns the host params used when connectign to gateways + GetHostParams() connect.HostParams + + /*===Address Space========================================================*/ + // The network compasses identities into a smaller address space to cause + // collisions and hide the actual recipient of messages. These functions + // allow for the tracking of this addresses space. In general, address space + // issues are completely handled by the network package - /*Address Space*/ // GetAddressSpace GetAddressSize returns the current address size of IDs. Blocks until an // address size is known. GetAddressSpace() uint8 @@ -131,25 +198,35 @@ type NetworkManager interface { // every address space size update. The provided tag is the unique ID for // the channel. Returns an error if the tag is already used. RegisterAddressSpaceNotification(tag string) (chan uint8, error) + // UnregisterAddressSpaceNotification stops broadcasting address space size // updates on the channel with the specified tag. UnregisterAddressSpaceNotification(tag string) -} -type Preimage [32]byte + /*===Accessors============================================================*/ + + // GetInstance returns the network instance object, which tracks the + // state of the network + GetInstance() *network.Instance + + // GetHealthTracker returns the health tracker, which using a polling or + // event api lets you determine if network following is functioning + GetHealthTracker() HealthTracker -type EphemeralIdentity struct { - // Identity - EphId ephemeral.Id - Source *id.ID + // GetVerboseRounds returns stringification of verbose round info + GetVerboseRounds() string } +type Preimage [32]byte + type Trigger struct { Preimage Type string Source []byte } +type TriggerTracker func(triggers []Trigger) + type MessageProcessor interface { // Process decrypts and hands off the message to its internal down // stream message processing system. @@ -162,6 +239,8 @@ type MessageProcessor interface { round *mixmessages.RoundInfo) } +type ClientErrorReport func(source, message, trace string) + //type Ratchet interface { // SendE2E(m message.Send, p params.E2E, stop *stoppable.Single) ([]id.Round, e2e.MessageID, time.Time, error) // SendUnsafe(m message.Send, p params.Unsafe) ([]id.Round, error) @@ -175,4 +254,4 @@ type MessageProcessor interface { //} //for use in key exchange which needs to be callable inside of network -type SendE2E func(m message.Send, p params.E2E, stop *stoppable.Single) ([]id.Round, e2e.MessageID, time.Time, error) +///type SendE2E func(m message.Send, p params.E2E, stop *stoppable.Single) ([]id.Round, e2e.MessageID, time.Time, error) diff --git a/interfaces/params/CMIX.go b/interfaces/params/CMIX.go index 5d4fadbdf190019a0cf33af67d28426422728560..8706165ade26ff43279a325a33fa00b9d306f26a 100644 --- a/interfaces/params/CMIX.go +++ b/interfaces/params/CMIX.go @@ -14,56 +14,3 @@ import ( "gitlab.com/xx_network/primitives/id" "time" ) - -type CMIX struct { - // maximum number of rounds to try and send on - RoundTries uint - Timeout time.Duration - RetryDelay time.Duration - ExcludedRounds excludedRounds.ExcludedRounds - - // Duration to wait before sending on a round times out and a new round is - // tried - SendTimeout time.Duration - - // an alternate identity preimage to use on send. If not set, the default - // for the sending identity will be used - IdentityPreimage []byte - - // Tag which prints with sending logs to help localize the source - // All internal sends are tagged, so the default tag is "External" - DebugTag string - - //Threading interface, can be used to stop the send early - Stop *stoppable.Single - - //List of nodes to not send to, will skip a round with these - //nodes in it - BlacklistedNodes map[id.ID]interface{} -} - -func GetDefaultCMIX() CMIX { - return CMIX{ - RoundTries: 10, - Timeout: 25 * time.Second, - RetryDelay: 1 * time.Second, - SendTimeout: 3 * time.Second, - DebugTag: "External", - } -} - -func (c CMIX) Marshal() ([]byte, error) { - return json.Marshal(c) -} - -// GetCMIXParameters func obtains default CMIX parameters, or overrides with given parameters if set -func GetCMIXParameters(params string) (CMIX, error) { - p := GetDefaultCMIX() - if len(params) > 0 { - err := json.Unmarshal([]byte(params), &p) - if err != nil { - return CMIX{}, err - } - } - return p, nil -} diff --git a/interfaces/params/historical.go b/interfaces/params/historical.go index c7bf66779baccfabf0a7855c005bc8a03d34d2c9..816ee4f309c9d49e14bb325660b3d27aa344bd2f 100644 --- a/interfaces/params/historical.go +++ b/interfaces/params/historical.go @@ -6,32 +6,3 @@ /////////////////////////////////////////////////////////////////////////////// package params - -import ( - "time" -) - -type Historical struct { - // Number of historical rounds required to automatically send a historical - // rounds query - MaxHistoricalRounds uint - // Maximum period of time a pending historical round query will wait before - // it is transmitted - HistoricalRoundsPeriod time.Duration - - // Length of historical rounds channel buffer - HistoricalRoundsBufferLen uint - - // Maximum number of times a historical round lookup will be attempted - MaxHistoricalRoundsRetries uint -} - -func GetDefaultHistorical() Historical { - return Historical{ - MaxHistoricalRounds: 100, - HistoricalRoundsPeriod: 100 * time.Millisecond, - - HistoricalRoundsBufferLen: 1000, - MaxHistoricalRoundsRetries: 3, - } -} diff --git a/interfaces/params/network.go b/interfaces/params/network.go index f29586a8f867d0e5a2a753911b465aa1cef1de1f..1da0ce56350bf64c6a39999c78d625a00a1f236b 100644 --- a/interfaces/params/network.go +++ b/interfaces/params/network.go @@ -29,8 +29,6 @@ type Network struct { // If true, client receives a filtered set of updates // If false, client receives the full list of network updates FastPolling bool - // Messages will not be sent to Rounds containing these Nodes - BlacklistedNodes []string // Determines if the state of every round processed is tracked in ram. // This is very memory intensive and is primarily used for debugging VerboseRoundTracking bool diff --git a/interfaces/preimage/generate.go b/interfaces/preimage/generate.go index 84798d696a8b2311e6e89b01f64c885eb6c2d5b8..cf176599482c0f448638fdbad9c7bbdfc4fe2421 100644 --- a/interfaces/preimage/generate.go +++ b/interfaces/preimage/generate.go @@ -6,7 +6,6 @@ import ( ) func Generate(data []byte, t string) []byte { - if t == Default { return data } diff --git a/keyExchange/rekey.go b/keyExchange/rekey.go index ea14e92fa37bda8a2826eca10e43112fc1661530..5f3380c3f07aa79227bd1c5db43a4ea8cc04f51f 100644 --- a/keyExchange/rekey.go +++ b/keyExchange/rekey.go @@ -12,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" @@ -28,7 +29,7 @@ import ( ) func CheckKeyExchanges(instance *network.Instance, sendE2E interfaces.SendE2E, - events interfaces.EventManager, sess *storage.Session, + events event.Manager, sess *storage.Session, manager *e2e.Manager, sendTimeout time.Duration, stop *stoppable.Single) { sessions := manager.TriggerNegotiations() @@ -43,7 +44,7 @@ func CheckKeyExchanges(instance *network.Instance, sendE2E interfaces.SendE2E, // session. They run the same negotiation, the former does it on a newly created // session while the latter on an extant session func trigger(instance *network.Instance, sendE2E interfaces.SendE2E, - events interfaces.EventManager, sess *storage.Session, + events event.Manager, sess *storage.Session, manager *e2e.Manager, session *e2e.Session, sendTimeout time.Duration, stop *stoppable.Single) { var negotiatingSession *e2e.Session diff --git a/keyExchange/utils_test.go b/keyExchange/utils_test.go index a6b5afeadb04344c5550281e97ff4ca63c26e3c1..ef27ae79eb04cd7917cb62f557c093351a674a01 100644 --- a/keyExchange/utils_test.go +++ b/keyExchange/utils_test.go @@ -11,6 +11,7 @@ import ( "github.com/cloudflare/circl/dh/sidh" "github.com/golang/protobuf/proto" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" @@ -100,7 +101,7 @@ func (t *testNetworkManagerGeneric) GetInstance() *network.Instance { } -func (t *testNetworkManagerGeneric) GetEventManager() interfaces.EventManager { +func (t *testNetworkManagerGeneric) GetEventManager() event.Manager { return &dummyEventMgr{} } @@ -167,7 +168,7 @@ type testNetworkManagerFullExchange struct { type dummyEventMgr struct{} func (d *dummyEventMgr) Report(p int, a, b, c string) {} -func (t *testNetworkManagerFullExchange) GetEventManager() interfaces.EventManager { +func (t *testNetworkManagerFullExchange) GetEventManager() event.Manager { return &dummyEventMgr{} } diff --git a/network/address/testutil.go b/network/address/testutil.go index 0c52b6329ffd0b425edf45f8aea6e636cfc024b5..cb43c66f1413bd361fd0ebc79a4bdb5a7f49faf6 100644 --- a/network/address/testutil.go +++ b/network/address/testutil.go @@ -8,6 +8,7 @@ package address import ( + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/network/gateway" "testing" "time" @@ -74,7 +75,7 @@ func (t *testNetworkManager) GetInstance() *network.Instance { type dummyEventMgr struct{} func (d *dummyEventMgr) Report(p int, a, b, c string) {} -func (t *testNetworkManager) GetEventManager() interfaces.EventManager { +func (t *testNetworkManager) GetEventManager() event.Manager { return &dummyEventMgr{} } diff --git a/network/follow.go b/network/follow.go index a494c4885931c8045ed7dc76cd2b99e70d466ee3..d2f6802fa9bdcf10b168d988c4b37594ab693e0a 100644 --- a/network/follow.go +++ b/network/follow.go @@ -28,7 +28,6 @@ import ( "encoding/binary" "fmt" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/identity/receptionID/store" "gitlab.com/elixxir/client/stoppable" pb "gitlab.com/elixxir/comms/mixmessages" @@ -59,7 +58,7 @@ type followNetworkComms interface { // followNetwork polls the network to get updated on the state of nodes, the // round status, and informs the client when messages can be retrieved. -func (m *manager) followNetwork(report interfaces.ClientErrorReport, +func (m *manager) followNetwork(report ClientErrorReport, stop *stoppable.Single) { ticker := time.NewTicker(m.param.TrackNetworkPeriod) TrackTicker := time.NewTicker(debugTrackPeriod) @@ -109,7 +108,7 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, } // executes each iteration of the follower -func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, +func (m *manager) follow(report ClientErrorReport, rng csprng.Source, comms followNetworkComms, stop *stoppable.Single, abandon func(round id.Round)) { //get the identity we will poll for diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index 2aa2b714de6abc473fa03f1e4fde2c60f5ac1c77..2f7ade53bd08b3b0095105284a6b2f6e82f83a14 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -346,7 +346,7 @@ func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) { } // SetPoolFilter sets the filter used to filter gateways from the ID map. -func (h *HostPool) SetHostPoolFilter(f Filter) { +func (h *HostPool) SetGatewayFilter(f Filter) { h.filterMux.Lock() defer h.filterMux.Unlock() diff --git a/network/gateway/sender.go b/network/gateway/sender.go index b930cb31a0a42d141cb2085d6efd9179bb8e56e8..7c41dc7139c91d822e0fc5b4d2250058e1522f46 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -29,7 +29,7 @@ type Sender interface { SendToPreferred(targets []*id.ID, sendFunc sendToPreferredFunc, stop *stoppable.Single, timeout time.Duration) (interface{}, error) UpdateNdf(ndf *ndf.NetworkDefinition) - SetHostPoolFilter(f Filter) + SetGatewayFilter(f Filter) GetHostParams() connect.HostParams } diff --git a/network/health/tracker.go b/network/health/tracker.go index bfef7a53678fcd677cacd59ba7460aa9afcde6a5..5873e9d646d10dc0450dea1dd5ded92d287e2951 100644 --- a/network/health/tracker.go +++ b/network/health/tracker.go @@ -19,12 +19,19 @@ import ( "time" ) -type Tracker struct { +type Monitor interface { + AddHealthCallback(f func(bool)) uint64 + RemoveHealthCallback(uint64) + IsHealthy() bool + WasHealthy() bool + StartProcessies() (stoppable.Stoppable, error) +} + +type tracker struct { timeout time.Duration heartbeat chan network.Heartbeat - channels map[uint64]chan bool funcs map[uint64]func(isHealthy bool) channelsID uint64 funcsID uint64 @@ -42,18 +49,17 @@ type Tracker struct { // Init creates a single HealthTracker thread, starts it, and returns a tracker // and a stoppable. -func Init(instance *network.Instance, timeout time.Duration) *Tracker { +func Init(instance *network.Instance, timeout time.Duration) Monitor { tracker := newTracker(timeout) instance.SetNetworkHealthChan(tracker.heartbeat) return tracker } -// newTracker builds and returns a new Tracker object given a Context. -func newTracker(timeout time.Duration) *Tracker { - return &Tracker{ +// newTracker builds and returns a new tracker object given a Context. +func newTracker(timeout time.Duration) *tracker { + return &tracker{ timeout: timeout, - channels: map[uint64]chan bool{}, funcs: map[uint64]func(isHealthy bool){}, heartbeat: make(chan network.Heartbeat, 100), isHealthy: false, @@ -61,38 +67,10 @@ func newTracker(timeout time.Duration) *Tracker { } } -// AddChannel adds a channel to the list of Tracker channels such that each -// channel can be notified of network changes. Returns a unique ID for the -// channel. -func (t *Tracker) AddChannel(c chan bool) uint64 { - var currentID uint64 - - t.mux.Lock() - t.channels[t.channelsID] = c - currentID = t.channelsID - t.channelsID++ - t.mux.Unlock() - - select { - case c <- t.IsHealthy(): - default: - } - - return currentID -} - -// RemoveChannel removes the channel with the given ID from the list of Tracker -// channels so that it will not longer be notified of network changes. -func (t *Tracker) RemoveChannel(chanID uint64) { - t.mux.Lock() - delete(t.channels, chanID) - t.mux.Unlock() -} - -// AddFunc adds a function to the list of Tracker functions such that each +// AddFunc adds a function to the list of tracker functions such that each // function can be run after network changes. Returns a unique ID for the // function. -func (t *Tracker) AddFunc(f func(isHealthy bool)) uint64 { +func (t *tracker) AddHealthCallback(f func(isHealthy bool)) uint64 { var currentID uint64 t.mux.Lock() @@ -106,15 +84,15 @@ func (t *Tracker) AddFunc(f func(isHealthy bool)) uint64 { return currentID } -// RemoveFunc removes the function with the given ID from the list of Tracker +// RemoveFunc removes the function with the given ID from the list of tracker // functions so that it will not longer be run. -func (t *Tracker) RemoveFunc(chanID uint64) { +func (t *tracker) RemoveHealthCallback(chanID uint64) { t.mux.Lock() - delete(t.channels, chanID) + delete(t.funcs, chanID) t.mux.Unlock() } -func (t *Tracker) IsHealthy() bool { +func (t *tracker) IsHealthy() bool { t.mux.RLock() defer t.mux.RUnlock() @@ -122,14 +100,14 @@ func (t *Tracker) IsHealthy() bool { } // WasHealthy returns true if isHealthy has ever been true. -func (t *Tracker) WasHealthy() bool { +func (t *tracker) WasHealthy() bool { t.mux.RLock() defer t.mux.RUnlock() return t.wasHealthy } -func (t *Tracker) setHealth(h bool) { +func (t *tracker) setHealth(h bool) { t.mux.Lock() // Only set wasHealthy to true if either // wasHealthy is true or @@ -141,7 +119,7 @@ func (t *Tracker) setHealth(h bool) { t.transmit(h) } -func (t *Tracker) Start() (stoppable.Stoppable, error) { +func (t *tracker) StartProcessies() (stoppable.Stoppable, error) { t.mux.Lock() if t.running { t.mux.Unlock() @@ -153,7 +131,7 @@ func (t *Tracker) Start() (stoppable.Stoppable, error) { t.isHealthy = false t.mux.Unlock() - stop := stoppable.NewSingle("health Tracker") + stop := stoppable.NewSingle("health tracker") go t.start(stop) @@ -162,7 +140,7 @@ func (t *Tracker) Start() (stoppable.Stoppable, error) { // start starts a long-running thread used to monitor and report on network // health. -func (t *Tracker) start(stop *stoppable.Single) { +func (t *tracker) start(stop *stoppable.Single) { for { var heartbeat network.Heartbeat select { @@ -192,15 +170,7 @@ func (t *Tracker) start(stop *stoppable.Single) { } } -func (t *Tracker) transmit(health bool) { - for _, c := range t.channels { - select { - case c <- health: - default: - jww.DEBUG.Printf("Unable to send health event") - } - } - +func (t *tracker) transmit(health bool) { // Run all listening functions for _, f := range t.funcs { go f(health) diff --git a/network/health/tracker_test.go b/network/health/tracker_test.go index a2e20651adaa06781f4d685cb5502cd5b56faae0..abfb33b3d71f8aff4b4301b47ab256871fbf87c9 100644 --- a/network/health/tracker_test.go +++ b/network/health/tracker_test.go @@ -33,8 +33,8 @@ func TestNewTracker(t *testing.T) { counter-- } } - tracker.AddChannel(listenChan) - tracker.AddFunc(listenFunc) + tracker.AddHealthChannelCallback(listenChan) + tracker.AddHealthCallback(listenFunc) go func() { for isHealthy := range listenChan { if isHealthy { @@ -66,12 +66,12 @@ func TestNewTracker(t *testing.T) { // Verify the network was marked as healthy if !tracker.IsHealthy() { - t.Fatal("Tracker did not become healthy.") + t.Fatal("tracker did not become healthy.") } // Check if the tracker was ever healthy if !tracker.WasHealthy() { - t.Fatal("Tracker did not become healthy.") + t.Fatal("tracker did not become healthy.") } // Verify the heartbeat triggered the listening chan/func @@ -85,12 +85,12 @@ func TestNewTracker(t *testing.T) { // Verify the network was marked as NOT healthy if tracker.IsHealthy() { - t.Fatal("Tracker should not report healthy.") + t.Fatal("tracker should not report healthy.") } // Check if the tracker was ever healthy, after setting healthy to false if !tracker.WasHealthy() { - t.Fatal("Tracker was healthy previously but not reported healthy.") + t.Fatal("tracker was healthy previously but not reported healthy.") } // Verify the timeout triggered the listening chan/func diff --git a/network/historical/historical.go b/network/historical/historical.go index 68e3984d9b43389528d8e4bd342e8aabbfc9bd04..25688c109ba9d9d609fd4a9b476cbfb2ab57098a 100644 --- a/network/historical/historical.go +++ b/network/historical/historical.go @@ -11,8 +11,7 @@ import ( "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/stoppable" pb "gitlab.com/elixxir/comms/mixmessages" @@ -36,11 +35,11 @@ type Retriever interface { // manager is the controlling structure type manager struct { - params params.Historical + params Params comms RoundsComms sender gateway.Sender - events interfaces.EventManager + events event.Manager c chan roundRequest } @@ -62,8 +61,8 @@ type roundRequest struct { numAttempts uint } -func NewRetriever(param params.Historical, comms RoundsComms, - sender gateway.Sender, events interfaces.EventManager) Retriever { +func NewRetriever(param Params, comms RoundsComms, + sender gateway.Sender, events event.Manager) Retriever { return &manager{ params: param, comms: comms, diff --git a/network/historical/params.go b/network/historical/params.go new file mode 100644 index 0000000000000000000000000000000000000000..b9965394e80545287c45011e29e57e795925ebbc --- /dev/null +++ b/network/historical/params.go @@ -0,0 +1,28 @@ +package historical + +import "time" + +type Params struct { + // Number of historical rounds required to automatically send a historical + // rounds query + MaxHistoricalRounds uint + // Maximum period of time a pending historical round query will wait before + // it is transmitted + HistoricalRoundsPeriod time.Duration + + // Length of historical rounds channel buffer + HistoricalRoundsBufferLen uint + + // Maximum number of times a historical round lookup will be attempted + MaxHistoricalRoundsRetries uint +} + +func GetDefaultParams() Params { + return Params{ + MaxHistoricalRounds: 100, + HistoricalRoundsPeriod: 100 * time.Millisecond, + + HistoricalRoundsBufferLen: 1000, + MaxHistoricalRoundsRetries: 3, + } +} diff --git a/network/identity/receptionID/identity.go b/network/identity/receptionID/identity.go index ee7c069e911d3dcd43102c90087920f3f1bb33e9..636036ba074f517d01a70fe5ebd3cd2be0e69d08 100644 --- a/network/identity/receptionID/identity.go +++ b/network/identity/receptionID/identity.go @@ -3,8 +3,9 @@ package receptionID import ( "encoding/json" "github.com/pkg/errors" - "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/netTime" "strconv" "strings" @@ -14,9 +15,15 @@ import ( const identityStorageKey = "IdentityStorage" const identityStorageVersion = 0 +type EphemeralIdentity struct { + // Identity + EphId ephemeral.Id + Source *id.ID +} + type Identity struct { // Identity - interfaces.EphemeralIdentity + EphemeralIdentity AddressSize uint8 // Usage variables diff --git a/network/interface.go b/network/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..3f37458ec8ac35b9feb734a7723397c8d66a0978 --- /dev/null +++ b/network/interface.go @@ -0,0 +1,249 @@ +package network + +import ( + "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/network/identity/receptionID" + "gitlab.com/elixxir/client/stoppable" + "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/comms/network" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/ndf" + "time" +) + +type Manager interface { + // Follow starts the tracking of the network in a new thread. + // Errors that occur are reported on the ClientErrorReport function if + // passed. The returned stopable can be used to stop the follower. + // Only one follower may run at a time. + Follow(report ClientErrorReport) (stoppable.Stoppable, error) + + /*===Sending==============================================================*/ + + // SendCMIX sends a "raw" CMIX message payload to the provided recipient. + // Returns the round ID of the round the payload was sent or an error + // if it fails. + SendCMIX(message format.Message, recipient *id.ID, p CMIXParams) ( + id.Round, ephemeral.Id, error) + + // SendManyCMIX sends many "raw" cMix message payloads to each of the provided + // recipients. Used to send messages in group chats. Metadata is NOT as well + // protected with this call and can leak data about yourself. Should be + // replaced with multiple uses of SendCmix in most cases. Returns the round + // ID of the round the payload was sent or an error if it fails. + // WARNING: Potentially Unsafe + SendManyCMIX(messages []message.TargetedCmixMessage, p CMIXParams) ( + id.Round, []ephemeral.Id, error) + + /*===Message Reception====================================================*/ + /* Identities are all network identites which the client is currently + trying to pick up message on. An identity must be added + to receive messages, fake ones will be used to poll the network + if none are present. On creation of the network handler, the identity in + session storage will be automatically added*/ + + // AddIdentity adds an identity to be tracked + // If persistent is false, the identity will not be stored to disk and + // will be dropped on reload. + AddIdentity(id *id.ID, validUntil time.Time, persistent bool) error + // RemoveIdentity removes a currently tracked identity. + RemoveIdentity(id *id.ID) + + /* Fingerprints are the primary mechanism of identifying a picked up + message over cMix. They are a unique one time use 255 bit vector generally + associated with a specific encryption key, but can be used for an + alternative protocol.When registering a fingerprint, a MessageProcessor + is registered to handle the message.*/ + + // AddFingerprint - Adds a fingerprint which will be handled by a + // specific processor for messages received by the given identity + AddFingerprint(identity *id.ID, fingerprint format.Fingerprint, + mp MessageProcessor) error + + // DeleteFingerprint deletes a single fingerprint associated with the given + // identity if it exists + DeleteFingerprint(identity *id.ID, fingerprint format.Fingerprint) + + // DeleteClientFingerprints deletes al fingerprint associated with the given + // identity if it exists + DeleteClientFingerprints(identity *id.ID) + + /* trigger - predefined hash based tags appended to all cMix messages + which, though trial hashing, are used to determine if a message applies + to this client + + Triggers are used for 2 purposes - They can be processed by the + notifications system, or can be used to implement custom non fingerprint + processing of payloads. I.E. key negotiation, broadcast negotiation + + A tag is appended to the message of the format tag = H(H(messageContents), + preimage) and trial hashing is used to determine if a message adheres to a + tag. + WARNING: If a preimage is known by an adversary, they can determine which + messages are for the client on reception (which is normally hidden due to + collision between ephemeral IDs. + + Due to the extra overhead of trial hashing, triggers are processed after fingerprints. + If a fingerprint match occurs on the message, triggers will not be handled. + + Triggers are address to the session. When starting a new client, all triggers must be + re-added before StartNetworkFollower is called. + */ + + // AddTrigger - Adds a trigger which can call a message handing function or + // be used for notifications. Multiple triggers can be registered for the + // same preimage. + // preimage - the preimage which is triggered on + // type - a descriptive string of the trigger. Generally used in notifications + // source - a byte buffer of related data. Generally used in notifications. + // Example: Sender ID + AddTrigger(identity *id.ID, newTrigger Trigger, response MessageProcessor) + + // DeleteTrigger - If only a single response is associated with the + // preimage, the entire preimage is removed. If there is more than one + // response, only the given response is removed if nil is passed in for + // response, all triggers for the preimage will be removed + DeleteTrigger(identity *id.ID, preimage Preimage, response MessageProcessor) error + + // DeleteClientTriggers - deletes all triggers assoseated with the given identity + DeleteClientTriggers(identity *id.ID) + + // TrackTriggers - Registers a callback which will get called every time triggers change. + // It will receive the triggers list every time it is modified. + // Will only get callbacks while the Network Follower is running. + // Multiple trackTriggers can be registered + TrackTriggers(TriggerTracker) + + /* In inProcess */ + // it is possible to receive a message over cMix before the fingerprints or + // triggers are registered. As a result, when handling fails, messages are + // put in the inProcess que for a set number of retries. + + // CheckInProgressMessages - retry processing all messages in check in + // progress messages. Call this after adding fingerprints or triggers + //while the follower is running. + CheckInProgressMessages() + + /*===Health Monitor=======================================================*/ + // The health monitor is a system which tracks if the client sees a live + // network. It can either be polled or set up with events + + // IsHealthy Returns true if currently healthy + IsHealthy() bool + + // WasHealthy returns true if the network has ever been healthy in this run + WasHealthy() bool + + // AddHealthCallback - adds a callback which gets called whenever the heal + // changes. Returns a registration ID which can be used to unregister + AddHealthCallback(f func(bool)) uint64 + + // RemoveHealthCallback - Removes a health callback using its + // registration ID + RemoveHealthCallback(uint64) + + /*===Nodes================================================================*/ + /* Keys must be registed with nodes in order to send messages throug them. + this process is in general automatically handled by the Network Manager*/ + + // HasNode can be used to determine if a keying relationship exists with a + // node. + HasNode(nid *id.ID) bool + + // NumRegisteredNodes Returns the total number of nodes we have a keying + // relationship with + NumRegisteredNodes() int + + // TriggerNodeRegistration Triggers the negotiation of a keying + // relationship with a given node + TriggerNodeRegistration(nid *id.ID) + + /*===Historical Rounds====================================================*/ + /* A complete set of round info is not kept on the client, and sometimes + the network will need to be queried to get round info. Historical rounds + is the system internal to the Network Manager to do this. + It can be used externally as well.*/ + + // LookupHistoricalRound - looks up the passed historical round on the + // network + LookupHistoricalRound(rid id.Round, callback func(info *mixmessages.RoundInfo, + success bool)) error + + /*===Sender===============================================================*/ + /* The sender handles sending comms to the network. It tracks connections to + gateways and handles proxying to gateways for targeted comms. It can be + used externally to contact gateway directly, bypassing the majority of + the network package*/ + + // SendToAny can be used to send the comm to any gateway in the network. + SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) + + // SendToPreferred sends to a specific gateway, doing so through another + // gateway as a proxy if not directly connected. + SendToPreferred(targets []*id.ID, sendFunc func(host *connect.Host, + target *id.ID, timeout time.Duration) (interface{}, error), + stop *stoppable.Single, timeout time.Duration) (interface{}, error) + + // SetGatewayFilter sets a function which will be used to filter gateways + // before connecting. + SetGatewayFilter(f func(map[id.ID]int, + *ndf.NetworkDefinition) map[id.ID]int) + + // GetHostParams - returns the host params used when connectign to gateways + GetHostParams() connect.HostParams + + /*===Address Space========================================================*/ + // The network compasses identities into a smaller address space to cause + // collisions and hide the actual recipient of messages. These functions + // allow for the tracking of this addresses space. In general, address space + // issues are completely handled by the network package + + // GetAddressSpace GetAddressSize returns the current address size of IDs. Blocks until an + // address size is known. + GetAddressSpace() uint8 + + // RegisterAddressSpaceNotification returns a channel that will trigger for + // every address space size update. The provided tag is the unique ID for + // the channel. Returns an error if the tag is already used. + RegisterAddressSpaceNotification(tag string) (chan uint8, error) + + // UnregisterAddressSpaceNotification stops broadcasting address space size + // updates on the channel with the specified tag. + UnregisterAddressSpaceNotification(tag string) + + /*===Accessors============================================================*/ + + // GetInstance returns the network instance object, which tracks the + // state of the network + GetInstance() *network.Instance + + // GetVerboseRounds returns stringification of verbose round info + GetVerboseRounds() string +} + +type Preimage [32]byte + +type Trigger struct { + Preimage + Type string + Source []byte +} + +type TriggerTracker func(triggers []Trigger) + +type MessageProcessor interface { + // Process decrypts and hands off the message to its internal down + // stream message processing system. + // CRITICAL: Fingerprints should never be used twice. Process must + // denote, in long term storage, usage of a fingerprint and that + // fingerprint must not be added again during application load. + // It is a security vulnerability to reuse a fingerprint. It leaks + // privacy and can lead to compromise of message contents and integrity. + Process(message format.Message, receptionID receptionID.EphemeralIdentity, + round *mixmessages.RoundInfo) +} + +type ClientErrorReport func(source, message, trace string) diff --git a/network/manager.go b/network/manager.go index b1c97abd59fd7bf2aa9f02a5c3b8db8d318dd231..5624c916d64a30a98082aab9657b8ead5eba19db 100644 --- a/network/manager.go +++ b/network/manager.go @@ -13,8 +13,7 @@ package network import ( "fmt" "github.com/pkg/errors" - "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/network/address" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/health" @@ -54,14 +53,11 @@ type manager struct { rng *fastRNG.StreamGenerator // comms pointer to send/recv messages comms *client.Comms - //contains the health tracker which keeps track of if from the client's - //perspective, the network is in good condition - health *health.Tracker //contains the network instance instance *commNetwork.Instance // parameters of the network - param params.Network + param Params //sub-managers gateway.Sender @@ -71,6 +67,7 @@ type manager struct { rounds.Pickup address.Space identity.Tracker + health.Monitor // Earliest tracked round earliestRound *uint64 @@ -82,13 +79,13 @@ type manager struct { verboseRounds *RoundTracker // Event reporting api - events interfaces.EventManager + events event.Manager } // NewManager builds a new reception manager object using inputted key fields -func NewManager(params params.Network, comms *client.Comms, session storage.Session, - ndf *ndf.NetworkDefinition, rng *fastRNG.StreamGenerator, events interfaces.EventManager, -) (interfaces.NetworkManager, error) { +func NewManager(params Params, comms *client.Comms, session storage.Session, + ndf *ndf.NetworkDefinition, rng *fastRNG.StreamGenerator, events event.Manager, +) (Manager, error) { //start network instance instance, err := commNetwork.NewInstance(comms.ProtoComms, ndf, nil, nil, commNetwork.None, params.FastPolling) @@ -100,7 +97,7 @@ func NewManager(params params.Network, comms *client.Comms, session storage.Sess tracker := uint64(0) earliest := uint64(0) // create manager object - m := manager{ + m := &manager{ param: params, tracker: &tracker, Space: address.NewAddressSpace(), @@ -109,7 +106,6 @@ func NewManager(params params.Network, comms *client.Comms, session storage.Sess session: session, rng: rng, comms: comms, - health: health.Init(instance, params.NetworkHealthTimeout), instance: instance, } m.UpdateAddressSpace(18) @@ -145,11 +141,11 @@ func NewManager(params params.Network, comms *client.Comms, session storage.Sess m.Retriever = historical.NewRetriever(params.Historical, comms, m.Sender, events) //Set up Message Handler - m.Handler = message.NewHandler(params, m.session.GetKV(), m.events) + m.Handler = message.NewHandler(params.Message, m.session.GetKV(), m.events) //set up round handler - m.Pickup = rounds.NewPickup(m.param.Rounds, m.Handler.GetMessageReceptionChannel(), - m.Sender, m.Retriever, m.rng, m.instance, m.session.GetKV()) + m.Pickup = rounds.NewPickup(params.Rounds, m.Handler.GetMessageReceptionChannel(), + m.Sender, m.Retriever, m.rng, m.instance, m.session) //add the identity system m.Tracker = identity.NewOrLoadTracker(m.session, m.Space) @@ -157,13 +153,15 @@ func NewManager(params params.Network, comms *client.Comms, session storage.Sess // Set upthe ability to register with new nodes when they appear m.instance.SetAddGatewayChan(nodechan) + m.Monitor = health.Init(instance, params.NetworkHealthTimeout) + // Report health events - m.health.AddFunc(func(isHealthy bool) { + m.Monitor.AddHealthCallback(func(isHealthy bool) { m.events.Report(5, "health", "IsHealthy", fmt.Sprintf("%v", isHealthy)) }) - return &m, nil + return m, nil } // Follow StartRunners kicks off all network reception goroutines ("threads"). @@ -172,15 +170,15 @@ func NewManager(params params.Network, comms *client.Comms, session storage.Sess // - Historical Round Retrieval (/network/rounds/historical.go) // - Message Retrieval Worker Group (/network/rounds/retrieve.go) // - Message Handling Worker Group (/network/message/handle.go) -// - health Tracker (/network/health) +// - health tracker (/network/health) // - Garbled Messages (/network/message/inProgress.go) // - Critical Messages (/network/message/critical.go) // - Ephemeral ID tracking (network/address/tracker.go) -func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppable, error) { +func (m *manager) Follow(report ClientErrorReport) (stoppable.Stoppable, error) { multi := stoppable.NewMulti("networkManager") // health tracker - healthStop, err := m.health.Start() + healthStop, err := m.Monitor.StartProcessies() if err != nil { return nil, errors.Errorf("failed to follow") } @@ -190,7 +188,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab multi.Add(m.Registrar.StartProcesses(m.param.ParallelNodeRegistrations)) // Adding/MixCypher //TODO-node remover - // Start the Network Tracker + // Start the Network tracker followNetworkStopper := stoppable.NewSingle("FollowNetwork") go m.followNetwork(report, followNetworkStopper) multi.Add(followNetworkStopper) @@ -210,11 +208,6 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab return multi, nil } -// GetHealthTracker returns the health tracker -func (m *manager) GetHealthTracker() interfaces.HealthTracker { - return m.health -} - // GetInstance returns the network instance object (ndf state) func (m *manager) GetInstance() *commNetwork.Instance { return m.instance diff --git a/network/message/params.go b/network/message/params.go new file mode 100644 index 0000000000000000000000000000000000000000..9a3a56cd599860416ebe2951048c2c5ac7767987 --- /dev/null +++ b/network/message/params.go @@ -0,0 +1,21 @@ +package message + +import "time" + +type Params struct { + MessageReceptionBuffLen uint + MessageReceptionWorkerPoolSize uint + MaxChecksInProcessMessage uint + InProcessMessageWait time.Duration + RealtimeOnly bool +} + +func GetDefaultParams() Params { + return Params{ + MessageReceptionBuffLen: 500, + MessageReceptionWorkerPoolSize: 4, + MaxChecksInProcessMessage: 10, + InProcessMessageWait: 15 * time.Minute, + RealtimeOnly: false, + } +} diff --git a/network/message/pickup.go b/network/message/pickup.go index ce2e5b77cc8532eac11212d3ca8acc1adb35b334..d86aa1a2f70464511dcfd3275ad439cdb267094e 100644 --- a/network/message/pickup.go +++ b/network/message/pickup.go @@ -8,7 +8,7 @@ package message import ( - "encoding/base64" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/primitives/format" @@ -16,7 +16,6 @@ import ( "strconv" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/stoppable" ) @@ -36,26 +35,26 @@ type Handler interface { // Triggers AddTrigger(clientID *id.ID, newTrigger interfaces.Trigger, response interfaces.MessageProcessor) - DeleteTriggers(clientID *id.ID, preimage interfaces.Preimage, response interfaces.MessageProcessor) error + DeleteTrigger(clientID *id.ID, preimage interfaces.Preimage, response interfaces.MessageProcessor) error DeleteClientTriggers(clientID *id.ID) + TrackTriggers(triggerTracker interfaces.TriggerTracker) } type handler struct { - param params.Network - blacklistedNodes map[string]interface{} + param Params messageReception chan Bundle checkInProgress chan struct{} inProcess *MeteredCmixMessageBuffer - events interfaces.EventManager + events event.Manager FingerprintsManager TriggersManager } -func NewHandler(param params.Network, kv *versioned.KV, events interfaces.EventManager) Handler { +func NewHandler(param Params, kv *versioned.KV, events event.Manager) Handler { garbled, err := NewOrLoadMeteredCmixMessageBuffer(kv, inProcessKey) if err != nil { @@ -69,15 +68,6 @@ func NewHandler(param params.Network, kv *versioned.KV, events interfaces.EventM inProcess: garbled, events: events, } - for _, nodeId := range param.BlacklistedNodes { - decodedId, err := base64.StdEncoding.DecodeString(nodeId) - if err != nil { - jww.ERROR.Printf("Unable to decode blacklisted Node ID %s: %+v", - decodedId, err) - continue - } - m.blacklistedNodes[string(decodedId)] = nil - } m.FingerprintsManager = *newFingerprints() m.TriggersManager = *NewTriggers() diff --git a/network/message/triggers.go b/network/message/triggers.go index 7106f40ef6097c581e88f55a13a9f5a9369500d1..3011ae600e51c18ab9ab49d5df10d2daee346e79 100644 --- a/network/message/triggers.go +++ b/network/message/triggers.go @@ -37,7 +37,9 @@ must be re-added before StartNetworkFollower is called. */ type TriggersManager struct { - tmap map[id.ID]map[interfaces.Preimage][]trigger + tmap map[id.ID]map[interfaces.Preimage][]trigger + trackers []interfaces.TriggerTracker + numTriggers uint sync.Mutex } @@ -112,13 +114,15 @@ func (t *TriggersManager) AddTrigger(clientID *id.ID, newTrigger interfaces.Trig t.tmap[cid][pi] = []trigger{newEntry} + t.numTriggers++ + t.triggerTriggerTracking() } -// DeleteTriggers - If only a single response is associated with the preimage, +// DeleteTrigger - If only a single response is associated with the preimage, // the entire preimage is removed. If there is more than one response, only the // given response is removed. If nil is passed in for response, all triggers for // the preimage will be removed. -func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Preimage, +func (t *TriggersManager) DeleteTrigger(clientID *id.ID, preimage interfaces.Preimage, response interfaces.MessageProcessor) error { t.Lock() defer t.Unlock() @@ -154,7 +158,8 @@ func (t *TriggersManager) DeleteTriggers(clientID *id.ID, preimage interfaces.Pr return nil } } - + t.numTriggers-- + t.triggerTriggerTracking() return nil } @@ -165,3 +170,35 @@ func (t *TriggersManager) DeleteClientTriggers(clientID *id.ID) { delete(t.tmap, *clientID) } + +// TrackTriggers adds a trigger tracker to be triggered when a nee trigger +// as added. +func (t *TriggersManager) TrackTriggers(triggerTracker interfaces.TriggerTracker) { + if triggerTracker == nil { + return + } + t.Lock() + defer t.Unlock() + + t.trackers = append(t.trackers, triggerTracker) +} + +//triggerTriggerTracking triggers the tracking of triggers +func (t *TriggersManager) triggerTriggerTracking() { + if len(t.trackers) == 0 { + return + } + + triggers := make([]interfaces.Trigger, 0, t.numTriggers) + for _, tmap := range t.tmap { + for _, tlist := range tmap { + for i := range tlist { + triggers = append(triggers, tlist[i].Trigger) + } + } + } + + for _, callback := range t.trackers { + go callback(triggers) + } +} diff --git a/network/params.go b/network/params.go new file mode 100644 index 0000000000000000000000000000000000000000..cd8e68d2dbf775b933734438022ffdd0e6be35a8 --- /dev/null +++ b/network/params.go @@ -0,0 +1,138 @@ +package network + +import ( + "encoding/json" + "gitlab.com/elixxir/client/network/historical" + "gitlab.com/elixxir/client/network/message" + "gitlab.com/elixxir/client/network/rounds" + "gitlab.com/elixxir/client/stoppable" + "gitlab.com/elixxir/primitives/excludedRounds" + "gitlab.com/xx_network/primitives/id" + "time" +) + +type Params struct { + TrackNetworkPeriod time.Duration + // maximum number of rounds to check in a single iterations network updates + MaxCheckedRounds uint + // Size of the buffer of nodes to register + RegNodesBufferLen uint + // Longest delay between network events for health tracker to denote that + // the network is in a bad state + NetworkHealthTimeout time.Duration + //Number of parallel nodes registration the client is capable of + ParallelNodeRegistrations uint + //How far back in rounds the network should actually check + KnownRoundsThreshold uint + // Determines verbosity of network updates while polling + // If true, client receives a filtered set of updates + // If false, client receives the full list of network updates + FastPolling bool + // Determines if the state of every round processed is tracked in ram. + // This is very memory intensive and is primarily used for debugging + VerboseRoundTracking bool + //disables all attempts to pick up dropped or missed messages + RealtimeOnly bool + // Resends auth requests up the stack if received multiple times + ReplayRequests bool + + Rounds rounds.Params + Message message.Params + Historical historical.Params +} + +func GetDefaultParams() Params { + n := Params{ + TrackNetworkPeriod: 100 * time.Millisecond, + MaxCheckedRounds: 500, + RegNodesBufferLen: 1000, + NetworkHealthTimeout: 30 * time.Second, + ParallelNodeRegistrations: 20, + KnownRoundsThreshold: 1500, //5 rounds/sec * 60 sec/min * 5 min + FastPolling: true, + VerboseRoundTracking: false, + RealtimeOnly: false, + ReplayRequests: true, + } + n.Rounds = rounds.GetDefaultParams() + n.Message = message.GetDefaultParams() + n.Historical = historical.GetDefaultParams() + + return n +} + +func (n Params) Marshal() ([]byte, error) { + return json.Marshal(n) +} + +func (n Params) SetRealtimeOnlyAll() Params { + n.RealtimeOnly = true + n.Rounds.RealtimeOnly = true + n.Message.RealtimeOnly = true + return n +} + +// Obtain default Network parameters, or override with given parameters if set +func GetParameters(params string) (Params, error) { + p := GetDefaultParams() + if len(params) > 0 { + err := json.Unmarshal([]byte(params), &p) + if err != nil { + return Params{}, err + } + } + return p, nil +} + +type CMIXParams struct { + // maximum number of rounds to try and send on + RoundTries uint + Timeout time.Duration + RetryDelay time.Duration + ExcludedRounds excludedRounds.ExcludedRounds + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration + + // an alternate identity preimage to use on send. If not set, the default + // for the sending identity will be used + IdentityPreimage []byte + + // Tag which prints with sending logs to help localize the source + // All internal sends are tagged, so the default tag is "External" + DebugTag string + + //Threading interface, can be used to stop the send early + Stop *stoppable.Single + + //List of nodes to not send to, will skip a round with these + //nodes in it + BlacklistedNodes map[id.ID]interface{} +} + +func GetDefaultCMIX() CMIXParams { + return CMIXParams{ + RoundTries: 10, + Timeout: 25 * time.Second, + RetryDelay: 1 * time.Second, + SendTimeout: 3 * time.Second, + DebugTag: "External", + } +} + +func (c CMIXParams) Marshal() ([]byte, error) { + return json.Marshal(c) +} + +// GetCMIXParameters func obtains default CMIX parameters, or overrides with given parameters if set +func GetCMIXParameters(params string) (CMIXParams, error) { + p := GetDefaultCMIX() + if len(params) > 0 { + err := json.Unmarshal([]byte(params), &p) + if err != nil { + return CMIXParams{}, err + } + } + return p, nil +} diff --git a/network/remoteFilters.go b/network/remoteFilters.go index 0c88e42aabce67e103bd31ec61044cd4b3387d4a..19421a8dbd6f98d25bd0a281fcc922395c0d68ff 100644 --- a/network/remoteFilters.go +++ b/network/remoteFilters.go @@ -10,11 +10,13 @@ package network import ( jww "github.com/spf13/jwalterweatherman" bloom "gitlab.com/elixxir/bloomfilter" - "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/primitives/id" ) +const BloomFilterSize = 648 // In Bits +const BloomFilterHashes = 10 + func NewRemoteFilter(data *mixmessages.ClientBloom) *RemoteFilter { return &RemoteFilter{ data: data, @@ -30,8 +32,8 @@ func (rf *RemoteFilter) GetFilter() *bloom.Ring { if rf.filter == nil { var err error - rf.filter, _ = bloom.InitByParameters(interfaces.BloomFilterSize, - interfaces.BloomFilterHashes) + rf.filter, _ = bloom.InitByParameters(BloomFilterSize, + BloomFilterHashes) err = rf.filter.UnmarshalBinary(rf.data.Filter) if err != nil { jww.FATAL.Panicf("Failed to properly unmarshal the bloom filter: %+v", err) diff --git a/network/remoteFilters_test.go b/network/remoteFilters_test.go index d9f410adbf049e126f29c805db0ba31977e75361..2fff7034d4e5de9e1ecff7f5be00df8356eaebda 100644 --- a/network/remoteFilters_test.go +++ b/network/remoteFilters_test.go @@ -10,7 +10,6 @@ package network import ( jww "github.com/spf13/jwalterweatherman" bloom "gitlab.com/elixxir/bloomfilter" - "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" @@ -43,8 +42,8 @@ func TestNewRemoteFilter(t *testing.T) { // Unit test GetFilter func TestRemoteFilter_GetFilter(t *testing.T) { - testFilter, err := bloom.InitByParameters(interfaces.BloomFilterSize, - interfaces.BloomFilterHashes) + testFilter, err := bloom.InitByParameters(BloomFilterSize, + BloomFilterHashes) if err != nil { t.Fatalf("GetFilter error: "+ "Cannot initialize bloom filter for setup: %v", err) diff --git a/network/rounds/manager.go b/network/rounds/manager.go index bc0315bcd5741c90b40614db14847e7d07510014..6cd1a1330342e6b5478258504e213003878467b6 100644 --- a/network/rounds/manager.go +++ b/network/rounds/manager.go @@ -9,14 +9,12 @@ package rounds import ( "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/historical" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/rounds/store" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" - "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/xx_network/primitives/id" "strconv" @@ -28,9 +26,9 @@ type Pickup interface { } type manager struct { - params params.Rounds + params Params sender gateway.Sender - session *storage.Session + session storage.Session comms MessageRetrievalComms @@ -46,10 +44,10 @@ type manager struct { unchecked *store.UncheckedRoundStore } -func NewPickup(params params.Rounds, bundles chan<- message.Bundle, +func NewPickup(params Params, bundles chan<- message.Bundle, sender gateway.Sender, historical historical.Retriever, rng *fastRNG.StreamGenerator, - instance RoundGetter, kv *versioned.KV) Pickup { - unchecked := store.NewOrLoadUncheckedStore(kv) + instance RoundGetter, session storage.Session) Pickup { + unchecked := store.NewOrLoadUncheckedStore(session.GetKV()) m := &manager{ params: params, lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen), @@ -59,6 +57,7 @@ func NewPickup(params params.Rounds, bundles chan<- message.Bundle, rng: rng, instance: instance, unchecked: unchecked, + session: session, } return m @@ -68,8 +67,6 @@ func (m *manager) StartProcessors() stoppable.Stoppable { multi := stoppable.NewMulti("Rounds") - //start the historical rounds thread - //start the message retrieval worker pool for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ { stopper := stoppable.NewSingle("Message Retriever " + strconv.Itoa(int(i))) diff --git a/network/rounds/params.go b/network/rounds/params.go new file mode 100644 index 0000000000000000000000000000000000000000..dbd9e3e6a3bd5ccd1b154e81471d2e6dc8b05282 --- /dev/null +++ b/network/rounds/params.go @@ -0,0 +1,44 @@ +package rounds + +import "time" + +type Params struct { + // Number of worker threads for retrieving messages from gateways + NumMessageRetrievalWorkers uint + + // Length of round lookup channel buffer + LookupRoundsBufferLen uint + + // Maximum number of times a historical round lookup will be attempted + MaxHistoricalRoundsRetries uint + + // Interval between checking for rounds in UncheckedRoundStore + // due for a message retrieval retry + UncheckRoundPeriod time.Duration + + // Toggles if message pickup retrying mechanism if forced + // by intentionally not looking up messages + ForceMessagePickupRetry bool + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration + + //disables all attempts to pick up dropped or missed messages + RealtimeOnly bool + + // Toggles if historical rounds should always be used + ForceHistoricalRounds bool +} + +func GetDefaultParams() Params { + return Params{ + NumMessageRetrievalWorkers: 8, + LookupRoundsBufferLen: 2000, + MaxHistoricalRoundsRetries: 3, + UncheckRoundPeriod: 20 * time.Second, + ForceMessagePickupRetry: false, + SendTimeout: 3 * time.Second, + RealtimeOnly: false, + } +} diff --git a/network/sendCmix.go b/network/sendCmix.go index dd0ee8be3e760dccce61d5de80e36b1064260432..097247647bf1037cd599ca14cd0b249abf13777c 100644 --- a/network/sendCmix.go +++ b/network/sendCmix.go @@ -11,8 +11,7 @@ import ( "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/stoppable" @@ -36,8 +35,8 @@ import ( // Returns the round ID of the round the payload was sent or an error // if it fails. func (m *manager) SendCMIX(msg format.Message, - recipient *id.ID, cmixParams params.CMIX) (id.Round, ephemeral.Id, error) { - if !m.health.IsHealthy() { + recipient *id.ID, cmixParams CMIXParams) (id.Round, ephemeral.Id, error) { + if !m.Monitor.IsHealthy() { return 0, ephemeral.Id{}, errors.New("Cannot send cmix message when the " + "network is not healthy") } @@ -58,9 +57,9 @@ func (m *manager) SendCMIX(msg format.Message, // which can be registered with the network instance to get a callback on // its status func sendCmixHelper(sender gateway.Sender, msg format.Message, - recipient *id.ID, cmixParams params.CMIX, instance *network.Instance, + recipient *id.ID, cmixParams CMIXParams, instance *network.Instance, grp *cyclic.Group, nodes nodes.Registrar, - rng *fastRNG.StreamGenerator, events interfaces.EventManager, + rng *fastRNG.StreamGenerator, events event.Manager, senderId *id.ID, comms SendCmixCommsInterface) (id.Round, ephemeral.Id, error) { timeStart := netTime.Now() diff --git a/network/sendCmixUtils.go b/network/sendCmixUtils.go index 9785eeed7cd238597cc0b92ddc16dc81835b6d04..941ef57aa042cab07b922dcce63f1cc9f3465181 100644 --- a/network/sendCmixUtils.go +++ b/network/sendCmixUtils.go @@ -10,8 +10,6 @@ package network import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces/message" - "gitlab.com/elixxir/client/interfaces/params" preimage2 "gitlab.com/elixxir/client/interfaces/preimage" "gitlab.com/elixxir/client/network/nodes" pb "gitlab.com/elixxir/comms/mixmessages" @@ -114,7 +112,7 @@ func processRound(nodes nodes.Registrar, bestRound *pb.RoundInfo, // the recipient. func buildSlotMessage(msg format.Message, recipient *id.ID, target *id.ID, stream *fastRNG.Stream, senderId *id.ID, bestRound *pb.RoundInfo, - mixCrypt nodes.MixCypher, param params.CMIX) (*pb.GatewaySlot, + mixCrypt nodes.MixCypher, param CMIXParams) (*pb.GatewaySlot, format.Message, ephemeral.Id, error) { @@ -211,7 +209,7 @@ func handleMissingNodeKeys(instance *network.Instance, // string of comma seperated recipient IDs and a string of comma seperated // message digests. Duplicate recipient IDs are printed once. Intended for use // in printing to log. -func messageListToStrings(msgList []message.TargetedCmixMessage) (string, string) { +func messageListToStrings(msgList []TargetedCmixMessage) (string, string) { idStrings := make([]string, 0, len(msgList)) idMap := make(map[id.ID]bool, len(msgList)) msgDigests := make([]string, len(msgList)) diff --git a/network/sendCmix_test.go b/network/sendCmix_test.go index d5e56f2df9f80a6973ca0df92a48049010a8e87c..ee905d536a95f2e132777d904e1befc47f3cf45b 100644 --- a/network/sendCmix_test.go +++ b/network/sendCmix_test.go @@ -5,7 +5,6 @@ import ( "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/network/gateway" - "gitlab.com/elixxir/client/network/internal" message2 "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/switchboard" diff --git a/network/sendManyCmix.go b/network/sendManyCmix.go index 9c60a093c73cad4941a66a7875682cafe69a6285..ad2ee803ba9ec49f3a2dbb2a825f3028a58c96b9 100644 --- a/network/sendManyCmix.go +++ b/network/sendManyCmix.go @@ -11,9 +11,7 @@ import ( "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/interfaces/message" - "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/nodes" "gitlab.com/elixxir/client/stoppable" @@ -32,12 +30,24 @@ import ( "time" ) +// TargetedCmixMessage defines a recipient target pair in a sendMany cMix +// message. +type TargetedCmixMessage struct { + Recipient *id.ID + Message format.Message +} + // SendManyCMIX sends many "raw" cMix message payloads to each of the provided // recipients. Used to send messages in group chats. Metadata is NOT protected // with this call and can leak data about yourself. Returns the round ID of the // round the payload was sent or an error if it fails. // WARNING: Potentially Unsafe -func (m *manager) SendManyCMIX(messages []message.TargetedCmixMessage, p params.CMIX) (id.Round, []ephemeral.Id, error) { +func (m *manager) SendManyCMIX(messages []TargetedCmixMessage, + p CMIXParams) (id.Round, []ephemeral.Id, error) { + if !m.Monitor.IsHealthy() { + return 0, []ephemeral.Id{}, errors.New("Cannot send cmix " + + "message when the network is not healthy") + } return sendManyCmixHelper(m.Sender, messages, p, m.instance, m.session.GetCmixGroup(), m.Registrar, m.rng, m.events, @@ -56,9 +66,9 @@ func (m *manager) SendManyCMIX(messages []message.TargetedCmixMessage, p params. // which can be registered with the network instance to get a callback on its // status. func sendManyCmixHelper(sender gateway.Sender, - msgs []message.TargetedCmixMessage, param params.CMIX, instance *network.Instance, + msgs []TargetedCmixMessage, param CMIXParams, instance *network.Instance, grp *cyclic.Group, registrar nodes.Registrar, - rng *fastRNG.StreamGenerator, events interfaces.EventManager, + rng *fastRNG.StreamGenerator, events event.Manager, senderId *id.ID, comms SendCmixCommsInterface) ( id.Round, []ephemeral.Id, error) { diff --git a/single/manager_test.go b/single/manager_test.go index 66568dfe5500d35723f4e693928750e27535b0a9..1af9467c086613d77adfc0aad4c6e71e6cd3b46c 100644 --- a/single/manager_test.go +++ b/single/manager_test.go @@ -11,6 +11,7 @@ import ( "bytes" "errors" "gitlab.com/elixxir/client/api" + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" @@ -339,7 +340,7 @@ func (tnm *testNetworkManager) GetInstance() *network.Instance { type dummyEventMgr struct{} func (d *dummyEventMgr) Report(p int, a, b, c string) {} -func (t *testNetworkManager) GetEventManager() interfaces.EventManager { +func (t *testNetworkManager) GetEventManager() event.Manager { return &dummyEventMgr{} } diff --git a/ud/utils_test.go b/ud/utils_test.go index fc224802e63a1b489013f7e07aac82293e249665..73a55fc597774f00f659fc5c6278306e08ae04ad 100644 --- a/ud/utils_test.go +++ b/ud/utils_test.go @@ -15,6 +15,7 @@ package ud import ( + "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" @@ -75,7 +76,7 @@ func (tnm *testNetworkManager) SendManyCMIX([]message.TargetedCmixMessage, param type dummyEventMgr struct{} func (d *dummyEventMgr) Report(int, string, string, string) {} -func (tnm *testNetworkManager) GetEventManager() interfaces.EventManager { +func (tnm *testNetworkManager) GetEventManager() event.Manager { return &dummyEventMgr{} }