Skip to content

Commit 9485e07

Browse files
committed
Kafka serde discovery generates serializer for dead letter queue failure strategy
Resolves quarkusio#34931
1 parent 6be99a4 commit 9485e07

File tree

2 files changed

+145
-26
lines changed

2 files changed

+145
-26
lines changed

extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,16 @@ static boolean hasStateStoreConfig(String stateStoreName, Config config) {
9191
return stateStores.contains(stateStoreName);
9292
}
9393

94+
static boolean hasDLQConfig(String channelName, Config config) {
95+
String propertyKey = getChannelPropertyKey(channelName, "failure-strategy", true);
96+
Optional<String> channelFailureStrategy = config.getOptionalValue(propertyKey, String.class);
97+
Optional<String> failureStrategy = channelFailureStrategy.or(() -> getConnectorProperty("failure-strategy", config));
98+
99+
return failureStrategy.isPresent()
100+
&& (failureStrategy.get().equals("dead-letter-queue")
101+
|| failureStrategy.get().equals("delayed-retry-topic"));
102+
}
103+
94104
private static Optional<String> getConnectorProperty(String keySuffix, Config config) {
95105
return config.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + "." + keySuffix,
96106
String.class);
@@ -207,8 +217,8 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
207217
BuildProducer<RunTimeConfigurationDefaultBuildItem> config,
208218
BuildProducer<GeneratedClassBuildItem> generatedClass,
209219
BuildProducer<ReflectiveClassBuildItem> reflection) {
210-
Map<String, String> alreadyGeneratedSerializers = new HashMap<>();
211-
Map<String, String> alreadyGeneratedDeserializers = new HashMap<>();
220+
Map<String, Result> alreadyGeneratedSerializers = new HashMap<>();
221+
Map<String, Result> alreadyGeneratedDeserializers = new HashMap<>();
212222
for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.INCOMING)) {
213223
String channelName = annotation.value().asString();
214224
if (!discovery.isKafkaConnector(channelsManagedByConnectors, true, channelName)) {
@@ -220,7 +230,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
220230
Type incomingType = getIncomingTypeFromMethod(method);
221231

222232
processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection,
223-
alreadyGeneratedDeserializers);
233+
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);
224234
}
225235

226236
for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING)) {
@@ -257,7 +267,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
257267
Type incomingType = getIncomingTypeFromChannelInjectionPoint(injectionPointType);
258268

259269
processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection,
260-
alreadyGeneratedDeserializers);
270+
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);
261271

262272
processKafkaTransactions(discovery, config, channelName, injectionPointType);
263273

@@ -293,11 +303,12 @@ private void processKafkaTransactions(DefaultSerdeDiscoveryState discovery,
293303
private void processIncomingType(DefaultSerdeDiscoveryState discovery,
294304
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, Type incomingType, String channelName,
295305
BuildProducer<GeneratedClassBuildItem> generatedClass, BuildProducer<ReflectiveClassBuildItem> reflection,
296-
Map<String, String> alreadyGeneratedDeserializers) {
306+
Map<String, Result> alreadyGeneratedDeserializers, Map<String, Result> alreadyGeneratedSerializers) {
297307
extractKeyValueType(incomingType, (key, value, isBatchType) -> {
298-
Result keyDeserializer = deserializerFor(discovery, key, generatedClass, reflection, alreadyGeneratedDeserializers);
299-
Result valueDeserializer = deserializerFor(discovery, value, generatedClass, reflection,
300-
alreadyGeneratedDeserializers);
308+
Result keyDeserializer = deserializerFor(discovery, key, true, channelName, generatedClass, reflection,
309+
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);
310+
Result valueDeserializer = deserializerFor(discovery, value, false, channelName, generatedClass, reflection,
311+
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);
301312

302313
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
303314
getChannelPropertyKey(channelName, "key.deserializer", true), keyDeserializer);
@@ -494,7 +505,7 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {
494505

495506
private void processOutgoingType(DefaultSerdeDiscoveryState discovery, Type outgoingType,
496507
BiConsumer<Result, Result> serializerAcceptor, BuildProducer<GeneratedClassBuildItem> generatedClass,
497-
BuildProducer<ReflectiveClassBuildItem> reflection, Map<String, String> alreadyGeneratedSerializer) {
508+
BuildProducer<ReflectiveClassBuildItem> reflection, Map<String, Result> alreadyGeneratedSerializer) {
498509
extractKeyValueType(outgoingType, (key, value, isBatch) -> {
499510
Result keySerializer = serializerFor(discovery, key, generatedClass, reflection,
500511
alreadyGeneratedSerializer);
@@ -766,10 +777,14 @@ private static boolean isRawMessage(Type type) {
766777
);
767778
// @formatter:on
768779

769-
private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type,
780+
private Result deserializerFor(DefaultSerdeDiscoveryState discovery,
781+
Type type,
782+
boolean key,
783+
String channelName,
770784
BuildProducer<GeneratedClassBuildItem> generatedClass,
771785
BuildProducer<ReflectiveClassBuildItem> reflection,
772-
Map<String, String> alreadyGeneratedSerializers) {
786+
Map<String, Result> alreadyGeneratedDeserializers,
787+
Map<String, Result> alreadyGeneratedSerializers) {
773788
Result result = serializerDeserializerFor(discovery, type, false);
774789
if (result != null && !result.exists) {
775790
// avoid returning Result.nonexistent() to callers, they expect a non-null Result to always be known
@@ -779,24 +794,34 @@ private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type,
779794
// also, only generate the serializer/deserializer for classes and only generate once
780795
if (result == null && type != null && generatedClass != null && type.kind() == Type.Kind.CLASS) {
781796
// Check if already generated
782-
String clazz = alreadyGeneratedSerializers.get(type.toString());
783-
if (clazz == null) {
784-
clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type);
797+
result = alreadyGeneratedDeserializers.get(type.toString());
798+
if (result == null) {
799+
String clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type);
785800
LOGGER.infof("Generating Jackson deserializer for type %s", type.name().toString());
786801
// Deserializers are access by reflection.
787802
reflection.produce(
788803
ReflectiveClassBuildItem.builder(clazz).methods().build());
789-
alreadyGeneratedSerializers.put(type.toString(), clazz);
804+
alreadyGeneratedDeserializers.put(type.toString(), result);
805+
// if the channel has a DLQ config generate a serializer as well
806+
if (hasDLQConfig(channelName, discovery.getConfig())) {
807+
Result serializer = serializerFor(discovery, type, generatedClass, reflection, alreadyGeneratedSerializers);
808+
if (serializer != null) {
809+
result = Result.of(clazz)
810+
.with(key, "dead-letter-queue.key.serializer", serializer.value)
811+
.with(!key, "dead-letter-queue.value.serializer", serializer.value);
812+
}
813+
} else {
814+
result = Result.of(clazz);
815+
}
790816
}
791-
result = Result.of(clazz);
792817
}
793818
return result;
794819
}
795820

796821
private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type,
797822
BuildProducer<GeneratedClassBuildItem> generatedClass,
798823
BuildProducer<ReflectiveClassBuildItem> reflection,
799-
Map<String, String> alreadyGeneratedSerializers) {
824+
Map<String, Result> alreadyGeneratedSerializers) {
800825
Result result = serializerDeserializerFor(discovery, type, true);
801826
if (result != null && !result.exists) {
802827
// avoid returning Result.nonexistent() to callers, they expect a non-null Result to always be known
@@ -806,16 +831,16 @@ private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type,
806831
// also, only generate the serializer/deserializer for classes and only generate once
807832
if (result == null && type != null && generatedClass != null && type.kind() == Type.Kind.CLASS) {
808833
// Check if already generated
809-
String clazz = alreadyGeneratedSerializers.get(type.toString());
810-
if (clazz == null) {
811-
clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type);
834+
result = alreadyGeneratedSerializers.get(type.toString());
835+
if (result == null) {
836+
String clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type);
812837
LOGGER.infof("Generating Jackson serializer for type %s", type.name().toString());
813838
// Serializers are access by reflection.
814839
reflection.produce(
815840
ReflectiveClassBuildItem.builder(clazz).methods().build());
816-
alreadyGeneratedSerializers.put(type.toString(), clazz);
841+
result = Result.of(clazz);
842+
alreadyGeneratedSerializers.put(type.toString(), result);
817843
}
818-
result = Result.of(clazz);
819844
}
820845

821846
return result;

extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.List;
1212
import java.util.Map;
1313
import java.util.concurrent.CompletionStage;
14+
import java.util.function.Function;
1415

1516
import jakarta.inject.Inject;
1617

@@ -22,6 +23,7 @@
2223
import org.apache.kafka.common.header.Headers;
2324
import org.apache.kafka.common.serialization.Deserializer;
2425
import org.apache.kafka.common.serialization.Serializer;
26+
import org.assertj.core.api.Assert;
2527
import org.assertj.core.groups.Tuple;
2628
import org.eclipse.microprofile.config.Config;
2729
import org.eclipse.microprofile.config.spi.ConfigProviderResolver;
@@ -40,7 +42,9 @@
4042
import org.reactivestreams.Publisher;
4143
import org.reactivestreams.Subscriber;
4244

45+
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
4346
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
47+
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
4448
import io.quarkus.kafka.client.serialization.JsonbSerializer;
4549
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
4650
import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem;
@@ -63,7 +67,16 @@ private static void doTest(Tuple[] expectations, Class<?>... classesToIndex) {
6367
}
6468

6569
private static void doTest(Config customConfig, Tuple[] expectations, Class<?>... classesToIndex) {
70+
doTest(customConfig, expectations, Collections.emptyList(), Collections.emptyList(), classesToIndex);
71+
}
72+
73+
@SuppressWarnings({ "unchecked", "rawtypes" })
74+
private static void doTest(Config customConfig, Tuple[] expectations,
75+
List<Function<String, Assert>> generatedNames,
76+
List<Function<String, Assert>> reflectiveNames, Class<?>... classesToIndex) {
6677
List<RunTimeConfigurationDefaultBuildItem> configs = new ArrayList<>();
78+
List<GeneratedClassBuildItem> generated = new ArrayList<>();
79+
List<ReflectiveClassBuildItem> reflective = new ArrayList<>();
6780

6881
List<Class<?>> classes = new ArrayList<>(Arrays.asList(classesToIndex));
6982
classes.add(Incoming.class);
@@ -81,11 +94,35 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in
8194
};
8295
try {
8396
new SmallRyeReactiveMessagingKafkaProcessor().discoverDefaultSerdeConfig(discovery, Collections.emptyList(),
84-
configs::add, null, null);
97+
configs::add,
98+
(generatedNames == null) ? null : generated::add,
99+
(reflectiveNames == null) ? null : reflective::add);
85100

86101
assertThat(configs)
87102
.extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue)
88-
.containsExactlyInAnyOrder(expectations);
103+
.allSatisfy(tuple -> {
104+
Object[] e = tuple.toArray();
105+
String key = (String) e[0];
106+
String value = (String) e[1];
107+
assertThat(Arrays.stream(expectations).filter(t -> key.equals(t.toArray()[0])))
108+
.hasSize(1)
109+
.satisfiesOnlyOnce(t -> {
110+
Object o = t.toArray()[1];
111+
if (o instanceof String) {
112+
assertThat(value).isEqualTo((String) o);
113+
} else {
114+
((Function<String, Assert>) o).apply(value);
115+
}
116+
});
117+
});
118+
119+
assertThat(generated)
120+
.extracting(GeneratedClassBuildItem::getName)
121+
.allSatisfy(s -> assertThat(generatedNames).satisfiesOnlyOnce(c -> c.apply(s)));
122+
123+
assertThat(reflective)
124+
.flatExtracting(ReflectiveClassBuildItem::getClassNames)
125+
.allSatisfy(s -> assertThat(reflectiveNames).satisfiesOnlyOnce(c -> c.apply(s)));
89126
} finally {
90127
// must not leak the Config instance associated to the system classloader
91128
if (customConfig == null) {
@@ -94,6 +131,14 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in
94131
}
95132
}
96133

134+
Function<String, Assert> assertMatches(String regex) {
135+
return s -> assertThat(s).matches(regex);
136+
}
137+
138+
Function<String, Assert> assertStartsWith(String starts) {
139+
return s -> assertThat(s).startsWith(starts);
140+
}
141+
97142
private static IndexView index(List<Class<?>> classes) {
98143
Indexer indexer = new Indexer();
99144
for (Class<?> clazz : classes) {
@@ -2570,11 +2615,14 @@ public void genericSerdeImplementationAutoDetect() {
25702615

25712616
Tuple[] expectations1 = {
25722617
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
2573-
25742618
tuple("mp.messaging.incoming.channel2.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"),
2619+
tuple("mp.messaging.incoming.channel3.value.deserializer", assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
25752620
tuple("mp.messaging.incoming.channel2.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"),
25762621
};
25772622

2623+
var generated1 = List.of(assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Deserializer_"));
2624+
var reflective1 = List.of(assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_"));
2625+
25782626
Tuple[] expectations2 = {
25792627
tuple("mp.messaging.outgoing.channel1.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$MySerializer"),
25802628

@@ -2602,7 +2650,7 @@ public void genericSerdeImplementationAutoDetect() {
26022650
};
26032651
// @formatter:on
26042652

2605-
doTest(expectations1, CustomSerdeImplementation.class, CustomDto.class);
2653+
doTest(null, expectations1, generated1, reflective1, CustomSerdeImplementation.class, CustomDto.class);
26062654

26072655
doTest(expectations2, CustomSerdeImplementation.class, CustomDto.class,
26082656
MySerializer.class,
@@ -2795,5 +2843,51 @@ void method1(KafkaRecord<Integer, String> msg) {
27952843

27962844
}
27972845

2846+
@Test
2847+
void deadLetterQueue() {
2848+
Tuple[] expectations = {
2849+
tuple("mp.messaging.incoming.channel1.value.deserializer",
2850+
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
2851+
tuple("mp.messaging.incoming.channel1.dead-letter-queue.value.serializer",
2852+
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
2853+
tuple("mp.messaging.incoming.channel2.key.deserializer",
2854+
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
2855+
tuple("mp.messaging.incoming.channel2.value.deserializer",
2856+
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
2857+
tuple("mp.messaging.incoming.channel2.dead-letter-queue.key.serializer",
2858+
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
2859+
tuple("mp.messaging.incoming.channel2.dead-letter-queue.value.serializer",
2860+
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
2861+
};
2862+
var generated = List.of(
2863+
assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Deserializer_"),
2864+
assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Serializer_")
2865+
);
2866+
var reflective = List.of(
2867+
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_"),
2868+
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")
2869+
);
2870+
doTest(new SmallRyeConfigBuilder()
2871+
.withSources(new MapBackedConfigSource("test", Map.of(
2872+
"mp.messaging.incoming.channel1.failure-strategy", "dead-letter-queue",
2873+
"mp.messaging.incoming.channel2.failure-strategy", "delayed-retry-topic")) {
2874+
})
2875+
.build(), expectations, generated, reflective, DeadLetterQueue.class);
2876+
}
2877+
2878+
private static class DeadLetterQueue {
2879+
2880+
@Incoming("channel1")
2881+
void method1(CustomDto msg) {
2882+
2883+
}
2884+
2885+
@Incoming("channel2")
2886+
void method2(Record<CustomDto, CustomDto> msg) {
2887+
2888+
}
2889+
2890+
}
2891+
27982892

27992893
}

0 commit comments

Comments
 (0)