Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • elixxir/client
  • mirabel/client
2 results
Show changes
Commits on Source (52)
Showing
with 279 additions and 45 deletions
......@@ -99,13 +99,22 @@ func (c *Cmix) ReadyToSend() bool {
jww.FATAL.Panicf("Failed to get node registration status: %+v", err)
}
// FIXME: This is a fix put in place because not all nodes in the NDF are
// online. This should be fixed.
total = 340
return numReg >= total*7/10
}
// IsReadyInfo contains information on if the network is ready and how close it
// is to being ready.
//
// Example JSON:
// {
// "IsReady": true,
// "HowClose": 0.534
// }
type IsReadyInfo struct {
IsReady bool
HowClose float64
}
// NetworkFollowerStatus gets the state of the network follower. It returns a
// status with the following values:
// Stopped - 0
......@@ -144,6 +153,44 @@ func (c *Cmix) GetNodeRegistrationStatus() ([]byte, error) {
return json.Marshal(nodeRegReport)
}
// IsReady returns true if at least percentReady of node registrations has
// completed. If not all have completed, then it returns false and howClose will
// be a percent (0-1) of node registrations completed.
//
// Parameters:
// - percentReady - The percentage of nodes required to be registered with to
// be ready. This is a number between 0 and 1.
//
// Returns:
// - JSON of [IsReadyInfo].
func (c *Cmix) IsReady(percentReady float64) ([]byte, error) {
isReady, howClose := c.api.IsReady(percentReady)
return json.Marshal(&IsReadyInfo{isReady, howClose})
}
// PauseNodeRegistrations stops all node registrations and returns a function to
// resume them.
//
// Parameters:
// - timeoutMS - The timeout, in milliseconds, to wait when stopping threads
// before failing.
func (c *Cmix) PauseNodeRegistrations(timeoutMS int) error {
timeout := time.Duration(timeoutMS) * time.Millisecond
return c.api.PauseNodeRegistrations(timeout)
}
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum.
//
// Parameters:
// - toRun - The number of parallel node registrations.
// - timeoutMS - The timeout, in milliseconds, to wait when changing node
// registrations before failing.
func (c *Cmix) ChangeNumberOfNodeRegistrations(toRun, timeoutMS int) error {
timeout := time.Duration(timeoutMS) * time.Millisecond
return c.api.ChangeNumberOfNodeRegistrations(toRun, timeout)
}
// HasRunningProcessies checks if any background threads are running and returns
// true if one or more are.
//
......
......@@ -18,7 +18,7 @@ import (
)
// Change this value to set the version for this build
const currentVersion = "4.3.0"
const currentVersion = "4.3.2"
func Version() string {
out := fmt.Sprintf("Elixxir Client v%s -- %s\n\n", xxdk.SEMVER,
......
......@@ -92,6 +92,8 @@ type client struct {
// Storage of the max message length
maxMsgLen int
numNodes *uint64
}
// NewClient builds a new reception client object using inputted key fields.
......@@ -103,6 +105,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
tracker := uint64(0)
earliest := uint64(0)
numNodes := uint64(0)
netTime.SetTimeSource(localTime{})
// Create client object
......@@ -117,6 +121,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
maxMsgLen: tmpMsg.ContentsSize(),
skewTracker: clockSkew.New(params.ClockSkewClamp),
attemptTracker: attempts.NewSendAttempts(),
numNodes: &numNodes,
}
if params.VerboseRoundTracking {
......@@ -134,10 +139,20 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
// initialize turns on network handlers, initializing a host pool and
// network health monitors. This should be called before
// network Follow command is called.
func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
func (c *client) initialize(ndfile *ndf.NetworkDefinition) error {
//set the number of nodes
numNodes := uint64(0)
for _, n := range ndfile.Nodes {
if n.Status != ndf.Stale {
numNodes++
}
}
atomic.StoreUint64(c.numNodes, numNodes)
// Start network instance
instance, err := commNetwork.NewInstance(
c.comms.ProtoComms, ndf, nil, nil, commNetwork.None,
c.comms.ProtoComms, ndfile, nil, nil, commNetwork.None,
c.param.FastPolling)
if err != nil {
return errors.WithMessage(
......@@ -145,7 +160,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
}
c.instance = instance
addrSize := ndf.AddressSpace[len(ndf.AddressSpace)-1].Size
addrSize := ndfile.AddressSpace[len(ndfile.AddressSpace)-1].Size
c.Space = address.NewAddressSpace(addrSize)
/* Set up modules */
......@@ -165,7 +180,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
// Enable optimized HostPool initialization
poolParams.MaxPings = 50
poolParams.ForceConnection = true
sender, err := gateway.NewSender(poolParams, c.rng, ndf, c.comms,
sender, err := gateway.NewSender(poolParams, c.rng, ndfile, c.comms,
c.session, nodeChan)
if err != nil {
return err
......@@ -174,7 +189,9 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
// Set up the node registrar
c.Registrar, err = nodes.LoadRegistrar(
c.session, c.Sender, c.comms, c.rng, nodeChan)
c.session, c.Sender, c.comms, c.rng, nodeChan, func() int {
return int(atomic.LoadUint64(c.numNodes))
})
if err != nil {
return err
}
......
......@@ -27,6 +27,7 @@ import (
"encoding/binary"
"fmt"
"gitlab.com/elixxir/client/cmix/identity/receptionID"
"gitlab.com/xx_network/primitives/ndf"
"sync"
"sync/atomic"
"time"
......@@ -258,6 +259,15 @@ func (c *client) follow(identity receptionID.IdentityUse,
return
}
//set the number of nodes
numNodes := uint64(0)
for _, n := range c.instance.GetPartialNdf().Get().Nodes {
if n.Status != ndf.Stale {
numNodes++
}
}
atomic.StoreUint64(c.numNodes, numNodes)
// update gateway connections
c.UpdateNdf(c.GetInstance().GetPartialNdf().Get())
c.session.SetNDF(c.GetInstance().GetPartialNdf().Get())
......
......@@ -191,6 +191,14 @@ type Client interface {
AddService(clientID *id.ID, newService message.Service,
response message.Processor)
// PauseNodeRegistrations stops all node registrations and returns a
// function to resume them.
PauseNodeRegistrations(timeout time.Duration) error
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum.
ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error
// DeleteService deletes a message service. If only a single response is
// associated with the preimage, the entire preimage is removed. If there is
// more than one response, only the given response is removed. If nil is
......
......@@ -27,6 +27,14 @@ type Registrar interface {
// to register with nodes.
StartProcesses(numParallel uint) stoppable.Stoppable
//PauseNodeRegistrations stops all node registrations
//and returns a function to resume them
PauseNodeRegistrations(timeout time.Duration) error
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum
ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error
// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did
// not have a key for. If there are missing keys, then returns nil.
GetNodeKeys(topology *connect.Circuit) (MixCypher, error)
......
......@@ -13,6 +13,7 @@ import (
"gitlab.com/xx_network/crypto/csprng"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
......@@ -31,16 +32,24 @@ import (
// before an interruption and how many registration attempts have
// been attempted.
func registerNodes(r *registrar, s session, stop *stoppable.Single,
inProgress, attempts *sync.Map) {
inProgress, attempts *sync.Map, index int) {
atomic.AddInt64(r.numberRunning, 1)
interval := time.Duration(500) * time.Millisecond
t := time.NewTicker(interval)
for {
select {
case <-r.pauser:
atomic.AddInt64(r.numberRunning, -1)
select {
case <-stop.Quit():
stop.ToStopped()
return
case <-r.resumer:
atomic.AddInt64(r.numberRunning, 1)
}
case <-stop.Quit():
// On a stop signal, close the thread
t.Stop()
stop.ToStopped()
atomic.AddInt64(r.numberRunning, -1)
return
case gw := <-r.c:
......@@ -107,7 +116,13 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single,
}
}
rng.Close()
case <-t.C:
}
if index >= 2 {
if float64(r.NumRegisteredNodes()) > (float64(r.numnodesGetter()) * .7) {
<-stop.Quit()
stop.ToStopped()
return
}
}
}
}
......
......@@ -20,11 +20,12 @@ import (
"gitlab.com/xx_network/primitives/ndf"
"strconv"
"sync"
"sync/atomic"
"time"
)
const InputChanLen = 1000
const maxAttempts = 2
const maxAttempts = 5
// Backoff for attempting to register with a cMix node.
var delayTable = [5]time.Duration{
......@@ -46,6 +47,20 @@ type registrar struct {
comms RegisterNodeCommsInterface
rng *fastRNG.StreamGenerator
inProgress sync.Map
// We are relying on the in progress check to ensure there is only a single
// operator at a time, as a result this is a map of ID -> int
attempts sync.Map
pauser chan interface{}
resumer chan interface{}
numberRunning *int64
maxRunning int
runnerLock sync.Mutex
numnodesGetter func() int
c chan network.NodeGateway
}
......@@ -53,12 +68,18 @@ type registrar struct {
// exist.
func LoadRegistrar(session session, sender gateway.Sender,
comms RegisterNodeCommsInterface, rngGen *fastRNG.StreamGenerator,
c chan network.NodeGateway) (Registrar, error) {
c chan network.NodeGateway, numNodesGetter func() int) (Registrar, error) {
running := int64(0)
kv := session.GetKV().Prefix(prefix)
r := &registrar{
nodes: make(map[id.ID]*key),
kv: kv,
nodes: make(map[id.ID]*key),
kv: kv,
pauser: make(chan interface{}),
resumer: make(chan interface{}),
numberRunning: &running,
numnodesGetter: numNodesGetter,
}
obj, err := kv.Get(storeKey, currentKeyVersion)
......@@ -89,24 +110,79 @@ func LoadRegistrar(session session, sender gateway.Sender,
// StartProcesses initiates numParallel amount of threads
// to register with nodes.
func (r *registrar) StartProcesses(numParallel uint) stoppable.Stoppable {
multi := stoppable.NewMulti("NodeRegistrations")
inProgress := &sync.Map{}
r.runnerLock.Lock()
defer r.runnerLock.Unlock()
// We are relying on the in progress check to ensure there is only a single
// operator at a time, as a result this is a map of ID -> int
attempts := &sync.Map{}
multi := stoppable.NewMulti("NodeRegistrations")
r.maxRunning = int(numParallel)
for i := uint(0); i < numParallel; i++ {
stop := stoppable.NewSingle("NodeRegistration " + strconv.Itoa(int(i)))
go registerNodes(r, r.session, stop, inProgress, attempts)
go registerNodes(r, r.session, stop, &r.inProgress, &r.attempts, int(i))
multi.Add(stop)
}
return multi
}
//PauseNodeRegistrations stops all node registrations
//and returns a function to resume them
func (r *registrar) PauseNodeRegistrations(timeout time.Duration) error {
r.runnerLock.Lock()
defer r.runnerLock.Unlock()
timer := time.NewTimer(timeout)
defer timer.Stop()
numRegistrations := atomic.LoadInt64(r.numberRunning)
jww.INFO.Printf("PauseNodeRegistrations() - Pausing %d registrations", numRegistrations)
for i := int64(0); i < numRegistrations; i++ {
select {
case r.pauser <- struct{}{}:
case <-timer.C:
return errors.Errorf("Timed out on pausing node registration on %d", i)
}
}
return nil
}
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum
func (r *registrar) ChangeNumberOfNodeRegistrations(toRun int,
timeout time.Duration) error {
r.runnerLock.Lock()
defer r.runnerLock.Unlock()
numRunning := int(atomic.LoadInt64(r.numberRunning))
if toRun+numRunning > r.maxRunning {
return errors.Errorf("Cannot change number of " +
"running node registration to number greater than the max")
}
timer := time.NewTimer(timeout)
defer timer.Stop()
if numRunning < toRun {
jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Reducing number "+
"of node registrations from %d to %d", toRun, numRunning, toRun)
for i := 0; i < toRun-numRunning; i++ {
select {
case r.pauser <- struct{}{}:
case <-timer.C:
return errors.New("Timed out on reducing node registration")
}
}
} else if numRunning > toRun {
jww.INFO.Printf("ChangeNumberOfNodeRegistrations(%d) Increasing number "+
"of node registrations from %d to %d", toRun, numRunning, toRun)
for i := 0; i < toRun-numRunning; i++ {
select {
case r.resumer <- struct{}{}:
case <-timer.C:
return errors.New("Timed out on increasing node registration")
}
}
}
return nil
}
// GetNodeKeys returns a MixCypher for the topology and a list of nodes it did
// not have a key for. If there are missing keys, then returns nil.
func (r *registrar) GetNodeKeys(topology *connect.Circuit) (MixCypher, error) {
......
......@@ -38,7 +38,7 @@ func TestLoadRegistrar_New(t *testing.T) {
nodeChan := make(chan commNetwork.NodeGateway, InputChanLen)
r, err := LoadRegistrar(session, sender, &MockClientComms{},
rngGen, nodeChan)
rngGen, nodeChan, func() int { return 100 })
if err != nil {
t.Fatalf("Failed to create new registrar: %+v", err)
}
......@@ -71,7 +71,7 @@ func TestLoadRegistrar_Load(t *testing.T) {
// Load the store and check its attributes
r, err := LoadRegistrar(
testR.session, testR.sender, testR.comms, testR.rng, testR.c)
testR.session, testR.sender, testR.comms, testR.rng, testR.c, func() int { return 100 })
if err != nil {
t.Fatalf("Unable to load store: %+v", err)
}
......
......@@ -101,7 +101,7 @@ func makeTestRegistrar(mockComms *MockClientComms, t *testing.T) *registrar {
nodeChan := make(chan commNetwork.NodeGateway, InputChanLen)
r, err := LoadRegistrar(
session, sender, mockComms, rngGen, nodeChan)
session, sender, mockComms, rngGen, nodeChan, func() int { return 100 })
if err != nil {
t.Fatalf("Failed to create new registrar: %+v", err)
}
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
// This file is compiled for all architectures except WebAssembly.
//go:build !js || !wasm
package cmix
const defaultParallelNodeRegistration = 20
package cmix
const defaultParallelNodeRegistration = 12
......@@ -100,13 +100,13 @@ func GetDefaultParams() Params {
MaxCheckedRounds: 500,
RegNodesBufferLen: 1000,
NetworkHealthTimeout: 15 * time.Second,
ParallelNodeRegistrations: 20,
ParallelNodeRegistrations: defaultParallelNodeRegistration,
KnownRoundsThreshold: 1500, // 5 rounds/sec * 60 sec/min * 5 min
FastPolling: true,
VerboseRoundTracking: false,
RealtimeOnly: false,
ReplayRequests: true,
MaxParallelIdentityTracks: 20,
MaxParallelIdentityTracks: 5,
ClockSkewClamp: 50 * time.Millisecond,
}
n.Rounds = rounds.GetDefaultParams()
......
......@@ -57,7 +57,7 @@ type paramsDisk struct {
// GetDefaultParams returns a default set of Params.
func GetDefaultParams() Params {
return Params{
NumMessageRetrievalWorkers: 8,
NumMessageRetrievalWorkers: 4,
LookupRoundsBufferLen: 2000,
MaxHistoricalRoundsRetries: 3,
UncheckRoundPeriod: 20 * time.Second,
......
......@@ -184,17 +184,20 @@ func (m *mockCmix) AddFingerprint(*id.ID, format.Fingerprint, message.Processor)
func (m *mockCmix) DeleteFingerprint(*id.ID, format.Fingerprint) {}
func (m *mockCmix) DeleteClientFingerprints(*id.ID) {}
func (m *mockCmix) AddService(*id.ID, message.Service, message.Processor) {}
func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) {}
func (m *mockCmix) DeleteClientService(*id.ID) {}
func (m *mockCmix) TrackServices(message.ServicesTracker) {}
func (m *mockCmix) CheckInProgressMessages() {}
func (m *mockCmix) IsHealthy() bool { return true }
func (m *mockCmix) WasHealthy() bool { return true }
func (m *mockCmix) AddHealthCallback(func(bool)) uint64 { return 0 }
func (m *mockCmix) RemoveHealthCallback(uint64) {}
func (m *mockCmix) HasNode(*id.ID) bool { return true }
func (m *mockCmix) NumRegisteredNodes() int { return 24 }
func (m *mockCmix) TriggerNodeRegistration(*id.ID) {}
func (m *mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) {
return nil
}
func (m *mockCmix) DeleteService(*id.ID, message.Service, message.Processor) {}
func (m *mockCmix) DeleteClientService(*id.ID) {}
func (m *mockCmix) TrackServices(message.ServicesTracker) {}
func (m *mockCmix) CheckInProgressMessages() {}
func (m *mockCmix) IsHealthy() bool { return true }
func (m *mockCmix) WasHealthy() bool { return true }
func (m *mockCmix) AddHealthCallback(func(bool)) uint64 { return 0 }
func (m *mockCmix) RemoveHealthCallback(uint64) {}
func (m *mockCmix) HasNode(*id.ID) bool { return true }
func (m *mockCmix) NumRegisteredNodes() int { return 24 }
func (m *mockCmix) TriggerNodeRegistration(*id.ID) {}
func (m *mockCmix) GetRoundResults(_ time.Duration, roundCallback cmix.RoundEventCallback, _ ...id.Round) {
roundCallback(true, false, nil)
......@@ -214,6 +217,10 @@ func (m *mockCmix) RegisterAddressSpaceNotification(string) (chan uint8, error)
func (m *mockCmix) UnregisterAddressSpaceNotification(string) {}
func (m *mockCmix) GetInstance() *network.Instance { return m.instance }
func (m *mockCmix) GetVerboseRounds() string { return "" }
func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil }
func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error {
return nil
}
////////////////////////////////////////////////////////////////////////////////
// Misc set-up utils //
......
......@@ -61,6 +61,8 @@ type Manager struct {
// Pauses/Resumes the dummy send thread when triggered
statusChan chan bool
totalSent *uint64
// Interfaces
net cmix.Client
store storage.Session
......@@ -96,6 +98,7 @@ func NewManager(maxNumMessages int,
// function is a helper function for NewManager.
func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
net cmix.Client, store storage.Session, rng *fastRNG.StreamGenerator) *Manager {
numSent := uint64(8)
return &Manager{
maxNumMessages: maxNumMessages,
avgSendDelta: avgSendDelta,
......@@ -105,6 +108,7 @@ func newManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
net: net,
store: store,
rng: rng,
totalSent: &numSent,
}
}
......
......@@ -35,6 +35,7 @@ func Test_newManager(t *testing.T) {
statusChanLen, cap(received.statusChan))
}
received.statusChan = expected.statusChan
received.totalSent = nil
if !reflect.DeepEqual(expected, received) {
t.Errorf("New manager does not match expected."+
......
......@@ -126,6 +126,11 @@ func (m mockCmix) AddService(clientID *id.ID, newService message.Service, respon
panic("implement me")
}
func (m mockCmix) IncreaseParallelNodeRegistration(int) func() (stoppable.Stoppable, error) {
//TODO implement me
panic("implement me")
}
func (m mockCmix) DeleteService(clientID *id.ID, toDelete message.Service, processor message.Processor) {
//TODO implement me
panic("implement me")
......@@ -235,3 +240,7 @@ func (m mockCmix) GetVerboseRounds() string {
//TODO implement me
panic("implement me")
}
func (m *mockCmix) PauseNodeRegistrations(timeout time.Duration) error { return nil }
func (m *mockCmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error {
return nil
}
......@@ -21,7 +21,12 @@ import (
// Error messages for the Manager.sendThread and its helper functions.
const (
numMsgsRngErr = "failed to generate random number of messages to send: %+v"
numMsgsRngErr = "failed to generate random number of messages to send: %+v"
overrideAvgSendDelta = 10 * time.Minute
overrideRandomRange = 8 * time.Minute
overrideMaxNumMessages = 2
numSendsToOverride = 20
)
// sendThread is a thread that sends the dummy messages at random intervals.
......@@ -32,6 +37,13 @@ func (m *Manager) sendThread(stop *stoppable.Single) {
nextSendChanPtr := &(nextSendChan)
for {
if numSent := atomic.LoadUint64(m.totalSent); numSent > numSendsToOverride {
m.avgSendDelta = overrideAvgSendDelta
m.randomRange = overrideRandomRange
m.maxNumMessages = overrideMaxNumMessages
}
select {
case status := <-m.statusChan:
if status {
......@@ -71,6 +83,8 @@ func (m *Manager) sendThread(stop *stoppable.Single) {
err := m.sendMessages()
if err != nil {
jww.ERROR.Printf("Failed to send dummy messages: %+v", err)
} else {
atomic.AddUint64(m.totalSent, 1)
}
}()
case <-stop.Quit():
......
......@@ -40,6 +40,7 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
t *testing.T) *Manager {
store := storage.InitTestingSession(t)
payloadSize := store.GetCmixGroup().GetP().ByteLen()
n := uint64(0)
m := &Manager{
maxNumMessages: maxNumMessages,
avgSendDelta: avgSendDelta,
......@@ -48,6 +49,7 @@ func newTestManager(maxNumMessages int, avgSendDelta, randomRange time.Duration,
store: store,
net: newMockCmix(payloadSize),
rng: fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG),
totalSent: &n,
}
return m
......