Skip to content
Snippets Groups Projects
Commit 856041f2 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Reception stubbed out to receive messages thread. Needs additional definition before it's complete.

parent c0d76602
No related branches found
No related tags found
No related merge requests found
package params
import (
"time"
)
type Network struct {
TrackNetworkPeriod time.Duration
NumWorkers int
MaxHistoricalRounds int
}
func GetDefaultNetwork() Network {
return Network{
TrackNetworkPeriod: 100 * time.Millisecond,
NumWorkers: 4,
MaxHistoricalRounds: 100,
}
}
...@@ -32,7 +32,7 @@ func (s *Single) IsRunning() bool { ...@@ -32,7 +32,7 @@ func (s *Single) IsRunning() bool {
} }
// Quit returns the read only channel it will send the stop signal on. // Quit returns the read only channel it will send the stop signal on.
func (s *Single) Quit() chan<- struct{} { func (s *Single) Quit() <-chan struct{} {
return s.quit return s.quit
} }
......
...@@ -21,7 +21,7 @@ require ( ...@@ -21,7 +21,7 @@ require (
gitlab.com/elixxir/primitives v0.0.0-20200915190719-f4586ec93f50 gitlab.com/elixxir/primitives v0.0.0-20200915190719-f4586ec93f50
gitlab.com/xx_network/comms v0.0.0-20200915154643-d533291041b7 gitlab.com/xx_network/comms v0.0.0-20200915154643-d533291041b7
gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686 gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686
gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2 gitlab.com/xx_network/primitives v0.0.0-20200915204206-eb0287ed0031
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect
google.golang.org/protobuf v1.25.0 google.golang.org/protobuf v1.25.0
......
...@@ -264,6 +264,8 @@ gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da h1:CCVslUwNC ...@@ -264,6 +264,8 @@ gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da h1:CCVslUwNC
gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug= gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug=
gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2 h1:Jvv2fLk+2ULDCXiVTPI8Jlg2fBrmq2NSA+dlsN5t2Pk= gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2 h1:Jvv2fLk+2ULDCXiVTPI8Jlg2fBrmq2NSA+dlsN5t2Pk=
gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug= gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug=
gitlab.com/xx_network/primitives v0.0.0-20200915204206-eb0287ed0031 h1:QU+UGZPdQXUsKtOfR7hLxjg3/+MI1d+NqXgREQxzOe4=
gitlab.com/xx_network/primitives v0.0.0-20200915204206-eb0287ed0031/go.mod h1:wtdCMr7DPePz9qwctNoAUzZtbOSHSedcK++3Df3psjA=
gitlab.com/xx_network/ring v0.0.2 h1:TlPjlbFdhtJrwvRgIg4ScdngMTaynx/ByHBRZiXCoL0= gitlab.com/xx_network/ring v0.0.2 h1:TlPjlbFdhtJrwvRgIg4ScdngMTaynx/ByHBRZiXCoL0=
gitlab.com/xx_network/ring v0.0.2/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= gitlab.com/xx_network/ring v0.0.2/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
......
...@@ -12,6 +12,7 @@ package network ...@@ -12,6 +12,7 @@ package network
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/network/health" "gitlab.com/elixxir/client/network/health"
"gitlab.com/elixxir/comms/client" "gitlab.com/elixxir/comms/client"
...@@ -19,6 +20,7 @@ import ( ...@@ -19,6 +20,7 @@ import (
"gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
// "gitlab.com/xx_network/primitives/ndf" // "gitlab.com/xx_network/primitives/ndf"
pb "gitlab.com/elixxir/comms/mixmessages"
"time" "time"
) )
...@@ -43,6 +45,11 @@ type Manager struct { ...@@ -43,6 +45,11 @@ type Manager struct {
//channels //channels
nodeRegistration chan network.NodeGateway nodeRegistration chan network.NodeGateway
roundUpdate chan *pb.RoundInfo
historicalLookup chan id.Round
// Processing rounds
Processing *ProcessingRounds
//local pointer to user ID because it is used often //local pointer to user ID because it is used often
uid *id.ID uid *id.ID
...@@ -74,6 +81,8 @@ func NewManager(ctx *context.Context) (*Manager, error) { ...@@ -74,6 +81,8 @@ func NewManager(ctx *context.Context) (*Manager, error) {
" client network manager") " client network manager")
} }
opts := params.GetDefaultNetwork()
cm := &Manager{ cm := &Manager{
Comms: comms, Comms: comms,
Context: ctx, Context: ctx,
...@@ -81,6 +90,11 @@ func NewManager(ctx *context.Context) (*Manager, error) { ...@@ -81,6 +90,11 @@ func NewManager(ctx *context.Context) (*Manager, error) {
health: health.Init(ctx, 5*time.Second), health: health.Init(ctx, 5*time.Second),
instance: instance, instance: instance,
uid: cryptoUser.GetUserID(), uid: cryptoUser.GetUserID(),
Processing: NewProcessingRounds(),
roundUpdate: make(chan *pb.RoundInfo, opts.NumWorkers),
historicalLookup: make(chan id.Round, opts.NumWorkers),
nodeRegistration: make(chan network.NodeGateway,
opts.NumWorkers),
} }
return cm, nil return cm, nil
...@@ -111,12 +125,12 @@ func (m *Manager) StartRunners() error { ...@@ -111,12 +125,12 @@ func (m *Manager) StartRunners() error {
// Start the Network Tracker // Start the Network Tracker
m.runners.Add(StartTrackNetwork(m.Context, m)) m.runners.Add(StartTrackNetwork(m.Context, m))
// Message reception // Message reception
m.runners.Add(StartMessageReceivers(m.Context)) m.runners.Add(StartMessageReceivers(m.Context, m))
// Node Updates // Node Updates
m.runners.Add(StartNodeKeyExchange(m.Context)) // Adding/Keys m.runners.Add(StartNodeKeyExchange(m.Context, m)) // Adding/Keys
m.runners.Add(StartNodeRemover(m.Context)) // Removing m.runners.Add(StartNodeRemover(m.Context)) // Removing
// Round history processing // Round history processing
m.runners.Add(StartProcessHistoricalRounds(m.Context)) m.runners.Add(StartProcessHistoricalRounds(m.Context, m))
// health tracker // health tracker
m.health.Start() m.health.Start()
m.runners.Add(m.health) m.runners.Add(m.health)
...@@ -146,3 +160,19 @@ func (m *Manager) GetHealthTracker() context.HealthTracker { ...@@ -146,3 +160,19 @@ func (m *Manager) GetHealthTracker() context.HealthTracker {
func (m *Manager) GetInstance() *network.Instance { func (m *Manager) GetInstance() *network.Instance {
return m.instance return m.instance
} }
// GetNodeRegistrationCh returns node registration channel for node
// events.
func (m *Manager) GetNodeRegistrationCh() chan network.NodeGateway {
return m.nodeRegistration
}
// GetRoundUpdateCh returns the network managers round update channel
func (m *Manager) GetRoundUpdateCh() chan *pb.RoundInfo {
return m.roundUpdate
}
// GetHistoricalLookupCh returns the historical round lookup channel
func (m *Manager) GetHistoricalLookupCh() chan id.Round {
return m.historicalLookup
}
...@@ -20,16 +20,20 @@ import ( ...@@ -20,16 +20,20 @@ import (
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
// "sync" // "sync"
// "time" // "time"
"fmt"
jww "github.com/spf13/jwalterweatherman"
) )
// StartNodeKeyExchange kicks off a worker pool of node key exchange routines // StartNodeKeyExchange kicks off a worker pool of node key exchange routines
func StartNodeKeyExchange(ctx *context.Context) stoppable.Stoppable { func StartNodeKeyExchange(ctx *context.Context,
network *Manager) stoppable.Stoppable {
stoppers := stoppable.NewMulti("NodeKeyExchangers") stoppers := stoppable.NewMulti("NodeKeyExchangers")
numWorkers := params.GetDefaultNodeKeys().WorkerPoolSize numWorkers := params.GetDefaultNetwork().NumWorkers
keyCh := make(chan network.NodeGateway, numWorkers) keyCh := network.GetNodeRegistrationCh()
ctx.Manager.GetInstance().SetAddNodeChan(keyCh) ctx.Manager.GetInstance().SetAddNodeChan(keyCh)
for i := 0; i < ctx.GetNumNodeKeyExchangers(); i++ { for i := 0; i < numWorkers; i++ {
stopper := stoppable.NewSingle("NodeKeyExchange" + i) stopper := stoppable.NewSingle(
fmt.Sprintf("NodeKeyExchange%d", i))
go ExchangeNodeKeys(ctx, keyCh, stopper.Quit()) go ExchangeNodeKeys(ctx, keyCh, stopper.Quit())
stoppers.Add(stopper) stoppers.Add(stopper)
} }
...@@ -60,9 +64,10 @@ func StartNodeRemover(ctx *context.Context) stoppable.Stoppable { ...@@ -60,9 +64,10 @@ func StartNodeRemover(ctx *context.Context) stoppable.Stoppable {
numWorkers := params.GetDefaultNodeKeys().WorkerPoolSize numWorkers := params.GetDefaultNodeKeys().WorkerPoolSize
remCh := make(chan *id.ID, numWorkers) remCh := make(chan *id.ID, numWorkers)
ctx.Manager.GetInstance().SetRemoveNodeChan(remCh) ctx.Manager.GetInstance().SetRemoveNodeChan(remCh)
for i := 0; i < ctx.GetNumNodeRemovers(); i++ { for i := uint(0); i < numWorkers; i++ {
stopper := stoppable.NewSingle("RemoveNode" + i) stopper := stoppable.NewSingle(
go RemoveNode(ctx, remCh, quitCh) fmt.Sprintf("RemoveNode%d", i))
go RemoveNode(ctx, remCh, stopper.Quit())
stoppers.Add(stopper) stoppers.Add(stopper)
} }
return stoppers return stoppers
......
...@@ -14,6 +14,13 @@ type ProcessingRounds struct { ...@@ -14,6 +14,13 @@ type ProcessingRounds struct {
sync.RWMutex sync.RWMutex
} }
// NewProcessingRounds returns a processing rounds object
func NewProcessingRounds() *ProcessingRounds {
return &ProcessingRounds{
rounds: make(map[id.Round]struct{}),
}
}
// Add a round to the list of processing rounds // Add a round to the list of processing rounds
func (pr *ProcessingRounds) Add(id id.Round) { func (pr *ProcessingRounds) Add(id id.Round) {
pr.Lock() pr.Lock()
......
...@@ -7,56 +7,61 @@ ...@@ -7,56 +7,61 @@
package network package network
import ( import (
"fmt"
"gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/context/stoppable"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
//jww "github.com/spf13/jwalterweatherman"
) )
// ReceiveMessage is called by a MessageReceiver routine whenever a new CMIX // ReceiveMessages is called by a MessageReceiver routine whenever new CMIX
// message is available. // messages are available at a gateway.
func ReceiveMessage(ctx *context.Context, m *format.Message) { func ReceiveMessages(ctx *context.Context, roundInfo *pb.RoundInfo) {
// decrypted, err := decrypt(ctx, m) // Returns MessagePart msgs := getMessagesFromGateway(ctx, roundInfo)
// if err != nil { for _, m := range msgs {
// // Add to error/garbled messages list receiveMessage(ctx, m)
// jww.WARN.Errorf("Could not decode message: %+v", err) }
// ctx.GetGarbledMesssages().Add(m)
// }
// // Reconstruct the partitioned message
// completeMsg := constructMessageFromPartition(ctx, decrypted) // Returns ClientMessage
// if completeMsg != nil {
// ctx.GetSwitchBoard().Say(completeMsg)
// }
} }
// StartMessageReceivers starts a worker pool of message receivers, which listen // StartMessageReceivers starts a worker pool of message receivers, which listen
// on a channel for messages and run them through processing. // on a channel for rounds in which to check for messages and run them through
func StartMessageReceivers(ctx *context.Context) stoppable.Stoppable { // processing.
// We assume receivers channel is set up elsewhere, but note that this func StartMessageReceivers(ctx *context.Context,
// would also be a reasonable place under assumption of 1 call to network *Manager) stoppable.Stoppable {
// message receivers (would also make sense to .Close it instead of
// using quit channel, which somewhat simplifies for loop later.
stoppers := stoppable.NewMulti("MessageReceivers") stoppers := stoppable.NewMulti("MessageReceivers")
// receiverCh := ctx.GetNetwork().GetMessageReceiverCh() opts := params.GetDefaultNetwork()
// for i := 0; i < ctx.GetNumReceivers(); i++ { receiverCh := network.GetRoundUpdateCh()
// stopper := stoppable.NewSingle("MessageReceiver" + i) for i := 0; i < opts.NumWorkers; i++ {
// go MessageReceiver(ctx, messagesCh, stopper.Quit()) stopper := stoppable.NewSingle(
// stoppers.Add(stopper) fmt.Sprintf("MessageReceiver%d", i))
// } go MessageReceiver(ctx, receiverCh, stopper.Quit())
stoppers.Add(stopper)
}
return stoppers return stoppers
} }
// MessageReceiver waits until quit signal or there is a message // MessageReceiver waits until quit signal or there is a round available
// available on the messages channel. // for which to check for messages available on the round updates channel.
func MessageReceiver(ctx *context.Context, messagesCh chan format.Message, func MessageReceiver(ctx *context.Context, updatesCh chan *pb.RoundInfo,
quitCh <-chan struct{}) { quitCh <-chan struct{}) {
done := false done := false
for !done { for !done {
select { select {
case <-quitCh: case <-quitCh:
done = true done = true
// case m := <-messagesCh: case round := <-updatesCh:
// ReceiveMessage(ctx, m) ReceiveMessages(ctx, round)
}
} }
} }
func getMessagesFromGateway(ctx *context.Context,
roundInfo *pb.RoundInfo) []format.Message {
return nil
}
func receiveMessage(ctx *context.Context, msg format.Message) {
// do stuff
} }
...@@ -8,57 +8,59 @@ package network ...@@ -8,57 +8,59 @@ package network
import ( import (
"gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/context/stoppable"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
// "time" "gitlab.com/xx_network/primitives/id"
"time"
) )
// StartProcessHistoricalRounds starts a worker for processing round // StartProcessHistoricalRounds starts a worker for processing round
// history. // history.
func StartProcessHistoricalRounds(ctx *context.Context) stoppable.Stoppable { func StartProcessHistoricalRounds(ctx *context.Context,
network *Manager) stoppable.Stoppable {
stopper := stoppable.NewSingle("ProcessHistoricalRounds") stopper := stoppable.NewSingle("ProcessHistoricalRounds")
go ProcessHistoricalRounds(ctx, stopper.Quit()) go ProcessHistoricalRounds(ctx, network, stopper.Quit())
return stopper return stopper
} }
// ProcessHistoricalRounds analyzes round history to see if this Client // ProcessHistoricalRounds analyzes round history to see if this Client
// needs to check for messages at any of the gateways which completed // needs to check for messages at any of the gateways which completed
// those rounds. // those rounds.
func ProcessHistoricalRounds(ctx *context.Context, quitCh <-chan struct{}) { func ProcessHistoricalRounds(ctx *context.Context, network *Manager,
// ticker := time.NewTicker(ctx.GetTrackNetworkPeriod()) quitCh <-chan struct{}) {
// var rounds []RoundID opts := params.GetDefaultNetwork()
ticker := time.NewTicker(opts.TrackNetworkPeriod)
var rounds []id.Round
done := false done := false
for !done { for !done {
//shouldProcess := false shouldProcess := false
select { select {
case <-quitCh: case <-quitCh:
done = true done = true
// case <-ticker: case <-ticker.C:
// if len(rounds) > 0 { if len(rounds) > 0 {
// shouldProcess = true shouldProcess = true
// } }
// case rid := <-ctx.GetHistoricalRoundsCh(): case rid := <-network.GetHistoricalLookupCh():
// rounds = append(rounds, rid) rounds = append(rounds, rid)
// if len(rounds) > ctx.GetSendSize() { if len(rounds) > opts.MaxHistoricalRounds {
// shouldProcess = true shouldProcess = true
// } }
// } }
// if !shouldProcess { if !shouldProcess {
// continue continue
// } }
// var roundInfos []*RoundInfo roundInfos := processHistoricalRounds(ctx, rounds)
// roundInfos = processHistoricalRounds(ctx, rounds) for _, ri := range roundInfos {
// rounds := make([]RoundID) network.GetRoundUpdateCh() <- ri
// for _, ri := range roundInfos {
// ctx.GetMessagesCh() <- ri
// }
} }
} }
} }
func processHistoricalRounds(ctx *context.Context, func processHistoricalRounds(ctx *context.Context,
rids []uint64) []*pb.RoundInfo { rids []id.Round) []*pb.RoundInfo {
// for loop over rids? // for loop over rids?
// network := ctx.GetNetwork() // network := ctx.GetNetwork()
// gw := network.GetGateway() // gw := network.GetGateway()
......
...@@ -20,9 +20,14 @@ package network ...@@ -20,9 +20,14 @@ package network
import ( import (
"encoding/binary" "encoding/binary"
"gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/comms/network" //"gitlab.com/elixxir/comms/network"
"gitlab.com/xx_network/primitives/ndf" //"gitlab.com/xx_network/primitives/ndf"
"fmt"
jww "github.com/spf13/jwalterweatherman"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/primitives/id"
"io" "io"
"math" "math"
"time" "time"
...@@ -67,13 +72,14 @@ func StartTrackNetwork(ctx *context.Context, net *Manager) stoppable.Stoppable { ...@@ -67,13 +72,14 @@ func StartTrackNetwork(ctx *context.Context, net *Manager) stoppable.Stoppable {
// round status, and informs the client when messages can be retrieved. // round status, and informs the client when messages can be retrieved.
func TrackNetwork(ctx *context.Context, network *Manager, func TrackNetwork(ctx *context.Context, network *Manager,
quitCh <-chan struct{}) { quitCh <-chan struct{}) {
ticker := time.NewTicker(ctx.GetTrackNetworkPeriod()) opts := params.GetDefaultNetwork()
ticker := time.NewTicker(opts.TrackNetworkPeriod)
done := false done := false
for !done { for !done {
select { select {
case <-quitCh: case <-quitCh:
done = true done = true
case <-ticker: case <-ticker.C:
trackNetwork(ctx, network) trackNetwork(ctx, network)
} }
} }
...@@ -83,12 +89,19 @@ func trackNetwork(ctx *context.Context, network *Manager) { ...@@ -83,12 +89,19 @@ func trackNetwork(ctx *context.Context, network *Manager) {
instance := ctx.Manager.GetInstance() instance := ctx.Manager.GetInstance()
comms := network.Comms comms := network.Comms
ndf := instance.GetPartialNdf().Get() ndf := instance.GetPartialNdf().Get()
rng := ctx.Rng rng := ctx.Rng.GetStream()
defer rng.Close()
sess := ctx.Session
// Get a random gateway // Get a random gateway
gateways := ndf.Gateways gateways := ndf.Gateways
gwID := gateways[ReadRangeUint32(0, len(gateways), rng)].GetGatewayId() gwIdx := ReadRangeUint32(0, uint32(len(gateways)), rng)
gwHost, ok := comms.GetHost(gwHost) gwID, err := gateways[gwIdx].GetGatewayId()
if err != nil {
jww.ERROR.Printf(err.Error())
return
}
gwHost, ok := comms.GetHost(gwID)
if !ok { if !ok {
jww.ERROR.Printf("could not get host for gateway %s", gwID) jww.ERROR.Printf("could not get host for gateway %s", gwID)
return return
...@@ -96,18 +109,22 @@ func trackNetwork(ctx *context.Context, network *Manager) { ...@@ -96,18 +109,22 @@ func trackNetwork(ctx *context.Context, network *Manager) {
// Poll for the new NDF // Poll for the new NDF
pollReq := pb.GatewayPoll{ pollReq := pb.GatewayPoll{
NDFHash: instance.GetPartialNdf().GetHash(), Partial: &pb.NDFHash{
LastRound: instance.GetLastRoundID(), Hash: instance.GetPartialNdf().GetHash(),
LastMessageID: nil, },
LastUpdate: uint64(instance.GetLastRoundID()),
LastMessageID: "",
} }
pollResp, err := comms.SendPoll(gwHost) pollResp, err := comms.SendPoll(gwHost, &pollReq)
if err != nil { if err != nil {
jww.ERROR.Printf(err) jww.ERROR.Printf(err.Error())
return
} }
newNDF := pollResp.NDF newNDF := pollResp.PartialNDF
lastRoundInfo := pollResp.RoundInfo lastRoundInfo := pollResp.LastRound
roundUpdates := pollResp.Updates roundUpdates := pollResp.Updates
newMessageIDs := pollRespon.NewMessageIDs // This is likely unused in favor of new API
//newMessageIDs := pollResp.NewMessageIDs
// ---- NODE EVENTS ---- // ---- NODE EVENTS ----
// NOTE: this updates the structure AND sends events over the node // NOTE: this updates the structure AND sends events over the node
...@@ -115,15 +132,64 @@ func trackNetwork(ctx *context.Context, network *Manager) { ...@@ -115,15 +132,64 @@ func trackNetwork(ctx *context.Context, network *Manager) {
instance.UpdatePartialNdf(newNDF) instance.UpdatePartialNdf(newNDF)
// ---- Round Processing ----- // ---- Round Processing -----
checkedRounds := sess.GetCheckedRounds()
roundChecker := getRoundChecker(ctx, network, roundUpdates)
checkedRounds.RangeUnchecked(id.Round(lastRoundInfo.ID), roundChecker)
// rounds, err = network.UpdateRounds(ctx, ndf) // FIXME: Seems odd/like a race condition to do this here, but this is
// if err != nil { // spec. Fix this to either eliminate race condition or not make it
// // ... // weird. This is tied to if a round is processing. It appears that
// } // if it is processing OR already checked is the state we care about,
// because we really want to know if we should look it up and process,
// and that could be done via storage inside range Unchecked?
for _, ri := range roundUpdates {
checkedRounds.Check(id.Round(ri.ID))
}
}
// err = rounds.GetKnownRound().MaskedRange(gateway, // getRoundChecker passes a context and the round infos received by the
// network.CheckRoundsFunction) // gateway to the funky round checker api to update round state.
// if err != nil { // The returned function passes round event objects over the context
// // ... // to the rest of the message handlers for getting messages.
// } func getRoundChecker(ctx *context.Context, network *Manager,
roundInfos []*pb.RoundInfo) func(roundID id.Round) bool {
return func(roundID id.Round) bool {
//sess := ctx.Session
processing := network.Processing
// Set round to processing, if we can
// FIXME: this appears to be a race condition -- either fix
// or make it not look like one.
if processing.IsProcessing(roundID) {
return false
}
processing.Add(roundID)
// FIXME: Spec has us SETTING processing, but not REMOVING it
// until the get messages thread completes the lookup, this
// is smell that needs refining. It seems as if there should be
// a state that lives with the round info as soon as we know
// about it that gets updated at different parts...not clear
// needs to be thought through.
//defer processing.Remove(roundID)
// TODO: Bloom filter lookup -- return true when we don't have
// Go get the round from the round infos, if it exists
// For now, if we have the round in th round updates,
// process it, otherwise call historical rounds code
// to go find it if possible.
for _, ri := range roundInfos {
rID := id.Round(ri.ID)
if rID == roundID {
// Send to get message processor
network.GetRoundUpdateCh() <- ri
return false
}
}
// If we didn't find it, send to historical rounds processor
network.GetHistoricalLookupCh() <- roundID
return false
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment