diff --git a/stoppable/cleanup.go b/stoppable/cleanup.go index b1e2c4561dc87c2af1ec654d48b787d47ca8182e..0a84235087dfc0d420c8695679f036294ac9460c 100644 --- a/stoppable/cleanup.go +++ b/stoppable/cleanup.go @@ -16,74 +16,78 @@ import ( "time" ) +const nameTag = " with cleanup" + +type CleanFunc func(duration time.Duration) error + // Cleanup wraps any stoppable and runs a callback after to stop for cleanup // behavior. The cleanup is run under the remainder of the timeout but will not // be canceled if the timeout runs out. The cleanup function does not run if the // thread does not stop. type Cleanup struct { stop Stoppable - // the clean function receives how long it has to run before the timeout, - // this is nto expected to be used in most cases - clean func(duration time.Duration) error + // clean receives how long it has to run before the timeout, this is not + // expected to be used in most cases + clean CleanFunc running uint32 once sync.Once } -// NewCleanup creates a new Cleanup from the passed stoppable and function. -func NewCleanup(stop Stoppable, clean func(duration time.Duration) error) *Cleanup { +// NewCleanup creates a new Cleanup from the passed in stoppable and clean +// function. +func NewCleanup(stop Stoppable, clean CleanFunc) *Cleanup { return &Cleanup{ stop: stop, clean: clean, - running: 0, + running: stopped, } } // IsRunning returns true if the thread is still running and its cleanup has // completed. func (c *Cleanup) IsRunning() bool { - return atomic.LoadUint32(&c.running) == 1 + return atomic.LoadUint32(&c.running) == running } // Name returns the name of the stoppable denoting it has cleanup. func (c *Cleanup) Name() string { - return c.stop.Name() + " with cleanup" + return c.stop.Name() + nameTag } -// Close stops the contained stoppable and runs the cleanup function after. The -// cleanup function does not run if the thread does not stop. +// Close stops the wrapped stoppable and after, runs the cleanup function. The +// cleanup function does not run if the thread fails to stop. func (c *Cleanup) Close(timeout time.Duration) error { var err error c.once.Do( func() { - defer atomic.StoreUint32(&c.running, 0) + defer atomic.StoreUint32(&c.running, stopped) start := netTime.Now() - // Run the stoppable + // Close each stoppable if err := c.stop.Close(timeout); err != nil { - err = errors.WithMessagef(err, "Cleanup for %s not executed", + err = errors.WithMessagef(err, "Cleanup not executed for %s", c.stop.Name()) return } // Run the cleanup function with the remaining time as a timeout - elapsed := time.Since(start) + elapsed := netTime.Now().Sub(start) complete := make(chan error, 1) go func() { complete <- c.clean(elapsed) }() - timer := time.NewTimer(elapsed) - select { case err := <-complete: if err != nil { err = errors.WithMessagef(err, "Cleanup for %s failed", c.stop.Name()) } - case <-timer.C: - err = errors.Errorf("Clean up for %s timeout", c.stop.Name()) + case <-time.NewTimer(elapsed).C: + err = errors.Errorf("Clean up for %s timed out after %s", + c.stop.Name(), elapsed) } }) diff --git a/stoppable/cleanup_test.go b/stoppable/cleanup_test.go index 8bc7fe0be09e69b0809cbd97194dbbe58902918f..bc60be5def7b0a6296711262c456da591adfd08e 100644 --- a/stoppable/cleanup_test.go +++ b/stoppable/cleanup_test.go @@ -11,52 +11,68 @@ import ( "testing" ) -// Tests happy path of NewCleanup(). +// Tests that NewCleanup returns a Cleanup that is stopped with the given +// Stoppable. func TestNewCleanup(t *testing.T) { - single := NewSingle("test name") + single := NewSingle("testSingle") cleanup := NewCleanup(single, single.Close) - if cleanup.stop != single || cleanup.running != 0 { - t.Errorf("NewCleanup() returned Single with incorrect values."+ - "\n\texpected: stop: %v running: %d\n\treceived: stop: %v running: %d", - single, cleanup.stop, 0, cleanup.running) + if cleanup.stop != single { + t.Errorf("NewCleanup returned cleanup with incorrect Stoppable."+ + "\nexpected: %+v\nreceived: %+v", single, cleanup.stop) + } + + if cleanup.running != stopped { + t.Errorf("NewMulti returned Multi with incorrect running."+ + "\nexpected: %d\nreceived: %d", stopped, cleanup.running) } } -// Tests happy path of Cleanup.IsRunning(). +// Tests that Cleanup.IsRunning returns the expected value when the Cleanup is +// marked as both running and not running. func TestCleanup_IsRunning(t *testing.T) { - single := NewSingle("test name") + single := NewSingle("threadName") cleanup := NewCleanup(single, single.Close) if cleanup.IsRunning() { - t.Errorf("IsRunning() returned false when it should be running.") + t.Errorf("IsRunning returned the wrong value when running."+ + "\nexpected: %t\nreceived: %t", true, cleanup.IsRunning()) } - cleanup.running = 1 - if !cleanup.IsRunning() { - t.Errorf("IsRunning() returned true when it should not be running.") + cleanup.running = running + if !single.IsRunning() { + t.Errorf("IsRunning returned the wrong value when running."+ + "\nexpected: %t\nreceived: %t", false, single.IsRunning()) } } -// Tests happy path of Cleanup.Name(). +// Unit test of Cleanup.Name. func TestCleanup_Name(t *testing.T) { - name := "test name" + name := "threadName" single := NewSingle(name) cleanup := NewCleanup(single, single.Close) - if name+" with cleanup" != cleanup.Name() { - t.Errorf("Name() returned the incorrect string."+ - "\n\texpected: %s\n\treceived: %s", name+" with cleanup", cleanup.Name()) + if name+nameTag != cleanup.Name() { + t.Errorf("Name did not return the expected name."+ + "\nexpected: %s\nreceived: %s", name+nameTag, cleanup.Name()) } } // Tests happy path of Cleanup.Close(). func TestCleanup_Close(t *testing.T) { - single := NewSingle("test name") + single := NewSingle("threadName") cleanup := NewCleanup(single, single.Close) + // go func() { + // select { + // case <-time.NewTimer(10 * time.Millisecond).C: + // t.Error("Timed out waiting for quit channel.") + // case <-single.Quit(): + // } + // }() + err := cleanup.Close(0) if err != nil { - t.Errorf("Close() returned an error: %v", err) + t.Errorf("Close() returned an error: %+v", err) } } diff --git a/stoppable/multi.go b/stoppable/multi.go index 0636b84fa6e4a3d17e5a74431e42e796d84de45b..5e60ab2d7759d6874f8018a18643a96dc1e7af26 100644 --- a/stoppable/multi.go +++ b/stoppable/multi.go @@ -8,14 +8,17 @@ package stoppable import ( - "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "strings" "sync" "sync/atomic" "time" ) +// Error message. +const closeMultiErr = "MultiStopper %s failed to close %d/%d stoppers" + type Multi struct { stoppables []Stoppable name string @@ -24,17 +27,17 @@ type Multi struct { once sync.Once } -// NewMulti returns a new multi stoppable. +// NewMulti returns a new multi Stoppable. func NewMulti(name string) *Multi { return &Multi{ name: name, - running: 1, + running: running, } } -// IsRunning returns true if the thread is still running. +// IsRunning returns true if stoppable is marked as running. func (m *Multi) IsRunning() bool { - return atomic.LoadUint32(&m.running) == 1 + return atomic.LoadUint32(&m.running) == running } // Add adds the given stoppable to the list of stoppables. @@ -44,21 +47,18 @@ func (m *Multi) Add(stoppable Stoppable) { m.mux.Unlock() } -// Name returns the name of the multi stoppable and the names of all stoppables +// Name returns the name of the Multi Stoppable and the names of all stoppables // it contains. func (m *Multi) Name() string { m.mux.RLock() - names := m.name + ": {" - for _, s := range m.stoppables { - names += s.Name() + ", " - } - if len(m.stoppables) > 0 { - names = names[:len(names)-2] + defer m.mux.RUnlock() + + names := make([]string, len(m.stoppables)) + for i, s := range m.stoppables { + names[i] = s.Name() } - names += "}" - m.mux.RUnlock() - return names + return m.name + ": {" + strings.Join(names, ", ") + "}" } // Close closes all child stoppers. It does not return their errors and assumes @@ -67,10 +67,10 @@ func (m *Multi) Close(timeout time.Duration) error { var err error m.once.Do( func() { - atomic.StoreUint32(&m.running, 0) + atomic.StoreUint32(&m.running, stopped) - numErrors := uint32(0) - wg := &sync.WaitGroup{} + var numErrors uint32 + var wg sync.WaitGroup m.mux.Lock() for _, stoppable := range m.stoppables { @@ -87,10 +87,9 @@ func (m *Multi) Close(timeout time.Duration) error { wg.Wait() if numErrors > 0 { - errStr := fmt.Sprintf("MultiStopper %s failed to close "+ - "%v/%v stoppers", m.name, numErrors, len(m.stoppables)) - jww.ERROR.Println(errStr) - err = errors.New(errStr) + err = errors.Errorf( + closeMultiErr, m.name, numErrors, len(m.stoppables)) + jww.ERROR.Print(err.Error()) } }) diff --git a/stoppable/multi_test.go b/stoppable/multi_test.go index 5999f838ae44d0b64a96cd27c0fc1d8e28ee49fd..b633e1d77d4e80317cff8ca938c0ff440e855baa 100644 --- a/stoppable/multi_test.go +++ b/stoppable/multi_test.go @@ -8,107 +8,140 @@ package stoppable import ( + "fmt" "reflect" + "strconv" + "strings" "testing" "time" ) -// Tests happy path of NewMulti(). +// Tests that NewMulti returns a Multi that is running with the given name. func TestNewMulti(t *testing.T) { - name := "test name" + name := "testMulti" multi := NewMulti(name) - if multi.name != name || multi.running != 1 { - t.Errorf("NewMulti() returned Multi with incorrect values."+ - "\n\texpected: name: %s running: %d\n\treceived: name: %s running: %d", - name, 1, multi.name, multi.running) + if multi.name != name { + t.Errorf("NewMulti returned Multi with incorrect name."+ + "\nexpected: %s\nreceived: %s", name, multi.name) + } + + if multi.running != running { + t.Errorf("NewMulti returned Multi with incorrect running."+ + "\nexpected: %d\nreceived: %d", running, multi.running) } } -// Tests happy path of Multi.IsRunning(). +// Tests that Multi.IsRunning returns the expected value when the Multi is +// marked as both running and not running. func TestMulti_IsRunning(t *testing.T) { - multi := NewMulti("name") + multi := NewMulti("testMulti") if !multi.IsRunning() { - t.Errorf("IsRunning() returned false when it should be running.") + t.Errorf("IsRunning returned the wrong value when running."+ + "\nexpected: %t\nreceived: %t", true, multi.IsRunning()) } - multi.running = 0 + multi.running = stopped if multi.IsRunning() { - t.Errorf("IsRunning() returned true when it should not be running.") + t.Errorf("IsRunning returned the wrong value when not running."+ + "\nexpected: %t\nreceived: %t", false, multi.IsRunning()) } } -// Tests happy path of Multi.Add(). +// Tests that Multi.Add adds all the stoppables to the list. func TestMulti_Add(t *testing.T) { - multi := NewMulti("multi name") - singles := []*Single{ - NewSingle("single name 1"), - NewSingle("single name 2"), - NewSingle("single name 3"), + multi := NewMulti("testMulti") + expected := []Stoppable{ + NewSingle("testSingle0"), + NewMulti("testMulti0"), + NewSingle("testSingle1"), + NewMulti("testMulti1"), } - for _, single := range singles { - multi.Add(single) + for _, stoppable := range expected { + multi.Add(stoppable) } - for i, single := range singles { - if !reflect.DeepEqual(single, multi.stoppables[i]) { - t.Errorf("Add() did not add the correct Stoppables."+ - "\n\texpected: %#v\n\treceived: %#v", single, multi.stoppables[i]) - } + if !reflect.DeepEqual(multi.stoppables, expected) { + t.Errorf("Add did not add the correct Stoppables."+ + "\nexpected: %+v\nreceived: %+v", multi.stoppables, expected) } } -// Tests happy path of Multi.Name(). +// Unit test of Multi.Name. func TestMulti_Name(t *testing.T) { - name := "test name" + name := "testMulti" multi := NewMulti(name) - singles := []*Single{ - NewSingle("single name 1"), - NewSingle("single name 2"), - NewSingle("single name 3"), + + // Add stoppables and created list of their names + var nameList []string + for i := 0; i < 10; i++ { + newName := "" + if i%2 == 0 { + newName = "single" + strconv.Itoa(i) + multi.Add(NewSingle(newName)) + } else { + newMulti := NewMulti("multi" + strconv.Itoa(i)) + if i != 5 { + newMulti.Add(NewMulti("multiA")) + newMulti.Add(NewMulti("multiB")) + } + multi.Add(newMulti) + newName = newMulti.Name() + } + nameList = append(nameList, newName) } - expectedNames := []string{ - name + ": {}", - name + ": {" + singles[0].name + "}", - name + ": {" + singles[0].name + ", " + singles[1].name + "}", - name + ": {" + singles[0].name + ", " + singles[1].name + ", " + singles[2].name + "}", + + expected := name + ": {" + strings.Join(nameList, ", ") + "}" + + if multi.Name() != expected { + t.Errorf("Name failed to return the expected string."+ + "\nexpected: %s\nreceived: %s", expected, multi.Name()) } +} - for i, single := range singles { - if expectedNames[i] != multi.Name() { - t.Errorf("Name() returned the incorrect string."+ - "\n\texpected: %s\n\treceived: %s", expectedNames[0], multi.Name()) - } - multi.Add(single) +// Tests that Multi.Name returns the expected string when it has no stoppables. +func TestMulti_Name_NoStoppables(t *testing.T) { + name := "testMulti" + multi := NewMulti(name) + + expected := name + ": {" + "}" + + if multi.Name() != expected { + t.Errorf("Name failed to return the expected string."+ + "\nexpected: %s\nreceived: %s", expected, multi.Name()) } } -// Tests happy path of Multi.Close(). +// Tests that Multi.Close sends on all Single quit channels. func TestMulti_Close(t *testing.T) { - // Create new Multi and add Singles to it - multi := NewMulti("name") + multi := NewMulti("testMulti") singles := []*Single{ - NewSingle("single name 1"), - NewSingle("single name 2"), - NewSingle("single name 3"), + NewSingle("testSingle0"), + NewSingle("testSingle1"), + NewSingle("testSingle2"), + NewSingle("testSingle3"), + NewSingle("testSingle4"), } - for _, single := range singles { + for _, single := range singles[:3] { multi.Add(single) } + subMulti := NewMulti("subMulti") + for _, single := range singles[3:] { + subMulti.Add(single) + } + multi.Add(subMulti) - go func() { - select { - case <-singles[0].quit: - } - select { - case <-singles[1].quit: - } - select { - case <-singles[2].quit: - } - }() + for _, single := range singles { + go func(single *Single) { + select { + case <-time.NewTimer(5 * time.Millisecond).C: + t.Errorf("Single %s failed to quit.", single.Name()) + case <-single.Quit(): + } + }(single) + } err := multi.Close(5 * time.Millisecond) if err != nil { @@ -120,3 +153,46 @@ func TestMulti_Close(t *testing.T) { t.Errorf("Close() returned an error: %v", err) } } + +// Tests that Multi.Close sends on all Single quit channels. +func TestMulti_Close_Error(t *testing.T) { + multi := NewMulti("testMulti") + singles := []*Single{ + NewSingle("testSingle0"), + NewSingle("testSingle1"), + NewSingle("testSingle2"), + NewSingle("testSingle3"), + NewSingle("testSingle4"), + } + for _, single := range singles[:3] { + multi.Add(single) + } + subMulti := NewMulti("subMulti") + for _, single := range singles[3:] { + subMulti.Add(single) + } + multi.Add(subMulti) + + for _, single := range singles[:2] { + go func(single *Single) { + select { + case <-time.NewTimer(5 * time.Millisecond).C: + t.Errorf("Single %s failed to quit.", single.Name()) + case <-single.Quit(): + } + }(single) + } + expectedErr := fmt.Sprintf(closeMultiErr, multi.name, 0, 0) + expectedErr = strings.SplitN(expectedErr, " 0/0", 2)[0] + + err := multi.Close(5 * time.Millisecond) + if err == nil || !strings.Contains(err.Error(), expectedErr) { + t.Errorf("Close() did not return the expected error."+ + "\nexpected: %s\nreceived: %v", expectedErr, err) + } + + err = multi.Close(0) + if err != nil { + t.Errorf("Close() returned an error: %v", err) + } +} diff --git a/stoppable/single.go b/stoppable/single.go index 2e8fa78a2a1c3f4f12752f045cb93c541da8412b..df2a5468c4aa1a348ea3b26c85949cca2b54ec0a 100644 --- a/stoppable/single.go +++ b/stoppable/single.go @@ -15,8 +15,11 @@ import ( "time" ) -// Single allows stopping a single goroutine using a channel. -// It adheres to the stoppable interface. +// Error message. +const closeTimeoutErr = "stopper for %s failed to stop after timeout of %s" + +// Single allows stopping a single goroutine using a channel. It adheres to the +// Stoppable interface. type Single struct { name string quit chan struct{} @@ -24,43 +27,44 @@ type Single struct { once sync.Once } -// NewSingle returns a new single stoppable. +// NewSingle returns a new single Stoppable. func NewSingle(name string) *Single { return &Single{ name: name, quit: make(chan struct{}), - running: 1, + running: running, } } -// IsRunning returns true if the thread is still running. +// IsRunning returns true if stoppable is marked as running. func (s *Single) IsRunning() bool { - return atomic.LoadUint32(&s.running) == 1 + return atomic.LoadUint32(&s.running) == running } -// Quit returns the read only channel it will send the stop signal on. +// Quit returns a receive-only channel that will be triggered when the Stoppable +// quits. func (s *Single) Quit() <-chan struct{} { return s.quit } -// Name returns the name of the thread. This is designed to be +// Name returns the name of the Single Stoppable. func (s *Single) Name() string { return s.name } -// Close signals the thread to time out and closes if it is still running. +// Close signals the Single to close via the quit channel. Returns an error if +// sending on the quit channel times out. func (s *Single) Close(timeout time.Duration) error { var err error s.once.Do(func() { - timer := time.NewTimer(timeout) select { - case <-timer.C: - jww.ERROR.Printf("Stopper for %s failed to stop after "+ - "timeout of %s", s.name, timeout) - err = errors.Errorf("%s failed to close", s.name) + case <-time.NewTimer(timeout).C: + err = errors.Errorf(closeTimeoutErr, s.name, timeout) + jww.ERROR.Print(err.Error()) case s.quit <- struct{}{}: } - atomic.StoreUint32(&s.running, 0) + atomic.StoreUint32(&s.running, stopped) }) + return err } diff --git a/stoppable/single_test.go b/stoppable/single_test.go index ceb5a9ecf6235a5de1884ed4d469f0a46aa0c0f4..4898a4a59008c6ea747f7a54c59ee4b84388b23c 100644 --- a/stoppable/single_test.go +++ b/stoppable/single_test.go @@ -8,96 +8,104 @@ package stoppable import ( + "fmt" "testing" "time" ) -// Tests happy path of NewSingle(). +// Tests that NewSingle returns a Single with the correct name and running. func TestNewSingle(t *testing.T) { - name := "test name" + name := "threadName" single := NewSingle(name) - if single.name != name || single.running != 1 { - t.Errorf("NewSingle() returned Single with incorrect values."+ - "\n\texpected: name: %s running: %d\n\treceived: name: %s running: %d", - name, 1, single.name, single.running) + if single.name != name { + t.Errorf("NewSingle returned Single with incorrect name."+ + "\nexpected: %s\nreceived: %s", name, single.name) + } + + if single.running != running { + t.Errorf("NewSingle returned Single with incorrect running."+ + "\nexpected: %d\nreceived: %d", running, single.running) } } -// Tests happy path of Single.IsRunning(). +// Tests that Single.IsRunning returns the expected value when the Single is +// marked as both running and not running. func TestSingle_IsRunning(t *testing.T) { - single := NewSingle("name") + single := NewSingle("threadName") if !single.IsRunning() { - t.Errorf("IsRunning() returned false when it should be running.") + t.Errorf("IsRunning returned the wrong value when running."+ + "\nexpected: %t\nreceived: %t", true, single.IsRunning()) } - single.running = 0 + single.running = stopped if single.IsRunning() { - t.Errorf("IsRunning() returned true when it should not be running.") + t.Errorf("IsRunning returned the wrong value when not running."+ + "\nexpected: %t\nreceived: %t", false, single.IsRunning()) } } -// Tests happy path of Single.Quit(). +// Tests that Single.Quit returns a channel that is triggered when the Single +// quit channel is triggered. func TestSingle_Quit(t *testing.T) { - single := NewSingle("name") + single := NewSingle("threadName") go func() { - time.Sleep(150 * time.Nanosecond) - single.quit <- struct{}{} + select { + case <-time.NewTimer(5 * time.Millisecond).C: + t.Error("Timed out waiting for quit channel.") + case <-single.Quit(): + } }() - timer := time.NewTimer(2 * time.Millisecond) - select { - case <-timer.C: - t.Errorf("Quit signal not received.") - case <-single.quit: - } + single.quit <- struct{}{} } -// Tests happy path of Single.Name(). +// Unit test of Single.Name. func TestSingle_Name(t *testing.T) { - name := "test name" + name := "threadName" single := NewSingle(name) if name != single.Name() { - t.Errorf("Name() returned the incorrect string."+ - "\n\texpected: %s\n\treceived: %s", name, single.Name()) + t.Errorf("Name did not return the expected name."+ + "\nexpected: %s\nreceived: %s", name, single.Name()) } } // Test happy path of Single.Close(). func TestSingle_Close(t *testing.T) { - single := NewSingle("name") + single := NewSingle("threadName") go func() { - time.Sleep(150 * time.Nanosecond) select { - case <-single.quit: + case <-time.NewTimer(10 * time.Millisecond).C: + t.Error("Timed out waiting for quit channel.") + case <-single.Quit(): } }() err := single.Close(5 * time.Millisecond) if err != nil { - t.Errorf("Close() returned an error: %v", err) + t.Errorf("Close returned an error: %v", err) } } -// Tests that Single.Close() returns an error when the timeout is reached. +// Error path: tests that Single.Close returns an error when the timeout is +// reached. func TestSingle_Close_Error(t *testing.T) { - single := NewSingle("name") - expectedErr := single.name + " failed to close" + single := NewSingle("threadName") + timeout := time.Millisecond + expectedErr := fmt.Sprintf(closeTimeoutErr, single.Name(), timeout) go func() { - time.Sleep(3 * time.Millisecond) - select { - case <-single.quit: - } + time.Sleep(5 * time.Millisecond) + <-single.Quit() }() - err := single.Close(2 * time.Millisecond) - if err == nil { - t.Errorf("Close() did not return the expected error."+ - "\n\texpected: %v\n\treceived: %v", expectedErr, err) + err := single.Close(timeout) + if err == nil || err.Error() != expectedErr { + t.Errorf("Close did not return the expected error."+ + "\nexpected: %s\nreceived: %v", expectedErr, err) } } diff --git a/stoppable/stoppable.go b/stoppable/stoppable.go index 06947eb3bf5ff5ae3523b927e8fb7792848fd762..b3b219edf9883e12cc301c8570199c35e40bbb27 100644 --- a/stoppable/stoppable.go +++ b/stoppable/stoppable.go @@ -9,7 +9,12 @@ package stoppable import "time" -// Interface for stopping a goroutine. +const ( + stopped = 0 + running = 1 +) + +// Stoppable interface for stopping a goroutine. type Stoppable interface { Close(timeout time.Duration) error IsRunning() bool