Skip to content
Snippets Groups Projects
Commit 4083743f authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

finished start network follower

parent aae122b6
Branches
Tags
No related merge requests found
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/crypto/signature/rsa"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
"time"
) )
type Client struct { type Client struct {
...@@ -45,6 +46,10 @@ type Client struct { ...@@ -45,6 +46,10 @@ type Client struct {
network interfaces.NetworkManager network interfaces.NetworkManager
//object used to register and communicate with permissioning //object used to register and communicate with permissioning
permissioning *permissioning.Permissioning permissioning *permissioning.Permissioning
//contains stopables for all running threads
runner *stoppable.Multi
status *statusTracker
} }
// NewClient creates client storage, generates keys, connects, and registers // NewClient creates client storage, generates keys, connects, and registers
...@@ -146,6 +151,8 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator) ...@@ -146,6 +151,8 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator)
rng: rngStreamGen, rng: rngStreamGen,
comms: nil, comms: nil,
network: nil, network: nil,
runner: stoppable.NewMulti("client"),
status: newStatusTracker(),
} }
//get the user from session //get the user from session
...@@ -207,21 +214,59 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator) ...@@ -207,21 +214,59 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator)
// - Message Handling Worker Group (/network/message/reception.go) // - Message Handling Worker Group (/network/message/reception.go)
// Decrypts and partitions messages when signals via the Switchboard // Decrypts and partitions messages when signals via the Switchboard
// - Health Tracker (/network/health) // - Health Tracker (/network/health)
func (c *Client) StartNetworkFollower() (stoppable.Stoppable, error) { // Via the network instance tracks the state of the network
// -
func (c *Client) StartNetworkFollower() error {
jww.INFO.Printf("StartNetworkFollower()") jww.INFO.Printf("StartNetworkFollower()")
multi := stoppable.NewMulti("client")
err := c.status.toStarting()
if err != nil {
return errors.WithMessage(err, "Failed to Start the Network Follower")
}
stopFollow, err := c.network.Follow() stopFollow, err := c.network.Follow()
if err != nil { if err != nil {
return nil, errors.WithMessage(err, "Failed to start following "+ return errors.WithMessage(err, "Failed to start following "+
"the network") "the network")
} }
multi.Add(stopFollow) c.runner.Add(stopFollow)
// Key exchange // Key exchange
multi.Add(keyExchange.Start(c.switchboard, c.storage, c.network)) c.runner.Add(keyExchange.Start(c.switchboard, c.storage, c.network))
return multi, nil
err = c.status.toRunning()
if err != nil {
return errors.WithMessage(err, "Failed to Start the Network Follower")
} }
return nil
}
// stops the network follower if it is running.
// if the network follower is running nad this fails, the client object will
// most likely be in an unrecoverable state and need to be trashed.
func (c *Client) StopNetworkFollower(timeout time.Duration) error {
err := c.status.toStopping()
if err != nil {
return errors.WithMessage(err, "Failed to Stop the Network Follower")
}
err = c.runner.Close(timeout)
if err != nil {
return errors.WithMessage(err, "Failed to Stop the Network Follower")
}
c.runner = stoppable.NewMulti("client")
err = c.status.toStopped()
if err != nil {
return errors.WithMessage(err, "Failed to Stop the Network Follower")
}
return nil
}
//gets the state of the network follower
func (c *Client) NetworkFollowerStatus() Status {
return c.status.get()
}
// SendE2E sends an end-to-end payload to the provided recipient with // SendE2E sends an end-to-end payload to the provided recipient with
......
package api
import (
"fmt"
"github.com/pkg/errors"
"sync/atomic"
)
type Status int
const (
Stopped Status = 0
Starting Status = 1000
Running Status = 2000
Stopping Status = 3000
)
func (s Status) String() string {
switch s {
case Stopped:
return "Stopped"
case Starting:
return "Starting"
case Running:
return "Running"
case Stopping:
return "Stopping"
default:
return fmt.Sprintf("Unknown status %d", s)
}
}
type statusTracker struct {
s *uint32
}
func newStatusTracker() *statusTracker {
s := uint32(Stopped)
return &statusTracker{s: &s}
}
func (s *statusTracker) toStarting() error {
if !atomic.CompareAndSwapUint32(s.s, uint32(Stopped), uint32(Starting)) {
return errors.Errorf("Failed to move to '%s' status, at '%s', "+
"must be at '%s' for transition", Starting, atomic.LoadUint32(s.s),
Stopped)
}
return nil
}
func (s *statusTracker) toRunning() error {
if !atomic.CompareAndSwapUint32(s.s, uint32(Starting), uint32(Running)) {
return errors.Errorf("Failed to move to '%s' status, at '%s', "+
"must be at '%s' for transition", Running, atomic.LoadUint32(s.s),
Starting)
}
return nil
}
func (s *statusTracker) toStopping() error {
if !atomic.CompareAndSwapUint32(s.s, uint32(Running), uint32(Stopping)) {
return errors.Errorf("Failed to move to '%s' status, at '%s',"+
" must be at '%s' for transition", Stopping, atomic.LoadUint32(s.s),
Running)
}
return nil
}
func (s *statusTracker) toStopped() error {
if !atomic.CompareAndSwapUint32(s.s, uint32(Stopping), uint32(Stopped)) {
return errors.Errorf("Failed to move to '%s' status, at '%s',"+
" must be at '%s' for transition", Stopped, atomic.LoadUint32(s.s),
Stopping)
}
return nil
}
func (s *statusTracker) get() Status {
return Status(atomic.LoadUint32(s.s))
}
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
) )
//interface to increase east of testing of historical rounds
type historicalRoundsComms interface { type historicalRoundsComms interface {
GetHost(hostId *id.ID) (*connect.Host, bool) GetHost(hostId *id.ID) (*connect.Host, bool)
RequestHistoricalRounds(host *connect.Host, RequestHistoricalRounds(host *connect.Host,
...@@ -24,8 +25,10 @@ type historicalRoundsComms interface { ...@@ -24,8 +25,10 @@ type historicalRoundsComms interface {
// ProcessHistoricalRounds analyzes round history to see if this Client // ProcessHistoricalRounds analyzes round history to see if this Client
// needs to check for messages at any of the gateways which completed // needs to check for messages at any of the gateways which completed
// those rounds. // those rounds.
// Waits to request many rounds at a time or for a timeout to trigger
func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) { func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) {
ticker := time.NewTicker(m.params.HistoricalRoundsPeriod)
timer := make(chan time.Time)
rng := m.Rng.GetStream() rng := m.Rng.GetStream()
var rounds []uint64 var rounds []uint64
...@@ -33,14 +36,17 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c ...@@ -33,14 +36,17 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
done := false done := false
for !done { for !done {
shouldProcess := false shouldProcess := false
// wait for a quit or new round to check
select { select {
case <-quitCh: case <-quitCh:
rng.Close() rng.Close()
done = true done = true
// if the timer elapses process rounds to ensure the delay isn't too long
case <-ticker.C: case <-ticker.C:
if len(rounds) > 0 { if len(rounds) > 0 {
shouldProcess = true shouldProcess = true
} }
// get new round to lookup and force a lookup if
case rid := <-m.historicalRounds: case rid := <-m.historicalRounds:
rounds = append(rounds, uint64(rid)) rounds = append(rounds, uint64(rid))
if len(rounds) > int(m.params.MaxHistoricalRounds) { if len(rounds) > int(m.params.MaxHistoricalRounds) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment