Skip to content

Commit 17004b1

Browse files
authored
added confluent cloud example (#503)
* added confluent cloud example * review comments
1 parent 44127bd commit 17004b1

File tree

3 files changed

+134
-0
lines changed

3 files changed

+134
-0
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ or with specific classes generated using the `avrogen` tool
151151
dotnet /path/to/avrogen.dll -s your_schema.asvc .
152152
```
153153

154+
### Confluent Cloud
155+
156+
The [Confluent Cloud example](examples/ConfluentCloud) demonstrates how to configure the .NET client for use with [Confluent Cloud](https://www.confluent.io/confluent-cloud/).
157+
158+
154159
## Build
155160

156161
To build the library or any test or example project, run the following from within the relevant project directory:
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>netcoreapp2.0</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Confluent.Kafka" Version="0.11.4" />
10+
</ItemGroup>
11+
12+
</Project>

examples/ConfluentCloud/Program.cs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright 2018 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Text;
20+
using Confluent.Kafka;
21+
using Confluent.Kafka.Serialization;
22+
23+
24+
namespace ConfluentCloudExample
25+
{
26+
/// <summary>
27+
/// This is a simple example demonstrating how to produce a message to
28+
/// Confluent Cloud then read it back again.
29+
///
30+
/// https://www.confluent.io/confluent-cloud/
31+
///
32+
/// Confluent Cloud does not auto-create topics. You will need to use the ccloud
33+
/// cli to create the dotnet-test-topic topic before running this example. The
34+
/// <ccloud bootstrap servers>, <ccloud key> and <ccloud secret> parameters are
35+
/// available via the confluent cloud web interface. For more information,
36+
/// refer to the quick-start:
37+
///
38+
/// https://docs.confluent.io/current/cloud-quickstart.html
39+
/// </summary>
40+
class Program
41+
{
42+
static void Main(string[] args)
43+
{
44+
var pConfig = new Dictionary<string, object>
45+
{
46+
{ "bootstrap.servers", "<ccloud bootstrap servers>" },
47+
{ "broker.version.fallback", "0.10.0.0" },
48+
{ "api.version.fallback.ms", 0 },
49+
{ "sasl.mechanisms", "PLAIN" },
50+
{ "security.protocol", "SASL_SSL" },
51+
// On Windows, default trusted root CA certificates are stored in the Windows Registry.
52+
// They are not automatically discovered by Confluent.Kafka and it's not possible to
53+
// reference them using the `ssl.ca.location` property. You will need to obtain these
54+
// from somewhere else, for example use the cacert.pem file distributed with curl:
55+
// https://curl.haxx.se/ca/cacert.pem and reference that file in the `ssl.ca.location`
56+
// property:
57+
{ "ssl.ca.location", "/usr/local/etc/openssl/cert.pem" }, // suitable configuration for linux, osx.
58+
// { "ssl.ca.location", "c:\\path\\to\\cacert.pem" }, // windows
59+
{ "sasl.username", "<ccloud key>" },
60+
{ "sasl.password", "<ccloud secret>" }
61+
};
62+
63+
using (var producer = new Producer<Null, string>(pConfig, null, new StringSerializer(Encoding.UTF8)))
64+
{
65+
producer.ProduceAsync("dotnet-test-topic", null, "test value")
66+
.ContinueWith(result =>
67+
{
68+
var msg = result.Result;
69+
if (msg.Error.Code != ErrorCode.NoError)
70+
{
71+
Console.WriteLine($"failed to deliver message: {msg.Error.Reason}");
72+
}
73+
else
74+
{
75+
Console.WriteLine($"delivered to: {result.Result.TopicPartitionOffset}");
76+
}
77+
});
78+
79+
producer.Flush(TimeSpan.FromSeconds(10));
80+
}
81+
82+
var cConfig = new Dictionary<string, object>
83+
{
84+
{ "bootstrap.servers", "<confluent cloud bootstrap servers>" },
85+
{ "broker.version.fallback", "0.10.0.0" },
86+
{ "api.version.fallback.ms", 0 },
87+
{ "sasl.mechanisms", "PLAIN" },
88+
{ "security.protocol", "SASL_SSL" },
89+
{ "ssl.ca.location", "/usr/local/etc/openssl/cert.pem" }, // suitable configuration for linux, osx.
90+
// { "ssl.ca.location", "c:\\path\\to\\cacert.pem" }, // windows
91+
{ "sasl.username", "<confluent cloud key>" },
92+
{ "sasl.password", "<confluent cloud secret>" },
93+
{ "group.id", Guid.NewGuid().ToString() },
94+
{ "auto.offset.reset", "smallest" }
95+
};
96+
97+
using (var consumer = new Consumer<Null, string>(cConfig, null, new StringDeserializer(Encoding.UTF8)))
98+
{
99+
consumer.Subscribe("dotnet-test-topic");
100+
101+
consumer.OnConsumeError += (_, err)
102+
=> Console.WriteLine($"consume error: {err.Error.Reason}");
103+
104+
consumer.OnMessage += (_, msg)
105+
=> Console.WriteLine($"consumed: {msg.Value}");
106+
107+
consumer.OnPartitionEOF += (_, tpo)
108+
=> Console.WriteLine($"end of partition: {tpo}");
109+
110+
while (true)
111+
{
112+
consumer.Poll(TimeSpan.FromMilliseconds(100));
113+
}
114+
}
115+
}
116+
}
117+
}

0 commit comments

Comments
 (0)