Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
* <p>The {@link MessageCreator} callback creates the message given a Session,
* and the specified {@code responseQueue} is set in the {@code JMSReplyTO}
* header of the message.
* <p>A default {@link SelectorType#MESSAGE_ID} is used to correlate request and reply messages.
* @param destination the destination to send the message to
* @param responseQueue the destination to receive the reply from
* @param messageCreator callback to create a message
Expand All @@ -418,12 +419,31 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
@Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator)
throws JmsException;

/**
* Send a message to the specified destination and receive the reply from the
* specified response queue.
* <p>The {@link MessageCreator} callback creates the message given a Session,
* and the specified {@code responseQueue} is set in the {@code JMSReplyTO}
* header of the message.
* @param destination the destination to send the message to
* @param responseQueue the destination to receive the reply from
* @param messageCreator callback to create a message
* @param selectorType the {@link SelectorType} to use for correlating request and reply messages
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
* @throws JmsException checked JMSException converted to unchecked
* @since 7.0.4
*/
@Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType)
throws JmsException;

/**
* Send a message to the specified destination and receive the reply from the
* specified response queue.
* <p>The {@link MessageCreator} callback creates the message given a Session,
* and the destination with the specified {@code responseQueueName} is set in
* the {@code JMSReplyTO} header of the message.
* <p>A default {@link SelectorType#MESSAGE_ID} is used to correlate request and reply messages.
* @param destinationName the name of the destination to send the message to
* (to be resolved to an actual destination by a DestinationResolver)
* @param responseQueueName the name of the destination to receive the reply from
Expand All @@ -437,6 +457,42 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
@Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator)
throws JmsException;

/**
* Send a message to the specified destination and receive the reply from the
* specified response queue.
* <p>The {@link MessageCreator} callback creates the message given a Session,
* and the destination with the specified {@code responseQueueName} is set in
* the {@code JMSReplyTO} header of the message.
* @param destinationName the name of the destination to send the message to
* (to be resolved to an actual destination by a DestinationResolver)
* @param responseQueueName the name of the destination to receive the reply from
* (to be resolved to an actual destination by a DestinationResolver)
* @param messageCreator callback to create a message
* @param selectorType the {@link SelectorType} to use for correlating request and reply messages
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
* @throws JmsException checked JMSException converted to unchecked
* @since 7.0.4
*/
@Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator, SelectorType selectorType) throws JmsException;

/**
* Enumeration of supported selector types.
*/
enum SelectorType {
/**
* Use generated JMSCorrelationID to correlate request and reply messages.
*/
CORRELATION_ID,
/**
* Use JMSMessageID of the request message to correlate request and reply messages.
*/
MESSAGE_ID,
/**
* Do not use any selector to correlate request and reply messages.
*/
NONE
}

//---------------------------------------------------------------------------------------
// Convenience methods for browsing messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.jms.core;

import java.util.UUID;

import io.micrometer.jakarta9.instrument.jms.JmsInstrumentation;
import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.Connection;
Expand Down Expand Up @@ -900,7 +902,12 @@ else if (isClientAcknowledge(session)) {

@Override
public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator) throws JmsException {
return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator), true);
return sendAndReceive(destination, responseQueue, messageCreator, SelectorType.MESSAGE_ID);
}

@Override
public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType) throws JmsException {
return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator, selectorType), true);
}

@Override
Expand All @@ -913,10 +920,15 @@ else if (isClientAcknowledge(session)) {

@Override
public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator) throws JmsException {
return sendAndReceive(destinationName, responseQueueName, messageCreator, SelectorType.MESSAGE_ID);
}

@Override
public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator, SelectorType selectorType) throws JmsException {
return executeLocal(session -> {
Destination destination = resolveDestinationName(session, destinationName);
Destination responseQueue = resolveDestinationName(session, responseQueueName);
return doSendAndReceive(session, destination, responseQueue, messageCreator);
return doSendAndReceive(session, destination, responseQueue, messageCreator, selectorType);
}, true);
}

Expand All @@ -932,7 +944,7 @@ else if (isClientAcknowledge(session)) {
TemporaryQueue responseQueue = null;
try {
responseQueue = session.createTemporaryQueue();
return doSendAndReceive(session, destination, responseQueue, messageCreator);
return doSendAndReceive(session, destination, responseQueue, messageCreator, SelectorType.NONE);
}
finally {
if (responseQueue != null) {
Expand All @@ -944,24 +956,39 @@ else if (isClientAcknowledge(session)) {
/**
* Send a request message to the given {@link Destination destination} and block until
* a reply has been received on the specified {@link Destination responseQueue}.
* <p>Use {@link SelectorType selectorType} to specify how the response message is correlated
* to the request message.
* <p>Return the response message or {@code null} if no message has been received.
* @throws JMSException if thrown by JMS API methods
*/
protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator)
protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator, SelectorType selectorType)
throws JMSException {

Assert.notNull(messageCreator, "MessageCreator must not be null");
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
Message requestMessage = messageCreator.createMessage(session);
producer = session.createProducer(destination);
consumer = session.createConsumer(responseQueue);

String messageSelector = null;
if (selectorType == SelectorType.CORRELATION_ID) {
String correlationId = UUID.randomUUID().toString();
requestMessage.setJMSCorrelationID(correlationId);
messageSelector = String.format("JMSCorrelationID = '%s'", correlationId);
}

requestMessage.setJMSReplyTo(responseQueue);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + requestMessage);
}
doSend(producer, requestMessage);

if (selectorType == SelectorType.MESSAGE_ID) {
String messageId = requestMessage.getJMSMessageID();
messageSelector = String.format("JMSCorrelationID = '%s'", messageId);
}

consumer = session.createConsumer(responseQueue, messageSelector);
return receiveFromConsumer(consumer, getReceiveTimeout());
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import jakarta.jms.TextMessage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import org.springframework.jms.InvalidClientIDException;
import org.springframework.jms.InvalidDestinationException;
Expand Down Expand Up @@ -62,6 +63,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -648,7 +651,7 @@ private void doTestSendAndReceive(boolean explicitDestination, boolean useDefaul
given(localSession.createTemporaryQueue()).willReturn(replyDestination);

MessageConsumer messageConsumer = mock();
given(localSession.createConsumer(replyDestination)).willReturn(messageConsumer);
given(localSession.createConsumer(replyDestination, null)).willReturn(messageConsumer);


TextMessage request = mock();
Expand Down Expand Up @@ -715,7 +718,8 @@ private void doTestSendAndReceiveWithResponseQueue(boolean explicitDestination,
given(localSession.createProducer(this.queue)).willReturn(messageProducer);

MessageConsumer messageConsumer = mock();
given(localSession.createConsumer(responseQueue)).willReturn(messageConsumer);
// Default sendAndReceive with responseQueue uses MESSAGE_ID selector
given(localSession.createConsumer(eq(responseQueue), anyString())).willReturn(messageConsumer);

TextMessage request = mock();
MessageCreator messageCreator = mock();
Expand Down Expand Up @@ -750,6 +754,113 @@ else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
verify(messageProducer).close();
}

@Test
void testSendAndReceiveDestinationWithResponseQueueAndCorrelationIdSelector() throws Exception {
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.CORRELATION_ID);
}

@Test
void testSendAndReceiveDestinationNameWithResponseQueueNameAndCorrelationIdSelector() throws Exception {
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.CORRELATION_ID);
}

@Test
void testSendAndReceiveDestinationWithResponseQueueAndMessageIdSelector() throws Exception {
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.MESSAGE_ID);
}

@Test
void testSendAndReceiveDestinationNameWithResponseQueueNameAndMessageIdSelector() throws Exception {
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.MESSAGE_ID);
}

@Test
void testSendAndReceiveDestinationWithResponseQueueAndNoSelector() throws Exception {
doTestSendAndReceiveWithResponseQueueAndSelectorType(true, 1000L, JmsOperations.SelectorType.NONE);
}

@Test
void testSendAndReceiveDestinationNameWithResponseQueueNameAndNoSelector() throws Exception {
doTestSendAndReceiveWithResponseQueueAndSelectorType(false, 1000L, JmsOperations.SelectorType.NONE);
}

private void doTestSendAndReceiveWithResponseQueueAndSelectorType(boolean explicitDestination, long timeout,
JmsOperations.SelectorType selectorType) throws Exception {

JmsTemplate template = createTemplate();
template.setConnectionFactory(this.connectionFactory);
template.setReceiveTimeout(timeout);

String destinationName = "testDestination";
String responseQueueName = "responseQueue";

Queue responseQueue = mock();
given(this.jndiContext.lookup(responseQueueName)).willReturn(responseQueue);

Session localSession = getLocalSession();
MessageProducer messageProducer = mock();
given(localSession.createProducer(this.queue)).willReturn(messageProducer);

MessageConsumer messageConsumer = mock();
if (selectorType == JmsOperations.SelectorType.NONE) {
given(localSession.createConsumer(responseQueue, null)).willReturn(messageConsumer);
}
else {
given(localSession.createConsumer(eq(responseQueue), anyString()))
.willReturn(messageConsumer);
}

TextMessage request = mock();
MessageCreator messageCreator = mock();
given(messageCreator.createMessage(localSession)).willReturn(request);

if (selectorType == JmsOperations.SelectorType.MESSAGE_ID) {
given(request.getJMSMessageID()).willReturn("ID:test-message-id-12345");
}

TextMessage reply = mock();
if (timeout == JmsTemplate.RECEIVE_TIMEOUT_NO_WAIT) {
given(messageConsumer.receiveNoWait()).willReturn(reply);
}
else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
given(messageConsumer.receive()).willReturn(reply);
}
else {
given(messageConsumer.receive(timeout)).willReturn(reply);
}

Message message;
if (explicitDestination) {
message = template.sendAndReceive(this.queue, responseQueue, messageCreator, selectorType);
}
else {
message = template.sendAndReceive(destinationName, responseQueueName, messageCreator, selectorType);
}

// replyTO set on the request
verify(request).setJMSReplyTo(responseQueue);
assertThat(message).as("Reply message not received").isSameAs(reply);
verify(this.connection).start();
verify(this.connection).close();
verify(localSession).close();
verify(messageConsumer).close();
verify(messageProducer).close();

// Verify selector-specific behavior
if (selectorType == JmsOperations.SelectorType.CORRELATION_ID) {
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(request).setJMSCorrelationID(captor.capture());
verify(localSession).createConsumer(eq(responseQueue), eq("JMSCorrelationID = '" + captor.getValue() + "'"));
}
else if (selectorType == JmsOperations.SelectorType.MESSAGE_ID) {
verify(request).getJMSMessageID();
verify(localSession).createConsumer(eq(responseQueue), eq("JMSCorrelationID = 'ID:test-message-id-12345'"));
}
else {
verify(localSession).createConsumer(responseQueue, null);
}
}

@Test
void testIllegalStateException() throws Exception {
doTestJmsException(new jakarta.jms.IllegalStateException(""), org.springframework.jms.IllegalStateException.class);
Expand Down