Skip to content

Commit 434410f

Browse files
committed
GH-2991: Extract AmqpConnectionFactory abstraction
Fixes: #2991 It turns out to be not very convenient when application fails on initialization due to not having connection with the broker. So, the `AmqpConnectionFactoryBean` is wrong contract and better look into a `ConnectionFactory` abstraction instead. * Rework `AmqpConnectionFactoryBean` with its eager real connection to the `AmqpConnectionFactory` contract and `SingleAmqpConnectionFactory` implementation where real connection only happens when its `getConnection()` is called * Rework all the client components to rely on the new `AmqpConnectionFactory` contract * Make `Publisher` instantiation in a similar to `Connection` manner - on demand, when `RabbitAmqpTemplate` calls its internal `getPublisher()`
1 parent b90b672 commit 434410f

File tree

9 files changed

+174
-105
lines changed

9 files changed

+174
-105
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbitmq.client;
18+
19+
import com.rabbitmq.client.amqp.Connection;
20+
21+
/**
22+
* The contract for RabbitMQ AMQP 1.0 {@link Connection} management.
23+
*
24+
* @author Artem Bilan
25+
*
26+
* @since 4.0
27+
*/
28+
public interface AmqpConnectionFactory {
29+
30+
Connection getConnection();
31+
32+
}

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpAdmin.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.concurrent.atomic.AtomicReference;
2626

2727
import com.rabbitmq.client.amqp.AmqpException;
28-
import com.rabbitmq.client.amqp.Connection;
2928
import com.rabbitmq.client.amqp.Management;
3029
import org.jspecify.annotations.Nullable;
3130

@@ -67,7 +66,7 @@ public class RabbitAmqpAdmin
6766

6867
public static final String QUEUE_TYPE = "QUEUE_TYPE";
6968

70-
private final Connection amqpConnection;
69+
private final AmqpConnectionFactory connectionFactory;
7170

7271
private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
7372

@@ -88,8 +87,8 @@ public class RabbitAmqpAdmin
8887

8988
private volatile boolean running = false;
9089

91-
public RabbitAmqpAdmin(Connection amqpConnection) {
92-
this.amqpConnection = amqpConnection;
90+
public RabbitAmqpAdmin(AmqpConnectionFactory connectionFactory) {
91+
this.connectionFactory = connectionFactory;
9392
}
9493

9594
@Override
@@ -231,7 +230,7 @@ private void declareDeclarableBeans() {
231230
return;
232231
}
233232

234-
try (Management management = this.amqpConnection.management()) {
233+
try (Management management = getManagement()) {
235234
exchanges.forEach((exchange) -> doDeclareExchange(management, exchange));
236235
queues.forEach((queue) -> doDeclareQueue(management, queue));
237236
bindings.forEach((binding) -> doDeclareBinding(management, binding));
@@ -273,7 +272,7 @@ private <T extends Declarable> boolean declarableByMe(T dec) {
273272

274273
@Override
275274
public void declareExchange(Exchange exchange) {
276-
try (Management management = this.amqpConnection.management()) {
275+
try (Management management = getManagement()) {
277276
doDeclareExchange(management, exchange);
278277
}
279278
}
@@ -307,15 +306,15 @@ public boolean deleteExchange(String exchangeName) {
307306
return false;
308307
}
309308

310-
try (Management management = this.amqpConnection.management()) {
309+
try (Management management = getManagement()) {
311310
management.exchangeDelete(exchangeName);
312311
}
313312
return true;
314313
}
315314

316315
@Override
317316
public @Nullable Queue declareQueue() {
318-
try (Management management = this.amqpConnection.management()) {
317+
try (Management management = getManagement()) {
319318
return doDeclareQueue(management);
320319
}
321320
}
@@ -340,7 +339,7 @@ public boolean deleteExchange(String exchangeName) {
340339

341340
@Override
342341
public @Nullable String declareQueue(Queue queue) {
343-
try (Management management = this.amqpConnection.management()) {
342+
try (Management management = getManagement()) {
344343
return doDeclareQueue(management, queue);
345344
}
346345
}
@@ -378,7 +377,7 @@ public boolean deleteQueue(String queueName) {
378377
@ManagedOperation(description =
379378
"Delete a queue from the broker if unused and empty (when corresponding arguments are true")
380379
public void deleteQueue(String queueName, boolean unused, boolean empty) {
381-
try (Management management = this.amqpConnection.management()) {
380+
try (Management management = getManagement()) {
382381
Management.QueueInfo queueInfo = management.queueInfo(queueName);
383382
if ((!unused || queueInfo.consumerCount() == 0)
384383
&& (!empty || queueInfo.messageCount() == 0)) {
@@ -402,15 +401,15 @@ public void purgeQueue(String queueName, boolean noWait) {
402401
@Override
403402
@ManagedOperation(description = "Purge a queue and return the number of messages purged")
404403
public int purgeQueue(String queueName) {
405-
try (Management management = this.amqpConnection.management()) {
404+
try (Management management = getManagement()) {
406405
management.queuePurge(queueName);
407406
}
408407
return 0;
409408
}
410409

411410
@Override
412411
public void declareBinding(Binding binding) {
413-
try (Management management = this.amqpConnection.management()) {
412+
try (Management management = getManagement()) {
414413
doDeclareBinding(management, binding);
415414
}
416415
}
@@ -441,7 +440,7 @@ public void removeBinding(Binding binding) {
441440
return;
442441
}
443442

444-
try (Management management = this.amqpConnection.management()) {
443+
try (Management management = getManagement()) {
445444
Management.UnbindSpecification unbindSpecification =
446445
management.unbind()
447446
.sourceExchange(binding.getExchange())
@@ -481,7 +480,7 @@ public void removeBinding(Binding binding) {
481480

482481
@Override
483482
public @Nullable QueueInformation getQueueInfo(String queueName) {
484-
try (Management management = this.amqpConnection.management()) {
483+
try (Management management = getManagement()) {
485484
Management.QueueInfo queueInfo = management.queueInfo(queueName);
486485
QueueInformation queueInformation =
487486
new QueueInformation(queueInfo.name(), queueInfo.messageCount(), queueInfo.consumerCount());
@@ -490,6 +489,10 @@ public void removeBinding(Binding binding) {
490489
}
491490
}
492491

492+
private Management getManagement() {
493+
return this.connectionFactory.getConnection().management();
494+
}
495+
493496
private <T extends Throwable> void logOrRethrowDeclarationException(@Nullable Declarable element,
494497
String elementType, T t) throws T {
495498

@@ -566,5 +569,4 @@ else if (declarable instanceof Binding binding) {
566569
});
567570
}
568571

569-
570572
}

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplate.java

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import java.time.Duration;
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.locks.Lock;
23+
import java.util.concurrent.locks.ReentrantLock;
2224

23-
import com.rabbitmq.client.amqp.Connection;
2425
import com.rabbitmq.client.amqp.Consumer;
2526
import com.rabbitmq.client.amqp.Environment;
2627
import com.rabbitmq.client.amqp.Publisher;
27-
import com.rabbitmq.client.amqp.PublisherBuilder;
2828
import com.rabbitmq.client.amqp.Resource;
2929
import org.jspecify.annotations.Nullable;
3030

@@ -42,7 +42,6 @@
4242
import org.springframework.amqp.support.converter.SmartMessageConverter;
4343
import org.springframework.amqp.utils.JavaUtils;
4444
import org.springframework.beans.factory.DisposableBean;
45-
import org.springframework.beans.factory.InitializingBean;
4645
import org.springframework.core.ParameterizedTypeReference;
4746
import org.springframework.util.Assert;
4847

@@ -54,14 +53,13 @@
5453
*
5554
* @since 4.0
5655
*/
57-
public class RabbitAmqpTemplate implements AsyncAmqpTemplate, InitializingBean, DisposableBean {
56+
public class RabbitAmqpTemplate implements AsyncAmqpTemplate, DisposableBean {
5857

59-
private final Connection connection;
58+
private final AmqpConnectionFactory connectionFactory;
6059

61-
private final PublisherBuilder publisherBuilder;
60+
private final Lock instanceLock = new ReentrantLock();
6261

63-
@SuppressWarnings("NullAway.Init")
64-
private Publisher publisher;
62+
private @Nullable Object publisher;
6563

6664
private MessageConverter messageConverter = new SimpleMessageConverter();
6765

@@ -73,17 +71,20 @@ public class RabbitAmqpTemplate implements AsyncAmqpTemplate, InitializingBean,
7371

7472
private @Nullable String defaultReceiveQueue;
7573

76-
public RabbitAmqpTemplate(Connection amqpConnection) {
77-
this.connection = amqpConnection;
78-
this.publisherBuilder = amqpConnection.publisherBuilder();
74+
private Resource.StateListener @Nullable [] stateListeners;
75+
76+
private Duration publishTimeout = Duration.ofSeconds(60);
77+
78+
public RabbitAmqpTemplate(AmqpConnectionFactory connectionFactory) {
79+
this.connectionFactory = connectionFactory;
7980
}
8081

8182
public void setListeners(Resource.StateListener... listeners) {
82-
this.publisherBuilder.listeners(listeners);
83+
this.stateListeners = listeners;
8384
}
8485

8586
public void setPublishTimeout(Duration timeout) {
86-
this.publisherBuilder.publishTimeout(timeout);
87+
this.publishTimeout = timeout;
8788
}
8889

8990
/**
@@ -100,15 +101,15 @@ public void setExchange(String exchange) {
100101
/**
101102
* Set a default routing key.
102103
* Mutually exclusive with {@link #setQueue(String)}.
103-
* @param key the default routing key.
104+
* @param routingKey the default routing key.
104105
*/
105-
public void setKey(String key) {
106-
this.defaultRoutingKey = key;
106+
public void setRoutingKey(String routingKey) {
107+
this.defaultRoutingKey = routingKey;
107108
}
108109

109110
/**
110111
* Set default queue for publishing.
111-
* Mutually exclusive with {@link #setExchange(String)} and {@link #setKey(String)}.
112+
* Mutually exclusive with {@link #setExchange(String)} and {@link #setRoutingKey(String)}.
112113
* @param queue the default queue.
113114
*/
114115
public void setQueue(String queue) {
@@ -137,14 +138,36 @@ private String getRequiredQueue() throws IllegalStateException {
137138
return name;
138139
}
139140

140-
@Override
141-
public void afterPropertiesSet() {
142-
this.publisher = this.publisherBuilder.build();
141+
private Publisher getPublisher() {
142+
Object publisherToReturn = this.publisher;
143+
if (publisherToReturn == null) {
144+
this.instanceLock.lock();
145+
try {
146+
publisherToReturn = this.publisher;
147+
if (publisherToReturn == null) {
148+
publisherToReturn =
149+
this.connectionFactory.getConnection()
150+
.publisherBuilder()
151+
.listeners(this.stateListeners)
152+
.publishTimeout(this.publishTimeout)
153+
.build();
154+
this.publisher = publisherToReturn;
155+
}
156+
}
157+
finally {
158+
this.instanceLock.unlock();
159+
}
160+
}
161+
return (Publisher) publisherToReturn;
143162
}
144163

145164
@Override
146165
public void destroy() {
147-
this.publisher.close();
166+
Object publisherToClose = this.publisher;
167+
if (publisherToClose != null) {
168+
((Publisher) publisherToClose).close();
169+
this.publisher = null;
170+
}
148171
}
149172

150173
/**
@@ -178,7 +201,7 @@ public CompletableFuture<Boolean> send(String exchange, @Nullable String routing
178201
private CompletableFuture<Boolean> doSend(@Nullable String exchange, @Nullable String routingKey,
179202
@Nullable String queue, Message message) {
180203

181-
com.rabbitmq.client.amqp.Message amqpMessage = this.publisher.message();
204+
com.rabbitmq.client.amqp.Message amqpMessage = getPublisher().message();
182205
com.rabbitmq.client.amqp.Message.MessageAddressBuilder address = amqpMessage.toAddress();
183206
JavaUtils.INSTANCE
184207
.acceptIfNotNull(exchange, address::exchange)
@@ -191,7 +214,7 @@ private CompletableFuture<Boolean> doSend(@Nullable String exchange, @Nullable S
191214

192215
CompletableFuture<Boolean> publishResult = new CompletableFuture<>();
193216

194-
this.publisher.publish(amqpMessage,
217+
getPublisher().publish(amqpMessage,
195218
(context) -> {
196219
switch (context.status()) {
197220
case ACCEPTED -> publishResult.complete(true);
@@ -271,7 +294,8 @@ public CompletableFuture<Message> receive(String queueName) {
271294
CompletableFuture<Message> messageFuture = new CompletableFuture<>();
272295

273296
Consumer consumer =
274-
this.connection.consumerBuilder()
297+
this.connectionFactory.getConnection()
298+
.consumerBuilder()
275299
.queue(queueName)
276300
.initialCredits(1)
277301
.priority(10)

0 commit comments

Comments
 (0)