Skip to content

Commit 5174eba

Browse files
authored
audit of exception use across library (#773)
* audit of exception use across library * ..ExceptionResult -> ..Report * trace.fail -> fatal * remove now unneeded using statements * remove ignored cancellation token parameters * reveiw feedback changes + revert ErrorCode.Unknown - leave as todo * update changelog * addressing review feedback
1 parent 74bef73 commit 5174eba

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+525
-398
lines changed

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# 1.0.x (unreleased)
2+
3+
## New Features
4+
5+
- Added GET subject versions to the cached schema registry client.
6+
7+
## Fixes
8+
9+
- Corrected an error in the `rd_kafka_event_type` method signature which was causing incompatibility with mono.
10+
- Audited exception use across the library and made changes in various places where appropriate.
11+
- Removed unused CancellationToken parameters (we will add them back when implemented).
12+
- Builder classes now return interfaces, not concrete classes.
13+
14+
115
# 1.0.0-beta3
216

317
## New Features

README.md

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class Program
100100
var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value="test" });
101101
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
102102
}
103-
catch (KafkaException e)
103+
catch (ProduceException<Null, string> e)
104104
{
105105
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
106106
}
@@ -177,25 +177,26 @@ class Program
177177
cts.Cancel();
178178
};
179179

180-
while (!cts.IsCancellationRequested)
180+
try
181181
{
182-
try
183-
{
184-
var cr = c.Consume(cts.Token);
185-
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
186-
}
187-
catch (ConsumeException e)
182+
while (true)
188183
{
189-
Console.WriteLine($"Error occured: {e.Error.Reason}");
190-
}
191-
catch (OperationCanceledException)
192-
{
193-
break;
184+
try
185+
{
186+
var cr = c.Consume(cts.Token);
187+
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
188+
}
189+
catch (ConsumeException e)
190+
{
191+
Console.WriteLine($"Error occured: {e.Error.Reason}");
192+
}
194193
}
195194
}
196-
197-
// Ensure the consumer leaves the group cleanly and final offsets are committed.
198-
c.Close();
195+
catch (OperationCanceledException)
196+
{
197+
// Ensure the consumer leaves the group cleanly and final offsets are committed.
198+
c.Close();
199+
}
199200
}
200201
}
201202
}
@@ -227,11 +228,11 @@ For more information about working with Avro in .NET, refer to the the blog post
227228

228229
Errors delivered to a client's error handler should be considered informational except when the `IsFatal` flag
229230
is set to `true`, indicating that the client is in an un-recoverable state. Currently, this can only happen on
230-
the producer, and only when `enable.itempotence` has been set to `true`. In all other scenarios, clients are
231-
able to recover from all errors automatically.
231+
the producer, and only when `enable.idempotence` has been set to `true`. In all other scenarios, clients will
232+
attempt to recover from all errors automatically.
232233

233234
Although calling most methods on the clients will result in a fatal error if the client is in an un-recoverable
234-
state, you should generally only need to explicitly check for fatal errors in your `OnError` handler, and handle
235+
state, you should generally only need to explicitly check for fatal errors in your error handler, and handle
235236
this scenario there.
236237

237238
#### Producer
@@ -246,10 +247,8 @@ will be thrown.
246247

247248
#### Consumer
248249

249-
If you are using the deserializing version of the `Consumer`, any error encountered during deserialization (which
250-
happens during your call to `Consume`) will throw a `DeserializationException`. All other `Consume` errors will
251-
result in a `ConsumeException` with further information about the error and context available via the `Error` and
252-
`ConsumeResult` fields.
250+
All `Consume` errors will result in a `ConsumeException` with further information about the error and context
251+
available via the `Error` and `ConsumeResult` fields.
253252

254253

255254
### Confluent Cloud

examples/AvroBlogExamples/Program.cs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -114,24 +114,29 @@ static void ConsumeSpecific(string bootstrapServers, string schemaRegistryUrl)
114114
{
115115
consumer.Subscribe("log-messages");
116116

117-
while (!cts.IsCancellationRequested)
117+
try
118118
{
119-
try
119+
while (true)
120120
{
121-
var consumeResult = consumer.Consume(cts.Token);
121+
try
122+
{
123+
var consumeResult = consumer.Consume(cts.Token);
122124

123-
Console.WriteLine(
124-
consumeResult.Message.Timestamp.UtcDateTime.ToString("yyyy-MM-dd HH:mm:ss")
125-
+ $": [{consumeResult.Value.Severity}] {consumeResult.Value.Message}");
126-
}
127-
catch (ConsumeException e)
128-
{
129-
Console.WriteLine($"an error occured: {e.Error.Reason}");
125+
Console.WriteLine(
126+
consumeResult.Message.Timestamp.UtcDateTime.ToString("yyyy-MM-dd HH:mm:ss")
127+
+ $": [{consumeResult.Value.Severity}] {consumeResult.Value.Message}");
128+
}
129+
catch (ConsumeException e)
130+
{
131+
Console.WriteLine($"an error occured: {e.Error.Reason}");
132+
}
130133
}
131134
}
132-
133-
// commit final offsets and leave the group.
134-
consumer.Close();
135+
catch (OperationCanceledException)
136+
{
137+
// commit final offsets and leave the group.
138+
consumer.Close();
139+
}
135140
}
136141
}
137142

examples/AvroGeneric/Program.cs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,23 +68,29 @@ static async Task Main(string[] args)
6868
{
6969
consumer.Subscribe(topicName);
7070

71-
while (!cts.Token.IsCancellationRequested)
71+
try
7272
{
73-
try
73+
while (true)
7474
{
75-
var consumeResult = consumer.Consume(cts.Token);
75+
try
76+
{
77+
var consumeResult = consumer.Consume(cts.Token);
7678

77-
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
78-
}
79-
catch (ConsumeException e)
80-
{
81-
Console.WriteLine("Consume error: " + e.Error.Reason);
79+
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
80+
}
81+
catch (ConsumeException e)
82+
{
83+
Console.WriteLine($"Consume error: {e.Error.Reason}");
84+
}
8285
}
8386
}
84-
85-
consumer.Close();
87+
catch (OperationCanceledException)
88+
{
89+
// commit final offsets and leave the group.
90+
consumer.Close();
91+
}
8692
}
87-
}, cts.Token);
93+
});
8894

8995
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
9096
using (var producer =

examples/AvroSpecific/Program.cs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,23 +87,28 @@ static async Task Main(string[] args)
8787
{
8888
consumer.Subscribe(topicName);
8989

90-
while (!cts.Token.IsCancellationRequested)
90+
try
9191
{
92-
try
92+
while (true)
9393
{
94-
var consumeResult = consumer.Consume(cts.Token);
95-
96-
Console.WriteLine($"user key name: {consumeResult.Message.Key}, user value favorite color: {consumeResult.Value.favorite_color}");
97-
}
98-
catch (ConsumeException e)
99-
{
100-
Console.WriteLine("Consume error: " + e.Error.Reason);
94+
try
95+
{
96+
var consumeResult = consumer.Consume(cts.Token);
97+
98+
Console.WriteLine($"user key name: {consumeResult.Message.Key}, user value favorite color: {consumeResult.Value.favorite_color}");
99+
}
100+
catch (ConsumeException e)
101+
{
102+
Console.WriteLine($"Consume error: {e.Error.Reason}");
103+
}
101104
}
102105
}
103-
104-
consumer.Close();
106+
catch (OperationCanceledException)
107+
{
108+
consumer.Close();
109+
}
105110
}
106-
}, cts.Token);
111+
});
107112

108113
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
109114
using (var producer =

examples/Consumer/Program.cs

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -79,40 +79,53 @@ public static void Run_Consume(string brokerList, List<string> topics, Cancellat
7979
{
8080
consumer.Subscribe(topics);
8181

82-
while (!cancellationToken.IsCancellationRequested)
82+
try
8383
{
84-
try
84+
while (true)
8585
{
86-
var consumeResult = consumer.Consume(cancellationToken);
87-
88-
if (consumeResult.IsPartitionEOF)
86+
try
8987
{
90-
Console.WriteLine(
91-
$"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}.");
92-
93-
continue;
88+
var consumeResult = consumer.Consume(cancellationToken);
89+
90+
if (consumeResult.IsPartitionEOF)
91+
{
92+
Console.WriteLine(
93+
$"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
94+
95+
continue;
96+
}
97+
98+
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
99+
100+
if (consumeResult.Offset % commitPeriod == 0)
101+
{
102+
// The Commit method sends a "commit offsets" request to the Kafka
103+
// cluster and synchronously waits for the response. This is very
104+
// slow compared to the rate at which the consumer is capable of
105+
// consuming messages. A high performance application will typically
106+
// commit offsets relatively infrequently and be designed handle
107+
// duplicate messages in the event of failure.
108+
try
109+
{
110+
consumer.Commit(consumeResult);
111+
}
112+
catch (KafkaException e)
113+
{
114+
Console.WriteLine($"Commit error: {e.Error.Reason}");
115+
}
116+
}
94117
}
95-
96-
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
97-
98-
if (consumeResult.Offset % commitPeriod == 0)
118+
catch (ConsumeException e)
99119
{
100-
// The Commit method sends a "commit offsets" request to the Kafka
101-
// cluster and synchronously waits for the response. This is very
102-
// slow compared to the rate at which the consumer is capable of
103-
// consuming messages. A high performance application will typically
104-
// commit offsets relatively infrequently and be designed handle
105-
// duplicate messages in the event of failure.
106-
consumer.Commit(consumeResult);
120+
Console.WriteLine($"Consume error: {e.Error.Reason}");
107121
}
108122
}
109-
catch (ConsumeException e)
110-
{
111-
Console.WriteLine($"Consume error: {e.Error}");
112-
}
113123
}
114-
115-
consumer.Close();
124+
catch (OperationCanceledException)
125+
{
126+
Console.WriteLine("Closing consumer.");
127+
consumer.Close();
128+
}
116129
}
117130
}
118131

@@ -143,28 +156,34 @@ public static void Run_ManualAssign(string brokerList, List<string> topics, Canc
143156
{
144157
consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());
145158

146-
while (!cancellationToken.IsCancellationRequested)
159+
try
147160
{
148-
try
161+
while (true)
149162
{
150-
var consumeResult = consumer.Consume(cancellationToken);
151-
// Note: End of partition notification has not been enabled, so
152-
// it is guaranteed that the ConsumeResult instance corresponds
153-
// to a Message, and not a PartitionEOF event.
154-
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
155-
}
156-
catch (ConsumeException e)
157-
{
158-
Console.WriteLine($"Consume error: {e.Error}");
163+
try
164+
{
165+
var consumeResult = consumer.Consume(cancellationToken);
166+
// Note: End of partition notification has not been enabled, so
167+
// it is guaranteed that the ConsumeResult instance corresponds
168+
// to a Message, and not a PartitionEOF event.
169+
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
170+
}
171+
catch (ConsumeException e)
172+
{
173+
Console.WriteLine($"Consume error: {e.Error.Reason}");
174+
}
159175
}
160176
}
161-
162-
consumer.Close();
177+
catch (OperationCanceledException)
178+
{
179+
Console.WriteLine("Closing consumer.");
180+
consumer.Close();
181+
}
163182
}
164183
}
165184

166185
private static void PrintUsage()
167-
=> Console.WriteLine("Usage: .. <poll|consume|manual> <broker,broker,..> <topic> [topic..]");
186+
=> Console.WriteLine("Usage: .. <subscribe|manual> <broker,broker,..> <topic> [topic..]");
168187

169188
public static void Main(string[] args)
170189
{
@@ -188,7 +207,7 @@ public static void Main(string[] args)
188207

189208
switch (mode)
190209
{
191-
case "consume":
210+
case "subscribe":
192211
Run_Consume(brokerList, topics, cts.Token);
193212
break;
194213
case "manual":

examples/Producer/Program.cs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,15 @@ public static async Task Main(string[] args)
9292

9393
try
9494
{
95-
// Notes:
96-
// 1. Awaiting the asynchronous produce request below prevents flow of execution
97-
// from proceeding until the acknowledgement from the broker is received (at
98-
// the expense of low throughput).
99-
// 2. Producing a message with type arguments will use the default serializers
100-
// registered for the types (in this case, UTF8 for both key and value). To
101-
// override the default serializers or provide serializers for your custom
102-
// types, you can use the RegisterSerializer method.
95+
// Notes: Awaiting the asynchronous produce request below prevents flow of execution
96+
// from proceeding until the acknowledgement from the broker is received (at the
97+
// expense of low throughput).
10398
var deliveryReport = await producer.ProduceAsync(
10499
topicName, new Message<string, string> { Key = key, Value = val });
105100

106101
Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
107102
}
108-
catch (KafkaException e)
103+
catch (ProduceException<string, string> e)
109104
{
110105
Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
111106
}

0 commit comments

Comments
 (0)