Skip to content

Commit c136901

Browse files
authored
Merge pull request #9270 from starius/goroutine-manager-bool
fn: improvements for GoroutineManager
2 parents f4a1299 + 891e962 commit c136901

File tree

2 files changed

+49
-15
lines changed

2 files changed

+49
-15
lines changed

fn/goroutine_manager.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@ package fn
22

33
import (
44
"context"
5-
"errors"
65
"sync"
76
)
87

9-
// ErrStopping is returned when trying to add a new goroutine while stopping.
10-
var ErrStopping = errors.New("can not add goroutine, stopping")
11-
128
// GoroutineManager is used to launch goroutines until context expires or the
139
// manager is stopped. The Stop method blocks until all started goroutines stop.
1410
type GoroutineManager struct {
@@ -29,8 +25,10 @@ func NewGoroutineManager(ctx context.Context) *GoroutineManager {
2925
}
3026
}
3127

32-
// Go starts a new goroutine if the manager is not stopping.
33-
func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
28+
// Go tries to start a new goroutine and returns a boolean indicating its
29+
// success. It fails iff the goroutine manager is stopping or its context passed
30+
// to NewGoroutineManager has expired.
31+
func (g *GoroutineManager) Go(f func(ctx context.Context)) bool {
3432
// Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race
3533
// condition, since it is not clear should Wait() block or not. This
3634
// kind of race condition is detected by Go runtime and results in a
@@ -43,7 +41,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
4341
defer g.mu.Unlock()
4442

4543
if g.ctx.Err() != nil {
46-
return ErrStopping
44+
return false
4745
}
4846

4947
g.wg.Add(1)
@@ -52,7 +50,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
5250
f(g.ctx)
5351
}()
5452

55-
return nil
53+
return true
5654
}
5755

5856
// Stop prevents new goroutines from being added and waits for all running
@@ -66,7 +64,7 @@ func (g *GoroutineManager) Stop() {
6664
// safe, since it can't run in parallel with wg.Add(1) call in Go, since
6765
// we just cancelled the context and even if Go call starts running here
6866
// after acquiring the mutex, it would see that the context has expired
69-
// and return ErrStopping instead of calling wg.Add(1).
67+
// and return false instead of calling wg.Add(1).
7068
g.wg.Wait()
7169
}
7270

fn/goroutine_manager_test.go

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package fn
22

33
import (
44
"context"
5+
"sync"
56
"testing"
67
"time"
78

@@ -19,7 +20,7 @@ func TestGoroutineManager(t *testing.T) {
1920

2021
taskChan := make(chan struct{})
2122

22-
require.NoError(t, m.Go(func(ctx context.Context) {
23+
require.True(t, m.Go(func(ctx context.Context) {
2324
<-taskChan
2425
}))
2526

@@ -37,7 +38,7 @@ func TestGoroutineManager(t *testing.T) {
3738
require.Greater(t, stopDelay, time.Second)
3839

3940
// Make sure new goroutines do not start after Stop.
40-
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)
41+
require.False(t, m.Go(func(ctx context.Context) {}))
4142

4243
// When Stop() is called, the internal context expires and m.Done() is
4344
// closed. Test this.
@@ -56,7 +57,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) {
5657

5758
m := NewGoroutineManager(ctx)
5859

59-
require.NoError(t, m.Go(func(ctx context.Context) {
60+
require.True(t, m.Go(func(ctx context.Context) {
6061
<-ctx.Done()
6162
}))
6263

@@ -79,7 +80,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) {
7980
}
8081

8182
// Make sure new goroutines do not start after context expiry.
82-
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)
83+
require.False(t, m.Go(func(ctx context.Context) {}))
8384

8485
// Stop will wait for all goroutines to stop.
8586
m.Stop()
@@ -107,15 +108,50 @@ func TestGoroutineManagerStress(t *testing.T) {
107108
// implementation, this test crashes under `-race`.
108109
for i := 0; i < 100; i++ {
109110
taskChan := make(chan struct{})
110-
err := m.Go(func(ctx context.Context) {
111+
ok := m.Go(func(ctx context.Context) {
111112
close(taskChan)
112113
})
113114
// If goroutine was started, wait for its completion.
114-
if err == nil {
115+
if ok {
115116
<-taskChan
116117
}
117118
}
118119

119120
// Wait for Stop to complete.
120121
<-stopChan
121122
}
123+
124+
// TestGoroutineManagerStopsStress launches many Stop() calls in parallel with a
125+
// task exiting. It attempts to catch a race condition between wg.Done() and
126+
// wg.Wait() calls. According to documentation of wg.Wait() this is acceptable,
127+
// therefore this test passes even with -race.
128+
func TestGoroutineManagerStopsStress(t *testing.T) {
129+
t.Parallel()
130+
131+
m := NewGoroutineManager(context.Background())
132+
133+
// jobChan is used to make the task to finish.
134+
jobChan := make(chan struct{})
135+
136+
// Start a task and wait inside it until we start calling Stop() method.
137+
ok := m.Go(func(ctx context.Context) {
138+
<-jobChan
139+
})
140+
require.True(t, ok)
141+
142+
// Now launch many gorotines calling Stop() method in parallel.
143+
var wg sync.WaitGroup
144+
for i := 0; i < 100; i++ {
145+
wg.Add(1)
146+
go func() {
147+
defer wg.Done()
148+
m.Stop()
149+
}()
150+
}
151+
152+
// Exit the task in parallel with Stop() calls.
153+
close(jobChan)
154+
155+
// Wait until all the Stop() calls complete.
156+
wg.Wait()
157+
}

0 commit comments

Comments
 (0)