GH-4328: Expose native Kafka Streams DLQ configuration #4360
loicgreffier wants to merge 1 commit into spring-projects:main from
Conversation
Force-pushed from 6c94761 to a0db242
 *
 */
public class RecoveringDeserializationExceptionHandler implements DeserializationExceptionHandler {
Native DLQ in RecoveringDeserializationExceptionHandler (and the shared base logic) matches what we wanted on #4328. We’re on 4.1, so if something needs to change for the right long-term shape, that’s fine as long as we deprecate the old path and make the migration obvious.
I did have one concern regarding the DeadLetterPublishingRecoverer#HeaderNames class, whether it should be moved to new DeadLetterRecordManager class. This would require deprecating DeadLetterPublishingRecoverer#HeaderNames in favor of DeadLetterRecordManager#HeaderNames, as well as public accessors like DeadLetterPublishingRecoverer#setHeaderNamesSupplier. But this move might not be necessary.
Regarding the RecoveringDeserializationExceptionHandler, it is currently backward compatible. We could consider dropping support for the Recoverer in favor of the native DLQ, but this would only work if the DeadLetterPublishingRecoverer implementation is used, if a custom implementation is used there would be no migration path.
Agree on both — HeaderNames move can be a follow-up if we decide it's worth the API churn. On the recoverer: keeping it is the right call since custom implementations have no native DLQ migration path; deprecating the old property name constant is enough for now.
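For context, the backward-compatible recoverer wiring under discussion looks roughly like the following sketch. The property keys are written as string literals purely for illustration (real code would use the `StreamsConfig` and handler constants), and the `recoverer` argument stands in for a `DeadLetterPublishingRecoverer` bean:

```java
import java.util.HashMap;
import java.util.Map;

// Rough sketch of the recoverer-based wiring (the backward-compatible path).
// Keys are string literals for illustration only; verify against the current
// StreamsConfig and RecoveringDeserializationExceptionHandler constants.
public class RecovererWiringSketch {

    public static Map<String, Object> streamsConfig(Object recoverer) {
        Map<String, Object> props = new HashMap<>();
        // Tell Kafka Streams to use the Spring-provided handler.
        props.put("default.deserialization.exception.handler",
                "org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler");
        // The handler pulls its ConsumerRecordRecoverer (e.g. a
        // DeadLetterPublishingRecoverer) out of the Streams configuration.
        props.put("spring.deserialization.recoverer", recoverer);
        return props;
    }
}
```

Since the handler is instantiated by Kafka Streams itself, passing the recoverer through the configuration map is the only way to hand it a Spring-managed object.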
Resolved review threads on:
- ...afka/src/main/java/org/springframework/kafka/streams/AbstractRecoveringExceptionHandler.java
- ...ka/src/main/java/org/springframework/kafka/listener/NativeDeadLetterDestinationResolver.java
- ...ng-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
- ...ka/src/main/java/org/springframework/kafka/streams/RecoveringProductionExceptionHandler.java
- ...ka/src/main/java/org/springframework/kafka/streams/RecoveringProcessingExceptionHandler.java
@loicgreffier Many thanks for the draft PR. I added some initial feedback that I wanted to convey. Please take a look and see what you think. Overall, the PR looks great. Some general things to keep in mind - we need to add a reference docs section for this (and some javadoc comments - see my inline comments). We also need a new entry in the whats-new docs.
Force-pushed from 8411197 to 6ec0036
The default value of `sameIntervalTopicReuseStrategy` in `RetryTopicConfigurationBuilder` has been changed from `MULTIPLE_TOPICS` to `SINGLE_TOPIC` to align with the `@RetryableTopic` annotation default.
See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.

[[x41-kafka-streams-native-dlq]]
Could you add a link to the relevant ref docs sections from whats-new? Look at the other entries here for examples.
I’ve updated the whats-new section with multiple links to the reference documentation
- If a xref:streams.adoc#dead-letter-destination-resolver[`KafkaStreamsDeadLetterDestinationResolver`] is defined, resume the stream and forward the failed record to the resolved topic-partition using the native Kafka Streams DLQ.
- If `errors.dead.letter.queue.topic.name` is defined and set to a topic name, resume the stream and forward the failed record to that topic using the native Kafka Streams DLQ.
- If a `ConsumerRecordRecoverer` implementation is defined, evaluate it, and resume the stream with no dead-letter records, as it is expected to be handled by the `ConsumerRecordRecoverer`. The framework provides the `DeadLetterPublishingRecoverer` which sends the failed record to a dead-letter topic.
Our preference is one sentence per asciidoc line. If you can change it like that, would be great, but not a big deal, I can do that on merge also.
I've updated the section so it now renders 4 bullet points describing the 4 recovery paths one after the other. So this #4360 (comment) is basically fixed.
Explanations regarding bullet point 3, about the ConsumerRecordRecoverer/DeadLetterPublishingRecoverer appear right after bullet point 4... Maybe it would be better to have a dedicated section on how to configure the DeadLetterPublishingRecoverer for an exception handler.
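The four recovery paths under discussion are tried in a fixed order. As a deliberately simplified, hypothetical sketch of that ordered selection (names are illustrative, not the framework's actual internals):

```java
// Simplified, hypothetical sketch of the ordered recovery cascade described
// in the bullets above. Names are illustrative only.
public class RecoveryCascadeSketch {

    public enum Path { RESOLVER_DLQ, PROPERTY_DLQ, RECOVERER, FAIL }

    public static Path choose(boolean hasDestinationResolver,
            boolean hasDlqTopicProperty, boolean hasRecoverer) {
        if (hasDestinationResolver) {
            return Path.RESOLVER_DLQ;   // resume; forward to the resolved topic-partition
        }
        if (hasDlqTopicProperty) {
            return Path.PROPERTY_DLQ;   // resume; forward to errors.dead.letter.queue.topic.name
        }
        if (hasRecoverer) {
            return Path.RECOVERER;      // resume; the recoverer handles the record (no native DLQ)
        }
        return Path.FAIL;               // fail the stream; no dead-letter records
    }
}
```

The key property of the cascade is that the first configured option wins, so defining a resolver takes precedence over the topic-name property and the recoverer.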
- The Kafka Streams property `errors.dead.letter.queue.topic.name`.
- An implementation of the new `KafkaStreamsDeadLetterDestinationResolver` functional interface for dynamic resolution.
- Through the `setDeadLetterQueueTopic()` method of the `StreamsBuilderFactoryBean`.
Isn't it setDeadLetterTopicName?
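As a side note for readers, the property-based route in the list above is plain Kafka Streams configuration (KIP-1034). A minimal sketch, with a placeholder topic name:

```properties
# Native Kafka Streams DLQ destination (KIP-1034); the topic name is a placeholder
errors.dead.letter.queue.topic.name=my-app.dlq
```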
 */
@FunctionalInterface
public interface KafkaStreamsDeadLetterDestinationResolver {

	TopicPartition apply(ErrorHandlerContext errorHandlerContext, ConsumerRecord<?, ?> record, Exception exception);
apply sounds like a function - how about resolve?
Changed from apply to resolve
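To illustrate the renamed method, here is a self-contained sketch of a resolver implementation. The nested types are simplified stand-ins for Kafka's `TopicPartition`, `ConsumerRecord`, and `ErrorHandlerContext`, so the example compiles without the real dependencies:

```java
// Self-contained sketch of implementing the resolver with the renamed
// resolve(..) method. The nested types are simplified stand-ins for the
// Kafka classes, not the real API.
public class ResolverSketch {

    record TopicPartition(String topic, int partition) { }  // stand-in for org.apache.kafka.common.TopicPartition
    record FailedRecord(String topic, Object key) { }       // stand-in for ConsumerRecord<?, ?>

    @FunctionalInterface
    interface DeadLetterDestinationResolver {               // mirrors KafkaStreamsDeadLetterDestinationResolver
        TopicPartition resolve(Object context, FailedRecord record, Exception exception);
    }

    // Route by source topic; -1 lets the producer's partitioner pick the partition.
    static final DeadLetterDestinationResolver BY_SOURCE_TOPIC =
            (ctx, rec, ex) -> new TopicPartition(rec.topic() + ".DLT", -1);
}
```

With the real interface the lambda body would look the same; the `-1` partition defers partition selection to the producer, as discussed later in this thread.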
Of course, the `recoverer()` bean can be your own implementation of `ConsumerRecordRecoverer`.

- Fail the stream with no dead-letter records.
Did you try to render this? Looks like an orphaned bullet item in the list. Might want to align this.
@loicgreffier Some more minor nitpicks. Once you address those, feel free to remove the draft status on the PR.
@sobychacko I've removed the draft status. Regarding the documentation:
If any sentence or wording isn't clear and needs to be revised, please don't hesitate to let me know.
EDIT: I've added a word about the headers that are being attached to the dead-letter record.
Force-pushed from e4ecab1 to 4e1dc01
- If xref:streams.adoc#dead-letter-queue-topic-name-property[`errors.dead.letter.queue.topic.name`] is defined and set to a topic name, resume the stream and forward the failed record to that topic using the native Kafka Streams DLQ.
- If a `ConsumerRecordRecoverer` implementation is defined, invoke it and resume the stream without producing dead-letter records, as handling is delegated to the `ConsumerRecordRecoverer`.
- Fail the stream without producing dead-letter records.
The DeadLetterPublishingRecoverer explanation and code example currently appear after the "Fail" bullet above, which makes it look like it's related to the fail case. Since it's really elaborating on bullet 3 (the ConsumerRecordRecoverer path), could you either:
• Move it to right before the "Fail" bullet so it immediately follows the bullet it explains, or
• Pull it into a dedicated sub-section right after the bullet list, with a note in bullet 3 pointing to it — similar to how the resolver and DLQ topic have their own sections below. (This option might be the cleanest). What do you think?
Pull it into a dedicated sub-section right after the bullet list, with a note in bullet 3 pointing to it — similar to how the resolver and DLQ topic have their own sections below. (This option might be the cleanest). What do you think?
It aligns with #4360 (comment). I've updated this part to include a dedicated section for DeadLetterPublishingRecoverer.
- The Kafka Streams property xref:streams.adoc#dead-letter-queue-topic-name-property[`errors.dead.letter.queue.topic.name`].
- An implementation of the new xref:streams.adoc#dead-letter-destination-resolver[`KafkaStreamsDeadLetterDestinationResolver`] functional interface for dynamic resolution.
- The `setDeadLetterTopicName()` method of the `StreamsBuilderFactoryBean`.
Is there an xref we can add for this? since we link for everything else.
The StreamsBuilderFactoryBean updates are documented in the new Dead Letter Queue Topic Name Property section. I've added an xref to that section.
I've wondered whether a dedicated section for StreamsBuilderFactoryBean would be worth it, but there's not that much to say
Version 4.1 introduces the `RecoveringProcessingExceptionHandler`, which can take some action when an exception occurs during stream processing.
It implements the `ProcessingExceptionHandler` interface (refer to the Kafka documentation for details), introduced by https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing[KIP-1033],
and follows the xref:streams.adoc#recovery-strategies[Spring for Apache Kafka recovery strategies].
Each handler says "follows the recovery strategies" with a forward xref. Flipping the order — Recovery Strategies first, then the handlers — would read more naturally for someone going top to bottom. What do you think?
Correct, especially since the Recovery Strategies section begins by introducing the 3 handlers provided by the framework:
The framework provides the following exception handlers, which follow the same recovery strategies:
- xref:streams.adoc#streams-deser-recovery[`RecoveringDeserializationExceptionHandler`]
- xref:streams.adoc#streams-processing-recovery[`RecoveringProcessingExceptionHandler`]
- xref:streams.adoc#streams-production-recovery[`RecoveringProductionExceptionHandler`]
Fixed!
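For readers, registering the new processing handler is ordinary Kafka Streams configuration. A sketch, assuming the KIP-1033 `processing.exception.handler` config key (real code would use the `StreamsConfig` constant rather than a string literal):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of registering the new RecoveringProcessingExceptionHandler with
// Kafka Streams. The "processing.exception.handler" key is assumed from
// KIP-1033; verify against StreamsConfig in the Kafka version you use.
public class ProcessingHandlerWiringSketch {

    public static Map<String, Object> streamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put("processing.exception.handler",
                "org.springframework.kafka.streams.RecoveringProcessingExceptionHandler");
        return props;
    }
}
```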
 */
@FunctionalInterface
public interface KafkaStreamsDeadLetterDestinationResolver {

	TopicPartition resolve(ErrorHandlerContext errorHandlerContext, ConsumerRecord<?, ?> record, Exception exception);
A method-level Javadoc on resolve would be a nice addition. For example, in the docs we show return new TopicPartition("dlqTopic1", -1) — it would help to briefly explain what the -1 partition value means. It's a Kafka concept rather than a Spring one, but a short note like "use -1 to let the producer's default partitioner choose the partition" would save users from having to dig into Kafka docs to understand it.
I've added the Javadoc.
Regarding the new TopicPartition("dlqTopic1", -1), this originally comes from this current documentation section: https://docs.spring.io/spring-kafka/reference/streams.html#streams-deser-recovery.
-1 is currently handled by the DeadLetterPublishingRecoverer: it is converted to null before constructing the ProducerRecord.
This is documented here already: https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#dead-letters. The new Streams > Dead Letter Publishing Recoverer section links to it.
Still worth reiterating this in the Javadoc and in the new Dead Letter Destination Resolver section.
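The `-1` handling described above can be summarized in a tiny, hypothetical sketch. This mirrors the documented `DeadLetterPublishingRecoverer` behavior, not its actual code, and the method name is illustrative:

```java
// Illustrative sketch: a negative partition in the resolved TopicPartition is
// normalized to null so the producer's default partitioner chooses the
// partition of the outgoing ProducerRecord. Names are hypothetical.
public class PartitionNormalizationSketch {

    // Returns the partition to set on the outgoing record (null = producer decides).
    public static Integer producerPartition(int resolvedPartition) {
        return resolvedPartition < 0 ? null : resolvedPartition;
    }
}
```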
@loicgreffier Thanks for all the updates. I added a final round of PR review comments. Hopefully, once that is addressed, we can take it for a final spin of review and merge.
Btw - can you update the commit message with some more details about the rationale for the changes? We usually follow these guidelines for commit messages: https://cbea.ms/git-commit/ Thanks!
Could you take care of the commit message? (See my comment above.) Also, please sign your commits following the DCO - https://spring.io/blog/2025/01/06/hello-dco-goodbye-cla-simplifying-contributions-to-spring
Force-pushed from 62aa2f5 to 7f933c1
Extract the DLT record header-building logic into a new DeadLetterRecordManager class so it can be used outside of DeadLetterPublishingRecoverer.

Add two new handlers, RecoveringProcessingExceptionHandler and RecoveringProductionExceptionHandler, to handle processing and production errors, respectively.

Add a common AbstractRecoveringExceptionHandler that centralizes the shared error-handling logic for all provided recovering exception handlers.

Add a KafkaStreamsDeadLetterDestinationResolver to allow users to define dead-letter routing logic used by the Kafka Streams native DLQ in the provided exception handler implementations.

Update StreamsBuilderFactoryBean to expose a dead-letter queue topic name as a possible destination for all provided recovering exception handler implementations.

Fixes: spring-projects#4328

Signed-off-by: Loïc Greffier <loic.greffier@michelin.com>
Commit message has been changed, in accordance with the original issue title.
The PR addresses the 4 points mentioned in issue #4328:

- Make `RecoveringDeserializationExceptionHandler` leverage the new Kafka Streams native DLQ introduced by KIP-1034.
- Provide a `NativeDeadLetterDestinationResolver` as an alternative to resolve the Kafka Streams native DLQ destination.
- Provide `RecoveringProcessingExceptionHandler` and `RecoveringProductionExceptionHandler`.

To answer these points:

New `DeadLetterRecordManager` class. I've introduced a new `DeadLetterRecordManager` class as a common place to build DLT headers, so the utilities for building headers can be leveraged by both the `DeadLetterPublishingRecoverer` and the exception handlers. I preferred this over a static utility class because of the number of parameters. There is one instance per handler and one for the `DeadLetterPublishingRecoverer`.

New `RecoveringProcessingExceptionHandler` and `RecoveringProductionExceptionHandler` implementations with the same recovery logic as `RecoveringDeserializationExceptionHandler`. I've introduced a new `AbstractRecoveringExceptionHandler` that holds the common logic for all handlers.

New `NativeDeadLetterDestinationResolver` interface. It allows users to define native DLQ routing logic based on 3 given parameters: the `ErrorHandlerContext`, the source record as a `ConsumerRecord`, and the exception. All handlers leverage it. For now, it is loaded in the same way as the `DeadLetterPublishingRecoverer` was.

I've updated the tests to cover the 3 recovering handler implementations. I introduced an `AbstractRecoveringExceptionHandlerTests` class that contains common tests. Child classes override some methods to create the corresponding handler and assert the related handling response.

Concerns:

- The `HeaderNames` class is still located within the `DeadLetterPublishingRecoverer` class. It might be better to move it to `DeadLetterRecordManager`, but this would be a breaking change.

Closes #4328