Skip to content

Commit 3130e53

Browse files
committed
fix data race in background worker
1 parent 692c27a commit 3130e53

File tree

2 files changed

+26
-6
lines changed

2 files changed

+26
-6
lines changed

internal/background/worker.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,28 @@ package background
22

33
import (
44
"context"
5+
"errors"
56
"runtime/pprof"
67
"sync"
8+
"sync/atomic"
79

810
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
912
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
1013
)
1114

15+
var ErrAlreadyClosed = errors.New("background worker already closed")
16+
1217
// A Worker must not be copied after first use
1318
type Worker struct {
14-
ctx context.Context
15-
workers sync.WaitGroup
16-
19+
ctx context.Context
20+
workers sync.WaitGroup
1721
onceInit sync.Once
1822

19-
m xsync.Mutex
20-
stop xcontext.CancelErrFunc
23+
m xsync.Mutex
24+
25+
closed uint32
26+
stop xcontext.CancelErrFunc
2127
}
2228

2329
func NewWorker(parent context.Context) *Worker {
@@ -34,6 +40,10 @@ func (b *Worker) Context() context.Context {
3440
}
3541

3642
func (b *Worker) Start(name string, f func(ctx context.Context)) {
43+
if atomic.LoadUint32(&b.closed) != 0 {
44+
return
45+
}
46+
3747
b.init()
3848

3949
b.m.Lock()
@@ -61,13 +71,20 @@ func (b *Worker) Done() <-chan struct{} {
6171
}
6272

6373
func (b *Worker) Close(ctx context.Context, err error) error {
74+
if !atomic.CompareAndSwapUint32(&b.closed, 0, 1) {
75+
return xerrors.WithStackTrace(ErrAlreadyClosed)
76+
}
77+
6478
b.init()
6579

6680
b.stop(err)
6781

6882
waitCtx, waitCancel := context.WithCancel(ctx)
6983

7084
go func() {
85+
b.m.Lock()
86+
defer b.m.Unlock()
87+
7188
b.workers.Wait()
7289
waitCancel()
7390
}()

internal/topic/topicreaderinternal/committer_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/jonboulle/clockwork"
1010
"github.com/stretchr/testify/require"
1111

12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/background"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -338,7 +339,9 @@ func newTestCommitter(ctx context.Context, t testing.TB) *committer {
338339
return nil
339340
})
340341
t.Cleanup(func() {
341-
require.NoError(t, res.Close(ctx, errors.New("test comitter closed")))
342+
if err := res.Close(ctx, errors.New("test comitter closed")); err != nil {
343+
require.ErrorIs(t, err, background.ErrAlreadyClosed)
344+
}
342345
})
343346
return res
344347
}

0 commit comments

Comments
 (0)