Skip to content

Commit 5500363

Browse files
committed
fixed start func may lock while close wait stop backgrounds
1 parent 4a46a77 commit 5500363

File tree

2 files changed

+127
-54
lines changed

2 files changed

+127
-54
lines changed

internal/background/worker.go

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"sync"
88

99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
10-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xatomic"
1110
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1211
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1312
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
@@ -24,9 +23,13 @@ type Worker struct {
2423
workers sync.WaitGroup
2524
onceInit sync.Once
2625

26+
tasksCompleted empty.Chan
27+
2728
m xsync.Mutex
2829

29-
closed xatomic.Bool
30+
tasks chan backgroundTask
31+
32+
closed bool
3033
stop xcontext.CancelErrFunc
3134
closeReason error
3235
}
@@ -47,27 +50,17 @@ func (b *Worker) Context() context.Context {
4750
}
4851

4952
func (b *Worker) Start(name string, f CallbackFunc) {
50-
if b.closed.Load() {
51-
return
52-
}
53-
5453
b.init()
5554

56-
if b.ctx.Err() != nil {
57-
return
58-
}
59-
6055
b.m.WithLock(func() {
61-
if b.closed.Load() {
56+
if b.closed {
6257
return
6358
}
64-
b.workers.Add(1)
6559

66-
go func() {
67-
defer b.workers.Done()
68-
69-
pprof.Do(b.ctx, pprof.Labels("background", name), f)
70-
}()
60+
b.tasks <- backgroundTask{
61+
callback: f,
62+
name: name,
63+
}
7164
})
7265
}
7366

@@ -78,27 +71,34 @@ func (b *Worker) Done() <-chan struct{} {
7871
}
7972

8073
func (b *Worker) Close(ctx context.Context, err error) error {
81-
if b.closed.Swap(true) {
82-
return xerrors.WithStackTrace(ErrAlreadyClosed)
83-
}
84-
8574
b.init()
8675

76+
var resErr error
8777
b.m.WithLock(func() {
78+
if b.closed {
79+
resErr = xerrors.WithStackTrace(ErrAlreadyClosed)
80+
return
81+
}
82+
83+
b.closed = true
84+
85+
close(b.tasks)
8886
b.closeReason = err
8987
if b.closeReason == nil {
9088
b.closeReason = errClosedWithNilReason
9189
}
9290

9391
b.stop(err)
9492
})
93+
if resErr != nil {
94+
return resErr
95+
}
96+
97+
<-b.tasksCompleted
9598

9699
bgCompleted := make(empty.Chan)
97100

98101
go func() {
99-
b.m.Lock()
100-
defer b.m.Unlock()
101-
102102
b.workers.Wait()
103103
close(bgCompleted)
104104
}()
@@ -123,5 +123,27 @@ func (b *Worker) init() {
123123
if b.ctx == nil {
124124
b.ctx, b.stop = xcontext.WithErrCancel(context.Background())
125125
}
126+
b.tasks = make(chan backgroundTask)
127+
b.tasksCompleted = make(empty.Chan)
128+
go b.starterLoop()
126129
})
127130
}
131+
132+
func (b *Worker) starterLoop() {
133+
defer close(b.tasksCompleted)
134+
135+
for bgTask := range b.tasks {
136+
b.workers.Add(1)
137+
138+
go func() {
139+
defer b.workers.Done()
140+
141+
pprof.Do(b.ctx, pprof.Labels("background", bgTask.name), bgTask.callback)
142+
}()
143+
}
144+
}
145+
146+
type backgroundTask struct {
147+
callback CallbackFunc
148+
name string
149+
}

internal/background/worker_test.go

Lines changed: 81 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -95,44 +95,95 @@ func TestWorkerClose(t *testing.T) {
9595
}
9696

9797
func TestWorkerConcurrentStartAndClose(t *testing.T) {
98-
targetClose := int64(10000)
99-
parallel := 10
98+
xtest.TestManyTimes(t, func(t testing.TB) {
99+
targetClose := int64(10000)
100+
parallel := 10
100101

101-
var counter int64
102+
var counter int64
102103

103-
ctx := xtest.Context(t)
104-
w := NewWorker(ctx)
104+
ctx := xtest.Context(t)
105+
w := NewWorker(ctx)
106+
107+
closeIndex := int64(0)
108+
closed := make(empty.Chan)
109+
go func() {
110+
xtest.SpinWaitCondition(t, nil, func() bool {
111+
return atomic.LoadInt64(&counter) > targetClose
112+
})
113+
require.NoError(t, w.Close(ctx, nil))
114+
closeIndex = atomic.LoadInt64(&counter)
115+
close(closed)
116+
}()
117+
118+
stopNewStarts := xatomic.Bool{}
119+
for i := 0; i < parallel; i++ {
120+
go func() {
121+
for {
122+
if stopNewStarts.Load() {
123+
return
124+
}
125+
126+
go func() {
127+
w.Start("test", func(ctx context.Context) {
128+
atomic.AddInt64(&counter, 1)
129+
})
130+
}()
131+
}
132+
}()
133+
}
134+
135+
xtest.WaitChannelClosed(t, closed)
136+
runtime.Gosched()
137+
require.Equal(t, closeIndex, atomic.LoadInt64(&counter))
138+
stopNewStarts.Store(true)
139+
})
140+
}
141+
142+
func TestWorkerStartCompletedWhileLongWait(t *testing.T) {
143+
xtest.TestManyTimes(t, func(t testing.TB) {
144+
ctx := xtest.Context(t)
145+
w := NewWorker(ctx)
146+
147+
allowStop := make(empty.Chan)
148+
closeStarted := make(empty.Chan)
149+
w.Start("test", func(ctx context.Context) {
150+
<-ctx.Done()
151+
close(closeStarted)
105152

106-
closeIndex := int64(0)
107-
closed := make(empty.Chan)
108-
go func() {
109-
xtest.SpinWaitCondition(t, nil, func() bool {
110-
return atomic.LoadInt64(&counter) > targetClose
153+
<-allowStop
111154
})
112-
require.NoError(t, w.Close(ctx, nil))
113-
closeIndex = atomic.LoadInt64(&counter)
114-
close(closed)
115-
}()
116155

117-
stopNewStarts := xatomic.Bool{}
118-
for i := 0; i < parallel; i++ {
156+
closed := make(empty.Chan)
157+
158+
callStartFinished := make(empty.Chan)
119159
go func() {
120-
for {
121-
if stopNewStarts.Load() {
122-
return
123-
}
160+
defer close(callStartFinished)
161+
start := time.Now()
124162

125-
go func() {
126-
w.Start("test", func(ctx context.Context) {
127-
atomic.AddInt64(&counter, 1)
128-
})
129-
}()
163+
for time.Since(start) < time.Millisecond {
164+
w.Start("test2", func(ctx context.Context) {
165+
// pass
166+
})
130167
}
131168
}()
132-
}
133169

134-
xtest.WaitChannelClosed(t, closed)
135-
runtime.Gosched()
136-
require.Equal(t, closeIndex, atomic.LoadInt64(&counter))
137-
stopNewStarts.Store(true)
170+
go func() {
171+
defer close(closed)
172+
173+
_ = w.Close(ctx, nil)
174+
}()
175+
176+
xtest.WaitChannelClosed(t, callStartFinished)
177+
runtime.Gosched()
178+
179+
select {
180+
case <-closed:
181+
t.Fatal()
182+
default:
183+
// pass
184+
}
185+
186+
close(allowStop)
187+
xtest.WaitChannelClosed(t, closed)
188+
})
138189
}

0 commit comments

Comments
 (0)