Skip to content

Commit 78d51b6

Browse files
authored
Refactor RequestTimeConsumer. (#1446)
1 parent 4b8a8b1 commit 78d51b6

File tree

1 file changed

+22
-42
lines changed

1 file changed

+22
-42
lines changed

examples/Web/RequestTimeConsumer.cs

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -14,58 +14,54 @@
1414
//
1515
// Refer to LICENSE for more information.
1616

17+
using System;
18+
using System.Threading;
19+
using System.Threading.Tasks;
1720
using Confluent.Kafka;
1821
using Microsoft.Extensions.Configuration;
19-
using Microsoft.Extensions.DependencyInjection;
2022
using Microsoft.Extensions.Hosting;
21-
using System;
22-
using System.Threading.Tasks;
23-
using System.Threading;
24-
2523

2624
namespace Web
2725
{
2826
/// <summary>
2927
/// A simple example demonstrating how to set up a Kafka consumer as an
3028
/// IHostedService.
3129
/// </summary>
32-
public class RequestTimeConsumer : IHostedService, IDisposable
30+
public class RequestTimeConsumer : BackgroundService
3331
{
34-
string topic;
35-
ConsumerConfig consumerConfig;
36-
IConsumer<string, long> kafkaConsumer;
37-
Thread pollThread;
38-
CancellationTokenSource cancellationTokenSource;
32+
private readonly string topic;
33+
private readonly IConsumer<string, long> kafkaConsumer;
3934

4035
public RequestTimeConsumer(IConfiguration config)
4136
{
42-
consumerConfig = new ConsumerConfig();
37+
var consumerConfig = new ConsumerConfig();
4338
config.GetSection("Kafka:ConsumerSettings").Bind(consumerConfig);
4439
this.topic = config.GetValue<string>("Kafka:RequestTimeTopic");
40+
this.kafkaConsumer = new ConsumerBuilder<string, long>(consumerConfig).Build();
4541
}
4642

47-
public void Dispose()
43+
protected override Task ExecuteAsync(CancellationToken stoppingToken)
4844
{
49-
this.kafkaConsumer.Close(); // Commit offsets and leave the group cleanly.
50-
this.kafkaConsumer.Dispose();
51-
}
45+
new Thread(() => StartConsumerLoop(stoppingToken)).Start();
5246

53-
private void consumerLoop()
47+
return Task.CompletedTask;
48+
}
49+
50+
private void StartConsumerLoop(CancellationToken cancellationToken)
5451
{
5552
kafkaConsumer.Subscribe(this.topic);
5653

57-
while (true)
54+
while (!cancellationToken.IsCancellationRequested)
5855
{
5956
try
6057
{
61-
var cr = this.kafkaConsumer.Consume(this.cancellationTokenSource.Token);
58+
var cr = this.kafkaConsumer.Consume(cancellationToken);
6259

6360
// Handle message...
6461
Console.WriteLine($"{cr.Message.Key}: {cr.Message.Value}ms");
6562
}
66-
catch (TaskCanceledException)
63+
catch (OperationCanceledException)
6764
{
68-
// StopAsync called.
6965
break;
7066
}
7167
catch (ConsumeException e)
@@ -86,29 +82,13 @@ private void consumerLoop()
8682
}
8783
}
8884
}
89-
90-
public Task StartAsync(CancellationToken _cancellationToken)
85+
86+
public override void Dispose()
9187
{
92-
// The passed in cancellationToken is to allow for cancellation of the StartAsync method.
93-
// Our StartAsync implementation executes quickly - has no blocking or async calls, so
94-
// this is not needed.
95-
96-
// Create a cancellation token source to allow the consumer poll loop to be cancelled
97-
// by the StopAsync method.
98-
this.cancellationTokenSource = new CancellationTokenSource();
99-
this.kafkaConsumer = new ConsumerBuilder<string, long>(consumerConfig).Build();
100-
this.pollThread = new Thread(consumerLoop);
101-
this.pollThread.Start();
102-
return Task.CompletedTask;
103-
}
104-
105-
public async Task StopAsync(CancellationToken cancellationToken)
106-
{
107-
this.cancellationTokenSource.Cancel();
88+
this.kafkaConsumer.Close(); // Commit offsets and leave the group cleanly.
89+
this.kafkaConsumer.Dispose();
10890

109-
// Async methods should never block, so block waiting for the poll loop to finish on another
110-
// thread and await completion of that.
111-
await Task.Run(() => { this.pollThread.Join(); }, cancellationToken);
91+
base.Dispose();
11292
}
11393
}
11494
}

0 commit comments

Comments
 (0)