Skip to content

Commit 01cf6b3

Browse files
author
Matt Howlett
committed
merged + updated strongly typed config classes
2 parents 7746169 + 01e4b9a commit 01cf6b3

File tree

100 files changed

+941
-1782
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+941
-1782
lines changed

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class Program
9090
{
9191
public static async Task Main(string[] args)
9292
{
93-
var config = new Dictionary<string, object> { { "bootstrap.servers", "localhost:9092" } };
93+
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
9494

9595
// A Producer for sending messages with null keys and UTF-8 encoded values.
9696
using (var p = new Producer<Null, string>(config))
@@ -109,8 +109,8 @@ class Program
109109
}
110110
```
111111

112-
However, a server round-trip is slow (3ms at a minimum; actual latency depends on many factors).
113-
In highly concurrent scenarios you will get high overall throughput out of the producer using
112+
Note that a server round-trip is slow (3ms at a minimum; actual latency depends on many factors).
113+
In highly concurrent scenarios you will achieve high overall throughput out of the producer using
114114
the above approach, but there will be a delay on each `await` call. In stream processing
115115
applications, where you would like to process many messages in rapid succession, you would typically
116116
make use the `BeginProduce` method instead:
@@ -126,7 +126,7 @@ class Program
126126
{
127127
public static void Main(string[] args)
128128
{
129-
var conf = new Dictionary<string, object> { { "bootstrap.servers", "localhost:9092" } };
129+
var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };
130130

131131
Action<DeliveryReportResult<Null, string>> handler = r =>
132132
Console.WriteLine(!r.Error.IsError
@@ -161,11 +161,11 @@ class Program
161161
{
162162
public static void Main(string[] args)
163163
{
164-
var conf = new Dictionary<string, object>
164+
var conf = new ConsumerConfig
165165
{
166-
{ "group.id", "test-consumer-group" },
167-
{ "bootstrap.servers", "localhost:9092" },
168-
{ "auto.offset.reset", "earliest" }
166+
GroupId = "test-consumer-group",
167+
BootstrapServers = "localhost:9092",
168+
AutoOffsetReset = AutoOffsetResetType.Earliest
169169
};
170170

171171
using (var c = new Consumer<Ignore, string>(conf))

examples/AvroBlogExamples/Program.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ static void ProduceGeneric(string bootstrapServers, string schemaRegistryUrl)
3838
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
3939

4040
using (var serdeProvider = new AvroSerdeProvider(new AvroSerdeProviderConfig { SchemaRegistryUrl = schemaRegistryUrl }))
41-
using (var producer = new Producer<Null, GenericRecord>(producerConfig, null, serdeProvider.CreateValueSerializer<GenericRecord>()))
41+
using (var producer = new Producer<Null, GenericRecord>(producerConfig, null, serdeProvider.GetSerializerGenerator<GenericRecord>()))
4242
{
4343
var logLevelSchema = (EnumSchema)Schema.Parse(
4444
File.ReadAllText("LogLevel.asvc"));
@@ -68,7 +68,7 @@ static void ProduceSpecific(string bootstrapServers, string schemaRegistryUrl)
6868
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
6969

7070
using (var serdeProvider = new AvroSerdeProvider(new AvroSerdeProviderConfig { SchemaRegistryUrl = schemaRegistryUrl }))
71-
using (var producer = new Producer<Null, MessageTypes.LogMessage>(producerConfig, null, serdeProvider.CreateValueSerializer<MessageTypes.LogMessage>()))
71+
using (var producer = new Producer<Null, MessageTypes.LogMessage>(producerConfig, null, serdeProvider.GetSerializerGenerator<MessageTypes.LogMessage>()))
7272
{
7373
producer.ProduceAsync("log-messages",
7474
new Message<Null, MessageTypes.LogMessage>
@@ -101,7 +101,7 @@ static void ConsumeSpecific(string bootstrapServers, string schemaRegistryUrl)
101101
};
102102

103103
using (var serdeProvider = new AvroSerdeProvider(new AvroSerdeProviderConfig { SchemaRegistryUrl = schemaRegistryUrl }))
104-
using (var consumer = new Consumer<Null, MessageTypes.LogMessage>(consumerConfig, null, serdeProvider.CreateDeserializer<MessageTypes.LogMessage>()))
104+
using (var consumer = new Consumer<Null, MessageTypes.LogMessage>(consumerConfig, null, serdeProvider.GetDeserializerGenerator<MessageTypes.LogMessage>()))
105105
{
106106
consumer.Subscribe("log-messages");
107107

examples/AvroGeneric/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ static void Main(string[] args)
7878
var consumeTask = Task.Run(() =>
7979
{
8080
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
81-
using (var consumer = new Consumer<string, GenericRecord>(consumerConfig, serdeProvider.CreateDeserializer<string>(), serdeProvider.CreateDeserializer<GenericRecord>()))
81+
using (var consumer = new Consumer<string, GenericRecord>(consumerConfig, serdeProvider.GetDeserializerGenerator<string>(), serdeProvider.GetDeserializerGenerator<GenericRecord>()))
8282
{
8383
consumer.OnError += (_, e)
8484
=> Console.WriteLine($"Error: {e.Reason}");
@@ -103,7 +103,7 @@ static void Main(string[] args)
103103
}, cts.Token);
104104

105105
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
106-
using (var producer = new Producer<string, GenericRecord>(producerConfig, serdeProvider.CreateKeySerializer<string>(), serdeProvider.CreateValueSerializer<GenericRecord>()))
106+
using (var producer = new Producer<string, GenericRecord>(producerConfig, serdeProvider.GetSerializerGenerator<string>(), serdeProvider.GetSerializerGenerator<GenericRecord>()))
107107
{
108108
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
109109

examples/AvroSpecific/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ static void Main(string[] args)
7070
var consumeTask = Task.Run(() =>
7171
{
7272
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
73-
using (var consumer = new Consumer<string, User>(consumerConfig, serdeProvider.CreateDeserializer<string>(), serdeProvider.CreateDeserializer<User>()))
73+
using (var consumer = new Consumer<string, User>(consumerConfig, serdeProvider.GetDeserializerGenerator<string>(), serdeProvider.GetDeserializerGenerator<User>()))
7474
{
7575
consumer.OnError += (_, e)
7676
=> Console.WriteLine($"Error: {e.Reason}");
@@ -95,7 +95,7 @@ static void Main(string[] args)
9595
}, cts.Token);
9696

9797
using (var serdeProvider = new AvroSerdeProvider(avroConfig))
98-
using (var producer = new Producer<string, User>(producerConfig, serdeProvider.CreateKeySerializer<string>(), serdeProvider.CreateValueSerializer<User>()))
98+
using (var producer = new Producer<string, User>(producerConfig, serdeProvider.GetSerializerGenerator<string>(), serdeProvider.GetSerializerGenerator<User>()))
9999
{
100100
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
101101

examples/ConfluentCloud/Program.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
using System.Collections.Generic;
1919
using System.Text;
2020
using Confluent.Kafka;
21-
using Confluent.Kafka.Serialization;
2221

2322

2423
namespace ConfluentCloudExample

examples/Consumer/Program.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
using System.Text;
2323
using System.Threading;
2424
using System.Threading.Tasks;
25-
using Confluent.Kafka.Serialization;
2625

2726

2827
/// <summary>

examples/MultiProducer/Program.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
using System;
1818
using System.Text;
1919
using System.Collections.Generic;
20-
using Confluent.Kafka.Serialization;
2120

2221

2322
namespace Confluent.Kafka.Examples.MultiProducer
@@ -35,7 +34,7 @@ public static void Main(string[] args)
3534
using (var producer = new Producer<string, string>(config))
3635
{
3736
// create a producer of different type that reuses producer's Handle.
38-
var producer2 = new Producer<Null, int>(producer.Handle, null, new IntSerializer());
37+
var producer2 = new Producer<Null, int>(producer.Handle, null, Serializers.Int32);
3938

4039
// write (string, string) data to topic "first-topic", statically type checked.
4140
producer.ProduceAsync("first-topic", new Message<string, string> { Key = "my-key-value", Value = "my-value" });

examples/Producer/Program.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
using System.Threading.Tasks;
2323
using System.Collections.Generic;
2424
using Confluent.Kafka;
25-
using Confluent.Kafka.Serialization;
2625

2726

2827
namespace Confluent.Kafka.Examples.Producer

src/Confluent.Kafka.Avro/AvroDeserializer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace Confluent.Kafka.Serialization
3434
/// bytes 1-4: Unique global id of the avro schema that was used for encoding (as registered in Confluent Schema Registry), big endian.
3535
/// following bytes: The serialized data.
3636
/// </remarks>
37-
public class AvroDeserializer<T> : IDeserializer<T>
37+
public class AvroDeserializer<T>
3838
{
3939
private ISchemaRegistryClient schemaRegistryClient;
4040

@@ -82,7 +82,7 @@ public T Deserialize(string topic, ReadOnlySpan<byte> data, bool isNull)
8282
}
8383

8484
/// <summary>
85-
/// Refer to: <see cref="Confluent.Kafka.Serialization.ISerializer{T}.Configure(IEnumerable{KeyValuePair{string, string}}, bool)" />
85+
/// Configure the deserializer.
8686
/// </summary>
8787
public IEnumerable<KeyValuePair<string, string>> Configure(IEnumerable<KeyValuePair<string, string>> config, bool isKey)
8888
{

src/Confluent.Kafka.Avro/AvroSerdeProvider.cs

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Confluent.Kafka.Serialization
1414
/// </summary>
1515
public class AvroSerdeProvider : IDisposable
1616
{
17-
AvroSerdeProviderConfig config;
17+
IEnumerable<KeyValuePair<string, string>> config;
1818
ISchemaRegistryClient schemaRegistryClient;
1919

2020
/// <summary>
@@ -23,14 +23,14 @@ public class AvroSerdeProvider : IDisposable
2323
/// <param name="config">
2424
/// Configuration properties.
2525
/// </param>
26-
public AvroSerdeProvider(AvroSerdeProviderConfig config)
26+
public AvroSerdeProvider(IEnumerable<KeyValuePair<string, string>> config)
2727
{
2828
this.config = config;
2929
schemaRegistryClient = new CachedSchemaRegistryClient(config);
3030
}
3131

3232
/// <summary>
33-
/// Create a new avro deserializer. Use this deserializer with
33+
/// Create a new avro deserializer generator. Use this with
3434
/// GenericRecord, types generated using the avrogen.exe tool or
3535
/// one of the following primitive types: int, long, float,
3636
/// double, boolean, string, byte[].
@@ -41,46 +41,35 @@ public AvroSerdeProvider(AvroSerdeProviderConfig config)
4141
/// bytes 1-4: Unique global id of the avro schema that was used for encoding (as registered in Confluent Schema Registry), big endian.
4242
/// following bytes: The serialized data.
4343
/// </remarks>
44-
public IDeserializer<T> CreateDeserializer<T>()
44+
public DeserializerGenerator<T> GetDeserializerGenerator<T>()
4545
{
46-
var deserializer = new AvroDeserializer<T>(schemaRegistryClient);
47-
deserializer.Configure(config, false);
48-
return deserializer;
46+
return (forKey) =>
47+
{
48+
var deserializer = new AvroDeserializer<T>(schemaRegistryClient);
49+
deserializer.Configure(config, forKey);
50+
return (string topic, ReadOnlySpan<byte> data, bool isNull) => deserializer.Deserialize(topic, data, isNull);
51+
};
4952
}
5053

5154
/// <summary>
52-
/// Create a new avro serializer for message keys. Use this serializer
53-
/// with GenericRecord, types generated using the avrogen.exe tool or
55+
/// Create a new avro serializer generator. Use this with
56+
/// GenericRecord, types generated using the avrogen.exe tool or
5457
/// one of the following primitive types: int, long, float, double,
5558
/// boolean, string, byte[].
5659
/// </summary>
57-
public ISerializer<T> CreateKeySerializer<T>()
60+
public SerializerGenerator<T> GetSerializerGenerator<T>()
5861
{
59-
var serializer = new AvroSerializer<T>(schemaRegistryClient);
60-
serializer.Configure(config, true);
61-
return serializer;
62+
return (forKey) =>
63+
{
64+
var serializer = new AvroSerializer<T>(schemaRegistryClient);
65+
serializer.Configure(config, true);
66+
return (string topic, T data) => serializer.Serialize(topic, data);
67+
};
6268
}
6369

64-
/// <summary>
65-
/// Create a new avro serializer for message values. Use this serializer
66-
/// with GenericRecord, types generated using the avrogen.exe tool or
67-
/// one of the following primitive types: int, long, float, double,
68-
/// boolean, string, byte[].
69-
/// </summary>
70-
public ISerializer<T> CreateValueSerializer<T>()
71-
{
72-
var serializer = new AvroSerializer<T>(schemaRegistryClient);
73-
serializer.Configure(config, false);
74-
return serializer;
75-
}
76-
7770
/// <summary>
7871
/// Releases all resources owned by this object.
7972
/// </summary>
80-
public void Dispose()
81-
{
82-
schemaRegistryClient.Dispose();
83-
}
73+
public void Dispose() => schemaRegistryClient.Dispose();
8474
}
8575
}
86-

0 commit comments

Comments
 (0)