diff --git a/network/rounds/round.go b/network/rounds/round.go index 4dc1cd9c335f1b2efd70a35df28a04cf54ee9346..2fa59fb92a6f246934dca88c3258978cf83514d6 100644 --- a/network/rounds/round.go +++ b/network/rounds/round.go @@ -1,4 +1,14 @@ package rounds -type Manager struct { +import ( + "gitlab.com/elixxir/client/context" + "gitlab.com/elixxir/client/context/params" + "gitlab.com/elixxir/client/context/stoppable" + "gitlab.com/elixxir/client/storage" +) + +func StartProcessors(ctx *context.Context) stoppable.Stoppable { + p := newProcessingRounds() + stopper := stoppable.NewSingle("TrackNetwork") + go trackNetwork(ctx, net, stopper.Quit()) } diff --git a/network/rounds/track.go b/network/rounds/track.go index 8285d62065804dbede82a6d3ce6d6872df72f01b..baa3abd2f545b2c4f02aa272fd1c1993fbc9b6b6 100644 --- a/network/rounds/track.go +++ b/network/rounds/track.go @@ -22,10 +22,14 @@ import ( "gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/stoppable" - "gitlab.com/elixxir/client/network" + "gitlab.com/elixxir/comms/client" + "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/crypto/csprng" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/primitives/knownRounds" + "gitlab.com/xx_network/comms/connect" + //"gitlab.com/elixxir/comms/network" //"gitlab.com/xx_network/primitives/ndf" "fmt" @@ -37,16 +41,14 @@ import ( "time" ) -// StartTrackNetwork starts a single TrackNetwork thread and returns a stoppable -func StartTrackNetwork(ctx *context.Context, net *network.Manager) stoppable.Stoppable { - stopper := stoppable.NewSingle("TrackNetwork") - go TrackNetwork(ctx, net, stopper.Quit()) - return stopper +type trackNetworkComms interface { + GetHost(hostId *id.ID) (*connect.Host, bool) + SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error) } // TrackNetwork polls the network to get updated on the state of nodes, the // round status, and informs the client when messages can be retrieved. -func TrackNetwork(ctx *context.Context, network *network.Manager, +func trackNetwork(ctx *context.Context, quitCh <-chan struct{}) { opts := params.GetDefaultNetwork() ticker := time.NewTicker(opts.TrackNetworkPeriod) @@ -61,11 +63,10 @@ func TrackNetwork(ctx *context.Context, network *network.Manager, } } -func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network *network.Manager, maxCheckCnt int) { - instance := ctx.Manager.GetInstance() - comms := network.Comms +func track(sess *storage.Session, rng csprng.Source, p *processing, + instance *network.Instance, comms trackNetworkComms, maxCheck int) { + ndf := instance.GetPartialNdf().Get() - defer rng.Close() // Get a random gateway gateways := ndf.Gateways @@ -87,13 +88,14 @@ func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network * Hash: instance.GetPartialNdf().GetHash(), }, LastUpdate: uint64(instance.GetLastRoundID()), - LastMessageID: "", } pollResp, err := comms.SendPoll(gwHost, &pollReq) if err != nil { jww.ERROR.Printf(err.Error()) return } + + //handle updates newNDF := pollResp.PartialNDF lastTrackedRound := id.Round(pollResp.LastTrackedRound) roundUpdates := pollResp.Updates @@ -107,33 +109,40 @@ func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network * // ---- NODE EVENTS ---- // NOTE: this updates the structure AND sends events over the node // update channels - instance.UpdatePartialNdf(newNDF) - instance.UpdateRounds(roundUpdates) + err = instance.UpdatePartialNdf(newNDF) + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } + err = instance.RoundUpdates(roundUpdates) + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } // ---- Round Processing ----- checkedRounds := sess.GetCheckedRounds() roundChecker := getRoundChecker(network, roundUpdates) checkedRounds.Forward(lastTrackedRound) - checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, - maxCheckCnt) + checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, maxCheck) } // getRoundChecker passes a context and the round infos received by the // gateway to the funky round checker api to update round state. // The returned function passes round event objects over the context // to the rest of the message handlers for getting messages. -func getRoundChecker(network *network.Manager) func(roundID id.Round) bool { +func getRoundChecker(p *processing, instance *network.Instance, maxAttempts uint) func(roundID id.Round) bool { return func(roundID id.Round) bool { - //sess := ctx.Session - processing := network.Processing // Set round to processing, if we can - // FIXME: this appears to be a race condition -- either fix - // or make it not look like one. - if processing.IsProcessing(roundID) { + processing, count := p.Process(roundID) + if !processing { return false } - processing.Add(roundID) + if count == maxAttempts { + p.Remove(roundID) + return true + } // FIXME: Spec has us SETTING processing, but not REMOVING it // until the get messages thread completes the lookup, this // is smell that needs refining. It seems as if there should be @@ -145,7 +154,6 @@ func getRoundChecker(network *network.Manager) func(roundID id.Round) bool { // TODO: Bloom filter lookup -- return true when we don't have // Go get the round from the round infos, if it exists - instance := network.GetInstance() ri, err := instance.GetRound(roundID) if err != nil { // If we didn't find it, send to historical