Skip to content

Commit 1257da3

Browse files
committed
Vert.x EventBus: use codec selector for non-concrete types
- fixes quarkusio#36172
1 parent f64aaf4 commit 1257da3

File tree

7 files changed

+163
-39
lines changed

7 files changed

+163
-39
lines changed

docs/src/main/asciidoc/vertx-reference.adoc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -712,9 +712,11 @@ Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.
712712

713713
=== Use codecs
714714

715-
The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses codecs to _serialize_ and _deserialize_ objects.
715+
The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses https://vertx.io/docs/vertx-core/java/#_message_codecs[codecs] to _serialize_ and _deserialize_ message objects.
716716
Quarkus provides a default codec for local delivery.
717-
So you can exchange objects as follows:
717+
This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with `@ConsumeEvent` whete `ConsumeEvent#local() == true` (which is the default).
718+
719+
So that you can exchange the message objects as follows:
718720

719721
[source, java]
720722
----

extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusCodecProcessor.java

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
import static io.quarkus.vertx.deployment.VertxConstants.LOCAL_EVENT_BUS_CODEC;
66
import static io.quarkus.vertx.deployment.VertxConstants.UNI;
77

8+
import java.lang.reflect.Modifier;
89
import java.util.Arrays;
910
import java.util.Collection;
1011
import java.util.HashMap;
12+
import java.util.HashSet;
1113
import java.util.List;
1214
import java.util.Map;
1315
import java.util.Set;
@@ -17,11 +19,13 @@
1719
import org.jboss.jandex.AnnotationInstance;
1820
import org.jboss.jandex.AnnotationTarget;
1921
import org.jboss.jandex.AnnotationValue;
22+
import org.jboss.jandex.ClassInfo;
2023
import org.jboss.jandex.DotName;
2124
import org.jboss.jandex.IndexView;
2225
import org.jboss.jandex.MethodInfo;
2326
import org.jboss.jandex.ParameterizedType;
2427
import org.jboss.jandex.Type;
28+
import org.jboss.jandex.Type.Kind;
2529
import org.jboss.logging.Logger;
2630

2731
import io.quarkus.arc.deployment.BeanArchiveIndexBuildItem;
@@ -46,51 +50,71 @@ public void registerCodecs(
4650
BeanArchiveIndexBuildItem beanArchiveIndexBuildItem,
4751
CombinedIndexBuildItem combinedIndex,
4852
BuildProducer<MessageCodecBuildItem> messageCodecs,
49-
BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
53+
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
54+
BuildProducer<LocalCodecSelectorTypesBuildItem> localCodecSelectorTypes) {
5055

5156
final IndexView index = beanArchiveIndexBuildItem.getIndex();
5257
Collection<AnnotationInstance> consumeEventAnnotationInstances = index.getAnnotations(CONSUME_EVENT);
5358
Map<DotName, DotName> codecByTypes = new HashMap<>();
59+
Set<DotName> selectorTypes = new HashSet<>();
60+
5461
for (AnnotationInstance consumeEventAnnotationInstance : consumeEventAnnotationInstances) {
5562
AnnotationTarget typeTarget = consumeEventAnnotationInstance.target();
5663
if (typeTarget.kind() != AnnotationTarget.Kind.METHOD) {
57-
throw new UnsupportedOperationException("@ConsumeEvent annotation must target a method");
64+
throw new IllegalStateException("@ConsumeEvent annotation must target a method");
5865
}
59-
66+
AnnotationValue local = consumeEventAnnotationInstance.value("local");
67+
boolean isLocal = local == null || local.asBoolean();
6068
MethodInfo method = typeTarget.asMethod();
61-
Type codecTargetFromReturnType = extractPayloadTypeFromReturn(method);
62-
Type codecTargetFromParameter = extractPayloadTypeFromParameter(method);
6369

70+
Type codecTargetFromParameter = extractPayloadTypeFromParameter(method);
6471
// If the @ConsumeEvent set the codec, use this codec. It applies to the parameter
6572
AnnotationValue codec = consumeEventAnnotationInstance.value("codec");
6673
if (codec != null && codec.asClass().kind() == Type.Kind.CLASS) {
6774
if (codecTargetFromParameter == null) {
6875
throw new IllegalStateException("Invalid `codec` argument in @ConsumeEvent - no parameter");
6976
}
7077
codecByTypes.put(codecTargetFromParameter.name(), codec.asClass().asClassType().name());
71-
} else if (codecTargetFromParameter != null) {
72-
// Codec is not set, check if we have a built-in codec
73-
if (!hasBuiltInCodec(codecTargetFromParameter)) {
74-
// Ensure local delivery.
75-
AnnotationValue local = consumeEventAnnotationInstance.value("local");
76-
if (local != null && !local.asBoolean()) {
77-
throw new UnsupportedOperationException(
78-
"The generic message codec can only be used for local delivery,"
79-
+ ", implement your own event bus codec for " + codecTargetFromParameter.name()
80-
.toString());
81-
} else if (!codecByTypes.containsKey(codecTargetFromParameter.name())) {
78+
} else if (codecTargetFromParameter != null && !hasBuiltInCodec(codecTargetFromParameter)) {
79+
// Codec is not set and built-in codecs cannot be used
80+
if (!isLocal) {
81+
throw new IllegalStateException(
82+
"The Local Message Codec can only be used for local delivery,"
83+
+ " you will need to implement a message codec for " + codecTargetFromParameter.name()
84+
.toString()
85+
+ " and make use of @ConsumeEvent#codec()");
86+
} else if (!codecByTypes.containsKey(codecTargetFromParameter.name())) {
87+
if (isConcreteClass(codecTargetFromParameter, index)) {
88+
// The default codec makes only sense for concrete classes
8289
LOGGER.debugf("Local Message Codec registered for type %s",
8390
codecTargetFromParameter);
8491
codecByTypes.put(codecTargetFromParameter.name(), LOCAL_EVENT_BUS_CODEC);
92+
} else {
93+
LOGGER.debugf("Local Message Codec will be selected for type %s", codecTargetFromParameter);
94+
selectorTypes.add(codecTargetFromParameter.name());
8595
}
8696
}
8797
}
8898

89-
if (codecTargetFromReturnType != null && !hasBuiltInCodec(codecTargetFromReturnType)
90-
&& !codecByTypes.containsKey(codecTargetFromReturnType.name())) {
91-
92-
LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromReturnType);
93-
codecByTypes.put(codecTargetFromReturnType.name(), LOCAL_EVENT_BUS_CODEC);
99+
Type codecTargetFromReturnType = extractPayloadTypeFromReturn(method);
100+
if (codecTargetFromReturnType != null && !hasBuiltInCodec(codecTargetFromReturnType)) {
101+
if (!isLocal) {
102+
throw new IllegalStateException(
103+
"The Local Message Codec can only be used for local delivery,"
104+
+ " you will need to modify the method to consume io.vertx.core.eventbus.Message, implement a message codec for "
105+
+ codecTargetFromReturnType.name()
106+
.toString()
107+
+ " and make use of io.vertx.core.eventbus.DeliveryOptions");
108+
} else if (!codecByTypes.containsKey(codecTargetFromReturnType.name())) {
109+
if (isConcreteClass(codecTargetFromReturnType, index)) {
110+
// The default codec makes only sense for concrete classes
111+
LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromReturnType);
112+
codecByTypes.put(codecTargetFromReturnType.name(), LOCAL_EVENT_BUS_CODEC);
113+
} else {
114+
LOGGER.debugf("Local Message Codec will be selected for type %s", codecTargetFromReturnType);
115+
selectorTypes.add(codecTargetFromReturnType.name());
116+
}
117+
}
94118
}
95119
}
96120

@@ -133,6 +157,9 @@ public void accept(String name) {
133157
reflectiveClass.produce(ReflectiveClassBuildItem.builder(name).methods().build());
134158
}
135159
});
160+
161+
localCodecSelectorTypes.produce(new LocalCodecSelectorTypesBuildItem(
162+
selectorTypes.stream().map(Object::toString).collect(Collectors.toSet())));
136163
}
137164

138165
private static final List<String> BUILT_IN_CODECS = Arrays.asList(
@@ -220,4 +247,14 @@ private static boolean hasBuiltInCodec(Type type) {
220247
private static boolean isMessageClass(ParameterizedType type) {
221248
return VertxConstants.isMessage(type.name());
222249
}
250+
251+
private static boolean isConcreteClass(Type type, IndexView index) {
252+
if (type != null && type.kind() == Kind.CLASS) {
253+
ClassInfo clazz = index.getClassByName(type.name());
254+
if (clazz != null) {
255+
return !clazz.isInterface() && !Modifier.isAbstract(clazz.flags());
256+
}
257+
}
258+
return false;
259+
}
223260
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.quarkus.vertx.deployment;
2+
3+
import java.util.Set;
4+
5+
import io.quarkus.builder.item.SimpleBuildItem;
6+
7+
/**
8+
* Carries all types for which the {@link io.quarkus.vertx.LocalEventBusCodec} should be selected automatically.
9+
*/
10+
public final class LocalCodecSelectorTypesBuildItem extends SimpleBuildItem {
11+
12+
private final Set<String> types;
13+
14+
LocalCodecSelectorTypesBuildItem(Set<String> types) {
15+
this.types = types;
16+
}
17+
18+
public Set<String> getTypes() {
19+
return types;
20+
}
21+
22+
}

extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static io.quarkus.vertx.deployment.VertxConstants.isMessage;
55
import static io.quarkus.vertx.deployment.VertxConstants.isMessageHeaders;
66

7+
import java.util.ArrayList;
78
import java.util.HashMap;
89
import java.util.List;
910
import java.util.Map;
@@ -45,7 +46,6 @@
4546
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
4647
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
4748
import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
48-
import io.quarkus.deployment.recording.RecorderContext;
4949
import io.quarkus.gizmo.ClassOutput;
5050
import io.quarkus.vertx.ConsumeEvent;
5151
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
@@ -74,7 +74,7 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec
7474
BuildProducer<GeneratedClassBuildItem> generatedClass,
7575
AnnotationProxyBuildItem annotationProxy, LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown,
7676
BuildProducer<ServiceStartBuildItem> serviceStart, BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
77-
List<MessageCodecBuildItem> codecs, RecorderContext recorderContext) {
77+
List<MessageCodecBuildItem> codecs, LocalCodecSelectorTypesBuildItem localCodecSelectorTypes) {
7878
Map<String, ConsumeEvent> messageConsumerConfigurations = new HashMap<>();
7979
ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);
8080
for (EventConsumerBusinessMethodItem businessMethod : messageConsumerBusinessMethods) {
@@ -87,15 +87,20 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec
8787
reflectiveClass.produce(ReflectiveClassBuildItem.builder(invokerClass).build());
8888
}
8989

90+
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
9091
Map<Class<?>, Class<?>> codecByClass = new HashMap<>();
9192
for (MessageCodecBuildItem messageCodecItem : codecs) {
92-
codecByClass.put(recorderContext.classProxy(messageCodecItem.getType()),
93-
recorderContext.classProxy(messageCodecItem.getCodec()));
93+
codecByClass.put(tryLoad(messageCodecItem.getType(), tccl), tryLoad(messageCodecItem.getCodec(), tccl));
94+
}
95+
96+
List<Class<?>> selectorTypes = new ArrayList<>();
97+
for (String name : localCodecSelectorTypes.getTypes()) {
98+
selectorTypes.add(tryLoad(name, tccl));
9499
}
95100

96101
recorder.configureVertx(vertx.getVertx(), messageConsumerConfigurations,
97102
launchMode.getLaunchMode(),
98-
shutdown, codecByClass);
103+
shutdown, codecByClass, selectorTypes);
99104
serviceStart.produce(new ServiceStartBuildItem("vertx"));
100105
return new VertxBuildItem(recorder.forceStart(vertx.getVertx()));
101106
}
@@ -190,4 +195,12 @@ void faultToleranceIntegration(Capabilities capabilities, BuildProducer<ServiceP
190195
"io.smallrye.faulttolerance.vertx.VertxEventLoop"));
191196
}
192197
}
198+
199+
private Class<?> tryLoad(String name, ClassLoader tccl) {
200+
try {
201+
return tccl.loadClass(name);
202+
} catch (ClassNotFoundException e) {
203+
throw new IllegalStateException("Unable to load type: " + name, e);
204+
}
205+
}
193206
}

extensions/vertx/deployment/src/test/java/io/quarkus/vertx/EventBusCodecTest.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package io.quarkus.vertx;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
45

56
import java.lang.annotation.ElementType;
67
import java.lang.annotation.Retention;
78
import java.lang.annotation.RetentionPolicy;
89
import java.lang.annotation.Target;
910
import java.util.concurrent.CompletableFuture;
1011
import java.util.concurrent.CompletionStage;
12+
import java.util.function.Function;
13+
import java.util.function.Supplier;
1114

1215
import jakarta.inject.Inject;
1316

@@ -60,10 +63,10 @@ public void testWithUserCodec() {
6063

6164
@Test
6265
public void testWithUserCodecNonLocal() {
63-
Greeting hello = vertx.eventBus().<Greeting> request("nl-pet", new Pet("neo", "rabbit"))
66+
String hello = vertx.eventBus().<String> request("nl-pet", new Pet("neo", "rabbit"))
6467
.onItem().transform(Message::body)
6568
.await().indefinitely();
66-
assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO");
69+
assertEquals("Non Local Hello NEO", hello);
6770
}
6871

6972
@Test
@@ -79,6 +82,20 @@ public void testWithSubclass() {
7982
assertThat(hello.getMessage()).isEqualTo("Hello my-subclass-event");
8083
}
8184

85+
@Test
86+
public void testWithInterfaceCodecTarget() {
87+
Supplier<String> supplier = vertx.eventBus()
88+
.<Supplier<String>> request("hello-supplier", new Function<String, String>() {
89+
@Override
90+
public String apply(String value) {
91+
return value.toLowerCase();
92+
}
93+
})
94+
.onItem().transform(Message::body)
95+
.await().indefinitely();
96+
assertEquals("foo", supplier.get());
97+
}
98+
8299
static class Greeting {
83100
private final String message;
84101

@@ -118,12 +135,23 @@ void messageTypeWithTypeAnnotation(@NonNull Person person) {
118135
public CompletionStage<Greeting> hello(Event event) {
119136
return CompletableFuture.completedFuture(new Greeting("Hello " + event.getProperty()));
120137
}
138+
139+
@ConsumeEvent("hello-supplier")
140+
public Supplier<String> helloSupplier(Function<String, String> fun) {
141+
return new Supplier<String>() {
142+
143+
@Override
144+
public String get() {
145+
return fun.apply("FOO");
146+
}
147+
};
148+
}
121149
}
122150

123151
static class MyNonLocalBean {
124152
@ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false)
125-
public CompletionStage<Greeting> hello(Pet p) {
126-
return CompletableFuture.completedFuture(new Greeting("Non Local Hello " + p.getName()));
153+
public CompletionStage<String> hello(Pet p) {
154+
return CompletableFuture.completedFuture("Non Local Hello " + p.getName());
127155
}
128156
}
129157

extensions/vertx/deployment/src/test/java/io/quarkus/vertx/MutinyCodecTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.quarkus.vertx;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
45

56
import jakarta.inject.Inject;
67

@@ -54,10 +55,10 @@ public void testWithUserCodec() {
5455

5556
@Test
5657
public void testWithUserCodecNonLocal() {
57-
Greeting hello = vertx.eventBus().<Greeting> request("nl-pet", new Pet("neo", "rabbit"))
58+
String hello = vertx.eventBus().<String> request("nl-pet", new Pet("neo", "rabbit"))
5859
.onItem().transform(Message::body)
5960
.await().indefinitely();
60-
assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO");
61+
assertEquals("Non Local Hello NEO", hello);
6162
}
6263

6364
static class Greeting {
@@ -90,9 +91,9 @@ public Uni<Greeting> hello(Pet p) {
9091

9192
static class MyNonLocalBean {
9293
@ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false)
93-
public Uni<Greeting> hello(Pet p) {
94+
public Uni<String> hello(Pet p) {
9495
return Uni.createFrom().item(
95-
() -> new Greeting("Non Local Hello " + p.getName()))
96+
() -> "Non Local Hello " + p.getName())
9697
.emitOn(Infrastructure.getDefaultExecutor());
9798
}
9899
}

0 commit comments

Comments
 (0)