Skip to content

Commit 76da76c

Browse files
author
Matt Howlett
authored
Integration test for StoreOffset on unassigned partition (#1877)
1 parent 7567e89 commit 76da76c

File tree

2 files changed

+93
-2
lines changed

2 files changed

+93
-2
lines changed

test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public partial class Tests
2828
/// Simple Consumer StoreOffsets test.
2929
/// </summary>
3030
[Theory, MemberData(nameof(KafkaParameters))]
31-
public void Consumer_StoreOffsets(string bootstrapServers)
31+
public void Consumer_StoreOffset(string bootstrapServers)
3232
{
3333
LogToFile("start Consumer_StoreOffset");
3434

@@ -41,7 +41,7 @@ public void Consumer_StoreOffsets(string bootstrapServers)
4141
EnableAutoOffsetStore = false
4242
};
4343

44-
var producerConfig = new ProducerConfig{ BootstrapServers = bootstrapServers };
44+
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
4545

4646
IEnumerable<TopicPartition> assignment = null;
4747

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
#pragma warning disable xUnit1026
18+
19+
using System;
20+
using Xunit;
21+
22+
23+
namespace Confluent.Kafka.IntegrationTests
24+
{
25+
public partial class Tests
26+
{
27+
/// <summary>
28+
/// Test behavior of StoreOffset on unassigned partition.
29+
/// </summary>
30+
[Theory, MemberData(nameof(KafkaParameters))]
31+
public void Consumer_StoreOffset_ErrState(string bootstrapServers)
32+
{
33+
LogToFile("start Consumer_StoreOffset_ErrState");
34+
35+
var consumerConfig = new ConsumerConfig
36+
{
37+
GroupId = Guid.NewGuid().ToString(),
38+
BootstrapServers = bootstrapServers,
39+
AutoOffsetReset = AutoOffsetReset.Earliest,
40+
EnableAutoCommit = true,
41+
EnableAutoOffsetStore = false
42+
};
43+
44+
using (var topic = new TemporaryTopic(bootstrapServers, 2))
45+
using (var consumer1 = new ConsumerBuilder<Null, string>(consumerConfig).Build())
46+
using (var consumer2 = new ConsumerBuilder<Null, string>(consumerConfig).Build())
47+
{
48+
Util.ProduceNullStringMessages(bootstrapServers, topic.Name, 100, 1000);
49+
50+
consumer1.Subscribe(topic.Name);
51+
52+
// wait until consumer is assigned to both partitions.
53+
ConsumeResult<Null, string> cr = consumer1.Consume();
54+
Assert.Equal(2, consumer1.Assignment.Count);
55+
56+
// store offsets on both partitions should not throw.
57+
consumer1.StoreOffset(new TopicPartitionOffset(topic.Name, 0, 1));
58+
consumer1.StoreOffset(new TopicPartitionOffset(topic.Name, 1, 1));
59+
60+
consumer2.Subscribe(topic.Name);
61+
62+
// wait until each consumer is assigned to one partition.
63+
cr = consumer2.Consume();
64+
Assert.Equal(1, consumer1.Assignment.Count);
65+
66+
// StoreOffset should throw when attempting to assign to a
67+
// partition no longer assigned.
68+
bool threw = false;
69+
try
70+
{
71+
consumer1.StoreOffset(new TopicPartitionOffset(topic.Name, 0, 2));
72+
consumer1.StoreOffset(new TopicPartitionOffset(topic.Name, 1, 2));
73+
}
74+
catch (KafkaException e)
75+
{
76+
Assert.Equal(ErrorCode.Local_State, e.Error.Code);
77+
Assert.False(e.Error.IsFatal);
78+
threw = true;
79+
}
80+
Assert.True(threw);
81+
82+
consumer1.Close();
83+
consumer2.Close();
84+
}
85+
86+
Assert.Equal(0, Library.HandleCount);
87+
LogToFile("end Consumer_StoreOffset_ErrState");
88+
}
89+
90+
}
91+
}

0 commit comments

Comments
 (0)