Skip to content

Commit 2b953aa

Browse files
authored
Merge pull request #802 added check if commit order is bad in sync mode from gingersamurai
2 parents 343fc98 + 33f4901 commit 2b953aa

File tree

4 files changed

+69
-1
lines changed

4 files changed

+69
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added check if commit order is bad in sync mode
2+
13
## v3.49.1
24
* Added `table.options.WithIgnoreTruncated` option for `session.Execute` method
35
* Added `table.result.ErrTruncated` error for check it with `errors.Is()` outside of `ydb-go-sdk`

internal/topic/topicreaderinternal/committer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ import (
1616
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1717
)
1818

19-
var ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled"))
19+
var (
20+
ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled"))
21+
ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode"))
22+
)
2023

2124
type sendMessageToServerFunc func(msg rawtopicreader.ClientMessage) error
2225

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,9 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange commitRange) error
380380
if err != nil || session != ownSession {
381381
return xerrors.WithStackTrace(PublicErrCommitSessionToExpiredSession)
382382
}
383+
if session.committedOffset() != commitRange.commitOffsetStart && r.cfg.CommitMode == CommitModeSync {
384+
return ErrWrongCommitOrderInSyncMode
385+
}
383386

384387
return nil
385388
}

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,66 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) {
108108
xtest.WaitChannelClosed(t, commitReceived)
109109
xtest.WaitChannelClosed(t, readRequestReceived)
110110
})
111+
xtest.TestManyTimesWithName(t, "WrongOrderCommitWithSyncMode", func(t testing.TB) {
112+
e := newTopicReaderTestEnv(t)
113+
e.reader.cfg.CommitMode = CommitModeSync
114+
e.Start()
115+
116+
lastOffset := e.partitionSession.lastReceivedMessageOffset()
117+
const dataSize = 4
118+
// request new data portion
119+
readRequestReceived := make(empty.Chan)
120+
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize * 2}).Do(func(_ interface{}) {
121+
close(readRequestReceived)
122+
})
123+
124+
e.SendFromServer(&rawtopicreader.ReadResponse{
125+
BytesSize: dataSize,
126+
PartitionData: []rawtopicreader.PartitionData{
127+
{
128+
PartitionSessionID: e.partitionSessionID,
129+
Batches: []rawtopicreader.Batch{
130+
{
131+
Codec: rawtopiccommon.CodecRaw,
132+
ProducerID: "1",
133+
MessageData: []rawtopicreader.MessageData{
134+
{
135+
Offset: lastOffset + 1,
136+
},
137+
},
138+
},
139+
},
140+
},
141+
},
142+
})
143+
144+
e.SendFromServer(&rawtopicreader.ReadResponse{
145+
BytesSize: dataSize,
146+
PartitionData: []rawtopicreader.PartitionData{
147+
{
148+
PartitionSessionID: e.partitionSessionID,
149+
Batches: []rawtopicreader.Batch{
150+
{
151+
Codec: rawtopiccommon.CodecRaw,
152+
ProducerID: "1",
153+
MessageData: []rawtopicreader.MessageData{
154+
{
155+
Offset: lastOffset + 2,
156+
},
157+
},
158+
},
159+
},
160+
},
161+
},
162+
})
163+
164+
opts := newReadMessageBatchOptions()
165+
opts.MinCount = 2
166+
batch, err := e.reader.ReadMessageBatch(e.ctx, opts)
167+
require.NoError(t, err)
168+
require.ErrorIs(t, e.reader.Commit(e.ctx, batch.Messages[1].getCommitRange().priv), ErrWrongCommitOrderInSyncMode)
169+
xtest.WaitChannelClosed(t, readRequestReceived)
170+
})
111171

112172
xtest.TestManyTimesWithName(t, "CommitAfterGracefulStopPartition", func(t testing.TB) {
113173
e := newTopicReaderTestEnv(t)

0 commit comments

Comments
 (0)