Skip to content

Commit 245f94c

Browse files
authored
Add default rebalance callback (#1939)
* Add default rebalance callback * Fix tests * Update changelog * Update according to the comments
1 parent dd431bd commit 245f94c

File tree

6 files changed

+81
-7
lines changed

6 files changed

+81
-7
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
- Upgraded `NJsonSchema` to v10.6.3
66
- Added `LatestCompatibilityStrict` configuration property to JsonSerializerConfig to check the compatibility with latest schema
77
when `UseLatestVersion` is set to true.
8+
- Added DeleteConsumerGroupOffset to AdminClient.
9+
10+
## Fixes
11+
12+
- During a group rebalance, partitions are now always revoked as a side effect of a call to Consume, whether or not a partitions revoked handler has been specified. Previously, if no handler was specified, the timing of when the consumer lost ownership of partitions during a rebalance was arbitrarily, frequently resulting in an erroneous state exception when committing or storing offsets.
813

914

1015
# 1.9.4

src/Confluent.Kafka/Consumer.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -691,10 +691,8 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
691691

692692
IntPtr configPtr = configHandle.DangerousGetHandle();
693693

694-
if (partitionsAssignedHandler != null || partitionsRevokedHandler != null || partitionsLostHandler != null)
695-
{
696-
Librdkafka.conf_set_rebalance_cb(configPtr, rebalanceDelegate);
697-
}
694+
Librdkafka.conf_set_rebalance_cb(configPtr, rebalanceDelegate);
695+
698696
if (offsetsCommittedHandler != null)
699697
{
700698
Librdkafka.conf_set_offset_commit_cb(configPtr, commitDelegate);

test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Drain.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public void Consumer_Drain(string bootstrapServers)
6060
while (consumer.Assignment.Count == 0)
6161
{
6262
Thread.Sleep(1000);
63+
consumer.Consume(TimeSpan.FromSeconds(1));
6364
Assert.True(cnt++ < 10);
6465
}
6566
var committed = consumer.Committed(TimeSpan.FromSeconds(10));

test/Confluent.Kafka.IntegrationTests/Tests/Consumer_StoreOffset_ErrState.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ public void Consumer_StoreOffset_ErrState(string bootstrapServers)
6060
consumer2.Subscribe(topic.Name);
6161

6262
// wait until each consumer is assigned to one partition.
63-
cr = consumer2.Consume();
63+
consumer2.Consume(TimeSpan.FromSeconds(10));
64+
consumer1.Consume(TimeSpan.FromSeconds(10));
65+
66+
cr = consumer2.Consume(TimeSpan.FromSeconds(10));
6467
Assert.Equal(1, consumer1.Assignment.Count);
6568

6669
// StoreOffset should throw when attempting to assign to a

test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Subscription_DisjointTopics.cs

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ private void DisjointTopicsSubscribeTest(String bootstrapServers, PartitionAssig
5959
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
6060

6161
consumer2.Subscribe(topic2.Name);
62+
// Allow rebalance to complete
63+
consumer2.Consume(TimeSpan.FromSeconds(10));
64+
consumer1.Consume(TimeSpan.FromSeconds(10));
65+
// Get the assignment
6266
consumer1.Consume(TimeSpan.FromSeconds(10));
6367
Assert.Equal(4, consumer1.Assignment.Count);
6468
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
@@ -67,6 +71,11 @@ private void DisjointTopicsSubscribeTest(String bootstrapServers, PartitionAssig
6771
Assert.Equal(topic2.Name, consumer2.Assignment[0].Topic);
6872

6973
consumer3.Subscribe(topic3.Name);
74+
// Allow rebalance to complete
75+
consumer3.Consume(TimeSpan.FromSeconds(10));
76+
consumer1.Consume(TimeSpan.FromSeconds(10));
77+
consumer2.Consume(TimeSpan.FromSeconds(10));
78+
// Get the assignment
7079
consumer1.Consume(TimeSpan.FromSeconds(10));
7180
Assert.Equal(4, consumer1.Assignment.Count);
7281
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
@@ -78,6 +87,12 @@ private void DisjointTopicsSubscribeTest(String bootstrapServers, PartitionAssig
7887
Assert.Equal(topic3.Name, consumer3.Assignment[0].Topic);
7988

8089
consumer4.Subscribe(topic4.Name);
90+
// Allow rebalance to complete
91+
consumer4.Consume(TimeSpan.FromSeconds(10));
92+
consumer1.Consume(TimeSpan.FromSeconds(10));
93+
consumer2.Consume(TimeSpan.FromSeconds(10));
94+
consumer3.Consume(TimeSpan.FromSeconds(10));
95+
// Get the assignment
8196
consumer1.Consume(TimeSpan.FromSeconds(10));
8297
Assert.Equal(4, consumer1.Assignment.Count);
8398
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
@@ -92,13 +107,35 @@ private void DisjointTopicsSubscribeTest(String bootstrapServers, PartitionAssig
92107
Assert.Equal(topic4.Name, consumer4.Assignment[0].Topic);
93108

94109
consumer5.Subscribe(topic2.Name);
110+
// Allow rebalance to complete
111+
consumer5.Consume(TimeSpan.FromSeconds(10));
112+
consumer1.Consume(TimeSpan.FromSeconds(10));
113+
consumer2.Consume(TimeSpan.FromSeconds(10));
114+
consumer3.Consume(TimeSpan.FromSeconds(10));
115+
consumer4.Consume(TimeSpan.FromSeconds(10));
116+
// Get the assignment
117+
consumer2.Consume(TimeSpan.FromSeconds(10));
95118
consumer5.Consume(TimeSpan.FromSeconds(10));
96119
Assert.Equal(2, consumer2.Assignment.Count);
97120
Assert.Equal(2, consumer5.Assignment.Count);
98121
Assert.Equal(topic2.Name, consumer2.Assignment[0].Topic);
99122
Assert.Equal(topic2.Name, consumer5.Assignment[0].Topic);
100123

101124
consumer6.Subscribe(new List<string> { topic3.Name, topic4.Name });
125+
// Allow rebalance to complete
126+
consumer6.Consume(TimeSpan.FromSeconds(10));
127+
consumer1.Consume(TimeSpan.FromSeconds(10));
128+
consumer2.Consume(TimeSpan.FromSeconds(10));
129+
consumer3.Consume(TimeSpan.FromSeconds(10));
130+
consumer4.Consume(TimeSpan.FromSeconds(10));
131+
consumer5.Consume(TimeSpan.FromSeconds(10));
132+
consumer6.Consume(TimeSpan.FromSeconds(10));
133+
// Get the assignment
134+
consumer1.Consume(TimeSpan.FromSeconds(10));
135+
consumer2.Consume(TimeSpan.FromSeconds(10));
136+
consumer3.Consume(TimeSpan.FromSeconds(10));
137+
consumer4.Consume(TimeSpan.FromSeconds(10));
138+
consumer5.Consume(TimeSpan.FromSeconds(10));
102139
consumer6.Consume(TimeSpan.FromSeconds(10));
103140
Assert.True(consumer3.Assignment.Count > 0);
104141
Assert.True(consumer4.Assignment.Count > 0);
@@ -109,11 +146,38 @@ private void DisjointTopicsSubscribeTest(String bootstrapServers, PartitionAssig
109146
Assert.True(consumer6.Assignment[0].Topic == topic3.Name || consumer6.Assignment[0].Topic == topic4.Name);
110147

111148
consumer1.Unsubscribe();
112-
consumer2.Consume(TimeSpan.FromSeconds(10)); // wait for rebalance.
113-
Assert.Equal(0, consumer1.Assignment.Count);
149+
// Allow rebalance to complete
150+
consumer1.Consume(TimeSpan.FromSeconds(10));
151+
consumer2.Consume(TimeSpan.FromSeconds(10));
152+
consumer3.Consume(TimeSpan.FromSeconds(10));
153+
consumer4.Consume(TimeSpan.FromSeconds(10));
154+
consumer5.Consume(TimeSpan.FromSeconds(10));
155+
consumer6.Consume(TimeSpan.FromSeconds(10));
156+
// Get the assignment
157+
consumer1.Consume(TimeSpan.FromSeconds(10));
158+
consumer2.Consume(TimeSpan.FromSeconds(10));
159+
consumer3.Consume(TimeSpan.FromSeconds(10));
160+
consumer4.Consume(TimeSpan.FromSeconds(10));
161+
consumer5.Consume(TimeSpan.FromSeconds(10));
162+
consumer6.Consume(TimeSpan.FromSeconds(10));
114163

164+
Assert.Equal(0, consumer1.Assignment.Count);
165+
// Allow rebalance to complete
115166
consumer1.Subscribe(topic1.Name);
116167
consumer1.Consume(TimeSpan.FromSeconds(10));
168+
consumer2.Consume(TimeSpan.FromSeconds(10));
169+
consumer3.Consume(TimeSpan.FromSeconds(10));
170+
consumer4.Consume(TimeSpan.FromSeconds(10));
171+
consumer5.Consume(TimeSpan.FromSeconds(10));
172+
consumer6.Consume(TimeSpan.FromSeconds(10));
173+
// Get the assignment
174+
consumer1.Consume(TimeSpan.FromSeconds(10));
175+
consumer2.Consume(TimeSpan.FromSeconds(10));
176+
consumer3.Consume(TimeSpan.FromSeconds(10));
177+
consumer4.Consume(TimeSpan.FromSeconds(10));
178+
consumer5.Consume(TimeSpan.FromSeconds(10));
179+
consumer6.Consume(TimeSpan.FromSeconds(10));
180+
117181
Assert.Equal(4, consumer1.Assignment.Count);
118182
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
119183

test/Confluent.Kafka.IntegrationTests/Tests/Consumer_Subscription_Regex.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,18 @@ public void Consumer_Subscription_Regex(string bootstrapServers)
6464
// discovered and corresponding rebalance.
6565
using var topic3 = new TemporaryTopic(prefix, bootstrapServers, 1);
6666
Thread.Sleep(topicMetadataRefreshPeriodMs + rebalanceWaitMs);
67+
consumer.Consume(TimeSpan.FromSeconds(10));
6768
Assert.Equal(3, consumer.Assignment.Count);
6869

6970
// Repeat a couple more times...
7071
using var topic4 = new TemporaryTopic(prefix, bootstrapServers, 1);
7172
Thread.Sleep(topicMetadataRefreshPeriodMs + rebalanceWaitMs);
73+
consumer.Consume(TimeSpan.FromSeconds(10));
7274
Assert.Equal(4, consumer.Assignment.Count);
7375

7476
using var topic5 = new TemporaryTopic(prefix, bootstrapServers, 1);
7577
Thread.Sleep(topicMetadataRefreshPeriodMs + rebalanceWaitMs);
78+
consumer.Consume(TimeSpan.FromSeconds(10));
7679
Assert.Equal(5, consumer.Assignment.Count);
7780

7881
consumer.Close();

0 commit comments

Comments
 (0)