Skip to content

Commit a35129b

Browse files
committed
Revert direct master commits
Revert "stop retry on unexpected call" This reverts commit 88cebd8. Revert "fix worker start after close" This reverts commit 4a46a77. Revert "fixed start func may lock while close wait stop backgrounds" This reverts commit 5500363. Revert "fix task loop" This reverts commit 23061fb.
1 parent 23061fb commit a35129b

File tree

3 files changed

+34
-250
lines changed

3 files changed

+34
-250
lines changed

internal/background/worker.go

Lines changed: 33 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"runtime/pprof"
77
"sync"
8+
"sync/atomic"
89

910
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -23,19 +24,13 @@ type Worker struct {
2324
workers sync.WaitGroup
2425
onceInit sync.Once
2526

26-
tasksCompleted empty.Chan
27-
2827
m xsync.Mutex
2928

30-
tasks chan backgroundTask
31-
32-
closed bool
29+
closed uint32
3330
stop xcontext.CancelErrFunc
3431
closeReason error
3532
}
3633

37-
type CallbackFunc func(ctx context.Context)
38-
3934
func NewWorker(parent context.Context) *Worker {
4035
w := Worker{}
4136
w.ctx, w.stop = xcontext.WithErrCancel(parent)
@@ -49,52 +44,54 @@ func (b *Worker) Context() context.Context {
4944
return b.ctx
5045
}
5146

52-
func (b *Worker) Start(name string, f CallbackFunc) {
47+
func (b *Worker) Start(name string, f func(ctx context.Context)) {
48+
if atomic.LoadUint32(&b.closed) != 0 {
49+
f(b.ctx)
50+
return
51+
}
52+
5353
b.init()
5454

55-
b.m.WithLock(func() {
56-
if b.closed {
57-
return
58-
}
55+
b.m.Lock()
56+
defer b.m.Unlock()
5957

60-
b.tasks <- backgroundTask{
61-
callback: f,
62-
name: name,
63-
}
64-
})
58+
if b.ctx.Err() != nil {
59+
return
60+
}
61+
62+
b.workers.Add(1)
63+
go func() {
64+
defer b.workers.Done()
65+
66+
pprof.Do(b.ctx, pprof.Labels("background", name), f)
67+
}()
6568
}
6669

6770
func (b *Worker) Done() <-chan struct{} {
6871
b.init()
6972

73+
b.m.Lock()
74+
defer b.m.Unlock()
75+
7076
return b.ctx.Done()
7177
}
7278

7379
func (b *Worker) Close(ctx context.Context, err error) error {
74-
b.init()
75-
76-
var resErr error
77-
b.m.WithLock(func() {
78-
if b.closed {
79-
resErr = xerrors.WithStackTrace(ErrAlreadyClosed)
80-
return
81-
}
80+
if !atomic.CompareAndSwapUint32(&b.closed, 0, 1) {
81+
return xerrors.WithStackTrace(ErrAlreadyClosed)
82+
}
8283

83-
b.closed = true
84+
b.init()
8485

85-
close(b.tasks)
86-
b.closeReason = err
87-
if b.closeReason == nil {
88-
b.closeReason = errClosedWithNilReason
89-
}
86+
b.m.Lock()
87+
defer b.m.Unlock()
9088

91-
b.stop(err)
92-
})
93-
if resErr != nil {
94-
return resErr
89+
b.closeReason = err
90+
if b.closeReason == nil {
91+
b.closeReason = errClosedWithNilReason
9592
}
9693

97-
<-b.tasksCompleted
94+
b.stop(err)
9895

9996
bgCompleted := make(empty.Chan)
10097

@@ -123,27 +120,5 @@ func (b *Worker) init() {
123120
if b.ctx == nil {
124121
b.ctx, b.stop = xcontext.WithErrCancel(context.Background())
125122
}
126-
b.tasks = make(chan backgroundTask)
127-
b.tasksCompleted = make(empty.Chan)
128-
go b.starterLoop()
129123
})
130124
}
131-
132-
func (b *Worker) starterLoop() {
133-
defer close(b.tasksCompleted)
134-
135-
for bgTask := range b.tasks {
136-
b.workers.Add(1)
137-
138-
go func(task backgroundTask) {
139-
defer b.workers.Done()
140-
141-
pprof.Do(b.ctx, pprof.Labels("background", task.name), task.callback)
142-
}(bgTask)
143-
}
144-
}
145-
146-
type backgroundTask struct {
147-
callback CallbackFunc
148-
name string
149-
}

internal/background/worker_test.go

Lines changed: 0 additions & 191 deletions
This file was deleted.

internal/topic/topicreaderinternal/stream_reconnector_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func TestTopicReaderReconnectorConnectionLoop(t *testing.T) {
222222
},
223223
{
224224
callback: func(ctx context.Context) (batchedStreamReader, error) {
225-
t.Fatal()
225+
t.Error()
226226
return nil, errors.New("unexpected call")
227227
},
228228
},

0 commit comments

Comments
 (0)