Skip to content
Snippets Groups Projects
Commit f40bff36 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

in progress rearchetecture

parent b12c1bbc
No related branches found
No related tags found
No related merge requests found
...@@ -6,16 +6,18 @@ import ( ...@@ -6,16 +6,18 @@ import (
type Network struct { type Network struct {
TrackNetworkPeriod time.Duration TrackNetworkPeriod time.Duration
NumWorkers int NumWorkers uint
MaxHistoricalRounds int // maximum number of rounds to check in a single iterations network updates
MaxCheckCnt int MaxCheckCheckedRounds uint
Rounds
} }
func GetDefaultNetwork() Network { func GetDefaultNetwork() Network {
return Network{ n := Network{
TrackNetworkPeriod: 100 * time.Millisecond, TrackNetworkPeriod: 100 * time.Millisecond,
NumWorkers: 4, NumWorkers: 4,
MaxHistoricalRounds: 100, MaxCheckCheckedRounds: 500,
MaxCheckCnt: 100,
} }
n.Rounds = GetDefaultRounds()
return n
} }
package params
import "time"
type Rounds struct {
// maximum number of times to attempt to retrieve a round from a gateway
// before giving up on it
MaxAttemptsCheckingARound uint
// number of historical rounds required to automatically send a historical
// rounds query
MaxHistoricalRounds uint
// maximum period of time a pending historical round query will wait before
// it si transmitted
HistoricalRoundsPeriod time.Duration
//Length of historical rounds channel buffer
HistoricalRoundsBufferLen uint
//Length of round lookup channel buffer
LookupRoundsBufferLen uint
}
func GetDefaultRounds() Rounds {
return Rounds{
MaxAttemptsCheckingARound: 5,
MaxHistoricalRounds: 100,
HistoricalRoundsPeriod: 100 * time.Millisecond,
HistoricalRoundsBufferLen: 1000,
LookupRoundsBufferLen: 2000,
}
}
...@@ -32,7 +32,7 @@ func (s *Single) IsRunning() bool { ...@@ -32,7 +32,7 @@ func (s *Single) IsRunning() bool {
} }
// Quit returns the read only channel it will send the stop signal on. // Quit returns the read only channel it will send the stop signal on.
func (s *Single) Quit() chan<- struct{} { func (s *Single) Quit() <-chan struct{} {
return s.quit return s.quit
} }
......
package gateway
import (
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf"
"io"
"math"
)
type HostGetter interface {
GetHost(hostId *id.ID) (*connect.Host, bool)
}
func Get(ndf *ndf.NetworkDefinition, hg HostGetter, rng io.Reader) (*connect.Host, error) {
// Get a random gateway
gateways := ndf.Gateways
gwIdx := ReadRangeUint32(0, uint32(len(gateways)), rng)
gwID, err := gateways[gwIdx].GetGatewayId()
if err != nil {
return nil, errors.WithMessage(err, "failed to get Gateway")
}
gwHost, ok := hg.GetHost(gwID)
if !ok {
return nil, errors.Errorf("host for gateway %s could not be "+
"retrieved", gwID)
}
return gwHost, nil
}
func GetLast(hg HostGetter, ri *mixmessages.RoundInfo) (*connect.Host, error) {
roundTop := ri.GetTopology()
lastGw, err := id.Unmarshal(roundTop[len(roundTop)-1])
if err != nil {
return nil, err
}
lastGw.SetType(id.Gateway)
gwHost, ok := hg.GetHost(lastGw)
if !ok {
return nil, errors.Errorf("Could not find host for gateway %s", lastGw)
}
return gwHost, nil
}
// ReadUint32 reads an integer from an io.Reader (which should be a CSPRNG)
func ReadUint32(rng io.Reader) uint32 {
var rndBytes [4]byte
i, err := rng.Read(rndBytes[:])
if i != 4 || err != nil {
panic(fmt.Sprintf("cannot read from rng: %+v", err))
}
return binary.BigEndian.Uint32(rndBytes[:])
}
// ReadRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end
func ReadRangeUint32(start, end uint32, rng io.Reader) uint32 {
size := end - start
// note we could just do the part inside the () here, but then extra
// can == size which means a little bit of range is wastes, either
// choice seems negligible so we went with the "more correct"
extra := (math.MaxUint32%size + 1) % size
limit := math.MaxUint32 - extra
// Loop until we read something inside the limit
for {
res := ReadUint32(rng)
if res > limit {
continue
}
return (res % size) + start
}
}
...@@ -21,6 +21,8 @@ import ( ...@@ -21,6 +21,8 @@ import (
"gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/comms/network"
"gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf"
// "gitlab.com/xx_network/primitives/ndf" // "gitlab.com/xx_network/primitives/ndf"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"time" "time"
...@@ -31,9 +33,11 @@ import ( ...@@ -31,9 +33,11 @@ import (
// functions used by the client. // functions used by the client.
type Manager struct { type Manager struct {
// Comms pointer to send/recv messages // Comms pointer to send/recv messages
Comms *client.Comms comms *client.Comms
// Context contains all of the keying info used to send messages // Context contains all of the keying info used to send messages
Context *context.Context context *context.Context
param params.Network
// runners are the Network goroutines that handle reception // runners are the Network goroutines that handle reception
runners *stoppable.Multi runners *stoppable.Multi
...@@ -45,23 +49,18 @@ type Manager struct { ...@@ -45,23 +49,18 @@ type Manager struct {
//contains the network instance //contains the network instance
instance *network.Instance instance *network.Instance
//Partitioner //sub-managers
partitioner parse.Partitioner round *rounds.Manager
//channels //channels
nodeRegistration chan network.NodeGateway nodeRegistration chan network.NodeGateway
roundUpdate chan *pb.RoundInfo
historicalLookup chan id.Round
// Processing rounds
Processing *rounds.processing
//local pointer to user ID because it is used often //local pointer to user ID because it is used often
uid *id.ID uid *id.ID
} }
// NewManager builds a new reception manager object using inputted key fields // NewManager builds a new reception manager object using inputted key fields
func NewManager(ctx *context.Context) (*Manager, error) { func NewManager(ctx *context.Context, params params.Network, ndf *ndf.NetworkDefinition) (*Manager, error) {
//get the user from storage //get the user from storage
user := ctx.Session.User() user := ctx.Session.User()
...@@ -80,27 +79,21 @@ func NewManager(ctx *context.Context) (*Manager, error) { ...@@ -80,27 +79,21 @@ func NewManager(ctx *context.Context) (*Manager, error) {
//start network instance //start network instance
// TODO: Need to parse/retrieve the ntework string and load it // TODO: Need to parse/retrieve the ntework string and load it
// from the context storage session! // from the context storage session!
instance, err := network.NewInstance(comms.ProtoComms, nil, nil, nil) instance, err := network.NewInstance(comms.ProtoComms, ndf, nil, nil)
if err != nil { if err != nil {
return nil, errors.WithMessage(err, "failed to create"+ return nil, errors.WithMessage(err, "failed to create"+
" client network manager") " client network manager")
} }
opts := params.GetDefaultNetwork()
cm := &Manager{ cm := &Manager{
Comms: comms, comms: comms,
Context: ctx, context: ctx,
param: params,
runners: stoppable.NewMulti("network.Manager"), runners: stoppable.NewMulti("network.Manager"),
health: health.Init(ctx, 5*time.Second), health: health.Init(ctx, 5*time.Second),
instance: instance, instance: instance,
uid: cryptoUser.GetUserID(), uid: cryptoUser.GetUserID(),
partitioner: parse.NewPartitioner(msgSize, ctx),
Processing: rounds.NewProcessingRounds(),
roundUpdate: make(chan *pb.RoundInfo, opts.NumWorkers),
historicalLookup: make(chan id.Round, opts.NumWorkers),
nodeRegistration: make(chan network.NodeGateway,
opts.NumWorkers),
} }
return cm, nil return cm, nil
...@@ -109,12 +102,12 @@ func NewManager(ctx *context.Context) (*Manager, error) { ...@@ -109,12 +102,12 @@ func NewManager(ctx *context.Context) (*Manager, error) {
// GetRemoteVersion contacts the permissioning server and returns the current // GetRemoteVersion contacts the permissioning server and returns the current
// supported client version. // supported client version.
func (m *Manager) GetRemoteVersion() (string, error) { func (m *Manager) GetRemoteVersion() (string, error) {
permissioningHost, ok := m.Comms.GetHost(&id.Permissioning) permissioningHost, ok := m.comms.GetHost(&id.Permissioning)
if !ok { if !ok {
return "", errors.Errorf("no permissioning host with id %s", return "", errors.Errorf("no permissioning host with id %s",
id.Permissioning) id.Permissioning)
} }
registrationVersion, err := m.Comms.SendGetCurrentClientVersionMessage( registrationVersion, err := m.comms.SendGetCurrentClientVersionMessage(
permissioningHost) permissioningHost)
if err != nil { if err != nil {
return "", err return "", err
...@@ -129,14 +122,17 @@ func (m *Manager) StartRunners() error { ...@@ -129,14 +122,17 @@ func (m *Manager) StartRunners() error {
} }
// Start the Network Tracker // Start the Network Tracker
m.runners.Add(rounds.StartTrackNetwork(m.Context, m)) trackNetworkStopper := stoppable.NewSingle("TrackNetwork")
go m.trackNetwork(trackNetworkStopper.Quit())
m.runners.Add(trackNetworkStopper)
// Message reception // Message reception
m.runners.Add(StartMessageReceivers(m.Context, m)) m.runners.Add(StartMessageReceivers(m.Context, m))
// Node Updates // Node Updates
m.runners.Add(StartNodeKeyExchange(m.Context, m)) // Adding/Keys m.runners.Add(StartNodeKeyExchange(m.Context, m)) // Adding/Keys
m.runners.Add(StartNodeRemover(m.Context)) // Removing m.runners.Add(StartNodeRemover(m.Context)) // Removing
// Round history processing // Round history processing
m.runners.Add(StartProcessHistoricalRounds(m.Context, m)) m.runners.Add(rounds.StartProcessHistoricalRounds(m.Context, m))
// health tracker // health tracker
m.health.Start() m.health.Start()
m.runners.Add(m.health) m.runners.Add(m.health)
...@@ -163,13 +159,13 @@ func (m *Manager) GetHealthTracker() context.HealthTracker { ...@@ -163,13 +159,13 @@ func (m *Manager) GetHealthTracker() context.HealthTracker {
} }
// GetInstance returns the network instance object (ndf state) // GetInstance returns the network instance object (ndf state)
func (m *Manager) GetInstance() *network.Instance { func (m *Manager) GetInstance() *rounds.Instance {
return m.instance return m.instance
} }
// GetNodeRegistrationCh returns node registration channel for node // GetNodeRegistrationCh returns node registration channel for node
// events. // events.
func (m *Manager) GetNodeRegistrationCh() chan network.NodeGateway { func (m *Manager) GetNodeRegistrationCh() chan rounds.NodeGateway {
return m.nodeRegistration return m.nodeRegistration
} }
......
package message
package network package message
import ( import (
"github.com/golang-collections/collections/set" "github.com/golang-collections/collections/set"
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
// recipient. Note that both SendE2E and SendUnsafe call SendCMIX. // recipient. Note that both SendE2E and SendUnsafe call SendCMIX.
// Returns the round ID of the round the payload was sent or an error // Returns the round ID of the round the payload was sent or an error
// if it fails. // if it fails.
func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { func (m *rounds.Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) {
if !m.health.IsRunning() { if !m.health.IsRunning() {
return 0, errors.New("Cannot send cmix message when the " + return 0, errors.New("Cannot send cmix message when the " +
"network is not healthy") "network is not healthy")
...@@ -28,7 +28,7 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err ...@@ -28,7 +28,7 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err
// Internal send e2e which bypasses the network check, for use in SendE2E and // Internal send e2e which bypasses the network check, for use in SendE2E and
// SendUnsafe which do their own network checks // SendUnsafe which do their own network checks
func (m *Manager) sendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { func (m *rounds.Manager) sendCMIX(msg format.Message, param params.CMIX) (id.Round, error) {
timeStart := time.Now() timeStart := time.Now()
attempted := set.New() attempted := set.New()
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
// All rights reserved. / // All rights reserved. /
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
package network package message
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
// SendE2E sends an end-to-end payload to the provided recipient with // SendE2E sends an end-to-end payload to the provided recipient with
// the provided msgType. Returns the list of rounds in which parts of // the provided msgType. Returns the list of rounds in which parts of
// the message were sent or an error if it fails. // the message were sent or an error if it fails.
func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) ( func (m *rounds.Manager) SendE2E(msg message.Send, e2eP params.E2E) (
[]id.Round, error) { []id.Round, error) {
if !m.health.IsRunning() { if !m.health.IsRunning() {
...@@ -30,7 +30,7 @@ func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) ( ...@@ -30,7 +30,7 @@ func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) (
return m.sendE2E(msg, e2eP) return m.sendE2E(msg, e2eP)
} }
func (m *Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error) { func (m *rounds.Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error) {
//timestamp the message //timestamp the message
ts := time.Now() ts := time.Now()
......
package network package message
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
// of the message were sent or an error if it fails. // of the message were sent or an error if it fails.
// NOTE: Do not use this function unless you know what you are doing. // NOTE: Do not use this function unless you know what you are doing.
// This function always produces an error message in client logging. // This function always produces an error message in client logging.
func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { func (m *rounds.Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) {
if !m.health.IsRunning() { if !m.health.IsRunning() {
return nil, errors.New("cannot send unsafe message when the " + return nil, errors.New("cannot send unsafe message when the " +
"network is not healthy") "network is not healthy")
...@@ -30,7 +30,7 @@ func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, ...@@ -30,7 +30,7 @@ func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round,
return m.sendUnsafe(msg, param) return m.sendUnsafe(msg, param)
} }
func (m *Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { func (m *rounds.Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) {
//timestamp the message //timestamp the message
ts := time.Now() ts := time.Now()
......
...@@ -13,6 +13,8 @@ import ( ...@@ -13,6 +13,8 @@ import (
"gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/network/rounds"
// "gitlab.com/elixxir/comms/client" // "gitlab.com/elixxir/comms/client"
// "gitlab.com/elixxir/primitives/format" // "gitlab.com/elixxir/primitives/format"
// "gitlab.com/elixxir/primitives/switchboard" // "gitlab.com/elixxir/primitives/switchboard"
...@@ -26,7 +28,7 @@ import ( ...@@ -26,7 +28,7 @@ import (
// StartNodeKeyExchange kicks off a worker pool of node key exchange routines // StartNodeKeyExchange kicks off a worker pool of node key exchange routines
func StartNodeKeyExchange(ctx *context.Context, func StartNodeKeyExchange(ctx *context.Context,
network *Manager) stoppable.Stoppable { network *rounds.Manager) stoppable.Stoppable {
stoppers := stoppable.NewMulti("NodeKeyExchangers") stoppers := stoppable.NewMulti("NodeKeyExchangers")
numWorkers := params.GetDefaultNetwork().NumWorkers numWorkers := params.GetDefaultNetwork().NumWorkers
keyCh := network.GetNodeRegistrationCh() keyCh := network.GetNodeRegistrationCh()
......
...@@ -12,17 +12,19 @@ import ( ...@@ -12,17 +12,19 @@ import (
"gitlab.com/elixxir/client/context/message" "gitlab.com/elixxir/client/context/message"
"gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/network/parse" "gitlab.com/elixxir/client/network/rounds"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/crypto/e2e" "gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
//jww "github.com/spf13/jwalterweatherman" //jww "github.com/spf13/jwalterweatherman"
) )
// ReceiveMessages is called by a MessageReceiver routine whenever new CMIX // ReceiveMessages is called by a MessageReceiver routine whenever new CMIX
// messages are available at a gateway. // messages are available at a gateway.
func ReceiveMessages(ctx *context.Context, roundInfo *pb.RoundInfo, func ReceiveMessages(ctx *context.Context, roundInfo *pb.RoundInfo,
network *Manager) { network *rounds.Manager) {
msgs := getMessagesFromGateway(ctx, roundInfo) msgs := getMessagesFromGateway(ctx, roundInfo)
for _, m := range msgs { for _, m := range msgs {
receiveMessage(ctx, m) receiveMessage(ctx, m)
...@@ -33,7 +35,7 @@ func ReceiveMessages(ctx *context.Context, roundInfo *pb.RoundInfo, ...@@ -33,7 +35,7 @@ func ReceiveMessages(ctx *context.Context, roundInfo *pb.RoundInfo,
// on a channel for rounds in which to check for messages and run them through // on a channel for rounds in which to check for messages and run them through
// processing. // processing.
func StartMessageReceivers(ctx *context.Context, func StartMessageReceivers(ctx *context.Context,
network *Manager) stoppable.Stoppable { network *rounds.Manager) stoppable.Stoppable {
stoppers := stoppable.NewMulti("MessageReceivers") stoppers := stoppable.NewMulti("MessageReceivers")
opts := params.GetDefaultNetwork() opts := params.GetDefaultNetwork()
receiverCh := network.GetRoundUpdateCh() receiverCh := network.GetRoundUpdateCh()
...@@ -48,7 +50,7 @@ func StartMessageReceivers(ctx *context.Context, ...@@ -48,7 +50,7 @@ func StartMessageReceivers(ctx *context.Context,
// MessageReceiver waits until quit signal or there is a round available // MessageReceiver waits until quit signal or there is a round available
// for which to check for messages available on the round updates channel. // for which to check for messages available on the round updates channel.
func MessageReceiver(ctx *context.Context, network *Manager, func MessageReceiver(ctx *context.Context, network *rounds.Manager,
updatesCh chan *pb.RoundInfo, quitCh <-chan struct{}) { updatesCh chan *pb.RoundInfo, quitCh <-chan struct{}) {
done := false done := false
for !done { for !done {
...@@ -61,49 +63,6 @@ func MessageReceiver(ctx *context.Context, network *Manager, ...@@ -61,49 +63,6 @@ func MessageReceiver(ctx *context.Context, network *Manager,
} }
} }
func getMessagesFromGateway(ctx *context.Context,
roundInfo *pb.RoundInfo, network *Manager) []*pb.Slot {
comms := network.Comms
roundTop := roundInfo.GetTopology()
lastNode := id.NewIdFromBytes(roundTop[len(roundTop-1)])
lastGw := lastNode.SetType(id.Gateway)
gwHost := comms.GetHost(lastGw)
user := ctx.Session.User().GetCryptographicIdentity()
userID := user.GetUserID().Bytes()
// First get message id list
msgReq := pb.GetMessages{
ClientID: userID,
RoundID: roundInfo.ID,
}
msgResp, err := comms.RequestMessages(gwHost, msgReq)
if err != nil {
jww.ERROR.Printf(err.Error())
return nil
}
// If no error, then we have checked the round and finished processing
ctx.Session.GetCheckedRounds.Check(roundInfo.ID)
network.Processing.Done(roundInfo.ID)
if !msgResp.GetHasRound() {
jww.ERROR.Printf("host %s does not have roundID: %d",
gwHost, roundInfo.ID)
return nil
}
msgs := msgResp.GetMessages()
if msgs == nil || len(msgs) == 0 {
jww.ERROR.Printf("host %s has no messages for client %s "+
" in round %d", gwHost, user, roundInfo.ID)
return nil
}
return msgs
}
func receiveMessage(ctx *context.Context, rawMsg *pb.Slot) { func receiveMessage(ctx *context.Context, rawMsg *pb.Slot) {
// We've done all the networking, now process the message // We've done all the networking, now process the message
......
package rounds
import (
"gitlab.com/elixxir/comms/network"
"gitlab.com/xx_network/primitives/id"
)
// getRoundChecker passes a context and the round infos received by the
// gateway to the funky round checker api to update round state.
// The returned function passes round event objects over the context
// to the rest of the message handlers for getting messages.
func (m *Manager) Checker(roundID id.Round, instance *network.Instance) bool {
// Set round to processing, if we can
processing, count := m.p.Process(roundID)
if !processing {
return false
}
if count == m.params.MaxAttemptsCheckingARound {
m.p.Remove(roundID)
return true
}
// FIXME: Spec has us SETTING processing, but not REMOVING it
// until the get messages thread completes the lookup, this
// is smell that needs refining. It seems as if there should be
// a state that lives with the round info as soon as we know
// about it that gets updated at different parts...not clear
// needs to be thought through.
//defer processing.Remove(roundID)
// TODO: Bloom filter lookup -- return true when we don't have
// Go get the round from the round infos, if it exists
ri, err := instance.GetRound(roundID)
if err != nil {
// If we didn't find it, send to historical
// rounds processor
m.historicalRounds <- roundID
} else {
m.lookupRoundMessages <- ri
}
return false
}
...@@ -4,47 +4,45 @@ ...@@ -4,47 +4,45 @@
// All rights reserved. / // All rights reserved. /
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
package network package rounds
import ( import (
"gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"time" "time"
jww "github.com/spf13/jwalterweatherman"
) )
// StartProcessHistoricalRounds starts a worker for processing round type historicalRoundsComms interface {
// history. GetHost(hostId *id.ID) (*connect.Host, bool)
func StartProcessHistoricalRounds(ctx *context.Context, RequestHistoricalRounds(host *connect.Host,
network *Manager) stoppable.Stoppable { message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error)
stopper := stoppable.NewSingle("ProcessHistoricalRounds")
go ProcessHistoricalRounds(ctx, network, stopper.Quit())
return stopper
} }
// ProcessHistoricalRounds analyzes round history to see if this Client // ProcessHistoricalRounds analyzes round history to see if this Client
// needs to check for messages at any of the gateways which completed // needs to check for messages at any of the gateways which completed
// those rounds. // those rounds.
func ProcessHistoricalRounds(ctx *context.Context, network *Manager, func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) {
quitCh <-chan struct{}) { ticker := time.NewTicker(m.params.HistoricalRoundsPeriod)
opts := params.GetDefaultNetwork()
ticker := time.NewTicker(opts.TrackNetworkPeriod) rng := m.rngGen.GetStream()
var rounds []id.Round var rounds []uint64
done := false
for !done { for {
shouldProcess := false shouldProcess := false
select { select {
case <-quitCh: case <-quitCh:
done = true rng.Close()
break
case <-ticker.C: case <-ticker.C:
if len(rounds) > 0 { if len(rounds) > 0 {
shouldProcess = true shouldProcess = true
} }
case rid := <-network.GetHistoricalLookupCh(): case rid := <-m.historicalRounds:
rounds = append(rounds, rid) rounds = append(rounds, uint64(rid))
if len(rounds) > opts.MaxHistoricalRounds { if len(rounds) > int(m.params.MaxHistoricalRounds) {
shouldProcess = true shouldProcess = true
} }
} }
...@@ -52,26 +50,32 @@ func ProcessHistoricalRounds(ctx *context.Context, network *Manager, ...@@ -52,26 +50,32 @@ func ProcessHistoricalRounds(ctx *context.Context, network *Manager,
continue continue
} }
roundInfos := processHistoricalRounds(ctx, rounds) gwHost, err := gateway.Get(m.instance.GetPartialNdf().Get(), comm, rng)
for i := range rounds { if err != nil {
if roundInfos[i] == nil { jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+
jww.ERROR.Printf("could not check "+ "data: %s", err)
}
hr := &pb.HistoricalRounds{
Rounds: rounds,
}
response, err := comm.RequestHistoricalRounds(gwHost, hr)
if err != nil {
jww.ERROR.Printf("Failed to request historical rounds "+
"data: %s", response)
// if the check fails to resolve, break the loop so they will be
// checked again
break
}
for i, roundInfo := range response.Rounds {
if roundInfo == nil {
jww.ERROR.Printf("could not retreive "+
"historical round %d", rounds[i]) "historical round %d", rounds[i])
newRounds = append(newRounds, rounds[i])
network.Processing.Done(rounds[i])
continue continue
} }
network.GetRoundUpdateCh() <- ri m.p.Remove(id.Round(rounds[i]))
m.lookupRoundMessages <- roundInfo
} }
} }
} }
\ No newline at end of file
func processHistoricalRounds(ctx *context.Context,
rids []id.Round) []*pb.RoundInfo {
// for loop over rids?
// network := ctx.GetNetwork()
// gw := network.GetGateway()
// ris := gw.GetHistoricalRounds(ctx.GetRoundList())
// return ris
return nil
}
package rounds
import (
"gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/primitives/id"
)
type Manager struct {
params params.Rounds
p *processing
comms *client.Comms
instance *network.Instance
rngGen *fastRNG.StreamGenerator
session *storage.Session
historicalRounds chan id.Round
lookupRoundMessages chan *mixmessages.RoundInfo
}
func New(comms *client.Comms, instance *network.Instance, session *storage.Session,
rngGen *fastRNG.StreamGenerator, params params.Rounds) (*Manager, error) {
return &Manager{
params: params,
p: newProcessingRounds(),
comms: comms,
instance: instance,
rngGen: rngGen,
session: session,
historicalRounds: make(chan id.Round, params.HistoricalRoundsBufferLen),
lookupRoundMessages: make(chan *mixmessages.RoundInfo, params.LookupRoundsBufferLen),
}, nil
}
func (m *Manager) StartProcessors() stoppable.Stoppable {
multi := stoppable.NewMulti("Rounds")
historicalRoundsStopper := stoppable.NewSingle("ProcessHistoricalRounds")
go m.processHistoricalRounds(m.comms, historicalRoundsStopper.Quit())
multi.Add(historicalRoundsStopper)
}
package rounds
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/storage/user"
"gitlab.com/xx_network/primitives/id"
pb "gitlab.com/elixxir/comms/mixmessages"
)
}
func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo) ([]*pb.Slot, error) {
gwHost, err := gateway.GetLast(m.comms, roundInfo)
if err != nil {
return nil, errors.WithMessage(err, "Failed to get Gateway "+
"to request from")
}
user := m.session.User().GetCryptographicIdentity()
userID := user.GetUserID().Bytes()
// First get message id list
msgReq := &pb.GetMessages{
ClientID: userID,
RoundID: roundInfo.ID,
}
msgResp, err := m.comms.RequestMessages(gwHost, msgReq)
if err != nil {
return nil, errors.WithMessagef(err, "Failed to request "+
"messages from %s for round %s", gwHost.GetId(), roundInfo.ID)
}
// If no error, then we have checked the round and finished processing
ctx.Session.GetCheckedRounds.Check(roundInfo.ID)
network.Processing.Done(roundInfo.ID)
if !msgResp.GetHasRound() {
jww.ERROR.Printf("host %s does not have roundID: %d",
gwHost, roundInfo.ID)
return nil
}
msgs := msgResp.GetMessages()
if msgs == nil || len(msgs) == 0 {
jww.ERROR.Printf("host %s has no messages for client %s "+
" in round %d", gwHost, user, roundInfo.ID)
return nil
}
return msgs
}
package rounds
import (
"gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/storage"
)
func StartProcessors(ctx *context.Context) stoppable.Stoppable {
p := newProcessingRounds()
stopper := stoppable.NewSingle("TrackNetwork")
go trackNetwork(ctx, net, stopper.Quit())
}
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
// All rights reserved. / // All rights reserved. /
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
package rounds package network
// updates.go tracks the network for: // updates.go tracks the network for:
// 1. Node addition and removal // 1. Node addition and removal
...@@ -18,26 +18,14 @@ package rounds ...@@ -18,26 +18,14 @@ package rounds
// - receive.go for message handling // - receive.go for message handling
import ( import (
"encoding/binary" "gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/crypto/csprng" "gitlab.com/elixxir/crypto/csprng"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/knownRounds" "gitlab.com/elixxir/primitives/knownRounds"
"gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/comms/connect"
//"gitlab.com/elixxir/comms/network"
//"gitlab.com/xx_network/primitives/ndf"
"fmt"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"io"
"math"
"time" "time"
) )
...@@ -48,46 +36,35 @@ type trackNetworkComms interface { ...@@ -48,46 +36,35 @@ type trackNetworkComms interface {
// TrackNetwork polls the network to get updated on the state of nodes, the // TrackNetwork polls the network to get updated on the state of nodes, the
// round status, and informs the client when messages can be retrieved. // round status, and informs the client when messages can be retrieved.
func trackNetwork(ctx *context.Context, func (m *Manager) trackNetwork(quitCh <-chan struct{}) {
quitCh <-chan struct{}) { ticker := time.NewTicker(m.param.TrackNetworkPeriod)
opts := params.GetDefaultNetwork() rng := m.context.Rng.GetStream()
ticker := time.NewTicker(opts.TrackNetworkPeriod)
done := false for {
for !done {
select { select {
case <-quitCh: case <-quitCh:
done = true rng.Close()
break
case <-ticker.C: case <-ticker.C:
trackNetwork(ctx, network, opts.MaxCheckCnt) m.track(rng, m.comms)
} }
} }
} }
func track(sess *storage.Session, rng csprng.Source, p *processing, func (m *Manager) track(rng csprng.Source, comms trackNetworkComms) {
instance *network.Instance, comms trackNetworkComms, maxCheck int) {
ndf := instance.GetPartialNdf().Get() gwHost, err := gateway.Get(m.instance.GetPartialNdf().Get(), comms, rng)
// Get a random gateway
gateways := ndf.Gateways
gwIdx := ReadRangeUint32(0, uint32(len(gateways)), rng)
gwID, err := gateways[gwIdx].GetGatewayId()
if err != nil { if err != nil {
jww.ERROR.Printf(err.Error()) jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+
return "data: %s", err)
}
gwHost, ok := comms.GetHost(gwID)
if !ok {
jww.ERROR.Printf("could not get host for gateway %s", gwID)
return
} }
// Poll for the new NDF // Poll for the new NDF
pollReq := pb.GatewayPoll{ pollReq := pb.GatewayPoll{
Partial: &pb.NDFHash{ Partial: &pb.NDFHash{
Hash: instance.GetPartialNdf().GetHash(), Hash: m.instance.GetPartialNdf().GetHash(),
}, },
LastUpdate: uint64(instance.GetLastRoundID()), LastUpdate: uint64(m.instance.GetLastUpdateID()),
} }
pollResp, err := comms.SendPoll(gwHost, &pollReq) pollResp, err := comms.SendPoll(gwHost, &pollReq)
if err != nil { if err != nil {
...@@ -109,88 +86,27 @@ func track(sess *storage.Session, rng csprng.Source, p *processing, ...@@ -109,88 +86,27 @@ func track(sess *storage.Session, rng csprng.Source, p *processing,
// ---- NODE EVENTS ---- // ---- NODE EVENTS ----
// NOTE: this updates the structure AND sends events over the node // NOTE: this updates the structure AND sends events over the node
// update channels // update channels
err = instance.UpdatePartialNdf(newNDF) err = m.instance.UpdatePartialNdf(newNDF)
if err != nil { if err != nil {
jww.ERROR.Printf(err.Error()) jww.ERROR.Printf(err.Error())
return return
} }
err = instance.RoundUpdates(roundUpdates) err = m.instance.RoundUpdates(roundUpdates)
if err != nil { if err != nil {
jww.ERROR.Printf(err.Error()) jww.ERROR.Printf(err.Error())
return return
} }
// ---- Round Processing ----- // ---- Round Processing -----
checkedRounds := sess.GetCheckedRounds() //build the round checker
roundChecker := getRoundChecker(network, roundUpdates) roundChecker := func(rid id.Round) bool {
checkedRounds.Forward(lastTrackedRound) return m.round.Checker(rid, m.instance)
checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, maxCheck)
}
// getRoundChecker passes a context and the round infos received by the
// gateway to the funky round checker api to update round state.
// The returned function passes round event objects over the context
// to the rest of the message handlers for getting messages.
func getRoundChecker(p *processing, instance *network.Instance, maxAttempts uint) func(roundID id.Round) bool {
return func(roundID id.Round) bool {
// Set round to processing, if we can
processing, count := p.Process(roundID)
if !processing {
return false
}
if count == maxAttempts {
p.Remove(roundID)
return true
} }
// FIXME: Spec has us SETTING processing, but not REMOVING it
// until the get messages thread completes the lookup, this
// is smell that needs refining. It seems as if there should be
// a state that lives with the round info as soon as we know
// about it that gets updated at different parts...not clear
// needs to be thought through.
//defer processing.Remove(roundID)
// TODO: Bloom filter lookup -- return true when we don't have
// Go get the round from the round infos, if it exists
ri, err := instance.GetRound(roundID) //check rounds
if err != nil { checkedRounds := m.context.Session.GetCheckedRounds()
// If we didn't find it, send to historical checkedRounds.Forward(lastTrackedRound)
// rounds processor checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker,
network.GetHistoricalLookupCh() <- roundID int(m.param.MaxCheckCheckedRounds))
} else {
network.GetRoundUpdateCh() <- ri
}
return false
}
}
// ReadUint32 reads an integer from an io.Reader (which should be a CSPRNG)
func ReadUint32(rng io.Reader) uint32 {
var rndBytes [4]byte
i, err := rng.Read(rndBytes[:])
if i != 4 || err != nil {
panic(fmt.Sprintf("cannot read from rng: %+v", err))
}
return binary.BigEndian.Uint32(rndBytes[:])
} }
// ReadRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end
func ReadRangeUint32(start, end uint32, rng io.Reader) uint32 {
size := end - start
// note we could just do the part inside the () here, but then extra
// can == size which means a little bit of range is wastes, either
// choice seems negligible so we went with the "more correct"
extra := (math.MaxUint32%size + 1) % size
limit := math.MaxUint32 - extra
// Loop until we read something inside the limit
for {
res := ReadUint32(rng)
if res > limit {
continue
}
return (res % size) + start
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment