Skip to content
Snippets Groups Projects
Commit 0f1ce957 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

continued documentation and cleanup

parent be2fcb10
No related branches found
No related tags found
No related merge requests found
...@@ -209,13 +209,16 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator) ...@@ -209,13 +209,16 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator)
// tracks the network events and hands them off to workers for handling // tracks the network events and hands them off to workers for handling
// - Historical Round Retrieval (/network/rounds/historical.go) // - Historical Round Retrieval (/network/rounds/historical.go)
// Retrieves data about rounds which are too old to be stored by the client // Retrieves data about rounds which are too old to be stored by the client
// - Message Retrieval Worker Group (/network/rounds/retreive.go) // - Message Retrieval Worker Group (/network/rounds/retrieve.go)
// Requests all messages in a given round from the gateway of the last node // Requests all messages in a given round from the gateway of the last node
// - Message Handling Worker Group (/network/message/reception.go) // - Message Handling Worker Group (/network/message/handle.go)
// Decrypts and partitions messages when signals via the Switchboard // Decrypts and partitions messages when signals via the Switchboard
// - Health Tracker (/network/health) // - Health Tracker (/network/health)
// Via the network instance tracks the state of the network // Via the network instance tracks the state of the network
// - // - Garbled Messages (/network/message/garbled.go)
// Can be signaled to check all recent messages which could not be decoded
// Uses a message store on disk for persistence
func (c *Client) StartNetworkFollower() error { func (c *Client) StartNetworkFollower() error {
jww.INFO.Printf("StartNetworkFollower()") jww.INFO.Printf("StartNetworkFollower()")
...@@ -241,8 +244,10 @@ func (c *Client) StartNetworkFollower() error { ...@@ -241,8 +244,10 @@ func (c *Client) StartNetworkFollower() error {
return nil return nil
} }
// stops the network follower if it is running. // StopNetworkFollower stops the network follower if it is running.
// if the network follower is running nad this fails, the client object will // It returns errors if the Follower is in the wrong status to stop or if it
// fails to stop it.
// if the network follower is running and this fails, the client object will
// most likely be in an unrecoverable state and need to be trashed. // most likely be in an unrecoverable state and need to be trashed.
func (c *Client) StopNetworkFollower(timeout time.Duration) error { func (c *Client) StopNetworkFollower(timeout time.Duration) error {
err := c.status.toStopping() err := c.status.toStopping()
......
...@@ -15,7 +15,7 @@ require ( ...@@ -15,7 +15,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/jwalterweatherman v1.1.0
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.6.2 github.com/spf13/viper v1.6.2
gitlab.com/elixxir/comms v0.0.0-20200921200427-5955a0a798b9 gitlab.com/elixxir/comms v0.0.0-20200924072138-2e2709483d89
gitlab.com/elixxir/crypto v0.0.0-20200921195205-bca0178268ec gitlab.com/elixxir/crypto v0.0.0-20200921195205-bca0178268ec
gitlab.com/elixxir/ekv v0.1.1 gitlab.com/elixxir/ekv v0.1.1
gitlab.com/elixxir/primitives v0.0.0-20200915190719-f4586ec93f50 gitlab.com/elixxir/primitives v0.0.0-20200915190719-f4586ec93f50
......
...@@ -190,6 +190,10 @@ gitlab.com/elixxir/comms v0.0.0-20200917172539-929fc227eb0c h1:go7/RknV7646Ie+nm ...@@ -190,6 +190,10 @@ gitlab.com/elixxir/comms v0.0.0-20200917172539-929fc227eb0c h1:go7/RknV7646Ie+nm
gitlab.com/elixxir/comms v0.0.0-20200917172539-929fc227eb0c/go.mod h1:yBEsOZSPyJQJvDbtlQ5L8ydy1JRgVlRoNgMDy9koQcE= gitlab.com/elixxir/comms v0.0.0-20200917172539-929fc227eb0c/go.mod h1:yBEsOZSPyJQJvDbtlQ5L8ydy1JRgVlRoNgMDy9koQcE=
gitlab.com/elixxir/comms v0.0.0-20200921200427-5955a0a798b9 h1:skzHNWCMh+T7Cn58/88Mikg2R8KnSWfzLV0w7SnerOs= gitlab.com/elixxir/comms v0.0.0-20200921200427-5955a0a798b9 h1:skzHNWCMh+T7Cn58/88Mikg2R8KnSWfzLV0w7SnerOs=
gitlab.com/elixxir/comms v0.0.0-20200921200427-5955a0a798b9/go.mod h1:uRr8j6yTjCslxZxbRe6k4ixACu9gAeF61JZH36OFFa0= gitlab.com/elixxir/comms v0.0.0-20200921200427-5955a0a798b9/go.mod h1:uRr8j6yTjCslxZxbRe6k4ixACu9gAeF61JZH36OFFa0=
gitlab.com/elixxir/comms v0.0.0-20200922163657-3e723b7170f6 h1:dFgfdATZuiPybonCBh0s4HeLB8Qw3Zm9EoLDRh2Eaaw=
gitlab.com/elixxir/comms v0.0.0-20200922163657-3e723b7170f6/go.mod h1:yBEsOZSPyJQJvDbtlQ5L8ydy1JRgVlRoNgMDy9koQcE=
gitlab.com/elixxir/comms v0.0.0-20200924072138-2e2709483d89 h1:PQalM7pnCRzZRKvdzJ6Jwz6e8bPR9H4CURzAwqYnJPE=
gitlab.com/elixxir/comms v0.0.0-20200924072138-2e2709483d89/go.mod h1:uRr8j6yTjCslxZxbRe6k4ixACu9gAeF61JZH36OFFa0=
gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4 h1:28ftZDeYEko7xptCZzeFWS1Iam95dj46TWFVVlKmw6A= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4 h1:28ftZDeYEko7xptCZzeFWS1Iam95dj46TWFVVlKmw6A=
gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c=
gitlab.com/elixxir/crypto v0.0.0-20200805174804-bdf909f2a16d/go.mod h1:cu6uNoANVLV0J6HyTL6KqVtVyh9SHU1RjJhytYlsbVQ= gitlab.com/elixxir/crypto v0.0.0-20200805174804-bdf909f2a16d/go.mod h1:cu6uNoANVLV0J6HyTL6KqVtVyh9SHU1RjJhytYlsbVQ=
......
...@@ -17,7 +17,7 @@ package network ...@@ -17,7 +17,7 @@ package network
// - /node/register.go for add/remove node events // - /node/register.go for add/remove node events
// - /rounds/historical.go for old round retrieval // - /rounds/historical.go for old round retrieval
// - /rounds/retrieve.go for message retrieval // - /rounds/retrieve.go for message retrieval
// - /message/reception.go decryption, partitioning, and signaling of messages // - /message/handle.go decryption, partitioning, and signaling of messages
// - /health/tracker.go - tracks the state of the network through the network // - /health/tracker.go - tracks the state of the network through the network
// instance // instance
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"time" "time"
) )
//comms interface makes testing easier
type followNetworkComms interface { type followNetworkComms interface {
GetHost(hostId *id.ID) (*connect.Host, bool) GetHost(hostId *id.ID) (*connect.Host, bool)
SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error) SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error)
...@@ -55,6 +56,7 @@ func (m *manager) followNetwork(quitCh <-chan struct{}) { ...@@ -55,6 +56,7 @@ func (m *manager) followNetwork(quitCh <-chan struct{}) {
} }
} }
// executes each iteration of the follower
func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
//randomly select a gateway to poll //randomly select a gateway to poll
...@@ -109,14 +111,21 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { ...@@ -109,14 +111,21 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
} }
// ---- Round Processing ----- // ---- Round Processing -----
//build the round checker // check rounds using the round checker function which determines if there
// are messages waiting in rounds and then sends signals to the appropriate
// handling threads
roundChecker := func(rid id.Round) bool { roundChecker := func(rid id.Round) bool {
return m.round.Checker(rid) return m.round.Checker(rid)
} }
//check rounds // get the bit vector of rounds that have been checked
checkedRounds := m.Session.GetCheckedRounds() checkedRounds := m.Session.GetCheckedRounds()
// cleave off old state in the bit vector which is deprecated from the
// network
checkedRounds.Forward(lastTrackedRound) checkedRounds.Forward(lastTrackedRound)
// loop through all rounds the client does not know about and the gateway
// does, checking the bloom filter for the user to see if there are
// messages for the user (bloom not implemented yet)
checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker,
int(m.param.MaxCheckedRounds)) int(m.param.MaxCheckedRounds))
} }
......
...@@ -5,6 +5,23 @@ import ( ...@@ -5,6 +5,23 @@ import (
"time" "time"
) )
// Messages can arrive in the network out of order. When message handling fails
// to decrypt a message, it is added to the garbled message buffer (which is
// stored on disk) and the message decryption is retried here whenever triggered.
// This can be triggered through the CheckGarbledMessages on the network manager
// and is used in the /keyExchange package on successful rekey triggering
// CheckGarbledMessages triggers garbled message checking if the trigger queue
// is not full. It is exposed on the network manager.
func (m *Manager) CheckGarbledMessages() {
	select {
	// non-blocking send: queue a check signal if the channel can accept one
	case m.triggerGarbled <- struct{}{}:
	// otherwise drop the signal rather than block the caller
	default:
	}
}
//long running thread which processes garbled messages
func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) { func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) {
done := false done := false
for !done { for !done {
...@@ -17,6 +34,7 @@ func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) { ...@@ -17,6 +34,7 @@ func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) {
} }
} }
//handler for a single run of garbled messages
func (m *Manager) handleGarbledMessages() { func (m *Manager) handleGarbledMessages() {
garbledMsgs := m.Session.GetGarbledMessages() garbledMsgs := m.Session.GetGarbledMessages()
e2eKv := m.Session.E2e() e2eKv := m.Session.E2e()
...@@ -34,7 +52,8 @@ func (m *Manager) handleGarbledMessages() { ...@@ -34,7 +52,8 @@ func (m *Manager) handleGarbledMessages() {
//remove from the buffer if decryption is successful //remove from the buffer if decryption is successful
garbledMsgs.Remove(grbldMsg) garbledMsgs.Remove(grbldMsg)
//handle the successfully decrypted message //handle the successfully decrypted message
xxMsg, ok := m.partitioner.HandlePartition(sender, message.E2E, msg.GetContents()) xxMsg, ok := m.partitioner.HandlePartition(sender, message.E2E,
msg.GetContents())
if ok { if ok {
m.Switchboard.Speak(xxMsg) m.Switchboard.Speak(xxMsg)
continue continue
...@@ -42,8 +61,8 @@ func (m *Manager) handleGarbledMessages() { ...@@ -42,8 +61,8 @@ func (m *Manager) handleGarbledMessages() {
} }
} }
// fail the message if any part of the decryption fails, // fail the message if any part of the decryption fails,
// unless it is our of attempts and has been in the buffer long enough, // unless it is the last attempts and has been in the buffer long
// then remove it // enough, in which case remove it
if count == m.param.MaxChecksGarbledMessage && if count == m.param.MaxChecksGarbledMessage &&
time.Since(timestamp) > m.param.GarbledMessageWait { time.Since(timestamp) > m.param.GarbledMessageWait {
garbledMsgs.Remove(grbldMsg) garbledMsgs.Remove(grbldMsg)
......
...@@ -9,7 +9,7 @@ import ( ...@@ -9,7 +9,7 @@ import (
"time" "time"
) )
func (m *Manager) processMessages(quitCh <-chan struct{}) { func (m *Manager) handleMessages(quitCh <-chan struct{}) {
done := false done := false
for !done { for !done {
select { select {
...@@ -17,7 +17,7 @@ func (m *Manager) processMessages(quitCh <-chan struct{}) { ...@@ -17,7 +17,7 @@ func (m *Manager) processMessages(quitCh <-chan struct{}) {
done = true done = true
case bundle := <-m.messageReception: case bundle := <-m.messageReception:
for _, msg := range bundle.Messages { for _, msg := range bundle.Messages {
m.receiveMessage(msg) m.handleMessage(msg)
} }
bundle.Finish() bundle.Finish()
} }
...@@ -25,7 +25,7 @@ func (m *Manager) processMessages(quitCh <-chan struct{}) { ...@@ -25,7 +25,7 @@ func (m *Manager) processMessages(quitCh <-chan struct{}) {
} }
func (m *Manager) receiveMessage(ecrMsg format.Message) { func (m *Manager) handleMessage(ecrMsg format.Message) {
// We've done all the networking, now process the message // We've done all the networking, now process the message
fingerprint := ecrMsg.GetKeyFP() fingerprint := ecrMsg.GetKeyFP()
......
...@@ -41,22 +41,14 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle { ...@@ -41,22 +41,14 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle {
return m.messageReception return m.messageReception
} }
// CheckGarbledMessages sends a signal on the garbled message trigger channel
// without blocking; if the channel cannot accept the signal it is dropped.
func (m *Manager) CheckGarbledMessages() {
	select {
	// non-blocking send of the trigger signal
	case m.triggerGarbled <- struct{}{}:
	// channel not ready to receive: do nothing
	default:
	}
}
//Starts all worker pool //Starts all worker pool
func (m *Manager) StartProcessies() stoppable.Stoppable { func (m *Manager) StartProcessies() stoppable.Stoppable {
multi := stoppable.NewMulti("MessageReception") multi := stoppable.NewMulti("MessageReception")
//create the message reception workers //create the message handler workers
for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ { for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ {
stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i)) stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i))
go m.processMessages(stop.Quit()) go m.handleMessages(stop.Quit())
multi.Add(stop) multi.Add(stop)
} }
......
...@@ -13,9 +13,12 @@ import ( ...@@ -13,9 +13,12 @@ import (
"time" "time"
) )
// Internal SendCmix which bypasses the network check, will attempt to send to
// Internal send e2e which bypasses the network check, for use in SendE2E and // the network without checking state. It has a built in retry system which can
// SendUnsafe which do their own network checks // be configured through the params object.
// If the message is successfully sent, the id of the round it was sent in is returned,
// which can be registered with the network instance to get a callback on
// its status
func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) {
timeStart := time.Now() timeStart := time.Now()
...@@ -30,11 +33,14 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err ...@@ -30,11 +33,14 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err
//find the best round to send to, excluding rounds which have been attempted //find the best round to send to, excluding rounds which have been attempted
bestRound, _ := m.Instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted) bestRound, _ := m.Instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted)
topology, err := buildToplogy(bestRound.Topology)
//build the topology
idList, err := id.NewIDListFromBytes(bestRound.Topology)
if err == nil { if err == nil {
jww.ERROR.Printf("Failed to use topology for round %v: %s", bestRound.ID, err) jww.ERROR.Printf("Failed to use topology for round %v: %s", bestRound.ID, err)
continue continue
} }
topology := connect.NewCircuit(idList)
//get the keys for the round, reject if any nodes do not have //get the keys for the round, reject if any nodes do not have
//keying relationships //keying relationships
...@@ -102,22 +108,11 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err ...@@ -102,22 +108,11 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err
return 0, errors.New("failed to send the message") return 0, errors.New("failed to send the message")
} }
func buildToplogy(nodes [][]byte) (*connect.Circuit, error) { // Signals to the node registration thread to register a node if keys are
idList := make([]*id.ID, len(nodes)) // missing. Registration is triggered automatically when the node is first seen,
for i, n := range nodes { // so this should on trigger on rare events.
nid, err := id.Unmarshal(n) func handleMissingNodeKeys(instance *network.Instance,
if err != nil { newNodeChan chan network.NodeGateway, nodes []*id.ID) {
return nil, errors.WithMessagef(err, "Failed to "+
"convert topology on node %v/%v {raw id: %v}", i, len(nodes), n)
}
idList[i] = nid
}
topology := connect.NewCircuit(idList)
return topology, nil
}
func handleMissingNodeKeys(instance *network.Instance, newNodeChan chan network.NodeGateway, nodes []*id.ID) {
for _, n := range nodes { for _, n := range nodes {
ng, err := instance.GetNodeAndGateway(n) ng, err := instance.GetNodeAndGateway(n)
if err != nil { if err != nil {
......
package rounds package rounds
import ( import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
// getRoundChecker passes a context and the round infos received by the // the round checker is a single use function which is meant to be wrapped
// gateway to the funky round checker api to update round state. // and adhere to the knownRounds checker interface. it receives a round ID and
// The returned function passes round event objects over the context // looks up the state of that round to determine if the client has a message
// to the rest of the message handlers for getting messages. // waiting in it.
// It will return true if it can conclusively determine no message exists,
// or return false and set the round to processing if it needs further
// investigation.
// Once it determines messages might be waiting in a round, it determines
// if the information about that round is already present, if it is the data is
// sent to Message Retrieval Workers, otherwise it is sent to Historical Round
// Retrieval
func (m *Manager) Checker(roundID id.Round) bool { func (m *Manager) Checker(roundID id.Round) bool {
// Set round to processing, if we can // Set round to processing, if we can
processing, count := m.p.Process(roundID) processing, count := m.p.Process(roundID)
if !processing { if !processing {
// if the round is already being processed, ignore it
return false return false
} }
//if the number of times the round has been checked has hit the max, drop it
if count == m.params.MaxAttemptsCheckingARound { if count == m.params.MaxAttemptsCheckingARound {
jww.ERROR.Printf("Round %v failed the maximum number of times "+
"(%v), stopping retrval attempt", roundID,
m.params.MaxAttemptsCheckingARound)
m.p.Done(roundID) m.p.Done(roundID)
return true return true
} }
// FIXME: Spec has us SETTING processing, but not REMOVING it
// until the get messages thread completes the lookup, this
// is smell that needs refining. It seems as if there should be
// a state that lives with the round info as soon as we know
// about it that gets updated at different parts...not clear
// needs to be thought through.
//defer processing.Done(roundID)
// TODO: Bloom filter lookup -- return true when we don't have // TODO: Bloom filter lookup -- return true when we don't have
// Go get the round from the round infos, if it exists
// Go get the round from the round infos, if it exists
ri, err := m.Instance.GetRound(roundID) ri, err := m.Instance.GetRound(roundID)
if err != nil { if err != nil {
// If we didn't find it, send to historical // If we didn't find it, send to Historical Rounds Retrieval
// rounds processor
m.historicalRounds <- roundID m.historicalRounds <- roundID
} else { } else {
// IF found, send to Message Retrieval Workers
m.lookupRoundMessages <- ri m.lookupRoundMessages <- ri
} }
......
...@@ -15,6 +15,14 @@ import ( ...@@ -15,6 +15,14 @@ import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
) )
// Historical Rounds looks up the round history via random gateways.
// It batches these requests but never waits longer than
// params.HistoricalRoundsPeriod to do a lookup.
// Historical rounds receives input from:
// - Network Follower (/network/follow.go)
// Historical Rounds sends the output to:
// - Message Retrieval Workers (/network/rounds/retrieve.go)
//interface to increase ease of testing of historical rounds //interface to increase ease of testing of historical rounds
type historicalRoundsComms interface { type historicalRoundsComms interface {
GetHost(hostId *id.ID) (*connect.Host, bool) GetHost(hostId *id.ID) (*connect.Host, bool)
...@@ -22,10 +30,9 @@ type historicalRoundsComms interface { ...@@ -22,10 +30,9 @@ type historicalRoundsComms interface {
message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error)
} }
// ProcessHistoricalRounds analyzes round history to see if this Client // Long running thread which process historical rounds
// needs to check for messages at any of the gateways which completed // Can be killed by sending a signal to the quit channel
// those rounds. // takes a comms interface to aid in testing
// Waits to request many rounds at a time or for a timeout to trigger
func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) { func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) {
timerCh := make(<-chan time.Time) timerCh := make(<-chan time.Time)
...@@ -40,6 +47,16 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c ...@@ -40,6 +47,16 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
select { select {
case <-quitCh: case <-quitCh:
rng.Close() rng.Close()
// return all rounds in the queue to the input channel so they can
// be checked in the future. If the queue is full, disable them as
// processing so they are picked up from the beginning
for _, rid := range rounds {
select {
case m.historicalRounds <- id.Round(rid):
default:
m.p.NotProcessing(id.Round(rid))
}
}
done = true done = true
// if the timer elapses process rounds to ensure the delay isn't too long // if the timer elapses process rounds to ensure the delay isn't too long
case <-timerCh: case <-timerCh:
...@@ -67,6 +84,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c ...@@ -67,6 +84,7 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
"data: %s", err) "data: %s", err)
} }
//send the historical rounds request
hr := &pb.HistoricalRounds{ hr := &pb.HistoricalRounds{
Rounds: rounds, Rounds: rounds,
} }
...@@ -80,13 +98,20 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c ...@@ -80,13 +98,20 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C
continue continue
} }
// process the returned historical rounds.
for i, roundInfo := range response.Rounds { for i, roundInfo := range response.Rounds {
// The interface returns missing rounds as nil; such rounds
// need to be removed from processing so the network follower will
// pick them up in the future.
if roundInfo == nil { if roundInfo == nil {
jww.ERROR.Printf("could not retreive "+ jww.ERROR.Printf("could not retreive "+
"historical round %d", rounds[i]) "historical round %d", rounds[i])
m.p.Fail(id.Round(rounds[i]))
continue continue
} }
m.p.Done(id.Round(rounds[i])) // Successfully retrieved rounds are sent to the Message
// Retrieval Workers
m.lookupRoundMessages <- roundInfo m.lookupRoundMessages <- roundInfo
} }
} }
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
) )
type status struct { type status struct {
count uint failCount uint
processing bool processing bool
} }
...@@ -34,16 +34,15 @@ func (pr *processing) Process(id id.Round) (bool, uint) { ...@@ -34,16 +34,15 @@ func (pr *processing) Process(id id.Round) (bool, uint) {
if rs, ok := pr.rounds[id]; ok { if rs, ok := pr.rounds[id]; ok {
if rs.processing { if rs.processing {
return false, rs.count return false, rs.failCount
} }
rs.count++
rs.processing = true rs.processing = true
return true, rs.count return true, rs.failCount
} }
pr.rounds[id] = &status{ pr.rounds[id] = &status{
count: 0, failCount: 0,
processing: true, processing: true,
} }
...@@ -62,12 +61,14 @@ func (pr *processing) IsProcessing(id id.Round) bool { ...@@ -62,12 +61,14 @@ func (pr *processing) IsProcessing(id id.Round) bool {
return false return false
} }
// Fail sets a round's processing status to failed so that it can be retried. // Fail sets a round's processing status to failed and increments its fail
// counter so that it can be retried.
func (pr *processing) Fail(id id.Round) { func (pr *processing) Fail(id id.Round) {
pr.Lock() pr.Lock()
defer pr.Unlock() defer pr.Unlock()
if rs, ok := pr.rounds[id]; ok { if rs, ok := pr.rounds[id]; ok {
rs.processing = false rs.processing = false
rs.failCount++
} }
} }
...@@ -77,3 +78,13 @@ func (pr *processing) Done(id id.Round) { ...@@ -77,3 +78,13 @@ func (pr *processing) Done(id id.Round) {
defer pr.Unlock() defer pr.Unlock()
delete(pr.rounds, id) delete(pr.rounds, id)
} }
// NotProcessing clears the processing flag of a tracked round so that it can
// be retried, leaving its fail counter untouched. Rounds that are not tracked
// are ignored.
func (pr *processing) NotProcessing(id id.Round) {
	pr.Lock()
	defer pr.Unlock()

	rs, tracked := pr.rounds[id]
	if !tracked {
		// nothing recorded for this round, so there is no flag to clear
		return
	}
	rs.processing = false
}
\ No newline at end of file
...@@ -33,10 +33,10 @@ func TestProcessing_Process(t *testing.T) { ...@@ -33,10 +33,10 @@ func TestProcessing_Process(t *testing.T) {
}{ }{
{10, true, true, 0}, {10, true, true, 0},
{10, true, false, 0}, {10, true, false, 0},
{10, false, true, 1}, {10, false, true, 0},
{100, true, true, 0}, {100, true, true, 0},
{100, true, false, 0}, {100, true, false, 0},
{100, false, true, 1}, {100, false, true, 0},
} }
for i, d := range testData { for i, d := range testData {
...@@ -86,6 +86,9 @@ func TestProcessing_Fail(t *testing.T) { ...@@ -86,6 +86,9 @@ func TestProcessing_Fail(t *testing.T) {
if pr.rounds[rid].processing { if pr.rounds[rid].processing {
t.Errorf("Fail() did not mark processing as false for round id %d.", rid) t.Errorf("Fail() did not mark processing as false for round id %d.", rid)
} }
if pr.rounds[rid].failCount != 1 {
t.Errorf("Fail() did not increment the fail count of round id %d.", rid)
}
} }
// Tests happy path of Done. // Tests happy path of Done.
......
package rounds package rounds
import ( import (
"encoding/binary"
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/gateway"
...@@ -40,12 +39,6 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, ...@@ -40,12 +39,6 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
} }
} }
// uint64ToBytes serializes i into a newly allocated 8-byte slice using
// little-endian byte order.
// TODO: remove me when api fixed
func uint64ToBytes(i uint64) []byte {
	bs := make([]byte, 8)
	// encode the caller's value rather than a fixed placeholder constant, so
	// distinct inputs produce distinct outputs
	binary.LittleEndian.PutUint64(bs, i)
	return bs
}
func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo, func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo,
comms messageRetrievalComms) (message.Bundle, error) { comms messageRetrievalComms) (message.Bundle, error) {
...@@ -62,8 +55,7 @@ func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo, ...@@ -62,8 +55,7 @@ func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo,
// send the request // send the request
msgReq := &pb.GetMessages{ msgReq := &pb.GetMessages{
ClientID: m.Uid.Marshal(), ClientID: m.Uid.Marshal(),
//TODO: fix this, should not be a byte slice RoundID: uint64(rid),
RoundID: uint64ToBytes(uint64(rid)),
} }
msgResp, err := comms.RequestMessages(gwHost, msgReq) msgResp, err := comms.RequestMessages(gwHost, msgReq)
// Fail the round if an error occurs so it can be tried again later // Fail the round if an error occurs so it can be tried again later
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment