fix(impl): Skip ProducerOnSendInterceptor execution when publishing to the local DLQ#234
Conversation
| if (dlqProducer != null) { | ||
| dlqProducer.close(GlobalDLQProductionExceptionHandlerDelegate.GRACEFUL_PERIOD); | ||
| } | ||
| dlqDelegate.configure(configs); |
There was a problem hiding this comment.
Isn't the DlqProducerService is configured 2 times?
There was a problem hiding this comment.
The LogAndSendToDlqExceptionHandlerDelegate.java implements the DeserializationExceptionHandler interface from Kafka Streams, making it a Configurable class. This means its configure() method is automatically invoked by the Kafka Streams runtime during startup.
As part of refactoring, I moved the DLQ producer setup logic to DlqProducerService.
However, since the Kafka Streams runtime does not call configure() on DlqProducerService, I resolved this by explicitly invoking DlqProducerService.configure() from within LogAndSendToDlqExceptionHandlerDelegate.configure().
So the DlqProducerService is strongly dependent from the LogAndSendToDlqExceptionHandlerDelegate implementation as the latter is the one which configure and initialize the first.
| context.forward(record, TopologyProducer.DLQ_SINK_NAME); | ||
| // Re-throw so the exception gets logged | ||
| metrics.microserviceDlqSentCounter().increment(); | ||
| throw e; |
There was a problem hiding this comment.
shouldn't less the exception being thrown?
There was a problem hiding this comment.
As the throw was only meant to log the error (see the comment in the old code "Re-throw so the exception gets logged") I concluded this is no more useful as the DlqProducerService is logging the error as part of the error handling
There was a problem hiding this comment.
Is it not so the TraceDecorator is aware of the error and marks the span in error with the exception as an event?
I hope we have a QuarkusTest testing that flow that makes sure the exception is caught... I think not, we have nothing about tracing here: https://github.com/quarkiverse/quarkus-kafka-streams-processor/blob/main/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateQuarkusTest.java nor any check for exception in TracingQuarkusTest.java
There was a problem hiding this comment.
@edeweerd1A indeed, the exception is actually meant to notify the TracerDecorator that the otel span must be failed. I modify the DlqDecorator to throw the exception after the push to the DLQ and I have created all the necessary span checks in the DlqDecoratorQuarkusTest.
Regarding the LogAndSendToDlqExceptionHandler as this one is intercepting Deserialization Exception, the TracerDecorator is not even registered in the message handling. Hence there's no trace at all.
edeweerd1A
left a comment
There was a problem hiding this comment.
I just have that one doubt
| context.forward(record, TopologyProducer.DLQ_SINK_NAME); | ||
| // Re-throw so the exception gets logged | ||
| metrics.microserviceDlqSentCounter().increment(); | ||
| throw e; |
There was a problem hiding this comment.
Is it not so the TraceDecorator is aware of the error and marks the span in error with the exception as an event?
I hope we have a QuarkusTest testing that flow that makes sure the exception is caught... I think not, we have nothing about tracing here: https://github.com/quarkiverse/quarkus-kafka-streams-processor/blob/main/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/errors/LogAndSendToDlqExceptionHandlerDelegateQuarkusTest.java nor any check for exception in TracingQuarkusTest.java
…o the local DLQ This PR resolves an inconsistency in DLQ handling where processor failures were sent to the LocalDLQ using the topology-defined Kafka producer, causing ProducerOnSendInterceptor logic to be applied and potentially altering the original poisonous message. The refactoring introduces a dedicated raw Kafka producer, explicitly marked for DLQ usage, which bypasses interceptor execution. This producer is now shared between DlqDecorator (for processor failures) and LogAndSendToDlqExceptionHandlerDelegate (for deserialization exceptions), ensuring all LocalDLQ records are pushed unmodified except for DLQ metadata headers, and providing consistent DLQ behavior across all failure scenarios. Fixes: quarkiverse#233
e7053ee to
5573cfc
Compare
This PR resolves an inconsistency in DLQ handling where processor failures were sent to the LocalDLQ using the topology-defined Kafka producer, causing ProducerOnSendInterceptor logic to be applied and potentially altering the original poisonous message.
The refactoring introduces a dedicated raw Kafka producer, explicitly marked for DLQ usage, which bypasses interceptor execution. This producer is now shared between DlqDecorator (for processor failures) and LogAndSendToDlqExceptionHandlerDelegate (for deserialization exceptions), ensuring all LocalDLQ records are pushed unmodified except for DLQ metadata headers, and providing consistent DLQ behavior across all failure scenarios.
Fixes: #233