Skip to content

Commit c939c58

Browse files
garyrussellartembilan
authored andcommitted
GH-1106: Fix Use Publisher CF with RT.invoke()
Fixes #1106 `RabbitTemplate.invoke()` did not honor `usePublisherConnection`.
1 parent 287977e commit c939c58

File tree

3 files changed

+50
-3
lines changed

3 files changed

+50
-3
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ public static void registerDeliveryTag(ConnectionFactory connectionFactory, Chan
205205
*/
206206
public static Connection createConnection(final ConnectionFactory connectionFactory,
207207
final boolean publisherConnectionIfPossible) {
208+
208209
if (publisherConnectionIfPossible) {
209210
ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
210211
if (publisherFactory != null) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2166,16 +2166,21 @@ public <T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.
21662166
RabbitResourceHolder resourceHolder = null;
21672167
Connection connection = null; // NOSONAR (close)
21682168
Channel channel;
2169+
ConnectionFactory connectionFactory = getConnectionFactory();
21692170
if (isChannelTransacted()) {
2170-
resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(getConnectionFactory(), true);
2171+
resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(connectionFactory, true,
2172+
this.usePublisherConnection);
21712173
channel = resourceHolder.getChannel();
21722174
if (channel == null) {
21732175
ConnectionFactoryUtils.releaseResources(resourceHolder);
21742176
throw new IllegalStateException("Resource holder returned a null channel");
21752177
}
21762178
}
21772179
else {
2178-
connection = getConnectionFactory().createConnection(); // NOSONAR - RabbitUtils
2180+
if (this.usePublisherConnection && connectionFactory.getPublisherConnectionFactory() != null) {
2181+
connectionFactory = connectionFactory.getPublisherConnectionFactory();
2182+
}
2183+
connection = connectionFactory.createConnection(); // NOSONAR - RabbitUtils
21792184
if (connection == null) {
21802185
throw new IllegalStateException("Connection factory returned a null connection");
21812186
}
@@ -2184,7 +2189,7 @@ public <T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.
21842189
if (channel == null) {
21852190
throw new IllegalStateException("Connection returned a null channel");
21862191
}
2187-
if (!getConnectionFactory().isPublisherConfirms()) {
2192+
if (!connectionFactory.isPublisherConfirms()) {
21882193
RabbitUtils.setPhysicalCloseRequired(channel, true);
21892194
}
21902195
this.dedicatedChannels.set(channel);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,47 @@ public void testAddAndRemoveAfterReceivePostProcessors() {
518518
assertThat(afterReceivePostProcessors).containsExactly(mpp2, mpp3);
519519
}
520520

521+
@Test
522+
public void testPublisherConnWithInvoke() {
523+
org.springframework.amqp.rabbit.connection.ConnectionFactory cf = mock(
524+
org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
525+
org.springframework.amqp.rabbit.connection.ConnectionFactory pcf = mock(
526+
org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
527+
given(cf.getPublisherConnectionFactory()).willReturn(pcf);
528+
RabbitTemplate template = new RabbitTemplate(cf);
529+
template.setUsePublisherConnection(true);
530+
org.springframework.amqp.rabbit.connection.Connection conn = mock(
531+
org.springframework.amqp.rabbit.connection.Connection.class);
532+
Channel channel = mock(Channel.class);
533+
given(pcf.createConnection()).willReturn(conn);
534+
given(conn.isOpen()).willReturn(true);
535+
given(conn.createChannel(false)).willReturn(channel);
536+
template.invoke(t -> null);
537+
verify(pcf).createConnection();
538+
verify(conn).createChannel(false);
539+
}
540+
541+
@Test
542+
public void testPublisherConnWithInvokeInTx() {
543+
org.springframework.amqp.rabbit.connection.ConnectionFactory cf = mock(
544+
org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
545+
org.springframework.amqp.rabbit.connection.ConnectionFactory pcf = mock(
546+
org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
547+
given(cf.getPublisherConnectionFactory()).willReturn(pcf);
548+
RabbitTemplate template = new RabbitTemplate(cf);
549+
template.setUsePublisherConnection(true);
550+
template.setChannelTransacted(true);
551+
org.springframework.amqp.rabbit.connection.Connection conn = mock(
552+
org.springframework.amqp.rabbit.connection.Connection.class);
553+
Channel channel = mock(Channel.class);
554+
given(pcf.createConnection()).willReturn(conn);
555+
given(conn.isOpen()).willReturn(true);
556+
given(conn.createChannel(true)).willReturn(channel);
557+
template.invoke(t -> null);
558+
verify(pcf).createConnection();
559+
verify(conn).createChannel(true);
560+
}
561+
521562
@SuppressWarnings("serial")
522563
private class TestTransactionManager extends AbstractPlatformTransactionManager {
523564

0 commit comments

Comments
 (0)