Skip to content

Commit f90d5a1

Browse files
authored
Merge branch 'master' into bulk-upsert-from-csv
2 parents 5700cf7 + e5f377a commit f90d5a1

File tree

5 files changed

+32
-13
lines changed

5 files changed

+32
-13
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
* Support bulk upsert from scv, arrow and ydb internal formats in table client
2+
3+
## v3.82.0
4+
* Fixed error on experimental `TopicListener.Close`
25
* Disabled reporting of `ydb_go_sdk_query_session_count` when metrics are disabled
36
* Disabled reporting of `ydb_go_sdk_ydb_query_session_create_latency` histogram metrics when metrics are disabled
47
* Allowed skip column for `ScanStruct` by tag `-`

internal/topic/topiclistenerinternal/stream_listener.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type streamListener struct {
3636
hasNewMessagesToSend empty.Chan
3737
syncCommitter *topicreadercommon.Committer
3838

39+
closing atomic.Bool
40+
3941
m xsync.Mutex
4042
messagesToSend []rawtopicreader.ClientMessage
4143
}
@@ -56,7 +58,7 @@ func newStreamListener(
5658

5759
res.initVars(sessionIDCounter)
5860
if err := res.initStream(connectionCtx, client); err != nil {
59-
res.closeWithTimeout(connectionCtx, err)
61+
res.goClose(connectionCtx, err)
6062

6163
return nil, err
6264
}
@@ -75,17 +77,22 @@ func newStreamListener(
7577
}
7678

7779
func (l *streamListener) Close(ctx context.Context, reason error) error {
80+
if !l.closing.CompareAndSwap(false, true) {
81+
return errTopicListenerClosed
82+
}
83+
7884
var resErrors []error
7985

86+
// should be first because background wait stop of steams
8087
if l.stream != nil {
8188
l.streamClose(reason)
8289
}
8390

84-
if err := l.syncCommitter.Close(ctx, reason); err != nil {
91+
if err := l.background.Close(ctx, reason); err != nil {
8592
resErrors = append(resErrors, err)
8693
}
8794

88-
if err := l.background.Close(ctx, reason); err != nil {
95+
if err := l.syncCommitter.Close(ctx, reason); err != nil {
8996
resErrors = append(resErrors, err)
9097
}
9198

@@ -109,10 +116,12 @@ func (l *streamListener) Close(ctx context.Context, reason error) error {
109116
return errors.Join(resErrors...)
110117
}
111118

112-
func (l *streamListener) closeWithTimeout(ctx context.Context, reason error) {
119+
func (l *streamListener) goClose(ctx context.Context, reason error) {
113120
ctx, cancel := context.WithTimeout(xcontext.ValueOnly(ctx), time.Second)
114121
l.streamClose(reason)
115-
_ = l.background.Close(ctx, reason)
122+
go func() {
123+
_ = l.background.Close(ctx, reason)
124+
}()
116125

117126
cancel()
118127
}
@@ -145,7 +154,7 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err
145154
err := xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
146155
"ydb: topic listener stream init timeout: %w", ctx.Err(),
147156
)))
148-
l.closeWithTimeout(ctx, err)
157+
l.goClose(ctx, err)
149158
l.streamClose(err)
150159
case <-initDone:
151160
// pass
@@ -216,7 +225,7 @@ func (l *streamListener) sendMessagesLoop(ctx context.Context) {
216225

217226
for _, m := range messages {
218227
if err := l.stream.Send(m); err != nil {
219-
l.closeWithTimeout(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
228+
l.goClose(ctx, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
220229
"ydb: failed send message by grpc to topic reader stream from listener: %w",
221230
err,
222231
))))
@@ -236,7 +245,7 @@ func (l *streamListener) receiveMessagesLoop(ctx context.Context) {
236245

237246
mess, err := l.stream.Recv()
238247
if err != nil {
239-
l.closeWithTimeout(ctx, xerrors.WithStackTrace(
248+
l.goClose(ctx, xerrors.WithStackTrace(
240249
fmt.Errorf("ydb: failed read message from the stream in the topic reader listener: %w", err),
241250
))
242251

@@ -263,7 +272,7 @@ func (l *streamListener) onReceiveServerMessage(ctx context.Context, mess rawtop
263272
// todo log
264273
}
265274
if err != nil {
266-
l.closeWithTimeout(ctx, err)
275+
l.goClose(ctx, err)
267276
}
268277
}
269278

internal/topic/topiclistenerinternal/topic_listener_reconnector.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import (
1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
1111
)
1212

13-
var ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
13+
var (
14+
ErrUserCloseTopic = errors.New("ydb: user closed topic listener")
15+
errTopicListenerClosed = errors.New("ydb: the topic listener already closed")
16+
)
1417

1518
type TopicListenerReconnector struct {
1619
streamConfig *StreamListenerConfig
@@ -22,6 +25,7 @@ type TopicListenerReconnector struct {
2225
connectionResult error
2326
connectionCompleted empty.Chan
2427
connectionIDCounter atomic.Int64
28+
closing atomic.Bool
2529

2630
m sync.Mutex
2731
streamListener *streamListener
@@ -45,6 +49,9 @@ func NewTopicListenerReconnector(
4549
}
4650

4751
func (lr *TopicListenerReconnector) Close(ctx context.Context, reason error) error {
52+
if !lr.closing.CompareAndSwap(false, true) {
53+
return errTopicListenerClosed
54+
}
4855
var closeErrors []error
4956
err := lr.background.Close(ctx, reason)
5057
closeErrors = append(closeErrors, err)

internal/version/version.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package version
22

33
const (
44
Major = "3"
5-
Minor = "81"
6-
Patch = "4"
5+
Minor = "82"
6+
Patch = "0"
77

88
Package = "ydb-go-sdk"
99
)

tests/slo/native/query/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func main() {
135135
for i := 0; i < cfg.WriteRPS; i++ {
136136
go w.Write(ctx, &wg, writeRL, gen)
137137
}
138-
log.Println("started " + strconv.Itoa(cfg.ReadRPS) + " write workers")
138+
log.Println("started " + strconv.Itoa(cfg.WriteRPS) + " write workers")
139139

140140
metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1)
141141
wg.Add(1)

0 commit comments

Comments
 (0)