Skip to content
Snippets Groups Projects
Commit 9918ae67 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

implemented the changes to the follower so it will multihead and only get updates on the first run

parent 71c9bc59
Branches
Tags
4 merge requests!510Release,!419rewrote the health tracker to both consider if there are waiting rounds and...,!363Xx4160/multi head,!340Project/channels
......@@ -26,6 +26,8 @@ import (
"bytes"
"encoding/binary"
"fmt"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"sync"
"sync/atomic"
"time"
......@@ -68,7 +70,10 @@ func (c *client) followNetwork(report ClientErrorReport,
TrackTicker := time.NewTicker(debugTrackPeriod)
rng := c.rng.GetStream()
// adbandon tracks rounds which data was not found out about in
// the verbose rounds debugging mode
abandon := func(round id.Round) { return }
dummyAbandon := func(round id.Round) { return }
if c.verboseRounds != nil {
abandon = func(round id.Round) {
c.verboseRounds.denote(round, Abandoned)
......@@ -82,7 +87,43 @@ func (c *client) followNetwork(report ClientErrorReport,
stop.ToStopped()
return
case <-ticker.C:
c.follow(report, rng, c.comms, stop, abandon)
// get the list of identities to track
stream := c.rng.GetStream()
toTrack, err := c.Tracker.GetEphemeralIdentities(
int(c.param.MaxParallelIdentityTracks),
stream,
c.Space.GetAddressSpaceWithoutWait())
stream.Close()
if err != nil {
jww.ERROR.Printf("failed to get identities to track")
continue
}
// set up tracking tools
wg := &sync.WaitGroup{}
wg.Add(len(toTrack))
// trigger the first separately because it will get network state
// updates
go func() {
c.follow(toTrack[0], report, rng, c.comms, stop, abandon,
true)
wg.Done()
}()
//trigger all others without getting network state updates
for i := 1; i < len(toTrack); i++ {
go func() {
c.follow(toTrack[i], report, rng, c.comms, stop,
dummyAbandon, false)
wg.Done()
}()
}
//wait for all to complete
wg.Wait()
case <-TrackTicker.C:
numPolls := atomic.SwapUint64(c.tracker, 0)
if c.numLatencies != 0 {
......@@ -108,18 +149,10 @@ func (c *client) followNetwork(report ClientErrorReport,
}
}
// follow executes each iteration of the follower.
func (c *client) follow(report ClientErrorReport, rng csprng.Source,
comms followNetworkComms, stop *stoppable.Single,
abandon func(round id.Round)) {
// Get the identity we will poll for
identity, err := c.GetEphemeralIdentity(
rng, c.Space.GetAddressSpaceWithoutWait())
if err != nil {
jww.FATAL.Panicf(
"Failed to get an identity, this should be impossible: %+v", err)
}
// follow executes an iteration of the follower for a specific identity
func (c *client) follow(identity receptionID.IdentityUse,
report ClientErrorReport, rng csprng.Source, comms followNetworkComms,
stop *stoppable.Single, abandon func(round id.Round), getUpdates bool) {
// While polling with a fake identity, it is necessary to have populated
// earliestRound data. However, as with fake identities, we want the values
......@@ -147,6 +180,7 @@ func (c *client) follow(report ClientErrorReport, rng csprng.Source,
ClientVersion: []byte(version.String()),
FastPolling: c.param.FastPolling,
LastRound: uint64(identity.ER.Get()),
DisableUpdates: !getUpdates,
}
result, err := c.SendToAny(func(host *connect.Host) (interface{}, error) {
......
......@@ -51,6 +51,7 @@ type Tracker interface {
AddIdentity(id *id.ID, validUntil time.Time, persistent bool)
RemoveIdentity(id *id.ID)
GetEphemeralIdentity(rng io.Reader, addressSize uint8) (receptionID.IdentityUse, error)
GetEphemeralIdentities(num int, rng io.Reader, addressSize uint8) ([]receptionID.IdentityUse, error)
GetIdentity(get *id.ID) (TrackedID, error)
}
......
......@@ -58,6 +58,10 @@ type Params struct {
// times.
ReplayRequests bool
// MaxParallelIdentityTracks is the maximum number of parallel identities
// the system will poll in one iteration of the follower
MaxParallelIdentityTracks uint
Rounds rounds.Params
Pickup pickup.Params
Message message.Params
......@@ -80,6 +84,7 @@ type paramsDisk struct {
Pickup pickup.Params
Message message.Params
Historical rounds.Params
MaxParallelIdentityTracks uint
}
// GetDefaultParams returns a Params object containing the
......@@ -96,6 +101,7 @@ func GetDefaultParams() Params {
VerboseRoundTracking: false,
RealtimeOnly: false,
ReplayRequests: true,
MaxParallelIdentityTracks: 20,
}
n.Rounds = rounds.GetDefaultParams()
n.Pickup = pickup.GetDefaultParams()
......@@ -135,6 +141,7 @@ func (p Params) MarshalJSON() ([]byte, error) {
Pickup: p.Pickup,
Message: p.Message,
Historical: p.Historical,
MaxParallelIdentityTracks: p.MaxParallelIdentityTracks,
}
return json.Marshal(&pDisk)
......@@ -163,6 +170,7 @@ func (p *Params) UnmarshalJSON(data []byte) error {
Pickup: pDisk.Pickup,
Message: pDisk.Message,
Historical: pDisk.Historical,
MaxParallelIdentityTracks: pDisk.MaxParallelIdentityTracks,
}
return nil
......
......@@ -13,34 +13,43 @@ require (
github.com/spf13/jwalterweatherman v1.1.0
github.com/spf13/viper v1.12.0
gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f
gitlab.com/elixxir/comms v0.0.4-0.20220603231314-e47e4af13326
gitlab.com/elixxir/comms v0.0.4-0.20220826205301-ea9f03d7469b
gitlab.com/elixxir/crypto v0.0.7-0.20220826192201-c00efec3c556
gitlab.com/elixxir/ekv v0.1.7
gitlab.com/elixxir/primitives v0.0.3-0.20220606195757-40f7a589347f
gitlab.com/xx_network/comms v0.0.4-0.20220630163702-f3d372ef6acd
gitlab.com/elixxir/primitives v0.0.3-0.20220810173935-592f34a88326
gitlab.com/xx_network/comms v0.0.4-0.20220826205007-a667162c07c7
gitlab.com/xx_network/crypto v0.0.5-0.20220729193517-1e5e96f39f6e
gitlab.com/xx_network/primitives v0.0.4-0.20220712193914-aebd8544396e
go.uber.org/ratelimit v0.2.0
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b
google.golang.org/grpc v1.48.0
golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.1
)
require (
git.xx.network/elixxir/grpc-web-go-client v0.0.0-20220826174128-b60c76b23331 // indirect
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/badoux/checkmail v1.2.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/elliotchance/orderedmap v1.4.0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/improbable-eng/grpc-web v0.15.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.11.7 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/ktr0731/grpc-web-go-client v0.2.8 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.2 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
......@@ -50,10 +59,12 @@ require (
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/zeebo/blake3 v0.2.3 // indirect
gitlab.com/xx_network/ring v0.0.3-0.20220222211904-da613960ad93 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/sys v0.0.0-20220731174439-a90be440212d // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78 // indirect
google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
)
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment