Skip to content

Commit 9967efe

Browse files
committed
fix tests and check disabled commit
1 parent 1acfced commit 9967efe

File tree

2 files changed

+36
-21
lines changed

2 files changed

+36
-21
lines changed

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,9 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange commitRa
360360
}
361361

362362
func (r *topicStreamReaderImpl) checkCommitRange(commitRange commitRange) error {
363+
if r.cfg.CommitMode == CommitModeNone {
364+
return ErrCommitDisabled
365+
}
363366
session := commitRange.partitionSession
364367

365368
if session == nil {

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -679,29 +679,41 @@ func TestTopicStreamReadImpl_BatchReaderWantMoreMessagesThenBufferCanHold(t *tes
679679
}
680680

681681
func TestTopicStreamReadImpl_CommitWithBadSession(t *testing.T) {
682-
sleep := func() {
683-
time.Sleep(time.Second / 10)
684-
}
685-
e := newTopicReaderTestEnv(t)
686-
e.Start()
687-
688-
cr := commitRange{
689-
partitionSession: newPartitionSession(
690-
context.Background(),
691-
"asd",
692-
123,
693-
nextReaderID(),
694-
"bad-connection-id",
695-
222,
696-
213,
697-
),
698-
}
699-
err := e.reader.Commit(e.ctx, cr)
700-
require.Error(t, err)
682+
commitByMode := func(mode PublicCommitMode) error {
683+
sleep := func() {
684+
time.Sleep(time.Second / 10)
685+
}
686+
e := newTopicReaderTestEnv(t)
687+
e.reader.cfg.CommitMode = mode
688+
e.Start()
701689

702-
sleep()
690+
cr := commitRange{
691+
partitionSession: newPartitionSession(
692+
context.Background(),
693+
"asd",
694+
123,
695+
nextReaderID(),
696+
"bad-connection-id",
697+
222,
698+
213,
699+
),
700+
}
701+
commitErr := e.reader.Commit(e.ctx, cr)
703702

704-
require.False(t, e.reader.closed)
703+
sleep()
704+
705+
require.False(t, e.reader.closed)
706+
return commitErr
707+
}
708+
t.Run("CommitModeNone", func(t *testing.T) {
709+
require.ErrorIs(t, commitByMode(CommitModeNone), ErrCommitDisabled)
710+
})
711+
t.Run("CommitModeSync", func(t *testing.T) {
712+
require.ErrorIs(t, commitByMode(CommitModeSync), PublicErrCommitSessionToExpiredSession)
713+
})
714+
t.Run("CommitModeAsync", func(t *testing.T) {
715+
require.NoError(t, commitByMode(CommitModeAsync))
716+
})
705717
}
706718

707719
type streamEnv struct {

0 commit comments

Comments
 (0)