Skip to content

Commit 29b3fef

Browse files
committed
Add selector type support for sendAndReceive methods in JmsOperations and JmsTemplate
Signed-off-by: Michał Pisarski <pisekfm@o2.pl>
1 parent b164db3 commit 29b3fef

File tree

3 files changed

+204
-8
lines changed

3 files changed

+204
-8
lines changed

spring-jms/src/main/java/org/springframework/jms/core/JmsOperations.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
407407
* <p>The {@link MessageCreator} callback creates the message given a Session,
408408
* and the specified {@code responseQueue} is set in the {@code JMSReplyTO}
409409
* header of the message.
410+
* <p>A default {@link SelectorType#CORRELATION_ID} is used to correlate request and reply messages.
410411
* @param destination the destination to send the message to
411412
* @param responseQueue the destination to receive the reply from
412413
* @param messageCreator callback to create a message
@@ -418,12 +419,31 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
418419
@Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator)
419420
throws JmsException;
420421

422+
/**
423+
* Send a message to the specified destination and receive the reply from the
424+
* specified response queue.
425+
* <p>The {@link MessageCreator} callback creates the message given a Session,
426+
* and the specified {@code responseQueue} is set in the {@code JMSReplyTO}
427+
* header of the message.
428+
* @param destination the destination to send the message to
429+
* @param responseQueue the destination to receive the reply from
430+
* @param messageCreator callback to create a message
431+
* @param selectorType the {@link SelectorType} to use for correlating request and reply messages
432+
* @return the reply, possibly {@code null} if the message could not be received,
433+
* for example due to a timeout
434+
* @throws JmsException checked JMSException converted to unchecked
435+
* @since 7.0.4
436+
*/
437+
@Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType)
438+
throws JmsException;
439+
421440
/**
422441
* Send a message to the specified destination and receive the reply from the
423442
* specified response queue.
424443
* <p>The {@link MessageCreator} callback creates the message given a Session,
425444
* and the destination with the specified {@code responseQueueName} is set in
426445
* the {@code JMSReplyTO} header of the message.
446+
* <p>A default {@link SelectorType#CORRELATION_ID} is used to correlate request and reply messages.
427447
* @param destinationName the name of the destination to send the message to
428448
* (to be resolved to an actual destination by a DestinationResolver)
429449
* @param responseQueueName the name of the destination to receive the reply from
@@ -437,6 +457,42 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
437457
@Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator)
438458
throws JmsException;
439459

460+
/**
461+
* Send a message to the specified destination and receive the reply from the
462+
* specified response queue.
463+
* <p>The {@link MessageCreator} callback creates the message given a Session,
464+
* and the destination with the specified {@code responseQueueName} is set in
465+
* the {@code JMSReplyTO} header of the message.
466+
* @param destinationName the name of the destination to send the message to
467+
* (to be resolved to an actual destination by a DestinationResolver)
468+
* @param responseQueueName the name of the destination to receive the reply from
469+
* (to be resolved to an actual destination by a DestinationResolver)
470+
* @param messageCreator callback to create a message
471+
* @param selectorType the {@link SelectorType} to use for correlating request and reply messages
472+
* @return the reply, possibly {@code null} if the message could not be received,
473+
* for example due to a timeout
474+
* @throws JmsException checked JMSException converted to unchecked
475+
* @since 7.0.4
476+
*/
477+
@Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator, SelectorType selectorType) throws JmsException;
478+
479+
/**
480+
* Enumeration of supported selector types.
481+
*/
482+
enum SelectorType {
483+
/**
484+
* Use generated JMSCorrelationID to correlate request and reply messages.
485+
*/
486+
CORRELATION_ID,
487+
/**
488+
* Use JMSMessageID of the request message to correlate request and reply messages.
489+
*/
490+
MESSAGE_ID,
491+
/**
492+
* Do not use any selector to correlate request and reply messages.
493+
*/
494+
NONE
495+
}
440496

441497
//---------------------------------------------------------------------------------------
442498
// Convenience methods for browsing messages

spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.springframework.util.Assert;
4646
import org.springframework.util.ClassUtils;
4747

48+
import java.util.UUID;
49+
4850
/**
4951
* Helper class that simplifies synchronous JMS access code.
5052
*
@@ -900,7 +902,12 @@ else if (isClientAcknowledge(session)) {
900902

901903
@Override
902904
public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator) throws JmsException {
903-
return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator), true);
905+
return sendAndReceive(destination, responseQueue, messageCreator, SelectorType.CORRELATION_ID);
906+
}
907+
908+
@Override
909+
public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType) throws JmsException {
910+
return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator, selectorType), true);
904911
}
905912

906913
@Override
@@ -913,10 +920,15 @@ else if (isClientAcknowledge(session)) {
913920

914921
@Override
915922
public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator) throws JmsException {
923+
return sendAndReceive(destinationName, responseQueueName, messageCreator, SelectorType.CORRELATION_ID);
924+
}
925+
926+
@Override
927+
public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator, SelectorType selectorType) throws JmsException {
916928
return executeLocal(session -> {
917929
Destination destination = resolveDestinationName(session, destinationName);
918930
Destination responseQueue = resolveDestinationName(session, responseQueueName);
919-
return doSendAndReceive(session, destination, responseQueue, messageCreator);
931+
return doSendAndReceive(session, destination, responseQueue, messageCreator, selectorType);
920932
}, true);
921933
}
922934

@@ -932,7 +944,7 @@ else if (isClientAcknowledge(session)) {
932944
TemporaryQueue responseQueue = null;
933945
try {
934946
responseQueue = session.createTemporaryQueue();
935-
return doSendAndReceive(session, destination, responseQueue, messageCreator);
947+
return doSendAndReceive(session, destination, responseQueue, messageCreator, SelectorType.NONE);
936948
}
937949
finally {
938950
if (responseQueue != null) {
@@ -944,24 +956,39 @@ else if (isClientAcknowledge(session)) {
944956
/**
945957
* Send a request message to the given {@link Destination destination} and block until
946958
* a reply has been received on the specified {@link Destination responseQueue}.
959+
* <p>Use {@link SelectorType selectorType} to specify how the response message is correlated
960+
* to the request message.
947961
* <p>Return the response message or {@code null} if no message has been received.
948962
* @throws JMSException if thrown by JMS API methods
949963
*/
950-
protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator)
964+
protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType)
951965
throws JMSException {
952-
953966
Assert.notNull(messageCreator, "MessageCreator must not be null");
954967
MessageProducer producer = null;
955968
MessageConsumer consumer = null;
956969
try {
957970
Message requestMessage = messageCreator.createMessage(session);
958971
producer = session.createProducer(destination);
959-
consumer = session.createConsumer(responseQueue);
972+
973+
String messageSelector = null;
974+
if (selectorType == SelectorType.CORRELATION_ID) {
975+
String correlationId = UUID.randomUUID().toString();
976+
requestMessage.setJMSCorrelationID(correlationId);
977+
messageSelector = "JMSCorrelationID = '" + correlationId + "'";
978+
}
979+
960980
requestMessage.setJMSReplyTo(responseQueue);
961981
if (logger.isDebugEnabled()) {
962982
logger.debug("Sending created message: " + requestMessage);
963983
}
964984
doSend(producer, requestMessage);
985+
986+
if (selectorType == SelectorType.MESSAGE_ID) {
987+
String messageId = requestMessage.getJMSMessageID();
988+
messageSelector = "JMSCorrelationID = '" + messageId + "'";
989+
}
990+
991+
consumer = session.createConsumer(responseQueue, messageSelector);
965992
return receiveFromConsumer(consumer, getReceiveTimeout());
966993
}
967994
finally {

spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTests.java

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.junit.jupiter.api.BeforeEach;
3737
import org.junit.jupiter.api.Test;
3838

39+
import org.mockito.ArgumentCaptor;
3940
import org.springframework.jms.InvalidClientIDException;
4041
import org.springframework.jms.InvalidDestinationException;
4142
import org.springframework.jms.InvalidSelectorException;
@@ -62,6 +63,8 @@
6263

6364
import static org.assertj.core.api.Assertions.assertThat;
6465
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
66+
import static org.mockito.ArgumentMatchers.anyString;
67+
import static org.mockito.ArgumentMatchers.eq;
6568
import static org.mockito.BDDMockito.given;
6669
import static org.mockito.BDDMockito.willThrow;
6770
import static org.mockito.Mockito.mock;
@@ -648,7 +651,7 @@ private void doTestSendAndReceive(boolean explicitDestination, boolean useDefaul
648651
given(localSession.createTemporaryQueue()).willReturn(replyDestination);
649652

650653
MessageConsumer messageConsumer = mock();
651-
given(localSession.createConsumer(replyDestination)).willReturn(messageConsumer);
654+
given(localSession.createConsumer(replyDestination, null)).willReturn(messageConsumer);
652655

653656

654657
TextMessage request = mock();
@@ -715,7 +718,8 @@ private void doTestSendAndReceiveWithResponseQueue(boolean explicitDestination,
715718
given(localSession.createProducer(this.queue)).willReturn(messageProducer);
716719

717720
MessageConsumer messageConsumer = mock();
718-
given(localSession.createConsumer(responseQueue)).willReturn(messageConsumer);
721+
// Default sendAndReceive with responseQueue uses CORRELATION_ID selector
722+
given(localSession.createConsumer(eq(responseQueue), anyString())).willReturn(messageConsumer);
719723

720724
TextMessage request = mock();
721725
MessageCreator messageCreator = mock();
@@ -742,6 +746,8 @@ else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
742746

743747
// replyTO set on the request
744748
verify(request).setJMSReplyTo(responseQueue);
749+
// Correlation ID set on request for selector
750+
verify(request).setJMSCorrelationID(anyString());
745751
assertThat(message).as("Reply message not received").isSameAs(reply);
746752
verify(this.connection).start();
747753
verify(this.connection).close();
@@ -750,6 +756,113 @@ else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
750756
verify(messageProducer).close();
751757
}
752758

759+
@Test
760+
void testSendAndReceiveDestinationWithResponseQueueAndCorrelationIdSelector() throws Exception {
761+
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.CORRELATION_ID);
762+
}
763+
764+
@Test
765+
void testSendAndReceiveDestinationNameWithResponseQueueNameAndCorrelationIdSelector() throws Exception {
766+
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.CORRELATION_ID);
767+
}
768+
769+
@Test
770+
void testSendAndReceiveDestinationWithResponseQueueAndMessageIdSelector() throws Exception {
771+
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.MESSAGE_ID);
772+
}
773+
774+
@Test
775+
void testSendAndReceiveDestinationNameWithResponseQueueNameAndMessageIdSelector() throws Exception {
776+
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.MESSAGE_ID);
777+
}
778+
779+
@Test
780+
void testSendAndReceiveDestinationWithResponseQueueAndNoSelector() throws Exception {
781+
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.NONE);
782+
}
783+
784+
@Test
785+
void testSendAndReceiveDestinationNameWithResponseQueueNameAndNoSelector() throws Exception {
786+
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.NONE);
787+
}
788+
789+
private void doTestSendAndReceiveWithResponseQueueAndSelectorType(boolean explicitDestination, long timeout,
790+
JmsOperations.SelectorType selectorType) throws Exception {
791+
792+
JmsTemplate template = createTemplate();
793+
template.setConnectionFactory(this.connectionFactory);
794+
template.setReceiveTimeout(timeout);
795+
796+
String destinationName = "testDestination";
797+
String responseQueueName = "responseQueue";
798+
799+
Queue responseQueue = mock();
800+
given(this.jndiContext.lookup(responseQueueName)).willReturn(responseQueue);
801+
802+
Session localSession = getLocalSession();
803+
MessageProducer messageProducer = mock();
804+
given(localSession.createProducer(this.queue)).willReturn(messageProducer);
805+
806+
MessageConsumer messageConsumer = mock();
807+
if (selectorType == JmsOperations.SelectorType.NONE) {
808+
given(localSession.createConsumer(responseQueue, null)).willReturn(messageConsumer);
809+
}
810+
else {
811+
given(localSession.createConsumer(eq(responseQueue), anyString()))
812+
.willReturn(messageConsumer);
813+
}
814+
815+
TextMessage request = mock();
816+
MessageCreator messageCreator = mock();
817+
given(messageCreator.createMessage(localSession)).willReturn(request);
818+
819+
if (selectorType == JmsOperations.SelectorType.MESSAGE_ID) {
820+
given(request.getJMSMessageID()).willReturn("ID:test-message-id-12345");
821+
}
822+
823+
TextMessage reply = mock();
824+
if (timeout == JmsTemplate.RECEIVE_TIMEOUT_NO_WAIT) {
825+
given(messageConsumer.receiveNoWait()).willReturn(reply);
826+
}
827+
else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
828+
given(messageConsumer.receive()).willReturn(reply);
829+
}
830+
else {
831+
given(messageConsumer.receive(timeout)).willReturn(reply);
832+
}
833+
834+
Message message;
835+
if (explicitDestination) {
836+
message = template.sendAndReceive(this.queue, responseQueue, messageCreator, selectorType);
837+
}
838+
else {
839+
message = template.sendAndReceive(destinationName, responseQueueName, messageCreator, selectorType);
840+
}
841+
842+
// replyTO set on the request
843+
verify(request).setJMSReplyTo(responseQueue);
844+
assertThat(message).as("Reply message not received").isSameAs(reply);
845+
verify(this.connection).start();
846+
verify(this.connection).close();
847+
verify(localSession).close();
848+
verify(messageConsumer).close();
849+
verify(messageProducer).close();
850+
851+
// Verify selector-specific behavior
852+
if (selectorType == JmsOperations.SelectorType.CORRELATION_ID) {
853+
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
854+
verify(request).setJMSCorrelationID(captor.capture());
855+
verify(localSession).createConsumer(eq(responseQueue), eq("JMSCorrelationID = '" + captor.getValue() + "'"));
856+
}
857+
else if (selectorType == JmsOperations.SelectorType.MESSAGE_ID) {
858+
verify(request).getJMSMessageID();
859+
verify(localSession).createConsumer(eq(responseQueue), eq("JMSCorrelationID = 'ID:test-message-id-12345'"));
860+
}
861+
else {
862+
verify(localSession).createConsumer(responseQueue, null);
863+
}
864+
}
865+
753866
@Test
754867
void testIllegalStateException() throws Exception {
755868
doTestJmsException(new jakarta.jms.IllegalStateException(""), org.springframework.jms.IllegalStateException.class);

0 commit comments

Comments
 (0)