Skip to content

Commit 0cea4ed

Browse files
committed
Add selector type support for sendAndReceive methods in JmsOperations and JmsTemplate
Signed-off-by: Michał Pisarski <[email protected]>
1 parent b164db3 commit 0cea4ed

File tree

3 files changed

+202
-8
lines changed

3 files changed

+202
-8
lines changed

spring-jms/src/main/java/org/springframework/jms/core/JmsOperations.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
407407
* <p>The {@link MessageCreator} callback creates the message given a Session,
408408
* and the specified {@code responseQueue} is set in the {@code JMSReplyTO}
409409
* header of the message.
410+
* <p>A default {@link SelectorType#MESSAGE_ID} is used to correlate request and reply messages.
410411
* @param destination the destination to send the message to
411412
* @param responseQueue the destination to receive the reply from
412413
* @param messageCreator callback to create a message
@@ -418,12 +419,31 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
418419
@Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator)
419420
throws JmsException;
420421

422+
/**
423+
* Send a message to the specified destination and receive the reply from the
424+
* specified response queue.
425+
* <p>The {@link MessageCreator} callback creates the message given a Session,
426+
* and the specified {@code responseQueue} is set in the {@code JMSReplyTO}
427+
* header of the message.
428+
* @param destination the destination to send the message to
429+
* @param responseQueue the destination to receive the reply from
430+
* @param messageCreator callback to create a message
431+
* @param selectorType the {@link SelectorType} to use for correlating request and reply messages
432+
* @return the reply, possibly {@code null} if the message could not be received,
433+
* for example due to a timeout
434+
* @throws JmsException checked JMSException converted to unchecked
435+
* @since 7.0.4
436+
*/
437+
@Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType)
438+
throws JmsException;
439+
421440
/**
422441
* Send a message to the specified destination and receive the reply from the
423442
* specified response queue.
424443
* <p>The {@link MessageCreator} callback creates the message given a Session,
425444
* and the destination with the specified {@code responseQueueName} is set in
426445
* the {@code JMSReplyTO} header of the message.
446+
* <p>A default {@link SelectorType#MESSAGE_ID} is used to correlate request and reply messages.
427447
* @param destinationName the name of the destination to send the message to
428448
* (to be resolved to an actual destination by a DestinationResolver)
429449
* @param responseQueueName the name of the destination to receive the reply from
@@ -437,6 +457,42 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
437457
@Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator)
438458
throws JmsException;
439459

460+
/**
461+
* Send a message to the specified destination and receive the reply from the
462+
* specified response queue.
463+
* <p>The {@link MessageCreator} callback creates the message given a Session,
464+
* and the destination with the specified {@code responseQueueName} is set in
465+
* the {@code JMSReplyTO} header of the message.
466+
* @param destinationName the name of the destination to send the message to
467+
* (to be resolved to an actual destination by a DestinationResolver)
468+
* @param responseQueueName the name of the destination to receive the reply from
469+
* (to be resolved to an actual destination by a DestinationResolver)
470+
* @param messageCreator callback to create a message
471+
* @param selectorType the {@link SelectorType} to use for correlating request and reply messages
472+
* @return the reply, possibly {@code null} if the message could not be received,
473+
* for example due to a timeout
474+
* @throws JmsException checked JMSException converted to unchecked
475+
* @since 7.0.4
476+
*/
477+
@Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator, SelectorType selectorType) throws JmsException;
478+
479+
/**
480+
* Enumeration of supported selector types.
481+
*/
482+
enum SelectorType {
483+
/**
484+
* Use generated JMSCorrelationID to correlate request and reply messages.
485+
*/
486+
CORRELATION_ID,
487+
/**
488+
* Use JMSMessageID of the request message to correlate request and reply messages.
489+
*/
490+
MESSAGE_ID,
491+
/**
492+
* Do not use any selector to correlate request and reply messages.
493+
*/
494+
NONE
495+
}
440496

441497
//---------------------------------------------------------------------------------------
442498
// Convenience methods for browsing messages

spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.jms.core;
1818

19+
import java.util.UUID;
20+
1921
import io.micrometer.jakarta9.instrument.jms.JmsInstrumentation;
2022
import io.micrometer.observation.ObservationRegistry;
2123
import jakarta.jms.Connection;
@@ -900,7 +902,12 @@ else if (isClientAcknowledge(session)) {
900902

901903
@Override
902904
public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator) throws JmsException {
903-
return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator), true);
905+
return sendAndReceive(destination, responseQueue, messageCreator, SelectorType.MESSAGE_ID);
906+
}
907+
908+
@Override
909+
public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType) throws JmsException {
910+
return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator, selectorType), true);
904911
}
905912

906913
@Override
@@ -913,10 +920,15 @@ else if (isClientAcknowledge(session)) {
913920

914921
@Override
915922
public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator) throws JmsException {
923+
return sendAndReceive(destinationName, responseQueueName, messageCreator, SelectorType.MESSAGE_ID);
924+
}
925+
926+
@Override
927+
public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator, SelectorType selectorType) throws JmsException {
916928
return executeLocal(session -> {
917929
Destination destination = resolveDestinationName(session, destinationName);
918930
Destination responseQueue = resolveDestinationName(session, responseQueueName);
919-
return doSendAndReceive(session, destination, responseQueue, messageCreator);
931+
return doSendAndReceive(session, destination, responseQueue, messageCreator, selectorType);
920932
}, true);
921933
}
922934

@@ -932,7 +944,7 @@ else if (isClientAcknowledge(session)) {
932944
TemporaryQueue responseQueue = null;
933945
try {
934946
responseQueue = session.createTemporaryQueue();
935-
return doSendAndReceive(session, destination, responseQueue, messageCreator);
947+
return doSendAndReceive(session, destination, responseQueue, messageCreator, SelectorType.NONE);
936948
}
937949
finally {
938950
if (responseQueue != null) {
@@ -944,24 +956,39 @@ else if (isClientAcknowledge(session)) {
944956
/**
945957
* Send a request message to the given {@link Destination destination} and block until
946958
* a reply has been received on the specified {@link Destination responseQueue}.
959+
* <p>Use {@link SelectorType selectorType} to specify how the response message is correlated
960+
* to the request message.
947961
* <p>Return the response message or {@code null} if no message has been received.
948962
* @throws JMSException if thrown by JMS API methods
949963
*/
950-
protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator)
964+
protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType)
951965
throws JMSException {
952-
953966
Assert.notNull(messageCreator, "MessageCreator must not be null");
954967
MessageProducer producer = null;
955968
MessageConsumer consumer = null;
956969
try {
957970
Message requestMessage = messageCreator.createMessage(session);
958971
producer = session.createProducer(destination);
959-
consumer = session.createConsumer(responseQueue);
972+
973+
String messageSelector = null;
974+
if (selectorType == SelectorType.CORRELATION_ID) {
975+
String correlationId = UUID.randomUUID().toString();
976+
requestMessage.setJMSCorrelationID(correlationId);
977+
messageSelector = "JMSCorrelationID = '" + correlationId + "'";
978+
}
979+
960980
requestMessage.setJMSReplyTo(responseQueue);
961981
if (logger.isDebugEnabled()) {
962982
logger.debug("Sending created message: " + requestMessage);
963983
}
964984
doSend(producer, requestMessage);
985+
986+
if (selectorType == SelectorType.MESSAGE_ID) {
987+
String messageId = requestMessage.getJMSMessageID();
988+
messageSelector = "JMSCorrelationID = '" + messageId + "'";
989+
}
990+
991+
consumer = session.createConsumer(responseQueue, messageSelector);
965992
return receiveFromConsumer(consumer, getReceiveTimeout());
966993
}
967994
finally {

spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTests.java

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import jakarta.jms.TextMessage;
3636
import org.junit.jupiter.api.BeforeEach;
3737
import org.junit.jupiter.api.Test;
38+
import org.mockito.ArgumentCaptor;
3839

3940
import org.springframework.jms.InvalidClientIDException;
4041
import org.springframework.jms.InvalidDestinationException;
@@ -62,6 +63,8 @@
6263

6364
import static org.assertj.core.api.Assertions.assertThat;
6465
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
66+
import static org.mockito.ArgumentMatchers.anyString;
67+
import static org.mockito.ArgumentMatchers.eq;
6568
import static org.mockito.BDDMockito.given;
6669
import static org.mockito.BDDMockito.willThrow;
6770
import static org.mockito.Mockito.mock;
@@ -648,7 +651,7 @@ private void doTestSendAndReceive(boolean explicitDestination, boolean useDefaul
648651
given(localSession.createTemporaryQueue()).willReturn(replyDestination);
649652

650653
MessageConsumer messageConsumer = mock();
651-
given(localSession.createConsumer(replyDestination)).willReturn(messageConsumer);
654+
given(localSession.createConsumer(replyDestination, null)).willReturn(messageConsumer);
652655

653656

654657
TextMessage request = mock();
@@ -715,7 +718,8 @@ private void doTestSendAndReceiveWithResponseQueue(boolean explicitDestination,
715718
given(localSession.createProducer(this.queue)).willReturn(messageProducer);
716719

717720
MessageConsumer messageConsumer = mock();
718-
given(localSession.createConsumer(responseQueue)).willReturn(messageConsumer);
721+
// Default sendAndReceive with responseQueue uses MESSAGE_ID selector
722+
given(localSession.createConsumer(eq(responseQueue), anyString())).willReturn(messageConsumer);
719723

720724
TextMessage request = mock();
721725
MessageCreator messageCreator = mock();
@@ -750,6 +754,113 @@ else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
750754
verify(messageProducer).close();
751755
}
752756

757+
@Test
758+
void testSendAndReceiveDestinationWithResponseQueueAndCorrelationIdSelector() throws Exception {
759+
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.CORRELATION_ID);
760+
}
761+
762+
@Test
763+
void testSendAndReceiveDestinationNameWithResponseQueueNameAndCorrelationIdSelector() throws Exception {
764+
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.CORRELATION_ID);
765+
}
766+
767+
@Test
768+
void testSendAndReceiveDestinationWithResponseQueueAndMessageIdSelector() throws Exception {
769+
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.MESSAGE_ID);
770+
}
771+
772+
@Test
773+
void testSendAndReceiveDestinationNameWithResponseQueueNameAndMessageIdSelector() throws Exception {
774+
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.MESSAGE_ID);
775+
}
776+
777+
@Test
778+
void testSendAndReceiveDestinationWithResponseQueueAndNoSelector() throws Exception {
779+
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.NONE);
780+
}
781+
782+
@Test
783+
void testSendAndReceiveDestinationNameWithResponseQueueNameAndNoSelector() throws Exception {
784+
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.NONE);
785+
}
786+
787+
private void doTestSendAndReceiveWithResponseQueueAndSelectorType(boolean explicitDestination, long timeout,
788+
JmsOperations.SelectorType selectorType) throws Exception {
789+
790+
JmsTemplate template = createTemplate();
791+
template.setConnectionFactory(this.connectionFactory);
792+
template.setReceiveTimeout(timeout);
793+
794+
String destinationName = "testDestination";
795+
String responseQueueName = "responseQueue";
796+
797+
Queue responseQueue = mock();
798+
given(this.jndiContext.lookup(responseQueueName)).willReturn(responseQueue);
799+
800+
Session localSession = getLocalSession();
801+
MessageProducer messageProducer = mock();
802+
given(localSession.createProducer(this.queue)).willReturn(messageProducer);
803+
804+
MessageConsumer messageConsumer = mock();
805+
if (selectorType == JmsOperations.SelectorType.NONE) {
806+
given(localSession.createConsumer(responseQueue, null)).willReturn(messageConsumer);
807+
}
808+
else {
809+
given(localSession.createConsumer(eq(responseQueue), anyString()))
810+
.willReturn(messageConsumer);
811+
}
812+
813+
TextMessage request = mock();
814+
MessageCreator messageCreator = mock();
815+
given(messageCreator.createMessage(localSession)).willReturn(request);
816+
817+
if (selectorType == JmsOperations.SelectorType.MESSAGE_ID) {
818+
given(request.getJMSMessageID()).willReturn("ID:test-message-id-12345");
819+
}
820+
821+
TextMessage reply = mock();
822+
if (timeout == JmsTemplate.RECEIVE_TIMEOUT_NO_WAIT) {
823+
given(messageConsumer.receiveNoWait()).willReturn(reply);
824+
}
825+
else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
826+
given(messageConsumer.receive()).willReturn(reply);
827+
}
828+
else {
829+
given(messageConsumer.receive(timeout)).willReturn(reply);
830+
}
831+
832+
Message message;
833+
if (explicitDestination) {
834+
message = template.sendAndReceive(this.queue, responseQueue, messageCreator, selectorType);
835+
}
836+
else {
837+
message = template.sendAndReceive(destinationName, responseQueueName, messageCreator, selectorType);
838+
}
839+
840+
// replyTO set on the request
841+
verify(request).setJMSReplyTo(responseQueue);
842+
assertThat(message).as("Reply message not received").isSameAs(reply);
843+
verify(this.connection).start();
844+
verify(this.connection).close();
845+
verify(localSession).close();
846+
verify(messageConsumer).close();
847+
verify(messageProducer).close();
848+
849+
// Verify selector-specific behavior
850+
if (selectorType == JmsOperations.SelectorType.CORRELATION_ID) {
851+
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
852+
verify(request).setJMSCorrelationID(captor.capture());
853+
verify(localSession).createConsumer(eq(responseQueue), eq("JMSCorrelationID = '" + captor.getValue() + "'"));
854+
}
855+
else if (selectorType == JmsOperations.SelectorType.MESSAGE_ID) {
856+
verify(request).getJMSMessageID();
857+
verify(localSession).createConsumer(eq(responseQueue), eq("JMSCorrelationID = 'ID:test-message-id-12345'"));
858+
}
859+
else {
860+
verify(localSession).createConsumer(responseQueue, null);
861+
}
862+
}
863+
753864
@Test
754865
void testIllegalStateException() throws Exception {
755866
doTestJmsException(new jakarta.jms.IllegalStateException(""), org.springframework.jms.IllegalStateException.class);

0 commit comments

Comments
 (0)