Skip to content

Commit acd8a03

Browse files
authored
GH-3957: Add JmsInboundGateway.replyToExpression (#8560)
* GH-3957: Add JmsInboundGateway.replyToExpression Fixes #3957 Sometimes we cannot use a standard `JmsReplyTo` property for sending replies from the server. A `DestinationResolver` API does not have access to the request message. * Introduce a `ChannelPublishingJmsMessageListener.replyToExpression` property to evaluate a reply destination against request JMS `Message` * Use this expression only of no `JmsReplyTo` property * Expose this property on Java DSL level * To simplify end-user experience with lambda configuration for this property, introduce a `CheckedFunction` which essentially re-throws exception "sneaky" way * Fix Javadoc for `CheckedFunction` * * Fix language in docs * Fix Javadocs lines length * Regular `catch` and re-throw in the `CheckedFunction`
1 parent 3f99424 commit acd8a03

File tree

7 files changed

+198
-15
lines changed

7 files changed

+198
-15
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.util;
18+
19+
import java.util.function.Function;
20+
21+
/**
22+
* A Function-like interface which allows throwing Error.
23+
*
24+
* @param <T> the input type.
25+
* @param <R> the output type.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 6.1
30+
*/
31+
@FunctionalInterface
32+
public interface CheckedFunction<T, R> {
33+
34+
R apply(T t) throws Throwable; // NOSONAR
35+
36+
default Function<T, R> unchecked() {
37+
return t1 -> {
38+
try {
39+
return apply(t1);
40+
}
41+
catch (Throwable t) { // NOSONAR
42+
if (t instanceof RuntimeException runtimeException) {
43+
throw runtimeException;
44+
}
45+
else if (t instanceof Error error) {
46+
throw error;
47+
}
48+
else {
49+
throw new IllegalStateException(t);
50+
}
51+
}
52+
};
53+
}
54+
55+
}

spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,7 +30,10 @@
3030
import org.springframework.beans.factory.BeanFactoryAware;
3131
import org.springframework.beans.factory.InitializingBean;
3232
import org.springframework.core.log.LogAccessor;
33+
import org.springframework.expression.Expression;
34+
import org.springframework.expression.spel.support.StandardEvaluationContext;
3335
import org.springframework.integration.core.MessagingTemplate;
36+
import org.springframework.integration.expression.ExpressionUtils;
3437
import org.springframework.integration.gateway.MessagingGatewaySupport;
3538
import org.springframework.integration.support.DefaultMessageBuilderFactory;
3639
import org.springframework.integration.support.MessageBuilderFactory;
@@ -42,6 +45,7 @@
4245
import org.springframework.jms.support.converter.SimpleMessageConverter;
4346
import org.springframework.jms.support.destination.DestinationResolver;
4447
import org.springframework.jms.support.destination.DynamicDestinationResolver;
48+
import org.springframework.lang.Nullable;
4549
import org.springframework.messaging.Message;
4650
import org.springframework.messaging.MessageChannel;
4751
import org.springframework.messaging.MessagingException;
@@ -90,12 +94,16 @@ public class ChannelPublishingJmsMessageListener
9094

9195
private DestinationResolver destinationResolver = new DynamicDestinationResolver();
9296

97+
private Expression replyToExpression;
98+
9399
private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();
94100

95101
private BeanFactory beanFactory;
96102

97103
private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();
98104

105+
private StandardEvaluationContext evaluationContext;
106+
99107
/**
100108
* Specify whether a JMS reply Message is expected.
101109
* @param expectReply true if a reply is expected.
@@ -261,6 +269,17 @@ public void setDestinationResolver(DestinationResolver destinationResolver) {
261269
this.destinationResolver = destinationResolver;
262270
}
263271

272+
/**
273+
* Set a SpEL expression to resolve a 'replyTo' destination from a request
274+
* {@link jakarta.jms.Message} as a root evaluation object
275+
* if {@link jakarta.jms.Message#getJMSReplyTo()} is null.
276+
* @param replyToExpression the SpEL expression for 'replyTo' destination.
277+
* @since 6.1
278+
*/
279+
public void setReplyToExpression(Expression replyToExpression) {
280+
this.replyToExpression = replyToExpression;
281+
}
282+
264283
/**
265284
* Provide a {@link MessageConverter} implementation to use when
266285
* converting between JMS Messages and Spring Integration Messages.
@@ -382,6 +401,7 @@ public void afterPropertiesSet() {
382401
}
383402
this.gatewayDelegate.afterPropertiesSet();
384403
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
404+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
385405
}
386406

387407
protected void start() {
@@ -417,21 +437,22 @@ else if (replyMessage.getJMSCorrelationID() == null) {
417437

418438
/**
419439
* Determine a reply destination for the given message.
420-
* <p> This implementation first checks the boolean 'error' flag which signifies that the reply is an error message.
421-
* If reply is not an error it will first check the JMS Reply-To {@link Destination} of the supplied request message;
422-
* if that is not <code>null</code> it is returned; if it is <code>null</code>, then the configured
423-
* {@link #resolveDefaultReplyDestination default reply destination} is returned; if this too is <code>null</code>,
440+
* It will first check the JMS Reply-To {@link Destination}
441+
* of the supplied request message;
442+
* if that is null, then the configured {@link #replyToExpression} is evaluated
443+
* (if any), then a{@link #resolveDefaultReplyDestination default reply destination}
444+
* is returned; if this too is null,
424445
* then an {@link InvalidDestinationException} is thrown.
425446
* @param request the original incoming JMS message
426447
* @param session the JMS Session to operate on
427-
* @return the reply destination (never <code>null</code>)
448+
* @return the reply destination (never null)
428449
* @throws JMSException if thrown by JMS API methods
429450
* @throws InvalidDestinationException if no {@link Destination} can be determined
430451
* @see #setDefaultReplyDestination
431452
* @see jakarta.jms.Message#getJMSReplyTo()
432453
*/
433454
private Destination getReplyDestination(jakarta.jms.Message request, Session session) throws JMSException {
434-
Destination replyTo = request.getJMSReplyTo();
455+
Destination replyTo = resolveReplyTo(request, session);
435456
if (replyTo == null) {
436457
replyTo = resolveDefaultReplyDestination(session);
437458
if (replyTo == null) {
@@ -440,6 +461,24 @@ private Destination getReplyDestination(jakarta.jms.Message request, Session ses
440461
}
441462
}
442463
return replyTo;
464+
465+
}
466+
467+
@Nullable
468+
private Destination resolveReplyTo(jakarta.jms.Message request, Session session) throws JMSException {
469+
Destination replyTo = request.getJMSReplyTo();
470+
if (replyTo == null) {
471+
if (this.replyToExpression != null) {
472+
Object replyToValue = this.replyToExpression.getValue(this.evaluationContext, request);
473+
if (replyToValue instanceof Destination destination) {
474+
return destination;
475+
}
476+
else if (replyToValue instanceof String destinationName) {
477+
return this.destinationResolver.resolveDestinationName(session, destinationName, false);
478+
}
479+
}
480+
}
481+
return replyTo;
443482
}
444483

445484
/**

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ public void setShouldTrack(boolean shouldTrack) {
103103
}
104104

105105
/**
106-
* Set to false to prevent listener container shutdown when the endpoint is stopped.
106+
* Set to {@code false} to prevent listener container shutdown when the endpoint is stopped.
107107
* Then, if so configured, any cached consumer(s) in the container will remain.
108-
* Otherwise the shared connection and will be closed and the listener invokers shut
108+
* Otherwise, the shared connection and will be closed and the listener invokers shut
109109
* down; this behavior is new starting with version 5.1. Default: true.
110110
* @param shutdownContainerOnStop false to not shutdown.
111111
* @since 5.1

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,11 +19,15 @@
1919
import java.util.function.Consumer;
2020

2121
import jakarta.jms.Destination;
22+
import jakarta.jms.Message;
2223

24+
import org.springframework.expression.Expression;
2325
import org.springframework.integration.dsl.MessagingGatewaySpec;
26+
import org.springframework.integration.expression.FunctionExpression;
2427
import org.springframework.integration.jms.ChannelPublishingJmsMessageListener;
2528
import org.springframework.integration.jms.JmsHeaderMapper;
2629
import org.springframework.integration.jms.JmsInboundGateway;
30+
import org.springframework.integration.util.CheckedFunction;
2731
import org.springframework.jms.listener.AbstractMessageListenerContainer;
2832
import org.springframework.jms.support.converter.MessageConverter;
2933
import org.springframework.jms.support.destination.DestinationResolver;
@@ -137,6 +141,44 @@ public S destinationResolver(DestinationResolver destinationResolver) {
137141
return _this();
138142
}
139143

144+
145+
/**
146+
* Set a SpEL expression to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message}
147+
* as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null.
148+
* @param replyToExpression the SpEL expression for 'replyTo' destination.
149+
* @return the spec.
150+
* @since 6.1
151+
* @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression)
152+
*/
153+
public S replyToExpression(String replyToExpression) {
154+
return replyToExpression(PARSER.parseExpression(replyToExpression));
155+
}
156+
157+
/**
158+
* Set a function to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message}
159+
* as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null.
160+
* @param replyToFunction the function for 'replyTo' destination.
161+
* @return the spec.
162+
* @since 6.1
163+
* @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression)
164+
*/
165+
public S replyToFunction(CheckedFunction<Message, ?> replyToFunction) {
166+
return replyToExpression(new FunctionExpression<>(replyToFunction.unchecked()));
167+
}
168+
169+
/**
170+
* Set a SpEL expression to resolve a 'replyTo' destination from a request {@link jakarta.jms.Message}
171+
* as a root evaluation object if {@link jakarta.jms.Message#getJMSReplyTo()} is null.
172+
* @param replyToExpression the SpEL expression for 'replyTo' destination.
173+
* @return the spec.
174+
* @since 6.1
175+
* @see ChannelPublishingJmsMessageListener#setReplyToExpression(Expression)
176+
*/
177+
public S replyToExpression(Expression replyToExpression) {
178+
this.target.getListener().setReplyToExpression(replyToExpression);
179+
return _this();
180+
}
181+
140182
/**
141183
* @param messageConverter the messageConverter.
142184
* @return the spec.

spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323
import java.util.concurrent.atomic.AtomicInteger;
2424

25+
import jakarta.jms.JMSException;
26+
import jakarta.jms.TextMessage;
2527
import org.junit.jupiter.api.Test;
2628

2729
import org.springframework.beans.factory.ListableBeanFactory;
@@ -137,6 +139,9 @@ public class JmsTests extends ActiveMQMultiContextTests {
137139
@Autowired
138140
private CountDownLatch redeliveryLatch;
139141

142+
@Autowired
143+
JmsTemplate jmsTemplate;
144+
140145
@Test
141146
public void testPollingFlow() {
142147
this.controlBus.send("@'integerMessageSource.inboundChannelAdapter'.start()");
@@ -264,6 +269,19 @@ public void testJmsRedeliveryFlow() throws InterruptedException {
264269
this.jmsMessageDrivenRedeliveryFlowContainer.stop();
265270
}
266271

272+
@Test
273+
public void customReplyToHeader() throws JMSException {
274+
this.jmsTemplate.send("jmsPipelineTest", session -> {
275+
TextMessage message = session.createTextMessage("test data");
276+
message.setStringProperty("myReplyTo", "replyToQueue");
277+
return message;
278+
});
279+
280+
jakarta.jms.Message replyMessage = this.jmsTemplate.receive("replyToQueue");
281+
assertThat(replyMessage).isNotNull();
282+
assertThat(replyMessage.getBody(String.class)).isEqualTo("TEST DATA");
283+
}
284+
267285
@MessagingGateway(defaultRequestChannel = "controlBus.input")
268286
private interface ControlBusGateway {
269287

@@ -278,7 +296,9 @@ public static class ContextConfiguration {
278296

279297
@Bean
280298
public JmsTemplate jmsTemplate() {
281-
return new JmsTemplate(connectionFactory);
299+
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
300+
jmsTemplate.setReceiveTimeout(10_000);
301+
return jmsTemplate;
282302
}
283303

284304
@Bean(name = PollerMetadata.DEFAULT_POLLER)
@@ -422,6 +442,7 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
422442

423443
}))
424444
.requestDestination("jmsPipelineTest")
445+
.replyToFunction(message -> message.getStringProperty("myReplyTo"))
425446
.configureListenerContainer(c ->
426447
c.transactionManager(mock(PlatformTransactionManager.class))))
427448
.filter(payload -> !"junk".equals(payload))

src/reference/asciidoc/jms.adoc

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,11 +411,32 @@ Starting with version 5.1, when the endpoint is stopped while the application re
411411
Previously, the connection and consumers remained open.
412412
To revert to the previous behavior, set the `shutdownContainerOnStop` on the `JmsInboundGateway` to `false`.
413413

414+
By default, the `JmsInboundGateway` looks for a `jakarta.jms.Message.getJMSReplyTo()` property in the received message to determine where to send a reply.
415+
Otherwise, it can be configured with a static `defaultReplyDestination`, or `defaultReplyQueueName` or `defaultReplyTopicName`.
416+
In addition, starting with version 6.1, a `replyToExpression` can be configured on a provided `ChannelPublishingJmsMessageListener` to determine the reply destination dynamically, if the standard `JMSReplyTo` property is `null` on the request.
417+
The received `jakarta.jms.Message` is used the root evaluation context object.
418+
The following example demonstrates how to use Java DSL API to configure an inbound JMS gateway with a custom reply destination resolved from the request message:
419+
420+
====
421+
[source,java]
422+
----
423+
@Bean
424+
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
425+
return IntegrationFlow.from(
426+
Jms.inboundGateway(connectionFactory)
427+
.requestDestination("requestDestination")
428+
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
429+
.<String, String>transform(String::toUpperCase)
430+
.get();
431+
}
432+
----
433+
====
434+
414435
[[jms-outbound-gateway]]
415436
=== Outbound Gateway
416437

417-
The outbound gateway creates JMS messages from Spring Integration messages and sends them to a 'request-destination'.
418-
It then handles the JMS reply message either by using a selector to receive from the 'reply-destination' that you configure or, if no 'reply-destination' is provided, by creating JMS `TemporaryQueue` (or `TemporaryTopic` if `replyPubSubDomain= true`) instances.
438+
The outbound gateway creates JMS messages from Spring Integration messages and sends them to a `request-destination`.
439+
It then handles the JMS reply message either by using a selector to receive from the `reply-destination` that you configure or, if no `reply-destination` is provided, by creating JMS `TemporaryQueue` (or `TemporaryTopic` if `replyPubSubDomain= true`) instances.
419440

420441
[[jms-outbound-gateway-memory-caution]]
421442
[CAUTION]

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,14 @@ See <<./zip.adoc#zip,Zip Support>> for more information.
2727
[[x6.1-general]]
2828
=== General Changes
2929

30-
3130
[[x6.1-web-sockets]]
3231
=== Web Sockets Changes
3332

3433
A `ClientWebSocketContainer` can now be configured with a predefined `URI` instead of a combination of `uriTemplate` and `uriVariables`.
3534
See <<./web-sockets.adoc#web-socket-overview, WebSocket Overview>> for more information.
35+
36+
[[x6.1-jms]]
37+
=== JMS Changes
38+
39+
The `JmsInboundGateway`, via its `ChannelPublishingJmsMessageListener`, can now be configured with a `replyToExpression` to resolve a reply destination against the request message at runtime.
40+
See <<./jms.adoc#jms-inbound-gateway, JMS Inbound Gateway>> for more information.

0 commit comments

Comments
 (0)