Skip to content

Commit d4d6264

Browse files
authored
fix AdminClient deadlock issue + improve AdminClient example (#799)
1 parent c6eceef commit d4d6264

File tree

3 files changed

+87
-33
lines changed

3 files changed

+87
-33
lines changed

examples/AdminClient/AdminClient.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<AssemblyName>AdminClient</AssemblyName>
66
<TargetFramework>netcoreapp2.1</TargetFramework>
77
<OutputType>Exe</OutputType>
8+
<LangVersion>7.3</LangVersion>
89
</PropertyGroup>
910

1011
<ItemGroup>

examples/AdminClient/Program.cs

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
// Refer to LICENSE for more information.
1818

1919
using System;
20-
using System.Linq;
2120
using System.Collections.Generic;
21+
using System.Linq;
22+
using System.Threading.Tasks;
2223
using Confluent.Kafka;
24+
using Confluent.Kafka.Admin;
2325

2426

2527
namespace Confluent.Kafka.Examples
@@ -28,10 +30,11 @@ public class Program
2830
{
2931
static string ToString(int[] array) => $"[{string.Join(", ", array)}]";
3032

31-
static void ListGroups(string brokerList)
33+
static void ListGroups(string bootstrapServers)
3234
{
33-
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = brokerList }).Build())
35+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
3436
{
37+
// Warning: The API for this functionality is subject to change.
3538
var groups = adminClient.ListGroups(TimeSpan.FromSeconds(10));
3639
Console.WriteLine($"Consumer Groups:");
3740
foreach (var g in groups)
@@ -50,10 +53,11 @@ static void ListGroups(string brokerList)
5053
}
5154
}
5255

53-
static void PrintMetadata(string brokerList)
56+
static void PrintMetadata(string bootstrapServers)
5457
{
55-
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = brokerList }).Build())
58+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
5659
{
60+
// Warning: The API for this functionality is subject to change.
5761
var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
5862
Console.WriteLine($"{meta.OriginatingBrokerId} {meta.OriginatingBrokerName}");
5963
meta.Brokers.ForEach(broker =>
@@ -72,19 +76,48 @@ static void PrintMetadata(string brokerList)
7276
}
7377
}
7478

75-
public static void Main(string[] args)
79+
static async Task CreateTopicAsync(string bootstrapServers, string topicName)
7680
{
77-
Console.WriteLine($"librdkafka Version: {Library.VersionString} ({Library.Version:X})");
78-
Console.WriteLine($"Debug Contexts: {string.Join(", ", Library.DebugContexts)}");
79-
80-
if (args.Contains("--list-groups"))
81+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
8182
{
82-
ListGroups(args[0]);
83+
try
84+
{
85+
await adminClient.CreateTopicsAsync(new TopicSpecification[] {
86+
new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } });
87+
}
88+
catch (CreateTopicsException e)
89+
{
90+
Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
91+
}
8392
}
93+
}
8494

85-
if (args.Contains("--metadata"))
95+
public static async Task Main(string[] args)
96+
{
97+
if (args.Length < 2)
98+
{
99+
Console.WriteLine("usage: .. <bootstrapServers> <list-groups|metadata|library-version|create-topic> [topic-name]");
100+
System.Environment.Exit(1);
101+
}
102+
103+
switch (args[1])
86104
{
87-
PrintMetadata(args[0]);
105+
case "library-version":
106+
Console.WriteLine($"librdkafka Version: {Library.VersionString} ({Library.Version:X})");
107+
Console.WriteLine($"Debug Contexts: {string.Join(", ", Library.DebugContexts)}");
108+
break;
109+
case "list-groups":
110+
ListGroups(args[0]);
111+
break;
112+
case "metadata":
113+
PrintMetadata(args[0]);
114+
break;
115+
case "create-topic":
116+
await CreateTopicAsync(args[0], args[2]);
117+
break;
118+
default:
119+
Console.WriteLine($"unknown command: {args[1]}");
120+
break;
88121
}
89122
}
90123
}

src/Confluent.Kafka/AdminClient.cs

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,9 @@ private Task StartPollTask(CancellationToken ct)
157157
{
158158
if (errorCode != ErrorCode.NoError)
159159
{
160-
((TaskCompletionSource<List<CreateTopicReport>>)adminClientResult).TrySetException(
161-
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)));
160+
Task.Run(() =>
161+
((TaskCompletionSource<List<CreateTopicReport>>)adminClientResult).TrySetException(
162+
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr))));
162163
return;
163164
}
164165

@@ -167,11 +168,14 @@ private Task StartPollTask(CancellationToken ct)
167168

168169
if (result.Any(r => r.Error.IsError))
169170
{
170-
((TaskCompletionSource<List<CreateTopicReport>>)adminClientResult).TrySetException(new CreateTopicsException(result));
171+
Task.Run(() =>
172+
((TaskCompletionSource<List<CreateTopicReport>>)adminClientResult).TrySetException(
173+
new CreateTopicsException(result)));
171174
}
172175
else
173176
{
174-
((TaskCompletionSource<List<CreateTopicReport>>)adminClientResult).TrySetResult(result);
177+
Task.Run(() =>
178+
((TaskCompletionSource<List<CreateTopicReport>>)adminClientResult).TrySetResult(result));
175179
}
176180
}
177181
break;
@@ -180,8 +184,9 @@ private Task StartPollTask(CancellationToken ct)
180184
{
181185
if (errorCode != ErrorCode.NoError)
182186
{
183-
((TaskCompletionSource<List<DeleteTopicReport>>)adminClientResult).TrySetException(
184-
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)));
187+
Task.Run(() =>
188+
((TaskCompletionSource<List<DeleteTopicReport>>)adminClientResult).TrySetException(
189+
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr))));
185190
return;
186191
}
187192

@@ -191,11 +196,14 @@ private Task StartPollTask(CancellationToken ct)
191196

192197
if (result.Any(r => r.Error.IsError))
193198
{
194-
((TaskCompletionSource<List<DeleteTopicReport>>)adminClientResult).TrySetException(new DeleteTopicsException(result));
199+
Task.Run(() =>
200+
((TaskCompletionSource<List<DeleteTopicReport>>)adminClientResult).TrySetException(
201+
new DeleteTopicsException(result)));
195202
}
196203
else
197204
{
198-
((TaskCompletionSource<List<DeleteTopicReport>>)adminClientResult).TrySetResult(result);
205+
Task.Run(() =>
206+
((TaskCompletionSource<List<DeleteTopicReport>>)adminClientResult).TrySetResult(result));
199207
}
200208
}
201209
break;
@@ -204,8 +212,9 @@ private Task StartPollTask(CancellationToken ct)
204212
{
205213
if (errorCode != ErrorCode.NoError)
206214
{
207-
((TaskCompletionSource<List<CreatePartitionsReport>>)adminClientResult).TrySetException(
208-
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)));
215+
Task.Run(() =>
216+
((TaskCompletionSource<List<CreatePartitionsReport>>)adminClientResult).TrySetException(
217+
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr))));
209218
return;
210219
}
211220

@@ -215,11 +224,14 @@ private Task StartPollTask(CancellationToken ct)
215224

216225
if (result.Any(r => r.Error.IsError))
217226
{
218-
((TaskCompletionSource<List<CreatePartitionsReport>>)adminClientResult).TrySetException(new CreatePartitionsException(result));
227+
Task.Run(() =>
228+
((TaskCompletionSource<List<CreatePartitionsReport>>)adminClientResult).TrySetException(
229+
new CreatePartitionsException(result)));
219230
}
220231
else
221232
{
222-
((TaskCompletionSource<List<CreatePartitionsReport>>)adminClientResult).TrySetResult(result);
233+
Task.Run(() =>
234+
((TaskCompletionSource<List<CreatePartitionsReport>>)adminClientResult).TrySetResult(result));
223235
}
224236
}
225237
break;
@@ -228,8 +240,9 @@ private Task StartPollTask(CancellationToken ct)
228240
{
229241
if (errorCode != ErrorCode.NoError)
230242
{
231-
((TaskCompletionSource<List<DescribeConfigsResult>>)adminClientResult).TrySetException(
232-
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)));
243+
Task.Run(() =>
244+
((TaskCompletionSource<List<DescribeConfigsResult>>)adminClientResult).TrySetException(
245+
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr))));
233246
return;
234247
}
235248

@@ -238,12 +251,15 @@ private Task StartPollTask(CancellationToken ct)
238251

239252
if (result.Any(r => r.Error.IsError))
240253
{
241-
((TaskCompletionSource<List<DescribeConfigsResult>>)adminClientResult).TrySetException(new DescribeConfigsException(result));
254+
Task.Run(() =>
255+
((TaskCompletionSource<List<DescribeConfigsResult>>)adminClientResult).TrySetException(
256+
new DescribeConfigsException(result)));
242257
}
243258
else
244259
{
245260
var nr = result.Select(a => new DescribeConfigsResult { ConfigResource = a.ConfigResource, Entries = a.Entries }).ToList();
246-
((TaskCompletionSource<List<DescribeConfigsResult>>)adminClientResult).TrySetResult(nr);
261+
Task.Run(() =>
262+
((TaskCompletionSource<List<DescribeConfigsResult>>)adminClientResult).TrySetResult(nr));
247263
}
248264
}
249265
break;
@@ -252,8 +268,9 @@ private Task StartPollTask(CancellationToken ct)
252268
{
253269
if (errorCode != ErrorCode.NoError)
254270
{
255-
((TaskCompletionSource<List<AlterConfigsReport>>)adminClientResult).TrySetException(
256-
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)));
271+
Task.Run(() =>
272+
((TaskCompletionSource<List<AlterConfigsReport>>)adminClientResult).TrySetException(
273+
new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr))));
257274
return;
258275
}
259276

@@ -263,11 +280,14 @@ private Task StartPollTask(CancellationToken ct)
263280

264281
if (result.Any(r => r.Error.IsError))
265282
{
266-
((TaskCompletionSource<List<AlterConfigsReport>>)adminClientResult).TrySetException(new AlterConfigsException(result));
283+
Task.Run(() =>
284+
((TaskCompletionSource<List<AlterConfigsReport>>)adminClientResult).TrySetException(
285+
new AlterConfigsException(result)));
267286
}
268287
else
269288
{
270-
((TaskCompletionSource<List<AlterConfigsReport>>) adminClientResult).TrySetResult(result);
289+
Task.Run(() =>
290+
((TaskCompletionSource<List<AlterConfigsReport>>) adminClientResult).TrySetResult(result));
271291
}
272292
}
273293
break;

0 commit comments

Comments
 (0)