|
4 | 4 | package integration |
5 | 5 |
|
6 | 6 | import ( |
| 7 | + "context" |
| 8 | + "github.com/stretchr/testify/assert" |
| 9 | + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" |
| 10 | + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" |
7 | 11 | "testing" |
| 12 | + "time" |
8 | 13 |
|
9 | 14 | "github.com/stretchr/testify/require" |
10 | 15 |
|
@@ -35,3 +40,46 @@ func TestTopicReadMessages(t *testing.T) { |
35 | 40 | require.NoError(t, err) |
36 | 41 | require.NotEmpty(t, batch.Messages) |
37 | 42 | } |
| 43 | + |
| 44 | +func TestRegression1802_StartPartitionWithOffsetHandler(t *testing.T) { |
| 45 | + scope := newScope(t) |
| 46 | + ctx := scope.Ctx |
| 47 | + |
| 48 | + scope.Require.NoError(scope.TopicWriter().Write(ctx, |
| 49 | + topicwriter.Message{}, |
| 50 | + topicwriter.Message{}, |
| 51 | + topicwriter.Message{}, |
| 52 | + )) |
| 53 | + |
| 54 | + msg, err := scope.TopicReaderNamed("first").ReadMessage(ctx) |
| 55 | + scope.Require.NoError(err) |
| 56 | + err = scope.TopicReaderNamed("first").Commit(ctx, msg) |
| 57 | + scope.Require.NoError(err) |
| 58 | + err = scope.TopicReaderNamed("first").Close(ctx) |
| 59 | + scope.Require.NoError(err) |
| 60 | + |
| 61 | + reader, err := scope.DriverWithGRPCLogging().Topic().StartReader( |
| 62 | + scope.TopicConsumerName(), topicoptions.ReadTopic(scope.TopicPath()), |
| 63 | + topicoptions.WithReaderGetPartitionStartOffset(func(ctx context.Context, req topicoptions.GetPartitionStartOffsetRequest) (res topicoptions.GetPartitionStartOffsetResponse, err error) { |
| 64 | + res.StartFrom(2) |
| 65 | + return res, nil |
| 66 | + }), |
| 67 | + ) |
| 68 | + scope.Require.NoError(err) |
| 69 | + |
| 70 | + msg, err = reader.ReadMessage(ctx) |
| 71 | + scope.Require.NoError(err) |
| 72 | + |
| 73 | + scope.Logf("Received message offset: %v", msg.Offset) |
| 74 | + |
| 75 | + err = reader.Commit(ctx, msg) |
| 76 | + scope.Require.NoError(err) |
| 77 | + |
| 78 | + scope.Require.EventuallyWithT(func(t *assert.CollectT) { |
| 79 | + c, err := scope.Driver().Topic().DescribeTopicConsumer(ctx, scope.TopicPath(), scope.TopicConsumerName(), topicoptions.IncludeConsumerStats()) |
| 80 | + require.NoError(t, err) |
| 81 | + |
| 82 | + require.EqualValues(t, 3, c.Partitions[0].PartitionConsumerStats.CommittedOffset) |
| 83 | + }, 10*time.Second, 100*time.Millisecond) |
| 84 | + |
| 85 | +} |
0 commit comments