Skip to content

Commit cadd5a4

Browse files
committed
Bump Smallrye RM from 4.9.0 to 4.10.1
1 parent a06ccd7 commit cadd5a4

File tree

17 files changed

+261
-42
lines changed

17 files changed

+261
-42
lines changed

bom/application/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
6464
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
6565
<smallrye-mutiny-vertx-binding.version>3.6.0</smallrye-mutiny-vertx-binding.version>
66-
<smallrye-reactive-messaging.version>4.9.0</smallrye-reactive-messaging.version>
66+
<smallrye-reactive-messaging.version>4.10.1</smallrye-reactive-messaging.version>
6767
<smallrye-stork.version>2.3.1</smallrye-stork.version>
6868
<jakarta.activation.version>2.1.2</jakarta.activation.version>
6969
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>

docs/src/main/asciidoc/kafka.adoc

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1245,6 +1245,66 @@ Reciprocally, multiple producers on the same channel can be merged by setting `m
12451245
On the `@Incoming` methods, you can control how multiple channels are merged using the `@Merge` annotation.
12461246
====
12471247

1248+
Repeating the `@Outgoing` annotation on outbound or processing methods allows another way of dispatching messages to multiple outgoing channels:
1249+
1250+
[source, java]
1251+
----
1252+
import java.util.Random;
1253+
1254+
import jakarta.enterprise.context.ApplicationScoped;
1255+
1256+
import org.eclipse.microprofile.reactive.messaging.Outgoing;
1257+
1258+
@ApplicationScoped
1259+
public class MultipleProducers {
1260+
1261+
private final Random random = new Random();
1262+
1263+
@Outgoing("generated")
1264+
@Outgoing("generated-2")
1265+
double priceBroadcast() {
1266+
return random.nextDouble();
1267+
}
1268+
1269+
}
1270+
----
1271+
1272+
In the previous example generated price will be broadcast to both outbound channels.
1273+
The following example selectively sends messages to multiple outgoing channels using the `Targeted` container object,
1274+
containing key as channel name and value as message payload.
1275+
1276+
[source, java]
1277+
----
1278+
import jakarta.enterprise.context.ApplicationScoped;
1279+
1280+
import org.eclipse.microprofile.reactive.messaging.Incoming;
1281+
import org.eclipse.microprofile.reactive.messaging.Outgoing;
1282+
1283+
import io.smallrye.reactive.messaging.Targeted;
1284+
1285+
@ApplicationScoped
1286+
public class TargetedProducers {
1287+
1288+
@Incoming("in")
1289+
@Outgoing("out1")
1290+
@Outgoing("out2")
1291+
@Outgoing("out3")
1292+
public Targeted process(double price) {
1293+
Targeted targeted = Targeted.of("out1", "Price: " + price,
1294+
"out2", "Quote: " + price);
1295+
if (price > 90.0) {
1296+
return targeted.with("out3", price);
1297+
}
1298+
return targeted;
1299+
}
1300+
1301+
}
1302+
----
1303+
1304+
Note that <<serialization-autodetection,the auto-detection for Kafka serializers>> doesn't work for signatures using the `Targeted`.
1305+
1306+
For more details on using multiple outgoings, please refer to the http://smallrye.io/smallrye-reactive-messaging/4.10.0/concepts/outgoings/[SmallRye Reactive Messaging documentation].
1307+
12481308
=== Kafka Transactions
12491309

12501310
Kafka transactions enable atomic writes to multiple Kafka topics and partitions.
@@ -2261,7 +2321,7 @@ See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture do
22612321

22622322
== Channel Decorators
22632323

2264-
SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/3.19.1/concepts/decorators/[SmallRye Reactive Messaging documentation].
2324+
SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/latest/concepts/decorators/[SmallRye Reactive Messaging documentation].
22652325

22662326
[[kafka-configuration]]
22672327
== Configuration Reference

extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ final class DotNames {
2828
static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName());
2929
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
3030
static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName());
31+
static final DotName MULTI_SPLITTER = DotName.createSimple(io.smallrye.mutiny.operators.multi.split.MultiSplitter.class.getName());
3132

3233
static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated");
3334
static final DotName AVRO_GENERIC_RECORD = DotName.createSimple("org.apache.avro.generic.GenericRecord");

extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,11 @@ public void defaultChannelConfiguration(
181181
if (launchMode.getLaunchMode().isDevOrTest()) {
182182
if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) {
183183
List<AnnotationInstance> incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING);
184+
List<AnnotationInstance> outgoings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING);
184185
List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL);
185186
List<AnnotationInstance> annotations = new ArrayList<>();
186187
annotations.addAll(incomings);
188+
annotations.addAll(outgoings);
187189
annotations.addAll(channels);
188190
for (AnnotationInstance annotation : annotations) {
189191
String channelName = annotation.value().asString();
@@ -221,7 +223,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
221223
alreadyGeneratedDeserializers);
222224
}
223225

224-
for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.OUTGOING)) {
226+
for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING)) {
225227
String channelName = annotation.value().asString();
226228
if (!discovery.isKafkaConnector(channelsManagedByConnectors, false, channelName)) {
227229
continue;
@@ -428,6 +430,7 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
428430
if ((isPublisher(returnType) && parametersCount == 0)
429431
|| (isPublisherBuilder(returnType) && parametersCount == 0)
430432
|| (isMulti(returnType) && parametersCount == 0)
433+
|| (isMultiSplitter(returnType) && parametersCount == 0)
431434
|| (isCompletionStage(returnType) && parametersCount == 0)
432435
|| (isUni(returnType) && parametersCount == 0)) {
433436
outgoingType = returnType.asParameterizedType().arguments().get(0);
@@ -443,7 +446,8 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
443446
|| (isUni(returnType) && parametersCount == 1)
444447
|| (isPublisher(returnType) && parametersCount == 1)
445448
|| (isPublisherBuilder(returnType) && parametersCount == 1)
446-
|| (isMulti(returnType) && parametersCount == 1)) {
449+
|| (isMulti(returnType) && parametersCount == 1)
450+
|| (isMultiSplitter(returnType) && parametersCount == 1)) {
447451
outgoingType = returnType.asParameterizedType().arguments().get(0);
448452
} else if ((isProcessor(returnType) && parametersCount == 0)
449453
|| (isProcessorBuilder(returnType) && parametersCount == 0)) {
@@ -556,6 +560,13 @@ private static boolean isMulti(Type type) {
556560
&& type.asParameterizedType().arguments().size() == 1;
557561
}
558562

563+
private static boolean isMultiSplitter(Type type) {
564+
// raw type MultiSplitter is wrong, must be MultiSplitter<Something, KeyEnum>
565+
return DotNames.MULTI_SPLITTER.equals(type.name())
566+
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
567+
&& type.asParameterizedType().arguments().size() == 2;
568+
}
569+
559570
private static boolean isSubscriber(Type type) {
560571
// raw type Subscriber is wrong, must be Subscriber<Something>
561572
return DotNames.SUBSCRIBER.equals(type.name())

extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import io.smallrye.config.common.MapBackedConfigSource;
4949
import io.smallrye.mutiny.Multi;
5050
import io.smallrye.mutiny.Uni;
51+
import io.smallrye.mutiny.operators.multi.split.MultiSplitter;
5152
import io.smallrye.reactive.messaging.MutinyEmitter;
5253
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
5354
import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
@@ -66,6 +67,7 @@ private static void doTest(Config customConfig, Tuple[] expectations, Class<?>..
6667

6768
List<Class<?>> classes = new ArrayList<>(Arrays.asList(classesToIndex));
6869
classes.add(Incoming.class);
70+
classes.add(Outgoing.class);
6971
DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classes)) {
7072
@Override
7173
Config getConfig() {
@@ -2727,6 +2729,45 @@ void method2(JsonObject msg) {
27272729

27282730
}
27292731

2732+
@Test
2733+
void repeatableOutgoings() {
2734+
Tuple[] expectations = {
2735+
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
2736+
tuple("mp.messaging.outgoing.channel2.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
2737+
tuple("mp.messaging.outgoing.channel3.value.serializer", "io.quarkus.kafka.client.serialization.JsonObjectSerializer"),
2738+
tuple("mp.messaging.outgoing.channel4.value.serializer", "io.quarkus.kafka.client.serialization.JsonObjectSerializer"),
2739+
tuple("mp.messaging.outgoing.channel5.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
2740+
tuple("mp.messaging.outgoing.channel6.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
2741+
};
2742+
doTest(expectations, RepeatableOutgoingsChannels.class);
2743+
}
2744+
2745+
private static class RepeatableOutgoingsChannels {
2746+
2747+
@Outgoing("channel1")
2748+
@Outgoing("channel2")
2749+
String method1() {
2750+
return null;
2751+
}
2752+
2753+
@Outgoing("channel3")
2754+
@Outgoing("channel4")
2755+
JsonObject method2() {
2756+
return null;
2757+
}
2758+
2759+
enum T {
2760+
2761+
}
2762+
2763+
@Outgoing("channel5")
2764+
@Outgoing("channel6")
2765+
MultiSplitter<Long, T> method3() {
2766+
return null;
2767+
}
2768+
2769+
}
2770+
27302771
@Test
27312772
void channelNameContainingDot() {
27322773
Tuple[] expectations = {

extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
1010
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
1111
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
12+
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOINGS;
1213
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
1314
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
1415
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
@@ -116,10 +117,12 @@ public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean
116117
incomingValues.addAll(getIncomingValues(methodInfo));
117118
configuration.setIncomings(incomingValues);
118119

119-
String outgoingValue = getValue(methodInfo, OUTGOING);
120-
configuration.setOutgoing(outgoingValue);
120+
// We need to extract the value of @Outgoing and @Outgoings (which contains an array of @Outgoing)
121+
List<String> outgoingValues = new ArrayList<>(getValues(methodInfo, OUTGOING));
122+
outgoingValues.addAll(getOutgoingValues(methodInfo));
123+
configuration.setOutgoings(outgoingValues);
121124

122-
Shape shape = mediatorConfigurationSupport.determineShape(incomingValues, outgoingValue);
125+
Shape shape = mediatorConfigurationSupport.determineShape(incomingValues, outgoingValues);
123126
configuration.setShape(shape);
124127
Acknowledgment.Strategy acknowledgment = mediatorConfigurationSupport
125128
.processSuppliedAcknowledgement(incomingValues,
@@ -161,7 +164,7 @@ public Merge.Mode get() {
161164
}
162165
}));
163166

164-
configuration.setBroadcastValue(mediatorConfigurationSupport.processBroadcast(outgoingValue,
167+
configuration.setBroadcastValue(mediatorConfigurationSupport.processBroadcast(outgoingValues,
165168
new Supplier<Integer>() {
166169
@Override
167170
public Integer get() {
@@ -176,6 +179,7 @@ public Integer get() {
176179
return null;
177180
}
178181
}));
182+
configuration.setHasTargetedOutput(mediatorConfigurationSupport.processTargetedOutput());
179183

180184
AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
181185
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
@@ -328,6 +332,13 @@ private static List<String> getIncomingValues(MethodInfo methodInfo) {
328332
.collect(Collectors.toList());
329333
}
330334

335+
private static List<String> getOutgoingValues(MethodInfo methodInfo) {
336+
return methodInfo.annotations().stream().filter(ai -> ai.name().equals(OUTGOINGS))
337+
.flatMap(outgoings -> Arrays.stream(outgoings.value().asNestedArray()))
338+
.map(outgoing -> outgoing.value().asString())
339+
.collect(Collectors.toList());
340+
}
341+
331342
private static String fullMethodName(MethodInfo methodInfo) {
332343
return methodInfo.declaringClass() + "#" + methodInfo.name();
333344
}

extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.smallrye.reactive.messaging.annotations.Incomings;
2424
import io.smallrye.reactive.messaging.annotations.Merge;
2525
import io.smallrye.reactive.messaging.annotations.OnOverflow;
26+
import io.smallrye.reactive.messaging.annotations.Outgoings;
2627
import io.smallrye.reactive.messaging.connector.InboundConnector;
2728
import io.smallrye.reactive.messaging.connector.OutboundConnector;
2829
import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;
@@ -38,6 +39,7 @@ public final class ReactiveMessagingDotNames {
3839
static final DotName INCOMING = DotName.createSimple(Incoming.class.getName());
3940
static final DotName INCOMINGS = DotName.createSimple(Incomings.class.getName());
4041
static final DotName OUTGOING = DotName.createSimple(Outgoing.class.getName());
42+
static final DotName OUTGOINGS = DotName.createSimple(Outgoings.class.getName());
4143

4244
public static final DotName CONNECTOR = DotName.createSimple(Connector.class.getName());
4345
static final DotName CONNECTOR_ATTRIBUTES = DotName.createSimple(ConnectorAttributes.class.getName());

extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ public List<UnremovableBeanBuildItem> removalExclusions() {
159159
new UnremovableBeanBuildItem(
160160
new BeanClassAnnotationExclusion(
161161
ReactiveMessagingDotNames.OUTGOING)),
162+
new UnremovableBeanBuildItem(
163+
new BeanClassAnnotationExclusion(
164+
ReactiveMessagingDotNames.OUTGOINGS)),
162165
new UnremovableBeanBuildItem(
163166
new BeanClassAnnotationExclusion(
164167
ReactiveMessagingDotNames.MESSAGE_CONVERTER)),

extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,19 @@ void extractComponents(BeanDiscoveryFinishedBuildItem beanDiscoveryFinished,
107107
ReactiveMessagingDotNames.INCOMINGS);
108108
AnnotationInstance outgoing = transformedAnnotations.getAnnotation(method,
109109
ReactiveMessagingDotNames.OUTGOING);
110+
AnnotationInstance outgoings = transformedAnnotations.getAnnotation(method,
111+
ReactiveMessagingDotNames.OUTGOINGS);
110112
AnnotationInstance blocking = transformedAnnotations.getAnnotation(method,
111113
BLOCKING);
112-
if (incoming != null || incomings != null || outgoing != null) {
114+
if (incoming != null || incomings != null || outgoing != null || outgoings != null) {
113115
handleMethodAnnotatedWithIncoming(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
114116
method, incoming);
115117
handleMethodAnnotationWithIncomings(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
116118
method, incomings);
117119
handleMethodAnnotationWithOutgoing(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
118120
method, outgoing);
121+
handleMethodAnnotationWithOutgoings(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
122+
method, outgoings);
119123

120124
if (WiringHelper.isSynthetic(method)) {
121125
continue;
@@ -218,6 +222,24 @@ private void handleMethodAnnotationWithOutgoing(BuildProducer<ChannelBuildItem>
218222
}
219223
}
220224

225+
private void handleMethodAnnotationWithOutgoings(BuildProducer<ChannelBuildItem> appChannels,
226+
BuildProducer<ValidationPhaseBuildItem.ValidationErrorBuildItem> validationErrors,
227+
BuildProducer<ConfigDescriptionBuildItem> configDescriptionBuildItemBuildProducer,
228+
MethodInfo method, AnnotationInstance outgoings) {
229+
if (outgoings != null) {
230+
for (AnnotationInstance instance : outgoings.value().asNestedArray()) {
231+
if (instance.value().asString().isEmpty()) {
232+
validationErrors.produce(new ValidationPhaseBuildItem.ValidationErrorBuildItem(
233+
new DeploymentException("Empty @Outgoing annotation on method " + method)));
234+
}
235+
configDescriptionBuildItemBuildProducer.produce(new ConfigDescriptionBuildItem(
236+
"mp.messaging.outgoing." + instance.value().asString() + ".connector", null,
237+
"The connector to use", null, null, ConfigPhase.BUILD_TIME));
238+
produceOutgoingChannel(appChannels, instance.value().asString());
239+
}
240+
}
241+
}
242+
221243
private void handleMethodAnnotationWithIncomings(BuildProducer<ChannelBuildItem> appChannels,
222244
BuildProducer<ValidationPhaseBuildItem.ValidationErrorBuildItem> validationErrors,
223245
BuildProducer<ConfigDescriptionBuildItem> configDescriptionBuildItemBuildProducer,

extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration {
3131

3232
private List<String> incomings = new ArrayList<>();
3333

34-
private String outgoing;
34+
private List<String> outgoings = new ArrayList<>();
3535

3636
private Acknowledgment.Strategy acknowledgment;
3737

@@ -63,6 +63,8 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration {
6363
private Type valueType;
6464
private Class<? extends KeyValueExtractor> keyed;
6565

66+
private boolean hasTargetedOutput = false;
67+
6668
public String getBeanId() {
6769
return beanId;
6870
}
@@ -117,11 +119,16 @@ public void setIncomings(List<String> incomings) {
117119

118120
@Override
119121
public String getOutgoing() {
120-
return outgoing;
122+
return outgoings.get(0);
123+
}
124+
125+
@Override
126+
public List<String> getOutgoings() {
127+
return outgoings;
121128
}
122129

123-
public void setOutgoing(String outgoing) {
124-
this.outgoing = outgoing;
130+
public void setOutgoings(List<String> outgoings) {
131+
this.outgoings = outgoings;
125132
}
126133

127134
@Override
@@ -310,4 +317,17 @@ public void setValueType(Type valueType) {
310317
public void setKeyed(Class<? extends KeyValueExtractor> keyed) {
311318
this.keyed = keyed;
312319
}
320+
321+
@Override
322+
public boolean hasTargetedOutput() {
323+
return hasTargetedOutput;
324+
}
325+
326+
public boolean isHasTargetedOutput() {
327+
return hasTargetedOutput;
328+
}
329+
330+
public void setHasTargetedOutput(boolean hasTargetedOutput) {
331+
this.hasTargetedOutput = hasTargetedOutput;
332+
}
313333
}

0 commit comments

Comments
 (0)