Skip to content

Commit 84bbd8f

Browse files
authored
Merge pull request #1837 from ydb-platform/1802-bad-offsets
Fixed respect start offset from topicoptions.WithReaderGetPartitionStartOffset for commit messages
2 parents 81b0420 + f44842d commit 84bbd8f

File tree

4 files changed

+57
-2
lines changed

4 files changed

+57
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed respect start offset from topicoptions.WithReaderGetPartitionStartOffset for commit messages
12
* Added `query.AllowImplicitSessions()` option for execute queries through `query.Client.{Exec,Query,QueryResultSet,QueryRow}` without explicit sessions
23

34
## v3.112.0

internal/topic/topicreadercommon/read_partition_session.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ func NewPartitionSession(
4848
StreamPartitionSessionID: partitionSessionID,
4949
ClientPartitionSessionID: clientPartitionSessionID,
5050
}
51-
res.committedOffsetVal.Store(committedOffset.ToInt64())
52-
res.lastReceivedOffsetEndVal.Store(committedOffset.ToInt64() - 1)
51+
res.SetInitialCommitOffset(committedOffset)
5352

5453
return res
5554
}
@@ -103,6 +102,11 @@ func (s *PartitionSession) SetLastReceivedMessageOffset(v rawtopiccommon.Offset)
103102
s.lastReceivedOffsetEndVal.Store(v.ToInt64())
104103
}
105104

105+
func (s *PartitionSession) SetInitialCommitOffset(committedOffset rawtopiccommon.Offset) {
106+
s.committedOffsetVal.Store(committedOffset.ToInt64())
107+
s.lastReceivedOffsetEndVal.Store(committedOffset.ToInt64() - 1)
108+
}
109+
106110
func (s *PartitionSession) NoMoreMessages() bool {
107111
return s.noMoreMessages.Load()
108112
}

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,9 @@ func (r *topicStreamReaderImpl) onStartPartitionSessionRequestFromBuffer(
991991
commitOffset = forceOffset
992992
respMessage.CommitOffset.FromInt64Pointer(commitOffset)
993993
}
994+
if forceOffset != nil {
995+
session.SetInitialCommitOffset(rawtopiccommon.NewOffset(*forceOffset))
996+
}
994997

995998
return r.send(respMessage)
996999
}

tests/integration/topic_read_messages_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44
package integration
55

66
import (
7+
"context"
78
"testing"
9+
"time"
810

11+
"github.com/stretchr/testify/assert"
912
"github.com/stretchr/testify/require"
1013

1114
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
1216
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
1318
)
1419

1520
func TestTopicReadMessages(t *testing.T) {
@@ -35,3 +40,45 @@ func TestTopicReadMessages(t *testing.T) {
3540
require.NoError(t, err)
3641
require.NotEmpty(t, batch.Messages)
3742
}
43+
44+
func TestRegression1802_StartPartitionWithOffsetHandler(t *testing.T) {
45+
scope := newScope(t)
46+
ctx := scope.Ctx
47+
48+
scope.Require.NoError(scope.TopicWriter().Write(ctx,
49+
topicwriter.Message{},
50+
topicwriter.Message{},
51+
topicwriter.Message{},
52+
))
53+
54+
msg, err := scope.TopicReaderNamed("first").ReadMessage(ctx)
55+
scope.Require.NoError(err)
56+
err = scope.TopicReaderNamed("first").Commit(ctx, msg)
57+
scope.Require.NoError(err)
58+
err = scope.TopicReaderNamed("first").Close(ctx)
59+
scope.Require.NoError(err)
60+
61+
reader, err := scope.DriverWithGRPCLogging().Topic().StartReader(
62+
scope.TopicConsumerName(), topicoptions.ReadTopic(scope.TopicPath()),
63+
topicoptions.WithReaderGetPartitionStartOffset(func(ctx context.Context, req topicoptions.GetPartitionStartOffsetRequest) (res topicoptions.GetPartitionStartOffsetResponse, err error) {
64+
res.StartFrom(2)
65+
return res, nil
66+
}),
67+
)
68+
scope.Require.NoError(err)
69+
70+
msg, err = reader.ReadMessage(ctx)
71+
scope.Require.NoError(err)
72+
73+
scope.Logf("Received message offset: %v", msg.Offset)
74+
75+
err = reader.Commit(ctx, msg)
76+
scope.Require.NoError(err)
77+
78+
scope.Require.EventuallyWithT(func(t *assert.CollectT) {
79+
c, err := scope.Driver().Topic().DescribeTopicConsumer(ctx, scope.TopicPath(), scope.TopicConsumerName(), topicoptions.IncludeConsumerStats())
80+
require.NoError(t, err)
81+
82+
require.EqualValues(t, 3, c.Partitions[0].PartitionConsumerStats.CommittedOffset)
83+
}, 10*time.Second, 100*time.Millisecond)
84+
}

0 commit comments

Comments
 (0)