Skip to content

Commit 54cda76

Browse files
author
Ashish Khanal
committed
- Add background service, setup the consumer and logging in Program.cs
- Consumer working successfully 🎉
1 parent 0b05141 commit 54cda76

File tree

6 files changed

+75
-9
lines changed

6 files changed

+75
-9
lines changed

Consumer/Consumer.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
<ItemGroup>
1212
<PackageReference Include="Confluent.Kafka" Version="2.3.0" />
1313
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
14+
<PackageReference Include="Confluent.SchemaRegistry" Version="2.3.0" />
15+
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.3.0" />
1416
</ItemGroup>
1517

1618
<ItemGroup>

Consumer/Program.cs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1-
using Microsoft.Extensions.Configuration;
2-
using Confluent.Kafka;
1+
using Confluent.Kafka;
2+
using Confluent.Kafka.SyncOverAsync;
3+
using Confluent.SchemaRegistry.Serdes;
4+
using Consumer.Workers;
5+
using Microsoft.Extensions.Configuration;
6+
using Microsoft.Extensions.DependencyInjection;
7+
using Microsoft.Extensions.Hosting;
8+
using Microsoft.Extensions.Logging;
9+
using Microsoft.Extensions.Options;
10+
11+
HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
312

413
//using var host = Host.CreateApplicationBuilder(args).Build();
514
//var config = host.Services.GetRequiredService<IConfiguration>(); // This works like below EXCEPT the user secrets part.
@@ -9,6 +18,25 @@
918
.AddUserSecrets<Program>()
1019
.Build();
1120

12-
var consumerConfig = config.GetSection("KafkaConsumer").Get<ConsumerConfig>()!;
21+
// Populate consumer config
22+
builder.Services.Configure<ConsumerConfig>(config.GetSection("KafkaConsumer"));
23+
24+
builder.Services.AddSingleton<IConsumer<string, Biometrics>>(sp =>
25+
{
26+
var consumerConfig = sp.GetRequiredService<IOptions<ConsumerConfig>>();
27+
return new ConsumerBuilder<string, Biometrics>(consumerConfig.Value)
28+
.SetValueDeserializer(new JsonDeserializer<Biometrics>().AsSyncOverAsync())
29+
.Build();
30+
});
31+
32+
builder.Services.AddHostedService<HeartRateZoneWorker>();
33+
34+
using IHost host = builder.Build();
35+
// https://learn.microsoft.com/en-us/dotnet/core/extensions/logging?tabs=command-line#create-logs-in-main
36+
var logger = host.Services.GetRequiredService<ILogger<Program>>();
37+
logger.LogInformation("Host created. Now running it.");
38+
39+
await host.RunAsync();
1340

14-
Console.ReadLine();
41+
public record Biometrics(Guid DeviceId, List<HeartRate> HeartRates);
42+
public record HeartRate(DateTime DateTime, int Value);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using Confluent.Kafka;
2+
using Microsoft.Extensions.Hosting;
3+
using Microsoft.Extensions.Logging;
4+
5+
namespace Consumer.Workers;
6+
7+
public class HeartRateZoneWorker(
8+
ILogger<HeartRateZoneWorker> logger,
9+
IConsumer<string, Biometrics> consumer) : BackgroundService
10+
{
11+
private const string BiometricsImportedTopicName = "BiometricsImported";
12+
13+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
14+
{
15+
consumer.Subscribe(BiometricsImportedTopicName);
16+
17+
while (!stoppingToken.IsCancellationRequested)
18+
{
19+
var result = consumer.Consume(stoppingToken);
20+
logger.LogInformation("Message Received: {0}", result.Message.Value);
21+
await Task.CompletedTask;
22+
23+
//await Task.Delay(1000, stoppingToken);
24+
}
25+
26+
consumer.Close();
27+
}
28+
29+
public override async Task StopAsync(CancellationToken stoppingToken)
30+
{
31+
logger.LogInformation("HeartRateZoneWorker is stopping.");
32+
await base.StopAsync(stoppingToken);
33+
}
34+
}

Consumer/appsettings.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
"BootstrapServers": "pkc-4rn2p.canadacentral.azure.confluent.cloud:9092",
99
"SecurityProtocol": "SaslSsl",
1010
"SaslMechanism": "PLAIN",
11-
"ClientId": "my-dotnet-kafka",
12-
"GroupId": "my-dotnet-kafka-group",
11+
"ClientId": "dotnet-kafka-consumer",
12+
"GroupId": "dotnet-kafka-consumer",
1313
"AutoOffsetReset": "earliest",
14+
"EnableAutoCommit": true,
1415
"SaslUsername": "comes from user-secrets' secrets.json",
1516
"SaslPassword": "comes from user-secrets' secrets.json"
1617
},

Producer/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
var result = await producer.ProduceAsync(biometricsImportedTopicName, message);
5353
producer.Flush();
5454
app.Logger.LogInformation("Accepted Biometrics");
55-
return TypedResults.Accepted("", result.Value); // result is of type <string, Biometrics>, so you're just returning the accepted Biometrics
55+
return TypedResults.Accepted("", result.Value); // result.Value is just message.Value
5656
})
5757
.WithName("RecordMeasurements")
5858
.WithOpenApi();

Producer/appsettings.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
"SaslMechanism": "PLAIN",
1313
"SaslUsername": "comes from user-secrets' secrets.json",
1414
"SaslPassword": "comes from user-secrets' secrets.json",
15-
"ClientId": "my-dotnet-kafka"
15+
"ClientId": "dotnet-kafka-producer"
1616
},
1717
"KafkaSchemaRegistry": {
1818
"URL": "https://psrc-gq7pv.westus2.azure.confluent.cloud",
19-
"BasicAuthCredentialsSource": "UserInfo"
19+
"BasicAuthCredentialsSource": "UserInfo",
20+
"BasicAuthUserInfo": "comes from user-secrets' secrets.json"
2021
}
2122
}

0 commit comments

Comments
 (0)