diff --git a/api/client.go b/api/client.go index 346025c9bc5502bee5b28807ce43be23796c0730..191b68fb35ac6e8dd5d348f93b06d923eaaed6bf 100644 --- a/api/client.go +++ b/api/client.go @@ -26,6 +26,7 @@ import ( "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "time" ) type Client struct { @@ -45,6 +46,10 @@ type Client struct { network interfaces.NetworkManager //object used to register and communicate with permissioning permissioning *permissioning.Permissioning + + //contains stopables for all running threads + runner *stoppable.Multi + status *statusTracker } // NewClient creates client storage, generates keys, connects, and registers @@ -146,6 +151,8 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator) rng: rngStreamGen, comms: nil, network: nil, + runner: stoppable.NewMulti("client"), + status: newStatusTracker(), } //get the user from session @@ -207,21 +214,59 @@ func loadClient(session *storage.Session, rngStreamGen *fastRNG.StreamGenerator) // - Message Handling Worker Group (/network/message/reception.go) // Decrypts and partitions messages when signals via the Switchboard // - Health Tracker (/network/health) -func (c *Client) StartNetworkFollower() (stoppable.Stoppable, error) { +// Via the network instance tracks the state of the network +// - +func (c *Client) StartNetworkFollower() error { jww.INFO.Printf("StartNetworkFollower()") - multi := stoppable.NewMulti("client") + + err := c.status.toStarting() + if err != nil { + return errors.WithMessage(err, "Failed to Start the Network Follower") + } stopFollow, err := c.network.Follow() if err != nil { - return nil, errors.WithMessage(err, "Failed to start following "+ + return errors.WithMessage(err, "Failed to start following "+ "the network") } - multi.Add(stopFollow) + c.runner.Add(stopFollow) // Key exchange - multi.Add(keyExchange.Start(c.switchboard, c.storage, c.network)) - return multi, nil + c.runner.Add(keyExchange.Start(c.switchboard, c.storage, c.network)) + + err = c.status.toRunning() + if err != nil { + return errors.WithMessage(err, "Failed to Start the Network Follower") + } + + return nil } +// stops the network follower if it is running. +// if the network follower is running nad this fails, the client object will +// most likely be in an unrecoverable state and need to be trashed. +func (c *Client) StopNetworkFollower(timeout time.Duration) error { + err := c.status.toStopping() + if err != nil { + return errors.WithMessage(err, "Failed to Stop the Network Follower") + } + err = c.runner.Close(timeout) + if err != nil { + return errors.WithMessage(err, "Failed to Stop the Network Follower") + } + c.runner = stoppable.NewMulti("client") + err = c.status.toStopped() + if err != nil { + return errors.WithMessage(err, "Failed to Stop the Network Follower") + } + return nil +} + +//gets the state of the network follower +func (c *Client) NetworkFollowerStatus() Status { + return c.status.get() +} + + // SendE2E sends an end-to-end payload to the provided recipient with diff --git a/api/status.go b/api/status.go new file mode 100644 index 0000000000000000000000000000000000000000..1b3ee608d326fc092cafc76fad117548942e1e47 --- /dev/null +++ b/api/status.go @@ -0,0 +1,80 @@ +package api + +import ( + "fmt" + "github.com/pkg/errors" + "sync/atomic" +) + +type Status int + +const ( + Stopped Status = 0 + Starting Status = 1000 + Running Status = 2000 + Stopping Status = 3000 +) + +func (s Status) String() string { + switch s { + case Stopped: + return "Stopped" + case Starting: + return "Starting" + case Running: + return "Running" + case Stopping: + return "Stopping" + default: + return fmt.Sprintf("Unknown status %d", s) + } +} + +type statusTracker struct { + s *uint32 +} + +func newStatusTracker() *statusTracker { + s := uint32(Stopped) + return &statusTracker{s: &s} +} + +func (s *statusTracker) toStarting() error { + if !atomic.CompareAndSwapUint32(s.s, uint32(Stopped), uint32(Starting)) { + return errors.Errorf("Failed to move to '%s' status, at '%s', "+ + "must be at '%s' for transition", Starting, atomic.LoadUint32(s.s), + Stopped) + } + return nil +} + +func (s *statusTracker) toRunning() error { + if !atomic.CompareAndSwapUint32(s.s, uint32(Starting), uint32(Running)) { + return errors.Errorf("Failed to move to '%s' status, at '%s', "+ + "must be at '%s' for transition", Running, atomic.LoadUint32(s.s), + Starting) + } + return nil +} + +func (s *statusTracker) toStopping() error { + if !atomic.CompareAndSwapUint32(s.s, uint32(Running), uint32(Stopping)) { + return errors.Errorf("Failed to move to '%s' status, at '%s',"+ + " must be at '%s' for transition", Stopping, atomic.LoadUint32(s.s), + Running) + } + return nil +} + +func (s *statusTracker) toStopped() error { + if !atomic.CompareAndSwapUint32(s.s, uint32(Stopping), uint32(Stopped)) { + return errors.Errorf("Failed to move to '%s' status, at '%s',"+ + " must be at '%s' for transition", Stopped, atomic.LoadUint32(s.s), + Stopping) + } + return nil +} + +func (s *statusTracker) get() Status { + return Status(atomic.LoadUint32(s.s)) +} diff --git a/network/rounds/historical.go b/network/rounds/historical.go index eb0be28158bf036c2af7f2115977bedd3bc12a63..2031b86ac9a2342138714cd8be5a160c080d7e56 100644 --- a/network/rounds/historical.go +++ b/network/rounds/historical.go @@ -15,6 +15,7 @@ import ( jww "github.com/spf13/jwalterweatherman" ) +//interface to increase east of testing of historical rounds type historicalRoundsComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) RequestHistoricalRounds(host *connect.Host, @@ -24,8 +25,10 @@ type historicalRoundsComms interface { // ProcessHistoricalRounds analyzes round history to see if this Client // needs to check for messages at any of the gateways which completed // those rounds. +// Waits to request many rounds at a time or for a timeout to trigger func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-chan struct{}) { - ticker := time.NewTicker(m.params.HistoricalRoundsPeriod) + + timer := make(chan time.Time) rng := m.Rng.GetStream() var rounds []uint64 @@ -33,14 +36,17 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c done := false for !done { shouldProcess := false + // wait for a quit or new round to check select { case <-quitCh: rng.Close() done = true + // if the timer elapses process rounds to ensure the delay isn't too long case <-ticker.C: if len(rounds) > 0 { shouldProcess = true } + // get new round to lookup and force a lookup if case rid := <-m.historicalRounds: rounds = append(rounds, uint64(rid)) if len(rounds) > int(m.params.MaxHistoricalRounds) {