From 761bf39f9717ab12b26b3371652b4159822dbda6 Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Tue, 29 Mar 2022 13:35:18 -0700 Subject: [PATCH] Fix comments and formatting in network --- network/check.go | 20 +-- network/cmixMessageBuffer.go | 63 +++++--- network/cmixMessageBuffer_test.go | 73 +++------ network/critical.go | 47 +++--- network/follow.go | 123 +++++++------- network/interface.go | 259 +++++++++++++++--------------- network/manager.go | 114 ++++++------- network/params.go | 85 +++++----- network/polltracker.go | 4 +- network/remoteFilters.go | 3 +- network/remoteFilters_test.go | 34 ++-- network/roundTracking.go | 24 +-- network/sendCmix.go | 181 +++++++++++---------- network/sendCmixUtils.go | 72 +++++---- network/sendManyCmix.go | 38 ++--- 15 files changed, 585 insertions(+), 555 deletions(-) diff --git a/network/check.go b/network/check.go index c92f537e0..fd84dc88a 100644 --- a/network/check.go +++ b/network/check.go @@ -6,17 +6,16 @@ import ( "gitlab.com/xx_network/primitives/id" ) -// Checker is a single use function which is meant to be wrapped -// and adhere to the knownRounds checker interface. it receives a round ID and -// looks up the state of that round to determine if the client has a message -// waiting in it. -// It will return true if it can conclusively determine no message exists, -// returning false and set the round to processing if it needs further +// Checker is a single use function that is meant to be wrapped and adhere to +// the knownRounds checker interface. It receives a round ID and looks up the +// state of that round to determine if the client has a message waiting in it. +// It will return true if it can conclusively determine no message exists, and +// it will return false and set the round to processing if it needs further // investigation. // Once it determines messages might be waiting in a round, it determines -// if the information about that round is already present, if it is the data is -// sent to Message Retrieval Workers, otherwise it is sent to Historical Round -// Retrieval +// if the information about that round is already present. If it is, then the +// data is sent to Message Retrieval Workers; otherwise, it is sent to +// Historical Round Retrieval // false: no message // true: message func Checker(roundID id.Round, filters []*RemoteFilter, cr *store.CheckedRounds) bool { @@ -25,7 +24,7 @@ func Checker(roundID id.Round, filters []*RemoteFilter, cr *store.CheckedRounds) return true } - //find filters that could have the round and check them + // Find filters that could have the round and check them serialRid := serializeRound(roundID) for _, filter := range filters { if filter != nil && filter.FirstRound() <= roundID && @@ -35,6 +34,7 @@ func Checker(roundID id.Round, filters []*RemoteFilter, cr *store.CheckedRounds) } } } + return false } diff --git a/network/cmixMessageBuffer.go b/network/cmixMessageBuffer.go index d70c22aba..f7b36ca13 100644 --- a/network/cmixMessageBuffer.go +++ b/network/cmixMessageBuffer.go @@ -30,7 +30,6 @@ type storedMessage struct { } func (sm storedMessage) Marshal() []byte { - data, err := json.Marshal(&sm) if err != nil { jww.FATAL.Panicf("Failed to marshal stored message: %s", err) @@ -41,7 +40,8 @@ func (sm storedMessage) Marshal() []byte { // SaveMessage saves the message as a versioned object at the specified key // in the key value store. -func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key string) error { +func (cmh *cmixMessageHandler) SaveMessage( + kv *versioned.KV, m interface{}, key string) error { sm := m.(storedMessage) // Create versioned object @@ -58,7 +58,9 @@ func (cmh *cmixMessageHandler) SaveMessage(kv *versioned.KV, m interface{}, key // LoadMessage returns the message with the specified key from the key value // store. An empty message and error are returned if the message could not be // retrieved. -func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) (interface{}, error) { +func (cmh *cmixMessageHandler) LoadMessage(kv *versioned.KV, key string) ( + interface{}, error) { + // Load the versioned object vo, err := kv.Get(key, currentCmixMessageVersion) if err != nil { @@ -92,13 +94,14 @@ func (cmh *cmixMessageHandler) HashMessage(m interface{}) utility.MessageHash { return messageHash } -// CmixMessageBuffer wraps the message buffer to store and load raw cmix +// CmixMessageBuffer wraps the message buffer to store and load raw cMix // messages. type CmixMessageBuffer struct { mb *utility.MessageBuffer } -func NewOrLoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) { +func NewOrLoadCmixMessageBuffer(kv *versioned.KV, key string) ( + *CmixMessageBuffer, error) { cmb, err := LoadCmixMessageBuffer(kv, key) if err != nil { @@ -106,6 +109,7 @@ func NewOrLoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffe if err != nil { return nil, err } + return &CmixMessageBuffer{mb: mb}, nil } @@ -121,24 +125,34 @@ func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, er return &CmixMessageBuffer{mb: mb}, nil } -func (cmb *CmixMessageBuffer) Add(msg format.Message, recipent *id.ID, params CMIXParams) { - paramBytes, _ := params.MarshalJSON() +func (cmb *CmixMessageBuffer) Add(msg format.Message, recipient *id.ID, params CMIXParams) { + paramBytes, err := json.Marshal(params) + if err != nil { + jww.FATAL.Panicf("Failed to JSON marshal CMIXParams: %+v", err) + } + sm := storedMessage{ Msg: msg.Marshal(), - Recipient: recipent.Marshal(), + Recipient: recipient.Marshal(), Params: paramBytes, } + cmb.mb.Add(sm) } -func (cmb *CmixMessageBuffer) AddProcessing(msg format.Message, recipent *id.ID, +func (cmb *CmixMessageBuffer) AddProcessing(msg format.Message, recipient *id.ID, params CMIXParams) { - paramBytes, _ := params.MarshalJSON() + paramBytes, err := json.Marshal(params) + if err != nil { + jww.FATAL.Panicf("Failed to JSON marshal CMIXParams: %+v", err) + } + sm := storedMessage{ Msg: msg.Marshal(), - Recipient: recipent.Marshal(), + Recipient: recipient.Marshal(), Params: paramBytes, } + cmb.mb.AddProcessing(sm) } @@ -151,39 +165,42 @@ func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, CMIXParams, bool) sm := m.(storedMessage) msg, err := format.Unmarshal(sm.Msg) if err != nil { - jww.FATAL.Panicf("Could not unmarshal for stored cmix "+ - "message buffer: %+v", err) + jww.FATAL.Panicf( + "Could not unmarshal for stored cMix message buffer: %+v", err) } - recpient, err := id.Unmarshal(sm.Recipient) + + recipient, err := id.Unmarshal(sm.Recipient) if err != nil { - jww.FATAL.Panicf("Could not get an id for stored cmix "+ - "message buffer: %+v", err) + jww.FATAL.Panicf( + "Could not get an ID for stored cMix message buffer: %+v", err) } params := CMIXParams{} if sm.Params == nil || len(sm.Params) == 0 { params = GetDefaultCMIXParams() } else { - if err = params.UnmarshalJSON(sm.Params); err != nil { - jww.FATAL.Panicf("Could not parse the parms for stored CMIX "+ + if err = json.Unmarshal(sm.Params, ¶ms); err != nil { + jww.FATAL.Panicf("Could not parse the params for stored cMix "+ "message buffer: %+v", err) } } - return msg, recpient, params, true + return msg, recipient, params, true } -func (cmb *CmixMessageBuffer) Succeeded(msg format.Message, recipent *id.ID) { +func (cmb *CmixMessageBuffer) Succeeded(msg format.Message, recipient *id.ID) { sm := storedMessage{ Msg: msg.Marshal(), - Recipient: recipent.Marshal(), + Recipient: recipient.Marshal(), } + cmb.mb.Succeeded(sm) } -func (cmb *CmixMessageBuffer) Failed(msg format.Message, recipent *id.ID) { +func (cmb *CmixMessageBuffer) Failed(msg format.Message, recipient *id.ID) { sm := storedMessage{ Msg: msg.Marshal(), - Recipient: recipent.Marshal(), + Recipient: recipient.Marshal(), } + cmb.mb.Failed(sm) } diff --git a/network/cmixMessageBuffer_test.go b/network/cmixMessageBuffer_test.go index 4420255b6..43c97639b 100644 --- a/network/cmixMessageBuffer_test.go +++ b/network/cmixMessageBuffer_test.go @@ -20,8 +20,8 @@ import ( "testing" ) -// Test happy path of cmixMessageHandler.SaveMessage(). -func TestCmixMessageHandler_SaveMessage(t *testing.T) { +// Test happy path of cmixMessageHandler.SaveMessage. +func Test_cmixMessageHandler_SaveMessage(t *testing.T) { // Set up test values cmh := &cmixMessageHandler{} kv := versioned.NewKV(make(ekv.Memstore)) @@ -32,32 +32,31 @@ func TestCmixMessageHandler_SaveMessage(t *testing.T) { Msg: testMsgs[i].Marshal(), Recipient: ids[i].Marshal(), } + key := utility.MakeStoredMessageKey("testKey", cmh.HashMessage(msg)) // Save message err := cmh.SaveMessage(kv, msg, key) if err != nil { - t.Errorf("SaveMessage() returned an error."+ - "\n\texpected: %v\n\trecieved: %v", nil, err) + t.Errorf("SaveMessage returned an error: %+v", err) } // Try to get message obj, err := kv.Get(key, 0) if err != nil { - t.Errorf("get() returned an error: %v", err) + t.Errorf("Failed to get message: %v", err) } // Test if message retrieved matches expected if !bytes.Equal(msg.Marshal(), obj.Data) { - t.Errorf("SaveMessage() returned versioned object with incorrect data."+ - "\n\texpected: %v\n\treceived: %v", - msg, obj.Data) + t.Errorf("Failed to get expected message from storage."+ + "\nexpected: %v\nreceived: %v", msg, obj.Data) } } } -// Test happy path of cmixMessageHandler.LoadMessage(). -func TestCmixMessageHandler_LoadMessage(t *testing.T) { +// Test happy path of cmixMessageHandler.LoadMessage. +func Test_cmixMessageHandler_LoadMessage(t *testing.T) { // Set up test values cmh := &cmixMessageHandler{} kv := versioned.NewKV(make(ekv.Memstore)) @@ -72,98 +71,68 @@ func TestCmixMessageHandler_LoadMessage(t *testing.T) { // Save message if err := cmh.SaveMessage(kv, msg, key); err != nil { - t.Errorf("SaveMessage() returned an error: %v", err) + t.Errorf("Failed to save message to storage: %v", err) } // Try to load message testMsg, err := cmh.LoadMessage(kv, key) if err != nil { - t.Errorf("LoadMessage() returned an error."+ - "\n\texpected: %v\n\trecieved: %v", nil, err) + t.Errorf("LoadMessage returned an error: %v", err) } // Test if message loaded matches expected if !reflect.DeepEqual(msg, testMsg) { - t.Errorf("LoadMessage() returned an unexpected object."+ - "\n\texpected: %v\n\treceived: %v", - msg, testMsg) + t.Errorf("Failed to load expected message from storage."+ + "\nexpected: %v\nreceived: %v", msg, testMsg) } } } // Smoke test of cmixMessageHandler. -func TestCmixMessageBuffer_Smoke(t *testing.T) { +func Test_cmixMessageBuffer_Smoke(t *testing.T) { // Set up test messages testMsgs, ids, _ := makeTestCmixMessages(2) // Create new buffer cmb, err := NewOrLoadCmixMessageBuffer(versioned.NewKV(make(ekv.Memstore)), "testKey") if err != nil { - t.Errorf("NewCmixMessageBuffer() returned an error."+ - "\n\texpected: %v\n\trecieved: %v", nil, err) + t.Errorf("Failed to make new cmixMessageHandler: %+v", err) } // Add two messages cmb.Add(testMsgs[0], ids[0], GetDefaultCMIXParams()) cmb.Add(testMsgs[1], ids[1], GetDefaultCMIXParams()) - if cmb.mb.Len() != 2 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 2, cmb.mb.Len()) - } - msg, rid, _, exists := cmb.Next() if !exists { - t.Error("Next() did not find any messages in buffer.") + t.Error("Next did not find any messages in buffer.") } cmb.Succeeded(msg, rid) - l := cmb.mb.Len() - if l != 1 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 1, l) - } - msg, rid, _, exists = cmb.Next() if !exists { - t.Error("Next() did not find any messages in buffer.") + t.Error("Next did not find any messages in buffer.") } - l = cmb.mb.Len() - if l != 0 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 0, l) - } cmb.Failed(msg, rid) - l = cmb.mb.Len() - if l != 1 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 1, l) - } - msg, rid, _, exists = cmb.Next() if !exists { - t.Error("Next() did not find any messages in buffer.") + t.Error("Next did not find any messages in buffer.") } cmb.Succeeded(msg, rid) msg, rid, _, exists = cmb.Next() if exists { - t.Error("Next() found a message in the buffer when it should be empty.") - } - - l = cmb.mb.Len() - if l != 0 { - t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", - 0, l) + t.Error("Next found a message in the buffer when it should be empty.") } } // makeTestCmixMessages creates a list of messages with random data and the // expected map after they are added to the buffer. -func makeTestCmixMessages(n int) ([]format.Message, []*id.ID, map[utility.MessageHash]struct{}) { +func makeTestCmixMessages(n int) ( + []format.Message, []*id.ID, map[utility.MessageHash]struct{}) { cmh := &cmixMessageHandler{} prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) mh := map[utility.MessageHash]struct{}{} diff --git a/network/critical.go b/network/critical.go index 221ae508e..dde56c99d 100644 --- a/network/critical.go +++ b/network/critical.go @@ -16,20 +16,21 @@ import ( const criticalRawMessagesKey = "CriticalRawMessages" // roundEventRegistrar is an interface for the round events system to allow -// for easy testing +// for easy testing. type roundEventRegistrar interface { AddRoundEventChan(rid id.Round, eventChan chan ds.EventReturn, timeout time.Duration, validStates ...states.Round) *ds.EventCallback } -// criticalSender is an anonymous function which takes the data critical -// knows for sending. It should call sendCmixHelper and use scope sharing in an -// anonymous function to include the structures from manager which critical is -// not aware of +// criticalSender is an anonymous function that takes the data critical knows +// for sending. It should call sendCmixHelper and use scope sharing in an +// anonymous function to include the structures from manager that critical is +// not aware of. type criticalSender func(msg format.Message, recipient *id.ID, params CMIXParams) (id.Round, ephemeral.Id, error) -//Structure which allows the auto resending of messages that must be received +// critical is a structure that allows the auto resending of messages that must +// be received. type critical struct { *CmixMessageBuffer roundEvents roundEventRegistrar @@ -41,18 +42,19 @@ func newCritical(kv *versioned.KV, hm health.Monitor, roundEvents roundEventRegistrar, send criticalSender) *critical { cm, err := NewOrLoadCmixMessageBuffer(kv, criticalRawMessagesKey) if err != nil { - jww.FATAL.Panicf("Failed to load the buffer for critical "+ - "messages: %v", err) + jww.FATAL.Panicf( + "Failed to load the buffer for critical messages: %+v", err) } + c := &critical{ CmixMessageBuffer: cm, roundEvents: roundEvents, trigger: make(chan bool, 100), send: send, } - hm.AddHealthCallback(func(healthy bool) { - c.trigger <- healthy - }) + + hm.AddHealthCallback(func(healthy bool) { c.trigger <- healthy }) + return c } @@ -76,17 +78,17 @@ func (c *critical) handle(msg format.Message, recipient *id.ID, rid id.Round, rt } else { sendResults := make(chan ds.EventReturn, 1) - c.roundEvents.AddRoundEventChan(rid, sendResults, 1*time.Minute, - states.COMPLETED, states.FAILED) + c.roundEvents.AddRoundEventChan( + rid, sendResults, 1*time.Minute, states.COMPLETED, states.FAILED) success, numTimeOut, _ := TrackResults(sendResults, 1) if !success { if numTimeOut > 0 { - jww.ERROR.Printf("critical raw message resend to %s "+ + jww.ERROR.Printf("Critical raw message resend to %s "+ "(msgDigest: %s) on round %d failed to transmit due to "+ "timeout", recipient, msg.Digest(), rid) } else { - jww.ERROR.Printf("critical raw message resend to %s "+ + jww.ERROR.Printf("Critical raw message resend to %s "+ "(msgDigest: %s) on round %d failed to transmit due to "+ "send failure", recipient, msg.Digest(), rid) } @@ -95,16 +97,16 @@ func (c *critical) handle(msg format.Message, recipient *id.ID, rid id.Round, rt return } - jww.INFO.Printf("Successful resend of critical raw message "+ - "to %s (msgDigest: %s) on round %d", recipient, msg.Digest(), rid) + jww.INFO.Printf("Successful resend of critical raw message to %s "+ + "(msgDigest: %s) on round %d", recipient, msg.Digest(), rid) c.Succeeded(msg, recipient) } } -// evaluate tries to send every message in the critical messages and the raw critical -// messages buffer in parallel +// evaluate tries to send every message in the critical messages and the raw +// critical messages buffer in parallel. func (c *critical) evaluate(stop *stoppable.Single) { for msg, recipient, params, has := c.Next(); has; msg, recipient, params, has = c.Next() { localRid := recipient.DeepCopy() @@ -112,11 +114,12 @@ func (c *critical) evaluate(stop *stoppable.Single) { params.Stop = stop jww.INFO.Printf("Resending critical raw message to %s "+ "(msgDigest: %s)", recipient, msg.Digest()) - //send the message + + // Send the message round, _, err := c.send(msg, recipient, params) - //pass to the handler + + // Pass to the handler c.handle(msg, recipient, round, err) - //wait on the results to make sure the rounds were successful }(msg, localRid, params) } diff --git a/network/follow.go b/network/follow.go index d2f6802fa..958f0fa11 100644 --- a/network/follow.go +++ b/network/follow.go @@ -44,13 +44,14 @@ import ( const ( debugTrackPeriod = 1 * time.Minute - // Estimate the number of rounds per second in the network. Will need updated someday - // in order to correctly determine how far back to search rounds for messages - // as the network continues to grow, otherwise message drops occur. + // Estimate the number of rounds per second in the network. Will need + // updated someday in order to correctly determine how far back to search + // rounds for messages as the network continues to grow, otherwise message + // drops occur. estimatedRoundsPerSecond = 5 ) -//comms interface makes testing easier +// followNetworkComms is a comms interface to make testing easier. type followNetworkComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error) @@ -86,42 +87,38 @@ func (m *manager) followNetwork(report ClientErrorReport, m.latencySum/m.numLatencies) m.latencySum, m.numLatencies = 0, 0 - infoMsg := fmt.Sprintf("Polled the network "+ - "%d times in the last %s, with an "+ - "average newest packet latency of %s", + infoMsg := fmt.Sprintf("Polled the network %d times in the "+ + "last %s, with an average newest packet latency of %s", numPolls, debugTrackPeriod, latencyAvg) jww.INFO.Printf(infoMsg) - m.events.Report(1, "Polling", - "MetricsWithLatency", infoMsg) + m.events.Report(1, "Polling", "MetricsWithLatency", infoMsg) } else { - infoMsg := fmt.Sprintf("Polled the network "+ - "%d times in the last %s", numPolls, + infoMsg := fmt.Sprintf( + "Polled the network %d times in the last %s", numPolls, debugTrackPeriod) jww.INFO.Printf(infoMsg) - m.events.Report(1, "Polling", - "Metrics", infoMsg) + m.events.Report(1, "Polling", "Metrics", infoMsg) } } } } -// executes each iteration of the follower +// follow executes each iteration of the follower. func (m *manager) follow(report ClientErrorReport, rng csprng.Source, comms followNetworkComms, stop *stoppable.Single, abandon func(round id.Round)) { - //get the identity we will poll for + // Get the identity we will poll for identity, err := m.GetEphemeralIdentity(rng, m.Space.GetAddressSpaceWithoutWait()) if err != nil { - jww.FATAL.Panicf("Failed to get an identity, this should be "+ - "impossible: %+v", err) + jww.FATAL.Panicf( + "Failed to get an identity, this should be impossible: %+v", err) } - // While polling with a fake identity, it is necessary to have - // populated earliestRound data. However, as with fake identities - // we want the values to be randomly generated rather than based on - // actual state. + // While polling with a fake identity, it is necessary to have populated + // earliestRound data. However, as with fake identities, we want the values + // to be randomly generated rather than based on actual state. if identity.Fake { fakeEr := &store.EarliestRound{} fakeEr.Set(m.getFakeEarliestRound()) @@ -130,7 +127,7 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, atomic.AddUint64(m.tracker, 1) - // get client version for poll + // Get client version for poll version := m.session.GetClientVersion() // Poll network updates @@ -150,7 +147,8 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, result, err := m.SendToAny(func(host *connect.Host) (interface{}, error) { jww.DEBUG.Printf("Executing poll for %v(%s) range: %s-%s(%s) from %s", identity.EphId.Int64(), identity.Source, identity.StartValid, - identity.EndValid, identity.EndValid.Sub(identity.StartValid), host.GetId()) + identity.EndValid, identity.EndValid.Sub(identity.StartValid), + host.GetId()) return comms.SendPoll(host, &pollReq) }, stop) @@ -172,7 +170,7 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, } errMsg := fmt.Sprintf("Unable to poll gateway: %+v", err) m.events.Report(10, "Polling", "Error", errMsg) - jww.ERROR.Printf(errMsg) + jww.ERROR.Print(errMsg) return } @@ -187,8 +185,8 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, } // ---- Node Events ---- - // NOTE: this updates the structure, AND sends events over the nodes - // update channels about new and removed nodes + // NOTE: this updates the structure, AND sends events over the nodes update + // channels about new and removed nodes if pollResp.PartialNDF != nil { err = m.instance.UpdatePartialNdf(pollResp.PartialNDF) if err != nil { @@ -206,7 +204,8 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, m.UpdateAddressSpace(m.instance.GetPartialNdf().Get().AddressSpace[0].Size) } - // NOTE: this updates rounds and updates the tracking of the health of the network + // NOTE: this updates rounds and updates the tracking of the health of the + // network if pollResp.Updates != nil { // TODO: ClientErr needs to know the source of the error and it doesn't yet // Iterate over ClientErrors for each RoundUpdate @@ -235,13 +234,14 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, // FIXME: before keys are deleted update.State = uint32(states.FAILED) - //trigger a reregistration with the node + // trigger a reregistration with the node m.Registrar.TriggerNodeRegistration(nid) } } } - // Trigger RoundEvents for all polled updates, including modified rounds with ClientErrors + // Trigger RoundEvents for all polled updates, including modified rounds + // with ClientErrors err = m.instance.RoundUpdates(pollResp.Updates) if err != nil { jww.ERROR.Printf("%+v", err) @@ -266,7 +266,7 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, // ---- Identity Specific Round Processing ----- if identity.Fake { - jww.DEBUG.Printf("not processing result, identity.Fake == true") + jww.DEBUG.Printf("Not processing result, identity.Fake == true") return } @@ -276,15 +276,16 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, return } - //prepare the filter objects for processing + // Prepare the filter objects for processing filterList := make([]*RemoteFilter, 0, len(pollResp.Filters.Filters)) for i := range pollResp.Filters.Filters { if len(pollResp.Filters.Filters[i].Filter) != 0 { - filterList = append(filterList, NewRemoteFilter(pollResp.Filters.Filters[i])) + filterList = append(filterList, + NewRemoteFilter(pollResp.Filters.Filters[i])) } } - // check rounds using the round checker function which determines if there + // Check rounds using the round checker function, which determines if there // are messages waiting in rounds and then sends signals to the appropriate // handling threads roundChecker := func(rid id.Round) bool { @@ -295,8 +296,8 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, return hasMessage } - // move the earliest unknown round tracker forward to the earliest - // tracked round if it is behind + // Move the earliest unknown round tracker forward to the earliest tracked + // round if it is behind earliestTrackedRound := id.Round(pollResp.EarliestRound) m.SetFakeEarliestRound(earliestTrackedRound) updatedEarliestRound, old, _ := identity.ER.Set(earliestTrackedRound) @@ -304,21 +305,26 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, // If there was no registered rounds for the identity if old == 0 { lastCheckedRound := gwRoundsState.GetLastChecked() - // Approximate the earliest possible round that messages could be received on this ID - // by using an estimate of how many rounds the network runs per second + // Approximate the earliest possible round that messages could be + // received on this ID by using an estimate of how many rounds the + // network runs per second roundsDelta := uint(time.Now().Sub(identity.StartValid) / time.Second * estimatedRoundsPerSecond) if roundsDelta < m.param.KnownRoundsThreshold { roundsDelta = m.param.KnownRoundsThreshold } + if id.Round(roundsDelta) > lastCheckedRound { - // Handles edge case for new networks to prevent starting at negative rounds + // Handles edge case for new networks to prevent starting at + // negative rounds updatedEarliestRound = 1 } else { updatedEarliestRound = lastCheckedRound - id.Round(roundsDelta) earliestFilterRound := filterList[0].FirstRound() // Length of filterList always > 0 - // If the network appears to be moving faster than our estimate, causing - // earliestFilterRound to be lower, we will instead use the earliestFilterRound - // which will ensure messages are not dropped as long as contacted gateway has all data + + // If the network appears to be moving faster than our estimate, + // causing earliestFilterRound to be lower, we will instead use the + // earliestFilterRound, which will ensure messages are not dropped + // as long as contacted gateway has all data if updatedEarliestRound > earliestFilterRound { updatedEarliestRound = earliestFilterRound } @@ -326,21 +332,28 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, identity.ER.Set(updatedEarliestRound) } - // loop through all rounds the client does not know about and the gateway - // does, checking the bloom filter for the user to see if there are - // messages for the user (bloom not implemented yet) - //threshold is the earliest round that will not be excluded from earliest remaining - earliestRemaining, roundsWithMessages, roundsUnknown := gwRoundsState.RangeUnchecked(updatedEarliestRound, - m.param.KnownRoundsThreshold, roundChecker) + // Loop through all rounds the client does not know about and the gateway + // does, checking the bloom filter for the user to see if there are messages + // for the user (bloom not implemented yet) + // Threshold is the earliest round that will not be excluded from earliest + // remaining + earliestRemaining, roundsWithMessages, roundsUnknown := + gwRoundsState.RangeUnchecked( + updatedEarliestRound, m.param.KnownRoundsThreshold, roundChecker) + jww.DEBUG.Printf("Processed RangeUnchecked, Oldest: %d, firstUnchecked: %d, "+ - "last Checked: %d, threshold: %d, NewEarliestRemaning: %d, NumWithMessages: %d, "+ - "NumUnknown: %d", updatedEarliestRound, gwRoundsState.GetFirstUnchecked(), gwRoundsState.GetLastChecked(), - m.param.KnownRoundsThreshold, earliestRemaining, len(roundsWithMessages), len(roundsUnknown)) + "last Checked: %d, threshold: %d, NewEarliestRemaining: %d, NumWithMessages: %d, "+ + "NumUnknown: %d", updatedEarliestRound, + gwRoundsState.GetFirstUnchecked(), gwRoundsState.GetLastChecked(), + m.param.KnownRoundsThreshold, earliestRemaining, + len(roundsWithMessages), len(roundsUnknown)) _, _, changed := identity.ER.Set(earliestRemaining) if changed { - jww.TRACE.Printf("External returns of RangeUnchecked: %d, %v, %v", earliestRemaining, roundsWithMessages, roundsUnknown) - jww.DEBUG.Printf("New Earliest Remaining: %d, Gateways last checked: %d", earliestRemaining, gwRoundsState.GetLastChecked()) + jww.TRACE.Printf("External returns of RangeUnchecked: %d, %v, %v", + earliestRemaining, roundsWithMessages, roundsUnknown) + jww.DEBUG.Printf("New Earliest Remaining: %d, Gateways last checked: %d", + earliestRemaining, gwRoundsState.GetLastChecked()) } var roundsWithMessages2 []id.Round @@ -355,7 +368,7 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, } for _, rid := range roundsWithMessages { - //denote that the round has been looked at in the tracking store + // Denote that the round has been looked at in the tracking store if identity.CR.Check(rid) { m.GetMessagesFromRound(rid, identity.EphemeralIdentity) } @@ -377,7 +390,9 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, if uint(earliestRemaining-updatedEarliestRound) > m.param.KnownRoundsThreshold { trackingStart = earliestRemaining - id.Round(m.param.KnownRoundsThreshold) } + jww.DEBUG.Printf("Rounds tracked: %v to %v", trackingStart, earliestRemaining) + for i := trackingStart; i <= earliestRemaining; i++ { state := Unchecked for _, rid := range roundsWithMessages { @@ -405,7 +420,7 @@ func (m *manager) follow(report ClientErrorReport, rng csprng.Source, func (m *manager) getFakeEarliestRound() id.Round { b, err := csprng.Generate(8, rand.Reader) if err != nil { - jww.FATAL.Panicf("Could not get random number: %v", err) + jww.FATAL.Panicf("Could not get random number: %+v", err) } rangeVal := binary.LittleEndian.Uint64(b) % 800 diff --git a/network/interface.go b/network/interface.go index 1d2300238..3d45710cd 100644 --- a/network/interface.go +++ b/network/interface.go @@ -16,58 +16,60 @@ import ( type Manager interface { // Follow starts the tracking of the network in a new thread. // Errors that occur are reported on the ClientErrorReport function if - // passed. The returned stopable can be used to stop the follower. + // passed. The returned stoppable can be used to stop the follower. // Only one follower may run at a time. Follow(report ClientErrorReport) (stoppable.Stoppable, error) - /*===Sending==============================================================*/ + /* === Sending ========================================================== */ - // GetMaxMessageLength returns the max message size for the current network + // GetMaxMessageLength returns the max message size for the current network. GetMaxMessageLength() int - // SendCMIX sends a "raw" CMIX message payload to the provided recipient. - // Returns the round ID of the round the payload was sent or an error - // if it fails. - // This does not have end to end encryption on it and is used exclusively as a - // send for higher order cryptographic protocols. Do not use unless implementing - // a protocol on top. - // recipient - cMix ID of the recipient - // fingerprint - Key Fingerprint. 256 bit field to store a 255 bit + // SendCMIX sends a "raw" cMix message payload to the provided recipient. + // Returns the round ID of the round the payload was sent or an error if it + // fails. + // This does not have end-to-end encryption on it and is used exclusively as + // a send for higher order cryptographic protocols. Do not use unless + // implementing a protocol on top. + // recipient - cMix ID of the recipient. + // fingerprint - Key Fingerprint. 256-bit field to store a 255-bit // fingerprint, highest order bit must be 0 (panic otherwise). If your // system does not use key fingerprints, this must be random bits. // service - Reception Service. The backup way for a client to identify // messages on receipt via trial hashing and to identify notifications. - // If unused, use messages.RandomService to fill the field with random data + // If unused, use message.GetRandomService to fill the field with + // random data. // payload - Contents of the message. Cannot exceed the payload size for a // cMix message (panic otherwise). - // mac - 256 bit field to store a 255 bit mac, highest order bit must be 0 + // mac - 256-bit field to store a 255-bit mac, highest order bit must be 0 // (panic otherwise). If used, fill with random bits. // Will return an error if the network is unhealthy or if it fails to send - // (along with the reason). Blocks until successful send or err. - // WARNING: Do not roll your own crypto + // (along with the reason). Blocks until successful sends or errors. + // WARNING: Do not roll your own crypto. SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, service message.Service, payload, mac []byte, cmixParams CMIXParams) ( id.Round, ephemeral.Id, error) - // SendManyCMIX sends many "raw" CMIX message payloads to the provided + // SendManyCMIX sends many "raw" cMix message payloads to the provided // recipients all in the same round. - // Returns the round ID of the round the payloads was sent or an error - // if it fails. - // This does not have end to end encryption on it and is used exclusively as a - // send for higher order cryptographic protocols. Do not use unless implementing - // a protocol on top. - // Due to sending multiple payloads, this leaks more metadata than a standard - // cmix send and should be in general avoided. - // recipient - cMix ID of the recipient - // fingerprint - Key Fingerprint. 256 bit field to store a 255 bit + // Returns the round ID of the round the payloads was sent or an error if it + // fails. + // This does not have end-to-end encryption on it and is used exclusively as + // a send for higher order cryptographic protocols. Do not use unless + // implementing a protocol on top. + // Due to sending multiple payloads, this leaks more metadata than a + // standard cMix send and should be in general avoided. + // recipient - cMix ID of the recipient. + // fingerprint - Key Fingerprint. 256-bit field to store a 255-bit // fingerprint, highest order bit must be 0 (panic otherwise). If your // system does not use key fingerprints, this must be random bits. // service - Reception Service. The backup way for a client to identify // messages on receipt via trial hashing and to identify notifications. - // If unused, use messages.RandomService to fill the field with random data + // If unused, use message.GetRandomService to fill the field with + // random data. // payload - Contents of the message. Cannot exceed the payload size for a // cMix message (panic otherwise). - // mac - 256 bit field to store a 255 bit mac, highest order bit must be 0 + // mac - 256-bit field to store a 255-bit mac, highest order bit must be 0 // (panic otherwise). If used, fill with random bits. // Will return an error if the network is unhealthy or if it fails to send // (along with the reason). Blocks until successful send or err. @@ -75,152 +77,151 @@ type Manager interface { SendManyCMIX(messages []TargetedCmixMessage, p CMIXParams) ( id.Round, []ephemeral.Id, error) - /*===Message Reception====================================================*/ - /* Identities are all network identites which the client is currently - trying to pick up message on. An identity must be added - to receive messages, fake ones will be used to poll the network - if none are present. On creation of the network handler, the identity in - session storage will be automatically added*/ + /* === Message Reception ================================================ */ + /* Identities are all network identities which the client is currently + trying to pick up message on. An identity must be added to receive + messages, fake ones will be used to poll the network if none are present. + On creation of the network handler, the identity in session storage will + be automatically added. */ - // AddIdentity adds an identity to be tracked - // If persistent is false, the identity will not be stored to disk and - // will be dropped on reload. + // AddIdentity adds an identity to be tracked. If persistent is false, + // the identity will not be stored to disk and will be dropped on reload. AddIdentity(id *id.ID, validUntil time.Time, persistent bool) + // RemoveIdentity removes a currently tracked identity. RemoveIdentity(id *id.ID) - /* Fingerprints are the primary mechanism of identifying a picked up - message over cMix. They are a unique one time use 255 bit vector generally - associated with a specific encryption key, but can be used for an - alternative protocol.When registering a fingerprint, a MessageProcessor - is registered to handle the message.*/ + /* Fingerprints are the primary mechanism of identifying a picked up message + over cMix. They are a unique one time use a 255-bit vector generally + associated with a specific encryption key, but can be used for an + alternative protocol. When registering a fingerprint, a message.Processor + is registered to handle the message. */ - // AddFingerprint - Adds a fingerprint which will be handled by a - // specific processor for messages received by the given identity - // If a nil identity is passed, it will automatically use the default - // identity in the session + // AddFingerprint adds a fingerprint that will be handled by a specific + // processor for messages received by the given identity. If a nil identity + // is passed, it will automatically use the default identity in the session. AddFingerprint(identity *id.ID, fingerprint format.Fingerprint, mp message.Processor) error // DeleteFingerprint deletes a single fingerprint associated with the given - // identity if it exists - // If a nil identity is passed, it will automatically use the default - // identity in the session + // identity, if it exists. If a nil identity is passed, it will + // automatically use the default identity in the session. DeleteFingerprint(identity *id.ID, fingerprint format.Fingerprint) - // DeleteClientFingerprints deletes al fingerprint associated with the given - // identity if it exists - // A sepecific identity must be supplied, a nil identity will result in a - // panic + // DeleteClientFingerprints deletes all fingerprint associated with the + // given identity, if it exists. A specific identity must be supplied; a + // nil identity will result in a panic. DeleteClientFingerprints(identity *id.ID) - /* Service - predefined hash based tags appended to all cMix messages - which, though trial hashing, are used to determine if a message applies - to this client + /* Service - predefined hash based tags appended to all cMix messages that, + though trial hashing, are used to determine if a message applies to this + client. - Services are used for 2 purposes - They can be processed by the - notifications system, or can be used to implement custom non fingerprint - processing of payloads. I.E. key negotiation, broadcast negotiation + Services are used for 2 purposes: they can be processed by the + notifications system, or they can be used to implement custom non- + fingerprint processing of payloads. i.e. key negotiation, broadcast + negotiation. - A tag is appended to the message of the format tag = H(H(messageContents), - preimage) and trial hashing is used to determine if a message adheres to a - tag. - WARNING: If a preimage is known by an adversary, they can determine which - messages are for the client on reception (which is normally hidden due to - collision between ephemeral IDs. + A tag is appended to the message of the format tag = H(H(messageContents), + preimage) and trial hashing is used to determine if a message adheres to + a tag. + WARNING: If a preimage is known by an adversary, they can determine which + messages are for the client on reception (which is normally hidden due to + collision between ephemeral IDs). - Due to the extra overhead of trial hashing, services are processed after - fingerprints. If a fingerprint match occurs on the message, services will - not be handled. + Due to the extra overhead of trial hashing, services are processed after + fingerprints. If a fingerprint match occurs on the message, services will + not be handled. - Services are address to the session. When starting a new client, all - services must be re-added before StartNetworkFollower is called. + Services are address to the session. When starting a new client, all + services must be re-added before StartNetworkFollower is called. */ - // AddService adds a service which can call a message handing function or be - // used for notifications. In general a single service can only be registered - // for the same identifier/tag pair. - // preimage - the preimage which is triggered on - // type - a descriptive string of the service. Generally used in notifications - // source - a byte buffer of related data. Generally used in notifications. + // AddService adds a service that can call a message handing function or be + // used for notifications. In general, a single service can only be + // registered for the same identifier/tag pair. + // preimage - The preimage that is triggered on. + // type - A descriptive string of the service. Generally used in + // notifications. + // source - A byte buffer of related data. Generally used in notifications. // Example: Sender ID - AddService(AddService *id.ID, newService message.Service, response message.Processor) + AddService(AddService *id.ID, newService message.Service, + response message.Processor) - // DeleteService - If only a single response is associated with the preimage, - // the entire preimage is removed. If there is more than one response, only the - // given response is removed. If nil is passed in for response, all triggers for - // the preimage will be removed. - DeleteService(clientID *id.ID, toDelete message.Service, processor message.Processor) + // DeleteService deletes a message service. If only a single response is + // associated with the preimage, the entire preimage is removed. If there is + // more than one response, only the given response is removed. If nil is + // passed in for response, all triggers for the preimage will be removed. + DeleteService(clientID *id.ID, toDelete message.Service, + processor message.Processor) // DeleteClientService deletes the mapping associated with an ID. DeleteClientService(clientID *id.ID) - // TrackServices - Registers a callback which will get called every time a - // service is added or removed. - // It will receive the triggers list every time it is modified. - // Will only get callbacks while the Network Follower is running. - // Multiple trackTriggers can be registered + // TrackServices registers a callback that will get called every time a + // service is added or removed. It will receive the triggers list every time + // it is modified. It will only get callbacks while the network follower is + // running. Multiple trackTriggers can be registered TrackServices(tracker message.ServicesTracker) - /* In inProcess */ - // it is possible to receive a message over cMix before the fingerprints or - // triggers are registered. As a result, when handling fails, messages are - // put in the inProcess que for a set number of retries. + /* === In inProcess ===================================================== */ + /* It is possible to receive a message over cMix before the fingerprints or + triggers are registered. As a result, when handling fails, messages are + put in the inProcess que for a set number of retries. */ - // CheckInProgressMessages - retry processing all messages in check in - // progress messages. Call this after adding fingerprints or triggers - //while the follower is running. + // CheckInProgressMessages retries processing all messages in check in + // progress messages. Call this after adding fingerprints or triggers while + // the follower is running. CheckInProgressMessages() - /*===Health Monitor=======================================================*/ - // The health monitor is a system which tracks if the client sees a live - // network. It can either be polled or set up with events + /* === Health Monitor =================================================== */ + /* The health monitor is a system that tracks if the client sees a live + network. It can either be polled or set up with events. */ - // IsHealthy Returns true if currently healthy + // IsHealthy returns true if currently healthy. IsHealthy() bool - // WasHealthy returns true if the network has ever been healthy in this run + // WasHealthy returns true if the network has ever been healthy in this run. WasHealthy() bool - // AddHealthCallback - adds a callback which gets called whenever the heal - // changes. Returns a registration ID which can be used to unregister + // AddHealthCallback adds a callback that gets called whenever the network + // health changes. Returns a registration ID that can be used to unregister. AddHealthCallback(f func(bool)) uint64 - // RemoveHealthCallback - Removes a health callback using its - // registration ID + // RemoveHealthCallback removes a health callback using its registration ID. RemoveHealthCallback(uint64) - /*===Nodes================================================================*/ - /* Keys must be registed with nodes in order to send messages throug them. - this process is in general automatically handled by the Network Manager*/ + /* === Nodes ============================================================ */ + /* Keys must be registered with nodes in order to send messages through + them. This process is, in general, automatically handled by the Network + Manager. */ // HasNode can be used to determine if a keying relationship exists with a // node. HasNode(nid *id.ID) bool - // NumRegisteredNodes Returns the total number of nodes we have a keying - // relationship with + // NumRegisteredNodes returns the total number of nodes we have a keying + // relationship with. NumRegisteredNodes() int - // TriggerNodeRegistration Triggers the negotiation of a keying - // relationship with a given node + // TriggerNodeRegistration triggers the negotiation of a keying relationship + // with a given node. TriggerNodeRegistration(nid *id.ID) - /*===Historical Rounds====================================================*/ + /* === Historical Rounds ================================================ */ /* A complete set of round info is not kept on the client, and sometimes - the network will need to be queried to get round info. Historical rounds - is the system internal to the Network Manager to do this. - It can be used externally as well.*/ + the network will need to be queried to get round info. Historical rounds + is the system internal to the Network Manager to do this. It can be used + externally as well. */ - // LookupHistoricalRound - looks up the passed historical round on the - // network + // LookupHistoricalRound looks up the passed historical round on the network. LookupHistoricalRound(rid id.Round, callback historical.RoundResultCallback) error - /*===Sender===============================================================*/ + /* === Sender =========================================================== */ /* The sender handles sending comms to the network. It tracks connections to - gateways and handles proxying to gateways for targeted comms. It can be - used externally to contact gateway directly, bypassing the majority of - the network package*/ + gateways and handles proxying to gateways for targeted comms. It can be + used externally to contact gateway directly, bypassing the majority of + the network package. */ // SendToAny can be used to send the comm to any gateway in the network. SendToAny(sendFunc func(host *connect.Host) (interface{}, error), @@ -235,16 +236,16 @@ type Manager interface { // before connecting. SetGatewayFilter(f gateway.Filter) - // GetHostParams - returns the host params used when connectign to gateways + // GetHostParams returns the host params used when connecting to gateways. GetHostParams() connect.HostParams - /*===Address Space========================================================*/ - // The network compasses identities into a smaller address space to cause - // collisions and hide the actual recipient of messages. These functions - // allow for the tracking of this addresses space. In general, address space - // issues are completely handled by the network package + /* === Address Space ==================================================== */ + /* The network compasses identities into a smaller address space to cause + collisions and hide the actual recipient of messages. These functions + allow for the tracking of this addresses space. In general, address space + issues are completely handled by the network package. */ - // GetAddressSpace GetAddressSize returns the current address size of IDs. Blocks until an + // GetAddressSpace returns the current address size of IDs. Blocks until an // address size is known. GetAddressSpace() uint8 @@ -257,13 +258,13 @@ type Manager interface { // updates on the channel with the specified tag. UnregisterAddressSpaceNotification(tag string) - /*===Accessors============================================================*/ + /* === Accessors ======================================================== */ // GetInstance returns the network instance object, which tracks the - // state of the network + // state of the network. GetInstance() *network.Instance - // GetVerboseRounds returns stringification of verbose round info + // GetVerboseRounds returns stringification of verbose round info. GetVerboseRounds() string } diff --git a/network/manager.go b/network/manager.go index 555b60105..441d90390 100644 --- a/network/manager.go +++ b/network/manager.go @@ -8,10 +8,9 @@ package network // tracker.go controls access to network resources. Interprocess communications -// and intraclient state are accessible through the context object. +// and intra-client state are accessible through the context object. import ( - "fmt" "github.com/pkg/errors" "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/network/address" @@ -32,36 +31,36 @@ import ( "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/ndf" "math" + "strconv" "sync/atomic" "time" ) -// fakeIdentityRange indicates the range generated between -// 0 (most current) and fakeIdentityRange rounds behind the earliest known -// round that will be used as the earliest round when polling with a -// fake identity. +// fakeIdentityRange indicates the range generated between 0 (most current) and +// fakeIdentityRange rounds behind the earliest known round that will be used as +// the earliest round when polling with a fake identity. const fakeIdentityRange = 800 -// manager implements the NetworkManager interface inside context. It -// controls access to network resources and implements all the communications -// functions used by the client. -// CRITICAL: Manager must be private. It embeds sub moduals which -// export functions for it, but not for public consumption. By being private -// and returning ass the public interface, these can be kept private. +// manager implements the Manager interface inside context. It controls access +// to network resources and implements all the communications functions used by +// the client. +// CRITICAL: Manager must be private. It embeds submodules that export functions +// for it, but not for public consumption. By being private and returning as the +// public interface, these can be kept private. type manager struct { - //User Identity Storage + // User Identity Storage session storage.Session - //generic RNG for client + // Generic RNG for client rng *fastRNG.StreamGenerator - // comms pointer to send/recv messages + // Comms pointer to send/receive messages comms *client.Comms - //contains the network instance + // Contains the network instance instance *commNetwork.Instance - // parameters of the network + // Parameters of the network param Params - //sub-managers + // Sub-managers gateway.Sender message.Handler nodes.Registrar @@ -75,36 +74,38 @@ type manager struct { // Earliest tracked round earliestRound *uint64 - //number of polls done in a period of time + // Number of polls done in a period of time tracker *uint64 latencySum uint64 numLatencies uint64 verboseRounds *RoundTracker - // Event reporting api + // Event reporting API events event.Manager - //storage of the max message length + // Storage of the max message length maxMsgLen int } -// NewManager builds a new reception manager object using inputted key fields +// NewManager builds a new reception manager object using inputted key fields. func NewManager(params Params, comms *client.Comms, session storage.Session, - ndf *ndf.NetworkDefinition, rng *fastRNG.StreamGenerator, events event.Manager, -) (Manager, error) { + ndf *ndf.NetworkDefinition, rng *fastRNG.StreamGenerator, + events event.Manager) (Manager, error) { - //start network instance - instance, err := commNetwork.NewInstance(comms.ProtoComms, ndf, nil, nil, commNetwork.None, params.FastPolling) + // Start network instance + instance, err := commNetwork.NewInstance( + comms.ProtoComms, ndf, nil, nil, commNetwork.None, params.FastPolling) if err != nil { - return nil, errors.WithMessage(err, "failed to create"+ - " client network manager") + return nil, errors.WithMessage( + err, "failed to create client network manager") } tmpMsg := format.NewMessage(session.GetCmixGroup().GetP().ByteLen()) tracker := uint64(0) earliest := uint64(0) - // create manager object + + // Create manager object m := &manager{ param: params, tracker: &tracker, @@ -123,63 +124,66 @@ func NewManager(params Params, comms *client.Comms, session storage.Session, m.verboseRounds = NewRoundTracker() } - /* set up modules */ - nodechan := make(chan commNetwork.NodeGateway, nodes.InputChanLen) + /* Set up modules */ + nodeChan := make(chan commNetwork.NodeGateway, nodes.InputChanLen) // Set up gateway.Sender poolParams := gateway.DefaultPoolParams() + // Client will not send KeepAlive packets poolParams.HostParams.KaClientOpts.Time = time.Duration(math.MaxInt64) + // Enable optimized HostPool initialization poolParams.MaxPings = 50 poolParams.ForceConnection = true - m.Sender, err = gateway.NewSender(poolParams, rng, - ndf, comms, session, nodechan) + m.Sender, err = gateway.NewSender( + poolParams, rng, ndf, comms, session, nodeChan) if err != nil { return nil, err } - //setup the node registrar - m.Registrar, err = nodes.LoadRegistrar(session, m.Sender, m.comms, m.rng, nodechan) + // Set up the node registrar + m.Registrar, err = nodes.LoadRegistrar( + session, m.Sender, m.comms, m.rng, nodeChan) if err != nil { return nil, err } - //setup the historical rounds handler - m.Retriever = historical.NewRetriever(params.Historical, comms, m.Sender, events) + // Set up the historical rounds handler + m.Retriever = historical.NewRetriever( + params.Historical, comms, m.Sender, events) - //Set up Message Handler + // Set up Message Handler m.Handler = message.NewHandler(params.Message, m.session.GetKV(), m.events, m.session.GetReceptionID()) - //set up round handler + // Set up round handler m.Pickup = rounds.NewPickup(params.Rounds, m.Handler.GetMessageReceptionChannel(), m.Sender, m.Retriever, m.rng, m.instance, m.session) - //add the identity system + // Add the identity system m.Tracker = identity.NewOrLoadTracker(m.session, m.Space) - // Set upthe ability to register with new nodes when they appear - m.instance.SetAddGatewayChan(nodechan) + // Set up the ability to register with new nodes when they appear + m.instance.SetAddGatewayChan(nodeChan) - // set up the health monitor + // Set up the health monitor m.Monitor = health.Init(instance, params.NetworkHealthTimeout) - //set up critical message tracking (sendCmix only) - critSender := func(msg format.Message, recipient *id.ID, - params CMIXParams) (id.Round, ephemeral.Id, error) { + // Set up critical message tracking (sendCmix only) + critSender := func(msg format.Message, recipient *id.ID, params CMIXParams, + ) (id.Round, ephemeral.Id, error) { return sendCmixHelper(m.Sender, msg, recipient, params, m.instance, m.session.GetCmixGroup(), m.Registrar, m.rng, m.events, m.session.GetTransmissionID(), m.comms) } - m.crit = newCritical(session.GetKV(), m.Monitor, - m.instance.GetRoundEvents(), critSender) + m.crit = newCritical( + session.GetKV(), m.Monitor, m.instance.GetRoundEvents(), critSender) // Report health events m.Monitor.AddHealthCallback(func(isHealthy bool) { - m.events.Report(5, "health", "IsHealthy", - fmt.Sprintf("%v", isHealthy)) + m.events.Report(5, "health", "IsHealthy", strconv.FormatBool(isHealthy)) }) return m, nil @@ -207,7 +211,7 @@ func (m *manager) Follow(report ClientErrorReport) (stoppable.Stoppable, error) // Node Updates multi.Add(m.Registrar.StartProcesses(m.param.ParallelNodeRegistrations)) // Adding/MixCypher - //TODO-node remover + // TODO-node remover // Start the Network tracker followNetworkStopper := stoppable.NewSingle("FollowNetwork") @@ -223,18 +227,18 @@ func (m *manager) Follow(report ClientErrorReport) (stoppable.Stoppable, error) // Historical rounds processing multi.Add(m.Retriever.StartProcessies()) - //start the processies for the identity handler + // Start the processes for the identity handler multi.Add(m.Tracker.StartProcessies()) return multi, nil } -// GetInstance returns the network instance object (ndf state) +// GetInstance returns the network instance object (NDF state). func (m *manager) GetInstance() *commNetwork.Instance { return m.instance } -// GetVerboseRounds returns verbose round information +// GetVerboseRounds returns verbose round information. func (m *manager) GetVerboseRounds() string { if m.verboseRounds == nil { return "Verbose Round tracking not enabled" @@ -246,7 +250,7 @@ func (m *manager) SetFakeEarliestRound(rnd id.Round) { atomic.StoreUint64(m.earliestRound, uint64(rnd)) } -// GetMaxMessageLength returns the maximum length of a cmix message +// GetMaxMessageLength returns the maximum length of a cMix message. func (m *manager) GetMaxMessageLength() int { return m.maxMsgLen } diff --git a/network/params.go b/network/params.go index 219633077..a37eaa3ad 100644 --- a/network/params.go +++ b/network/params.go @@ -13,27 +13,40 @@ import ( type Params struct { TrackNetworkPeriod time.Duration - // maximum number of rounds to check in a single iterations network updates + // MaxCheckedRounds is the maximum number of rounds to check in a single + // iterations network updates. MaxCheckedRounds uint - // Size of the buffer of nodes to register + + // RegNodesBufferLen is the size of the buffer of nodes to register. RegNodesBufferLen uint - // Longest delay between network events for health tracker to denote that - // the network is in a bad state + + // NetworkHealthTimeout is the longest delay between network events for + // health tracker to denote that the network is in a bad state. NetworkHealthTimeout time.Duration - //Number of parallel nodes registration the client is capable of + + // ParallelNodeRegistrations is the number of parallel node registrations + // that the client is capable of. ParallelNodeRegistrations uint - //How far back in rounds the network should actually check + + // KnownRoundsThreshold dictates how far back in rounds the network should + // actually check. KnownRoundsThreshold uint - // Determines verbosity of network updates while polling - // If true, client receives a filtered set of updates - // If false, client receives the full list of network updates + + // FastPolling determines verbosity of network updates while polling. If + // true, client receives a filtered set of updates. If false, client + // receives the full list of network updates. FastPolling bool - // Determines if the state of every round processed is tracked in ram. - // This is very memory intensive and is primarily used for debugging + + // VerboseRoundTracking determines if the state of every round processed is + // tracked in memory. This is very memory intensive and is primarily used + // for debugging. VerboseRoundTracking bool - //disables all attempts to pick up dropped or missed messages + + // RealtimeOnly disables all attempts to pick up dropped or missed messages. RealtimeOnly bool - // Resends auth requests up the stack if received multiple times + + // ReplayRequests Resends auth requests up the stack if received multiple + // times. ReplayRequests bool Rounds rounds.Params @@ -48,7 +61,7 @@ func GetDefaultParams() Params { RegNodesBufferLen: 1000, NetworkHealthTimeout: 30 * time.Second, ParallelNodeRegistrations: 20, - KnownRoundsThreshold: 1500, //5 rounds/sec * 60 sec/min * 5 min + KnownRoundsThreshold: 1500, // 5 rounds/sec * 60 sec/min * 5 min FastPolling: true, VerboseRoundTracking: false, RealtimeOnly: false, @@ -72,7 +85,8 @@ func (n Params) SetRealtimeOnlyAll() Params { return n } -// Obtain default Network parameters, or override with given parameters if set +// GetParameters returns the default network parameters, or override with given +// parameters, if set. func GetParameters(params string) (Params, error) { p := GetDefaultParams() if len(params) > 0 { @@ -85,34 +99,34 @@ func GetParameters(params string) (Params, error) { } type CMIXParams struct { - // maximum number of rounds to try and send on + // RoundTries is the maximum number of rounds to try to send on RoundTries uint Timeout time.Duration RetryDelay time.Duration ExcludedRounds excludedRounds.ExcludedRounds `json:"-"` - // Duration to wait before sending on a round times out and a new round is - // tried + // SendTimeout is the duration to wait before sending on a round times out + // and a new round is tried. SendTimeout time.Duration - // Tag which prints with sending logs to help localize the source - // All internal sends are tagged, so the default tag is "External" + // DebugTag is a tag that is printed with sending logs to help localize the + // source. All internal sends are tagged, so the default tag is "External". DebugTag string - //Threading interface, can be used to stop the send early + // Stop can be used to stop the send early. Stop *stoppable.Single `json:"-"` - //List of nodes to not send to, will skip a round with these - //nodes in it - //todo - do not omit this on json + // BlacklistedNodes is a list of nodes to not send to; will skip a round + // with these nodes in it. + // TODO: Make it not omitted from JSON marshalling BlacklistedNodes map[id.ID]bool `json:"-"` - // Sets the message as critical. The system will track that the round it - // sends on completes and will auto resend in the event the round fails or - // completion cannot be determined. The sent data will be byte identical, - // so this has a high chance of metadata leak. This system should only be - // used in cases where repeats cannot be different - // Only used in sendCmix, not sendManyCmix + // Critical indicates if the message is critical. The system will track that + // the round it sends on completes and will auto resend in the event the + // round fails or completion cannot be determined. The sent data will be + // byte identical, so this has a high chance of metadata leak. This system + // should only be used in cases where repeats cannot be different. Only used + // in sendCmix, not sendManyCmix. Critical bool } @@ -126,15 +140,8 @@ func GetDefaultCMIXParams() CMIXParams { } } -func (c CMIXParams) MarshalJSON() ([]byte, error) { - return json.Marshal(c) -} - -func (c CMIXParams) UnmarshalJSON(b []byte) error { - return json.Unmarshal(b, &c) -} - -// GetCMIXParameters func obtains default CMIX parameters, or overrides with given parameters if set +// GetCMIXParameters obtains default CMIX parameters, or overrides with given +// parameters if set. func GetCMIXParameters(params string) (CMIXParams, error) { p := GetDefaultCMIXParams() if len(params) > 0 { diff --git a/network/polltracker.go b/network/polltracker.go index 63739e19a..63bc03c90 100644 --- a/network/polltracker.go +++ b/network/polltracker.go @@ -13,7 +13,7 @@ func newPollTracker() *pollTracker { return &pt } -// Track tracks a single poll +// Track tracks a single poll. func (pt *pollTracker) Track(ephID ephemeral.Id, source *id.ID) { if _, exists := (*pt)[*source]; !exists { (*pt)[*source] = make(map[int64]uint) @@ -25,7 +25,7 @@ func (pt *pollTracker) Track(ephID ephemeral.Id, source *id.ID) { } } -// Report reports all recent polls +// Report reports all recent polls. func (pt *pollTracker) Report() string { report := "" numReports := uint(0) diff --git a/network/remoteFilters.go b/network/remoteFilters.go index 19421a8db..dd01c09e8 100644 --- a/network/remoteFilters.go +++ b/network/remoteFilters.go @@ -36,7 +36,8 @@ func (rf *RemoteFilter) GetFilter() *bloom.Ring { BloomFilterHashes) err = rf.filter.UnmarshalBinary(rf.data.Filter) if err != nil { - jww.FATAL.Panicf("Failed to properly unmarshal the bloom filter: %+v", err) + jww.FATAL.Panicf( + "Failed to properly unmarshal the bloom filter: %+v", err) } } return rf.filter diff --git a/network/remoteFilters_test.go b/network/remoteFilters_test.go index 2fff7034d..bf7c52c77 100644 --- a/network/remoteFilters_test.go +++ b/network/remoteFilters_test.go @@ -24,7 +24,7 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -// Unit test NewRemoteFilter +// Unit test NewRemoteFilter. func TestNewRemoteFilter(t *testing.T) { bloomFilter := &mixmessages.ClientBloom{ Filter: nil, @@ -34,25 +34,22 @@ func TestNewRemoteFilter(t *testing.T) { rf := NewRemoteFilter(bloomFilter) if !reflect.DeepEqual(rf.data, bloomFilter) { - t.Fatalf("NewRemoteFilter() error: "+ - "RemoteFilter not initialized as expected."+ - "\n\tExpected: %v\n\tReceived: %v", bloomFilter, rf.data) + t.Fatalf("RemoteFilter not initialized as expected."+ + "\nexpected: %+v\nreceived: %+v", bloomFilter, rf.data) } } -// Unit test GetFilter +// Unit test RemoteFilter.GetFilter. func TestRemoteFilter_GetFilter(t *testing.T) { testFilter, err := bloom.InitByParameters(BloomFilterSize, BloomFilterHashes) if err != nil { - t.Fatalf("GetFilter error: "+ - "Cannot initialize bloom filter for setup: %v", err) + t.Fatalf("Cannot initialize bloom filter for setup: %+v", err) } data, err := testFilter.MarshalBinary() if err != nil { - t.Fatalf("GetFilter error: "+ - "Cannot marshal filter for setup: %v", err) + t.Fatalf("Cannot marshal filter for setup: %+v", err) } bloomFilter := &mixmessages.ClientBloom{ @@ -64,14 +61,13 @@ func TestRemoteFilter_GetFilter(t *testing.T) { rf := NewRemoteFilter(bloomFilter) retrievedFilter := rf.GetFilter() if !reflect.DeepEqual(retrievedFilter, testFilter) { - t.Fatalf("GetFilter error: "+ - "Did not retrieve expected filter."+ - "\n\tExpected: %v\n\tReceived: %v", testFilter, retrievedFilter) + t.Fatalf("Did not retrieve expected filter."+ + "\nexpected: %+v\nreceived: %+v", testFilter, retrievedFilter) } } -// Unit test fro FirstRound and LastRound -func TestRemoteFilter_FirstLastRound(t *testing.T) { +// Unit test for RemoteFilter.FirstRound and RemoteFilter.LastRound. +func TestRemoteFilter_FirstRound_LastRound(t *testing.T) { firstRound := uint64(25) roundRange := uint32(75) bloomFilter := &mixmessages.ClientBloom{ @@ -84,17 +80,15 @@ func TestRemoteFilter_FirstLastRound(t *testing.T) { // Test FirstRound receivedFirstRound := rf.FirstRound() if receivedFirstRound != id.Round(firstRound) { - t.Fatalf("FirstRound error: "+ - "Did not receive expected round."+ - "\n\tExpected: %v\n\tReceived: %v", firstRound, receivedFirstRound) + t.Fatalf("Did not receive expected round.\nexpected: %d\nreceived: %d", + firstRound, receivedFirstRound) } // Test LastRound receivedLastRound := rf.LastRound() if receivedLastRound != id.Round(firstRound+uint64(roundRange)) { - t.Fatalf("LastRound error: "+ - "Did not receive expected round."+ - "\n\tExpected: %v\n\tReceived: %v", receivedLastRound, firstRound+uint64(roundRange)) + t.Fatalf("Did not receive expected round.\nexpected: %d\nreceived: %d", + receivedLastRound, firstRound+uint64(roundRange)) } } diff --git a/network/roundTracking.go b/network/roundTracking.go index b0cf1bf9c..e1f325de5 100644 --- a/network/roundTracking.go +++ b/network/roundTracking.go @@ -4,10 +4,10 @@ // All rights reserved. / //////////////////////////////////////////////////////////////////////////////// -// this is an in memory track of rounds that have been processed in this run of the -// xxdk. It only is enabled when loglevel is debug or higher. It will accumulate all -// rounds and then dump on exist. Is only enabled when run though the command line -// interface unless enabled explicitly in code. +// This is an in-memory track of rounds that have been processed in this run of +// the xxdk. It only is enabled when loglevel is debug or higher. It will +// accumulate all rounds and then dump on exit. Is only enabled when run through +// the command line interface unless enabled explicitly in code. package network @@ -16,6 +16,7 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/xx_network/primitives/id" "sort" + "strconv" "sync" ) @@ -42,7 +43,7 @@ func (rs RoundState) String() string { case Abandoned: return "Abandoned" default: - return fmt.Sprintf("Unregistered Round State: %d", rs) + return "Unregistered Round State: " + strconv.FormatUint(uint64(rs), 10) } } @@ -60,14 +61,16 @@ func NewRoundTracker() *RoundTracker { func (rt *RoundTracker) denote(rid id.Round, state RoundState) { rt.mux.Lock() defer rt.mux.Unlock() - // this ensures a lower state will not overwrite a higher state. - // eg. Unchecked does not overwrite MessageAvailable + + // This ensures that a lower state will not overwrite a higher state. + // e.g. Unchecked does not overwrite MessageAvailable. if storedState, exists := rt.state[rid]; exists && storedState > state { - jww.TRACE.Printf("did not denote round %d because "+ - "stored state of %s (%d) > passed state %s (%d)", + jww.TRACE.Printf("Did not denote round %d because stored state of %s "+ + "(%d) > passed state %s (%d)", rid, storedState, storedState, state, state) return } + rt.state[rid] = state } @@ -84,7 +87,8 @@ func (rt *RoundTracker) String() string { stringification := "" for _, key := range keys { - stringification += fmt.Sprintf("Round: %d, state:%s \n", key, rt.state[id.Round(key)]) + stringification += fmt.Sprintf( + "Round: %d, state:%s\n", key, rt.state[id.Round(key)]) } return stringification diff --git a/network/sendCmix.go b/network/sendCmix.go index fa47cacac..c67dbcaca 100644 --- a/network/sendCmix.go +++ b/network/sendCmix.go @@ -32,35 +32,36 @@ import ( "time" ) -// SendCMIX sends a "raw" CMIX message payload to the provided recipient. -// Returns the round ID of the round the payload was sent or an error -// if it fails. -// This does not have end to end encryption on it and is used exclusively as a -// send for higher order cryptographic protocols. Do not use unless implementing -// a protocol on top. -// recipient - cMix ID of the recipient -// fingerprint - Key Fingerprint. 256 bit field to store a 255 bit +// SendCMIX sends a "raw" cMix message payload to the provided recipient. +// Returns the round ID of the round the payload was sent or an error if it +// fails. +// This does not have end-to-end encryption on it and is used exclusively as +// a send for higher order cryptographic protocols. Do not use unless +// implementing a protocol on top. +// recipient - cMix ID of the recipient. +// fingerprint - Key Fingerprint. 256-bit field to store a 255-bit // fingerprint, highest order bit must be 0 (panic otherwise). If your // system does not use key fingerprints, this must be random bits. // service - Reception Service. The backup way for a client to identify // messages on receipt via trial hashing and to identify notifications. -// If unused, use messages.RandomService to fill the field with random data +// If unused, use message.GetRandomService to fill the field with +// random data. // payload - Contents of the message. Cannot exceed the payload size for a // cMix message (panic otherwise). -// mac - 256 bit field to store a 255 bit mac, highest order bit must be 0 +// mac - 256-bit field to store a 255-bit mac, highest order bit must be 0 // (panic otherwise). If used, fill with random bits. // Will return an error if the network is unhealthy or if it fails to send -// (along with the reason). Blocks until successful send or err. -// WARNING: Do not roll your own crypto +// (along with the reason). Blocks until successful sends or errors. +// WARNING: Do not roll your own crypto. func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, service message.Service, payload, mac []byte, cmixParams CMIXParams) ( id.Round, ephemeral.Id, error) { if !m.Monitor.IsHealthy() { - return 0, ephemeral.Id{}, errors.New("Cannot send cmix message when the " + - "network is not healthy") + return 0, ephemeral.Id{}, errors.New( + "Cannot send cmix message when the network is not healthy") } - //Build message. Will panic if inputs are not correct. + // Build message. Will panic if inputs are not correct. msg := format.NewMessage(m.session.GetCmixGroup().GetP().ByteLen()) msg.SetKeyFP(fingerprint) msg.SetContents(payload) @@ -82,19 +83,19 @@ func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, return rid, ephID, rtnErr } -// Helper function for sendCmix -// NOTE: Payloads send are not End to End encrypted, MetaData is NOT protected with -// this call, see SendE2E for End to End encryption and full privacy protection -// Internal SendCmix which bypasses the network check, will attempt to send to -// the network without checking state. It has a built in retry system which can +// sendCmixHelper is a helper function for manager.SendCMIX. +// NOTE: Payloads sent are not end-to-end encrypted; metadata is NOT protected +// with this call. See SendE2E for end-to-end encryption and full privacy +// protection. +// Internal SendCmix, which bypasses the network check, will attempt to send to +// the network without checking state. It has a built-in retry system which can // be configured through the params object. -// If the message is successfully sent, the id of the round sent it is returned, -// which can be registered with the network instance to get a callback on -// its status -func sendCmixHelper(sender gateway.Sender, msg format.Message, - recipient *id.ID, cmixParams CMIXParams, instance *network.Instance, - grp *cyclic.Group, nodes nodes.Registrar, - rng *fastRNG.StreamGenerator, events event.Manager, +// If the message is successfully sent, the ID of the round sent it is returned, +// which can be registered with the network instance to get a callback on its +// status. +func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID, + cmixParams CMIXParams, instance *network.Instance, grp *cyclic.Group, + nodes nodes.Registrar, rng *fastRNG.StreamGenerator, events event.Manager, senderId *id.ID, comms SendCmixCommsInterface) (id.Round, ephemeral.Id, error) { timeStart := netTime.Now() @@ -107,14 +108,14 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, attempted = excludedRounds.NewSet() } - jww.INFO.Printf("[SendCMIX-%s]Looking for round to send cMix message to %s "+ - "(msgDigest: %s)", cmixParams.DebugTag, recipient, msg.Digest()) + jww.INFO.Printf("[SendCMIX-%s] Looking for round to send cMix message to "+ + "%s (msgDigest: %s)", cmixParams.DebugTag, recipient, msg.Digest()) stream := rng.GetStream() defer stream.Close() - // flip leading bits randomly to thwart a tagging attack. - // See SetGroupBits for more info + // Flip leading bits randomly to thwart a tagging attack. + // See cmix.SetGroupBits for more info. cmix.SetGroupBits(msg, grp, stream) for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ { @@ -123,36 +124,42 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, cmixParams.DebugTag, numRoundTries, elapsed) if elapsed > cmixParams.Timeout { - jww.INFO.Printf("[SendCMIX-%s] No rounds to send to %s (msgDigest: %s) "+ - "were found before timeout %s", cmixParams.DebugTag, recipient, msg.Digest(), - cmixParams.Timeout) + jww.INFO.Printf("[SendCMIX-%s] No rounds to send to %s "+ + "(msgDigest: %s) were found before timeout %s", + cmixParams.DebugTag, recipient, msg.Digest(), cmixParams.Timeout) return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out") } + if numRoundTries > 0 { - jww.INFO.Printf("[SendCMIX-%s] Attempt %d to find round to send message "+ - "to %s (msgDigest: %s)", cmixParams.DebugTag, numRoundTries+1, recipient, - msg.Digest()) + jww.INFO.Printf("[SendCMIX-%s] Attempt %d to find round to send "+ + "message to %s (msgDigest: %s)", cmixParams.DebugTag, + numRoundTries+1, recipient, msg.Digest()) } - // find the best round to send to, excluding attempted rounds + // Find the best round to send to, excluding attempted rounds remainingTime := cmixParams.Timeout - elapsed - bestRound, err := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) + bestRound, err := instance.GetWaitingRounds().GetUpcomingRealtime( + remainingTime, attempted, sendTimeBuffer) if err != nil { jww.WARN.Printf("[SendCMIX-%s] Failed to GetUpcomingRealtime "+ "(msgDigest: %s): %+v", cmixParams.DebugTag, msg.Digest(), err) } + if bestRound == nil { - jww.WARN.Printf("[SendCMIX-%s], Best round on send is nil", cmixParams.DebugTag) + jww.WARN.Printf( + "[SendCMIX-%s] Best round on send is nil", cmixParams.DebugTag) continue } - jww.TRACE.Printf("[SendCMIX-%s] bestRound: %v", cmixParams.DebugTag, bestRound) - // Determine whether the selected round contains any Nodes - // that are blacklisted by the params.Network object + jww.TRACE.Printf("[SendCMIX-%s] Best round found: %+v", + cmixParams.DebugTag, bestRound) + + // Determine whether the selected round contains any nodes that are + // blacklisted by the CMIXParams object containsBlacklisted := false if cmixParams.BlacklistedNodes != nil { for _, nodeId := range bestRound.Topology { - nid := &id.ID{} + var nid *id.ID copy(nid[:], nodeId) if _, isBlacklisted := cmixParams.BlacklistedNodes[*nid]; isBlacklisted { containsBlacklisted = true @@ -162,68 +169,72 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, } if containsBlacklisted { - jww.WARN.Printf("[SendCMIX-%s]Round %d contains blacklisted nodes, "+ - "skipping...", cmixParams.DebugTag, bestRound.ID) + jww.WARN.Printf("[SendCMIX-%s] Round %d contains blacklisted "+ + "nodes, skipping...", cmixParams.DebugTag, bestRound.ID) continue } // Retrieve host and key information from round - firstGateway, roundKeys, err := processRound(nodes, bestRound, recipient.String(), msg.Digest()) + firstGateway, roundKeys, err := processRound( + nodes, bestRound, recipient.String(), msg.Digest()) if err != nil { - jww.WARN.Printf("[SendCMIX-%s]SendCmix failed to process round"+ - " (will retry): %v", cmixParams.DebugTag, err) + jww.WARN.Printf("[SendCMIX-%s] SendCmix failed to process round "+ + "(will retry): %v", cmixParams.DebugTag, err) continue } - jww.TRACE.Printf("[SendCMIX-%s]round %v processed, firstGW: %s", + jww.TRACE.Printf("[SendCMIX-%s] Round %v processed, firstGW: %s", cmixParams.DebugTag, bestRound, firstGateway) // Build the messages to send - wrappedMsg, encMsg, ephID, err := buildSlotMessage(msg, recipient, firstGateway, stream, senderId, bestRound, roundKeys) if err != nil { return 0, ephemeral.Id{}, err } - jww.INFO.Printf("[SendCMIX-%s] Sending to EphID %d (%s), "+ - "on round %d (msgDigest: %s, ecrMsgDigest: %s) "+ - "via gateway %s", cmixParams.DebugTag, - ephID.Int64(), recipient, bestRound.ID, msg.Digest(), - encMsg.Digest(), firstGateway.String()) + jww.INFO.Printf("[SendCMIX-%s] Sending to EphID %d (%s), on round %d "+ + "(msgDigest: %s, ecrMsgDigest: %s) via gateway %s", + cmixParams.DebugTag, ephID.Int64(), recipient, bestRound.ID, + msg.Digest(), encMsg.Digest(), firstGateway.String()) // Send the payload sendFunc := func(host *connect.Host, target *id.ID, timeout time.Duration) (interface{}, error) { wrappedMsg.Target = target.Marshal() - jww.TRACE.Printf("[SendCMIX-%s]sendFunc %s", cmixParams.DebugTag, host) - timeout = calculateSendTimeout(bestRound, maxTimeout) + jww.TRACE.Printf( + "[SendCMIX-%s] sendFunc %s", cmixParams.DebugTag, host) + // Use the smaller of the two timeout durations + timeout = calculateSendTimeout(bestRound, maxTimeout) calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout) if calculatedTimeout < timeout { timeout = calculatedTimeout } // Send the message - result, err := comms.SendPutMessage(host, wrappedMsg, - timeout) - jww.TRACE.Printf("[SendCMIX-%s]sendFunc %s putmsg", cmixParams.DebugTag, host) + result, err := comms.SendPutMessage(host, wrappedMsg, timeout) + jww.TRACE.Printf("[SendCMIX-%s] sendFunc %s put message", + cmixParams.DebugTag, host) if err != nil { - err := handlePutMessageError(firstGateway, nodes, - recipient.String(), bestRound, err) - jww.TRACE.Printf("[SendCMIX-%s] sendFunc %s err %+v", + err := handlePutMessageError( + firstGateway, nodes, recipient.String(), bestRound, err) + jww.TRACE.Printf("[SendCMIX-%s] sendFunc %s error: %+v", cmixParams.DebugTag, host, err) - return result, errors.WithMessagef(err, - "SendCmix %s", unrecoverableError) + return result, errors.WithMessagef( + err, "SendCmix %s", unrecoverableError) } + return result, err } + jww.TRACE.Printf("[SendCMIX-%s] sendToPreferred %s", cmixParams.DebugTag, firstGateway) - result, err := sender.SendToPreferred( - []*id.ID{firstGateway}, sendFunc, cmixParams.Stop, cmixParams.SendTimeout) + + result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc, + cmixParams.Stop, cmixParams.SendTimeout) jww.DEBUG.Printf("[SendCMIX-%s] sendToPreferred %s returned", cmixParams.DebugTag, firstGateway) @@ -232,18 +243,18 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, return 0, ephemeral.Id{}, err } - // if the comm errors or the message fails to send, continue retrying. + // If the comm errors or the message fails to send, continue retrying if err != nil { if strings.Contains(err.Error(), rateLimiting.ClientRateLimitErr) { - jww.ERROR.Printf("[SendCMIX-%s] SendCmix failed to send to EphID %d (%s) on "+ - "round %d: %+v", cmixParams.DebugTag, ephID.Int64(), recipient, - bestRound.ID, err) + jww.ERROR.Printf("[SendCMIX-%s] SendCmix failed to send to "+ + "EphID %d (%s) on round %d: %+v", cmixParams.DebugTag, + ephID.Int64(), recipient, bestRound.ID, err) return 0, ephemeral.Id{}, err } - jww.ERROR.Printf("[SendCMIX-%s] SendCmix failed to send to EphID %d (%s) on "+ - "round %d, trying a new round: %+v", cmixParams.DebugTag, ephID.Int64(), recipient, - bestRound.ID, err) + jww.ERROR.Printf("[SendCMIX-%s] SendCmix failed to send to "+ + "EphID %d (%s) on round %d, trying a new round: %+v", + cmixParams.DebugTag, ephID.Int64(), recipient, bestRound.ID, err) continue } @@ -251,20 +262,22 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, gwSlotResp := result.(*pb.GatewaySlotResponse) if gwSlotResp.Accepted { m := fmt.Sprintf("[SendCMIX-%s] Successfully sent to EphID %v "+ - "(source: %s) in round %d (msgDigest: %s), "+ - "elapsed: %s numRoundTries: %d", cmixParams.DebugTag, ephID.Int64(), - recipient, bestRound.ID, msg.Digest(), - elapsed, numRoundTries) + "(source: %s) in round %d (msgDigest: %s), elapsed: %s "+ + "numRoundTries: %d", cmixParams.DebugTag, ephID.Int64(), + recipient, bestRound.ID, msg.Digest(), elapsed, numRoundTries) + jww.INFO.Print(m) events.Report(1, "MessageSend", "Metric", m) + return id.Round(bestRound.ID), ephID, nil } else { - jww.FATAL.Panicf("[SendCMIX-%s] Gateway %s returned no error, but failed "+ - "to accept message when sending to EphID %d (%s) on round %d", - cmixParams.DebugTag, firstGateway, ephID.Int64(), recipient, bestRound.ID) + jww.FATAL.Panicf("[SendCMIX-%s] Gateway %s returned no error, "+ + "but failed to accept message when sending to EphID %d (%s) "+ + "on round %d", cmixParams.DebugTag, firstGateway, ephID.Int64(), + recipient, bestRound.ID) } } - return 0, ephemeral.Id{}, errors.New("failed to send the message, " + - "unknown error") + return 0, ephemeral.Id{}, + errors.New("failed to send the message, unknown error") } diff --git a/network/sendCmixUtils.go b/network/sendCmixUtils.go index 0d0f89094..8174fac14 100644 --- a/network/sendCmixUtils.go +++ b/network/sendCmixUtils.go @@ -25,19 +25,21 @@ import ( "time" ) -// Interface for SendCMIX comms; allows mocking this in testing. +// SendCmixCommsInterface is the interface for SendCMIX comms; allows mocking +// this in testing. type SendCmixCommsInterface interface { - // SendPutMessage places a cMix message on the gateway to be - // sent through cMix. + // SendPutMessage places a cMix message on the gateway to be sent through + // cMix. SendPutMessage(host *connect.Host, message *pb.GatewaySlot, timeout time.Duration) (*pb.GatewaySlotResponse, error) - // SendPutManyMessages places a list of cMix messages on the gateway - // to be sent through cMix. + + // SendPutManyMessages places a list of cMix messages on the gateway to be + // sent through cMix. SendPutManyMessages(host *connect.Host, messages *pb.GatewaySlots, timeout time.Duration) (*pb.GatewaySlotResponse, error) } -// how much in the future a round needs to be to send to it +// How much in the future a round needs to be to send to it const sendTimeBuffer = 1000 * time.Millisecond const unrecoverableError = "failed with an unrecoverable error" @@ -45,26 +47,25 @@ const unrecoverableError = "failed with an unrecoverable error" // PutManyMessage network call. A printable error will be returned giving more // context. If the error is not among recoverable errors, then the recoverable // boolean will be returned false. If the error is among recoverable errors, -// then the boolean will return true. -// recoverable means we should try resending to the round +// then the boolean will return true. Recoverable means we should try resending +// to the round. func handlePutMessageError(firstGateway *id.ID, nodes nodes.Registrar, - recipientString string, bestRound *pb.RoundInfo, - err error) (returnErr error) { + recipientString string, bestRound *pb.RoundInfo, err error) (returnErr error) { // If the comm errors or the message fails to send, then continue retrying; // otherwise, return if it sends properly if strings.Contains(err.Error(), "try a different round.") { - return errors.WithMessagef(err, "Failed to send to [%s] due to "+ - "round error with round %d, bailing...", + return errors.WithMessagef(err, + "Failed to send to [%s] due to round error with round %d, bailing...", recipientString, bestRound.ID) } else if strings.Contains(err.Error(), "Could not authenticate client. "+ - "Is the client registered with this nodes?") { + "Is the client registered with these nodes?") { // If send failed due to the gateway not recognizing the authorization, // then renegotiate with the nodes to refresh it nodeID := firstGateway.DeepCopy() nodeID.SetType(id.Node) - // DeleteFingerprint the keys and re-register + // Delete the keys and re-register nodes.RemoveNode(nodeID) nodes.TriggerNodeRegistration(nodeID) @@ -73,7 +74,7 @@ func handlePutMessageError(firstGateway *id.ID, nodes nodes.Registrar, recipientString, firstGateway) } - return errors.WithMessage(err, "Failed to put cmix message") + return errors.WithMessage(err, "Failed to put cMix message") } @@ -91,27 +92,26 @@ func processRound(nodes nodes.Registrar, bestRound *pb.RoundInfo, } topology := connect.NewCircuit(idList) - // get the keys for the round, reject if any nodes do not have keying + // Get the keys for the round, reject if any nodes do not have keying // relationships roundKeys, err := nodes.GetNodeKeys(topology) if err != nil { - return nil, nil, errors.WithMessagef(err, "Failed to get keys for round %d", bestRound.ID) + return nil, nil, errors.WithMessagef( + err, "Failed to get keys for round %d", bestRound.ID) } - // get the gateway to transmit to + // Get the gateway to transmit to firstGateway := topology.GetNodeAtIndex(0).DeepCopy() firstGateway.SetType(id.Gateway) return firstGateway, roundKeys, nil } -// buildSlotMessage is a helper function which forms a slotted message to send -// to a gateway. It encrypts passed in message and generates an address ID for -// the recipient. +// buildSlotMessage forms a slotted message to send to a gateway. It encrypts +// passed in message and generates an address ID for the recipient. func buildSlotMessage(msg format.Message, recipient *id.ID, target *id.ID, stream *fastRNG.Stream, senderId *id.ID, bestRound *pb.RoundInfo, - mixCrypt nodes.MixCypher) (*pb.GatewaySlot, - format.Message, ephemeral.Id, + mixCrypt nodes.MixCypher) (*pb.GatewaySlot, format.Message, ephemeral.Id, error) { // Set the address ID @@ -120,7 +120,7 @@ func buildSlotMessage(msg format.Message, recipient *id.ID, target *id.ID, int64(bestRound.Timestamps[states.QUEUED])) if err != nil { jww.FATAL.Panicf("Failed to generate address ID when sending to %s "+ - "(msgDigest: %s): %+v", err, recipient, msg.Digest()) + "(msgDigest: %s): %+v", err, recipient, msg.Digest()) } ephIdFilled, err := ephID.Fill(uint(bestRound.AddressSpaceSize), stream) @@ -166,9 +166,9 @@ func buildSlotMessage(msg format.Message, recipient *id.ID, target *id.ID, return slot, encMsg, ephID, nil } -// handleMissingNodeKeys signals to the nodes registration thread to register a +// handleMissingNodeKeys signals to the node registration thread to register a // nodes if keys are missing. Identity is triggered automatically when the nodes -// is first seen, so this should on trigger on rare events. +// are first seen, so this should on trigger on rare events. func handleMissingNodeKeys(instance *network.Instance, newNodeChan chan network.NodeGateway, nodes []*id.ID) { for _, n := range nodes { @@ -187,14 +187,15 @@ func handleMissingNodeKeys(instance *network.Instance, } } -// messageListToStrings serializes a list of message.TargetedCmixMessage into a -// string of comma seperated recipient IDs and a string of comma seperated -// message digests. Duplicate recipient IDs are printed once. Intended for use -// in printing to log. -func messageListToStrings(msgList []assembeledCmixMessage) (string, string) { +// messageListToStrings serializes a list of assembledCmixMessage into a string +// of comma seperated recipient IDs and a string of comma seperated message +// digests. Duplicate recipient IDs are printed once. Intended for use in +// printing to log. +func messageListToStrings(msgList []assembledCmixMessage) (string, string) { idStrings := make([]string, 0, len(msgList)) idMap := make(map[id.ID]bool, len(msgList)) msgDigests := make([]string, len(msgList)) + for i, msg := range msgList { if !idMap[*msg.Recipient] { idStrings = append(idStrings, msg.Recipient.String()) @@ -229,18 +230,19 @@ func ephemeralIdListToString(idList []ephemeral.Id) string { } func calculateSendTimeout(best *pb.RoundInfo, max time.Duration) time.Duration { - roundStartTime := time.Unix(0, - int64(best.Timestamps[states.QUEUED])) + roundStartTime := time.Unix(0, int64(best.Timestamps[states.QUEUED])) + // 250ms AFTER the round starts to hear the response. - timeout := roundStartTime.Sub( - netTime.Now().Add(250 * time.Millisecond)) + timeout := roundStartTime.Sub(netTime.Now().Add(250 * time.Millisecond)) if timeout > max { timeout = max } + // time.Duration is a signed int, so check for negative if timeout < 0 { // TODO: should this produce a warning? timeout = 100 * time.Millisecond } + return timeout } diff --git a/network/sendManyCmix.go b/network/sendManyCmix.go index fc070733d..3b7819336 100644 --- a/network/sendManyCmix.go +++ b/network/sendManyCmix.go @@ -41,38 +41,38 @@ type TargetedCmixMessage struct { Mac []byte } -// SendManyCMIX sends many "raw" CMIX message payloads to the provided +// SendManyCMIX sends many "raw" cMix message payloads to the provided // recipients all in the same round. -// Returns the round ID of the round the payloads was sent or an error -// if it fails. -// This does not have end to end encryption on it and is used exclusively as a -// send for higher order cryptographic protocols. Do not use unless implementing -// a protocol on top. -// Due to sending multiple payloads, this leaks more metadata than a standard -// cmix send and should be in general avoided. -// recipient - cMix ID of the recipient -// fingerprint - Key Fingerprint. 256 bit field to store a 255 bit +// Returns the round ID of the round the payloads was sent or an error if it +// fails. +// This does not have end-to-end encryption on it and is used exclusively as +// a send for higher order cryptographic protocols. Do not use unless +// implementing a protocol on top. +// Due to sending multiple payloads, this leaks more metadata than a +// standard cMix send and should be in general avoided. +// recipient - cMix ID of the recipient. +// fingerprint - Key Fingerprint. 256-bit field to store a 255-bit // fingerprint, highest order bit must be 0 (panic otherwise). If your // system does not use key fingerprints, this must be random bits. // service - Reception Service. The backup way for a client to identify // messages on receipt via trial hashing and to identify notifications. -// If unused, use messages.RandomService to fill the field with random data +// If unused, use message.GetRandomService to fill the field with +// random data. // payload - Contents of the message. Cannot exceed the payload size for a // cMix message (panic otherwise). -// mac - 256 bit field to store a 255 bit mac, highest order bit must be 0 +// mac - 256-bit field to store a 255-bit mac, highest order bit must be 0 // (panic otherwise). If used, fill with random bits. // Will return an error if the network is unhealthy or if it fails to send // (along with the reason). Blocks until successful send or err. -// Does not support Critical Messages // WARNING: Do not roll your own crypto func (m *manager) SendManyCMIX(messages []TargetedCmixMessage, p CMIXParams) (id.Round, []ephemeral.Id, error) { if !m.Monitor.IsHealthy() { - return 0, []ephemeral.Id{}, errors.New("Cannot send cmix " + - "message when the network is not healthy") + return 0, []ephemeral.Id{}, errors.New( + "Cannot send cMix message when the network is not healthy") } - acms := make([]assembeledCmixMessage, len(messages)) + acms := make([]assembledCmixMessage, len(messages)) for i := range messages { msg := format.NewMessage(m.session.GetCmixGroup().GetP().ByteLen()) msg.SetKeyFP(messages[i].Fingerprint) @@ -80,7 +80,7 @@ func (m *manager) SendManyCMIX(messages []TargetedCmixMessage, msg.SetMac(messages[i].Mac) msg.SetSIH(messages[i].Service.Hash(msg.GetContents())) - acms[i] = assembeledCmixMessage{ + acms[i] = assembledCmixMessage{ Recipient: messages[i].Recipient, Message: msg, } @@ -91,7 +91,7 @@ func (m *manager) SendManyCMIX(messages []TargetedCmixMessage, m.session.GetTransmissionID(), m.comms) } -type assembeledCmixMessage struct { +type assembledCmixMessage struct { Recipient *id.ID Message format.Message } @@ -108,7 +108,7 @@ type assembeledCmixMessage struct { // which can be registered with the network instance to get a callback on its // status. func sendManyCmixHelper(sender gateway.Sender, - msgs []assembeledCmixMessage, param CMIXParams, instance *network.Instance, + msgs []assembledCmixMessage, param CMIXParams, instance *network.Instance, grp *cyclic.Group, registrar nodes.Registrar, rng *fastRNG.StreamGenerator, events event.Manager, senderId *id.ID, comms SendCmixCommsInterface) ( -- GitLab