diff --git a/context/params/network.go b/context/params/network.go new file mode 100644 index 0000000000000000000000000000000000000000..440e79bf8ad02add3049046c8ce25a9908ff0ead --- /dev/null +++ b/context/params/network.go @@ -0,0 +1,19 @@ +package params + +import ( + "time" +) + +type Network struct { + TrackNetworkPeriod time.Duration + NumWorkers int + MaxHistoricalRounds int +} + +func GetDefaultNetwork() Network { + return Network{ + TrackNetworkPeriod: 100 * time.Millisecond, + NumWorkers: 4, + MaxHistoricalRounds: 100, + } +} diff --git a/context/stoppable/single.go b/context/stoppable/single.go index acd8da0077427215934dae20b68ff9346396492c..c5eadaefa40e8070ebb4b85ae710ff65d0f8fab6 100644 --- a/context/stoppable/single.go +++ b/context/stoppable/single.go @@ -32,7 +32,7 @@ func (s *Single) IsRunning() bool { } // Quit returns the read only channel it will send the stop signal on. -func (s *Single) Quit() chan<- struct{} { +func (s *Single) Quit() <-chan struct{} { return s.quit } diff --git a/go.mod b/go.mod index d9dd4e336723205b3dd0452afb100c9f56762ebd..15ea65f965919c0b6283753ddb34db8136cf3fdf 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( gitlab.com/elixxir/primitives v0.0.0-20200915190719-f4586ec93f50 gitlab.com/xx_network/comms v0.0.0-20200915154643-d533291041b7 gitlab.com/xx_network/crypto v0.0.0-20200812183430-c77a5281c686 - gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2 + gitlab.com/xx_network/primitives v0.0.0-20200915204206-eb0287ed0031 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/sys v0.0.0-20200828194041-157a740278f4 // indirect google.golang.org/protobuf v1.25.0 diff --git a/go.sum b/go.sum index 2b4b41de94b358f6313e4001fb97f7ca14a4662d..a4010ef3ade48bce78c1b1c736041266cf6608ad 100644 --- a/go.sum +++ b/go.sum @@ -264,6 +264,8 @@ gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da h1:CCVslUwNC gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug= gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2 h1:Jvv2fLk+2ULDCXiVTPI8Jlg2fBrmq2NSA+dlsN5t2Pk= gitlab.com/xx_network/primitives v0.0.0-20200812183720-516a65a4a9b2/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug= +gitlab.com/xx_network/primitives v0.0.0-20200915204206-eb0287ed0031 h1:QU+UGZPdQXUsKtOfR7hLxjg3/+MI1d+NqXgREQxzOe4= +gitlab.com/xx_network/primitives v0.0.0-20200915204206-eb0287ed0031/go.mod h1:wtdCMr7DPePz9qwctNoAUzZtbOSHSedcK++3Df3psjA= gitlab.com/xx_network/ring v0.0.2 h1:TlPjlbFdhtJrwvRgIg4ScdngMTaynx/ByHBRZiXCoL0= gitlab.com/xx_network/ring v0.0.2/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/network/manager.go b/network/manager.go index 34c836040cfd1cdc632b4e8ed41753a71168f5b3..59fc45219f59077fc5d75a365c38933dd228030b 100644 --- a/network/manager.go +++ b/network/manager.go @@ -12,6 +12,7 @@ package network import ( "github.com/pkg/errors" "gitlab.com/elixxir/client/context" + "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/network/health" "gitlab.com/elixxir/comms/client" @@ -19,6 +20,7 @@ import ( "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" // "gitlab.com/xx_network/primitives/ndf" + pb "gitlab.com/elixxir/comms/mixmessages" "time" ) @@ -43,6 +45,11 @@ type Manager struct { //channels nodeRegistration chan network.NodeGateway + roundUpdate chan *pb.RoundInfo + historicalLookup chan id.Round + + // Processing rounds + Processing *ProcessingRounds //local pointer to user ID because it is used often uid *id.ID @@ -74,13 +81,20 @@ func NewManager(ctx *context.Context) (*Manager, error) { " client network manager") } + opts := params.GetDefaultNetwork() + cm := &Manager{ - Comms: comms, - Context: ctx, - runners: stoppable.NewMulti("network.Manager"), - health: health.Init(ctx, 5*time.Second), - instance: instance, - uid: cryptoUser.GetUserID(), + Comms: comms, + Context: ctx, + runners: stoppable.NewMulti("network.Manager"), + health: health.Init(ctx, 5*time.Second), + instance: instance, + uid: cryptoUser.GetUserID(), + Processing: NewProcessingRounds(), + roundUpdate: make(chan *pb.RoundInfo, opts.NumWorkers), + historicalLookup: make(chan id.Round, opts.NumWorkers), + nodeRegistration: make(chan network.NodeGateway, + opts.NumWorkers), } return cm, nil @@ -111,12 +125,12 @@ func (m *Manager) StartRunners() error { // Start the Network Tracker m.runners.Add(StartTrackNetwork(m.Context, m)) // Message reception - m.runners.Add(StartMessageReceivers(m.Context)) + m.runners.Add(StartMessageReceivers(m.Context, m)) // Node Updates - m.runners.Add(StartNodeKeyExchange(m.Context)) // Adding/Keys - m.runners.Add(StartNodeRemover(m.Context)) // Removing + m.runners.Add(StartNodeKeyExchange(m.Context, m)) // Adding/Keys + m.runners.Add(StartNodeRemover(m.Context)) // Removing // Round history processing - m.runners.Add(StartProcessHistoricalRounds(m.Context)) + m.runners.Add(StartProcessHistoricalRounds(m.Context, m)) // health tracker m.health.Start() m.runners.Add(m.health) @@ -146,3 +160,19 @@ func (m *Manager) GetHealthTracker() context.HealthTracker { func (m *Manager) GetInstance() *network.Instance { return m.instance } + +// GetNodeRegistrationCh returns node registration channel for node +// events. +func (m *Manager) GetNodeRegistrationCh() chan network.NodeGateway { + return m.nodeRegistration +} + +// GetRoundUpdateCh returns the network managers round update channel +func (m *Manager) GetRoundUpdateCh() chan *pb.RoundInfo { + return m.roundUpdate +} + +// GetHistoricalLookupCh returns the historical round lookup channel +func (m *Manager) GetHistoricalLookupCh() chan id.Round { + return m.historicalLookup +} diff --git a/network/nodes.go b/network/nodes.go index a70f8465a7927a6c3932aecc9c950c10d7d2555b..3559a44b5040c669c80d8a742e182ab36a5d1f73 100644 --- a/network/nodes.go +++ b/network/nodes.go @@ -20,16 +20,20 @@ import ( "gitlab.com/xx_network/primitives/id" // "sync" // "time" + "fmt" + jww "github.com/spf13/jwalterweatherman" ) // StartNodeKeyExchange kicks off a worker pool of node key exchange routines -func StartNodeKeyExchange(ctx *context.Context) stoppable.Stoppable { +func StartNodeKeyExchange(ctx *context.Context, + network *Manager) stoppable.Stoppable { stoppers := stoppable.NewMulti("NodeKeyExchangers") - numWorkers := params.GetDefaultNodeKeys().WorkerPoolSize - keyCh := make(chan network.NodeGateway, numWorkers) + numWorkers := params.GetDefaultNetwork().NumWorkers + keyCh := network.GetNodeRegistrationCh() ctx.Manager.GetInstance().SetAddNodeChan(keyCh) - for i := 0; i < ctx.GetNumNodeKeyExchangers(); i++ { - stopper := stoppable.NewSingle("NodeKeyExchange" + i) + for i := 0; i < numWorkers; i++ { + stopper := stoppable.NewSingle( + fmt.Sprintf("NodeKeyExchange%d", i)) go ExchangeNodeKeys(ctx, keyCh, stopper.Quit()) stoppers.Add(stopper) } @@ -60,9 +64,10 @@ func StartNodeRemover(ctx *context.Context) stoppable.Stoppable { numWorkers := params.GetDefaultNodeKeys().WorkerPoolSize remCh := make(chan *id.ID, numWorkers) ctx.Manager.GetInstance().SetRemoveNodeChan(remCh) - for i := 0; i < ctx.GetNumNodeRemovers(); i++ { - stopper := stoppable.NewSingle("RemoveNode" + i) - go RemoveNode(ctx, remCh, quitCh) + for i := uint(0); i < numWorkers; i++ { + stopper := stoppable.NewSingle( + fmt.Sprintf("RemoveNode%d", i)) + go RemoveNode(ctx, remCh, stopper.Quit()) stoppers.Add(stopper) } return stoppers diff --git a/network/processingrounds.go b/network/processingrounds.go index 55e1aae0750f23e89c39602d7e9ba1c0727a0bfb..4cbcfe27c5457607457768559b256a667ca98ab2 100644 --- a/network/processingrounds.go +++ b/network/processingrounds.go @@ -14,6 +14,13 @@ type ProcessingRounds struct { sync.RWMutex } +// NewProcessingRounds returns a processing rounds object +func NewProcessingRounds() *ProcessingRounds { + return &ProcessingRounds{ + rounds: make(map[id.Round]struct{}), + } +} + // Add a round to the list of processing rounds func (pr *ProcessingRounds) Add(id id.Round) { pr.Lock() diff --git a/network/receive.go b/network/receive.go index b8b11e47bf17e4ded817b28915227986ae5cf710..67abcfa427113b8de8df887676870091d89c1a3a 100644 --- a/network/receive.go +++ b/network/receive.go @@ -7,56 +7,61 @@ package network import ( + "fmt" "gitlab.com/elixxir/client/context" + "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/stoppable" + pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/primitives/format" + //jww "github.com/spf13/jwalterweatherman" ) -// ReceiveMessage is called by a MessageReceiver routine whenever a new CMIX -// message is available. -func ReceiveMessage(ctx *context.Context, m *format.Message) { - // decrypted, err := decrypt(ctx, m) // Returns MessagePart - // if err != nil { - // // Add to error/garbled messages list - // jww.WARN.Errorf("Could not decode message: %+v", err) - // ctx.GetGarbledMesssages().Add(m) - // } - - // // Reconstruct the partitioned message - // completeMsg := constructMessageFromPartition(ctx, decrypted) // Returns ClientMessage - // if completeMsg != nil { - // ctx.GetSwitchBoard().Say(completeMsg) - // } +// ReceiveMessages is called by a MessageReceiver routine whenever new CMIX +// messages are available at a gateway. +func ReceiveMessages(ctx *context.Context, roundInfo *pb.RoundInfo) { + msgs := getMessagesFromGateway(ctx, roundInfo) + for _, m := range msgs { + receiveMessage(ctx, m) + } } // StartMessageReceivers starts a worker pool of message receivers, which listen -// on a channel for messages and run them through processing. -func StartMessageReceivers(ctx *context.Context) stoppable.Stoppable { - // We assume receivers channel is set up elsewhere, but note that this - // would also be a reasonable place under assumption of 1 call to - // message receivers (would also make sense to .Close it instead of - // using quit channel, which somewhat simplifies for loop later. +// on a channel for rounds in which to check for messages and run them through +// processing. +func StartMessageReceivers(ctx *context.Context, + network *Manager) stoppable.Stoppable { stoppers := stoppable.NewMulti("MessageReceivers") - // receiverCh := ctx.GetNetwork().GetMessageReceiverCh() - // for i := 0; i < ctx.GetNumReceivers(); i++ { - // stopper := stoppable.NewSingle("MessageReceiver" + i) - // go MessageReceiver(ctx, messagesCh, stopper.Quit()) - // stoppers.Add(stopper) - // } + opts := params.GetDefaultNetwork() + receiverCh := network.GetRoundUpdateCh() + for i := 0; i < opts.NumWorkers; i++ { + stopper := stoppable.NewSingle( + fmt.Sprintf("MessageReceiver%d", i)) + go MessageReceiver(ctx, receiverCh, stopper.Quit()) + stoppers.Add(stopper) + } return stoppers } -// MessageReceiver waits until quit signal or there is a message -// available on the messages channel. -func MessageReceiver(ctx *context.Context, messagesCh chan format.Message, +// MessageReceiver waits until quit signal or there is a round available +// for which to check for messages available on the round updates channel. +func MessageReceiver(ctx *context.Context, updatesCh chan *pb.RoundInfo, quitCh <-chan struct{}) { done := false for !done { select { case <-quitCh: done = true - // case m := <-messagesCh: - // ReceiveMessage(ctx, m) + case round := <-updatesCh: + ReceiveMessages(ctx, round) } } } + +func getMessagesFromGateway(ctx *context.Context, + roundInfo *pb.RoundInfo) []format.Message { + return nil +} + +func receiveMessage(ctx *context.Context, msg format.Message) { + // do stuff +} diff --git a/network/rounds.go b/network/rounds.go index 032ecb3a049f2723ba4f2c36f50a5ad17352c513..21ab22463eb5d982fcd515a5a4a024be9de3fdcd 100644 --- a/network/rounds.go +++ b/network/rounds.go @@ -8,57 +8,59 @@ package network import ( "gitlab.com/elixxir/client/context" + "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/stoppable" pb "gitlab.com/elixxir/comms/mixmessages" - // "time" + "gitlab.com/xx_network/primitives/id" + "time" ) // StartProcessHistoricalRounds starts a worker for processing round // history. -func StartProcessHistoricalRounds(ctx *context.Context) stoppable.Stoppable { +func StartProcessHistoricalRounds(ctx *context.Context, + network *Manager) stoppable.Stoppable { stopper := stoppable.NewSingle("ProcessHistoricalRounds") - go ProcessHistoricalRounds(ctx, stopper.Quit()) + go ProcessHistoricalRounds(ctx, network, stopper.Quit()) return stopper } // ProcessHistoricalRounds analyzes round history to see if this Client // needs to check for messages at any of the gateways which completed // those rounds. -func ProcessHistoricalRounds(ctx *context.Context, quitCh <-chan struct{}) { - // ticker := time.NewTicker(ctx.GetTrackNetworkPeriod()) - // var rounds []RoundID +func ProcessHistoricalRounds(ctx *context.Context, network *Manager, + quitCh <-chan struct{}) { + opts := params.GetDefaultNetwork() + ticker := time.NewTicker(opts.TrackNetworkPeriod) + var rounds []id.Round done := false for !done { - //shouldProcess := false + shouldProcess := false select { case <-quitCh: done = true - // case <-ticker: - // if len(rounds) > 0 { - // shouldProcess = true - // } - // case rid := <-ctx.GetHistoricalRoundsCh(): - // rounds = append(rounds, rid) - // if len(rounds) > ctx.GetSendSize() { - // shouldProcess = true - // } - // } - // if !shouldProcess { - // continue - // } + case <-ticker.C: + if len(rounds) > 0 { + shouldProcess = true + } + case rid := <-network.GetHistoricalLookupCh(): + rounds = append(rounds, rid) + if len(rounds) > opts.MaxHistoricalRounds { + shouldProcess = true + } + } + if !shouldProcess { + continue + } - // var roundInfos []*RoundInfo - // roundInfos = processHistoricalRounds(ctx, rounds) - // rounds := make([]RoundID) - // for _, ri := range roundInfos { - // ctx.GetMessagesCh() <- ri - // } + roundInfos := processHistoricalRounds(ctx, rounds) + for _, ri := range roundInfos { + network.GetRoundUpdateCh() <- ri } } } func processHistoricalRounds(ctx *context.Context, - rids []uint64) []*pb.RoundInfo { + rids []id.Round) []*pb.RoundInfo { // for loop over rids? // network := ctx.GetNetwork() // gw := network.GetGateway() diff --git a/network/updates.go b/network/updates.go index b77cf1f021739247a6ba1201a32d79f0b8c417ea..af33b43e340d0420e991aaf14630004663bba3c2 100644 --- a/network/updates.go +++ b/network/updates.go @@ -20,9 +20,14 @@ package network import ( "encoding/binary" "gitlab.com/elixxir/client/context" + "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/stoppable" - "gitlab.com/elixxir/comms/network" - "gitlab.com/xx_network/primitives/ndf" + //"gitlab.com/elixxir/comms/network" + //"gitlab.com/xx_network/primitives/ndf" + "fmt" + jww "github.com/spf13/jwalterweatherman" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/xx_network/primitives/id" "io" "math" "time" @@ -67,13 +72,14 @@ func StartTrackNetwork(ctx *context.Context, net *Manager) stoppable.Stoppable { // round status, and informs the client when messages can be retrieved. func TrackNetwork(ctx *context.Context, network *Manager, quitCh <-chan struct{}) { - ticker := time.NewTicker(ctx.GetTrackNetworkPeriod()) + opts := params.GetDefaultNetwork() + ticker := time.NewTicker(opts.TrackNetworkPeriod) done := false for !done { select { case <-quitCh: done = true - case <-ticker: + case <-ticker.C: trackNetwork(ctx, network) } } @@ -83,12 +89,19 @@ func trackNetwork(ctx *context.Context, network *Manager) { instance := ctx.Manager.GetInstance() comms := network.Comms ndf := instance.GetPartialNdf().Get() - rng := ctx.Rng + rng := ctx.Rng.GetStream() + defer rng.Close() + sess := ctx.Session // Get a random gateway gateways := ndf.Gateways - gwID := gateways[ReadRangeUint32(0, len(gateways), rng)].GetGatewayId() - gwHost, ok := comms.GetHost(gwHost) + gwIdx := ReadRangeUint32(0, uint32(len(gateways)), rng) + gwID, err := gateways[gwIdx].GetGatewayId() + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } + gwHost, ok := comms.GetHost(gwID) if !ok { jww.ERROR.Printf("could not get host for gateway %s", gwID) return @@ -96,18 +109,22 @@ func trackNetwork(ctx *context.Context, network *Manager) { // Poll for the new NDF pollReq := pb.GatewayPoll{ - NDFHash: instance.GetPartialNdf().GetHash(), - LastRound: instance.GetLastRoundID(), - LastMessageID: nil, + Partial: &pb.NDFHash{ + Hash: instance.GetPartialNdf().GetHash(), + }, + LastUpdate: uint64(instance.GetLastRoundID()), + LastMessageID: "", } - pollResp, err := comms.SendPoll(gwHost) + pollResp, err := comms.SendPoll(gwHost, &pollReq) if err != nil { - jww.ERROR.Printf(err) + jww.ERROR.Printf(err.Error()) + return } - newNDF := pollResp.NDF - lastRoundInfo := pollResp.RoundInfo + newNDF := pollResp.PartialNDF + lastRoundInfo := pollResp.LastRound roundUpdates := pollResp.Updates - newMessageIDs := pollRespon.NewMessageIDs + // This is likely unused in favor of new API + //newMessageIDs := pollResp.NewMessageIDs // ---- NODE EVENTS ---- // NOTE: this updates the structure AND sends events over the node @@ -115,15 +132,64 @@ func trackNetwork(ctx *context.Context, network *Manager) { instance.UpdatePartialNdf(newNDF) // ---- Round Processing ----- + checkedRounds := sess.GetCheckedRounds() + roundChecker := getRoundChecker(ctx, network, roundUpdates) + checkedRounds.RangeUnchecked(id.Round(lastRoundInfo.ID), roundChecker) + + // FIXME: Seems odd/like a race condition to do this here, but this is + // spec. Fix this to either eliminate race condition or not make it + // weird. This is tied to if a round is processing. It appears that + // if it is processing OR already checked is the state we care about, + // because we really want to know if we should look it up and process, + // and that could be done via storage inside range Unchecked? + for _, ri := range roundUpdates { + checkedRounds.Check(id.Round(ri.ID)) + } +} - // rounds, err = network.UpdateRounds(ctx, ndf) - // if err != nil { - // // ... - // } +// getRoundChecker passes a context and the round infos received by the +// gateway to the funky round checker api to update round state. +// The returned function passes round event objects over the context +// to the rest of the message handlers for getting messages. +func getRoundChecker(ctx *context.Context, network *Manager, + roundInfos []*pb.RoundInfo) func(roundID id.Round) bool { + return func(roundID id.Round) bool { + //sess := ctx.Session + processing := network.Processing - // err = rounds.GetKnownRound().MaskedRange(gateway, - // network.CheckRoundsFunction) - // if err != nil { - // // ... - // } + // Set round to processing, if we can + // FIXME: this appears to be a race condition -- either fix + // or make it not look like one. + if processing.IsProcessing(roundID) { + return false + } + processing.Add(roundID) + // FIXME: Spec has us SETTING processing, but not REMOVING it + // until the get messages thread completes the lookup, this + // is smell that needs refining. It seems as if there should be + // a state that lives with the round info as soon as we know + // about it that gets updated at different parts...not clear + // needs to be thought through. + //defer processing.Remove(roundID) + + // TODO: Bloom filter lookup -- return true when we don't have + // Go get the round from the round infos, if it exists + + // For now, if we have the round in th round updates, + // process it, otherwise call historical rounds code + // to go find it if possible. + for _, ri := range roundInfos { + rID := id.Round(ri.ID) + if rID == roundID { + // Send to get message processor + network.GetRoundUpdateCh() <- ri + return false + } + } + + // If we didn't find it, send to historical rounds processor + network.GetHistoricalLookupCh() <- roundID + + return false + } }