Skip to content

Commit 00a8164

Browse files
committed
Add unit tests
1 parent 254ccfc commit 00a8164

File tree

4 files changed

+257
-30
lines changed

4 files changed

+257
-30
lines changed

src/Confluent.Kafka/ConsumerBuilder.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,7 @@ public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
157157
public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(
158158
Action<IConsumer<TKey, TValue>, string> statisticsHandler)
159159
{
160-
if (this.StatisticsHandler != null)
161-
{
162-
this.StatisticsHandler += statisticsHandler;
163-
}
164-
this.StatisticsHandler = statisticsHandler;
160+
this.StatisticsHandler += statisticsHandler;
165161
return this;
166162
}
167163

@@ -180,11 +176,7 @@ public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(
180176
public ConsumerBuilder<TKey, TValue> SetErrorHandler(
181177
Action<IConsumer<TKey, TValue>, Error> errorHandler)
182178
{
183-
if (this.ErrorHandler != null)
184-
{
185-
this.ErrorHandler += errorHandler;
186-
}
187-
this.ErrorHandler = errorHandler;
179+
this.ErrorHandler += errorHandler;
188180
return this;
189181
}
190182

@@ -210,11 +202,7 @@ public ConsumerBuilder<TKey, TValue> SetErrorHandler(
210202
public ConsumerBuilder<TKey, TValue> SetLogHandler(
211203
Action<IConsumer<TKey, TValue>, LogMessage> logHandler)
212204
{
213-
if (this.LogHandler != null)
214-
{
215-
this.LogHandler += logHandler;
216-
}
217-
this.LogHandler = logHandler;
205+
this.LogHandler += logHandler;
218206
return this;
219207
}
220208

src/Confluent.Kafka/ProducerBuilder.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,7 @@ public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
167167
/// </remarks>
168168
public ProducerBuilder<TKey, TValue> SetStatisticsHandler(Action<IProducer<TKey, TValue>, string> statisticsHandler)
169169
{
170-
if (this.StatisticsHandler != null)
171-
{
172-
this.StatisticsHandler += statisticsHandler;
173-
}
174-
this.StatisticsHandler = statisticsHandler;
170+
this.StatisticsHandler += statisticsHandler;
175171
return this;
176172
}
177173

@@ -222,11 +218,7 @@ public ProducerBuilder<TKey, TValue> SetDefaultPartitioner(PartitionerDelegate p
222218
/// </remarks>
223219
public ProducerBuilder<TKey, TValue> SetErrorHandler(Action<IProducer<TKey, TValue>, Error> errorHandler)
224220
{
225-
if (this.ErrorHandler != null)
226-
{
227-
this.ErrorHandler += errorHandler;
228-
}
229-
this.ErrorHandler = errorHandler;
221+
this.ErrorHandler += errorHandler;
230222
return this;
231223
}
232224

@@ -251,11 +243,7 @@ public ProducerBuilder<TKey, TValue> SetErrorHandler(Action<IProducer<TKey, TVal
251243
/// </remarks>
252244
public ProducerBuilder<TKey, TValue> SetLogHandler(Action<IProducer<TKey, TValue>, LogMessage> logHandler)
253245
{
254-
if (this.LogHandler != null)
255-
{
256-
this.LogHandler += logHandler;
257-
}
258-
this.LogHandler = logHandler;
246+
this.LogHandler += logHandler;
259247
return this;
260248
}
261249

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System;
2+
using Xunit;
3+
4+
namespace Confluent.Kafka.IntegrationTests;
5+
6+
public sealed class SkipWhenCITheory : TheoryAttribute
7+
{
8+
private const string JenkinsBuildIdEnvVarName = "BUILD_ID";
9+
10+
public SkipWhenCITheory(string reason)
11+
{
12+
Skip = Environment.GetEnvironmentVariables().Contains(JenkinsBuildIdEnvVarName)
13+
? reason
14+
: null;
15+
}
16+
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// Copyright 2016-2017 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
#pragma warning disable xUnit1026
18+
19+
using System;
20+
using System.Linq;
21+
using System.Threading;
22+
using Xunit;
23+
24+
25+
namespace Confluent.Kafka.IntegrationTests
26+
{
27+
/// <summary>
28+
/// Test multiple calls to SetLogHandler, SetStatisticsHandler and SetErrorHandler
29+
/// </summary>
30+
public partial class Tests
31+
{
32+
private const string UnreachableBootstrapServers = "localhost:9000";
33+
34+
[Theory, MemberData(nameof(KafkaParameters))]
35+
public void ProducerBuilder_SetLogHandler(string bootstrapServers)
36+
{
37+
LogToFile("start ProducerBuilder_SetLogHandler");
38+
39+
var producerConfig = new ProducerConfig
40+
{
41+
BootstrapServers = bootstrapServers,
42+
Debug = "all"
43+
};
44+
45+
ManualResetEventSlim mres1 = new(), mres2 = new();
46+
47+
using var _ = new ProducerBuilder<string, string>(producerConfig)
48+
.SetLogHandler((_, _) => mres1.Set())
49+
.SetLogHandler((_, _) => mres2.Set())
50+
.Build();
51+
52+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
53+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
54+
55+
LogToFile("end ProducerBuilder_SetLogHandler");
56+
}
57+
58+
[Theory, MemberData(nameof(KafkaParameters))]
59+
public void ProducerBuilder_SetStatisticsHandler(string bootstrapServers)
60+
{
61+
LogToFile("start ProducerBuilder_SetStatisticsHandler");
62+
63+
var producerConfig = new ProducerConfig
64+
{
65+
BootstrapServers = bootstrapServers,
66+
StatisticsIntervalMs = 100
67+
};
68+
69+
ManualResetEventSlim mres1 = new(), mres2 = new();
70+
71+
using var _ = new ProducerBuilder<string, string>(producerConfig)
72+
.SetStatisticsHandler((_, _) => mres1.Set())
73+
.SetStatisticsHandler((_, _) => mres2.Set())
74+
.Build();
75+
76+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
77+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
78+
79+
LogToFile("end ProducerBuilder_SetStatisticsHandler");
80+
}
81+
82+
[Theory, InlineData(UnreachableBootstrapServers)]
83+
public void ProducerBuilder_SetErrorHandler(string bootstrapServers)
84+
{
85+
LogToFile("start ProducerBuilder_SetErrorHandler");
86+
87+
var producerConfig = new ProducerConfig
88+
{
89+
BootstrapServers = bootstrapServers
90+
};
91+
92+
ManualResetEventSlim mres1 = new(), mres2 = new();
93+
94+
using var _ = new ProducerBuilder<string, string>(producerConfig)
95+
.SetErrorHandler((_, _) => mres1.Set())
96+
.SetErrorHandler((_, _) => mres2.Set())
97+
.Build();
98+
99+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
100+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
101+
102+
LogToFile("end ProducerBuilder_SetErrorHandler");
103+
}
104+
105+
[Theory, MemberData(nameof(KafkaParameters))]
106+
public void ConsumerBuilder_SetLogHandler(string bootstrapServers)
107+
{
108+
LogToFile("start ConsumerBuilder_SetLogHandler");
109+
110+
int N = 2;
111+
var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);
112+
113+
var consumerConfig = new ConsumerConfig
114+
{
115+
GroupId = Guid.NewGuid().ToString(),
116+
BootstrapServers = bootstrapServers,
117+
SessionTimeoutMs = 6000,
118+
EnablePartitionEof = true,
119+
Debug = "all"
120+
};
121+
122+
ManualResetEventSlim mres1 = new(), mres2 = new();
123+
124+
using var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
125+
.SetPartitionsAssignedHandler((c, partitions) =>
126+
{
127+
Assert.Single(partitions);
128+
Assert.Equal(firstProduced.TopicPartition, partitions[0]);
129+
return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
130+
})
131+
.SetLogHandler((_, _) => mres1.Set())
132+
.SetLogHandler((_, _) => mres2.Set())
133+
.Build();
134+
consumer.Subscribe(singlePartitionTopic);
135+
136+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
137+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
138+
139+
LogToFile("end ConsumerBuilder_SetLogHandler");
140+
}
141+
142+
[Theory, MemberData(nameof(KafkaParameters))]
143+
public void ConsumerBuilder_SetStatisticsHandler(string bootstrapServers)
144+
{
145+
LogToFile("start ConsumerBuilder_SetStatisticsHandler");
146+
147+
int N = 2;
148+
var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);
149+
150+
var consumerConfig = new ConsumerConfig
151+
{
152+
GroupId = Guid.NewGuid().ToString(),
153+
BootstrapServers = bootstrapServers,
154+
SessionTimeoutMs = 6000,
155+
EnablePartitionEof = true,
156+
StatisticsIntervalMs = 100
157+
};
158+
159+
ManualResetEventSlim mres1 = new(), mres2 = new();
160+
161+
using (var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
162+
.SetPartitionsAssignedHandler((c, partitions) =>
163+
{
164+
Assert.Single(partitions);
165+
Assert.Equal(firstProduced.TopicPartition, partitions[0]);
166+
return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
167+
})
168+
.SetStatisticsHandler((_, _) => mres1.Set())
169+
.SetStatisticsHandler((_, _) => mres2.Set())
170+
.Build())
171+
{
172+
consumer.Subscribe(singlePartitionTopic);
173+
174+
int msgCnt = 0;
175+
while (true)
176+
{
177+
var record = consumer.Consume(TimeSpan.FromMilliseconds(100));
178+
if (record == null) { continue; }
179+
if (record.IsPartitionEOF) { break; }
180+
msgCnt += 1;
181+
}
182+
183+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
184+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
185+
consumer.Close();
186+
}
187+
188+
LogToFile("end ConsumerBuilder_SetStatisticsHandler");
189+
}
190+
191+
[SkipWhenCITheory("Requires to stop the broker in the while loop to simulate broker is down."), MemberData(nameof(KafkaParameters))]
192+
public void ConsumerBuilder_SetErrorHandler(string bootstrapServers)
193+
{
194+
LogToFile("start ConsumerBuilder_SetErrorHandler");
195+
196+
int N = 2;
197+
var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);
198+
199+
var consumerConfig = new ConsumerConfig
200+
{
201+
GroupId = Guid.NewGuid().ToString(),
202+
BootstrapServers = bootstrapServers,
203+
SessionTimeoutMs = 6000
204+
};
205+
206+
bool errorHandler1Called = false, errorHandler2Called = false;
207+
208+
using (var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
209+
.SetPartitionsAssignedHandler((c, partitions) =>
210+
{
211+
Assert.Single(partitions);
212+
Assert.Equal(firstProduced.TopicPartition, partitions[0]);
213+
return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
214+
})
215+
.SetErrorHandler((_, _) => errorHandler1Called = true)
216+
.SetErrorHandler((_, _) => errorHandler2Called = true)
217+
.Build())
218+
{
219+
consumer.Subscribe(singlePartitionTopic);
220+
221+
int msgCnt = 0;
222+
while (!errorHandler1Called && !errorHandler2Called)
223+
{
224+
var record = consumer.Consume(TimeSpan.FromMilliseconds(100));
225+
if (record == null) { continue; }
226+
msgCnt += 1;
227+
}
228+
229+
consumer.Close();
230+
}
231+
232+
LogToFile("end ConsumerBuilder_SetErrorHandler");
233+
}
234+
}
235+
}

0 commit comments

Comments
 (0)