Skip to content

Commit 79d9a27

Browse files
authored
Merge pull request #519 separate commit for expired session and other reader
2 parents 3517f26 + 70b0beb commit 79d9a27

File tree

12 files changed

+143
-40
lines changed

12 files changed

+143
-40
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Separated errors of commit from other reader and to expired session
2+
13
## v3.42.4
24
* Added `ydb.WithDisableServerBalancer()` database/sql connector option
35

internal/topic/topicreaderinternal/committer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func (c *committer) waitCommitAck(ctx context.Context, waiter commitWaiter) erro
208208
case <-ctx.Done():
209209
return ctx.Err()
210210
case <-waiter.Session.Context().Done():
211-
return waiter.Session.Context().Err()
211+
return PublicErrCommitSessionToExpiredSession
212212
case <-waiter.Committed:
213213
return nil
214214
}

internal/topic/topicreaderinternal/committer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func TestCommitterCommitSync(t *testing.T) {
189189
sessionCancel(testErr)
190190

191191
commitErr := <-waitErr
192-
require.ErrorIs(t, commitErr, testErr)
192+
require.ErrorIs(t, commitErr, PublicErrCommitSessionToExpiredSession)
193193
})
194194
}
195195

internal/topic/topicreaderinternal/partition_session.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ type partitionSession struct {
2121
Topic string
2222
PartitionID int64
2323

24+
readerID int64
25+
connectionID string
26+
2427
ctx context.Context
2528
ctxCancel xcontext.CancelErrFunc
2629
partitionSessionID rawtopicreader.PartitionSessionID
@@ -33,6 +36,8 @@ func newPartitionSession(
3336
partitionContext context.Context,
3437
topic string,
3538
partitionID int64,
39+
readerID int64,
40+
connectionID string,
3641
partitionSessionID rawtopicreader.PartitionSessionID,
3742
committedOffset rawtopicreader.Offset,
3843
) *partitionSession {
@@ -41,6 +46,8 @@ func newPartitionSession(
4146
return &partitionSession{
4247
Topic: topic,
4348
PartitionID: partitionID,
49+
readerID: readerID,
50+
connectionID: connectionID,
4451
ctx: partitionContext,
4552
ctxCancel: cancel,
4653
partitionSessionID: partitionSessionID,

internal/topic/topicreaderinternal/reader.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"sync"
7+
"sync/atomic"
78
"time"
89

910
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
@@ -16,10 +17,19 @@ import (
1617
)
1718

1819
var (
19-
errUnconnected = xerrors.Retryable(xerrors.Wrap(errors.New("ydb: first connection attempt not finished")))
20-
ErrReaderClosed = xerrors.Wrap(errors.New("ydb: reader closed"))
20+
errUnconnected = xerrors.Retryable(xerrors.Wrap(
21+
errors.New("ydb: first connection attempt not finished"),
22+
))
23+
errReaderClosed = xerrors.Wrap(errors.New("ydb: reader closed"))
24+
errCommitSessionFromOtherReader = xerrors.Wrap(errors.New("ydb: commit with session from other reader"))
2125
)
2226

27+
var globalReaderCounter int64
28+
29+
func nextReaderID() int64 {
30+
return atomic.AddInt64(&globalReaderCounter, 1)
31+
}
32+
2333
//nolint:lll
2434
//go:generate mockgen -destination raw_topic_reader_stream_mock_test.go -package topicreaderinternal -write_package_comment=false . RawTopicReaderStream
2535

@@ -37,6 +47,7 @@ type Reader struct {
3747
reader batchedStreamReader
3848
defaultBatchConfig ReadMessageBatchOptions
3949
tracer trace.Topic
50+
readerID int64
4051
}
4152

4253
type ReadMessageBatchOptions struct {
@@ -78,17 +89,20 @@ func NewReader(
7889
opts ...PublicReaderOption,
7990
) Reader {
8091
cfg := convertNewParamsToStreamConfig(consumer, readSelectors, opts...)
92+
readerID := nextReaderID()
93+
8194
readerConnector := func(ctx context.Context) (batchedStreamReader, error) {
8295
stream, err := connector(ctx)
8396
if err != nil {
8497
return nil, err
8598
}
8699

87-
return newTopicStreamReader(stream, cfg.topicStreamReaderConfig)
100+
return newTopicStreamReader(readerID, stream, cfg.topicStreamReaderConfig)
88101
}
89102

90103
res := Reader{
91104
reader: newReaderReconnector(
105+
readerID,
92106
readerConnector,
93107
cfg.OperationTimeout(),
94108
cfg.RetrySettings,
@@ -97,13 +111,14 @@ func NewReader(
97111
),
98112
defaultBatchConfig: cfg.DefaultBatchConfig,
99113
tracer: cfg.Tracer,
114+
readerID: readerID,
100115
}
101116

102117
return res
103118
}
104119

105120
func (r *Reader) Close(ctx context.Context) error {
106-
return r.reader.CloseWithError(ctx, xerrors.WithStackTrace(ErrReaderClosed))
121+
return r.reader.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed))
107122
}
108123

109124
type readExplicitMessagesCount int
@@ -148,10 +163,21 @@ forReadBatch:
148163
}
149164

150165
func (r *Reader) Commit(ctx context.Context, offsets PublicCommitRangeGetter) (err error) {
151-
return r.reader.Commit(ctx, offsets.getCommitRange().priv)
166+
cr := offsets.getCommitRange().priv
167+
if cr.partitionSession.readerID != r.readerID {
168+
return errCommitSessionFromOtherReader
169+
}
170+
171+
return r.reader.Commit(ctx, cr)
152172
}
153173

154174
func (r *Reader) CommitRanges(ctx context.Context, ranges []PublicCommitRange) error {
175+
for i := range ranges {
176+
if ranges[i].priv.partitionSession.readerID != r.readerID {
177+
return errCommitSessionFromOtherReader
178+
}
179+
}
180+
155181
commitRanges := NewCommitRangesFromPublicCommits(ranges)
156182
commitRanges.optimize()
157183

internal/topic/topicreaderinternal/reader_test.go

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ func TestReader_Close(t *testing.T) {
6969
readerReadMessageBatchState := newCallState()
7070

7171
go func() {
72-
readerCommitState.err = reader.Commit(context.Background(), &PublicMessage{})
72+
readerCommitState.err = reader.Commit(context.Background(), &PublicMessage{
73+
commitRange: commitRange{
74+
partitionSession: &partitionSession{},
75+
},
76+
})
7377
close(readerCommitState.callCompleted)
7478
}()
7579

@@ -100,27 +104,51 @@ func TestReader_Close(t *testing.T) {
100104
}
101105

102106
func TestReader_Commit(t *testing.T) {
103-
mc := gomock.NewController(t)
104-
defer mc.Finish()
105-
106-
baseReader := NewMockbatchedStreamReader(mc)
107-
reader := &Reader{reader: baseReader}
108-
109-
expectedRangeOk := commitRange{
110-
commitOffsetStart: 1,
111-
commitOffsetEnd: 10,
112-
partitionSession: &partitionSession{partitionSessionID: 10},
113-
}
114-
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeOk).Return(nil)
115-
require.NoError(t, reader.Commit(context.Background(), &PublicMessage{commitRange: expectedRangeOk}))
116-
117-
expectedRangeErr := commitRange{
118-
commitOffsetStart: 15,
119-
commitOffsetEnd: 20,
120-
partitionSession: &partitionSession{partitionSessionID: 30},
121-
}
122-
123-
testErr := errors.New("test err")
124-
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeErr).Return(testErr)
125-
require.ErrorIs(t, reader.Commit(context.Background(), &PublicMessage{commitRange: expectedRangeErr}), testErr)
107+
t.Run("OK", func(t *testing.T) {
108+
mc := gomock.NewController(t)
109+
defer mc.Finish()
110+
111+
readerID := nextReaderID()
112+
baseReader := NewMockbatchedStreamReader(mc)
113+
reader := &Reader{
114+
reader: baseReader,
115+
readerID: readerID,
116+
}
117+
118+
expectedRangeOk := commitRange{
119+
commitOffsetStart: 1,
120+
commitOffsetEnd: 10,
121+
partitionSession: &partitionSession{
122+
readerID: readerID,
123+
partitionSessionID: 10,
124+
},
125+
}
126+
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeOk).Return(nil)
127+
require.NoError(t, reader.Commit(context.Background(), &PublicMessage{commitRange: expectedRangeOk}))
128+
129+
expectedRangeErr := commitRange{
130+
commitOffsetStart: 15,
131+
commitOffsetEnd: 20,
132+
partitionSession: &partitionSession{
133+
readerID: readerID,
134+
partitionSessionID: 30,
135+
},
136+
}
137+
138+
testErr := errors.New("test err")
139+
baseReader.EXPECT().Commit(gomock.Any(), expectedRangeErr).Return(testErr)
140+
require.ErrorIs(t, reader.Commit(context.Background(), &PublicMessage{commitRange: expectedRangeErr}), testErr)
141+
})
142+
143+
t.Run("CommitFromOtherReader", func(t *testing.T) {
144+
ctx := xtest.Context(t)
145+
reader := &Reader{readerID: 1}
146+
forCommit := commitRange{
147+
commitOffsetStart: 1,
148+
commitOffsetEnd: 2,
149+
partitionSession: &partitionSession{readerID: 2},
150+
}
151+
err := reader.Commit(ctx, forCommit)
152+
require.ErrorIs(t, err, errCommitSessionFromOtherReader)
153+
})
126154
}

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ import (
2323
)
2424

2525
var (
26+
PublicErrCommitSessionToExpiredSession = xerrors.Wrap(errors.New("ydb: commit to expired session"))
27+
2628
errPartitionSessionStoppedByServer = xerrors.Wrap(errors.New("ydb: topic partition session stopped by server"))
2729
errPartitionSessionStoppedBySDK = xerrors.Wrap(errors.New("ydb: topic partition session stopped by sdk"))
28-
errCommitSessionFromOtherReader = xerrors.Wrap(errors.New("ydb: commit with session from other reader"))
2930
errCommitWithNilPartitionSession = xerrors.Wrap(errors.New("ydb: commit with nil partition session"))
3031
)
3132

@@ -48,6 +49,7 @@ type topicStreamReaderImpl struct {
4849

4950
stream RawTopicReaderStream
5051
readConnectionID string
52+
readerID int64
5153

5254
m xsync.RWMutex
5355
err error
@@ -106,6 +108,7 @@ func (cfg *topicStreamReaderConfig) initMessage() *rawtopicreader.InitRequest {
106108
}
107109

108110
func newTopicStreamReader(
111+
readerID int64,
109112
stream RawTopicReaderStream,
110113
cfg topicStreamReaderConfig,
111114
) (_ *topicStreamReaderImpl, err error) {
@@ -115,7 +118,7 @@ func newTopicStreamReader(
115118
}
116119
}()
117120

118-
reader := newTopicStreamReaderStopped(stream, cfg)
121+
reader := newTopicStreamReaderStopped(readerID, stream, cfg)
119122
if err = reader.initSession(); err != nil {
120123
return nil, err
121124
}
@@ -127,6 +130,7 @@ func newTopicStreamReader(
127130
}
128131

129132
func newTopicStreamReaderStopped(
133+
readerID int64,
130134
stream RawTopicReaderStream,
131135
cfg topicStreamReaderConfig,
132136
) *topicStreamReaderImpl {
@@ -147,6 +151,7 @@ func newTopicStreamReaderStopped(
147151
batcher: newBatcher(),
148152
backgroundWorkers: *background.NewWorker(stopPump),
149153
readConnectionID: "preinitID-" + readerConnectionID.String(),
154+
readerID: readerID,
150155
rawMessagesFromBuffer: make(chan rawtopicreader.ServerMessage, 1),
151156
}
152157

@@ -361,7 +366,7 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange commitRange) error
361366

362367
ownSession, err := r.sessionController.Get(session.partitionSessionID)
363368
if err != nil || session != ownSession {
364-
return xerrors.WithStackTrace(errCommitSessionFromOtherReader)
369+
return xerrors.WithStackTrace(PublicErrCommitSessionToExpiredSession)
365370
}
366371

367372
return nil
@@ -706,6 +711,8 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequest(m *rawtopicreader
706711
r.ctx,
707712
m.PartitionSession.Path,
708713
m.PartitionSession.PartitionID,
714+
r.readerID,
715+
r.readConnectionID,
709716
m.PartitionSession.PartitionSessionID,
710717
m.CommittedOffset,
711718
)

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func TestTopicStreamReaderImpl_Create(t *testing.T) {
197197
}, nil)
198198
stream.EXPECT().CloseSend().Return(nil)
199199

200-
reader, err := newTopicStreamReader(stream, newTopicStreamReaderConfig())
200+
reader, err := newTopicStreamReader(nextReaderID(), stream, newTopicStreamReaderConfig())
201201
require.Error(t, err)
202202
require.Nil(t, reader)
203203
})
@@ -686,7 +686,15 @@ func TestTopicStreamReadImpl_CommitWithBadSession(t *testing.T) {
686686
e.Start()
687687

688688
cr := commitRange{
689-
partitionSession: newPartitionSession(context.Background(), "asd", 123, 222, 213),
689+
partitionSession: newPartitionSession(
690+
context.Background(),
691+
"asd",
692+
123,
693+
nextReaderID(),
694+
"bad-connection-id",
695+
222,
696+
213,
697+
),
690698
}
691699
err := e.reader.Commit(e.ctx, cr)
692700
require.Error(t, err)
@@ -733,14 +741,22 @@ func newTopicReaderTestEnv(t testing.TB) streamEnv {
733741
cfg.BufferSizeProtoBytes = initialBufferSizeBytes
734742
cfg.CommitterBatchTimeLag = 0
735743

736-
reader := newTopicStreamReaderStopped(stream, cfg)
744+
reader := newTopicStreamReaderStopped(nextReaderID(), stream, cfg)
737745
// reader.initSession() - skip stream level initialization
738746

739747
const testPartitionID = 5
740748
const testSessionID = 15
741749
const testSessionComitted = 20
742750

743-
session := newPartitionSession(ctx, "/test", testPartitionID, testSessionID, testSessionComitted)
751+
session := newPartitionSession(
752+
ctx,
753+
"/test",
754+
testPartitionID,
755+
reader.readerID,
756+
reader.readConnectionID,
757+
testSessionID,
758+
testSessionComitted,
759+
)
744760
require.NoError(t, reader.sessionController.Add(session))
745761

746762
env := streamEnv{

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type readerReconnector struct {
4242
connectTimeout time.Duration
4343

4444
closeOnce sync.Once
45+
readerID int64
4546

4647
m xsync.RWMutex
4748
streamConnectionInProgress empty.Chan // opened if connection in progress, closed if connection established
@@ -52,13 +53,15 @@ type readerReconnector struct {
5253

5354
//nolint:revive
5455
func newReaderReconnector(
56+
readerID int64,
5557
connector readerConnectFunc,
5658
connectTimeout time.Duration,
5759
retrySettings topic.RetrySettings,
5860
tracer trace.Topic,
5961
baseContext context.Context,
6062
) *readerReconnector {
6163
res := &readerReconnector{
64+
readerID: readerID,
6265
clock: clockwork.NewRealClock(),
6366
readerConnect: connector,
6467
streamErr: errUnconnected,
@@ -138,7 +141,7 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, err error) error
138141
closeErr = r.background.Close(ctx, err)
139142

140143
if r.streamVal != nil {
141-
streamCloseErr := r.streamVal.CloseWithError(ctx, xerrors.WithStackTrace(ErrReaderClosed))
144+
streamCloseErr := r.streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed))
142145
if closeErr == nil {
143146
closeErr = streamCloseErr
144147
}

0 commit comments

Comments
 (0)