diff --git a/context/params/network.go b/context/params/network.go index 5bd83ebf6b3555b2b7823396855f624952882dfd..7faf67e753cbb9eb773a4f75cc48fdf7e531fc69 100644 --- a/context/params/network.go +++ b/context/params/network.go @@ -5,17 +5,19 @@ import ( ) type Network struct { - TrackNetworkPeriod time.Duration - NumWorkers int - MaxHistoricalRounds int - MaxCheckCnt int + TrackNetworkPeriod time.Duration + NumWorkers uint + // maximum number of rounds to check in a single iterations network updates + MaxCheckCheckedRounds uint + Rounds } func GetDefaultNetwork() Network { - return Network{ - TrackNetworkPeriod: 100 * time.Millisecond, - NumWorkers: 4, - MaxHistoricalRounds: 100, - MaxCheckCnt: 100, + n := Network{ + TrackNetworkPeriod: 100 * time.Millisecond, + NumWorkers: 4, + MaxCheckCheckedRounds: 500, } + n.Rounds = GetDefaultRounds() + return n } diff --git a/context/params/rounds.go b/context/params/rounds.go new file mode 100644 index 0000000000000000000000000000000000000000..75de16707d0d71dedcbcd08f383793e6627b78bb --- /dev/null +++ b/context/params/rounds.go @@ -0,0 +1,31 @@ +package params + +import "time" + +type Rounds struct { + // maximum number of times to attempt to retrieve a round from a gateway + // before giving up on it + MaxAttemptsCheckingARound uint + // number of historical rounds required to automatically send a historical + // rounds query + MaxHistoricalRounds uint + // maximum period of time a pending historical round query will wait before + // it si transmitted + HistoricalRoundsPeriod time.Duration + + //Length of historical rounds channel buffer + HistoricalRoundsBufferLen uint + //Length of round lookup channel buffer + LookupRoundsBufferLen uint +} + +func GetDefaultRounds() Rounds { + return Rounds{ + MaxAttemptsCheckingARound: 5, + MaxHistoricalRounds: 100, + HistoricalRoundsPeriod: 100 * time.Millisecond, + + HistoricalRoundsBufferLen: 1000, + LookupRoundsBufferLen: 2000, + } +} diff --git a/context/stoppable/single.go b/context/stoppable/single.go index acd8da0077427215934dae20b68ff9346396492c..c5eadaefa40e8070ebb4b85ae710ff65d0f8fab6 100644 --- a/context/stoppable/single.go +++ b/context/stoppable/single.go @@ -32,7 +32,7 @@ func (s *Single) IsRunning() bool { } // Quit returns the read only channel it will send the stop signal on. -func (s *Single) Quit() chan<- struct{} { +func (s *Single) Quit() <-chan struct{} { return s.quit } diff --git a/network/gateway/gateway.go b/network/gateway/gateway.go new file mode 100644 index 0000000000000000000000000000000000000000..19ca1634f9aa23355713b9363a0a73421ce6f440 --- /dev/null +++ b/network/gateway/gateway.go @@ -0,0 +1,76 @@ +package gateway + +import ( + "encoding/binary" + "fmt" + "github.com/pkg/errors" + "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/ndf" + "io" + "math" +) + +type HostGetter interface { + GetHost(hostId *id.ID) (*connect.Host, bool) +} + +func Get(ndf *ndf.NetworkDefinition, hg HostGetter, rng io.Reader) (*connect.Host, error) { + // Get a random gateway + gateways := ndf.Gateways + gwIdx := ReadRangeUint32(0, uint32(len(gateways)), rng) + gwID, err := gateways[gwIdx].GetGatewayId() + if err != nil { + return nil, errors.WithMessage(err, "failed to get Gateway") + } + gwHost, ok := hg.GetHost(gwID) + if !ok { + return nil, errors.Errorf("host for gateway %s could not be "+ + "retrieved", gwID) + } + return gwHost, nil +} + +func GetLast(hg HostGetter, ri *mixmessages.RoundInfo) (*connect.Host, error) { + roundTop := ri.GetTopology() + lastGw, err := id.Unmarshal(roundTop[len(roundTop)-1]) + if err != nil { + return nil, err + } + lastGw.SetType(id.Gateway) + + gwHost, ok := hg.GetHost(lastGw) + if !ok { + return nil, errors.Errorf("Could not find host for gateway %s", lastGw) + } + return gwHost, nil +} + +// ReadUint32 reads an integer from an io.Reader (which should be a CSPRNG) +func ReadUint32(rng io.Reader) uint32 { + var rndBytes [4]byte + i, err := rng.Read(rndBytes[:]) + if i != 4 || err != nil { + panic(fmt.Sprintf("cannot read from rng: %+v", err)) + } + return binary.BigEndian.Uint32(rndBytes[:]) +} + +// ReadRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end +func ReadRangeUint32(start, end uint32, rng io.Reader) uint32 { + size := end - start + // note we could just do the part inside the () here, but then extra + // can == size which means a little bit of range is wastes, either + // choice seems negligible so we went with the "more correct" + extra := (math.MaxUint32%size + 1) % size + limit := math.MaxUint32 - extra + // Loop until we read something inside the limit + for { + res := ReadUint32(rng) + if res > limit { + continue + } + return (res % size) + start + } +} diff --git a/network/manager.go b/network/manager.go index 48f37ea77cff997bab6b4c6aa03667f80db0be54..2af17760e61ec519333bbc765b47275222e28c5c 100644 --- a/network/manager.go +++ b/network/manager.go @@ -21,6 +21,8 @@ import ( "gitlab.com/elixxir/comms/network" "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/ndf" + // "gitlab.com/xx_network/primitives/ndf" pb "gitlab.com/elixxir/comms/mixmessages" "time" @@ -31,9 +33,11 @@ import ( // functions used by the client. type Manager struct { // Comms pointer to send/recv messages - Comms *client.Comms + comms *client.Comms // Context contains all of the keying info used to send messages - Context *context.Context + context *context.Context + + param params.Network // runners are the Network goroutines that handle reception runners *stoppable.Multi @@ -45,23 +49,18 @@ type Manager struct { //contains the network instance instance *network.Instance - //Partitioner - partitioner parse.Partitioner + //sub-managers + round *rounds.Manager //channels nodeRegistration chan network.NodeGateway - roundUpdate chan *pb.RoundInfo - historicalLookup chan id.Round - - // Processing rounds - Processing *rounds.processing //local pointer to user ID because it is used often uid *id.ID } // NewManager builds a new reception manager object using inputted key fields -func NewManager(ctx *context.Context) (*Manager, error) { +func NewManager(ctx *context.Context, params params.Network, ndf *ndf.NetworkDefinition) (*Manager, error) { //get the user from storage user := ctx.Session.User() @@ -80,27 +79,21 @@ func NewManager(ctx *context.Context) (*Manager, error) { //start network instance // TODO: Need to parse/retrieve the ntework string and load it // from the context storage session! - instance, err := network.NewInstance(comms.ProtoComms, nil, nil, nil) + instance, err := network.NewInstance(comms.ProtoComms, ndf, nil, nil) if err != nil { return nil, errors.WithMessage(err, "failed to create"+ " client network manager") } - opts := params.GetDefaultNetwork() - cm := &Manager{ - Comms: comms, - Context: ctx, - runners: stoppable.NewMulti("network.Manager"), - health: health.Init(ctx, 5*time.Second), - instance: instance, - uid: cryptoUser.GetUserID(), - partitioner: parse.NewPartitioner(msgSize, ctx), - Processing: rounds.NewProcessingRounds(), - roundUpdate: make(chan *pb.RoundInfo, opts.NumWorkers), - historicalLookup: make(chan id.Round, opts.NumWorkers), - nodeRegistration: make(chan network.NodeGateway, - opts.NumWorkers), + comms: comms, + context: ctx, + param: params, + runners: stoppable.NewMulti("network.Manager"), + health: health.Init(ctx, 5*time.Second), + instance: instance, + uid: cryptoUser.GetUserID(), + } return cm, nil @@ -109,12 +102,12 @@ func NewManager(ctx *context.Context) (*Manager, error) { // GetRemoteVersion contacts the permissioning server and returns the current // supported client version. func (m *Manager) GetRemoteVersion() (string, error) { - permissioningHost, ok := m.Comms.GetHost(&id.Permissioning) + permissioningHost, ok := m.comms.GetHost(&id.Permissioning) if !ok { return "", errors.Errorf("no permissioning host with id %s", id.Permissioning) } - registrationVersion, err := m.Comms.SendGetCurrentClientVersionMessage( + registrationVersion, err := m.comms.SendGetCurrentClientVersionMessage( permissioningHost) if err != nil { return "", err @@ -129,14 +122,17 @@ func (m *Manager) StartRunners() error { } // Start the Network Tracker - m.runners.Add(rounds.StartTrackNetwork(m.Context, m)) + trackNetworkStopper := stoppable.NewSingle("TrackNetwork") + go m.trackNetwork(trackNetworkStopper.Quit()) + m.runners.Add(trackNetworkStopper) + // Message reception m.runners.Add(StartMessageReceivers(m.Context, m)) // Node Updates m.runners.Add(StartNodeKeyExchange(m.Context, m)) // Adding/Keys m.runners.Add(StartNodeRemover(m.Context)) // Removing // Round history processing - m.runners.Add(StartProcessHistoricalRounds(m.Context, m)) + m.runners.Add(rounds.StartProcessHistoricalRounds(m.Context, m)) // health tracker m.health.Start() m.runners.Add(m.health) @@ -163,13 +159,13 @@ func (m *Manager) GetHealthTracker() context.HealthTracker { } // GetInstance returns the network instance object (ndf state) -func (m *Manager) GetInstance() *network.Instance { +func (m *Manager) GetInstance() *rounds.Instance { return m.instance } // GetNodeRegistrationCh returns node registration channel for node // events. -func (m *Manager) GetNodeRegistrationCh() chan network.NodeGateway { +func (m *Manager) GetNodeRegistrationCh() chan rounds.NodeGateway { return m.nodeRegistration } diff --git a/network/message/reception.go b/network/message/reception.go new file mode 100644 index 0000000000000000000000000000000000000000..ede1b09878eb75b4bd24601a66cce8a79b786224 --- /dev/null +++ b/network/message/reception.go @@ -0,0 +1 @@ +package message diff --git a/network/sendCmix.go b/network/message/sendCmix.go similarity index 95% rename from network/sendCmix.go rename to network/message/sendCmix.go index b4938e3656cc42786f72e3aec950682c37022191..3394d975137ebabf0b1620c0d2355b5572975ec4 100644 --- a/network/sendCmix.go +++ b/network/message/sendCmix.go @@ -1,4 +1,4 @@ -package network +package message import ( "github.com/golang-collections/collections/set" @@ -17,7 +17,7 @@ import ( // recipient. Note that both SendE2E and SendUnsafe call SendCMIX. // Returns the round ID of the round the payload was sent or an error // if it fails. -func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { +func (m *rounds.Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { if !m.health.IsRunning() { return 0, errors.New("Cannot send cmix message when the " + "network is not healthy") @@ -28,7 +28,7 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err // Internal send e2e which bypasses the network check, for use in SendE2E and // SendUnsafe which do their own network checks -func (m *Manager) sendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { +func (m *rounds.Manager) sendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { timeStart := time.Now() attempted := set.New() diff --git a/network/sendE2E.go b/network/message/sendE2E.go similarity index 93% rename from network/sendE2E.go rename to network/message/sendE2E.go index 24290abc1eadadd063ba89d5ec530257177d2211..cd161b62c15a79c34f3c40fa6d157dfc2e7d9c0a 100644 --- a/network/sendE2E.go +++ b/network/message/sendE2E.go @@ -4,7 +4,7 @@ // All rights reserved. / //////////////////////////////////////////////////////////////////////////////// -package network +package message import ( "github.com/pkg/errors" @@ -19,7 +19,7 @@ import ( // SendE2E sends an end-to-end payload to the provided recipient with // the provided msgType. Returns the list of rounds in which parts of // the message were sent or an error if it fails. -func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) ( +func (m *rounds.Manager) SendE2E(msg message.Send, e2eP params.E2E) ( []id.Round, error) { if !m.health.IsRunning() { @@ -30,7 +30,7 @@ func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) ( return m.sendE2E(msg, e2eP) } -func (m *Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error) { +func (m *rounds.Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error) { //timestamp the message ts := time.Now() diff --git a/network/sendUnsafe.go b/network/message/sendUnsafe.go similarity index 91% rename from network/sendUnsafe.go rename to network/message/sendUnsafe.go index ecef2431ac7d2de4a88e3c15fa97040746dfadcc..788c59c6d973a0603297a4c445f93c3539c2a889 100644 --- a/network/sendUnsafe.go +++ b/network/message/sendUnsafe.go @@ -1,4 +1,4 @@ -package network +package message import ( "github.com/pkg/errors" @@ -17,7 +17,7 @@ import ( // of the message were sent or an error if it fails. // NOTE: Do not use this function unless you know what you are doing. // This function always produces an error message in client logging. -func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { +func (m *rounds.Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { if !m.health.IsRunning() { return nil, errors.New("cannot send unsafe message when the " + "network is not healthy") @@ -30,7 +30,7 @@ func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, return m.sendUnsafe(msg, param) } -func (m *Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { +func (m *rounds.Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { //timestamp the message ts := time.Now() diff --git a/network/nodes.go b/network/nodes.go index 3559a44b5040c669c80d8a742e182ab36a5d1f73..19426b294ff800e845682a8683cf8b5d5c23953b 100644 --- a/network/nodes.go +++ b/network/nodes.go @@ -13,6 +13,8 @@ import ( "gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/stoppable" + "gitlab.com/elixxir/client/network/rounds" + // "gitlab.com/elixxir/comms/client" // "gitlab.com/elixxir/primitives/format" // "gitlab.com/elixxir/primitives/switchboard" @@ -26,7 +28,7 @@ import ( // StartNodeKeyExchange kicks off a worker pool of node key exchange routines func StartNodeKeyExchange(ctx *context.Context, - network *Manager) stoppable.Stoppable { + network *rounds.Manager) stoppable.Stoppable { stoppers := stoppable.NewMulti("NodeKeyExchangers") numWorkers := params.GetDefaultNetwork().NumWorkers keyCh := network.GetNodeRegistrationCh() diff --git a/network/receive.go b/network/receive.go index bc69f740a12f2bdb181819cdc837f4b139a4a818..378c926b79a02c64a402559105968b82b7280a47 100644 --- a/network/receive.go +++ b/network/receive.go @@ -12,17 +12,19 @@ import ( "gitlab.com/elixxir/client/context/message" "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/context/stoppable" - "gitlab.com/elixxir/client/network/parse" + "gitlab.com/elixxir/client/network/rounds" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/crypto/e2e" "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" + //jww "github.com/spf13/jwalterweatherman" ) // ReceiveMessages is called by a MessageReceiver routine whenever new CMIX // messages are available at a gateway. func ReceiveMessages(ctx *context.Context, roundInfo *pb.RoundInfo, - network *Manager) { + network *rounds.Manager) { msgs := getMessagesFromGateway(ctx, roundInfo) for _, m := range msgs { receiveMessage(ctx, m) @@ -33,7 +35,7 @@ func ReceiveMessages(ctx *context.Context, roundInfo *pb.RoundInfo, // on a channel for rounds in which to check for messages and run them through // processing. func StartMessageReceivers(ctx *context.Context, - network *Manager) stoppable.Stoppable { + network *rounds.Manager) stoppable.Stoppable { stoppers := stoppable.NewMulti("MessageReceivers") opts := params.GetDefaultNetwork() receiverCh := network.GetRoundUpdateCh() @@ -48,7 +50,7 @@ func StartMessageReceivers(ctx *context.Context, // MessageReceiver waits until quit signal or there is a round available // for which to check for messages available on the round updates channel. -func MessageReceiver(ctx *context.Context, network *Manager, +func MessageReceiver(ctx *context.Context, network *rounds.Manager, updatesCh chan *pb.RoundInfo, quitCh <-chan struct{}) { done := false for !done { @@ -61,49 +63,6 @@ func MessageReceiver(ctx *context.Context, network *Manager, } } -func getMessagesFromGateway(ctx *context.Context, - roundInfo *pb.RoundInfo, network *Manager) []*pb.Slot { - comms := network.Comms - roundTop := roundInfo.GetTopology() - lastNode := id.NewIdFromBytes(roundTop[len(roundTop-1)]) - lastGw := lastNode.SetType(id.Gateway) - gwHost := comms.GetHost(lastGw) - - user := ctx.Session.User().GetCryptographicIdentity() - userID := user.GetUserID().Bytes() - - // First get message id list - msgReq := pb.GetMessages{ - ClientID: userID, - RoundID: roundInfo.ID, - } - msgResp, err := comms.RequestMessages(gwHost, msgReq) - if err != nil { - jww.ERROR.Printf(err.Error()) - return nil - } - - // If no error, then we have checked the round and finished processing - ctx.Session.GetCheckedRounds.Check(roundInfo.ID) - network.Processing.Done(roundInfo.ID) - - if !msgResp.GetHasRound() { - jww.ERROR.Printf("host %s does not have roundID: %d", - gwHost, roundInfo.ID) - return nil - } - - msgs := msgResp.GetMessages() - - if msgs == nil || len(msgs) == 0 { - jww.ERROR.Printf("host %s has no messages for client %s "+ - " in round %d", gwHost, user, roundInfo.ID) - return nil - } - - return msgs - -} func receiveMessage(ctx *context.Context, rawMsg *pb.Slot) { // We've done all the networking, now process the message diff --git a/network/rounds.go b/network/rounds.go deleted file mode 100644 index 9bb8b7b233dc18409181a8ba886f53709efcb8b1..0000000000000000000000000000000000000000 --- a/network/rounds.go +++ /dev/null @@ -1,77 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 Privategrity Corporation / -// / -// All rights reserved. / -//////////////////////////////////////////////////////////////////////////////// - -package network - -import ( - "gitlab.com/elixxir/client/context" - "gitlab.com/elixxir/client/context/params" - "gitlab.com/elixxir/client/context/stoppable" - pb "gitlab.com/elixxir/comms/mixmessages" - "gitlab.com/xx_network/primitives/id" - "time" -) - -// StartProcessHistoricalRounds starts a worker for processing round -// history. -func StartProcessHistoricalRounds(ctx *context.Context, - network *Manager) stoppable.Stoppable { - stopper := stoppable.NewSingle("ProcessHistoricalRounds") - go ProcessHistoricalRounds(ctx, network, stopper.Quit()) - return stopper -} - -// ProcessHistoricalRounds analyzes round history to see if this Client -// needs to check for messages at any of the gateways which completed -// those rounds. -func ProcessHistoricalRounds(ctx *context.Context, network *Manager, - quitCh <-chan struct{}) { - opts := params.GetDefaultNetwork() - ticker := time.NewTicker(opts.TrackNetworkPeriod) - var rounds []id.Round - done := false - for !done { - shouldProcess := false - select { - case <-quitCh: - done = true - case <-ticker.C: - if len(rounds) > 0 { - shouldProcess = true - } - case rid := <-network.GetHistoricalLookupCh(): - rounds = append(rounds, rid) - if len(rounds) > opts.MaxHistoricalRounds { - shouldProcess = true - } - } - if !shouldProcess { - continue - } - - roundInfos := processHistoricalRounds(ctx, rounds) - for i := range rounds { - if roundInfos[i] == nil { - jww.ERROR.Printf("could not check "+ - "historical round %d", rounds[i]) - newRounds = append(newRounds, rounds[i]) - network.Processing.Done(rounds[i]) - continue - } - network.GetRoundUpdateCh() <- ri - } - } -} - -func processHistoricalRounds(ctx *context.Context, - rids []id.Round) []*pb.RoundInfo { - // for loop over rids? - // network := ctx.GetNetwork() - // gw := network.GetGateway() - // ris := gw.GetHistoricalRounds(ctx.GetRoundList()) - // return ris - return nil -} diff --git a/network/rounds/check.go b/network/rounds/check.go new file mode 100644 index 0000000000000000000000000000000000000000..f583376e98003e3bfa9843a2d55515d466bcad3b --- /dev/null +++ b/network/rounds/check.go @@ -0,0 +1,43 @@ +package rounds + +import ( + "gitlab.com/elixxir/comms/network" + "gitlab.com/xx_network/primitives/id" +) + +// getRoundChecker passes a context and the round infos received by the +// gateway to the funky round checker api to update round state. +// The returned function passes round event objects over the context +// to the rest of the message handlers for getting messages. +func (m *Manager) Checker(roundID id.Round, instance *network.Instance) bool { + // Set round to processing, if we can + processing, count := m.p.Process(roundID) + if !processing { + return false + } + if count == m.params.MaxAttemptsCheckingARound { + m.p.Remove(roundID) + return true + } + // FIXME: Spec has us SETTING processing, but not REMOVING it + // until the get messages thread completes the lookup, this + // is smell that needs refining. It seems as if there should be + // a state that lives with the round info as soon as we know + // about it that gets updated at different parts...not clear + // needs to be thought through. + //defer processing.Remove(roundID) + + // TODO: Bloom filter lookup -- return true when we don't have + // Go get the round from the round infos, if it exists + + ri, err := instance.GetRound(roundID) + if err != nil { + // If we didn't find it, send to historical + // rounds processor + m.historicalRounds <- roundID + } else { + m.lookupRoundMessages <- ri + } + + return false +} diff --git a/network/rounds/historical.go b/network/rounds/historical.go new file mode 100644 index 0000000000000000000000000000000000000000..5d0a9b48c14529f6a8333227ecf37137beb2df02 --- /dev/null +++ b/network/rounds/historical.go @@ -0,0 +1,81 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 Privategrity Corporation / +// / +// All rights reserved. / +//////////////////////////////////////////////////////////////////////////////// + +package rounds + +import ( + "gitlab.com/elixxir/client/network/gateway" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/id" + "time" + jww "github.com/spf13/jwalterweatherman" +) + +type historicalRoundsComms interface { + GetHost(hostId *id.ID) (*connect.Host, bool) + RequestHistoricalRounds(host *connect.Host, + message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) +} + +// ProcessHistoricalRounds analyzes round history to see if this Client +// needs to check for messages at any of the gateways which completed +// those rounds. +func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) { + ticker := time.NewTicker(m.params.HistoricalRoundsPeriod) + + rng := m.rngGen.GetStream() + var rounds []uint64 + + for { + shouldProcess := false + select { + case <-quitCh: + rng.Close() + break + case <-ticker.C: + if len(rounds) > 0 { + shouldProcess = true + } + case rid := <-m.historicalRounds: + rounds = append(rounds, uint64(rid)) + if len(rounds) > int(m.params.MaxHistoricalRounds) { + shouldProcess = true + } + } + if !shouldProcess { + continue + } + + gwHost, err := gateway.Get(m.instance.GetPartialNdf().Get(), comm, rng) + if err != nil { + jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+ + "data: %s", err) + } + + hr := &pb.HistoricalRounds{ + Rounds: rounds, + } + + response, err := comm.RequestHistoricalRounds(gwHost, hr) + if err != nil { + jww.ERROR.Printf("Failed to request historical rounds "+ + "data: %s", response) + // if the check fails to resolve, break the loop so they will be + // checked again + break + } + for i, roundInfo := range response.Rounds { + if roundInfo == nil { + jww.ERROR.Printf("could not retreive "+ + "historical round %d", rounds[i]) + continue + } + m.p.Remove(id.Round(rounds[i])) + m.lookupRoundMessages <- roundInfo + } + } +} \ No newline at end of file diff --git a/network/rounds/manager.go b/network/rounds/manager.go new file mode 100644 index 0000000000000000000000000000000000000000..4e9060d21513ed70f1d73d374f673f738ae83c1e --- /dev/null +++ b/network/rounds/manager.go @@ -0,0 +1,51 @@ +package rounds + +import ( + "gitlab.com/elixxir/client/context" + "gitlab.com/elixxir/client/context/params" + "gitlab.com/elixxir/client/context/stoppable" + "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/comms/client" + "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/comms/network" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/primitives/id" +) + +type Manager struct { + params params.Rounds + + p *processing + + comms *client.Comms + instance *network.Instance + rngGen *fastRNG.StreamGenerator + session *storage.Session + + historicalRounds chan id.Round + lookupRoundMessages chan *mixmessages.RoundInfo +} + +func New(comms *client.Comms, instance *network.Instance, session *storage.Session, + rngGen *fastRNG.StreamGenerator, params params.Rounds) (*Manager, error) { + return &Manager{ + params: params, + p: newProcessingRounds(), + comms: comms, + instance: instance, + rngGen: rngGen, + session: session, + + historicalRounds: make(chan id.Round, params.HistoricalRoundsBufferLen), + lookupRoundMessages: make(chan *mixmessages.RoundInfo, params.LookupRoundsBufferLen), + }, nil +} + +func (m *Manager) StartProcessors() stoppable.Stoppable { + + multi := stoppable.NewMulti("Rounds") + + historicalRoundsStopper := stoppable.NewSingle("ProcessHistoricalRounds") + go m.processHistoricalRounds(m.comms, historicalRoundsStopper.Quit()) + multi.Add(historicalRoundsStopper) +} diff --git a/network/rounds/retreive.go b/network/rounds/retreive.go new file mode 100644 index 0000000000000000000000000000000000000000..828927a7617333d71958bf157439a8202df1a442 --- /dev/null +++ b/network/rounds/retreive.go @@ -0,0 +1,57 @@ +package rounds + +import ( + "github.com/pkg/errors" + "gitlab.com/elixxir/client/network/gateway" + "gitlab.com/elixxir/client/storage/user" + "gitlab.com/xx_network/primitives/id" + pb "gitlab.com/elixxir/comms/mixmessages" +) +} + + + + +func (m *Manager) getMessagesFromGateway(roundInfo *pb.RoundInfo) ([]*pb.Slot, error) { + + gwHost, err := gateway.GetLast(m.comms, roundInfo) + if err != nil { + return nil, errors.WithMessage(err, "Failed to get Gateway "+ + "to request from") + } + + user := m.session.User().GetCryptographicIdentity() + userID := user.GetUserID().Bytes() + + // First get message id list + msgReq := &pb.GetMessages{ + ClientID: userID, + RoundID: roundInfo.ID, + } + msgResp, err := m.comms.RequestMessages(gwHost, msgReq) + if err != nil { + return nil, errors.WithMessagef(err, "Failed to request "+ + "messages from %s for round %s", gwHost.GetId(), roundInfo.ID) + } + + // If no error, then we have checked the round and finished processing + ctx.Session.GetCheckedRounds.Check(roundInfo.ID) + network.Processing.Done(roundInfo.ID) + + if !msgResp.GetHasRound() { + jww.ERROR.Printf("host %s does not have roundID: %d", + gwHost, roundInfo.ID) + return nil + } + + msgs := msgResp.GetMessages() + + if msgs == nil || len(msgs) == 0 { + jww.ERROR.Printf("host %s has no messages for client %s "+ + " in round %d", gwHost, user, roundInfo.ID) + return nil + } + + return msgs + +} diff --git a/network/rounds/round.go b/network/rounds/round.go deleted file mode 100644 index 2fa59fb92a6f246934dca88c3258978cf83514d6..0000000000000000000000000000000000000000 --- a/network/rounds/round.go +++ /dev/null @@ -1,14 +0,0 @@ -package rounds - -import ( - "gitlab.com/elixxir/client/context" - "gitlab.com/elixxir/client/context/params" - "gitlab.com/elixxir/client/context/stoppable" - "gitlab.com/elixxir/client/storage" -) - -func StartProcessors(ctx *context.Context) stoppable.Stoppable { - p := newProcessingRounds() - stopper := stoppable.NewSingle("TrackNetwork") - go trackNetwork(ctx, net, stopper.Quit()) -} diff --git a/network/rounds/track.go b/network/rounds/track.go deleted file mode 100644 index baa3abd2f545b2c4f02aa272fd1c1993fbc9b6b6..0000000000000000000000000000000000000000 --- a/network/rounds/track.go +++ /dev/null @@ -1,196 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 Privategrity Corporation / -// / -// All rights reserved. / -//////////////////////////////////////////////////////////////////////////////// - -package rounds - -// updates.go tracks the network for: -// 1. Node addition and removal -// 2. New/Active/Complete rounds and their contact gateways -// This information is tracked by polling a gateway for the network definition -// file (NDF). Once it detects an event it sends it off to the proper channel -// for a worker to update the client state (add/remove a node, check for -// messages at a gateway, etc). See: -// - nodes.go for add/remove node events -// - rounds.go for round event handling & processing -// - receive.go for message handling - -import ( - "encoding/binary" - "gitlab.com/elixxir/client/context" - "gitlab.com/elixxir/client/context/params" - "gitlab.com/elixxir/client/context/stoppable" - "gitlab.com/elixxir/comms/client" - "gitlab.com/elixxir/comms/network" - "gitlab.com/elixxir/client/storage" - "gitlab.com/elixxir/crypto/csprng" - "gitlab.com/elixxir/crypto/fastRNG" - "gitlab.com/elixxir/primitives/knownRounds" - "gitlab.com/xx_network/comms/connect" - - //"gitlab.com/elixxir/comms/network" - //"gitlab.com/xx_network/primitives/ndf" - "fmt" - jww "github.com/spf13/jwalterweatherman" - pb "gitlab.com/elixxir/comms/mixmessages" - "gitlab.com/xx_network/primitives/id" - "io" - "math" - "time" -) - -type trackNetworkComms interface { - GetHost(hostId *id.ID) (*connect.Host, bool) - SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error) -} - -// TrackNetwork polls the network to get updated on the state of nodes, the -// round status, and informs the client when messages can be retrieved. -func trackNetwork(ctx *context.Context, - quitCh <-chan struct{}) { - opts := params.GetDefaultNetwork() - ticker := time.NewTicker(opts.TrackNetworkPeriod) - done := false - for !done { - select { - case <-quitCh: - done = true - case <-ticker.C: - trackNetwork(ctx, network, opts.MaxCheckCnt) - } - } -} - -func track(sess *storage.Session, rng csprng.Source, p *processing, - instance *network.Instance, comms trackNetworkComms, maxCheck int) { - - ndf := instance.GetPartialNdf().Get() - - // Get a random gateway - gateways := ndf.Gateways - gwIdx := ReadRangeUint32(0, uint32(len(gateways)), rng) - gwID, err := gateways[gwIdx].GetGatewayId() - if err != nil { - jww.ERROR.Printf(err.Error()) - return - } - gwHost, ok := comms.GetHost(gwID) - if !ok { - jww.ERROR.Printf("could not get host for gateway %s", gwID) - return - } - - // Poll for the new NDF - pollReq := pb.GatewayPoll{ - Partial: &pb.NDFHash{ - Hash: instance.GetPartialNdf().GetHash(), - }, - LastUpdate: uint64(instance.GetLastRoundID()), - } - pollResp, err := comms.SendPoll(gwHost, &pollReq) - if err != nil { - jww.ERROR.Printf(err.Error()) - return - } - - //handle updates - newNDF := pollResp.PartialNDF - lastTrackedRound := id.Round(pollResp.LastTrackedRound) - roundUpdates := pollResp.Updates - gwRoundsState := &knownRounds.KnownRounds{} - err = gwRoundsState.Unmarshal(pollResp.KnownRounds) - if err != nil { - jww.ERROR.Printf(err.Error()) - return - } - - // ---- NODE EVENTS ---- - // NOTE: this updates the structure AND sends events over the node - // update channels - err = instance.UpdatePartialNdf(newNDF) - if err != nil { - jww.ERROR.Printf(err.Error()) - return - } - err = instance.RoundUpdates(roundUpdates) - if err != nil { - jww.ERROR.Printf(err.Error()) - return - } - - // ---- Round Processing ----- - checkedRounds := sess.GetCheckedRounds() - roundChecker := getRoundChecker(network, roundUpdates) - checkedRounds.Forward(lastTrackedRound) - checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, maxCheck) -} - -// getRoundChecker passes a context and the round infos received by the -// gateway to the funky round checker api to update round state. -// The returned function passes round event objects over the context -// to the rest of the message handlers for getting messages. -func getRoundChecker(p *processing, instance *network.Instance, maxAttempts uint) func(roundID id.Round) bool { - return func(roundID id.Round) bool { - - // Set round to processing, if we can - processing, count := p.Process(roundID) - if !processing { - return false - } - if count == maxAttempts { - p.Remove(roundID) - return true - } - // FIXME: Spec has us SETTING processing, but not REMOVING it - // until the get messages thread completes the lookup, this - // is smell that needs refining. It seems as if there should be - // a state that lives with the round info as soon as we know - // about it that gets updated at different parts...not clear - // needs to be thought through. - //defer processing.Remove(roundID) - - // TODO: Bloom filter lookup -- return true when we don't have - // Go get the round from the round infos, if it exists - - ri, err := instance.GetRound(roundID) - if err != nil { - // If we didn't find it, send to historical - // rounds processor - network.GetHistoricalLookupCh() <- roundID - } else { - network.GetRoundUpdateCh() <- ri - } - - return false - } -} - -// ReadUint32 reads an integer from an io.Reader (which should be a CSPRNG) -func ReadUint32(rng io.Reader) uint32 { - var rndBytes [4]byte - i, err := rng.Read(rndBytes[:]) - if i != 4 || err != nil { - panic(fmt.Sprintf("cannot read from rng: %+v", err)) - } - return binary.BigEndian.Uint32(rndBytes[:]) -} - -// ReadRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end -func ReadRangeUint32(start, end uint32, rng io.Reader) uint32 { - size := end - start - // note we could just do the part inside the () here, but then extra - // can == size which means a little bit of range is wastes, either - // choice seems negligible so we went with the "more correct" - extra := (math.MaxUint32%size + 1) % size - limit := math.MaxUint32 - extra - // Loop until we read something inside the limit - for { - res := ReadUint32(rng) - if res > limit { - continue - } - return (res % size) + start - } -} \ No newline at end of file diff --git a/network/track.go b/network/track.go new file mode 100644 index 0000000000000000000000000000000000000000..0141a6005ce04767b9dd7e08e812073271e69347 --- /dev/null +++ b/network/track.go @@ -0,0 +1,112 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 Privategrity Corporation / +// / +// All rights reserved. / +//////////////////////////////////////////////////////////////////////////////// + +package network + +// updates.go tracks the network for: +// 1. Node addition and removal +// 2. New/Active/Complete rounds and their contact gateways +// This information is tracked by polling a gateway for the network definition +// file (NDF). Once it detects an event it sends it off to the proper channel +// for a worker to update the client state (add/remove a node, check for +// messages at a gateway, etc). See: +// - nodes.go for add/remove node events +// - rounds.go for round event handling & processing +// - receive.go for message handling + +import ( + "gitlab.com/elixxir/client/network/gateway" + "gitlab.com/elixxir/crypto/csprng" + "gitlab.com/elixxir/primitives/knownRounds" + "gitlab.com/xx_network/comms/connect" + + jww "github.com/spf13/jwalterweatherman" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/xx_network/primitives/id" + "time" +) + +type trackNetworkComms interface { + GetHost(hostId *id.ID) (*connect.Host, bool) + SendPoll(host *connect.Host, message *pb.GatewayPoll) (*pb.GatewayPollResponse, error) +} + +// TrackNetwork polls the network to get updated on the state of nodes, the +// round status, and informs the client when messages can be retrieved. +func (m *Manager) trackNetwork(quitCh <-chan struct{}) { + ticker := time.NewTicker(m.param.TrackNetworkPeriod) + rng := m.context.Rng.GetStream() + + for { + select { + case <-quitCh: + rng.Close() + break + case <-ticker.C: + m.track(rng, m.comms) + } + } +} + +func (m *Manager) track(rng csprng.Source, comms trackNetworkComms) { + + gwHost, err := gateway.Get(m.instance.GetPartialNdf().Get(), comms, rng) + if err != nil { + jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+ + "data: %s", err) + } + + // Poll for the new NDF + pollReq := pb.GatewayPoll{ + Partial: &pb.NDFHash{ + Hash: m.instance.GetPartialNdf().GetHash(), + }, + LastUpdate: uint64(m.instance.GetLastUpdateID()), + } + pollResp, err := comms.SendPoll(gwHost, &pollReq) + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } + + //handle updates + newNDF := pollResp.PartialNDF + lastTrackedRound := id.Round(pollResp.LastTrackedRound) + roundUpdates := pollResp.Updates + gwRoundsState := &knownRounds.KnownRounds{} + err = gwRoundsState.Unmarshal(pollResp.KnownRounds) + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } + + // ---- NODE EVENTS ---- + // NOTE: this updates the structure AND sends events over the node + // update channels + err = m.instance.UpdatePartialNdf(newNDF) + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } + err = m.instance.RoundUpdates(roundUpdates) + if err != nil { + jww.ERROR.Printf(err.Error()) + return + } + + // ---- Round Processing ----- + //build the round checker + roundChecker := func(rid id.Round) bool { + return m.round.Checker(rid, m.instance) + } + + //check rounds + checkedRounds := m.context.Session.GetCheckedRounds() + checkedRounds.Forward(lastTrackedRound) + checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker, + int(m.param.MaxCheckCheckedRounds)) +} +