Skip to content

Commit 41806c3

Browse files
authored
Merge branch 'master' into xsql-conn-close
2 parents af5168f + fe6fa01 commit 41806c3

File tree

15 files changed

+191
-64
lines changed

15 files changed

+191
-64
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
* Fixed closing of `database/sql` connection (aka `YDB` session)
22
* Made `session.Close()` as `nop` for idled session
33
* Implemented goroutine for closing idle connection in `database/sql` driver
4+
* Separated errors of commit from other reader and to expired session
5+
* Fixed wrapping error in `internal/balancer/Balancer.wrapCall()`
46

57
## v3.42.4
68
* Added `ydb.WithDisableServerBalancer()` database/sql connector option

internal/balancer/balancer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,10 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
279279
}
280280

281281
if err = f(ctx, cc); err != nil {
282-
return xerrors.WithStackTrace(err)
282+
if conn.UseWrapping(ctx) {
283+
return xerrors.WithStackTrace(err)
284+
}
285+
return err
283286
}
284287

285288
return nil

internal/conn/conn.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,10 @@ func (c *conn) Invoke(
297297
opts ...grpc.CallOption,
298298
) (err error) {
299299
var (
300-
opID string
301-
issues []trace.Issue
302-
wrapping = needWrapping(ctx)
303-
onDone = trace.DriverOnConnInvoke(
300+
opID string
301+
issues []trace.Issue
302+
useWrapping = UseWrapping(ctx)
303+
onDone = trace.DriverOnConnInvoke(
304304
c.config.Trace(),
305305
&ctx,
306306
c.endpoint,
@@ -330,7 +330,7 @@ func (c *conn) Invoke(
330330

331331
err = cc.Invoke(ctx, method, req, res, append(opts, grpc.Trailer(&md))...)
332332
if err != nil {
333-
if wrapping {
333+
if useWrapping {
334334
err = xerrors.FromGRPCError(err,
335335
xerrors.WithAddress(c.Address()),
336336
)
@@ -353,7 +353,7 @@ func (c *conn) Invoke(
353353
for _, issue := range o.GetOperation().GetIssues() {
354354
issues = append(issues, issue)
355355
}
356-
if wrapping {
356+
if useWrapping {
357357
switch {
358358
case !o.GetOperation().GetReady():
359359
return xerrors.WithStackTrace(errOperationNotReady)
@@ -386,9 +386,9 @@ func (c *conn) NewStream(
386386
c.endpoint.Copy(),
387387
trace.Method(method),
388388
)
389-
wrapping = needWrapping(ctx)
390-
cc *grpc.ClientConn
391-
s grpc.ClientStream
389+
useWrapping = UseWrapping(ctx)
390+
cc *grpc.ClientConn
391+
s grpc.ClientStream
392392
)
393393

394394
defer func() {
@@ -418,7 +418,7 @@ func (c *conn) NewStream(
418418

419419
s, err = cc.NewStream(ctx, desc, method, opts...)
420420
if err != nil {
421-
if wrapping {
421+
if useWrapping {
422422
err = xerrors.Retryable(
423423
xerrors.FromGRPCError(err,
424424
xerrors.WithAddress(c.Address()),
@@ -436,7 +436,7 @@ func (c *conn) NewStream(
436436
return &grpcClientStream{
437437
ClientStream: s,
438438
c: c,
439-
wrapping: wrapping,
439+
wrapping: useWrapping,
440440
sentMark: sentMark,
441441
onDone: func(ctx context.Context, md metadata.MD) {
442442
cancel()

internal/conn/context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ func WithoutWrapping(ctx context.Context) context.Context {
88
return context.WithValue(ctx, ctxNoWrappingKey{}, true)
99
}
1010

11-
func needWrapping(ctx context.Context) bool {
11+
func UseWrapping(ctx context.Context) bool {
1212
b, ok := ctx.Value(ctxNoWrappingKey{}).(bool)
1313
return !ok || !b
1414
}

internal/topic/topicreaderinternal/committer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func (c *committer) waitCommitAck(ctx context.Context, waiter commitWaiter) erro
208208
case <-ctx.Done():
209209
return ctx.Err()
210210
case <-waiter.Session.Context().Done():
211-
return waiter.Session.Context().Err()
211+
return PublicErrCommitSessionToExpiredSession
212212
case <-waiter.Committed:
213213
return nil
214214
}

internal/topic/topicreaderinternal/committer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func TestCommitterCommitSync(t *testing.T) {
189189
sessionCancel(testErr)
190190

191191
commitErr := <-waitErr
192-
require.ErrorIs(t, commitErr, testErr)
192+
require.ErrorIs(t, commitErr, PublicErrCommitSessionToExpiredSession)
193193
})
194194
}
195195

internal/topic/topicreaderinternal/partition_session.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ type partitionSession struct {
2121
Topic string
2222
PartitionID int64
2323

24+
readerID int64
25+
connectionID string
26+
2427
ctx context.Context
2528
ctxCancel xcontext.CancelErrFunc
2629
partitionSessionID rawtopicreader.PartitionSessionID
@@ -33,6 +36,8 @@ func newPartitionSession(
3336
partitionContext context.Context,
3437
topic string,
3538
partitionID int64,
39+
readerID int64,
40+
connectionID string,
3641
partitionSessionID rawtopicreader.PartitionSessionID,
3742
committedOffset rawtopicreader.Offset,
3843
) *partitionSession {
@@ -41,6 +46,8 @@ func newPartitionSession(
4146
return &partitionSession{
4247
Topic: topic,
4348
PartitionID: partitionID,
49+
readerID: readerID,
50+
connectionID: connectionID,
4451
ctx: partitionContext,
4552
ctxCancel: cancel,
4653
partitionSessionID: partitionSessionID,

internal/topic/topicreaderinternal/reader.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"sync"
7+
"sync/atomic"
78
"time"
89

910
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
@@ -16,10 +17,19 @@ import (
1617
)
1718

1819
var (
19-
errUnconnected = xerrors.Retryable(xerrors.Wrap(errors.New("ydb: first connection attempt not finished")))
20-
ErrReaderClosed = xerrors.Wrap(errors.New("ydb: reader closed"))
20+
errUnconnected = xerrors.Retryable(xerrors.Wrap(
21+
errors.New("ydb: first connection attempt not finished"),
22+
))
23+
errReaderClosed = xerrors.Wrap(errors.New("ydb: reader closed"))
24+
errCommitSessionFromOtherReader = xerrors.Wrap(errors.New("ydb: commit with session from other reader"))
2125
)
2226

27+
var globalReaderCounter int64
28+
29+
func nextReaderID() int64 {
30+
return atomic.AddInt64(&globalReaderCounter, 1)
31+
}
32+
2333
//nolint:lll
2434
//go:generate mockgen -destination raw_topic_reader_stream_mock_test.go -package topicreaderinternal -write_package_comment=false . RawTopicReaderStream
2535

@@ -37,6 +47,7 @@ type Reader struct {
3747
reader batchedStreamReader
3848
defaultBatchConfig ReadMessageBatchOptions
3949
tracer trace.Topic
50+
readerID int64
4051
}
4152

4253
type ReadMessageBatchOptions struct {
@@ -78,17 +89,20 @@ func NewReader(
7889
opts ...PublicReaderOption,
7990
) Reader {
8091
cfg := convertNewParamsToStreamConfig(consumer, readSelectors, opts...)
92+
readerID := nextReaderID()
93+
8194
readerConnector := func(ctx context.Context) (batchedStreamReader, error) {
8295
stream, err := connector(ctx)
8396
if err != nil {
8497
return nil, err
8598
}
8699

87-
return newTopicStreamReader(stream, cfg.topicStreamReaderConfig)
100+
return newTopicStreamReader(readerID, stream, cfg.topicStreamReaderConfig)
88101
}
89102

90103
res := Reader{
91104
reader: newReaderReconnector(
105+
readerID,
92106
readerConnector,
93107
cfg.OperationTimeout(),
94108
cfg.RetrySettings,
@@ -97,13 +111,14 @@ func NewReader(
97111
),
98112
defaultBatchConfig: cfg.DefaultBatchConfig,
99113
tracer: cfg.Tracer,
114+
readerID: readerID,
100115
}
101116

102117
return res
103118
}
104119

105120
func (r *Reader) Close(ctx context.Context) error {
106-
return r.reader.CloseWithError(ctx, xerrors.WithStackTrace(ErrReaderClosed))
121+
return r.reader.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed))
107122
}
108123

109124
type readExplicitMessagesCount int
@@ -148,10 +163,21 @@ forReadBatch:
148163
}
149164

150165
func (r *Reader) Commit(ctx context.Context, offsets PublicCommitRangeGetter) (err error) {
151-
return r.reader.Commit(ctx, offsets.getCommitRange().priv)
166+
cr := offsets.getCommitRange().priv
167+
if cr.partitionSession.readerID != r.readerID {
168+
return errCommitSessionFromOtherReader
169+
}
170+
171+
return r.reader.Commit(ctx, cr)
152172
}
153173

154174
func (r *Reader) CommitRanges(ctx context.Context, ranges []PublicCommitRange) error {
175+
for i := range ranges {
176+
if ranges[i].priv.partitionSession.readerID != r.readerID {
177+
return errCommitSessionFromOtherReader
178+
}
179+
}
180+
155181
commitRanges := NewCommitRangesFromPublicCommits(ranges)
156182
commitRanges.optimize()
157183

internal/topic/topicreaderinternal/reader_test.go

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ func TestReader_Close(t *testing.T) {
6969
readerReadMessageBatchState := newCallState()
7070

7171
go func() {
72-
readerCommitState.err = reader.Commit(context.Background(), &PublicMessage{})
72+
readerCommitState.err = reader.Commit(context.Background(), &PublicMessage{
73+
commitRange: commitRange{
74+
partitionSession: &partitionSession{},
75+
},
76+
})
7377
close(readerCommitState.callCompleted)
7478
}()
7579

@@ -100,27 +104,51 @@ func TestReader_Close(t *testing.T) {
100104
}
101105

102106
func TestReader_Commit(t *testing.T) {
103-
mc := gomock.NewController(t)
104-
defer mc.Finish()
105-
106-
baseReader := NewMockbatchedStreamReader(mc)
107-
reader := &Reader{reader: baseReader}
108-
109-
expectedRangeOk := commitRange{
110-
commitOffsetStart: 1,
111-
commitOffsetEnd: 10,
112-
partitionSession: &partitionSession{partitionSessionID: 10},
113-
}
114-
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeOk).Return(nil)
115-
require.NoError(t, reader.Commit(context.Background(), &PublicMessage{commitRange: expectedRangeOk}))
116-
117-
expectedRangeErr := commitRange{
118-
commitOffsetStart: 15,
119-
commitOffsetEnd: 20,
120-
partitionSession: &partitionSession{partitionSessionID: 30},
121-
}
122-
123-
testErr := errors.New("test err")
124-
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeErr).Return(testErr)
125-
require.ErrorIs(t, reader.Commit(context.Background(), &PublicMessage{commitRange: expectedRangeErr}), testErr)
107+
t.Run("OK", func(t *testing.T) {
108+
mc := gomock.NewController(t)
109+
defer mc.Finish()
110+
111+
readerID := nextReaderID()
112+
baseReader := NewMockbatchedStreamReader(mc)
113+
reader := &Reader{
114+
reader: baseReader,
115+
readerID: readerID,
116+
}
117+
118+
expectedRangeOk := commitRange{
119+
commitOffsetStart: 1,
120+
commitOffsetEnd: 10,
121+
partitionSession: &partitionSession{
122+
readerID: readerID,
123+
partitionSessionID: 10,
124+
},
125+
}
126+
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeOk).Return(nil)
127+
require.NoError(t, reader.Commit(context.Background(), &PublicMessage{commitRange: expectedRangeOk}))
128+
129+
expectedRangeErr := commitRange{
130+
commitOffsetStart: 15,
131+
commitOffsetEnd: 20,
132+
partitionSession: &partitionSession{
133+
readerID: readerID,
134+
partitionSessionID: 30,
135+
},
136+
}
137+
138+
testErr := errors.New("test err")
139+
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeErr).Return(testErr)
140+
require.ErrorIs(t, reader.Commit(context.Background(), &PublicMessage{commitRange: expectedRangeErr}), testErr)
141+
})
142+
143+
t.Run("CommitFromOtherReader", func(t *testing.T) {
144+
ctx := xtest.Context(t)
145+
reader := &Reader{readerID: 1}
146+
forCommit := commitRange{
147+
commitOffsetStart: 1,
148+
commitOffsetEnd: 2,
149+
partitionSession: &partitionSession{readerID: 2},
150+
}
151+
err := reader.Commit(ctx, forCommit)
152+
require.ErrorIs(t, err, errCommitSessionFromOtherReader)
153+
})
126154
}

0 commit comments

Comments
 (0)