Skip to content
Snippets Groups Projects
Commit 6b07f4fa authored by Jono Wenger's avatar Jono Wenger
Browse files

Clean up stoppable

parent 55b44efd
No related branches found
No related tags found
1 merge request!23Release
......@@ -16,74 +16,78 @@ import (
"time"
)
const nameTag = " with cleanup"
type CleanFunc func(duration time.Duration) error
// Cleanup wraps any stoppable and runs a callback after to stop for cleanup
// behavior. The cleanup is run under the remainder of the timeout but will not
// be canceled if the timeout runs out. The cleanup function does not run if the
// thread does not stop.
type Cleanup struct {
stop Stoppable
// the clean function receives how long it has to run before the timeout,
// this is nto expected to be used in most cases
clean func(duration time.Duration) error
// clean receives how long it has to run before the timeout, this is not
// expected to be used in most cases
clean CleanFunc
running uint32
once sync.Once
}
// NewCleanup creates a new Cleanup from the passed stoppable and function.
func NewCleanup(stop Stoppable, clean func(duration time.Duration) error) *Cleanup {
// NewCleanup creates a new Cleanup from the passed in stoppable and clean
// function.
func NewCleanup(stop Stoppable, clean CleanFunc) *Cleanup {
return &Cleanup{
stop: stop,
clean: clean,
running: 0,
running: stopped,
}
}
// IsRunning returns true if the thread is still running and its cleanup has
// completed.
func (c *Cleanup) IsRunning() bool {
return atomic.LoadUint32(&c.running) == 1
return atomic.LoadUint32(&c.running) == running
}
// Name returns the name of the stoppable denoting it has cleanup.
func (c *Cleanup) Name() string {
return c.stop.Name() + " with cleanup"
return c.stop.Name() + nameTag
}
// Close stops the contained stoppable and runs the cleanup function after. The
// cleanup function does not run if the thread does not stop.
// Close stops the wrapped stoppable and after, runs the cleanup function. The
// cleanup function does not run if the thread fails to stop.
func (c *Cleanup) Close(timeout time.Duration) error {
var err error
c.once.Do(
func() {
defer atomic.StoreUint32(&c.running, 0)
defer atomic.StoreUint32(&c.running, stopped)
start := netTime.Now()
// Run the stoppable
// Close each stoppable
if err := c.stop.Close(timeout); err != nil {
err = errors.WithMessagef(err, "Cleanup for %s not executed",
err = errors.WithMessagef(err, "Cleanup not executed for %s",
c.stop.Name())
return
}
// Run the cleanup function with the remaining time as a timeout
elapsed := time.Since(start)
elapsed := netTime.Now().Sub(start)
complete := make(chan error, 1)
go func() {
complete <- c.clean(elapsed)
}()
timer := time.NewTimer(elapsed)
select {
case err := <-complete:
if err != nil {
err = errors.WithMessagef(err, "Cleanup for %s failed",
c.stop.Name())
}
case <-timer.C:
err = errors.Errorf("Clean up for %s timeout", c.stop.Name())
case <-time.NewTimer(elapsed).C:
err = errors.Errorf("Clean up for %s timed out after %s",
c.stop.Name(), elapsed)
}
})
......
......@@ -11,52 +11,68 @@ import (
"testing"
)
// Tests happy path of NewCleanup().
// Tests that NewCleanup returns a Cleanup that is stopped with the given
// Stoppable.
func TestNewCleanup(t *testing.T) {
single := NewSingle("test name")
single := NewSingle("testSingle")
cleanup := NewCleanup(single, single.Close)
if cleanup.stop != single || cleanup.running != 0 {
t.Errorf("NewCleanup() returned Single with incorrect values."+
"\n\texpected: stop: %v running: %d\n\treceived: stop: %v running: %d",
single, cleanup.stop, 0, cleanup.running)
if cleanup.stop != single {
t.Errorf("NewCleanup returned cleanup with incorrect Stoppable."+
"\nexpected: %+v\nreceived: %+v", single, cleanup.stop)
}
if cleanup.running != stopped {
t.Errorf("NewMulti returned Multi with incorrect running."+
"\nexpected: %d\nreceived: %d", stopped, cleanup.running)
}
}
// Tests happy path of Cleanup.IsRunning().
// Tests that Cleanup.IsRunning returns the expected value when the Cleanup is
// marked as both running and not running.
func TestCleanup_IsRunning(t *testing.T) {
single := NewSingle("test name")
single := NewSingle("threadName")
cleanup := NewCleanup(single, single.Close)
if cleanup.IsRunning() {
t.Errorf("IsRunning() returned false when it should be running.")
t.Errorf("IsRunning returned the wrong value when running."+
"\nexpected: %t\nreceived: %t", true, cleanup.IsRunning())
}
cleanup.running = 1
if !cleanup.IsRunning() {
t.Errorf("IsRunning() returned true when it should not be running.")
cleanup.running = running
if !single.IsRunning() {
t.Errorf("IsRunning returned the wrong value when running."+
"\nexpected: %t\nreceived: %t", false, single.IsRunning())
}
}
// Tests happy path of Cleanup.Name().
// Unit test of Cleanup.Name.
func TestCleanup_Name(t *testing.T) {
name := "test name"
name := "threadName"
single := NewSingle(name)
cleanup := NewCleanup(single, single.Close)
if name+" with cleanup" != cleanup.Name() {
t.Errorf("Name() returned the incorrect string."+
"\n\texpected: %s\n\treceived: %s", name+" with cleanup", cleanup.Name())
if name+nameTag != cleanup.Name() {
t.Errorf("Name did not return the expected name."+
"\nexpected: %s\nreceived: %s", name+nameTag, cleanup.Name())
}
}
// Tests happy path of Cleanup.Close().
func TestCleanup_Close(t *testing.T) {
single := NewSingle("test name")
single := NewSingle("threadName")
cleanup := NewCleanup(single, single.Close)
// go func() {
// select {
// case <-time.NewTimer(10 * time.Millisecond).C:
// t.Error("Timed out waiting for quit channel.")
// case <-single.Quit():
// }
// }()
err := cleanup.Close(0)
if err != nil {
t.Errorf("Close() returned an error: %v", err)
t.Errorf("Close() returned an error: %+v", err)
}
}
......@@ -8,14 +8,17 @@
package stoppable
import (
"fmt"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"strings"
"sync"
"sync/atomic"
"time"
)
// Error message.
const closeMultiErr = "MultiStopper %s failed to close %d/%d stoppers"
type Multi struct {
stoppables []Stoppable
name string
......@@ -24,17 +27,17 @@ type Multi struct {
once sync.Once
}
// NewMulti returns a new multi stoppable.
// NewMulti returns a new multi Stoppable.
func NewMulti(name string) *Multi {
return &Multi{
name: name,
running: 1,
running: running,
}
}
// IsRunning returns true if the thread is still running.
// IsRunning returns true if stoppable is marked as running.
func (m *Multi) IsRunning() bool {
return atomic.LoadUint32(&m.running) == 1
return atomic.LoadUint32(&m.running) == running
}
// Add adds the given stoppable to the list of stoppables.
......@@ -44,21 +47,18 @@ func (m *Multi) Add(stoppable Stoppable) {
m.mux.Unlock()
}
// Name returns the name of the multi stoppable and the names of all stoppables
// Name returns the name of the Multi Stoppable and the names of all stoppables
// it contains.
func (m *Multi) Name() string {
m.mux.RLock()
names := m.name + ": {"
for _, s := range m.stoppables {
names += s.Name() + ", "
}
if len(m.stoppables) > 0 {
names = names[:len(names)-2]
defer m.mux.RUnlock()
names := make([]string, len(m.stoppables))
for i, s := range m.stoppables {
names[i] = s.Name()
}
names += "}"
m.mux.RUnlock()
return names
return m.name + ": {" + strings.Join(names, ", ") + "}"
}
// Close closes all child stoppers. It does not return their errors and assumes
......@@ -67,10 +67,10 @@ func (m *Multi) Close(timeout time.Duration) error {
var err error
m.once.Do(
func() {
atomic.StoreUint32(&m.running, 0)
atomic.StoreUint32(&m.running, stopped)
numErrors := uint32(0)
wg := &sync.WaitGroup{}
var numErrors uint32
var wg sync.WaitGroup
m.mux.Lock()
for _, stoppable := range m.stoppables {
......@@ -87,10 +87,9 @@ func (m *Multi) Close(timeout time.Duration) error {
wg.Wait()
if numErrors > 0 {
errStr := fmt.Sprintf("MultiStopper %s failed to close "+
"%v/%v stoppers", m.name, numErrors, len(m.stoppables))
jww.ERROR.Println(errStr)
err = errors.New(errStr)
err = errors.Errorf(
closeMultiErr, m.name, numErrors, len(m.stoppables))
jww.ERROR.Print(err.Error())
}
})
......
......@@ -8,107 +8,140 @@
package stoppable
import (
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"
)
// Tests happy path of NewMulti().
// Tests that NewMulti returns a Multi that is running with the given name.
func TestNewMulti(t *testing.T) {
name := "test name"
name := "testMulti"
multi := NewMulti(name)
if multi.name != name || multi.running != 1 {
t.Errorf("NewMulti() returned Multi with incorrect values."+
"\n\texpected: name: %s running: %d\n\treceived: name: %s running: %d",
name, 1, multi.name, multi.running)
if multi.name != name {
t.Errorf("NewMulti returned Multi with incorrect name."+
"\nexpected: %s\nreceived: %s", name, multi.name)
}
if multi.running != running {
t.Errorf("NewMulti returned Multi with incorrect running."+
"\nexpected: %d\nreceived: %d", running, multi.running)
}
}
// Tests happy path of Multi.IsRunning().
// Tests that Multi.IsRunning returns the expected value when the Multi is
// marked as both running and not running.
func TestMulti_IsRunning(t *testing.T) {
multi := NewMulti("name")
multi := NewMulti("testMulti")
if !multi.IsRunning() {
t.Errorf("IsRunning() returned false when it should be running.")
t.Errorf("IsRunning returned the wrong value when running."+
"\nexpected: %t\nreceived: %t", true, multi.IsRunning())
}
multi.running = 0
multi.running = stopped
if multi.IsRunning() {
t.Errorf("IsRunning() returned true when it should not be running.")
t.Errorf("IsRunning returned the wrong value when not running."+
"\nexpected: %t\nreceived: %t", false, multi.IsRunning())
}
}
// Tests happy path of Multi.Add().
// Tests that Multi.Add adds all the stoppables to the list.
func TestMulti_Add(t *testing.T) {
multi := NewMulti("multi name")
singles := []*Single{
NewSingle("single name 1"),
NewSingle("single name 2"),
NewSingle("single name 3"),
multi := NewMulti("testMulti")
expected := []Stoppable{
NewSingle("testSingle0"),
NewMulti("testMulti0"),
NewSingle("testSingle1"),
NewMulti("testMulti1"),
}
for _, single := range singles {
multi.Add(single)
for _, stoppable := range expected {
multi.Add(stoppable)
}
for i, single := range singles {
if !reflect.DeepEqual(single, multi.stoppables[i]) {
t.Errorf("Add() did not add the correct Stoppables."+
"\n\texpected: %#v\n\treceived: %#v", single, multi.stoppables[i])
}
if !reflect.DeepEqual(multi.stoppables, expected) {
t.Errorf("Add did not add the correct Stoppables."+
"\nexpected: %+v\nreceived: %+v", multi.stoppables, expected)
}
}
// Tests happy path of Multi.Name().
// Unit test of Multi.Name.
func TestMulti_Name(t *testing.T) {
name := "test name"
name := "testMulti"
multi := NewMulti(name)
singles := []*Single{
NewSingle("single name 1"),
NewSingle("single name 2"),
NewSingle("single name 3"),
// Add stoppables and created list of their names
var nameList []string
for i := 0; i < 10; i++ {
newName := ""
if i%2 == 0 {
newName = "single" + strconv.Itoa(i)
multi.Add(NewSingle(newName))
} else {
newMulti := NewMulti("multi" + strconv.Itoa(i))
if i != 5 {
newMulti.Add(NewMulti("multiA"))
newMulti.Add(NewMulti("multiB"))
}
multi.Add(newMulti)
newName = newMulti.Name()
}
nameList = append(nameList, newName)
}
expectedNames := []string{
name + ": {}",
name + ": {" + singles[0].name + "}",
name + ": {" + singles[0].name + ", " + singles[1].name + "}",
name + ": {" + singles[0].name + ", " + singles[1].name + ", " + singles[2].name + "}",
expected := name + ": {" + strings.Join(nameList, ", ") + "}"
if multi.Name() != expected {
t.Errorf("Name failed to return the expected string."+
"\nexpected: %s\nreceived: %s", expected, multi.Name())
}
}
for i, single := range singles {
if expectedNames[i] != multi.Name() {
t.Errorf("Name() returned the incorrect string."+
"\n\texpected: %s\n\treceived: %s", expectedNames[0], multi.Name())
}
multi.Add(single)
// Tests that Multi.Name returns the expected string when it has no stoppables.
func TestMulti_Name_NoStoppables(t *testing.T) {
name := "testMulti"
multi := NewMulti(name)
expected := name + ": {" + "}"
if multi.Name() != expected {
t.Errorf("Name failed to return the expected string."+
"\nexpected: %s\nreceived: %s", expected, multi.Name())
}
}
// Tests happy path of Multi.Close().
// Tests that Multi.Close sends on all Single quit channels.
func TestMulti_Close(t *testing.T) {
// Create new Multi and add Singles to it
multi := NewMulti("name")
multi := NewMulti("testMulti")
singles := []*Single{
NewSingle("single name 1"),
NewSingle("single name 2"),
NewSingle("single name 3"),
NewSingle("testSingle0"),
NewSingle("testSingle1"),
NewSingle("testSingle2"),
NewSingle("testSingle3"),
NewSingle("testSingle4"),
}
for _, single := range singles {
for _, single := range singles[:3] {
multi.Add(single)
}
subMulti := NewMulti("subMulti")
for _, single := range singles[3:] {
subMulti.Add(single)
}
multi.Add(subMulti)
go func() {
select {
case <-singles[0].quit:
}
select {
case <-singles[1].quit:
}
select {
case <-singles[2].quit:
}
}()
for _, single := range singles {
go func(single *Single) {
select {
case <-time.NewTimer(5 * time.Millisecond).C:
t.Errorf("Single %s failed to quit.", single.Name())
case <-single.Quit():
}
}(single)
}
err := multi.Close(5 * time.Millisecond)
if err != nil {
......@@ -120,3 +153,46 @@ func TestMulti_Close(t *testing.T) {
t.Errorf("Close() returned an error: %v", err)
}
}
// Tests that Multi.Close sends on all Single quit channels.
func TestMulti_Close_Error(t *testing.T) {
multi := NewMulti("testMulti")
singles := []*Single{
NewSingle("testSingle0"),
NewSingle("testSingle1"),
NewSingle("testSingle2"),
NewSingle("testSingle3"),
NewSingle("testSingle4"),
}
for _, single := range singles[:3] {
multi.Add(single)
}
subMulti := NewMulti("subMulti")
for _, single := range singles[3:] {
subMulti.Add(single)
}
multi.Add(subMulti)
for _, single := range singles[:2] {
go func(single *Single) {
select {
case <-time.NewTimer(5 * time.Millisecond).C:
t.Errorf("Single %s failed to quit.", single.Name())
case <-single.Quit():
}
}(single)
}
expectedErr := fmt.Sprintf(closeMultiErr, multi.name, 0, 0)
expectedErr = strings.SplitN(expectedErr, " 0/0", 2)[0]
err := multi.Close(5 * time.Millisecond)
if err == nil || !strings.Contains(err.Error(), expectedErr) {
t.Errorf("Close() did not return the expected error."+
"\nexpected: %s\nreceived: %v", expectedErr, err)
}
err = multi.Close(0)
if err != nil {
t.Errorf("Close() returned an error: %v", err)
}
}
......@@ -15,8 +15,11 @@ import (
"time"
)
// Single allows stopping a single goroutine using a channel.
// It adheres to the stoppable interface.
// Error message.
const closeTimeoutErr = "stopper for %s failed to stop after timeout of %s"
// Single allows stopping a single goroutine using a channel. It adheres to the
// Stoppable interface.
type Single struct {
name string
quit chan struct{}
......@@ -24,43 +27,44 @@ type Single struct {
once sync.Once
}
// NewSingle returns a new single stoppable.
// NewSingle returns a new single Stoppable.
func NewSingle(name string) *Single {
return &Single{
name: name,
quit: make(chan struct{}),
running: 1,
running: running,
}
}
// IsRunning returns true if the thread is still running.
// IsRunning returns true if stoppable is marked as running.
func (s *Single) IsRunning() bool {
return atomic.LoadUint32(&s.running) == 1
return atomic.LoadUint32(&s.running) == running
}
// Quit returns the read only channel it will send the stop signal on.
// Quit returns a receive-only channel that will be triggered when the Stoppable
// quits.
func (s *Single) Quit() <-chan struct{} {
return s.quit
}
// Name returns the name of the thread. This is designed to be
// Name returns the name of the Single Stoppable.
func (s *Single) Name() string {
return s.name
}
// Close signals the thread to time out and closes if it is still running.
// Close signals the Single to close via the quit channel. Returns an error if
// sending on the quit channel times out.
func (s *Single) Close(timeout time.Duration) error {
var err error
s.once.Do(func() {
timer := time.NewTimer(timeout)
select {
case <-timer.C:
jww.ERROR.Printf("Stopper for %s failed to stop after "+
"timeout of %s", s.name, timeout)
err = errors.Errorf("%s failed to close", s.name)
case <-time.NewTimer(timeout).C:
err = errors.Errorf(closeTimeoutErr, s.name, timeout)
jww.ERROR.Print(err.Error())
case s.quit <- struct{}{}:
}
atomic.StoreUint32(&s.running, 0)
atomic.StoreUint32(&s.running, stopped)
})
return err
}
......@@ -8,96 +8,104 @@
package stoppable
import (
"fmt"
"testing"
"time"
)
// Tests happy path of NewSingle().
// Tests that NewSingle returns a Single with the correct name and running.
func TestNewSingle(t *testing.T) {
name := "test name"
name := "threadName"
single := NewSingle(name)
if single.name != name || single.running != 1 {
t.Errorf("NewSingle() returned Single with incorrect values."+
"\n\texpected: name: %s running: %d\n\treceived: name: %s running: %d",
name, 1, single.name, single.running)
if single.name != name {
t.Errorf("NewSingle returned Single with incorrect name."+
"\nexpected: %s\nreceived: %s", name, single.name)
}
if single.running != running {
t.Errorf("NewSingle returned Single with incorrect running."+
"\nexpected: %d\nreceived: %d", running, single.running)
}
}
// Tests happy path of Single.IsRunning().
// Tests that Single.IsRunning returns the expected value when the Single is
// marked as both running and not running.
func TestSingle_IsRunning(t *testing.T) {
single := NewSingle("name")
single := NewSingle("threadName")
if !single.IsRunning() {
t.Errorf("IsRunning() returned false when it should be running.")
t.Errorf("IsRunning returned the wrong value when running."+
"\nexpected: %t\nreceived: %t", true, single.IsRunning())
}
single.running = 0
single.running = stopped
if single.IsRunning() {
t.Errorf("IsRunning() returned true when it should not be running.")
t.Errorf("IsRunning returned the wrong value when not running."+
"\nexpected: %t\nreceived: %t", false, single.IsRunning())
}
}
// Tests happy path of Single.Quit().
// Tests that Single.Quit returns a channel that is triggered when the Single
// quit channel is triggered.
func TestSingle_Quit(t *testing.T) {
single := NewSingle("name")
single := NewSingle("threadName")
go func() {
time.Sleep(150 * time.Nanosecond)
single.quit <- struct{}{}
select {
case <-time.NewTimer(5 * time.Millisecond).C:
t.Error("Timed out waiting for quit channel.")
case <-single.Quit():
}
}()
timer := time.NewTimer(2 * time.Millisecond)
select {
case <-timer.C:
t.Errorf("Quit signal not received.")
case <-single.quit:
}
single.quit <- struct{}{}
}
// Tests happy path of Single.Name().
// Unit test of Single.Name.
func TestSingle_Name(t *testing.T) {
name := "test name"
name := "threadName"
single := NewSingle(name)
if name != single.Name() {
t.Errorf("Name() returned the incorrect string."+
"\n\texpected: %s\n\treceived: %s", name, single.Name())
t.Errorf("Name did not return the expected name."+
"\nexpected: %s\nreceived: %s", name, single.Name())
}
}
// Test happy path of Single.Close().
func TestSingle_Close(t *testing.T) {
single := NewSingle("name")
single := NewSingle("threadName")
go func() {
time.Sleep(150 * time.Nanosecond)
select {
case <-single.quit:
case <-time.NewTimer(10 * time.Millisecond).C:
t.Error("Timed out waiting for quit channel.")
case <-single.Quit():
}
}()
err := single.Close(5 * time.Millisecond)
if err != nil {
t.Errorf("Close() returned an error: %v", err)
t.Errorf("Close returned an error: %v", err)
}
}
// Tests that Single.Close() returns an error when the timeout is reached.
// Error path: tests that Single.Close returns an error when the timeout is
// reached.
func TestSingle_Close_Error(t *testing.T) {
single := NewSingle("name")
expectedErr := single.name + " failed to close"
single := NewSingle("threadName")
timeout := time.Millisecond
expectedErr := fmt.Sprintf(closeTimeoutErr, single.Name(), timeout)
go func() {
time.Sleep(3 * time.Millisecond)
select {
case <-single.quit:
}
time.Sleep(5 * time.Millisecond)
<-single.Quit()
}()
err := single.Close(2 * time.Millisecond)
if err == nil {
t.Errorf("Close() did not return the expected error."+
"\n\texpected: %v\n\treceived: %v", expectedErr, err)
err := single.Close(timeout)
if err == nil || err.Error() != expectedErr {
t.Errorf("Close did not return the expected error."+
"\nexpected: %s\nreceived: %v", expectedErr, err)
}
}
......@@ -9,7 +9,12 @@ package stoppable
import "time"
// Interface for stopping a goroutine.
const (
stopped = 0
running = 1
)
// Stoppable interface for stopping a goroutine.
type Stoppable interface {
Close(timeout time.Duration) error
IsRunning() bool
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment