Skip to content

Commit dc80e46

Browse files
authored
Add a web example (#1440)
* WIP: Web example * Changes following review feedback * Fix random gen * efficient -> inefficient
1 parent e5503c1 commit dc80e46

17 files changed

+622
-0
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,13 @@ class Program
200200
}
201201
```
202202

203+
### IHostedService and Web Application Integration
204+
205+
The [Web](https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/Web) example demonstrates how to integrate
206+
Apache Kafka with a web application, including how to implement `IHostedService` to realize a long running consumer poll loop, how to
207+
register a producer as a singleton service, and how to bind configuration from an injected `IConfiguration` instance.
208+
209+
203210
### Schema Registry Integration
204211

205212
The three "Serdes" packages provide serializers and deserializers for Avro, Protobuf and JSON with [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html) integration. The `Confluent.SchemaRegistry` nuget package provides a client for interfacing with
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2020 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Threading.Tasks;
19+
using Microsoft.AspNetCore.Mvc;
20+
using Microsoft.Extensions.Configuration;
21+
using Microsoft.Extensions.Logging;
22+
using Confluent.Kafka;
23+
24+
25+
namespace Web.Controllers
26+
{
27+
public class HomeController : Controller
28+
{
29+
private string topic;
30+
private readonly KafkaDependentProducer<Null, string> producer;
31+
private readonly ILogger logger;
32+
33+
public HomeController(KafkaDependentProducer<Null, string> producer, IConfiguration config, ILogger<HomeController> logger)
34+
{
35+
// In a real-world application, you might be using Kafka as a system of record and may wish
36+
// to update application state in your request handlers. Or you may wish to write some
37+
// analytics data related to your handler logic. In this example, we aren't doing anything
38+
// interesting, so we just write frivolous messages to demonstrate how to set things up.
39+
this.topic = config.GetValue<string>("Kafka:FrivolousTopic");
40+
this.producer = producer;
41+
this.logger = logger;
42+
}
43+
44+
public async Task<IActionResult> Index()
45+
{
46+
// Simulate a complex request handler by delaying a random amount of time.
47+
await Task.Delay((int)(new Random((int)DateTime.Now.Ticks).NextDouble()*100));
48+
49+
// Important note: DO NOT create a new producer instance every time you
50+
// need to produce a message (this is a common pattern with relational database
51+
// drivers, but it is extremely inefficient here). Instead, use a long-lived
52+
// singleton instance, as per this example.
53+
54+
// Do not delay completion of the page request on the result of produce call.
55+
// Any errors are handled out of band in deliveryReportHandler.
56+
this.producer.Produce(topic, new Message<Null, string> { Value = "Frivolous message (index)" }, deliveryReportHandler);
57+
return View();
58+
}
59+
60+
public async Task<IActionResult> Page1()
61+
{
62+
await Task.Delay((int)(new Random((int)DateTime.Now.Ticks).NextDouble()*100));
63+
64+
// Delay completion of the page request on the result of the produce call.
65+
// An exception will be thrown in the case of an error.
66+
await this.producer.ProduceAsync(topic, new Message<Null, string> { Value = "Frivolous message #1" });
67+
return View();
68+
}
69+
70+
private void deliveryReportHandler(DeliveryReport<Null, string> deliveryReport)
71+
{
72+
if (deliveryReport.Status == PersistenceStatus.NotPersisted)
73+
{
74+
// It is common to write application logs to Kafka (note: this project does not provide
75+
// an example logger implementation that does this). Such an implementation should
76+
// ideally fall back to logging messages locally in the case of delivery problems.
77+
this.logger.Log(LogLevel.Warning, $"Message delivery failed: {deliveryReport.Message.Value}");
78+
}
79+
}
80+
}
81+
}

examples/Web/KafkaClientHandle.cs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2020 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using Confluent.Kafka;
18+
using Microsoft.Extensions.Configuration;
19+
20+
21+
namespace Web
22+
{
23+
/// <summary>
24+
/// Wraps a Confluent.Kafka.IProducer instance, and allows for basic
25+
/// configuration of this via IConfiguration.
26+
///
27+
/// KafkaClientHandle does not provide any way for messages to be produced
28+
/// directly. Instead, it is a dependency of KafkaDependentProducer. You
29+
/// can create more than one instance of KafkaDependentProducer (with
30+
/// possibly differing K and V generic types) that leverages the same
31+
/// underlying producer instance exposed by the Handle property of this
32+
/// class. This is more efficient than creating separate
33+
/// Confluent.Kafka.IProducer instances for each Message type you wish to
34+
/// produce.
35+
/// </summary>
36+
public class KafkaClientHandle
37+
{
38+
IProducer<byte[], byte[]> kafkaProducer;
39+
40+
public KafkaClientHandle(IConfiguration config)
41+
{
42+
var conf = new ProducerConfig();
43+
config.GetSection("Kafka:ProducerSettings").Bind(conf);
44+
this.kafkaProducer = new ProducerBuilder<byte[], byte[]>(conf).Build();
45+
}
46+
47+
public Handle Handle { get => this.kafkaProducer.Handle; }
48+
49+
public void Dispose()
50+
{
51+
// Block until all outstanding produce requests have completed (with or
52+
// without error).
53+
kafkaProducer.Flush();
54+
kafkaProducer.Dispose();
55+
}
56+
}
57+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2020 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using Confluent.Kafka;
18+
using System;
19+
using System.Threading.Tasks;
20+
21+
22+
namespace Web
23+
{
24+
/// <summary>
25+
/// Leverages the injected KafkaClientHandle instance to allow
26+
/// Confluent.Kafka.Message{K,V}s to be produced to Kafka.
27+
/// </summary>
28+
public class KafkaDependentProducer<K,V>
29+
{
30+
IProducer<K, V> kafkaHandle;
31+
32+
public KafkaDependentProducer(KafkaClientHandle handle)
33+
{
34+
kafkaHandle = new DependentProducerBuilder<K, V>(handle.Handle).Build();
35+
}
36+
37+
/// <summary>
38+
/// Asychronously produce a message and expose delivery information
39+
/// via the returned Task. Use this method of producing if you would
40+
/// like to await the result before flow of execution continues.
41+
/// <summary>
42+
public Task ProduceAsync(string topic, Message<K, V> message)
43+
=> this.kafkaHandle.ProduceAsync(topic, message);
44+
45+
/// <summary>
46+
/// Asynchronously produce a message and expose delivery information
47+
/// via the provided callback function. Use this method of producing
48+
/// if you would like flow of execution to continue immediately, and
49+
/// handle delivery information out-of-band.
50+
/// </summary>
51+
public void Produce(string topic, Message<K, V> message, Action<DeliveryReport<K, V>> deliveryHandler = null)
52+
=> this.kafkaHandle.Produce(topic, message, deliveryHandler);
53+
54+
public void Flush(TimeSpan timeout)
55+
=> this.kafkaHandle.Flush(timeout);
56+
}
57+
}

examples/Web/Program.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2020 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using Microsoft.AspNetCore.Hosting;
18+
using Microsoft.Extensions.Hosting;
19+
20+
namespace Web
21+
{
22+
public class Program
23+
{
24+
public static void Main(string[] args)
25+
{
26+
CreateHostBuilder(args).Build().Run();
27+
}
28+
29+
public static IHostBuilder CreateHostBuilder(string[] args) =>
30+
Host.CreateDefaultBuilder(args)
31+
.ConfigureWebHostDefaults(webBuilder =>
32+
{
33+
webBuilder.UseStartup<Startup>();
34+
});
35+
}
36+
}

examples/Web/README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
## Web Example
2+
3+
This project demonstrates how to integrate Apache Kafka with a .NET web application. It provides
4+
examples of both producing and consuming messages.
5+
6+
7+
#### KafkaClientHandle
8+
9+
Demonstrates how to wrap the Confluent.Kafka producer in a class that can be registered as
10+
a singleton service, including how to bind client configuration from an injected IConfiguration
11+
instance to a Confluent.Kafka.ProducerConfig object.
12+
13+
Important Note: You should not create new client instances frequently (for example in the
14+
body of a request handler) - doing this is enormously expensive. Instead, you should create a
15+
single client instance that lives for the lifetime of the application as per this example.
16+
The .NET Kafka client is completely thread safe and a single instance is capable of serving
17+
the requirements tens or hundreds of thousands of concurrent web requests.
18+
19+
20+
#### KafkaDependentProducer
21+
22+
Demonstrates how to produce messages with different types using a single producer instance.
23+
This is more efficient than creating more than one producer instance.
24+
25+
26+
#### HomeController
27+
28+
Demonstrates how utilize a previously registered KafkaDependentProducer singleton service in
29+
a controller.
30+
31+
32+
#### RequestTimerMiddleware
33+
34+
Demonstrates how utilize a KafkaDependentProducer service in a middleware component that
35+
measures how long a web request takes to handle, and logs the information to Kafka.
36+
37+
38+
#### RequestTimeConsumer
39+
40+
Demonstrates how to run a Confluent.Kafka consumer as an IHostedService.
41+

0 commit comments

Comments
 (0)