Skip to content

Commit 96ddb14

Browse files
authored
Merge pull request #1485 fix-listener-close
2 parents 28775df + 896a012 commit 96ddb14

File tree

3 files changed

+27
-10
lines changed

3 files changed

+27
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed error on experimental TopicListener.Close
12
* Disabled reporting of `ydb_go_sdk_query_session_count` when metrics are disabled
23
* Disabled reporting of `ydb_go_sdk_ydb_query_session_create_latency` histogram metrics when metrics are disabled
34
* Allowed skip column for `ScanStruct` by tag `-`

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type streamListener struct {
3636
hasNewMessagesToSend empty.Chan
3737
syncCommitter *topicreadercommon.Committer
3838

39+
closing atomic.Bool
40+
3941
m xsync.Mutex
4042
messagesToSend []rawtopicreader.ClientMessage
4143
}
@@ -56,7 +58,7 @@ func newStreamListener(
5658

5759
res.initVars(sessionIDCounter)
5860
if err := res.initStream(connectionCtx, client); err != nil {
59-
res.closeWithTimeout(connectionCtx, err)
61+
res.goClose(connectionCtx, err)
6062

6163
return nil, err
6264
}
@@ -75,17 +77,22 @@ func newStreamListener(
7577
}
7678

7779
func (l *streamListener) Close(ctx context.Context, reason error) error {
80+
if !l.closing.CompareAndSwap(false, true) {
81+
return errTopicListenerClosed
82+
}
83+
7884
var resErrors []error
7985

86+
// should be first because background wait stop of steams
8087
if l.stream != nil {
8188
l.streamClose(reason)
8289
}
8390

84-
if err := l.syncCommitter.Close(ctx, reason); err != nil {
91+
if err := l.background.Close(ctx, reason); err != nil {
8592
resErrors = append(resErrors, err)
8693
}
8794

88-
if err := l.background.Close(ctx, reason); err != nil {
95+
if err := l.syncCommitter.Close(ctx, reason); err != nil {
8996
resErrors = append(resErrors, err)
9097
}
9198

@@ -109,10 +116,12 @@ func (l *streamListener) Close(ctx context.Context, reason error) error {
109116
return errors.Join(resErrors...)
110117
}
111118

112-
func (l *streamListener) closeWithTimeout(ctx context.Context, reason error) {
119+
func (l *streamListener) goClose(ctx context.Context, reason error) {
113120
ctx, cancel := context.WithTimeout(xcontext.ValueOnly(ctx), time.Second)
114121
l.streamClose(reason)
115-
_ = l.background.Close(ctx, reason)
122+
go func() {
123+
_ = l.background.Close(ctx, reason)
124+
}()
116125

117126
cancel()
118127
}
@@ -145,7 +154,7 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err
145154
err := xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
146155
"ydb: topic listener stream init timeout: %w", ctx.Err(),
147156
)))
148-
l.closeWithTimeout(ctx, err)
157+
l.goClose(ctx, err)
149158
l.streamClose(err)
150159
case <-initDone:
151160
// pass
@@ -216,7 +225,7 @@ func (l *streamListener) sendMessagesLoop(ctx context.Context) {
216225

217226
for _, m := range messages {
218227
if err := l.stream.Send(m); err != nil {
219-
l.closeWithTimeout(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
228+
l.goClose(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
220229
"ydb: failed send message by grpc to topic reader stream from listener: %w",
221230
err,
222231
))))
@@ -236,7 +245,7 @@ func (l *streamListener) receiveMessagesLoop(ctx context.Context) {
236245

237246
mess, err := l.stream.Recv()
238247
if err != nil {
239-
l.closeWithTimeout(ctx, xerrors.WithStackTrace(
248+
l.goClose(ctx, xerrors.WithStackTrace(
240249
fmt.Errorf("ydb: failed read message from the stream in the topic reader listener: %w", err),
241250
))
242251

@@ -263,7 +272,7 @@ func (l *streamListener) onReceiveServerMessage(ctx context.Context, mess rawtop
263272
// todo log
264273
}
265274
if err != nil {
266-
l.closeWithTimeout(ctx, err)
275+
l.goClose(ctx, err)
267276
}
268277
}
269278

internal/topic/topiclistenerinternal/topic_listener_reconnector.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import (
1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
1111
)
1212

13-
var ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
13+
var (
14+
ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
15+
errTopicListenerClosed = errors.New("ydb: the topic listener already closed")
16+
)
1417

1518
type TopicListenerReconnector struct {
1619
streamConfig *StreamListenerConfig
@@ -22,6 +25,7 @@ type TopicListenerReconnector struct {
2225
connectionResult error
2326
connectionCompleted empty.Chan
2427
connectionIDCounter atomic.Int64
28+
closing atomic.Bool
2529

2630
m sync.Mutex
2731
streamListener *streamListener
@@ -45,6 +49,9 @@ func NewTopicListenerReconnector(
4549
}
4650

4751
func (lr *TopicListenerReconnector) Close(ctx context.Context, reason error) error {
52+
if !lr.closing.CompareAndSwap(false, true) {
53+
return errTopicListenerClosed
54+
}
4855
var closeErrors []error
4956
err := lr.background.Close(ctx, reason)
5057
closeErrors = append(closeErrors, err)

0 commit comments

Comments
 (0)