diff --git a/api/client.go b/api/client.go index 6cb4786535df667e5232635a566e7e245f71a109..b1edca2841c9c349c1443cd6e1f903530265963b 100644 --- a/api/client.go +++ b/api/client.go @@ -63,6 +63,9 @@ type Client struct { followerServices *services clientErrorChannel chan interfaces.ClientError + + // Event reporting in event.go + events *eventManager } // NewClient creates client storage, generates keys, connects, and registers @@ -184,6 +187,7 @@ func OpenClient(storageDir string, password []byte, parameters params.Network) ( followerServices: newServices(), parameters: parameters, clientErrorChannel: make(chan interfaces.ClientError, 1000), + events: newEventManager(), } return c, nil @@ -237,8 +241,8 @@ func Login(storageDir string, password []byte, parameters params.Network) (*Clie } // Initialize network and link it to context - c.network, err = network.NewManager(c.storage, c.switchboard, c.rng, c.comms, - parameters, def) + c.network, err = network.NewManager(c.storage, c.switchboard, c.rng, + c.events, c.comms, parameters, def) if err != nil { return nil, err } @@ -297,8 +301,8 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, } // Initialize network and link it to context - c.network, err = network.NewManager(c.storage, c.switchboard, c.rng, c.comms, - parameters, def) + c.network, err = network.NewManager(c.storage, c.switchboard, c.rng, + c.events, c.comms, parameters, def) if err != nil { return nil, err } @@ -370,8 +374,13 @@ func (c *Client) registerFollower() error { } } + err := c.followerServices.add(c.events.eventService) + if err != nil { + return errors.WithMessage(err, "Couldn't start event reporting") + } + //register the core follower service - err := c.followerServices.add(func() (stoppable.Stoppable, error) { return c.network.Follow(cer) }) + err = c.followerServices.add(func() (stoppable.Stoppable, error) { return c.network.Follow(cer) }) if err != nil { return errors.WithMessage(err, "Failed to start following "+ "the network") diff --git a/api/event.go b/api/event.go new file mode 100644 index 0000000000000000000000000000000000000000..86e75b14ef212033ca906858e265c713022ee709 --- /dev/null +++ b/api/event.go @@ -0,0 +1,130 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package api + +import ( + "fmt" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/interfaces" + "gitlab.com/elixxir/client/stoppable" + "sync" +) + +// ReportableEvent is used to surface events to client users. +type reportableEvent struct { + Priority int + Category string + EventType string + Details string +} + +// String stringer interace implementation +func (e reportableEvent) String() string { + return fmt.Sprintf("Event(%d, %s, %s, %s)", e.Priority, e.Category, + e.EventType, e.Details) +} + +// Holds state for the event reporting system +type eventManager struct { + eventCh chan reportableEvent + eventCbs sync.Map +} + +func newEventManager() *eventManager { + return &eventManager{ + eventCh: make(chan reportableEvent, 1000), + } +} + +// Report reports an event from the client to api users, providing a +// priority, category, eventType, and details +func (e *eventManager) Report(priority int, category, evtType, details string) { + re := reportableEvent{ + Priority: priority, + Category: category, + EventType: evtType, + Details: details, + } + select { + case e.eventCh <- re: + jww.TRACE.Printf("Event reported: %s", re) + default: + jww.ERROR.Printf("Event Queue full, unable to report: %s", re) + } +} + +// RegisterEventCallback records the given function to receive +// ReportableEvent objects. It returns the internal index +// of the callback so that it can be deleted later. +func (e *eventManager) RegisterEventCallback(name string, + myFunc interfaces.EventCallbackFunction) error { + _, existsAlready := e.eventCbs.LoadOrStore(name, myFunc) + if existsAlready { + return errors.Errorf("Key %s already exists as event callback", + name) + } + return nil +} + +// UnregisterEventCallback deletes the callback identified by the +// index. It returns an error if it fails. +func (e *eventManager) UnregisterEventCallback(name string) { + e.eventCbs.Delete(name) +} + +func (e *eventManager) eventService() (stoppable.Stoppable, error) { + stop := stoppable.NewSingle("EventReporting") + go e.reportEventsHandler(stop) + return stop, nil +} + +// reportEventsHandler reports events to every registered event callback +func (e *eventManager) reportEventsHandler(stop *stoppable.Single) { + jww.DEBUG.Print("reportEventsHandler routine started") + for { + select { + case <-stop.Quit(): + jww.DEBUG.Printf("Stopping reportEventsHandler") + stop.ToStopped() + return + case evt := <-e.eventCh: + jww.DEBUG.Printf("Received event: %s", evt) + // NOTE: We could call each in a routine but decided + // against it. It's the users responsibility not to let + // the event queue explode. The API will report errors + // in the logging any time the event queue gets full. + e.eventCbs.Range(func(name, myFunc interface{}) bool { + f := myFunc.(interfaces.EventCallbackFunction) + f(evt.Priority, evt.Category, evt.EventType, + evt.Details) + return true + }) + } + } +} + +// ReportEvent reports an event from the client to api users, providing a +// priority, category, eventType, and details +func (c *Client) ReportEvent(priority int, category, evtType, details string) { + c.events.Report(priority, category, evtType, details) +} + +// RegisterEventCallback records the given function to receive +// ReportableEvent objects. It returns the internal index +// of the callback so that it can be deleted later. +func (c *Client) RegisterEventCallback(name string, + myFunc interfaces.EventCallbackFunction) error { + return c.events.RegisterEventCallback(name, myFunc) +} + +// UnregisterEventCallback deletes the callback identified by the +// index. It returns an error if it fails. +func (c *Client) UnregisterEventCallback(name string) { + c.events.UnregisterEventCallback(name) +} diff --git a/api/event_test.go b/api/event_test.go new file mode 100644 index 0000000000000000000000000000000000000000..775d2a6e6609009be6082cb29bdd73dba0a880ee --- /dev/null +++ b/api/event_test.go @@ -0,0 +1,83 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package api + +import ( + "testing" + "time" +) + +func TestEventReporting(t *testing.T) { + evts := make([]reportableEvent, 0) // used for convenience... + myCb := func(priority int, cat, ty, det string) { + evt := reportableEvent{ + Priority: priority, + Category: cat, + EventType: ty, + Details: det, + } + t.Logf("EVENT: %s", evt) + evts = append(evts, evt) + } + + evtMgr := newEventManager() + stop, _ := evtMgr.eventService() + // Register a callback + err := evtMgr.RegisterEventCallback("test", myCb) + if err != nil { + t.Errorf("TestEventReporting unexpected error: %+v", err) + } + + // Send a few events + evtMgr.Report(10, "Hi", "TypityType", "I'm an event") + evtMgr.Report(1, "Hi", "TypeII", "Type II errors are the worst") + evtMgr.Report(20, "Hi", "TypityType3", "eventy details") + evtMgr.Report(22, "Hi", "TypityType4", "I'm an event 2") + + time.Sleep(100 * time.Millisecond) + + if len(evts) != 4 { + t.Errorf("TestEventReporting: Got %d events, expected 4", + len(evts)) + } + + // Verify events are received + if evts[0].Priority != 10 { + t.Errorf("TestEventReporting: Expected priority 10, got: %s", + evts[0]) + } + if evts[1].Category != "Hi" { + t.Errorf("TestEventReporting: Expected cat Hi, got: %s", + evts[1]) + } + if evts[2].EventType != "TypityType3" { + t.Errorf("TestEventReporting: Expected TypeityType3, got: %s", + evts[2]) + } + if evts[3].Details != "I'm an event 2" { + t.Errorf("TestEventReporting: Expected event 2, got: %s", + evts[3]) + } + + // Delete callback + evtMgr.UnregisterEventCallback("test") + // Send more events + evtMgr.Report(10, "Hi", "TypityType", "I'm an event") + evtMgr.Report(1, "Hi", "TypeII", "Type II errors are the worst") + evtMgr.Report(20, "Hi", "TypityType3", "eventy details") + evtMgr.Report(22, "Hi", "TypityType4", "I'm an event 2") + + time.Sleep(100 * time.Millisecond) + + // Verify events are not received + if len(evts) != 4 { + t.Errorf("TestEventReporting: Got %d events, expected 4", + len(evts)) + } + stop.Close() +} diff --git a/api/utilsInterfaces_test.go b/api/utilsInterfaces_test.go index e421ffdc9aa20bd5c8ce89b1a7dea24432e7f9d9..0997993aebfabf2a0bd4fa4ca2efb9c269112a1f 100644 --- a/api/utilsInterfaces_test.go +++ b/api/utilsInterfaces_test.go @@ -83,6 +83,12 @@ type testNetworkManagerGeneric struct { instance *network.Instance sender *gateway.Sender } +type dummyEventMgr struct{} + +func (d *dummyEventMgr) Report(p int, a, b, c string) {} +func (t *testNetworkManagerGeneric) GetEventManager() interfaces.EventManager { + return &dummyEventMgr{} +} /* Below methods built for interface adherence */ func (t *testNetworkManagerGeneric) GetHealthTracker() interfaces.HealthTracker { diff --git a/auth/callback.go b/auth/callback.go index b713ce5abe73b1b7f0f7444b5fcf54e5cd74aa75..4a5fba05b7e74335fd71bac6685db01126ae2527 100644 --- a/auth/callback.go +++ b/auth/callback.go @@ -8,6 +8,7 @@ package auth import ( + "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces" @@ -128,8 +129,11 @@ func (m *Manager) handleRequest(cmixMsg format.Message, return } - jww.INFO.Printf("Received AuthRequest from %s,"+ + events := m.net.GetEventManager() + em := fmt.Sprintf("Received AuthRequest from %s,"+ " msgDigest: %s", partnerID, cmixMsg.Digest()) + jww.INFO.Print(em) + events.Report(1, "Auth", "RequestReceived", em) /*do state edge checks*/ // check if a relationship already exists. @@ -137,8 +141,10 @@ func (m *Manager) handleRequest(cmixMsg format.Message, // confirmation in case there are state issues. // do not store if _, err := m.storage.E2e().GetPartner(partnerID); err == nil { - jww.WARN.Printf("Received Auth request for %s, "+ + em := fmt.Sprintf("Received Auth request for %s, "+ "channel already exists. Ignoring", partnerID) + jww.WARN.Print(em) + events.Report(5, "Auth", "RequestIgnored", em) //exit return } else { @@ -146,17 +152,21 @@ func (m *Manager) handleRequest(cmixMsg format.Message, rType, sr2, _, err := m.storage.Auth().GetRequest(partnerID) if err != nil && !strings.Contains(err.Error(), auth.NoRequest) { // if another error is received, print it and exit - jww.WARN.Printf("Received new Auth request for %s, "+ + em := fmt.Sprintf("Received new Auth request for %s, "+ "internal lookup produced bad result: %+v", partnerID, err) + jww.ERROR.Print(em) + events.Report(10, "Auth", "RequestError", em) return } else { //handle the events where the relationship already exists switch rType { // if this is a duplicate, ignore the message case auth.Receive: - jww.WARN.Printf("Received new Auth request for %s, "+ + em := fmt.Sprintf("Received new Auth request for %s, "+ "is a duplicate", partnerID) + jww.WARN.Print(em) + events.Report(5, "Auth", "DuplicateRequest", em) return // if we sent a request, then automatically confirm // then exit, nothing else needed @@ -167,8 +177,11 @@ func (m *Manager) handleRequest(cmixMsg format.Message, // do the confirmation if err := m.doConfirm(sr2, grp, partnerPubKey, m.storage.E2e().GetDHPrivateKey(), sr2.GetPartnerHistoricalPubKey(), ecrFmt.GetOwnership()); err != nil { - jww.WARN.Printf("Auto Confirmation with %s failed: %s", + em := fmt.Sprintf("Auto Confirmation with %s failed: %s", partnerID, err) + jww.WARN.Print(em) + events.Report(10, "Auth", + "RequestError", em) } //exit return @@ -180,8 +193,10 @@ func (m *Manager) handleRequest(cmixMsg format.Message, facts, msg, err := fact.UnstringifyFactList( string(requestFmt.msgPayload)) if err != nil { - jww.WARN.Printf("failed to parse facts and message "+ + em := fmt.Sprintf("failed to parse facts and message "+ "from Auth Request: %s", err) + jww.WARN.Print(em) + events.Report(10, "Auth", "RequestError", em) return } @@ -197,8 +212,10 @@ func (m *Manager) handleRequest(cmixMsg format.Message, // crash occurs after the store but before the conclusion of the callback //create the auth storage if err = m.storage.Auth().AddReceived(c); err != nil { - jww.WARN.Printf("failed to store contact Auth "+ + em := fmt.Sprintf("failed to store contact Auth "+ "Request: %s", err) + jww.WARN.Print(em) + events.Report(10, "Auth", "RequestError", em) return } @@ -214,10 +231,14 @@ func (m *Manager) handleRequest(cmixMsg format.Message, func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, grp *cyclic.Group) { + events := m.net.GetEventManager() + // check if relationship already exists if mgr, err := m.storage.E2e().GetPartner(sr.GetPartner()); mgr != nil || err == nil { - jww.WARN.Printf("Cannot confirm auth for %s, channel already "+ + em := fmt.Sprintf("Cannot confirm auth for %s, channel already "+ "exists.", sr.GetPartner()) + jww.WARN.Print(em) + events.Report(10, "Auth", "ConfirmError", em) m.storage.Auth().Done(sr.GetPartner()) return } @@ -225,7 +246,9 @@ func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, // extract the message baseFmt, partnerPubKey, err := handleBaseFormat(cmixMsg, grp) if err != nil { - jww.WARN.Printf("Failed to handle auth confirm: %s", err) + em := fmt.Sprintf("Failed to handle auth confirm: %s", err) + jww.WARN.Print(em) + events.Report(10, "Auth", "ConfirmError", em) m.storage.Auth().Done(sr.GetPartner()) return } @@ -242,16 +265,20 @@ func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, cmixMsg.GetMac(), grp) if !success { - jww.WARN.Printf("Received auth confirmation failed its mac " + + em := fmt.Sprintf("Received auth confirmation failed its mac " + "check") + jww.WARN.Print(em) + events.Report(10, "Auth", "ConfirmError", em) m.storage.Auth().Done(sr.GetPartner()) return } ecrFmt, err := unmarshalEcrFormat(payload) if err != nil { - jww.WARN.Printf("Failed to unmarshal auth confirmation's "+ + em := fmt.Sprintf("Failed to unmarshal auth confirmation's "+ "encrypted payload: %s", err) + jww.WARN.Print(em) + events.Report(10, "Auth", "ConfirmError", em) m.storage.Auth().Done(sr.GetPartner()) return } @@ -259,7 +286,9 @@ func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, // finalize the confirmation if err := m.doConfirm(sr, grp, partnerPubKey, sr.GetMyPrivKey(), sr.GetPartnerHistoricalPubKey(), ecrFmt.GetOwnership()); err != nil { - jww.WARN.Printf("Confirmation failed: %s", err) + em := fmt.Sprintf("Confirmation failed: %s", err) + jww.WARN.Print(em) + events.Report(10, "Auth", "ConfirmError", em) m.storage.Auth().Done(sr.GetPartner()) return } diff --git a/auth/confirm.go b/auth/confirm.go index 30a51181c97f3e8bea4993eac82555152655dc7f..f13d3871d9669b52c66924cfb7e4f17bc89b080c 100644 --- a/auth/confirm.go +++ b/auth/confirm.go @@ -8,6 +8,7 @@ package auth import ( + "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces" @@ -100,13 +101,17 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader, // the second does not or the two occur and the storage into critical // messages does not occur + events := net.GetEventManager() + //create local relationship p := storage.E2e().GetE2ESessionParams() if err := storage.E2e().AddPartner(partner.ID, partner.DhPubKey, newPrivKey, p, p); err != nil { - jww.WARN.Printf("Failed to create channel with partner (%s) "+ + em := fmt.Sprintf("Failed to create channel with partner (%s) "+ "on confirmation, this is likley a replay: %s", partner.ID, err.Error()) + jww.WARN.Print(em) + events.Report(10, "Auth", "SendConfirmError", em) } // delete the in progress negotiation @@ -131,8 +136,10 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader, return 0, errors.WithMessage(err, "Auth Confirm Failed to transmit") } - jww.INFO.Printf("Confirm Request with %s (msgDigest: %s) sent on round %d", + em := fmt.Sprintf("Confirm Request with %s (msgDigest: %s) sent on round %d", partner.ID, cmixMsg.Digest(), round) + jww.INFO.Print(em) + events.Report(1, "Auth", "SendConfirm", em) return round, nil } diff --git a/auth/request.go b/auth/request.go index add36835bf19025abc00c8272b6a7bdd042634e1..1be24089150fd7ef7dcce0c6ffaaffb1324bab2e 100644 --- a/auth/request.go +++ b/auth/request.go @@ -8,6 +8,7 @@ package auth import ( + "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces" @@ -169,8 +170,10 @@ func RequestAuth(partner, me contact.Contact, message string, rng io.Reader, cmixMsg.Digest(), err) } - jww.INFO.Printf("Auth Request with %s (msgDigest: %s) sent"+ + em := fmt.Sprintf("Auth Request with %s (msgDigest: %s) sent"+ " on round %d", partner.ID, cmixMsg.Digest(), round) + jww.INFO.Print(em) + net.GetEventManager().Report(1, "Auth", "RequestSent", em) return round, nil } diff --git a/bindings/event.go b/bindings/event.go new file mode 100644 index 0000000000000000000000000000000000000000..7e28c72e025768a7dc454098e070d923c9832f2f --- /dev/null +++ b/bindings/event.go @@ -0,0 +1,26 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package bindings + +import ( + "gitlab.com/elixxir/client/interfaces" +) + +// RegisterEventCallback records the given function to receive +// ReportableEvent objects. It returns the internal index +// of the callback so that it can be deleted later. +func (c *Client) RegisterEventCallback(name string, + myFunc interfaces.EventCallbackFunction) error { + return c.api.RegisterEventCallback(name, myFunc) +} + +// UnregisterEventCallback deletes the callback identified by the +// index. It returns an error if it fails. +func (c *Client) UnregisterEventCallback(name string) { + c.api.UnregisterEventCallback(name) +} diff --git a/groupChat/utils_test.go b/groupChat/utils_test.go index 4997976c643c3d90de630309daee335c503c63b0..df7fc4e35b68bad6d9a57fb7e2c34ef6f15613c0 100644 --- a/groupChat/utils_test.go +++ b/groupChat/utils_test.go @@ -278,6 +278,13 @@ func (tnm *testNetworkManager) SendManyCMIX(messages map[id.ID]format.Message, _ return 0, nil, nil } +type dummyEventMgr struct{} + +func (d *dummyEventMgr) Report(p int, a, b, c string) {} +func (t *testNetworkManager) GetEventManager() interfaces.EventManager { + return &dummyEventMgr{} +} + func (tnm *testNetworkManager) GetInstance() *network.Instance { return tnm.instance } func (tnm *testNetworkManager) GetHealthTracker() interfaces.HealthTracker { return nil } func (tnm *testNetworkManager) Follow(interfaces.ClientErrorReport) (stoppable.Stoppable, error) { diff --git a/interfaces/event.go b/interfaces/event.go new file mode 100644 index 0000000000000000000000000000000000000000..f08ff547e637bf4b0fccde6d166d7e8cf21298d8 --- /dev/null +++ b/interfaces/event.go @@ -0,0 +1,16 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package interfaces + +// EventCallbackFunction defines the callback functions for client event reports +type EventCallbackFunction func(priority int, category, evtType, details string) + +// EventManager reporting api (used internally) +type EventManager interface { + Report(priority int, category, evtType, details string) +} diff --git a/interfaces/networkManager.go b/interfaces/networkManager.go index 3e40b9baea140a99fb615362ec50ee02c6a09f5d..072306fddc18f53c7f07299dcffb52e64cdce853 100644 --- a/interfaces/networkManager.go +++ b/interfaces/networkManager.go @@ -28,6 +28,7 @@ type NetworkManager interface { SendManyCMIX(messages map[id.ID]format.Message, p params.CMIX) (id.Round, []ephemeral.Id, error) GetInstance() *network.Instance GetHealthTracker() HealthTracker + GetEventManager() EventManager GetSender() *gateway.Sender Follow(report ClientErrorReport) (stoppable.Stoppable, error) CheckGarbledMessages() diff --git a/keyExchange/utils_test.go b/keyExchange/utils_test.go index d844f16b46bf90b026d6ede34b743432939e03c2..0a1b0206bc8e72bf3ca25b4dd17e4ce02a5937d1 100644 --- a/keyExchange/utils_test.go +++ b/keyExchange/utils_test.go @@ -94,6 +94,10 @@ func (t *testNetworkManagerGeneric) GetInstance() *network.Instance { } +func (t *testNetworkManagerGeneric) GetEventManager() interfaces.EventManager { + return &dummyEventMgr{} +} + func (t *testNetworkManagerGeneric) RegisterWithPermissioning(string) ([]byte, error) { return nil, nil } @@ -154,6 +158,12 @@ func InitTestingContextGeneric(i interface{}) (*storage.Session, interfaces.Netw type testNetworkManagerFullExchange struct { instance *network.Instance } +type dummyEventMgr struct{} + +func (d *dummyEventMgr) Report(p int, a, b, c string) {} +func (t *testNetworkManagerFullExchange) GetEventManager() interfaces.EventManager { + return &dummyEventMgr{} +} func (t *testNetworkManagerFullExchange) GetHealthTracker() interfaces.HealthTracker { return nil diff --git a/network/ephemeral/testutil.go b/network/ephemeral/testutil.go index c5cbd8b7c03861888392f9a4b8077b0cf49fb72d..1051e9c297d84a22436a08429f8f023deb953698 100644 --- a/network/ephemeral/testutil.go +++ b/network/ephemeral/testutil.go @@ -71,6 +71,13 @@ func (t *testNetworkManager) GetInstance() *network.Instance { return t.instance } +type dummyEventMgr struct{} + +func (d *dummyEventMgr) Report(p int, a, b, c string) {} +func (t *testNetworkManager) GetEventManager() interfaces.EventManager { + return &dummyEventMgr{} +} + func (t *testNetworkManager) GetHealthTracker() interfaces.HealthTracker { return nil } diff --git a/network/follow.go b/network/follow.go index b813bb0fb3607fb4c1ac14f4f9370ab2c16888bf..2082a6ee4b5417a7de66b4ebc76209deff9072b7 100644 --- a/network/follow.go +++ b/network/follow.go @@ -67,15 +67,26 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, case <-TrackTicker.C: numPolls := atomic.SwapUint64(m.tracker, 0) if m.numLatencies != 0 { - latencyAvg := time.Nanosecond * time.Duration(m.latencySum/m.numLatencies) + latencyAvg := time.Nanosecond * time.Duration( + m.latencySum/m.numLatencies) m.latencySum, m.numLatencies = 0, 0 - jww.INFO.Printf("Polled the network %d times in the "+ - "last %s, with an average newest packet latency of %s", numPolls, - debugTrackPeriod, latencyAvg) + infoMsg := fmt.Sprintf("Polled the network "+ + "%d times in the last %s, with an "+ + "average newest packet latency of %s", + numPolls, debugTrackPeriod, latencyAvg) + + jww.INFO.Printf(infoMsg) + m.Internal.Events.Report(1, "Polling", + "MetricsWithLatency", infoMsg) } else { - jww.INFO.Printf("Polled the network %d times in the "+ - "last %s", numPolls, debugTrackPeriod) + infoMsg := fmt.Sprintf("Polled the network "+ + "%d times in the last %s", numPolls, + debugTrackPeriod) + + jww.INFO.Printf(infoMsg) + m.Internal.Events.Report(1, "Polling", + "Metrics", infoMsg) } } } @@ -133,7 +144,9 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, fmt.Sprintf("%+v", err), ) } - jww.ERROR.Printf("Unable to poll gateways: %+v", err) + errMsg := fmt.Sprintf("Unable to poll gateway: %+v", err) + m.Internal.Events.Report(10, "Polling", "Error", errMsg) + jww.ERROR.Printf(errMsg) return } diff --git a/network/internal/internal.go b/network/internal/internal.go index fc0d6aa429348b43469494b3136abc30c347e6da..8fe96d073d123e971f14f3a3985bb05c30c0ce10 100644 --- a/network/internal/internal.go +++ b/network/internal/internal.go @@ -8,6 +8,7 @@ package internal import ( + "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/health" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/switchboard" @@ -37,4 +38,7 @@ type Internal struct { //channels NodeRegistration chan network.NodeGateway + + // Event Reporting + Events interfaces.EventManager } diff --git a/network/manager.go b/network/manager.go index 5fcef26c34cac6d6193e840ccce5f87303d8446d..3ebc1d1d9cd84007e01bce843cb9afd11d09e24b 100644 --- a/network/manager.go +++ b/network/manager.go @@ -11,6 +11,7 @@ package network // and intraclient state are accessible through the context object. import ( + "fmt" "github.com/pkg/errors" "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/params" @@ -55,12 +56,16 @@ type manager struct { // Address space size addrSpace *ephemeral.AddressSpace + + // Event reporting api + events interfaces.EventManager } // NewManager builds a new reception manager object using inputted key fields func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, - rng *fastRNG.StreamGenerator, comms *client.Comms, - params params.Network, ndf *ndf.NetworkDefinition) (interfaces.NetworkManager, error) { + rng *fastRNG.StreamGenerator, events interfaces.EventManager, + comms *client.Comms, params params.Network, + ndf *ndf.NetworkDefinition) (interfaces.NetworkManager, error) { //start network instance instance, err := network.NewInstance(comms.ProtoComms, ndf, nil, nil, network.None, params.FastPolling) @@ -93,6 +98,7 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, Instance: instance, TransmissionID: session.User().GetCryptographicIdentity().GetTransmissionID(), ReceptionID: session.User().GetCryptographicIdentity().GetReceptionID(), + Events: events, } // Set up gateway.Sender @@ -105,6 +111,12 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, return nil, err } + // Report health events + m.Internal.Health.AddFunc(func(isHealthy bool) { + m.Internal.Events.Report(5, "Health", "IsHealthy", + fmt.Sprintf("%v", isHealthy)) + }) + //create sub managers m.message = message.NewManager(m.Internal, m.param.Messages, m.NodeRegistration, m.sender) m.round = rounds.NewManager(m.Internal, m.param.Rounds, m.message.GetMessageReceptionChannel(), m.sender) @@ -154,6 +166,11 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab return multi, nil } +// GetEventManager returns the health tracker +func (m *manager) GetEventManager() interfaces.EventManager { + return m.events +} + // GetHealthTracker returns the health tracker func (m *manager) GetHealthTracker() interfaces.HealthTracker { return m.Health diff --git a/network/message/handler.go b/network/message/handler.go index 060332e8cb982744e9925c539ac455a499bf2e18..08ced621adaddc0b74772e82dfd02862ece95b5b 100644 --- a/network/message/handler.go +++ b/network/message/handler.go @@ -8,6 +8,7 @@ package message import ( + "fmt" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/stoppable" @@ -76,8 +77,12 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { //drop the message is decryption failed if err != nil { //if decryption failed, print an error - jww.WARN.Printf("Failed to decrypt message with fp %s "+ - "from partner %s: %s", key.Fingerprint(), sender, err) + msg := fmt.Sprintf("Failed to decrypt message with "+ + "fp %s from partner %s: %s", key.Fingerprint(), + sender, err) + jww.WARN.Printf(msg) + m.Internal.Events.Report(9, "MessageReception", + "DecryptionError", msg) return } //set the type as E2E encrypted @@ -103,15 +108,19 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { RoundId: id.Round(bundle.RoundInfo.ID), RoundTimestamp: time.Unix(0, int64(bundle.RoundInfo.Timestamps[states.QUEUED])), } - jww.INFO.Printf("Garbled/RAW Message: keyFP: %v, msgDigest: %s", - msg.GetKeyFP(), msg.Digest()) + im := fmt.Sprintf("Garbled/RAW Message: keyFP: %v, "+ + "msgDigest: %s", msg.GetKeyFP(), msg.Digest()) + jww.INFO.Print(im) + m.Internal.Events.Report(1, "MessageReception", "Garbled", im) m.Session.GetGarbledMessages().Add(msg) m.Switchboard.Speak(raw) return } - jww.INFO.Printf("Received message of type %s from %s,"+ + im := fmt.Sprintf("Received message of type %s from %s,"+ " msgDigest: %s", encTy, sender, msgDigest) + jww.INFO.Print(im) + m.Internal.Events.Report(2, "MessageReception", "MessagePart", im) // Process the decrypted/unencrypted message partition, to see if // we get a full message @@ -127,9 +136,12 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { xxMsg.RoundId = id.Round(bundle.RoundInfo.ID) xxMsg.RoundTimestamp = time.Unix(0, int64(bundle.RoundInfo.Timestamps[states.QUEUED])) if xxMsg.MessageType == message.Raw { - jww.WARN.Panicf("Recieved a message of type 'Raw' from %s."+ + rm := fmt.Sprintf("Recieved a message of type 'Raw' from %s."+ "Message Ignored, 'Raw' is a reserved type. Message supressed.", xxMsg.ID) + jww.WARN.Print(rm) + m.Internal.Events.Report(10, "MessageReception", + "Error", rm) } else { m.Switchboard.Speak(xxMsg) } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 98b0759625f4a01fbd7eb1161a1b0f73f561af72..258483fee8f50f87ba5a023bc42ddf00ffa3e6cd 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -8,9 +8,11 @@ package message import ( + "fmt" "github.com/golang-collections/collections/set" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/stoppable" @@ -32,7 +34,8 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX, stop *stoppable.Single) (id.Round, ephemeral.Id, error) { msgCopy := msg.Copy() return sendCmixHelper(sender, msgCopy, recipient, param, m.Instance, - m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms, stop) + m.Session, m.nodeRegistration, m.Rng, m.Internal.Events, + m.TransmissionID, m.Comms, stop) } // Helper function for sendCmix @@ -47,7 +50,8 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, cmixParams params.CMIX, instance *network.Instance, session *storage.Session, nodeRegistration chan network.NodeGateway, - rng *fastRNG.StreamGenerator, senderId *id.ID, comms sendCmixCommsInterface, + rng *fastRNG.StreamGenerator, events interfaces.EventManager, + senderId *id.ID, comms sendCmixCommsInterface, stop *stoppable.Single) (id.Round, ephemeral.Id, error) { timeStart := netTime.Now() @@ -144,8 +148,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, // Return if it sends properly gwSlotResp := result.(*pb.GatewaySlotResponse) if gwSlotResp.Accepted { - jww.INFO.Printf("Successfully sent to EphID %v (source: %s) "+ - "in round %d (msgDigest: %s)", ephID.Int64(), recipient, bestRound.ID, msg.Digest()) + m := fmt.Sprintf("Successfully sent to EphID %v "+ + "(source: %s) in round %d (msgDigest: %s), "+ + "elapsed: %s numRoundTries: %d", ephID.Int64(), + recipient, bestRound.ID, msg.Digest(), + elapsed, numRoundTries) + jww.INFO.Print(m) + events.Report(1, "MessageSend", "Metric", m) return id.Round(bestRound.ID), ephID, nil } else { jww.FATAL.Panicf("Gateway %s returned no error, but failed "+ diff --git a/network/message/sendCmix_test.go b/network/message/sendCmix_test.go index 28acd56525e7393500819539561cda4f8160ba07..29180767d23e51450ccd094cfe59ff98136a176f 100644 --- a/network/message/sendCmix_test.go +++ b/network/message/sendCmix_test.go @@ -26,12 +26,18 @@ import ( "time" ) +type dummyEvent struct{} + +func (e *dummyEvent) Report(priority int, category, evtType, details string) {} + // Unit test func Test_attemptSendCmix(t *testing.T) { sess1 := storage.InitTestingSession(t) sess2 := storage.InitTestingSession(t) + events := &dummyEvent{} + sw := switchboard.New() l := TestListener{ ch: make(chan bool), @@ -113,7 +119,7 @@ func Test_attemptSendCmix(t *testing.T) { e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID()) _, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultCMIX(), m.Instance, m.Session, m.nodeRegistration, - m.Rng, m.TransmissionID, &MockSendCMIXComms{t: t}, nil) + m.Rng, events, m.TransmissionID, &MockSendCMIXComms{t: t}, nil) if err != nil { t.Errorf("Failed to sendcmix: %+v", err) panic("t") diff --git a/network/rounds/historical.go b/network/rounds/historical.go index b4920335458283243fe7f4b77d4777bde2efb4e7..c3b9b4b9b4296ed776dfa7503f01b301ffc9e46d 100644 --- a/network/rounds/historical.go +++ b/network/rounds/historical.go @@ -8,6 +8,7 @@ package rounds import ( + "fmt" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage/reception" @@ -73,7 +74,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, stop *stop } // get new round to lookup and force a lookup if case r := <-m.historicalRounds: - jww.DEBUG.Printf("Recieved and quing round %d for "+ + jww.DEBUG.Printf("Received and queueing round %d for "+ "historical rounds lookup", r.rid) roundRequests = append(roundRequests, r) if len(roundRequests) > int(m.params.MaxHistoricalRounds) { @@ -97,9 +98,11 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, stop *stop Rounds: rounds, } + var gwHost *connect.Host result, err := m.sender.SendToAny(func(host *connect.Host) (interface{}, error) { jww.DEBUG.Printf("Requesting Historical rounds %v from "+ "gateway %s", rounds, host.GetId()) + gwHost = host return comm.RequestHistoricalRounds(host, hr) }, stop) @@ -113,29 +116,34 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, stop *stop } response := result.(*pb.HistoricalRoundsResponse) + rids := make([]uint64, 0) // process the returned historical roundRequests. for i, roundInfo := range response.Rounds { // The interface has missing returns returned as nil, such roundRequests // need be be removes as processing so the network follower will // pick them up in the future. if roundInfo == nil { + var errMsg string roundRequests[i].numAttempts++ if roundRequests[i].numAttempts == m.params.MaxHistoricalRoundsRetries { - jww.ERROR.Printf("Failed to retreive historical "+ + errMsg = fmt.Sprintf("Failed to retreive historical "+ "round %d on last attempt, will not try again", roundRequests[i].rid) } else { select { case m.historicalRounds <- roundRequests[i]: - jww.WARN.Printf("Failed to retreive historical "+ + errMsg = fmt.Sprintf("Failed to retreive historical "+ "round %d, will try up to %d more times", roundRequests[i].rid, m.params.MaxHistoricalRoundsRetries-roundRequests[i].numAttempts) default: - jww.WARN.Printf("Failed to retreive historical "+ + errMsg = fmt.Sprintf("Failed to retreive historical "+ "round %d, failed to try again, round will not be "+ "retreived", roundRequests[i].rid) } } + jww.WARN.Printf(errMsg) + m.Internal.Events.Report(5, "HistoricalRounds", + "Error", errMsg) continue } // Successfully retrieved roundRequests are sent to the Message @@ -145,8 +153,14 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, stop *stop identity: roundRequests[i].identity, } m.lookupRoundMessages <- rl + rids = append(rids, roundInfo.ID) } + m.Internal.Events.Report(1, "HistoricalRounds", "Metrics", + fmt.Sprintf("Received %d historical rounds from"+ + " gateway %s: %v", len(response.Rounds), gwHost, + rids)) + //clear the buffer now that all have been checked roundRequests = make([]historicalRoundRequest, 0) } diff --git a/single/manager_test.go b/single/manager_test.go index a787e7462870d0f3cda30a5a31b3a89a92c3a1f3..ac5dd4253820f4fa8f2f50d5cdba079f0c94147c 100644 --- a/single/manager_test.go +++ b/single/manager_test.go @@ -327,6 +327,13 @@ func (tnm *testNetworkManager) GetInstance() *network.Instance { return tnm.instance } +type dummyEventMgr struct{} + +func (d *dummyEventMgr) Report(p int, a, b, c string) {} +func (t *testNetworkManager) GetEventManager() interfaces.EventManager { + return &dummyEventMgr{} +} + func (tnm *testNetworkManager) GetHealthTracker() interfaces.HealthTracker { return nil } diff --git a/single/receiveResponse.go b/single/receiveResponse.go index 6a6fa94fddbc10f71a6e311fea944a71ea8f35eb..831b25b358196ddc4806991610b51a0548ab56eb 100644 --- a/single/receiveResponse.go +++ b/single/receiveResponse.go @@ -8,6 +8,7 @@ package single import ( + "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/message" @@ -37,8 +38,13 @@ func (m *Manager) receiveResponseHandler(rawMessages chan message.Receive, // Process CMIX message err := m.processesResponse(msg.RecipientID, msg.EphemeralID, msg.Payload) if err != nil { - jww.WARN.Printf("Failed to read single-use CMIX message "+ - "response: %+v", err) + em := fmt.Sprintf("Failed to read single-use "+ + "CMIX message response: %+v", err) + jww.WARN.Print(em) + if m.client != nil { + m.client.ReportEvent(9, "SingleUse", + "Error", em) + } } } } @@ -89,6 +95,11 @@ func (m *Manager) processesResponse(rid *id.ID, ephID ephemeral.Id, // Once all message parts have been received delete and close everything if collated { + if m.client != nil { + m.client.ReportEvent(1, "SingleUse", "MessageReceived", + fmt.Sprintf("Single use response received "+ + "from %s", rid)) + } jww.DEBUG.Print("Received all parts of single-use response message.") // Exit the timeout handler state.quitChan <- struct{}{} diff --git a/single/transmission.go b/single/transmission.go index a6ceed465f53aeca901521d02499a823c9312d8e..25cd73276fb76eb2226ce7f44d1140cc0b9134b7 100644 --- a/single/transmission.go +++ b/single/transmission.go @@ -113,7 +113,8 @@ func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte, go func() { // Send Message - jww.DEBUG.Printf("Sending single-use transmission CMIX message to %s.", partner.ID) + jww.DEBUG.Printf("Sending single-use transmission CMIX "+ + "message to %s.", partner.ID) round, _, err := m.net.SendCMIX(cmixMsg, partner.ID, params.GetDefaultCMIX()) if err != nil { errorString := fmt.Sprintf("failed to send single-use transmission "+ @@ -144,8 +145,13 @@ func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte, roundEvents.AddRoundEventChan(round, sendResults, roundEventTimeout, states.COMPLETED, states.FAILED) - jww.DEBUG.Printf("Sent single-use transmission CMIX message to %s and "+ - "ephemeral ID %d on round %d.", partner.ID, ephID.Int64(), round) + im := fmt.Sprintf("Sent single-use transmission CMIX "+ + "message to %s and ephemeral ID %d on round %d.", + partner.ID, ephID.Int64(), round) + jww.DEBUG.Print(im) + if m.client != nil { + m.client.ReportEvent(1, "SingleUse", "MessageSend", im) + } // Wait until the result tracking responds success, numRoundFail, numTimeOut := utility.TrackResults(sendResults, 1) diff --git a/ud/register.go b/ud/register.go index b46f99a09fd6e686c2293b35a54147d7292d58cb..8a25bafc01d150708136945a119de470b6e64e82 100644 --- a/ud/register.go +++ b/ud/register.go @@ -1,6 +1,7 @@ package ud import ( + "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" pb "gitlab.com/elixxir/comms/mixmessages" @@ -85,6 +86,11 @@ func (m *Manager) register(username string, comm registerUserComms) error { if err == nil { err = m.setRegistered() + if m.client != nil { + m.client.ReportEvent(1, "UserDiscovery", "Registration", + fmt.Sprintf("User Registered with UD: %+v", + user)) + } } return err diff --git a/ud/search.go b/ud/search.go index 86a19551b5af2c7bc95c0736845263e91d17da0e..41001234896bba755fd6f032afdb19df8a23b22b 100644 --- a/ud/search.go +++ b/ud/search.go @@ -1,6 +1,7 @@ package ud import ( + "fmt" "github.com/golang/protobuf/proto" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" @@ -50,11 +51,17 @@ func (m *Manager) Search(list fact.FactList, callback searchCallback, timeout ti return errors.WithMessage(err, "Failed to transmit search request.") } + if m.client != nil { + m.client.ReportEvent(1, "UserDiscovery", "SearchRequest", + fmt.Sprintf("Sent: %+v", request)) + } + return nil } func (m *Manager) searchResponseHandler(factMap map[string]fact.Fact, callback searchCallback, payload []byte, err error) { + if err != nil { go callback(nil, errors.WithMessage(err, "Failed to search.")) return @@ -66,6 +73,12 @@ func (m *Manager) searchResponseHandler(factMap map[string]fact.Fact, jww.WARN.Printf("Dropped a search response from user discovery due to "+ "failed unmarshal: %s", err) } + + if m.client != nil { + m.client.ReportEvent(1, "UserDiscovery", "SearchResponse", + fmt.Sprintf("Received: %+v", searchResponse)) + } + if searchResponse.Error != "" { err = errors.Errorf("User Discovery returned an error on search: %s", searchResponse.Error)