Skip to content
Snippets Groups Projects
Commit b12c1bbc authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

in progress rearchetecture

parent c4f98495
No related branches found
No related tags found
No related merge requests found
package rounds package rounds
type Manager struct { import (
"gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/storage"
)
func StartProcessors(ctx *context.Context) stoppable.Stoppable {
p := newProcessingRounds()
stopper := stoppable.NewSingle("TrackNetwork")
go trackNetwork(ctx, net, stopper.Quit())
} }
...@@ -22,10 +22,14 @@ import ( ...@@ -22,10 +22,14 @@ import (
"gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/network" "gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/crypto/csprng"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/knownRounds" "gitlab.com/elixxir/primitives/knownRounds"
"gitlab.com/xx_network/comms/connect"
//"gitlab.com/elixxir/comms/network" //"gitlab.com/elixxir/comms/network"
//"gitlab.com/xx_network/primitives/ndf" //"gitlab.com/xx_network/primitives/ndf"
"fmt" "fmt"
...@@ -37,16 +41,14 @@ import ( ...@@ -37,16 +41,14 @@ import (
"time" "time"
) )
// StartTrackNetwork starts a single TrackNetwork thread and returns a stoppable type trackNetworkComms interface {
func StartTrackNetwork(ctx *context.Context, net *network.Manager) stoppable.Stoppable { GetHost(hostId *id.ID) (*connect.Host, bool)
stopper := stoppable.NewSingle("TrackNetwork") SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error)
go TrackNetwork(ctx, net, stopper.Quit())
return stopper
} }
// TrackNetwork polls the network to get updated on the state of nodes, the // TrackNetwork polls the network to get updated on the state of nodes, the
// round status, and informs the client when messages can be retrieved. // round status, and informs the client when messages can be retrieved.
func TrackNetwork(ctx *context.Context, network *network.Manager, func trackNetwork(ctx *context.Context,
quitCh <-chan struct{}) { quitCh <-chan struct{}) {
opts := params.GetDefaultNetwork() opts := params.GetDefaultNetwork()
ticker := time.NewTicker(opts.TrackNetworkPeriod) ticker := time.NewTicker(opts.TrackNetworkPeriod)
...@@ -61,11 +63,10 @@ func TrackNetwork(ctx *context.Context, network *network.Manager, ...@@ -61,11 +63,10 @@ func TrackNetwork(ctx *context.Context, network *network.Manager,
} }
} }
func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network *network.Manager, maxCheckCnt int) { func track(sess *storage.Session, rng csprng.Source, p *processing,
instance := ctx.Manager.GetInstance() instance *network.Instance, comms trackNetworkComms, maxCheck int) {
comms := network.Comms
ndf := instance.GetPartialNdf().Get() ndf := instance.GetPartialNdf().Get()
defer rng.Close()
// Get a random gateway // Get a random gateway
gateways := ndf.Gateways gateways := ndf.Gateways
...@@ -87,13 +88,14 @@ func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network * ...@@ -87,13 +88,14 @@ func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network *
Hash: instance.GetPartialNdf().GetHash(), Hash: instance.GetPartialNdf().GetHash(),
}, },
LastUpdate: uint64(instance.GetLastRoundID()), LastUpdate: uint64(instance.GetLastRoundID()),
LastMessageID: "",
} }
pollResp, err := comms.SendPoll(gwHost, &pollReq) pollResp, err := comms.SendPoll(gwHost, &pollReq)
if err != nil { if err != nil {
jww.ERROR.Printf(err.Error()) jww.ERROR.Printf(err.Error())
return return
} }
//handle updates
newNDF := pollResp.PartialNDF newNDF := pollResp.PartialNDF
lastTrackedRound := id.Round(pollResp.LastTrackedRound) lastTrackedRound := id.Round(pollResp.LastTrackedRound)
roundUpdates := pollResp.Updates roundUpdates := pollResp.Updates
...@@ -107,33 +109,40 @@ func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network * ...@@ -107,33 +109,40 @@ func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network *
// ---- NODE EVENTS ---- // ---- NODE EVENTS ----
// NOTE: this updates the structure AND sends events over the node // NOTE: this updates the structure AND sends events over the node
// update channels // update channels
instance.UpdatePartialNdf(newNDF) err = instance.UpdatePartialNdf(newNDF)
instance.UpdateRounds(roundUpdates) if err != nil {
jww.ERROR.Printf(err.Error())
return
}
err = instance.RoundUpdates(roundUpdates)
if err != nil {
jww.ERROR.Printf(err.Error())
return
}
// ---- Round Processing ----- // ---- Round Processing -----
checkedRounds := sess.GetCheckedRounds() checkedRounds := sess.GetCheckedRounds()
roundChecker := getRoundChecker(network, roundUpdates) roundChecker := getRoundChecker(network, roundUpdates)
checkedRounds.Forward(lastTrackedRound) checkedRounds.Forward(lastTrackedRound)
checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, maxCheck)
maxCheckCnt)
} }
// getRoundChecker passes a context and the round infos received by the // getRoundChecker passes a context and the round infos received by the
// gateway to the funky round checker api to update round state. // gateway to the funky round checker api to update round state.
// The returned function passes round event objects over the context // The returned function passes round event objects over the context
// to the rest of the message handlers for getting messages. // to the rest of the message handlers for getting messages.
func getRoundChecker(network *network.Manager) func(roundID id.Round) bool { func getRoundChecker(p *processing, instance *network.Instance, maxAttempts uint) func(roundID id.Round) bool {
return func(roundID id.Round) bool { return func(roundID id.Round) bool {
//sess := ctx.Session
processing := network.Processing
// Set round to processing, if we can // Set round to processing, if we can
// FIXME: this appears to be a race condition -- either fix processing, count := p.Process(roundID)
// or make it not look like one. if !processing {
if processing.IsProcessing(roundID) {
return false return false
} }
processing.Add(roundID) if count == maxAttempts {
p.Remove(roundID)
return true
}
// FIXME: Spec has us SETTING processing, but not REMOVING it // FIXME: Spec has us SETTING processing, but not REMOVING it
// until the get messages thread completes the lookup, this // until the get messages thread completes the lookup, this
// is smell that needs refining. It seems as if there should be // is smell that needs refining. It seems as if there should be
...@@ -145,7 +154,6 @@ func getRoundChecker(network *network.Manager) func(roundID id.Round) bool { ...@@ -145,7 +154,6 @@ func getRoundChecker(network *network.Manager) func(roundID id.Round) bool {
// TODO: Bloom filter lookup -- return true when we don't have // TODO: Bloom filter lookup -- return true when we don't have
// Go get the round from the round infos, if it exists // Go get the round from the round infos, if it exists
instance := network.GetInstance()
ri, err := instance.GetRound(roundID) ri, err := instance.GetRound(roundID)
if err != nil { if err != nil {
// If we didn't find it, send to historical // If we didn't find it, send to historical
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment