Skip to content
Snippets Groups Projects
Commit c4f98495 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

created the network/round package

parent 4415acd2
Branches
Tags
No related merge requests found
......@@ -16,9 +16,9 @@ import (
"gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/network/health"
"gitlab.com/elixxir/client/network/parse"
"gitlab.com/elixxir/client/network/rounds"
"gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id"
// "gitlab.com/xx_network/primitives/ndf"
......@@ -54,7 +54,7 @@ type Manager struct {
historicalLookup chan id.Round
// Processing rounds
Processing *ProcessingRounds
Processing *rounds.processing
//local pointer to user ID because it is used often
uid *id.ID
......@@ -96,7 +96,7 @@ func NewManager(ctx *context.Context) (*Manager, error) {
instance: instance,
uid: cryptoUser.GetUserID(),
partitioner: parse.NewPartitioner(msgSize, ctx),
Processing: NewProcessingRounds(),
Processing: rounds.NewProcessingRounds(),
roundUpdate: make(chan *pb.RoundInfo, opts.NumWorkers),
historicalLookup: make(chan id.Round, opts.NumWorkers),
nodeRegistration: make(chan network.NodeGateway,
......@@ -129,7 +129,7 @@ func (m *Manager) StartRunners() error {
}
// Start the Network Tracker
m.runners.Add(StartTrackNetwork(m.Context, m))
m.runners.Add(rounds.StartTrackNetwork(m.Context, m))
// Message reception
m.runners.Add(StartMessageReceivers(m.Context, m))
// Node Updates
......
package network
package rounds
// File for storing info about which rounds are processing
......@@ -7,37 +7,39 @@ import (
"sync"
)
type roundStatus struct {
type status struct {
count uint
processing bool
}
// Struct with a lock so we can manage it with concurrent threads
type ProcessingRounds struct {
rounds map[id.Round]*roundStatus
type processing struct {
rounds map[id.Round]*status
sync.RWMutex
}
// NewProcessingRounds returns a processing rounds object
func NewProcessingRounds() *ProcessingRounds {
return &ProcessingRounds{
rounds: make(map[id.Round]*roundStatus),
func newProcessingRounds() *processing {
return &processing{
rounds: make(map[id.Round]*status),
}
}
// Add a round to the list of processing rounds
// the boolean is true if the round was changes from not processing to processing
// the count is the number of times the round has been processed
func (pr *ProcessingRounds) Process(id id.Round) (bool, uint) {
func (pr *processing) Process(id id.Round) (bool, uint) {
pr.Lock()
defer pr.Unlock()
if rs, ok := pr.rounds[id]; ok {
if rs.processing {
return false, rs.count
}
rs.count++
process := !rs.processing
rs.processing = true
return process, rs.count
return true, rs.count
}
pr.rounds[id] = &roundStatus{
pr.rounds[id] = &status{
count: 0,
processing: true,
}
......@@ -45,7 +47,7 @@ func (pr *ProcessingRounds) Process(id id.Round) (bool, uint) {
}
// Check if a round ID is marked as processing
func (pr *ProcessingRounds) IsProcessing(id id.Round) bool {
func (pr *processing) IsProcessing(id id.Round) bool {
pr.RLock()
defer pr.RUnlock()
if rs, ok := pr.rounds[id]; ok {
......@@ -55,7 +57,7 @@ func (pr *ProcessingRounds) IsProcessing(id id.Round) bool {
}
// set a rounds processing status to failure so it can be retried
func (pr *ProcessingRounds) Fail(id id.Round) {
func (pr *processing) Fail(id id.Round) {
pr.Lock()
defer pr.Unlock()
if rs, ok := pr.rounds[id]; ok {
......@@ -64,7 +66,7 @@ func (pr *ProcessingRounds) Fail(id id.Round) {
}
// Remove a round from the processing list
func (pr *ProcessingRounds) Remove(id id.Round) {
func (pr *processing) Remove(id id.Round) {
pr.Lock()
defer pr.Unlock()
delete(pr.rounds, id)
......
package network
package rounds
// Testing functions for Processing Round structure
import (
"gitlab.com/xx_network/primitives/id"
"gitlab.com/elixxir/client/vendor/gitlab.com/xx_network/primitives/id"
"testing"
)
// Test that the Processing function inserts the round properly
func TestProcessingRounds_Add(t *testing.T) {
pr := ProcessingRounds{rounds: make(map[id.Round]struct{})}
pr := processing{rounds: make(map[id.Round]struct{})}
pr.Add(id.Round(10))
if _, ok := pr.rounds[10]; !ok {
t.Errorf("Could not find round 10 after it was inserted into the map")
......@@ -18,7 +18,7 @@ func TestProcessingRounds_Add(t *testing.T) {
// Test that the IsProcessing function correctly finds the round
func TestProcessingRounds_IsProcessing(t *testing.T) {
pr := ProcessingRounds{rounds: make(map[id.Round]struct{})}
pr := processing{rounds: make(map[id.Round]struct{})}
pr.rounds[id.Round(10)] = struct{}{}
if !pr.IsProcessing(id.Round(10)) {
t.Errorf("IsProcessing reported round 10 is not processing after being set as processing")
......@@ -27,7 +27,7 @@ func TestProcessingRounds_IsProcessing(t *testing.T) {
// Test that the Done function removes the processing round
func TestProcessingRounds_Remove(t *testing.T) {
pr := ProcessingRounds{rounds: make(map[id.Round]struct{})}
pr := processing{rounds: make(map[id.Round]struct{})}
pr.rounds[id.Round(10)] = struct{}{}
pr.Remove(id.Round(10))
if _, ok := pr.rounds[id.Round(10)]; ok {
......
package rounds
type Manager struct {
}
......@@ -4,7 +4,7 @@
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package network
package rounds
// updates.go tracks the network for:
// 1. Node addition and removal
......@@ -22,6 +22,9 @@ import (
"gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/knownRounds"
//"gitlab.com/elixxir/comms/network"
//"gitlab.com/xx_network/primitives/ndf"
......@@ -34,36 +37,8 @@ import (
"time"
)
// ReadUint32 reads an integer from an io.Reader (which should be a CSPRNG)
func ReadUint32(rng io.Reader) uint32 {
var rndBytes [4]byte
i, err := rng.Read(rndBytes[:])
if i != 4 || err != nil {
panic(fmt.Sprintf("cannot read from rng: %+v", err))
}
return binary.BigEndian.Uint32(rndBytes[:])
}
// ReadRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end
func ReadRangeUint32(start, end uint32, rng io.Reader) uint32 {
size := end - start
// note we could just do the part inside the () here, but then extra
// can == size which means a little bit of range is wastes, either
// choice seems negligible so we went with the "more correct"
extra := (math.MaxUint32%size + 1) % size
limit := math.MaxUint32 - extra
// Loop until we read something inside the limit
for {
res := ReadUint32(rng)
if res > limit {
continue
}
return (res % size) + start
}
}
// StartTrackNetwork starts a single TrackNetwork thread and returns a stoppable
func StartTrackNetwork(ctx *context.Context, net *Manager) stoppable.Stoppable {
func StartTrackNetwork(ctx *context.Context, net *network.Manager) stoppable.Stoppable {
stopper := stoppable.NewSingle("TrackNetwork")
go TrackNetwork(ctx, net, stopper.Quit())
return stopper
......@@ -71,7 +46,7 @@ func StartTrackNetwork(ctx *context.Context, net *Manager) stoppable.Stoppable {
// TrackNetwork polls the network to get updated on the state of nodes, the
// round status, and informs the client when messages can be retrieved.
func TrackNetwork(ctx *context.Context, network *Manager,
func TrackNetwork(ctx *context.Context, network *network.Manager,
quitCh <-chan struct{}) {
opts := params.GetDefaultNetwork()
ticker := time.NewTicker(opts.TrackNetworkPeriod)
......@@ -86,13 +61,11 @@ func TrackNetwork(ctx *context.Context, network *Manager,
}
}
func trackNetwork(ctx *context.Context, network *Manager, maxCheckCnt int) {
func trackNetwork(sess *storage.Session, rng *fastRNG.StreamGenerator, network *network.Manager, maxCheckCnt int) {
instance := ctx.Manager.GetInstance()
comms := network.Comms
ndf := instance.GetPartialNdf().Get()
rng := ctx.Rng.GetStream()
defer rng.Close()
sess := ctx.Session
// Get a random gateway
gateways := ndf.Gateways
......@@ -139,7 +112,7 @@ func trackNetwork(ctx *context.Context, network *Manager, maxCheckCnt int) {
// ---- Round Processing -----
checkedRounds := sess.GetCheckedRounds()
roundChecker := getRoundChecker(ctx, network, roundUpdates)
roundChecker := getRoundChecker(network, roundUpdates)
checkedRounds.Forward(lastTrackedRound)
checkedRounds.RangeUncheckedMasked(gwRoundsState, roundChecker,
maxCheckCnt)
......@@ -149,8 +122,7 @@ func trackNetwork(ctx *context.Context, network *Manager, maxCheckCnt int) {
// gateway to the funky round checker api to update round state.
// The returned function passes round event objects over the context
// to the rest of the message handlers for getting messages.
func getRoundChecker(ctx *context.Context,
network *Manager) func(roundID id.Round) bool {
func getRoundChecker(network *network.Manager) func(roundID id.Round) bool {
return func(roundID id.Round) bool {
//sess := ctx.Session
processing := network.Processing
......@@ -186,3 +158,31 @@ func getRoundChecker(ctx *context.Context,
return false
}
}
// ReadUint32 reads an integer from an io.Reader (which should be a CSPRNG)
func ReadUint32(rng io.Reader) uint32 {
var rndBytes [4]byte
i, err := rng.Read(rndBytes[:])
if i != 4 || err != nil {
panic(fmt.Sprintf("cannot read from rng: %+v", err))
}
return binary.BigEndian.Uint32(rndBytes[:])
}
// ReadRangeUint32 reduces an integer from 0, MaxUint32 to the range start, end
func ReadRangeUint32(start, end uint32, rng io.Reader) uint32 {
size := end - start
// note we could just do the part inside the () here, but then extra
// can == size which means a little bit of range is wastes, either
// choice seems negligible so we went with the "more correct"
extra := (math.MaxUint32%size + 1) % size
limit := math.MaxUint32 - extra
// Loop until we read something inside the limit
for {
res := ReadUint32(rng)
if res > limit {
continue
}
return (res % size) + start
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment