diff --git a/api/client.go b/api/client.go index 191b68fb35ac6e8dd5d348f93b06d923eaaed6bf..1e73696714b09edc674874d6db43324e8d19b374 100644 --- a/api/client.go +++ b/api/client.go @@ -209,13 +209,16 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator) // tracks the network events and hands them off to workers for handling // - Historical Round Retrieval (/network/rounds/historical.go) // Retrieves data about rounds which are too old to be stored by the client -// - Message Retrieval Worker Group (/network/rounds/retreive.go) +// - Message Retrieval Worker Group (/network/rounds/retrieve.go) // Requests all messages in a given round from the gateway of the last node -// - Message Handling Worker Group (/network/message/reception.go) +// - Message Handling Worker Group (/network/message/handler.go) // Decrypts and partitions messages when signals via the Switchboard // - Health Tracker (/network/health) // Via the network instance tracks the state of the network -// - +// - Garbled Messages (/network/message/garbled.go) +// Can be signaled to check all recent messages which could not be decoded +// Uses a message store on disk for persistence + func (c *Client) StartNetworkFollower() error { jww.INFO.Printf("StartNetworkFollower()") @@ -241,8 +244,10 @@ func (c *Client) StartNetworkFollower() error { return nil } -// stops the network follower if it is running. -// if the network follower is running nad this fails, the client object will +// StopNetworkFollower stops the network follower if it is running. +// It returns errors if the Follower is in the wrong status to stop or if it +// fails to stop it. +// if the network follower is running and this fails, the client object will // most likely be in an unrecoverable state and need to be trashed. 
func (c *Client) StopNetworkFollower(timeout time.Duration) error { err := c.status.toStopping() diff --git a/bindings/interfaces.go b/bindings/interfaces.go index 4149cfad13edfb43e657b127944f85520ba52a54..2001e135a4f1aabc1e018170d53dc3118f71d683 100644 --- a/bindings/interfaces.go +++ b/bindings/interfaces.go @@ -139,7 +139,7 @@ type ContactList interface { // Listener provides a callback to hear a message // An object implementing this interface can be called back when the client -// gets a message of the type that the registerer specified at registration +// gets a message of the type that the registerer specified at registration // time. type Listener interface { // Hear is called to receive a message in the UI diff --git a/go.mod b/go.mod index 1ae4b813b080a26461eb8f80445a0b11aab3f541..adccbbd5cb822e50f2e12c98595032dbbd5779a6 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.6.2 - gitlab.com/elixxir/comms v0.0.0-20200921200427-5955a0a798b9 + gitlab.com/elixxir/comms v0.0.0-20200924072138-2e2709483d89 gitlab.com/elixxir/crypto v0.0.0-20200921195205-bca0178268ec gitlab.com/elixxir/ekv v0.1.1 gitlab.com/elixxir/primitives v0.0.0-20200915190719-f4586ec93f50 diff --git a/go.sum b/go.sum index 608d540ab0a7e9be922cde1dfd79a58e1ef4934b..ed8d121f9e9d7aef3869236fb103e2947a20ac00 100644 --- a/go.sum +++ b/go.sum @@ -190,6 +190,10 @@ gitlab.com/elixxir/comms v0.0.0-20200917172539-929fc227eb0c h1:go7/RknV7646Ie+nm gitlab.com/elixxir/comms v0.0.0-20200917172539-929fc227eb0c/go.mod h1:yBEsOZSPyJQJvDbtlQ5L8ydy1JRgVlRoNgMDy9koQcE= gitlab.com/elixxir/comms v0.0.0-20200921200427-5955a0a798b9 h1:skzHNWCMh+T7Cn58/88Mikg2R8KnSWfzLV0w7SnerOs= gitlab.com/elixxir/comms v0.0.0-20200921200427-5955a0a798b9/go.mod h1:uRr8j6yTjCslxZxbRe6k4ixACu9gAeF61JZH36OFFa0= +gitlab.com/elixxir/comms v0.0.0-20200922163657-3e723b7170f6 h1:dFgfdATZuiPybonCBh0s4HeLB8Qw3Zm9EoLDRh2Eaaw= 
+gitlab.com/elixxir/comms v0.0.0-20200922163657-3e723b7170f6/go.mod h1:yBEsOZSPyJQJvDbtlQ5L8ydy1JRgVlRoNgMDy9koQcE= +gitlab.com/elixxir/comms v0.0.0-20200924072138-2e2709483d89 h1:PQalM7pnCRzZRKvdzJ6Jwz6e8bPR9H4CURzAwqYnJPE= +gitlab.com/elixxir/comms v0.0.0-20200924072138-2e2709483d89/go.mod h1:uRr8j6yTjCslxZxbRe6k4ixACu9gAeF61JZH36OFFa0= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4 h1:28ftZDeYEko7xptCZzeFWS1Iam95dj46TWFVVlKmw6A= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.0-20200805174804-bdf909f2a16d/go.mod h1:cu6uNoANVLV0J6HyTL6KqVtVyh9SHU1RjJhytYlsbVQ= diff --git a/network/follow.go b/network/follow.go index 49f49b4e54a12c82f872c6d1aedf0390cf2ffd90..6e12f0275803bc61cf31c203e0196cd6c7e9ed68 100644 --- a/network/follow.go +++ b/network/follow.go @@ -17,7 +17,7 @@ package network // - /node/register.go for add/remove node events // - /rounds/historical.go for old round retrieval // - /rounds/retrieve.go for message retrieval -// - /message/reception.go decryption, partitioning, and signaling of messages +// - /message/handle.go decryption, partitioning, and signaling of messages // - /health/tracker.go - tracks the state of the network through the network // instance @@ -33,6 +33,7 @@ import ( "time" ) +//comms interface makes testing easier type followNetworkComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error) @@ -55,6 +56,7 @@ func (m *manager) followNetwork(quitCh <-chan struct{}) { } } +// executes each iteration of the follower func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { //randomly select a gateway to poll @@ -109,14 +111,21 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { } // ---- Round Processing ----- - //build the round checker + // check rounds using the round checker function which 
determines if there + // are messages waiting in rounds and then sends signals to the appropriate + // handling threads roundChecker := func(rid id.Round) bool { return m.round.Checker(rid) } - //check rounds + // get the bit vector of rounds that have been checked checkedRounds := m.Session.GetCheckedRounds() + // cleave off old state in the bit vector which is deprecated from the + // network checkedRounds.Forward(lastTrackedRound) + // loop through all rounds the client does not know about and the gateway + // does, checking the bloom filter for the user to see if there are + // messages for the user (bloom not implemented yet) checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, int(m.param.MaxCheckedRounds)) } diff --git a/network/message/garbled.go b/network/message/garbled.go index c4fe424b9ccda33a5e9b72c36bde9f86b509e136..169d0281c098513675635b7d60791cd0061e7fc8 100644 --- a/network/message/garbled.go +++ b/network/message/garbled.go @@ -5,6 +5,23 @@ import ( "time" ) +// Messages can arrive in the network out of order. When message handling fails +// to decrypt a message, it is added to the garbled message buffer (which is +// stored on disk) and the message decryption is retried here whenever triggered. 
+ +// This can be triggered through the CheckGarbledMessages on the network manager +// and is used in the /keyExchange package on successful rekey triggering + +// Triggers Garbled message checking if the queue is not full +// Exposed on the network manager +func (m *Manager) CheckGarbledMessages() { + select { + case m.triggerGarbled <- struct{}{}: + default: + } +} + +//long running thread which processes garbled messages func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) { done := false for !done { @@ -17,6 +34,7 @@ func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) { } } +//handler for a single run of garbled messages func (m *Manager) handleGarbledMessages() { garbledMsgs := m.Session.GetGarbledMessages() e2eKv := m.Session.E2e() @@ -34,7 +52,8 @@ func (m *Manager) handleGarbledMessages() { //remove from the buffer if decryption is successful garbledMsgs.Remove(grbldMsg) //handle the successfully decrypted message - xxMsg, ok := m.partitioner.HandlePartition(sender, message.E2E, msg.GetContents()) + xxMsg, ok := m.partitioner.HandlePartition(sender, message.E2E, + msg.GetContents()) if ok { m.Switchboard.Speak(xxMsg) continue @@ -42,8 +61,8 @@ func (m *Manager) handleGarbledMessages() { } } // fail the message if any part of the decryption fails, - // unless it is our of attempts and has been in the buffer long enough, - // then remove it + // unless it is the last attempt and has been in the buffer long + // enough, in which case remove it if count == m.param.MaxChecksGarbledMessage && time.Since(timestamp) > m.param.GarbledMessageWait { garbledMsgs.Remove(grbldMsg) diff --git a/network/message/reception.go b/network/message/handler.go similarity index 93% rename from network/message/reception.go rename to network/message/handler.go index e3cff5f91cf1cc448ce5724372012581b6c0251a..7e660653b85862c18ca7f7d305979aec4ff2626e 100644 --- a/network/message/reception.go +++ b/network/message/handler.go @@ -9,7 +9,7 @@ import ( "time" ) 
-func (m *Manager) processMessages(quitCh <-chan struct{}) { +func (m *Manager) handleMessages(quitCh <-chan struct{}) { done := false for !done { select { @@ -17,7 +17,7 @@ func (m *Manager) processMessages(quitCh <-chan struct{}) { done = true case bundle := <-m.messageReception: for _, msg := range bundle.Messages { - m.receiveMessage(msg) + m.handleMessage(msg) } bundle.Finish() } @@ -25,7 +25,7 @@ func (m *Manager) processMessages(quitCh <-chan struct{}) { } -func (m *Manager) receiveMessage(ecrMsg format.Message) { +func (m *Manager) handleMessage(ecrMsg format.Message) { // We've done all the networking, now process the message fingerprint := ecrMsg.GetKeyFP() diff --git a/network/message/manager.go b/network/message/manager.go index 3d95ef04099fe5bf997d601b0a9b4205c9385281..daa398a6cfe033f7ac0d6462306617e1d5b79e5f 100644 --- a/network/message/manager.go +++ b/network/message/manager.go @@ -41,22 +41,14 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle { return m.messageReception } -//Gets the channel to send received messages on -func (m *Manager) CheckGarbledMessages() { - select { - case m.triggerGarbled <- struct{}{}: - default: - } -} - //Starts all worker pool func (m *Manager) StartProcessies() stoppable.Stoppable { multi := stoppable.NewMulti("MessageReception") - //create the message reception workers + //create the message handler workers for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ { stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i)) - go m.processMessages(stop.Quit()) + go m.handleMessages(stop.Quit()) multi.Add(stop) } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 34f910ee7a76d0dbb84d439f22453b6df44e2478..afe860183fee453e4e8c6ebddb3273f3ef7b6faf 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -13,9 +13,12 @@ import ( "time" ) - -// Internal send e2e which bypasses the network check, for use in SendE2E and -// SendUnsafe which do 
their own network checks +// Internal SendCmix which bypasses the network check, will attempt to send to +// the network without checking state. It has a built in retry system which can +// be configured through the params object. +// If the message is successfully sent, the id of the round sent it is returned, +// which can be registered with the network instance to get a callback on +// its status func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { timeStart := time.Now() @@ -30,11 +33,14 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err //find the best round to send to, excluding roudn which have been attempted bestRound, _ := m.Instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted) - topology, err := buildToplogy(bestRound.Topology) + + //build the topology + idList, err := id.NewIDListFromBytes(bestRound.Topology) if err == nil { jww.ERROR.Printf("Failed to use topology for round %v: %s", bestRound.ID, err) continue } + topology := connect.NewCircuit(idList) //get they keys for the round, reject if any nodes do not have //keying relationships @@ -102,22 +108,11 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err return 0, errors.New("failed to send the message") } -func buildToplogy(nodes [][]byte) (*connect.Circuit, error) { - idList := make([]*id.ID, len(nodes)) - for i, n := range nodes { - nid, err := id.Unmarshal(n) - if err != nil { - return nil, errors.WithMessagef(err, "Failed to "+ - "convert topology on node %v/%v {raw id: %v}", i, len(nodes), n) - } - idList[i] = nid - } - topology := connect.NewCircuit(idList) - return topology, nil - -} - -func handleMissingNodeKeys(instance *network.Instance, newNodeChan chan network.NodeGateway, nodes []*id.ID) { +// Signals to the node registration thread to register a node if keys are +// missing. 
Registration is triggered automatically when the node is first seen, +// so this should only trigger on rare events. +func handleMissingNodeKeys(instance *network.Instance, + newNodeChan chan network.NodeGateway, nodes []*id.ID) { for _, n := range nodes { ng, err := instance.GetNodeAndGateway(n) if err != nil { diff --git a/network/rounds/check.go b/network/rounds/check.go index 5ac70543d0f9221742818e89764736c0a27cc1e0..74d8f0b3361fd4a569f7d347d23be856467e3b76 100644 --- a/network/rounds/check.go +++ b/network/rounds/check.go @@ -1,40 +1,47 @@ package rounds import ( + jww "github.com/spf13/jwalterweatherman" "gitlab.com/xx_network/primitives/id" ) -// getRoundChecker passes a context and the round infos received by the -// gateway to the funky round checker api to update round state. -// The returned function passes round event objects over the context -// to the rest of the message handlers for getting messages. +// the round checker is a single use function which is meant to be wrapped +// and adhere to the knownRounds checker interface. it receives a round ID and +// looks up the state of that round to determine if the client has a message +// waiting in it. +// It will return true if it can conclusively determine no message exists, +// returning false and setting the round to processing if it needs further +// investigation. 
+// Once it determines messages might be waiting in a round, it determines +// if the information about that round is already present, if it is the data is +// sent to Message Retrieval Workers, otherwise it is sent to Historical Round +// Retrieval func (m *Manager) Checker(roundID id.Round) bool { // Set round to processing, if we can processing, count := m.p.Process(roundID) if !processing { + // if is already processing, ignore return false } + + //if the number of times the round has been checked has hit the max, drop it if count == m.params.MaxAttemptsCheckingARound { + jww.ERROR.Printf("Round %v failed the maximum number of times "+ + "(%v), stopping retrval attempt", roundID, + m.params.MaxAttemptsCheckingARound) m.p.Done(roundID) return true } - // FIXME: Spec has us SETTING processing, but not REMOVING it - // until the get messages thread completes the lookup, this - // is smell that needs refining. It seems as if there should be - // a state that lives with the round info as soon as we know - // about it that gets updated at different parts...not clear - // needs to be thought through. 
- //defer processing.Done(roundID) // TODO: Bloom filter lookup -- return true when we don't have - // Go get the round from the round infos, if it exists + // Go get the round from the round infos, if it exists ri, err := m.Instance.GetRound(roundID) if err != nil { - // If we didn't find it, send to historical - // rounds processor + // If we didn't find it, send to Historical Rounds Retrieval m.historicalRounds <- roundID } else { + // If found, send to Message Retrieval Workers m.lookupRoundMessages <- ri } diff --git a/network/rounds/historical.go b/network/rounds/historical.go index e044838406dacb7d769aafe7420f1a21f4adcc72..5047df546cc4b9b6b129c7dabbeb7113868c9145 100644 --- a/network/rounds/historical.go +++ b/network/rounds/historical.go @@ -15,6 +15,14 @@ import ( jww "github.com/spf13/jwalterweatherman" ) +// Historical Rounds looks up the round history via random gateways. +// It batches these requests but never waits longer than +// params.HistoricalRoundsPeriod to do a lookup. +// Historical rounds receives input from: +// - Network Follower (/network/follow.go) +// Historical Rounds sends the output to: +// - Message Retrieval Workers (/network/rounds/retrieve.go) + //interface to increase east of testing of historical rounds type historicalRoundsComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) @@ -22,10 +30,9 @@ type historicalRoundsComms interface { message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) } -// ProcessHistoricalRounds analyzes round history to see if this Client -// needs to check for messages at any of the gateways which completed -// those rounds. 
-// Waits to request many rounds at a time or for a timeout to trigger +// Long running thread which processes historical rounds +// Can be killed by sending a signal to the quit channel +// takes a comms interface to aid in testing func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) { timerCh := make(<-chan time.Time) @@ -40,6 +47,16 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c select { case <-quitCh: rng.Close() + // return all rounds in the queue to the input channel so they can + // be checked in the future. If the queue is full, disable them as + // processing so they are picked up from the beginning + for _, rid := range rounds { + select { + case m.historicalRounds <- id.Round(rid): + default: + m.p.NotProcessing(id.Round(rid)) + } + } done = true // if the timer elapses process rounds to ensure the delay isn't too long case <-timerCh: @@ -67,6 +84,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c "data: %s", err) } + //send the historical rounds request hr := &pb.HistoricalRounds{ Rounds: rounds, } @@ -80,13 +98,20 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C continue } + + // process the returned historical rounds. for i, roundInfo := range response.Rounds { + // The interface has missing returns returned as nil, such rounds + // need to be removed from processing so the network follower will + // pick them up in the future. 
if roundInfo == nil { jww.ERROR.Printf("could not retreive "+ "historical round %d", rounds[i]) + m.p.Fail(id.Round(rounds[i])) continue } - m.p.Done(id.Round(rounds[i])) + // Successfully retrieved rounds are sent to the Message + // Retrieval Workers m.lookupRoundMessages <- roundInfo } } diff --git a/network/rounds/processingRounds.go b/network/rounds/processingRounds.go index 5a9442ba16cd80fc842d600f9c95475fdb6f6364..585a7656c692cdf8e21719ffd30c4960c7cb3ebf 100644 --- a/network/rounds/processingRounds.go +++ b/network/rounds/processingRounds.go @@ -8,7 +8,7 @@ import ( ) type status struct { - count uint + failCount uint processing bool } @@ -34,16 +34,15 @@ func (pr *processing) Process(id id.Round) (bool, uint) { if rs, ok := pr.rounds[id]; ok { if rs.processing { - return false, rs.count + return false, rs.failCount } - rs.count++ rs.processing = true - return true, rs.count + return true, rs.failCount } pr.rounds[id] = &status{ - count: 0, + failCount: 0, processing: true, } @@ -62,12 +61,14 @@ func (pr *processing) IsProcessing(id id.Round) bool { return false } -// Fail sets a round's processing status to failed so that it can be retried. +// Fail sets a round's processing status to failed and increments its fail +// counter so that it can be retried. func (pr *processing) Fail(id id.Round) { pr.Lock() defer pr.Unlock() if rs, ok := pr.rounds[id]; ok { rs.processing = false + rs.failCount++ } } @@ -77,3 +78,13 @@ func (pr *processing) Done(id id.Round) { defer pr.Unlock() delete(pr.rounds, id) } + +// NotProcessing sets a round's processing status to failed so that it can be +// retried but does not increment its fail counter. 
+func (pr *processing) NotProcessing(id id.Round) { + pr.Lock() + defer pr.Unlock() + if rs, ok := pr.rounds[id]; ok { + rs.processing = false + } +} \ No newline at end of file diff --git a/network/rounds/processingRounds_test.go b/network/rounds/processingRounds_test.go index e57fe05e4d9ba437b5b21af052c92dd719e8bfcf..8b61ad6d76a050134a03246df47c6e7996af4ed8 100644 --- a/network/rounds/processingRounds_test.go +++ b/network/rounds/processingRounds_test.go @@ -33,10 +33,10 @@ func TestProcessing_Process(t *testing.T) { }{ {10, true, true, 0}, {10, true, false, 0}, - {10, false, true, 1}, + {10, false, true, 0}, {100, true, true, 0}, {100, true, false, 0}, - {100, false, true, 1}, + {100, false, true, 0}, } for i, d := range testData { @@ -86,6 +86,9 @@ func TestProcessing_Fail(t *testing.T) { if pr.rounds[rid].processing { t.Errorf("Fail() did not mark processing as false for round id %d.", rid) } + if pr.rounds[rid].failCount != 1 { + t.Errorf("Fail() did not increment the fail count of round id %d.", rid) + } } // Tests happy path of Done. 
diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index b6b45e7a35383e9f3dabe116733c77de12887db4..7378332ad289aa21ef45a0e2844b29428ea06235 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -1,7 +1,6 @@ package rounds import ( - "encoding/binary" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/network/gateway" @@ -40,12 +39,6 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, } } -// TODO: remove me when api fixed -func uint64ToBytes(i uint64) []byte { - bs := make([]byte, 8) - binary.LittleEndian.PutUint64(bs, 31415926) - return bs -} func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo, comms messageRetrievalComms) (message.Bundle, error) { @@ -62,8 +55,7 @@ func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo, // send the request msgReq := &pb.GetMessages{ ClientID: m.Uid.Marshal(), - //TODO: fix this, should not be a byte slice - RoundID: uint64ToBytes(uint64(rid)), + RoundID: uint64(rid), } msgResp, err := comms.RequestMessages(gwHost, msgReq) // Fail the round if an error occurs so it can be tried again later