44 "context"
55 "errors"
66 "sync"
7+ "sync/atomic"
78 "time"
89
910 "github.com/ydb-platform/ydb-go-sdk/v3/credentials"
@@ -16,10 +17,19 @@ import (
1617)
1718
1819var (
19- errUnconnected = xerrors .Retryable (xerrors .Wrap (errors .New ("ydb: first connection attempt not finished" )))
20- ErrReaderClosed = xerrors .Wrap (errors .New ("ydb: reader closed" ))
20+ errUnconnected = xerrors .Retryable (xerrors .Wrap (
21+ errors .New ("ydb: first connection attempt not finished" ),
22+ ))
23+ errReaderClosed = xerrors .Wrap (errors .New ("ydb: reader closed" ))
24+ errCommitSessionFromOtherReader = xerrors .Wrap (errors .New ("ydb: commit with session from other reader" ))
2125)
2226
27+ var globalReaderCounter int64
28+
29+ func nextReaderID () int64 {
30+ return atomic .AddInt64 (& globalReaderCounter , 1 )
31+ }
32+
2333//nolint:lll
2434//go:generate mockgen -destination raw_topic_reader_stream_mock_test.go -package topicreaderinternal -write_package_comment=false . RawTopicReaderStream
2535
@@ -37,6 +47,7 @@ type Reader struct {
3747 reader batchedStreamReader
3848 defaultBatchConfig ReadMessageBatchOptions
3949 tracer trace.Topic
50+ readerID int64
4051}
4152
4253type ReadMessageBatchOptions struct {
@@ -78,17 +89,20 @@ func NewReader(
7889 opts ... PublicReaderOption ,
7990) Reader {
8091 cfg := convertNewParamsToStreamConfig (consumer , readSelectors , opts ... )
92+ readerID := nextReaderID ()
93+
8194 readerConnector := func (ctx context.Context ) (batchedStreamReader , error ) {
8295 stream , err := connector (ctx )
8396 if err != nil {
8497 return nil , err
8598 }
8699
87- return newTopicStreamReader (stream , cfg .topicStreamReaderConfig )
100+ return newTopicStreamReader (readerID , stream , cfg .topicStreamReaderConfig )
88101 }
89102
90103 res := Reader {
91104 reader : newReaderReconnector (
105+ readerID ,
92106 readerConnector ,
93107 cfg .OperationTimeout (),
94108 cfg .RetrySettings ,
@@ -97,13 +111,14 @@ func NewReader(
97111 ),
98112 defaultBatchConfig : cfg .DefaultBatchConfig ,
99113 tracer : cfg .Tracer ,
114+ readerID : readerID ,
100115 }
101116
102117 return res
103118}
104119
105120func (r * Reader ) Close (ctx context.Context ) error {
106- return r .reader .CloseWithError (ctx , xerrors .WithStackTrace (ErrReaderClosed ))
121+ return r .reader .CloseWithError (ctx , xerrors .WithStackTrace (errReaderClosed ))
107122}
108123
109124type readExplicitMessagesCount int
@@ -148,10 +163,21 @@ forReadBatch:
148163}
149164
150165func (r * Reader ) Commit (ctx context.Context , offsets PublicCommitRangeGetter ) (err error ) {
151- return r .reader .Commit (ctx , offsets .getCommitRange ().priv )
166+ cr := offsets .getCommitRange ().priv
167+ if cr .partitionSession .readerID != r .readerID {
168+ return errCommitSessionFromOtherReader
169+ }
170+
171+ return r .reader .Commit (ctx , cr )
152172}
153173
154174func (r * Reader ) CommitRanges (ctx context.Context , ranges []PublicCommitRange ) error {
175+ for i := range ranges {
176+ if ranges [i ].priv .partitionSession .readerID != r .readerID {
177+ return errCommitSessionFromOtherReader
178+ }
179+ }
180+
155181 commitRanges := NewCommitRangesFromPublicCommits (ranges )
156182 commitRanges .optimize ()
157183
0 commit comments