Skip to content

Commit c71fae9

Browse files
author
Matt Howlett
authored
A couple of consumer integration tests (#1881)
* A couple of consumer integration tests * Changes following review feedback
1 parent 76da76c commit c71fae9

File tree

3 files changed

+237
-1
lines changed

3 files changed

+237
-1
lines changed

test/Confluent.Kafka.IntegrationTests/TemporaryTopic.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@ public class TemporaryTopic : IDisposable
2828
public string Name { get; set; }
2929

3030
public TemporaryTopic(string bootstrapServers, int numPartitions)
31+
: this("dotnet_test_", bootstrapServers, numPartitions) {}
32+
33+
public TemporaryTopic(string prefix, string bootstrapServers, int numPartitions)
3134
{
3235
this.bootstrapServers = bootstrapServers;
33-
this.Name = "dotnet_test_" + Guid.NewGuid().ToString();
36+
this.Name = prefix + Guid.NewGuid().ToString();
3437

3538
var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();
3639
adminClient.CreateTopicsAsync(new List<TopicSpecification> {
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
#pragma warning disable xUnit1026
18+
19+
using System;
20+
using System.Collections.Generic;
21+
using Xunit;
22+
23+
24+
namespace Confluent.Kafka.IntegrationTests
25+
{
26+
public partial class Tests
27+
{
28+
private void DisjointTopicsSubscribeTest(String bootstrapServers, PartitionAssignmentStrategy assignmentStrategy)
29+
{
30+
var consumerConfig = new ConsumerConfig
31+
{
32+
GroupId = Guid.NewGuid().ToString(),
33+
BootstrapServers = bootstrapServers,
34+
SessionTimeoutMs = 6000,
35+
AutoOffsetReset = AutoOffsetReset.Earliest,
36+
PartitionAssignmentStrategy = assignmentStrategy
37+
};
38+
39+
using (var topic1 = new TemporaryTopic(bootstrapServers, 4))
40+
using (var topic2 = new TemporaryTopic(bootstrapServers, 4))
41+
using (var topic3 = new TemporaryTopic(bootstrapServers, 4))
42+
using (var topic4 = new TemporaryTopic(bootstrapServers, 4))
43+
using (var consumer1 = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
44+
using (var consumer2 = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
45+
using (var consumer3 = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
46+
using (var consumer4 = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
47+
using (var consumer5 = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
48+
using (var consumer6 = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
49+
{
50+
Util.ProduceNullStringMessages(bootstrapServers, topic1.Name, 100, 1000);
51+
Util.ProduceNullStringMessages(bootstrapServers, topic2.Name, 100, 1000);
52+
Util.ProduceNullStringMessages(bootstrapServers, topic3.Name, 100, 1000);
53+
Util.ProduceNullStringMessages(bootstrapServers, topic4.Name, 100, 1000);
54+
55+
consumer1.Subscribe(topic1.Name);
56+
// Consume -> wait for assignment
57+
consumer1.Consume(TimeSpan.FromSeconds(10));
58+
Assert.Equal(4, consumer1.Assignment.Count);
59+
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
60+
61+
consumer2.Subscribe(topic2.Name);
62+
consumer1.Consume(TimeSpan.FromSeconds(10));
63+
Assert.Equal(4, consumer1.Assignment.Count);
64+
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
65+
consumer2.Consume(TimeSpan.FromSeconds(10));
66+
Assert.Equal(4, consumer2.Assignment.Count);
67+
Assert.Equal(topic2.Name, consumer2.Assignment[0].Topic);
68+
69+
consumer3.Subscribe(topic3.Name);
70+
consumer1.Consume(TimeSpan.FromSeconds(10));
71+
Assert.Equal(4, consumer1.Assignment.Count);
72+
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
73+
consumer2.Consume(TimeSpan.FromSeconds(10));
74+
Assert.Equal(4, consumer2.Assignment.Count);
75+
Assert.Equal(topic2.Name, consumer2.Assignment[0].Topic);
76+
consumer3.Consume(TimeSpan.FromSeconds(10));
77+
Assert.Equal(4, consumer3.Assignment.Count);
78+
Assert.Equal(topic3.Name, consumer3.Assignment[0].Topic);
79+
80+
consumer4.Subscribe(topic4.Name);
81+
consumer1.Consume(TimeSpan.FromSeconds(10));
82+
Assert.Equal(4, consumer1.Assignment.Count);
83+
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
84+
consumer2.Consume(TimeSpan.FromSeconds(10));
85+
Assert.Equal(4, consumer2.Assignment.Count);
86+
Assert.Equal(topic2.Name, consumer2.Assignment[0].Topic);
87+
consumer3.Consume(TimeSpan.FromSeconds(10));
88+
Assert.Equal(4, consumer3.Assignment.Count);
89+
Assert.Equal(topic3.Name, consumer3.Assignment[0].Topic);
90+
consumer4.Consume(TimeSpan.FromSeconds(10));
91+
Assert.Equal(4, consumer4.Assignment.Count);
92+
Assert.Equal(topic4.Name, consumer4.Assignment[0].Topic);
93+
94+
consumer5.Subscribe(topic2.Name);
95+
consumer5.Consume(TimeSpan.FromSeconds(10));
96+
Assert.Equal(2, consumer2.Assignment.Count);
97+
Assert.Equal(2, consumer5.Assignment.Count);
98+
Assert.Equal(topic2.Name, consumer2.Assignment[0].Topic);
99+
Assert.Equal(topic2.Name, consumer5.Assignment[0].Topic);
100+
101+
consumer6.Subscribe(new List<string> { topic3.Name, topic4.Name });
102+
consumer6.Consume(TimeSpan.FromSeconds(10));
103+
Assert.True(consumer3.Assignment.Count > 0);
104+
Assert.True(consumer4.Assignment.Count > 0);
105+
Assert.True(consumer6.Assignment.Count > 0);
106+
Assert.Equal(8, consumer3.Assignment.Count + consumer4.Assignment.Count + consumer6.Assignment.Count);
107+
Assert.Equal(topic3.Name, consumer3.Assignment[0].Topic);
108+
Assert.Equal(topic4.Name, consumer4.Assignment[0].Topic);
109+
Assert.True(consumer6.Assignment[0].Topic == topic3.Name || consumer6.Assignment[0].Topic == topic4.Name);
110+
111+
consumer1.Unsubscribe();
112+
consumer2.Consume(TimeSpan.FromSeconds(10)); // wait for rebalance.
113+
Assert.Equal(0, consumer1.Assignment.Count);
114+
115+
consumer1.Subscribe(topic1.Name);
116+
consumer1.Consume(TimeSpan.FromSeconds(10));
117+
Assert.Equal(4, consumer1.Assignment.Count);
118+
Assert.Equal(topic1.Name, consumer1.Assignment[0].Topic);
119+
120+
consumer1.Close();
121+
consumer2.Close();
122+
consumer3.Close();
123+
consumer4.Close();
124+
consumer5.Close();
125+
consumer6.Close();
126+
}
127+
}
128+
129+
/// <summary>
130+
/// Check various scenarios where the same consumer group subscribes to
131+
/// different topics in a disjoint fashion.
132+
/// </summary>
133+
[Theory, MemberData(nameof(KafkaParameters))]
134+
public void Consumer_Subscription_DisjointTopics(string bootstrapServers)
135+
{
136+
LogToFile("start Consumer_Subscription_DisjointTopics");
137+
138+
DisjointTopicsSubscribeTest(bootstrapServers, PartitionAssignmentStrategy.Range);
139+
DisjointTopicsSubscribeTest(bootstrapServers, PartitionAssignmentStrategy.RoundRobin);
140+
DisjointTopicsSubscribeTest(bootstrapServers, PartitionAssignmentStrategy.CooperativeSticky);
141+
142+
Assert.Equal(0, Library.HandleCount);
143+
LogToFile("end Consumer_Subscription_DisjointTopics");
144+
}
145+
146+
}
147+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
#pragma warning disable xUnit1026
18+
19+
using System;
20+
using System.Threading;
21+
using Xunit;
22+
23+
24+
namespace Confluent.Kafka.IntegrationTests
25+
{
26+
public partial class Tests
27+
{
28+
/// <summary>
29+
/// Test regex subscription in scenario where new topics are added.
30+
/// </summary>
31+
[Theory, MemberData(nameof(KafkaParameters))]
32+
public void Consumer_Subscription_Regex(string bootstrapServers)
33+
{
34+
LogToFile("start Consumer_Subscription_Regex");
35+
36+
var topicMetadataRefreshPeriodMs = 1000;
37+
var rebalanceWaitMs = 2000;
38+
39+
var consumerConfig = new ConsumerConfig
40+
{
41+
GroupId = Guid.NewGuid().ToString(),
42+
BootstrapServers = bootstrapServers,
43+
SessionTimeoutMs = 6000,
44+
AutoOffsetReset = AutoOffsetReset.Earliest,
45+
TopicMetadataRefreshIntervalMs = topicMetadataRefreshPeriodMs
46+
};
47+
48+
string prefix = "dotnet_test_" + Guid.NewGuid().ToString() + "_";
49+
50+
using (var topic1 = new TemporaryTopic(prefix, bootstrapServers, 1))
51+
using (var topic2 = new TemporaryTopic(prefix, bootstrapServers, 1))
52+
using (var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
53+
{
54+
Util.ProduceNullStringMessages(bootstrapServers, topic1.Name, 100, 100);
55+
Util.ProduceNullStringMessages(bootstrapServers, topic2.Name, 100, 100);
56+
57+
consumer.Subscribe("^" + prefix + ".*$");
58+
59+
// Wait for assignment
60+
consumer.Consume(TimeSpan.FromSeconds(10));
61+
Assert.Equal(2, consumer.Assignment.Count);
62+
63+
// Create new matching topic, wait long enough for metadata to be
64+
// discovered and corresponding rebalance.
65+
using var topic3 = new TemporaryTopic(prefix, bootstrapServers, 1);
66+
Thread.Sleep(topicMetadataRefreshPeriodMs + rebalanceWaitMs);
67+
Assert.Equal(3, consumer.Assignment.Count);
68+
69+
// Repeat a couple more times...
70+
using var topic4 = new TemporaryTopic(prefix, bootstrapServers, 1);
71+
Thread.Sleep(topicMetadataRefreshPeriodMs + rebalanceWaitMs);
72+
Assert.Equal(4, consumer.Assignment.Count);
73+
74+
using var topic5 = new TemporaryTopic(prefix, bootstrapServers, 1);
75+
Thread.Sleep(topicMetadataRefreshPeriodMs + rebalanceWaitMs);
76+
Assert.Equal(5, consumer.Assignment.Count);
77+
78+
consumer.Close();
79+
}
80+
81+
Assert.Equal(0, Library.HandleCount);
82+
LogToFile("end Consumer_Subscription_Regex");
83+
}
84+
85+
}
86+
}

0 commit comments

Comments
 (0)