Skip to content
Snippets Groups Projects
Commit 61d09121 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

added stoppable

parent 60e0ed72
No related branches found
No related tags found
No related merge requests found
package stoppable
import (
"github.com/pkg/errors"
"sync"
"sync/atomic"
"time"
)
type Multi struct {
stoppables []Stoppable
name string
running *uint32
errors []error
mux sync.RWMutex
}
//returns a new multi stoppable
func NewMulti(name string) *Multi {
running := uint32(1)
return &Multi{
name: name,
running: &running,
}
}
// returns true if the thread is still running
func (m *Multi) IsRunning() bool {
return atomic.LoadUint32(m.running) == 1
}
// adds the given stoppable to the list of stoppables
func (m *Multi) Add(stoppable Stoppable) {
m.mux.Lock()
m.stoppables = append(m.stoppables, stoppable)
m.mux.Unlock()
}
// returns the name of the multi stoppable and the names of all stoppables it
// contains
func (m *Multi) Name() string {
m.mux.RLock()
names := m.name + ": {"
for _, s := range m.stoppables {
names += s.Name() + ", "
}
if len(m.stoppables) > 0 {
names = names[:len(names)-2]
}
names += "}"
m.mux.RUnlock()
return names
}
// closes all child stoppers. It does not return their errors and assumes they
// print them to the log
func (m *Multi) Close(timeout time.Duration) error {
if !m.IsRunning() {
return nil
}
m.mux.Lock()
defer m.mux.Unlock()
numErrors := uint32(0)
wg := &sync.WaitGroup{}
for _, stopable := range m.stoppables {
wg.Add(1)
go func() {
if stopable.Close(timeout) != nil {
atomic.AddUint32(&numErrors, 1)
}
wg.Done()
}()
}
wg.Wait()
atomic.StoreUint32(m.running, 0)
if numErrors > 0 {
return errors.Errorf("MultiStopper %s failed to close "+
"%v/%v stoppers", m.name, numErrors, len(m.stoppables))
}
return nil
}
package stoppable
import (
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"sync/atomic"
"time"
)
// Single allows stopping a single goroutine using a channel
// adheres to the stoppable interface
type Single struct {
name string
quit chan struct{}
running *uint32
}
//returns a new single stoppable
func NewSingle(name string) *Single {
running := uint32(1)
return &Single{
name: name,
quit: make(chan struct{}),
running: &running,
}
}
// returns true if the thread is still running
func (s *Single) IsRunning() bool {
return atomic.LoadUint32(s.running) == 1
}
// returns the read only channel it will send the stop signal on
func (s *Single) Sigal() bool {
return atomic.LoadUint32(s.running) == 1
}
// returns the name of the thread. This is designed to be
func (s *Single) Name() string {
return s.name
}
// Close signals thread to time out and closes if it is still running.
func (s *Single) Close(timeout time.Duration) error {
if !s.IsRunning() {
return nil
}
defer atomic.StoreUint32(s.running, 0)
timer := time.NewTimer(timeout)
select {
case <-timer.C:
jww.ERROR.Printf("Stopper for %s failed to stop after "+
"timeout of %s", s.name, timeout)
return errors.Errorf("%s failed to close", s.name)
case <-s.quit:
return nil
}
}
package stoppable
import "time"
// Interface for stopping a goroutine
type Stoppable interface {
Close(timeout time.Duration) error
IsRunning() bool
Name() string
}
...@@ -36,6 +36,8 @@ func (u *User) SetUsername(username string) error { ...@@ -36,6 +36,8 @@ func (u *User) SetUsername(username string) error {
return errors.WithMessage(err, "Failed to store the username") return errors.WithMessage(err, "Failed to store the username")
} }
u.username = username
return nil return nil
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment