Commit cdc2d95

Dabz and sebastienviale authored
KAFKA-16505: Adding dead letter queue in Kafka Streams (#17942)
Implements KIP-1034 to add support for a Dead Letter Queue in Kafka Streams.

Reviewers: Lucas Brutschy <[email protected]>, Bruno Cadonna <[email protected]>

Co-authored-by: Sebastien Viale <[email protected]>
1 parent f52f2b9 commit cdc2d95

24 files changed: +1477 -170 lines
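
The feature is driven by a single new configuration property. Below is a minimal sketch of how an application might enable it; the application id, bootstrap servers, and DLQ topic name are placeholders, while the constant is the one added to StreamsConfig in this commit.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class DlqConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // New in this commit: the default exception handlers also produce a
        // dead letter queue record to this topic when an error occurs.
        props.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "my-app-dlq");
        // ... build the Topology and create the KafkaStreams instance with these properties as usual.
    }
}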

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java

Lines changed: 5 additions & 5 deletions
@@ -357,7 +357,7 @@ public void shouldStopProcessingWhenProcessingExceptionHandlerReturnsNull() {
         final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
         assertEquals("Fatal user code error in processing error callback", e.getMessage());
         assertInstanceOf(NullPointerException.class, e.getCause());
-        assertEquals("Invalid ProductionExceptionHandler response.", e.getCause().getMessage());
+        assertEquals("Invalid ProcessingExceptionHandler response.", e.getCause().getMessage());
         assertFalse(isExecuted.get());
     }
 }
@@ -524,15 +524,15 @@ public void shouldVerifySourceRawKeyAndSourceRawValuePresentOrNotInErrorHandlerC
 
     public static class ContinueProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
         @Override
-        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
+        public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
             if (((String) record.key()).contains("FATAL")) {
                 throw new RuntimeException("KABOOM!");
             }
             if (((String) record.key()).contains("NULL")) {
                 return null;
             }
             assertProcessingExceptionHandlerInputs(context, record, exception);
-            return ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
+            return Response.resume();
         }
 
         @Override
@@ -543,9 +543,9 @@ public void configure(final Map<String, ?> configs) {
 
     public static class FailProcessingExceptionHandlerMockTest implements ProcessingExceptionHandler {
         @Override
-        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
+        public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
            assertProcessingExceptionHandlerInputs(context, record, exception);
-            return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
+            return Response.fail();
         }
 
         @Override
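
The mock handlers above only exercise the bare Response.resume() and Response.fail() factories. For illustration only, here is a hypothetical handler (not part of this commit) that also attaches a dead letter queue record; it assumes ProcessingExceptionHandler.Response offers the same resume(List<ProducerRecord<byte[], byte[]>>) overload that DeserializationExceptionHandler.Response defines further down in this diff, and the DLQ topic name is a placeholder.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class DlqProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public Response handleError(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
        // Use the raw source bytes exposed by the context so no serializer is needed here.
        final ProducerRecord<byte[], byte[]> dlqRecord =
            new ProducerRecord<>("my-app-dlq", context.sourceRawKey(), context.sourceRawValue());
        // Skip the failed record but forward it to the DLQ (assumed overload, see note above).
        return Response.resume(Collections.singletonList(dlqRecord));
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}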

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java

Lines changed: 5 additions & 5 deletions
@@ -156,15 +156,15 @@ public TestHandler() { }
         public void configure(final Map<String, ?> configs) { }
 
         @Override
-        public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
-                                                         final ProducerRecord<byte[], byte[]> record,
-                                                         final Exception exception) {
+        public Response handleError(final ErrorHandlerContext context,
+                                    final ProducerRecord<byte[], byte[]> record,
+                                    final Exception exception) {
             if (exception instanceof TimeoutException &&
                     exception.getCause() != null &&
                     exception.getCause() instanceof UnknownTopicOrPartitionException) {
-                return ProductionExceptionHandlerResponse.CONTINUE;
+                return Response.resume();
             }
-            return ProductionExceptionHandler.super.handle(context, record, exception);
+            return ProductionExceptionHandler.super.handleError(context, record, exception);
         }
     }

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Lines changed: 10 additions & 0 deletions
@@ -619,6 +619,11 @@ public class StreamsConfig extends AbstractConfig {
         "support \"classic\" or \"streams\". If \"streams\" is specified, then the streams rebalance protocol will be " +
         "used. Otherwise, the classic group protocol will be used.";
 
+    public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name";
+
+    private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" +
+        "If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.";
+
     /** {@code log.summary.interval.ms} */
     public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
     private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" +
@@ -991,6 +996,11 @@ public class StreamsConfig extends AbstractConfig {
                 LogAndFailExceptionHandler.class.getName(),
                 Importance.MEDIUM,
                 DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+            .define(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+                Type.STRING,
+                null,
+                Importance.MEDIUM,
+                ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC)
             .define(MAX_TASK_IDLE_MS_CONFIG,
                 Type.LONG,
                 0L,

streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java

Lines changed: 20 additions & 16 deletions
@@ -18,38 +18,42 @@
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Map;
 
+import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception
  * happens while attempting to produce result records.
  */
 public class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
-    /**
-     * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead.
-     */
-    @SuppressWarnings("deprecation")
-    @Deprecated
+
+    private String deadLetterQueueTopic = null;
+
     @Override
-    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
-                                                     final Exception exception) {
+    public Response handleError(final ErrorHandlerContext context,
+                                final ProducerRecord<byte[], byte[]> record,
+                                final Exception exception) {
         return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+            Response.retry() :
+            Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
-    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
-                                                     final ProducerRecord<byte[], byte[]> record,
-                                                     final Exception exception) {
-        return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+    public Response handleSerializationError(final ErrorHandlerContext context,
+                                             final ProducerRecord record,
+                                             final Exception exception,
+                                             final SerializationExceptionOrigin origin) {
+        return Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawValue(), context, exception));
     }
 
+
     @Override
     public void configure(final Map<String, ?> configs) {
-        // ignore
+        if (configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+            deadLetterQueueTopic = String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
     }
 }
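
For comparison with the default handler above, a custom ProductionExceptionHandler might look as follows. This is a hypothetical sketch, not part of the commit: the class name and DLQ topic are placeholders, and the Response.retry()/Response.fail(...) factories and raw source key/value accessors are assumed to behave as they are used in the default handler in this diff.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class RoutingProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public Response handleError(final ErrorHandlerContext context,
                                final ProducerRecord<byte[], byte[]> record,
                                final Exception exception) {
        if (exception instanceof RetriableException) {
            // Transient broker-side problem: let Streams retry the send.
            return Response.retry();
        }
        // Anything else is fatal for this record; fail the task but keep a copy for analysis.
        final ProducerRecord<byte[], byte[]> dlqRecord =
            new ProducerRecord<>("my-app-dlq", context.sourceRawKey(), context.sourceRawValue());
        return Response.fail(Collections.singletonList(dlqRecord));
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}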

streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java

Lines changed: 156 additions & 0 deletions
@@ -17,10 +17,14 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * Interface that specifies how an exception from source node deserialization
  * (e.g., reading from Kafka) should be handled.
@@ -63,16 +67,35 @@ default DeserializationHandlerResponse handle(final ProcessorContext context,
      *     The actual exception.
      *
      * @return Whether to continue or stop processing.
+     *
+     * @deprecated Use {@link #handleError(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
      */
+    @Deprecated
     default DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                   final ConsumerRecord<byte[], byte[]> record,
                                                   final Exception exception) {
         return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception);
     }
 
+    /**
+     * Inspects a record and the exception received during deserialization.
+     *
+     * @param context
+     *     Error handler context.
+     * @param record
+     *     Record that failed deserialization.
+     * @param exception
+     *     The actual exception.
+     *
+     * @return a {@link Response} object
+     */
+    default Response handleError(final ErrorHandlerContext context, final ConsumerRecord<byte[], byte[]> record, final Exception exception) {
+        return new Response(Result.from(handle(context, record, exception)), Collections.emptyList());
+    }
     /**
      * Enumeration that describes the response from the exception handler.
      */
+    @Deprecated
     enum DeserializationHandlerResponse {
         /** Continue processing. */
         CONTINUE(0, "CONTINUE"),
@@ -95,4 +118,137 @@ enum DeserializationHandlerResponse {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Continue processing. */
+        RESUME(0, "RESUME"),
+        /** Fail processing. */
+        FAIL(1, "FAIL");
+
+        /**
+         * An english description for the used option. This is for debugging only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum DeserializationHandlerResponse into the new Result enum.
+         *
+         * @param value the old DeserializationHandlerResponse enum value
+         * @return a {@link Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map to a valid {@link Result}
+         */
+        private static DeserializationExceptionHandler.Result from(final DeserializationHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                default:
+                    throw new IllegalArgumentException("No Result enum found for old value: " + value);
+            }
+        }
+    }
+
+    /**
+     * Represents the result of handling a deserialization exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link Result},
+     * indicating whether processing should continue or fail, along with an optional list of
+     * {@link ProducerRecord} instances to be sent to a dead letter queue.
+     * </p>
+     */
+    class Response {
+
+        private final Result result;
+
+        private final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;
+
+        /**
+         * Constructs a new {@code DeserializationExceptionResponse} object.
+         *
+         * @param result the result indicating whether processing should continue or fail;
+         *               must not be {@code null}.
+         * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
+         */
+        private Response(final Result result,
+                         final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
+            this.result = result;
+            this.deadLetterQueueRecords = deadLetterQueueRecords;
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
+            return new Response(Result.FAIL, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail() {
+            return fail(Collections.emptyList());
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should continue.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) {
+            return new Response(Result.RESUME, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should continue.
+         *
+         * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume() {
+            return resume(Collections.emptyList());
+        }
+
+        /**
+         * Retrieves the deserialization handler result.
+         *
+         * @return the {@link Result} indicating whether processing should continue or fail.
+         */
+        public Result result() {
+            return result;
+        }
+
+        /**
+         * Retrieves an unmodifiable list of records to be sent to the dead letter queue.
+         * <p>
+         * If the list is {@code null}, an empty list is returned.
+         * </p>
+         *
+         * @return an unmodifiable list of {@link ProducerRecord} instances
+         *     for the dead letter queue, or an empty list if no records are available.
+         */
+        public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
+            if (deadLetterQueueRecords == null) {
+                return Collections.emptyList();
+            }
+            return Collections.unmodifiableList(deadLetterQueueRecords);
+        }
+    }
 }
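
Since the new Response type is fully visible above, a user-side handler is straightforward to write. The sketch below is hypothetical and not part of this commit: it skips the poison record and hands its raw bytes to a placeholder DLQ topic via Response.resume(...), using only the methods defined in the diff above.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;

public class SkipAndForwardDeserializationExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public Response handleError(final ErrorHandlerContext context,
                                final ConsumerRecord<byte[], byte[]> record,
                                final Exception exception) {
        // The consumer record still holds the raw bytes that failed to deserialize.
        final ProducerRecord<byte[], byte[]> dlqRecord =
            new ProducerRecord<>("my-app-dlq", record.key(), record.value());
        // Keep processing, but capture the bad record for later inspection.
        return Response.resume(Collections.singletonList(dlqRecord));
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}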
