Skip to content

Commit 7746169

Browse files
author
Matt Howlett
committed
merged
2 parents d675774 + 7235eaa commit 7746169

File tree

116 files changed

+2814
-1381
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

116 files changed

+2814
-1381
lines changed

Confluent.Kafka.sln

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MultiProducer", "examples\M
4747
EndProject
4848
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Producer", "examples\Producer\Producer.csproj", "{C5114860-FBED-491E-8DB5-E8B51F2A2E6C}"
4949
EndProject
50+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ConfigGen", "src\ConfigGen\ConfigGen.csproj", "{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}"
51+
EndProject
5052
Global
5153
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5254
Debug|Any CPU = Debug|Any CPU
@@ -288,6 +290,18 @@ Global
288290
{C5114860-FBED-491E-8DB5-E8B51F2A2E6C}.Release|x64.Build.0 = Release|Any CPU
289291
{C5114860-FBED-491E-8DB5-E8B51F2A2E6C}.Release|x86.ActiveCfg = Release|Any CPU
290292
{C5114860-FBED-491E-8DB5-E8B51F2A2E6C}.Release|x86.Build.0 = Release|Any CPU
293+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
294+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Debug|Any CPU.Build.0 = Debug|Any CPU
295+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Debug|x64.ActiveCfg = Debug|Any CPU
296+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Debug|x64.Build.0 = Debug|Any CPU
297+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Debug|x86.ActiveCfg = Debug|Any CPU
298+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Debug|x86.Build.0 = Debug|Any CPU
299+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Release|Any CPU.ActiveCfg = Release|Any CPU
300+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Release|Any CPU.Build.0 = Release|Any CPU
301+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Release|x64.ActiveCfg = Release|Any CPU
302+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Release|x64.Build.0 = Release|Any CPU
303+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Release|x86.ActiveCfg = Release|Any CPU
304+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F}.Release|x86.Build.0 = Release|Any CPU
291305
EndGlobalSection
292306
GlobalSection(NestedProjects) = preSolution
293307
{09C3255B-1972-4EB8-91D0-FB9F5CD82BCB} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
@@ -309,5 +323,6 @@ Global
309323
{CFDE34BC-071A-4AC2-AC03-B621DC719181} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
310324
{E98D8892-1573-4C6C-9B2B-704BA76EDC96} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
311325
{C5114860-FBED-491E-8DB5-E8B51F2A2E6C} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
326+
{5642B638-F4AA-4C77-BE6E-C0FFF7CC103F} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
312327
EndGlobalSection
313328
EndGlobal

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class Program
9393
var config = new Dictionary<string, object> { { "bootstrap.servers", "localhost:9092" } };
9494

9595
// A Producer for sending messages with null keys and UTF-8 encoded values.
96-
using (var p = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
96+
using (var p = new Producer<Null, string>(config))
9797
{
9898
try
9999
{
@@ -133,7 +133,7 @@ class Program
133133
? $"Delivered message to {r.TopicPartitionOffset}"
134134
: $"Delivery Error: {r.Error.Reason}");
135135

136-
using (var p = new Producer<Null, string>(conf, null, new StringSerializer(Encoding.UTF8)))
136+
using (var p = new Producer<Null, string>(conf))
137137
{
138138
for (int i=0; i<100; ++i)
139139
{
@@ -168,7 +168,7 @@ class Program
168168
{ "auto.offset.reset", "earliest" }
169169
};
170170

171-
using (var c = new Consumer<Ignore, string>(conf, null, new StringDeserializer(Encoding.UTF8)))
171+
using (var c = new Consumer<Ignore, string>(conf))
172172
{
173173
c.Subscribe("my-topic");
174174

examples/AdminClient/Program.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ public class Program
3030

3131
static void ListGroups(string brokerList)
3232
{
33-
var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };
34-
35-
using (var adminClient = new AdminClient(config))
33+
using (var adminClient = new AdminClient(new AdminClientConfig { BootstrapServers = brokerList }))
3634
{
3735
var groups = adminClient.ListGroups(TimeSpan.FromSeconds(10));
3836
Console.WriteLine($"Consumer Groups:");
@@ -54,8 +52,7 @@ static void ListGroups(string brokerList)
5452

5553
static void PrintMetadata(string brokerList)
5654
{
57-
var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };
58-
using (var adminClient = new AdminClient(config))
55+
using (var adminClient = new AdminClient(new AdminClientConfig { BootstrapServers = brokerList }))
5956
{
6057
var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
6158
Console.WriteLine($"{meta.OriginatingBrokerId} {meta.OriginatingBrokerName}");

examples/AvroBlogExamples/Program.cs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,10 @@ class Program
3535
{
3636
static void ProduceGeneric(string bootstrapServers, string schemaRegistryUrl)
3737
{
38-
var config = new Dictionary<string, object>
39-
{
40-
{ "bootstrap.servers", bootstrapServers },
41-
{ "schema.registry.url", schemaRegistryUrl }
42-
};
38+
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
4339

44-
using (var producer = new Producer<Null, GenericRecord>(config, null, new AvroSerializer<GenericRecord>()))
40+
using (var serdeProvider = new AvroSerdeProvider(new AvroSerdeProviderConfig { SchemaRegistryUrl = schemaRegistryUrl }))
41+
using (var producer = new Producer<Null, GenericRecord>(producerConfig, null, serdeProvider.CreateValueSerializer<GenericRecord>()))
4542
{
4643
var logLevelSchema = (EnumSchema)Schema.Parse(
4744
File.ReadAllText("LogLevel.asvc"));
@@ -68,13 +65,10 @@ static void ProduceGeneric(string bootstrapServers, string schemaRegistryUrl)
6865

6966
static void ProduceSpecific(string bootstrapServers, string schemaRegistryUrl)
7067
{
71-
var config = new Dictionary<string, object>
72-
{
73-
{ "bootstrap.servers", bootstrapServers },
74-
{ "schema.registry.url", schemaRegistryUrl }
75-
};
68+
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
7669

77-
using (var producer = new Producer<Null, MessageTypes.LogMessage>(config, null, new AvroSerializer<MessageTypes.LogMessage>()))
70+
using (var serdeProvider = new AvroSerdeProvider(new AvroSerdeProviderConfig { SchemaRegistryUrl = schemaRegistryUrl }))
71+
using (var producer = new Producer<Null, MessageTypes.LogMessage>(producerConfig, null, serdeProvider.CreateValueSerializer<MessageTypes.LogMessage>()))
7872
{
7973
producer.ProduceAsync("log-messages",
8074
new Message<Null, MessageTypes.LogMessage>
@@ -99,16 +93,15 @@ static void ConsumeSpecific(string bootstrapServers, string schemaRegistryUrl)
9993
cts.Cancel();
10094
};
10195

102-
var consumerConfig = new Dictionary<string, object>
96+
var consumerConfig = new ConsumerConfig
10397
{
104-
{ "group.id", Guid.NewGuid().ToString() },
105-
{ "bootstrap.servers", bootstrapServers },
106-
{ "schema.registry.url", schemaRegistryUrl },
107-
{ "auto.offset.reset", "beginning" }
98+
GroupId = Guid.NewGuid().ToString(),
99+
BootstrapServers = bootstrapServers,
100+
AutoOffsetReset = AutoOffsetResetType.Earliest
108101
};
109102

110-
using (var consumer = new Consumer<Null, MessageTypes.LogMessage>(
111-
consumerConfig, null, new AvroDeserializer<MessageTypes.LogMessage>()))
103+
using (var serdeProvider = new AvroSerdeProvider(new AvroSerdeProviderConfig { SchemaRegistryUrl = schemaRegistryUrl }))
104+
using (var consumer = new Consumer<Null, MessageTypes.LogMessage>(consumerConfig, null, serdeProvider.CreateDeserializer<MessageTypes.LogMessage>()))
112105
{
113106
consumer.Subscribe("log-messages");
114107

examples/AvroGeneric/Program.cs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,25 @@ static void Main(string[] args)
3939
string schemaRegistryUrl = args[1];
4040
string topicName = args[2];
4141

42-
var producerConfig = new Dictionary<string, object>
42+
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
43+
44+
var avroConfig = new AvroSerdeProviderConfig
4345
{
44-
{ "bootstrap.servers", bootstrapServers },
4546
// Note: you can specify more than one schema registry url using the
46-
// schema.registry.url property for redundancy (comma separated list).
47-
// The property name is not plural to follow the convention set by
48-
// the Java implementation.
49-
{ "schema.registry.url", schemaRegistryUrl },
47+
// schemaRegistryUrl property for redundancy (comma separated list).
48+
SchemaRegistryUrl = schemaRegistryUrl,
5049
// optional schema registry client properties:
51-
// { "schema.registry.connection.timeout.ms", 5000 },
52-
// { "schema.registry.max.cached.schemas", 10 },
50+
// SchemaRegistryRequestTimeoutMs = 5000,
51+
// SchemaRegistryMaxCachedSchemas = 10,
5352
// optional avro serializer properties:
54-
// { "avro.serializer.buffer.bytes", 50 },
55-
// { "avro.serializer.auto.register.schemas", true }
53+
// AvroSerializerBufferBytes = 50,
54+
// AvroSerializerAutoRegisterSchemas = true
5655
};
5756

58-
var consumerConfig = new Dictionary<string, object>
57+
var consumerConfig = new ConsumerConfig
5958
{
60-
{ "bootstrap.servers", bootstrapServers },
61-
{ "group.id", Guid.NewGuid() },
62-
{ "schema.registry.url", schemaRegistryUrl },
63-
{ "error_cb", (Action<ErrorEvent>)(e => Console.WriteLine($"Error [{e.Level}]: {e.Error.Reason}")) }
59+
BootstrapServers = bootstrapServers,
60+
GroupId = Guid.NewGuid().ToString()
6461
};
6562

6663
// var s = (RecordSchema)Schema.Parse(File.ReadAllText("my-schema.json"));
@@ -80,8 +77,12 @@ static void Main(string[] args)
8077
CancellationTokenSource cts = new CancellationTokenSource();
8178
var consumeTask = Task.Run(() =>
8279
{
83-
using (var consumer = new Consumer<string, GenericRecord>(consumerConfig, new AvroDeserializer<string>(), new AvroDeserializer<GenericRecord>()))
80+
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
81+
using (var consumer = new Consumer<string, GenericRecord>(consumerConfig, serdeProvider.CreateDeserializer<string>(), serdeProvider.CreateDeserializer<GenericRecord>()))
8482
{
83+
consumer.OnError += (_, e)
84+
=> Console.WriteLine($"Error: {e.Reason}");
85+
8586
consumer.Subscribe(topicName);
8687

8788
while (!cts.Token.IsCancellationRequested)
@@ -101,7 +102,8 @@ static void Main(string[] args)
101102
}
102103
}, cts.Token);
103104

104-
using (var producer = new Producer<string, GenericRecord>(producerConfig, new AvroSerializer<string>(), new AvroSerializer<GenericRecord>()))
105+
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
106+
using (var producer = new Producer<string, GenericRecord>(producerConfig, serdeProvider.CreateKeySerializer<string>(), serdeProvider.CreateValueSerializer<GenericRecord>()))
105107
{
106108
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
107109

examples/AvroSpecific/Program.cs

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,50 +37,29 @@ static void Main(string[] args)
3737
string schemaRegistryUrl = args[1];
3838
string topicName = args[2];
3939

40-
var producerConfig = new Dictionary<string, object>
40+
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
41+
42+
var avroConfig = new AvroSerdeProviderConfig
4143
{
42-
{ "bootstrap.servers", bootstrapServers },
4344
// Note: you can specify more than one schema registry url using the
4445
// schema.registry.url property for redundancy (comma separated list).
4546
// The property name is not plural to follow the convention set by
4647
// the Java implementation.
47-
{ "schema.registry.url", schemaRegistryUrl },
48+
SchemaRegistryUrl = schemaRegistryUrl,
4849
// optional schema registry client properties:
49-
{ "schema.registry.connection.timeout.ms", 5000 },
50-
{ "schema.registry.max.cached.schemas", 10 },
50+
SchemaRegistryRequestTimeoutMs = 5000,
51+
SchemaRegistryMaxCachedSchemas = 10,
5152
// optional avro serializer properties:
52-
{ "avro.serializer.buffer.bytes", 50 },
53-
{ "avro.serializer.auto.register.schemas", true }
53+
AvroSerializerBufferBytes = 50,
54+
AvroSerializerAutoRegisterSchemas = true
5455
};
5556

56-
var consumerConfig = new Dictionary<string, object>
57+
var consumerConfig = new ConsumerConfig
5758
{
58-
{ "bootstrap.servers", bootstrapServers },
59-
{ "group.id", Guid.NewGuid() },
60-
{ "schema.registry.url", schemaRegistryUrl },
61-
{ "error_cb", (Action<ErrorEvent>)(e => Console.WriteLine($"Error [{e.Level}]: {e.Error.Reason}")) }
59+
BootstrapServers = bootstrapServers,
60+
GroupId = Guid.NewGuid().ToString()
6261
};
6362

64-
// Note: Each AvroSerializer and AvroDeserializer instance created below internally creates and manages
65-
// the lifecycle of a CachedSchemaRegistry instance, taking it's configuration properties from the
66-
// dictionary supplied to the Producer/Consumer constructor. This is the most straightforward way
67-
// to use these (de)serializers, however you can also pass a CachedSchemaRegistry instance into the
68-
// constructors directly, which avoids duplicate connections to Schema Registry in the case where
69-
// you're using more than one AvroSerializer/Deserializer. E.g.:
70-
//
71-
// var schemaRegistryConfig = new Dictionary<string, object>
72-
// {
73-
// { "schema.registry.url", schemaRegistryUrl },
74-
// { "schema.registry.connection.timeout.ms", 5000 }, // optional
75-
// { "schema.registry.max.cached.schemas", 10 } // optional
76-
// };
77-
//
78-
// using (var schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig))
79-
// using (var consumer = new Consumer<int, User>(consumerConfig, new AvroDeserializer<int>(schemaRegistryClient), new AvroDeserializer<User>(schemaRegistryClient)))
80-
// using (var producer = new Producer<int, User>(producerConfig, new AvroSerializer<int>(schemaRegistryClient), new AvroSerializer<User>(schemaRegistryClient)))
81-
// {
82-
// ...
83-
8463
// Note: The User class in this project was generated using the Confluent fork of the avrogen.exe tool
8564
// (avaliable from: https://github.com/confluentinc/avro/tree/confluent-fork) which includes modifications
8665
// that prevent namespace clashes with user namespaces that include the identifier 'Avro'. AvroSerializer
@@ -90,8 +69,12 @@ static void Main(string[] args)
9069
CancellationTokenSource cts = new CancellationTokenSource();
9170
var consumeTask = Task.Run(() =>
9271
{
93-
using (var consumer = new Consumer<string, User>(consumerConfig, new AvroDeserializer<string>(), new AvroDeserializer<User>()))
72+
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
73+
using (var consumer = new Consumer<string, User>(consumerConfig, serdeProvider.CreateDeserializer<string>(), serdeProvider.CreateDeserializer<User>()))
9474
{
75+
consumer.OnError += (_, e)
76+
=> Console.WriteLine($"Error: {e.Reason}");
77+
9578
consumer.Subscribe(topicName);
9679

9780
while (!cts.Token.IsCancellationRequested)
@@ -111,7 +94,8 @@ static void Main(string[] args)
11194
}
11295
}, cts.Token);
11396

114-
using (var producer = new Producer<string, User>(producerConfig, new AvroSerializer<string>(), new AvroSerializer<User>()))
97+
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
98+
using (var producer = new Producer<string, User>(producerConfig, serdeProvider.CreateKeySerializer<string>(), serdeProvider.CreateValueSerializer<User>()))
11599
{
116100
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
117101

0 commit comments

Comments
 (0)