Skip to content

Commit ba920ff

Browse files
lmartella1 and edeweerd1A
authored and committed
fix(impl): Skip ProducerOnSendInterceptor execution when publishing to the local DLQ
This PR resolves an inconsistency in DLQ handling where processor failures were sent to the LocalDLQ using the topology-defined Kafka producer, causing ProducerOnSendInterceptor logic to be applied and potentially altering the original poisonous message. The refactoring introduces a dedicated raw Kafka producer, explicitly marked for DLQ usage, which bypasses interceptor execution. This producer is now shared between DlqDecorator (for processor failures) and LogAndSendToDlqExceptionHandlerDelegate (for deserialization exceptions), ensuring all LocalDLQ records are pushed unmodified except for DLQ metadata headers, and providing consistent DLQ behavior across all failure scenarios. Fixes: #233
1 parent 7c4cae1 commit ba920ff

File tree

12 files changed

+876
-376
lines changed

12 files changed

+876
-376
lines changed

impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/TopologyProducer.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,12 @@
2929
import jakarta.inject.Provider;
3030

3131
import org.apache.kafka.common.serialization.Deserializer;
32-
import org.apache.kafka.common.serialization.Serializer;
3332
import org.apache.kafka.streams.KafkaClientSupplier;
3433
import org.apache.kafka.streams.Topology;
3534
import org.apache.kafka.streams.processor.api.Processor;
3635
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
3736

3837
import io.cloudevents.kafka.CloudEventDeserializer;
39-
import io.cloudevents.kafka.CloudEventSerializer;
4038
import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer;
4139
import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor;
4240
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.DefaultConfigurationCustomizer;
@@ -59,11 +57,6 @@ public class TopologyProducer {
5957
*/
6058
public static final String PROCESSOR_NAME = "Processor";
6159

62-
/**
63-
* Default sink name of the dead-letter queue
64-
*/
65-
public static final String DLQ_SINK_NAME = "DLQ";
66-
6760
/**
6861
* Class containing the configuration related to kafka streams processor
6962
*/
@@ -194,11 +187,6 @@ public Topology topology(TopologyConfigurationImpl configuration,
194187
sinkToTopicMapping
195188
.forEach((String sink, String topic) -> topology.addSink(sink, topic, configuration.getSinkKeySerializer(),
196189
configuration.getSinkValueSerializer(), PROCESSOR_NAME));
197-
if (kStreamsProcessorConfig.dlq().topic().isPresent()) {
198-
topology.addSink(DLQ_SINK_NAME, kStreamsProcessorConfig.dlq().topic().get(),
199-
configuration.getSourceKeySerde().serializer(),
200-
getSourceValueSerializer(configuration, kStreamsProcessorConfig), PROCESSOR_NAME);
201-
}
202190

203191
configuration.getStoreConfigurations()
204192
.forEach(storeConfiguration -> topology.addStateStore(storeConfiguration.getStoreBuilder(), PROCESSOR_NAME));
@@ -218,17 +206,6 @@ private Deserializer<?> getSourceValueDeserializer(TopologyConfigurationImpl con
218206
return configuration.getSourceValueSerde().deserializer();
219207
}
220208

221-
private Serializer<?> getSourceValueSerializer(TopologyConfigurationImpl configuration,
222-
KStreamsProcessorConfig kStreamsProcessorConfig) {
223-
// defaulting to CloudEventSerializer if kafkastreamsprocessor.input.is-cloud-event is true
224-
if (kStreamsProcessorConfig.input().isCloudEvent()) {
225-
CloudEventSerializer serializer = new CloudEventSerializer();
226-
serializer.configure(kStreamsProcessorConfig.dlq().cloudEventSerializerConfig(), false);
227-
return serializer;
228-
}
229-
return configuration.getSourceValueSerde().serializer();
230-
}
231-
232209
private void addGlobalStores(TopologyConfigurationImpl configuration, Topology topology) {
233210
configuration.getGlobalStoreConfigurations()
234211
.forEach(config -> {

impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/DlqDecorator.java

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
*/
2020
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor;
2121

22+
import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
23+
2224
import java.util.Optional;
2325
import java.util.Set;
2426

@@ -27,7 +29,9 @@
2729
import jakarta.enterprise.context.Dependent;
2830
import jakarta.inject.Inject;
2931

32+
import org.apache.kafka.clients.consumer.ConsumerRecord;
3033
import org.apache.kafka.common.KafkaException;
34+
import org.apache.kafka.common.serialization.Serializer;
3135
import org.apache.kafka.streams.processor.To;
3236
import org.apache.kafka.streams.processor.api.ContextualProcessor;
3337
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
@@ -37,12 +41,12 @@
3741
import org.apache.kafka.streams.processor.api.RecordMetadata;
3842
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
3943

44+
import io.cloudevents.kafka.CloudEventSerializer;
45+
import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration;
4046
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
4147
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.ProcessorDecoratorPriorities;
42-
import io.quarkiverse.kafkastreamsprocessor.impl.TopologyProducer;
43-
import io.quarkiverse.kafkastreamsprocessor.impl.errors.DlqMetadataHandler;
48+
import io.quarkiverse.kafkastreamsprocessor.impl.errors.DlqProducerService;
4449
import io.quarkiverse.kafkastreamsprocessor.impl.errors.ErrorHandlingStrategy;
45-
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;
4650
import io.quarkiverse.kafkastreamsprocessor.spi.SinkToTopicMappingBuilder;
4751
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
4852
import lombok.AccessLevel;
@@ -68,56 +72,66 @@ public class DlqDecorator extends AbstractProcessorDecorator {
6872
private final Set<String> functionalSinks;
6973

7074
/**
71-
* Tool to enrich a message metadata before its storage in the dead letter queue
75+
* Whether the dead-letter queue mechanism is activated for this microservice
7276
*/
73-
private final DlqMetadataHandler dlqMetadataHandler;
77+
private final boolean activated;
7478

7579
/**
76-
* container of all metrics of the framework
80+
* Delegate that handles DLQ message production
7781
*/
78-
private final KafkaStreamsProcessorMetrics metrics;
82+
private final DlqProducerService dlqDelegate;
7983

8084
/**
81-
* Whether the dead-letter queue mechanism is activated for this microservice
85+
* The configuration of the Kafka Streams processor
8286
*/
83-
private final boolean activated;
87+
private final Configuration configuration;
88+
89+
/**
90+
* Class containing the configuration related to kafka streams processor
91+
*/
92+
private final KStreamsProcessorConfig kStreamsProcessorConfig;
8493

8594
/**
8695
* Keeping a reference to the ProcessorContext to be able to use it in the {@link Processor#process(Record)} method
8796
* whilst not implementing the little too narrowing {@link ContextualProcessor}.
8897
*/
8998
private ProcessorContext context;
9099

91-
DlqDecorator(Set<String> functionalSinks, DlqMetadataHandler dlqMetadataHandler,
92-
KafkaStreamsProcessorMetrics metrics, boolean activated) {
100+
DlqDecorator(Set<String> functionalSinks, boolean activated,
101+
DlqProducerService dlqDelegate,
102+
Configuration configuration,
103+
KStreamsProcessorConfig kStreamsProcessorConfig) {
93104
this.functionalSinks = functionalSinks;
94-
this.dlqMetadataHandler = dlqMetadataHandler;
95-
this.metrics = metrics;
96105
this.activated = activated;
106+
this.dlqDelegate = dlqDelegate;
107+
this.configuration = configuration;
108+
this.kStreamsProcessorConfig = kStreamsProcessorConfig;
97109
}
98110

99111
/**
100112
* Injection constructor
101113
*
102114
* @param sinkToTopicMappingBuilder
103115
* utility to get access to the mapping between sinks and Kafka topics
104-
* @param dlqMetadataHandler
105-
* the enricher of metadata before sending message to the dead letter queue
106-
* @param metrics
107-
* container of all metrics of the framework
108116
* @param kStreamsProcessorConfig
109117
* It contains the configuration for the error strategy configuration property value (default
110118
* {@link ErrorHandlingStrategy#CONTINUE})
111119
* and the configuration Kafka topic to use for dead letter queue (optional)
120+
* @param dlqDelegate
121+
* delegate that handles DLQ production with properly configured producer
122+
* @param configuration
123+
* the configuration of the kafka streams processor
112124
*/
113125
@Inject
114126
public DlqDecorator(
115-
SinkToTopicMappingBuilder sinkToTopicMappingBuilder, DlqMetadataHandler dlqMetadataHandler,
116-
KafkaStreamsProcessorMetrics metrics,
117-
KStreamsProcessorConfig kStreamsProcessorConfig) { // NOSONAR Optional with microprofile-config
118-
this(sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(), dlqMetadataHandler, metrics,
127+
SinkToTopicMappingBuilder sinkToTopicMappingBuilder,
128+
KStreamsProcessorConfig kStreamsProcessorConfig,
129+
DlqProducerService dlqDelegate,
130+
Configuration configuration) { // NOSONAR Optional with microprofile-config
131+
this(sinkToTopicMappingBuilder.sinkToTopicMapping().keySet(),
119132
ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(),
120-
kStreamsProcessorConfig.dlq().topic()));
133+
kStreamsProcessorConfig.dlq().topic()),
134+
dlqDelegate, configuration, kStreamsProcessorConfig);
121135
}
122136

123137
/**
@@ -158,11 +172,30 @@ public void process(Record record) {
158172
} catch (RuntimeException e) { // NOSONAR
159173
Optional<RecordMetadata> recordMetadata = context.recordMetadata();
160174
if (recordMetadata.isPresent()) {
161-
dlqMetadataHandler.addMetadata(record.headers(), recordMetadata.get().topic(),
162-
recordMetadata.get().partition(), e);
163-
context.forward(record, TopologyProducer.DLQ_SINK_NAME);
164-
// Re-throw so the exception gets logged
165-
metrics.microserviceDlqSentCounter().increment();
175+
Serializer<Object> keySerializer = (Serializer<Object>) configuration.getSourceKeySerde().serializer();
176+
Serializer<Object> valueSerializer = (Serializer<Object>) getSourceValueSerializer(configuration,
177+
kStreamsProcessorConfig);
178+
byte[] serializedKey = keySerializer.serialize(recordMetadata.get().topic(), record.headers(),
179+
record.key());
180+
byte[] serializedValue = valueSerializer.serialize(recordMetadata.get().topic(), record.headers(),
181+
record.value());
182+
183+
ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(
184+
recordMetadata.get().topic(),
185+
recordMetadata.get().partition(),
186+
recordMetadata.get().offset(),
187+
record.timestamp(),
188+
CREATE_TIME,
189+
serializedKey != null ? serializedKey.length : 0,
190+
serializedValue != null ? serializedValue.length : 0,
191+
serializedKey,
192+
serializedValue,
193+
record.headers(),
194+
Optional.empty());
195+
196+
dlqDelegate.sendToDlq(consumerRecord, e, context.taskId(), false);
197+
198+
// throw exception to fail OpenTelemetry span
166199
throw e;
167200
}
168201
}
@@ -171,6 +204,17 @@ public void process(Record record) {
171204
}
172205
}
173206

207+
Serializer<?> getSourceValueSerializer(Configuration configuration,
208+
KStreamsProcessorConfig kStreamsProcessorConfig) {
209+
// defaulting to CloudEventSerializer if kafkastreamsprocessor.input.is-cloud-event is true
210+
if (kStreamsProcessorConfig.input().isCloudEvent()) {
211+
CloudEventSerializer serializer = new CloudEventSerializer();
212+
serializer.configure(kStreamsProcessorConfig.dlq().cloudEventSerializerConfig(), false);
213+
return serializer;
214+
}
215+
return configuration.getSourceValueSerde().serializer();
216+
}
217+
174218
@RequiredArgsConstructor(access = AccessLevel.MODULE)
175219
static final class DlqProcessorContextDecorator<KOut, VOut> implements InternalProcessorContext<KOut, VOut> {
176220

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*-
2+
* #%L
3+
* Quarkus Kafka Streams Processor
4+
* %%
5+
* Copyright (C) 2024 Amadeus s.a.s.
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package io.quarkiverse.kafkastreamsprocessor.impl.errors;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import jakarta.annotation.PreDestroy;
26+
import jakarta.enterprise.context.ApplicationScoped;
27+
import jakarta.inject.Inject;
28+
29+
import org.apache.kafka.clients.consumer.ConsumerRecord;
30+
import org.apache.kafka.clients.producer.Producer;
31+
import org.apache.kafka.clients.producer.ProducerRecord;
32+
import org.apache.kafka.common.Configurable;
33+
import org.apache.kafka.streams.KafkaClientSupplier;
34+
import org.apache.kafka.streams.processor.TaskId;
35+
36+
import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor;
37+
import io.quarkiverse.kafkastreamsprocessor.impl.KafkaClientSupplierDecorator;
38+
import io.quarkiverse.kafkastreamsprocessor.impl.metrics.KafkaStreamsProcessorMetrics;
39+
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
40+
import lombok.extern.slf4j.Slf4j;
41+
42+
/**
43+
* Service responsible for producing records to the Dead Letter Queue (DLQ) topic in Kafka Streams Processor.
44+
* <p>
45+
* This class manages the lifecycle and configuration of the DLQ producer, enriches records with metadata,
46+
* and tracks metrics for DLQ operations.
47+
*/
48+
@Slf4j
49+
@ApplicationScoped
50+
public class DlqProducerService implements Configurable {
51+
52+
/**
53+
* Kafka producer supplier for this framework
54+
*/
55+
private final KafkaClientSupplier clientSupplier;
56+
57+
/**
58+
* Metrics container for this framework
59+
*/
60+
private final KafkaStreamsProcessorMetrics metrics;
61+
62+
/**
63+
* Tool object that enriches the metadata of messages before sending them to the microservice's specific DLQ.
64+
*/
65+
private final DlqMetadataHandler dlqMetadataHandler;
66+
67+
/**
68+
* The class containing all the configuration related to kafka stream processor
69+
*/
70+
private final KStreamsProcessorConfig kStreamsProcessorConfig;
71+
72+
/** Producer for the dlq topic */
73+
private Producer<byte[], byte[]> dlqProducer;
74+
75+
/** True if the dead letter queue strategy is selected and properly configured */
76+
boolean sendToDlq;
77+
78+
/**
79+
* Injection constructor
80+
*
81+
* @param kafkaClientSupplier a supplier of {@link org.apache.kafka.clients.producer.KafkaProducer}
82+
* @param metrics the metrics container of this framework
83+
* @param dlqMetadataHandler tool to enrich message metadata before sending them to the microservice's DLQ
84+
* the configuration error strategy for the application. See { @link {@link ErrorHandlingStrategy}
85+
* @param kStreamsProcessorConfig The configuration related to kafka processor
86+
*/
87+
@Inject
88+
public DlqProducerService(KafkaClientSupplier kafkaClientSupplier,
89+
KafkaStreamsProcessorMetrics metrics,
90+
DlqMetadataHandler dlqMetadataHandler,
91+
KStreamsProcessorConfig kStreamsProcessorConfig) {
92+
this.clientSupplier = kafkaClientSupplier;
93+
this.metrics = metrics;
94+
this.dlqMetadataHandler = dlqMetadataHandler;
95+
this.kStreamsProcessorConfig = kStreamsProcessorConfig;
96+
}
97+
98+
public void sendToDlq(final ConsumerRecord<byte[], byte[]> record, final Exception exception, TaskId taskId,
99+
Boolean isDeserializationException) {
100+
metrics.microserviceDlqSentCounter().increment();
101+
log.error("Exception caught during {}, sending to the dead letter queue topic; " +
102+
"taskId: {}, topic: {}, partition: {}, offset: {}",
103+
isDeserializationException ? "deserialization" : "processing",
104+
taskId, record.topic(), record.partition(), record.offset(),
105+
exception);
106+
107+
dlqProducer.send(new ProducerRecord<>(kStreamsProcessorConfig.dlq().topic().get(), null, record.timestamp(),
108+
record.key(), record.value(),
109+
dlqMetadataHandler.withMetadata(record.headers(), record.topic(), record.partition(), exception)));
110+
}
111+
112+
/**
113+
* If DLQ is active, it initializes a {@link Producer} for the DLQ with the
114+
* {@link KafkaClientSupplierDecorator#DLQ_PRODUCER} flag
115+
* so it is only decorated with {@link ProducerOnSendInterceptor} that have
116+
* {@link ProducerOnSendInterceptor#skipForDLQ()} returning <code>false</code>.
117+
* <p>
118+
* <b>Original documentation:</b>
119+
* <p>
120+
* {@inheritDoc}
121+
*/
122+
@Override
123+
public void configure(final Map<String, ?> configs) {
124+
// Resolve the DLQ strategy once to fail fast in case of misconfiguration
125+
sendToDlq = ErrorHandlingStrategy.shouldSendToDlq(kStreamsProcessorConfig.errorStrategy(),
126+
kStreamsProcessorConfig.dlq().topic());
127+
if (sendToDlq) {
128+
Map<String, Object> dlqConfigMap = new HashMap<>(configs);
129+
dlqConfigMap.put(KafkaClientSupplierDecorator.DLQ_PRODUCER, true);
130+
dlqProducer = new LogCallbackExceptionProducerDecorator(clientSupplier.getProducer(dlqConfigMap));
131+
}
132+
}
133+
134+
@PreDestroy
135+
void close() {
136+
if (dlqProducer != null) {
137+
dlqProducer.close(GlobalDLQProductionExceptionHandlerDelegate.GRACEFUL_PERIOD);
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)