diff --git a/api/authenticatedChannel.go b/api/authenticatedChannel.go index 24b5a8e4af3c771d59dabc3e01d6ffa1b8125a17..746572e94e73eb85c16dc60851adb04d901297a1 100644 --- a/api/authenticatedChannel.go +++ b/api/authenticatedChannel.go @@ -19,7 +19,7 @@ import ( // RequestAuthenticatedChannel sends a request to another party to establish an // authenticated channel -// It will not run if the network status is not healthy +// It will not run if the network state is not healthy // An error will be returned if a channel already exists or if a request was // already received // When a confirmation occurs, the channel will be created and the callback @@ -57,7 +57,7 @@ func (c *Client) GetAuthenticatedChannelRequest(partner *id.ID) (contact.Contact // ConfirmAuthenticatedChannel creates an authenticated channel out of a valid // received request and sends a message to the requestor that the request has // been confirmed -// It will not run if the network status is not healthy +// It will not run if the network state is not healthy // An error will be returned if a channel already exists, if a request doest // exist, or if the passed in contact does not exactly match the received // request diff --git a/api/client.go b/api/client.go index c7d27fe3ef8aa067643bd164e9b8ab94205dc9a4..02467fe087fd01751aebca1b8aad4f3df72a5fca 100644 --- a/api/client.go +++ b/api/client.go @@ -32,7 +32,6 @@ import ( "gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/region" "math" - "sync" "time" ) @@ -60,18 +59,10 @@ type Client struct { //object containing auth interactions auth *auth.Manager - //contains stopables for all running threads - runner *stoppable.Multi - status *statusTracker - - //handler for external services - services *serviceProcessiesList + //services system to track running threads + followerServices *services clientErrorChannel chan interfaces.ClientError - - //lock to ensure only once instance of stop/start network follower is - //going at a time - followerLock sync.Mutex } // NewClient creates client storage, generates keys, connects, and registers @@ -185,14 +176,14 @@ func OpenClient(storageDir string, password []byte, parameters params.Network) ( // Set up a new context c := &Client{ - storage: storageSess, - switchboard: switchboard.New(), - rng: rngStreamGen, - comms: nil, - network: nil, - runner: stoppable.NewMulti(followerStoppableName), - status: newStatusTracker(), - parameters: parameters, + storage: storageSess, + switchboard: switchboard.New(), + rng: rngStreamGen, + comms: nil, + network: nil, + followerServices: newServices(), + parameters: parameters, + clientErrorChannel: make(chan interfaces.ClientError, 1000), } return c, nil @@ -212,9 +203,6 @@ func Login(storageDir string, password []byte, parameters params.Network) (*Clie jww.INFO.Printf("Client Logged in: \n\tTransmisstionID: %s "+ "\n\tReceptionID: %s", u.TransmissionID, u.ReceptionID) - //Attach the services interface - c.services = newServiceProcessiesList(c.runner) - // initialize comms err = c.initComms() if err != nil { @@ -281,9 +269,6 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, return nil, err } - //Attach the services interface - c.services = newServiceProcessiesList(c.runner) - //initialize comms err = c.initComms() if err != nil { @@ -358,8 +343,51 @@ func (c *Client) initPermissioning(def *ndf.NetworkDefinition) error { return nil } +func (c *Client) registerFollower() error { + //build the error callback + cer := func(source, message, trace string) { + select { + case c.clientErrorChannel <- interfaces.ClientError{ + Source: source, + Message: message, + Trace: trace, + }: + default: + jww.WARN.Printf("Failed to notify about ClientError from %s: %s", source, message) + } + } + + //register the core follower service + err := c.followerServices.add(func() (stoppable.Stoppable, error) { return c.network.Follow(cer) }) + if err != nil { + return errors.WithMessage(err, "Failed to start following "+ + "the network") + } + + //register the incremental key upgrade service + err = c.followerServices.add(c.auth.StartProcesses) + if err != nil { + return errors.WithMessage(err, "Failed to start following "+ + "the network") + } + + //register the key exchange service + keyXchange := func() (stoppable.Stoppable, error) { + return keyExchange.Start(c.switchboard, c.storage, c.network, c.parameters.Rekey) + } + err = c.followerServices.add(keyXchange) + + return nil +} + // ----- Client Functions ----- +// GetErrorsChannel returns a channel which passess errors from the +// long running threads controlled by StartNetworkFollower and StopNetworkFollower +func (c *Client) GetErrorsChannel() <-chan interfaces.ClientError { + return c.clientErrorChannel +} + // StartNetworkFollower kicks off the tracking of the network. It starts // long running network client threads and returns an object for checking // state and stopping those threads. @@ -390,94 +418,22 @@ func (c *Client) initPermissioning(def *ndf.NetworkDefinition) error { // Responds to confirmations of successful rekey operations // - Auth Callback (/auth/callback.go) // Handles both auth confirm and requests -func (c *Client) StartNetworkFollower(timeout time.Duration) (<-chan interfaces.ClientError, error) { - c.followerLock.Lock() - defer c.followerLock.Unlock() +func (c *Client) StartNetworkFollower(timeout time.Duration) error { u := c.GetUser() jww.INFO.Printf("StartNetworkFollower() \n\tTransmisstionID: %s "+ "\n\tReceptionID: %s", u.TransmissionID, u.ReceptionID) - if status := c.status.get(); status != Stopped { - return nil, errors.Errorf("Cannot Stop the Network Follower when it is not running, status: %s", status) - } - - c.clientErrorChannel = make(chan interfaces.ClientError, 1000) - - cer := func(source, message, trace string) { - select { - case c.clientErrorChannel <- interfaces.ClientError{ - Source: source, - Message: message, - Trace: trace, - }: - default: - jww.WARN.Printf("Failed to notify about ClientError from %s: %s", source, message) - } - } - - // Wait for any threads from the previous follower to close and then create - // a new stoppable - err := stoppable.WaitForStopped(c.runner, timeout) - if err != nil { - return nil, err - } else { - c.runner = stoppable.NewMulti(followerStoppableName) - } - - err = c.status.toStarting() - if err != nil { - return nil, errors.WithMessage(err, "Failed to Start the Network Follower") - } - - stopAuth := c.auth.StartProcesses() - c.runner.Add(stopAuth) - - stopFollow, err := c.network.Follow(cer) - if err != nil { - return nil, errors.WithMessage(err, "Failed to start following "+ - "the network") - } - c.runner.Add(stopFollow) - // Key exchange - c.runner.Add(keyExchange.Start(c.switchboard, c.storage, c.network, c.parameters.Rekey)) - - err = c.status.toRunning() - if err != nil { - return nil, errors.WithMessage(err, "Failed to Start the Network Follower") - } - - c.services.run(c.runner) - - return c.clientErrorChannel, nil + return c.followerServices.start(timeout) } // StopNetworkFollower stops the network follower if it is running. -// It returns errors if the Follower is in the wrong status to stop or if it +// It returns errors if the Follower is in the wrong state to stop or if it // fails to stop it. // if the network follower is running and this fails, the client object will // most likely be in an unrecoverable state and need to be trashed. func (c *Client) StopNetworkFollower() error { - c.followerLock.Lock() - defer c.followerLock.Unlock() - - if status := c.status.get(); status != Running { - return errors.Errorf("Cannot Stop the Network Follower when it is not running, status: %s", status) - } - - err := c.status.toStopping() - if err != nil { - return errors.WithMessage(err, "Failed to Stop the Network Follower") - } - err = c.runner.Close() - err2 := c.status.toStopped() - if err2 != nil { - if err == nil { - err = err2 - } else { - err = errors.WithMessage(err, err2.Error()) - } - } - return err + jww.INFO.Printf("StopNetworkFollower()") + return c.followerServices.stop() } // NetworkFollowerStatus Gets the state of the network follower. Returns: @@ -487,7 +443,7 @@ func (c *Client) StopNetworkFollower() error { // Stopping - 3000 func (c *Client) NetworkFollowerStatus() Status { jww.INFO.Printf("NetworkFollowerStatus()") - return c.status.get() + return c.followerServices.status() } // Returns the health tracker for registration and polling @@ -512,8 +468,8 @@ func (c *Client) GetRoundEvents() interfaces.RoundEvents { // AddService adds a service ot be controlled by the client thread control, // these will be started and stopped with the network follower -func (c *Client) AddService(sp ServiceProcess) { - c.services.Add(sp) +func (c *Client) AddService(sp Service) error { + return c.followerServices.add(sp) } // GetUser returns the current user Identity for this client. This @@ -543,7 +499,7 @@ func (c *Client) GetNetworkInterface() interfaces.NetworkManager { return c.network } -// GetNodeRegistrationStatus gets the current status of node registration. It +// GetNodeRegistrationStatus gets the current state of node registration. It // returns the the total number of nodes in the NDF and the number of those // which are currently registers with. An error is returned if the network is // not healthy. diff --git a/api/permissioning.go b/api/permissioning.go index 3fd0153e0116837f4d36c2e30b1bf5181e84d9b7..ef94e61daf315472ecde87eddc360335c08b445d 100644 --- a/api/permissioning.go +++ b/api/permissioning.go @@ -39,7 +39,7 @@ func (c *Client) registerWithPermissioning() error { userData.SetReceptionRegistrationValidationSignature(receptionRegValidationSignature) userData.SetRegistrationTimestamp(registrationTimestamp) - //update the registration status + //update the registration state err = c.storage.ForwardRegistrationStatus(storage.PermissioningComplete) if err != nil { return errors.WithMessage(err, "failed to update local state "+ diff --git a/api/processies.go b/api/processies.go index ad4a714992660faf386ea72e23a6e1de1d90b9f7..42630e219f517214d9d494284d12f243b43a30b3 100644 --- a/api/processies.go +++ b/api/processies.go @@ -1,48 +1,113 @@ package api import ( + "github.com/pkg/errors" "gitlab.com/elixxir/client/stoppable" "sync" + "time" ) // a service process starts itself in a new thread, returning from the // originator a stopable to control it -type ServiceProcess func() stoppable.Stoppable +type Service func() (stoppable.Stoppable, error) -type serviceProcessiesList struct { - serviceProcessies []ServiceProcess - multiStopable *stoppable.Multi - mux sync.Mutex +type services struct { + services []Service + stoppable *stoppable.Multi + state Status + mux sync.Mutex } -// newServiceProcessiesList creates a new processies list which will add its -// processies to the passed mux -func newServiceProcessiesList(m *stoppable.Multi) *serviceProcessiesList { - return &serviceProcessiesList{ - serviceProcessies: make([]ServiceProcess, 0), - multiStopable: m, +// newServiceProcessiesList creates a new services list which will add its +// services to the passed mux +func newServices() *services { + return &services{ + services: make([]Service, 0), + stoppable: stoppable.NewMulti("services"), + state: Stopped, } } -// Add adds the service process to the list and adds it to the multi-stopable -func (spl serviceProcessiesList) Add(sp ServiceProcess) { - spl.mux.Lock() - defer spl.mux.Unlock() +// Add adds the service process to the list and adds it to the multi-stopable. +// Start running it if services are running +func (s *services) add(sp Service) error { + s.mux.Lock() + defer s.mux.Unlock() - spl.serviceProcessies = append(spl.serviceProcessies, sp) - // starts the process and adds it to the stopable - // there can be a race condition between the execution of the process and - // the stopable. - spl.multiStopable.Add(sp()) + //append the process to the list + s.services = append(s.services, sp) + + //if services are running, start the process + if s.state == Running { + stop, err := sp() + if err != nil { + return errors.WithMessage(err, "Failed to start added service") + } + s.stoppable.Add(stop) + } + return nil } -// Runs all processies, to be used after a stop. Must use a new stopable -func (spl serviceProcessiesList) run(m *stoppable.Multi) { - spl.mux.Lock() - defer spl.mux.Unlock() +// Runs all services. If they are in the process of stopping, +// it will wait for the stop to complete or the timeout to ellapse +// Will error if already running +func (s *services) start(timeout time.Duration) error { + s.mux.Lock() + defer s.mux.Unlock() + + //handle various states + switch s.state { + case Stopped: + break + case Running: + return errors.New("Cannot start services when already Running") + case Stopping: + err := stoppable.WaitForStopped(s.stoppable, timeout) + if err != nil { + return errors.Errorf("Procesies did not all stop within %s, "+ + "unable to start services: %+v", timeout, err) + } + } + + //create a new stopable + s.stoppable = stoppable.NewMulti(followerStoppableName) - spl.multiStopable = m - for _, sp := range spl.serviceProcessies { - spl.multiStopable.Add(sp()) + //start all services and register with the stoppable + for _, sp := range s.services { + stop, err := sp() + if err != nil { + return errors.WithMessage(err, "Failed to start added service") + } + s.stoppable.Add(stop) } + + s.state = Running + + return nil +} + +// Stops all currently running services. Will return an +// error if the state is not "running" +func (s *services) stop() error { + s.mux.Lock() + defer s.mux.Unlock() + + if s.state != Running { + return errors.Errorf("cannot stop services when they "+ + "are not Running, services are: %s", s.state) + } + + if err := s.stoppable.Close(); err != nil { + return errors.WithMessage(err, "Failed to stop services") + } + + return nil +} + +// returns the current state of services +func (s *services) status() Status { + s.mux.Lock() + defer s.mux.Unlock() + + return s.state } diff --git a/api/results.go b/api/results.go index d87c538c38ecb9f8baebe489815cca148870d616..f3987393121402f3565ec392c1930a8b29fcafd7 100644 --- a/api/results.go +++ b/api/results.go @@ -107,7 +107,7 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, roundsResults[rnd] = Failed allRoundsSucceeded = false } else { - // If in progress, add a channel monitoring its status + // If in progress, add a channel monitoring its state roundEvents.AddRoundEventChan(rnd, sendResults, timeout-time.Millisecond, states.COMPLETED, states.FAILED) numResults++ @@ -205,7 +205,7 @@ func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds, return } - // Process historical rounds, sending back to the caller thread + // Service historical rounds, sending back to the caller thread for _, ri := range resp.Rounds { sendResults <- ds.EventReturn{ RoundInfo: ri, diff --git a/api/status.go b/api/status.go index 7fd31ae937955d12cb76e8471182c7a406df5b76..33e567725a16b457e936270a3f43acaf7c671129 100644 --- a/api/status.go +++ b/api/status.go @@ -9,15 +9,12 @@ package api import ( "fmt" - "github.com/pkg/errors" - "sync/atomic" ) type Status int const ( Stopped Status = 0 - Starting Status = 1000 Running Status = 2000 Stopping Status = 3000 ) @@ -26,62 +23,11 @@ func (s Status) String() string { switch s { case Stopped: return "Stopped" - case Starting: - return "Starting" case Running: return "Running" case Stopping: return "Stopping" default: - return fmt.Sprintf("Unknown status %d", s) + return fmt.Sprintf("Unknown state %d", s) } } - -type statusTracker struct { - s *uint32 -} - -func newStatusTracker() *statusTracker { - s := uint32(Stopped) - return &statusTracker{s: &s} -} - -func (s *statusTracker) toStarting() error { - if !atomic.CompareAndSwapUint32(s.s, uint32(Stopped), uint32(Starting)) { - return errors.Errorf("Failed to move to '%s' status, at '%s', "+ - "must be at '%s' for transition", Starting, - Status(atomic.LoadUint32(s.s)), Stopped) - } - return nil -} - -func (s *statusTracker) toRunning() error { - if !atomic.CompareAndSwapUint32(s.s, uint32(Starting), uint32(Running)) { - return errors.Errorf("Failed to move to '%s' status, at '%s', "+ - "must be at '%s' for transition", - Running, Status(atomic.LoadUint32(s.s)), Starting) - } - return nil -} - -func (s *statusTracker) toStopping() error { - if !atomic.CompareAndSwapUint32(s.s, uint32(Running), uint32(Stopping)) { - return errors.Errorf("Failed to move to '%s' status, at '%s',"+ - " must be at '%s' for transition", Stopping, - Status(atomic.LoadUint32(s.s)), Running) - } - return nil -} - -func (s *statusTracker) toStopped() error { - if !atomic.CompareAndSwapUint32(s.s, uint32(Stopping), uint32(Stopped)) { - return errors.Errorf("Failed to move to '%s' status, at '%s',"+ - " must be at '%s' for transition", Stopped, - Status(atomic.LoadUint32(s.s)), Stopping) - } - return nil -} - -func (s *statusTracker) get() Status { - return Status(atomic.LoadUint32(s.s)) -} diff --git a/auth/callback.go b/auth/callback.go index c9a7e2d12307bb616c82aefafaebca8055c026ed..b713ce5abe73b1b7f0f7444b5fcf54e5cd74aa75 100644 --- a/auth/callback.go +++ b/auth/callback.go @@ -23,7 +23,7 @@ import ( "strings" ) -func (m *Manager) StartProcesses() stoppable.Stoppable { +func (m *Manager) StartProcesses() (stoppable.Stoppable, error) { stop := stoppable.NewSingle("Auth") go func() { @@ -38,7 +38,7 @@ func (m *Manager) StartProcesses() stoppable.Stoppable { } }() - return stop + return stop, nil } func (m *Manager) processAuthMessage(msg message.Receive) { diff --git a/bindings/client.go b/bindings/client.go index 8e24a3b017ca18467f22f547a2b1a6cd2b4bda09..f6fc9ef358c1a2000767f11d9f712f7ecd22fbc5 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -205,20 +205,20 @@ func UnmarshalSendReport(b []byte) (*SendReport, error) { // Responds to sent rekeys and executes them // - KeyExchange Confirm (/keyExchange/confirm.go) // Responds to confirmations of successful rekey operations -func (c *Client) StartNetworkFollower(clientError ClientError, timeoutMS int) error { +func (c *Client) StartNetworkFollower(timeoutMS int) error { timeout := time.Duration(timeoutMS) * time.Millisecond - errChan, err := c.api.StartNetworkFollower(timeout) - if err != nil { - return errors.New(fmt.Sprintf("Failed to start the "+ - "network follower: %+v", err)) - } + return c.api.StartNetworkFollower(timeout) +} +// RegisterClientErrorCallback registers the callback to handle errors from the +// long running threads controlled by StartNetworkFollower and StopNetworkFollower +func (c *Client) RegisterClientErrorCallback(clientError ClientError) { + errChan := c.api.GetErrorsChannel() go func() { for report := range errChan { go clientError.Report(report.Source, report.Message, report.Trace) } }() - return nil } // StopNetworkFollower stops the network follower if it is running. @@ -505,12 +505,12 @@ func (c *Client) getSingle() (*single.Manager, error) { c.singleMux.Lock() defer c.singleMux.Unlock() if c.single == nil { - if !c.IsNetworkHealthy() { - return nil, errors.New("cannot return single manager, network si not healthy") - } apiClient := &c.api c.single = single.NewManager(apiClient) - apiClient.AddService(c.single.StartProcesses) + err := apiClient.AddService(c.single.StartProcesses) + if err != nil { + return nil, err + } } return c.single, nil diff --git a/bindings/group.go b/bindings/group.go index 8752c6665dac3461c8c452d9fbdb558a2f834899..00c45668e08563488884f68e5c9f76b2a0907687 100644 --- a/bindings/group.go +++ b/bindings/group.go @@ -50,7 +50,10 @@ func NewGroupManager(client *Client, requestFunc GroupRequestFunc, } // Start group request and message retrieval workers - client.api.AddService(m.StartProcesses) + err = client.api.AddService(m.StartProcesses) + if err != nil { + return GroupChat{}, err + } return GroupChat{m}, nil } diff --git a/cmd/group.go b/cmd/group.go index b8064fcb67df8c94c334017023449d1c6e8f99e9..f1e4508a72e2ce6c0c29b836e67e7b0bef8d91d9 100644 --- a/cmd/group.go +++ b/cmd/group.go @@ -38,7 +38,7 @@ var groupCmd = &cobra.Command{ _, _ = initClientCallbacks(client) - _, err := client.StartNetworkFollower(5 * time.Second) + err := client.StartNetworkFollower(5 * time.Second) if err != nil { jww.FATAL.Panicf("%+v", err) } @@ -129,7 +129,10 @@ func initGroupManager(client *api.Client) (*groupChat.Manager, } // Start group request and message receiver - client.AddService(manager.StartProcesses) + err = client.AddService(manager.StartProcesses) + if err != nil { + jww.FATAL.Panicf("Failed to start groupchat services: %+v", err) + } return manager, recChan, reqChan } diff --git a/cmd/root.go b/cmd/root.go index 3dacf0e7b93ad1e30b3f0a93699fb624e599144a..9ebdc637ac86440f250319c0db384eb16632fda3 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -93,7 +93,7 @@ var rootCmd = &cobra.Command{ } }() - _, err := client.StartNetworkFollower(5 * time.Second) + err := client.StartNetworkFollower(5 * time.Second) if err != nil { jww.FATAL.Panicf("%+v", err) } diff --git a/cmd/single.go b/cmd/single.go index b827627fa2af0be3f5ec0fa338170f7af6a36493..442279694e886c341f4479a4e2dd7a476bb23783 100644 --- a/cmd/single.go +++ b/cmd/single.go @@ -62,7 +62,7 @@ var singleCmd = &cobra.Command{ }) } - _, err := client.StartNetworkFollower(5 * time.Second) + err := client.StartNetworkFollower(5 * time.Second) if err != nil { jww.FATAL.Panicf("%+v", err) } diff --git a/cmd/ud.go b/cmd/ud.go index 2a3b3d31927ee62522444c179c7dab50c859f4cd..02cc042b49b13239d700cb58d5631c697db0ba74 100644 --- a/cmd/ud.go +++ b/cmd/ud.go @@ -62,7 +62,7 @@ var udCmd = &cobra.Command{ }) } - _, err := client.StartNetworkFollower(50 * time.Millisecond) + err := client.StartNetworkFollower(50 * time.Millisecond) if err != nil { jww.FATAL.Panicf("%+v", err) } diff --git a/groupChat/manager.go b/groupChat/manager.go index f6044ed2269ed83e2ee96899f2768b27ce76e6f6..2e2d18fadee2e5e246cf3ea42951305e14053963 100644 --- a/groupChat/manager.go +++ b/groupChat/manager.go @@ -96,7 +96,7 @@ func newManager(client *api.Client, userID *id.ID, userDhKey *cyclic.Int, } // StartProcesses starts the reception worker. -func (m *Manager) StartProcesses() stoppable.Stoppable { +func (m *Manager) StartProcesses() (stoppable.Stoppable, error) { // Start group reception worker receiveStop := stoppable.NewSingle(receiveStoppableName) receiveChan := make(chan message.Receive, rawMessageBuffSize) @@ -116,7 +116,7 @@ func (m *Manager) StartProcesses() stoppable.Stoppable { multiStoppable.Add(receiveStop) multiStoppable.Add(requestStop) - return multiStoppable + return multiStoppable, nil } // JoinGroup adds the group to the list of group chats the user is a part of. diff --git a/keyExchange/exchange.go b/keyExchange/exchange.go index 42b7a177e10595bf45dd1b11fe7715e3a0b0c4f2..121d3b2b4e0853d96a3509a188fe5da56e264179 100644 --- a/keyExchange/exchange.go +++ b/keyExchange/exchange.go @@ -22,7 +22,7 @@ const keyExchangeConfirmName = "KeyExchangeConfirm" const keyExchangeMulti = "KeyExchange" func Start(switchboard *switchboard.Switchboard, sess *storage.Session, net interfaces.NetworkManager, - params params.Rekey) stoppable.Stoppable { + params params.Rekey) (stoppable.Stoppable, error) { // register the rekey trigger thread triggerCh := make(chan message.Receive, 100) @@ -57,5 +57,5 @@ func Start(switchboard *switchboard.Switchboard, sess *storage.Session, net inte exchangeStop := stoppable.NewMulti(keyExchangeMulti) exchangeStop.Add(triggerStop) exchangeStop.Add(confirmStop) - return exchangeStop + return exchangeStop, nil } diff --git a/single/manager.go b/single/manager.go index 063f7e8541e0f8f3f9063228fbc33d618247e2b7..9562137bbe4dbb0ad3fa9885a241d79d9825a5ea 100644 --- a/single/manager.go +++ b/single/manager.go @@ -69,7 +69,7 @@ func newManager(client *api.Client, reception *reception.Store) *Manager { // StartProcesses starts the process of receiving single-use transmissions and // replies. -func (m *Manager) StartProcesses() stoppable.Stoppable { +func (m *Manager) StartProcesses() (stoppable.Stoppable, error) { // Start waiting for single-use transmission transmissionStop := stoppable.NewSingle(singleUseTransmission) transmissionChan := make(chan message.Receive, rawMessageBuffSize) @@ -87,7 +87,7 @@ func (m *Manager) StartProcesses() stoppable.Stoppable { singleUseMulti.Add(transmissionStop) singleUseMulti.Add(responseStop) - return singleUseMulti + return singleUseMulti, nil } // RegisterCallback registers a callback for received messages. diff --git a/single/manager_test.go b/single/manager_test.go index 28c3c15d0b62029c7fc0280cd83b529dd05af3ef..1a62edf8405d450a82688e05a1e7b04af3f5ef06 100644 --- a/single/manager_test.go +++ b/single/manager_test.go @@ -90,7 +90,7 @@ func TestManager_StartProcesses(t *testing.T) { m.callbackMap.registerCallback(tag, callback) - _ = m.StartProcesses() + _, _ = m.StartProcesses() m.swb.(*switchboard.Switchboard).Speak(receiveMsg) timer := time.NewTimer(50 * time.Millisecond) @@ -176,7 +176,7 @@ func TestManager_StartProcesses_Stop(t *testing.T) { m.callbackMap.registerCallback(tag, callback) - stop := m.StartProcesses() + stop, _ := m.StartProcesses() if !stop.IsRunning() { t.Error("Stoppable is not running.") } diff --git a/ud/lookup_test.go b/ud/lookup_test.go index 8bdb21a1ae0d347b8561da54a52d4c9dcf7bb33e..02162faaf2e47e555fe54dd8a6fd1d9a3dd36f03 100644 --- a/ud/lookup_test.go +++ b/ud/lookup_test.go @@ -198,6 +198,6 @@ func (s *mockSingleLookup) TransmitSingleUse(_ contact.Contact, payload []byte, return nil } -func (s *mockSingleLookup) StartProcesses() stoppable.Stoppable { - return stoppable.NewSingle("") +func (s *mockSingleLookup) StartProcesses() (stoppable.Stoppable, error) { + return stoppable.NewSingle(""), nil } diff --git a/ud/manager.go b/ud/manager.go index 1b80b1ad22bcb07fb94a5944a78000c2ef56f45d..201f1e22fd317e4611bec3486bef5e7dc32651f6 100644 --- a/ud/manager.go +++ b/ud/manager.go @@ -22,7 +22,7 @@ import ( type SingleInterface interface { TransmitSingleUse(contact.Contact, []byte, string, uint8, single.ReplyComm, time.Duration) error - StartProcesses() stoppable.Stoppable + StartProcesses() (stoppable.Stoppable, error) } type Manager struct { diff --git a/ud/search_test.go b/ud/search_test.go index ac14e1a63243fc4d0296768c23420cf1adc03f16..072862a0557ff034ecc8fc9fb9baea7180f3455d 100644 --- a/ud/search_test.go +++ b/ud/search_test.go @@ -487,6 +487,6 @@ func (s *mockSingleSearch) TransmitSingleUse(partner contact.Contact, payload [] return nil } -func (s *mockSingleSearch) StartProcesses() stoppable.Stoppable { - return stoppable.NewSingle("") +func (s *mockSingleSearch) StartProcesses() (stoppable.Stoppable, error) { + return stoppable.NewSingle(""), nil }