diff --git a/spring-jms/src/main/java/org/springframework/jms/core/JmsOperations.java b/spring-jms/src/main/java/org/springframework/jms/core/JmsOperations.java index 3561bab0183..d3243f8ffdd 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/JmsOperations.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/JmsOperations.java @@ -407,6 +407,7 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor *

The {@link MessageCreator} callback creates the message given a Session, * and the specified {@code responseQueue} is set in the {@code JMSReplyTO} * header of the message. + *

A default {@link SelectorType#MESSAGE_ID} is used to correlate request and reply messages. * @param destination the destination to send the message to * @param responseQueue the destination to receive the reply from * @param messageCreator callback to create a message @@ -418,12 +419,31 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator) throws JmsException; + /** + * Send a message to the specified destination and receive the reply from the + * specified response queue. + *

The {@link MessageCreator} callback creates the message given a Session, + * and the specified {@code responseQueue} is set in the {@code JMSReplyTO} + * header of the message. + * @param destination the destination to send the message to + * @param responseQueue the destination to receive the reply from + * @param messageCreator callback to create a message + * @param selectorType the {@link SelectorType} to use for correlating request and reply messages + * @return the reply, possibly {@code null} if the message could not be received, + * for example due to a timeout + * @throws JmsException checked JMSException converted to unchecked + * @since 7.0.4 + */ + @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType) + throws JmsException; + /** * Send a message to the specified destination and receive the reply from the * specified response queue. *

The {@link MessageCreator} callback creates the message given a Session, * and the destination with the specified {@code responseQueueName} is set in * the {@code JMSReplyTO} header of the message. + *

A default {@link SelectorType#MESSAGE_ID} is used to correlate request and reply messages. * @param destinationName the name of the destination to send the message to * (to be resolved to an actual destination by a DestinationResolver) * @param responseQueueName the name of the destination to receive the reply from @@ -437,6 +457,42 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator) throws JmsException; + /** + * Send a message to the specified destination and receive the reply from the + * specified response queue. + *

The {@link MessageCreator} callback creates the message given a Session, + * and the destination with the specified {@code responseQueueName} is set in + * the {@code JMSReplyTO} header of the message. + * @param destinationName the name of the destination to send the message to + * (to be resolved to an actual destination by a DestinationResolver) + * @param responseQueueName the name of the destination to receive the reply from + * (to be resolved to an actual destination by a DestinationResolver) + * @param messageCreator callback to create a message + * @param selectorType the {@link SelectorType} to use for correlating request and reply messages + * @return the reply, possibly {@code null} if the message could not be received, + * for example due to a timeout + * @throws JmsException checked JMSException converted to unchecked + * @since 7.0.4 + */ + @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator, SelectorType selectorType) throws JmsException; + + /** + * Enumeration of supported selector types. + */ + enum SelectorType { + /** + * Use generated JMSCorrelationID to correlate request and reply messages. + */ + CORRELATION_ID, + /** + * Use JMSMessageID of the request message to correlate request and reply messages. + */ + MESSAGE_ID, + /** + * Do not use any selector to correlate request and reply messages. + */ + NONE + } //--------------------------------------------------------------------------------------- // Convenience methods for browsing messages diff --git a/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java b/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java index 63e9d18df04..78a9b34b9bc 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java @@ -16,6 +16,8 @@ package org.springframework.jms.core; +import java.util.UUID; + import io.micrometer.jakarta9.instrument.jms.JmsInstrumentation; import io.micrometer.observation.ObservationRegistry; import jakarta.jms.Connection; @@ -900,7 +902,12 @@ else if (isClientAcknowledge(session)) { @Override public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator) throws JmsException { - return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator), true); + return sendAndReceive(destination, responseQueue, messageCreator, SelectorType.MESSAGE_ID); + } + + @Override + public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType) throws JmsException { + return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator, selectorType), true); } @Override @@ -913,10 +920,15 @@ else if (isClientAcknowledge(session)) { @Override public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator) throws JmsException { + return sendAndReceive(destinationName, responseQueueName, messageCreator, SelectorType.MESSAGE_ID); + } + + @Override + public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator, SelectorType selectorType) throws JmsException { return executeLocal(session -> { Destination destination = resolveDestinationName(session, destinationName); Destination responseQueue = resolveDestinationName(session, responseQueueName); - return doSendAndReceive(session, destination, responseQueue, messageCreator); + return doSendAndReceive(session, destination, responseQueue, messageCreator, selectorType); }, true); } @@ -932,7 +944,7 @@ else if (isClientAcknowledge(session)) { TemporaryQueue responseQueue = null; try { responseQueue = session.createTemporaryQueue(); - return doSendAndReceive(session, destination, responseQueue, messageCreator); + return doSendAndReceive(session, destination, responseQueue, messageCreator, SelectorType.NONE); } finally { if (responseQueue != null) { @@ -944,24 +956,39 @@ else if (isClientAcknowledge(session)) { /** * Send a request message to the given {@link Destination destination} and block until * a reply has been received on the specified {@link Destination responseQueue}. + *

Use {@link SelectorType selectorType} to specify how the response message is correlated + * to the request message. *

Return the response message or {@code null} if no message has been received. * @throws JMSException if thrown by JMS API methods */ - protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator) + protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType) throws JMSException { - Assert.notNull(messageCreator, "MessageCreator must not be null"); MessageProducer producer = null; MessageConsumer consumer = null; try { Message requestMessage = messageCreator.createMessage(session); producer = session.createProducer(destination); - consumer = session.createConsumer(responseQueue); + + String messageSelector = null; + if (selectorType == SelectorType.CORRELATION_ID) { + String correlationId = UUID.randomUUID().toString(); + requestMessage.setJMSCorrelationID(correlationId); + messageSelector = String.format("JMSCorrelationID = '%s'", correlationId); + } + requestMessage.setJMSReplyTo(responseQueue); if (logger.isDebugEnabled()) { logger.debug("Sending created message: " + requestMessage); } doSend(producer, requestMessage); + + if (selectorType == SelectorType.MESSAGE_ID) { + String messageId = requestMessage.getJMSMessageID(); + messageSelector = String.format("JMSCorrelationID = '%s'", messageId); + } + + consumer = session.createConsumer(responseQueue, messageSelector); return receiveFromConsumer(consumer, getReceiveTimeout()); } finally { diff --git a/spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTests.java b/spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTests.java index 5094948582b..97cf76adc43 100644 --- a/spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTests.java @@ -35,6 +35,7 @@ import jakarta.jms.TextMessage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.springframework.jms.InvalidClientIDException; import org.springframework.jms.InvalidDestinationException; @@ -62,6 +63,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willThrow; import static org.mockito.Mockito.mock; @@ -648,7 +651,7 @@ private void doTestSendAndReceive(boolean explicitDestination, boolean useDefaul given(localSession.createTemporaryQueue()).willReturn(replyDestination); MessageConsumer messageConsumer = mock(); - given(localSession.createConsumer(replyDestination)).willReturn(messageConsumer); + given(localSession.createConsumer(replyDestination, null)).willReturn(messageConsumer); TextMessage request = mock(); @@ -715,7 +718,8 @@ private void doTestSendAndReceiveWithResponseQueue(boolean explicitDestination, given(localSession.createProducer(this.queue)).willReturn(messageProducer); MessageConsumer messageConsumer = mock(); - given(localSession.createConsumer(responseQueue)).willReturn(messageConsumer); + // Default sendAndReceive with responseQueue uses MESSAGE_ID selector + given(localSession.createConsumer(eq(responseQueue), anyString())).willReturn(messageConsumer); TextMessage request = mock(); MessageCreator messageCreator = mock(); @@ -750,6 +754,113 @@ else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) { verify(messageProducer).close(); } + @Test + void testSendAndReceiveDestinationWithResponseQueueAndCorrelationIdSelector() throws Exception { + doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.CORRELATION_ID); + } + + @Test + void testSendAndReceiveDestinationNameWithResponseQueueNameAndCorrelationIdSelector() throws Exception { + doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.CORRELATION_ID); + } + + @Test + void testSendAndReceiveDestinationWithResponseQueueAndMessageIdSelector() throws Exception { + doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.MESSAGE_ID); + } + + @Test + void testSendAndReceiveDestinationNameWithResponseQueueNameAndMessageIdSelector() throws Exception { + doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.MESSAGE_ID); + } + + @Test + void testSendAndReceiveDestinationWithResponseQueueAndNoSelector() throws Exception { + doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.NONE); + } + + @Test + void testSendAndReceiveDestinationNameWithResponseQueueNameAndNoSelector() throws Exception { + doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.NONE); + } + + private void doTestSendAndReceiveWithResponseQueueAndSelectorType(boolean explicitDestination, long timeout, + JmsOperations.SelectorType selectorType) throws Exception { + + JmsTemplate template = createTemplate(); + template.setConnectionFactory(this.connectionFactory); + template.setReceiveTimeout(timeout); + + String destinationName = "testDestination"; + String responseQueueName = "responseQueue"; + + Queue responseQueue = mock(); + given(this.jndiContext.lookup(responseQueueName)).willReturn(responseQueue); + + Session localSession = getLocalSession(); + MessageProducer messageProducer = mock(); + given(localSession.createProducer(this.queue)).willReturn(messageProducer); + + MessageConsumer messageConsumer = mock(); + if (selectorType == JmsOperations.SelectorType.NONE) { + given(localSession.createConsumer(responseQueue, null)).willReturn(messageConsumer); + } + else { + given(localSession.createConsumer(eq(responseQueue), anyString())) + .willReturn(messageConsumer); + } + + TextMessage request = mock(); + MessageCreator messageCreator = mock(); + given(messageCreator.createMessage(localSession)).willReturn(request); + + if (selectorType == JmsOperations.SelectorType.MESSAGE_ID) { + given(request.getJMSMessageID()).willReturn("ID:test-message-id-12345"); + } + + TextMessage reply = mock(); + if (timeout == JmsTemplate.RECEIVE_TIMEOUT_NO_WAIT) { + given(messageConsumer.receiveNoWait()).willReturn(reply); + } + else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) { + given(messageConsumer.receive()).willReturn(reply); + } + else { + given(messageConsumer.receive(timeout)).willReturn(reply); + } + + Message message; + if (explicitDestination) { + message = template.sendAndReceive(this.queue, responseQueue, messageCreator, selectorType); + } + else { + message = template.sendAndReceive(destinationName, responseQueueName, messageCreator, selectorType); + } + + // replyTO set on the request + verify(request).setJMSReplyTo(responseQueue); + assertThat(message).as("Reply message not received").isSameAs(reply); + verify(this.connection).start(); + verify(this.connection).close(); + verify(localSession).close(); + verify(messageConsumer).close(); + verify(messageProducer).close(); + + // Verify selector-specific behavior + if (selectorType == JmsOperations.SelectorType.CORRELATION_ID) { + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(request).setJMSCorrelationID(captor.capture()); + verify(localSession).createConsumer(eq(responseQueue), eq("JMSCorrelationID = '" + captor.getValue() + "'")); + } + else if (selectorType == JmsOperations.SelectorType.MESSAGE_ID) { + verify(request).getJMSMessageID(); + verify(localSession).createConsumer(eq(responseQueue), eq("JMSCorrelationID = 'ID:test-message-id-12345'")); + } + else { + verify(localSession).createConsumer(responseQueue, null); + } + } + @Test void testIllegalStateException() throws Exception { doTestJmsException(new jakarta.jms.IllegalStateException(""), org.springframework.jms.IllegalStateException.class);