Skip to content

Commit 2694696

Browse files
committed
Add unit tests
1 parent 254ccfc commit 2694696

File tree

3 files changed

+196
-30
lines changed

3 files changed

+196
-30
lines changed

src/Confluent.Kafka/ConsumerBuilder.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,7 @@ public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
157157
public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(
158158
Action<IConsumer<TKey, TValue>, string> statisticsHandler)
159159
{
160-
if (this.StatisticsHandler != null)
161-
{
162-
this.StatisticsHandler += statisticsHandler;
163-
}
164-
this.StatisticsHandler = statisticsHandler;
160+
this.StatisticsHandler += statisticsHandler;
165161
return this;
166162
}
167163

@@ -180,11 +176,7 @@ public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(
180176
public ConsumerBuilder<TKey, TValue> SetErrorHandler(
181177
Action<IConsumer<TKey, TValue>, Error> errorHandler)
182178
{
183-
if (this.ErrorHandler != null)
184-
{
185-
this.ErrorHandler += errorHandler;
186-
}
187-
this.ErrorHandler = errorHandler;
179+
this.ErrorHandler += errorHandler;
188180
return this;
189181
}
190182

@@ -210,11 +202,7 @@ public ConsumerBuilder<TKey, TValue> SetErrorHandler(
210202
public ConsumerBuilder<TKey, TValue> SetLogHandler(
211203
Action<IConsumer<TKey, TValue>, LogMessage> logHandler)
212204
{
213-
if (this.LogHandler != null)
214-
{
215-
this.LogHandler += logHandler;
216-
}
217-
this.LogHandler = logHandler;
205+
this.LogHandler += logHandler;
218206
return this;
219207
}
220208

src/Confluent.Kafka/ProducerBuilder.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,7 @@ public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
167167
/// </remarks>
168168
public ProducerBuilder<TKey, TValue> SetStatisticsHandler(Action<IProducer<TKey, TValue>, string> statisticsHandler)
169169
{
170-
if (this.StatisticsHandler != null)
171-
{
172-
this.StatisticsHandler += statisticsHandler;
173-
}
174-
this.StatisticsHandler = statisticsHandler;
170+
this.StatisticsHandler += statisticsHandler;
175171
return this;
176172
}
177173

@@ -222,11 +218,7 @@ public ProducerBuilder<TKey, TValue> SetDefaultPartitioner(PartitionerDelegate p
222218
/// </remarks>
223219
public ProducerBuilder<TKey, TValue> SetErrorHandler(Action<IProducer<TKey, TValue>, Error> errorHandler)
224220
{
225-
if (this.ErrorHandler != null)
226-
{
227-
this.ErrorHandler += errorHandler;
228-
}
229-
this.ErrorHandler = errorHandler;
221+
this.ErrorHandler += errorHandler;
230222
return this;
231223
}
232224

@@ -251,11 +243,7 @@ public ProducerBuilder<TKey, TValue> SetErrorHandler(Action<IProducer<TKey, TVal
251243
/// </remarks>
252244
public ProducerBuilder<TKey, TValue> SetLogHandler(Action<IProducer<TKey, TValue>, LogMessage> logHandler)
253245
{
254-
if (this.LogHandler != null)
255-
{
256-
this.LogHandler += logHandler;
257-
}
258-
this.LogHandler = logHandler;
246+
this.LogHandler += logHandler;
259247
return this;
260248
}
261249

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Copyright 2016-2017 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
#pragma warning disable xUnit1026
18+
19+
using System;
20+
using System.Linq;
21+
using System.Threading;
22+
using Xunit;
23+
24+
25+
namespace Confluent.Kafka.IntegrationTests
26+
{
27+
/// <summary>
28+
/// Test multiple calls to SetLogHandler, SetStatisticsHandler and SetErrorHandler
29+
/// </summary>
30+
public partial class Tests
31+
{
32+
[Theory, MemberData(nameof(KafkaParameters))]
33+
public void ProducerBuilder_SetLogHandler(string bootstrapServers)
34+
{
35+
LogToFile("start ProducerBuilder_SetLogHandler");
36+
37+
var producerConfig = new ProducerConfig
38+
{
39+
BootstrapServers = bootstrapServers,
40+
Debug = "all"
41+
};
42+
43+
ManualResetEventSlim mres1 = new(), mres2 = new();
44+
45+
using var _ = new ProducerBuilder<string, string>(producerConfig)
46+
.SetLogHandler((_, _) => mres1.Set())
47+
.SetLogHandler((_, _) => mres2.Set())
48+
.Build();
49+
50+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
51+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
52+
53+
LogToFile("end ProducerBuilder_SetLogHandler");
54+
}
55+
56+
[Theory, MemberData(nameof(KafkaParameters))]
57+
public void ProducerBuilder_SetStatisticsHandler(string bootstrapServers)
58+
{
59+
LogToFile("start ProducerBuilder_SetStatisticsHandler");
60+
61+
var producerConfig = new ProducerConfig
62+
{
63+
BootstrapServers = bootstrapServers,
64+
StatisticsIntervalMs = 100
65+
};
66+
67+
ManualResetEventSlim mres1 = new(), mres2 = new();
68+
69+
using var _ = new ProducerBuilder<string, string>(producerConfig)
70+
.SetStatisticsHandler((_, _) => mres1.Set())
71+
.SetStatisticsHandler((_, _) => mres2.Set())
72+
.Build();
73+
74+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
75+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
76+
77+
LogToFile("end ProducerBuilder_SetStatisticsHandler");
78+
}
79+
80+
[Theory]
81+
[InlineData("localhost:9000")]
82+
public void ProducerBuilder_SetErrorHandler(string bootstrapServers)
83+
{
84+
LogToFile("start ProducerBuilder_SetErrorHandler");
85+
86+
var producerConfig = new ProducerConfig
87+
{
88+
BootstrapServers = bootstrapServers
89+
};
90+
91+
ManualResetEventSlim mres1 = new(), mres2 = new();
92+
93+
using var _ = new ProducerBuilder<string, string>(producerConfig)
94+
.SetErrorHandler((_, _) => mres1.Set())
95+
.SetErrorHandler((_, _) => mres2.Set())
96+
.Build();
97+
98+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
99+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
100+
101+
LogToFile("end ProducerBuilder_SetErrorHandler");
102+
}
103+
104+
[Theory, MemberData(nameof(KafkaParameters))]
105+
public void ConsumerBuilder_SetLogHandler(string bootstrapServers)
106+
{
107+
LogToFile("start ConsumerBuilder_SetLogHandler");
108+
109+
int N = 2;
110+
var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);
111+
112+
var consumerConfig = new ConsumerConfig
113+
{
114+
GroupId = Guid.NewGuid().ToString(),
115+
BootstrapServers = bootstrapServers,
116+
SessionTimeoutMs = 6000,
117+
EnablePartitionEof = true,
118+
Debug = "all"
119+
};
120+
121+
ManualResetEventSlim mres1 = new(), mres2 = new();
122+
123+
using var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
124+
.SetPartitionsAssignedHandler((c, partitions) =>
125+
{
126+
Assert.Single(partitions);
127+
Assert.Equal(firstProduced.TopicPartition, partitions[0]);
128+
return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
129+
})
130+
.SetLogHandler((_, _) => mres1.Set())
131+
.SetLogHandler((_, _) => mres2.Set())
132+
.Build();
133+
consumer.Subscribe(singlePartitionTopic);
134+
135+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
136+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
137+
138+
LogToFile("end ConsumerBuilder_SetLogHandler");
139+
}
140+
141+
[Theory, MemberData(nameof(KafkaParameters))]
142+
public void ConsumerBuilder_SetStatisticsHandler(string bootstrapServers)
143+
{
144+
LogToFile("start ConsumerBuilder_SetStatisticsHandler");
145+
146+
int N = 2;
147+
var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);
148+
149+
var consumerConfig = new ConsumerConfig
150+
{
151+
GroupId = Guid.NewGuid().ToString(),
152+
BootstrapServers = bootstrapServers,
153+
SessionTimeoutMs = 6000,
154+
EnablePartitionEof = true,
155+
StatisticsIntervalMs = 100
156+
};
157+
158+
ManualResetEventSlim mres1 = new(), mres2 = new();
159+
160+
using (var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
161+
.SetPartitionsAssignedHandler((c, partitions) =>
162+
{
163+
Assert.Single(partitions);
164+
Assert.Equal(firstProduced.TopicPartition, partitions[0]);
165+
return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
166+
})
167+
.SetStatisticsHandler((_, _) => mres1.Set())
168+
.SetStatisticsHandler((_, _) => mres2.Set())
169+
.Build())
170+
{
171+
consumer.Subscribe(singlePartitionTopic);
172+
173+
int msgCnt = 0;
174+
while (true)
175+
{
176+
var record = consumer.Consume(TimeSpan.FromMilliseconds(100));
177+
if (record == null) { continue; }
178+
if (record.IsPartitionEOF) { break; }
179+
msgCnt += 1;
180+
}
181+
182+
Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
183+
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
184+
consumer.Close();
185+
}
186+
187+
LogToFile("end ConsumerBuilder_SetStatisticsHandler");
188+
}
189+
}
190+
}

0 commit comments

Comments
 (0)