Skip to content
Snippets Groups Projects
Commit 5612e728 authored by Jono Wenger's avatar Jono Wenger
Browse files

Add PauseNodeRegistrations and ChangeNumberOfNodeRegistrations

parent be43294a
No related branches found
No related tags found
2 merge requests!510Release,!432Control node reg
...@@ -99,13 +99,29 @@ func (c *Cmix) ReadyToSend() bool { ...@@ -99,13 +99,29 @@ func (c *Cmix) ReadyToSend() bool {
jww.FATAL.Panicf("Failed to get node registration status: %+v", err) jww.FATAL.Panicf("Failed to get node registration status: %+v", err)
} }
// FIXME: This is a fix put in place because not all nodes in the NDF are
// online. This should be fixed.
total = 340
return numReg >= total*7/10 return numReg >= total*7/10
} }
// IsReady returns true if at least percentReady of node registrations has
// completed. If not all have completed, then it returns false and howClose will
// be a percent (0-1) of node registrations completed.
func (c *Cmix) IsReady(percentReady float64) (isReady bool, howClose float64) {
// Check if the network is currently healthy
if !c.api.GetCmix().IsHealthy() {
return false, 0
}
numReg, numNodes, err := c.api.GetNodeRegistrationStatus()
if err != nil {
jww.FATAL.Panicf("Failed to get node registration status: %+v", err)
}
isReady = (float64(numReg) / float64(numNodes)) >= percentReady
howClose = float64(numNodes) / (float64(numReg) * percentReady)
return isReady, howClose
}
// NetworkFollowerStatus gets the state of the network follower. It returns a // NetworkFollowerStatus gets the state of the network follower. It returns a
// status with the following values: // status with the following values:
// Stopped - 0 // Stopped - 0
...@@ -144,6 +160,27 @@ func (c *Cmix) GetNodeRegistrationStatus() ([]byte, error) { ...@@ -144,6 +160,27 @@ func (c *Cmix) GetNodeRegistrationStatus() ([]byte, error) {
return json.Marshal(nodeRegReport) return json.Marshal(nodeRegReport)
} }
// PauseNodeRegistrations stops all node registrations and returns a function to
// resume them.
//
// Parameters:
// - timeoutMS - The timeout, in milliseconds, to wait when stopping threads.
func (c *Cmix) PauseNodeRegistrations(timeoutMS int) error {
timeout := time.Duration(timeoutMS) * time.Millisecond
return c.api.PauseNodeRegistrations(timeout)
}
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum.
//
// Parameters:
// - toRun - The number of parallel node registrations.
// - timeoutMS - The timeout, in milliseconds, to wait when changing node
// registrations.
func (c *Cmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error {
return c.api.ChangeNumberOfNodeRegistrations(toRun, timeout)
}
// HasRunningProcessies checks if any background threads are running and returns // HasRunningProcessies checks if any background threads are running and returns
// true if one or more are. // true if one or more are.
// //
...@@ -188,12 +225,6 @@ func (c *Cmix) AddHealthCallback(nhc NetworkHealthCallback) int64 { ...@@ -188,12 +225,6 @@ func (c *Cmix) AddHealthCallback(nhc NetworkHealthCallback) int64 {
return int64(c.api.GetCmix().AddHealthCallback(nhc.Callback)) return int64(c.api.GetCmix().AddHealthCallback(nhc.Callback))
} }
// IncreaseParallelNodeRegistration increases the number of parallel node
// registrations by num
func (c *Cmix) IncreaseParallelNodeRegistration(num int) error {
return c.api.IncreaseParallelNodeRegistration(num)
}
// RemoveHealthCallback removes a health callback using its registration ID. // RemoveHealthCallback removes a health callback using its registration ID.
func (c *Cmix) RemoveHealthCallback(funcID int64) { func (c *Cmix) RemoveHealthCallback(funcID int64) {
c.api.GetCmix().RemoveHealthCallback(uint64(funcID)) c.api.GetCmix().RemoveHealthCallback(uint64(funcID))
......
...@@ -191,12 +191,12 @@ type Client interface { ...@@ -191,12 +191,12 @@ type Client interface {
AddService(clientID *id.ID, newService message.Service, AddService(clientID *id.ID, newService message.Service,
response message.Processor) response message.Processor)
//PauseNodeRegistrations stops all node registrations // PauseNodeRegistrations stops all node registrations and returns a
//and returns a function to resume them // function to resume them.
PauseNodeRegistrations(timeout time.Duration) error PauseNodeRegistrations(timeout time.Duration) error
// ChangeNumberOfNodeRegistrations changes the number of parallel node // ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum // registrations up to the initialized maximum.
ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error
// DeleteService deletes a message service. If only a single response is // DeleteService deletes a message service. If only a single response is
......
...@@ -182,7 +182,7 @@ func NewProtoCmix_Unsafe(ndfJSON, storageDir string, password []byte, ...@@ -182,7 +182,7 @@ func NewProtoCmix_Unsafe(ndfJSON, storageDir string, password []byte,
storageSess.SetRegistrationTimestamp(protoUser.RegistrationTimestamp) storageSess.SetRegistrationTimestamp(protoUser.RegistrationTimestamp)
// Move the registration state to indicate registered with registration on // Move the registration state to indicate registered with registration on
// roto client // proto client
err = storageSess.ForwardRegistrationStatus(storage.PermissioningComplete) err = storageSess.ForwardRegistrationStatus(storage.PermissioningComplete)
if err != nil { if err != nil {
return err return err
...@@ -484,12 +484,16 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) { ...@@ -484,12 +484,16 @@ func (c *Cmix) GetNodeRegistrationStatus() (int, int, error) {
return numRegistered, len(nodes) - numStale, nil return numRegistered, len(nodes) - numStale, nil
} }
// IncreaseParallelNodeRegistration increases the number of parallel node // PauseNodeRegistrations stops all node registrations and returns a function to
// registrations by num // resume them.
func (c *Cmix) IncreaseParallelNodeRegistration(num int) error { func (c *Cmix) PauseNodeRegistrations(timeout time.Duration) error {
jww.INFO.Printf("IncreaseParallelNodeRegistration(%d)", num) return c.network.PauseNodeRegistrations(timeout)
svc := c.network.IncreaseParallelNodeRegistration(num) }
return c.followerServices.add(svc)
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum.
func (c *Cmix) ChangeNumberOfNodeRegistrations(toRun int, timeout time.Duration) error {
return c.network.ChangeNumberOfNodeRegistrations(toRun, timeout)
} }
// GetPreferredBins returns the geographic bin or bins that the provided two // GetPreferredBins returns the geographic bin or bins that the provided two
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment