Skip to content

Commit efcd612

Browse files
authored
Merge pull request #44526 from ozangunalp/serde_detection_instance_channel
Support Serde detection for Instance injection of Messaging channels
2 parents 69c0b8e + c858117 commit efcd612

File tree

6 files changed

+80
-20
lines changed

6 files changed

+80
-20
lines changed

extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
final class DotNames {
66
// @formatter:off
7+
static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName());
8+
static final DotName INJECTABLE_INSTANCE = DotName.createSimple(io.quarkus.arc.InjectableInstance.class.getName());
9+
static final DotName PROVIDER = DotName.createSimple(jakarta.inject.Provider.class.getName());
710
static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName());
811
static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName());
912
static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName());

extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.jboss.jandex.DotName;
2121
import org.jboss.jandex.IndexView;
2222
import org.jboss.jandex.MethodInfo;
23-
import org.jboss.jandex.MethodParameterInfo;
2423
import org.jboss.jandex.Type;
2524
import org.jboss.logging.Logger;
2625

@@ -354,15 +353,19 @@ private void processIncomingType(DefaultSerdeDiscoveryState discovery,
354353
}
355354

356355
private Type getInjectionPointType(AnnotationInstance annotation) {
357-
switch (annotation.target().kind()) {
358-
case FIELD:
359-
return annotation.target().asField().type();
360-
case METHOD_PARAMETER:
361-
MethodParameterInfo parameter = annotation.target().asMethodParameter();
362-
return parameter.method().parameterType(parameter.position());
363-
default:
364-
return null;
365-
}
356+
return switch (annotation.target().kind()) {
357+
case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type());
358+
case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type());
359+
default -> null;
360+
};
361+
}
362+
363+
private Type handleInstanceChannelInjection(Type type) {
364+
return (DotNames.INSTANCE.equals(type.name())
365+
|| DotNames.PROVIDER.equals(type.name())
366+
|| DotNames.INJECTABLE_INSTANCE.equals(type.name()))
367+
? type.asParameterizedType().arguments().get(0)
368+
: type;
366369
}
367370

368371
private void handleAdditionalProperties(String channelName, boolean incoming, DefaultSerdeDiscoveryState discovery,

extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.concurrent.CompletionStage;
1414
import java.util.function.Function;
1515

16+
import jakarta.enterprise.inject.Instance;
1617
import jakarta.inject.Inject;
1718

1819
import org.apache.avro.generic.GenericRecord;
@@ -41,6 +42,7 @@
4142
import org.reactivestreams.Publisher;
4243
import org.reactivestreams.Subscriber;
4344

45+
import io.quarkus.arc.InjectableInstance;
4446
import io.quarkus.commons.classloading.ClassLoaderHelper;
4547
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
4648
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
@@ -111,6 +113,7 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in
111113

112114
assertThat(configs)
113115
.extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue)
116+
.hasSize(expectations.length)
114117
.allSatisfy(tuple -> {
115118
Object[] e = tuple.toArray();
116119
String key = (String) e[0];
@@ -3048,5 +3051,26 @@ private static class ChannelChildSerializer {
30483051
Multi<JsonbDto> channel2;
30493052
}
30503053

3054+
@Test
3055+
void instanceInjectionPoint() {
3056+
Tuple[] expectations = {
3057+
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
3058+
tuple("mp.messaging.incoming.channel2.value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"),
3059+
tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"),
3060+
};
3061+
doTest(expectations, InstanceInjectionPoint.class);
3062+
}
3063+
3064+
private static class InstanceInjectionPoint {
3065+
@Channel("channel1")
3066+
Instance<Emitter<String>> emitter1;
3067+
3068+
@Channel("channel2")
3069+
Instance<Multi<Integer>> channel2;
3070+
3071+
@Channel("channel3")
3072+
InjectableInstance<MutinyEmitter<Double>> channel3;
3073+
}
3074+
30513075

30523076
}

extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
final class DotNames {
66
// @formatter:off
7+
static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName());
8+
static final DotName INJECTABLE_INSTANCE = DotName.createSimple(io.quarkus.arc.InjectableInstance.class.getName());
9+
static final DotName PROVIDER = DotName.createSimple(jakarta.inject.Provider.class.getName());
710
static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName());
811
static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName());
912
static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName());

extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.jboss.jandex.AnnotationInstance;
1111
import org.jboss.jandex.DotName;
1212
import org.jboss.jandex.MethodInfo;
13-
import org.jboss.jandex.MethodParameterInfo;
1413
import org.jboss.jandex.Type;
1514
import org.jboss.logging.Logger;
1615

@@ -144,15 +143,19 @@ private static String incomingSchemaKey(String channelName) {
144143
}
145144

146145
private Type getInjectionPointType(AnnotationInstance annotation) {
147-
switch (annotation.target().kind()) {
148-
case FIELD:
149-
return annotation.target().asField().type();
150-
case METHOD_PARAMETER:
151-
MethodParameterInfo parameter = annotation.target().asMethodParameter();
152-
return parameter.method().parameterType(parameter.position());
153-
default:
154-
return null;
155-
}
146+
return switch (annotation.target().kind()) {
147+
case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type());
148+
case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type());
149+
default -> null;
150+
};
151+
}
152+
153+
private Type handleInstanceChannelInjection(Type type) {
154+
return (DotNames.INSTANCE.equals(type.name())
155+
|| DotNames.PROVIDER.equals(type.name())
156+
|| DotNames.INJECTABLE_INSTANCE.equals(type.name()))
157+
? type.asParameterizedType().arguments().get(0)
158+
: type;
156159
}
157160

158161
private void produceRuntimeConfigurationDefaultBuildItem(DefaultSchemaDiscoveryState discovery,

extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
import java.util.concurrent.CompletionStage;
1616
import java.util.function.Supplier;
1717

18+
import jakarta.enterprise.inject.Instance;
1819
import jakarta.enterprise.inject.Produces;
1920
import jakarta.inject.Inject;
21+
import jakarta.inject.Provider;
2022

2123
import org.apache.avro.specific.AvroGenerated;
2224
import org.apache.pulsar.client.api.Messages;
@@ -40,6 +42,7 @@
4042
import org.reactivestreams.Publisher;
4143
import org.reactivestreams.Subscriber;
4244

45+
import io.quarkus.arc.InjectableInstance;
4346
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
4447
import io.quarkus.commons.classloading.ClassLoaderHelper;
4548
import io.quarkus.deployment.annotations.BuildProducer;
@@ -2108,5 +2111,26 @@ Multi<GenericPayload<Long>> method4() {
21082111
}
21092112
}
21102113

2114+
@Test
2115+
void instanceInjectionPoint() {
2116+
Tuple[] expectations = {
2117+
tuple("mp.messaging.outgoing.channel1.schema", "STRING"),
2118+
tuple("mp.messaging.incoming.channel2.schema", "INT32"),
2119+
tuple("mp.messaging.outgoing.channel3.schema", "DOUBLE"),
2120+
};
2121+
doTest(expectations, InstanceInjectionPoint.class);
2122+
}
2123+
2124+
private static class InstanceInjectionPoint {
2125+
@Channel("channel1")
2126+
Instance<Emitter<String>> emitter1;
2127+
2128+
@Channel("channel2")
2129+
Provider<Multi<Integer>> channel2;
2130+
2131+
@Channel("channel3")
2132+
InjectableInstance<MutinyEmitter<Double>> channel3;
2133+
}
2134+
21112135

21122136
}

0 commit comments

Comments
 (0)