11/*
2- * Copyright 2002-2024 the original author or authors.
2+ * Copyright 2002-2025 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
7979import org .springframework .amqp .support .converter .SimpleMessageConverter ;
8080import org .springframework .amqp .utils .test .TestUtils ;
8181import org .springframework .beans .DirectFieldAccessor ;
82+ import org .springframework .context .ApplicationContext ;
83+ import org .springframework .context .event .ContextClosedEvent ;
8284import org .springframework .expression .Expression ;
8385import org .springframework .expression .spel .standard .SpelExpressionParser ;
8486
@@ -100,6 +102,8 @@ public class RabbitTemplatePublisherCallbacksIntegration1Tests {
100102
101103 public static final String ROUTE = "test.queue.RabbitTemplatePublisherCallbacksIntegrationTests" ;
102104
105+ private static final ApplicationContext APPLICATION_CONTEXT = mock ();
106+
103107 private final ExecutorService executorService = Executors .newSingleThreadExecutor ();
104108
105109 private CachingConnectionFactory connectionFactory ;
@@ -122,25 +126,35 @@ public void create() {
122126 connectionFactory .setHost ("localhost" );
123127 connectionFactory .setChannelCacheSize (10 );
124128 connectionFactory .setPort (BrokerTestUtils .getPort ());
129+ connectionFactory .setApplicationContext (APPLICATION_CONTEXT );
130+
125131 connectionFactoryWithConfirmsEnabled = new CachingConnectionFactory ();
126132 connectionFactoryWithConfirmsEnabled .setHost ("localhost" );
127133 connectionFactoryWithConfirmsEnabled .setChannelCacheSize (100 );
128134 connectionFactoryWithConfirmsEnabled .setPort (BrokerTestUtils .getPort ());
129135 connectionFactoryWithConfirmsEnabled .setPublisherConfirmType (ConfirmType .CORRELATED );
136+ connectionFactoryWithConfirmsEnabled .setApplicationContext (APPLICATION_CONTEXT );
137+
130138 templateWithConfirmsEnabled = new RabbitTemplate (connectionFactoryWithConfirmsEnabled );
139+
131140 connectionFactoryWithReturnsEnabled = new CachingConnectionFactory ();
132141 connectionFactoryWithReturnsEnabled .setHost ("localhost" );
133142 connectionFactoryWithReturnsEnabled .setChannelCacheSize (1 );
134143 connectionFactoryWithReturnsEnabled .setPort (BrokerTestUtils .getPort ());
135144 connectionFactoryWithReturnsEnabled .setPublisherReturns (true );
145+ connectionFactoryWithReturnsEnabled .setApplicationContext (APPLICATION_CONTEXT );
146+
136147 templateWithReturnsEnabled = new RabbitTemplate (connectionFactoryWithReturnsEnabled );
137148 templateWithReturnsEnabled .setMandatory (true );
149+
138150 connectionFactoryWithConfirmsAndReturnsEnabled = new CachingConnectionFactory ();
139151 connectionFactoryWithConfirmsAndReturnsEnabled .setHost ("localhost" );
140152 connectionFactoryWithConfirmsAndReturnsEnabled .setChannelCacheSize (100 );
141153 connectionFactoryWithConfirmsAndReturnsEnabled .setPort (BrokerTestUtils .getPort ());
142154 connectionFactoryWithConfirmsAndReturnsEnabled .setPublisherConfirmType (ConfirmType .CORRELATED );
143155 connectionFactoryWithConfirmsAndReturnsEnabled .setPublisherReturns (true );
156+ connectionFactoryWithConfirmsAndReturnsEnabled .setApplicationContext (APPLICATION_CONTEXT );
157+
144158 templateWithConfirmsAndReturnsEnabled = new RabbitTemplate (connectionFactoryWithConfirmsAndReturnsEnabled );
145159 templateWithConfirmsAndReturnsEnabled .setMandatory (true );
146160 }
@@ -149,17 +163,27 @@ public void create() {
149163 public void cleanUp () {
150164 this .templateWithConfirmsEnabled .stop ();
151165 this .templateWithReturnsEnabled .stop ();
166+
167+ this .connectionFactory .onApplicationEvent (new ContextClosedEvent (APPLICATION_CONTEXT ));
152168 this .connectionFactory .destroy ();
169+
170+ this .connectionFactoryWithConfirmsEnabled .onApplicationEvent (new ContextClosedEvent (APPLICATION_CONTEXT ));
153171 this .connectionFactoryWithConfirmsEnabled .destroy ();
172+
173+ this .connectionFactoryWithReturnsEnabled .onApplicationEvent (new ContextClosedEvent (APPLICATION_CONTEXT ));
154174 this .connectionFactoryWithReturnsEnabled .destroy ();
175+
176+ this .connectionFactoryWithConfirmsAndReturnsEnabled .onApplicationEvent (new ContextClosedEvent (APPLICATION_CONTEXT ));
177+ this .connectionFactoryWithConfirmsAndReturnsEnabled .destroy ();
178+
155179 this .executorService .shutdown ();
156180 }
157181
158182 @ Test
159183 public void testPublisherConfirmReceived () throws Exception {
160184 final CountDownLatch latch = new CountDownLatch (10000 );
161185 final AtomicInteger acks = new AtomicInteger ();
162- final AtomicReference <CorrelationData > confirmCorrelation = new AtomicReference <CorrelationData >();
186+ final AtomicReference <CorrelationData > confirmCorrelation = new AtomicReference <>();
163187 AtomicReference <String > callbackThreadName = new AtomicReference <>();
164188 this .templateWithConfirmsEnabled .setConfirmCallback ((correlationData , ack , cause ) -> {
165189 acks .incrementAndGet ();
@@ -208,7 +232,7 @@ public Message postProcessMessage(Message message, Correlation correlation, Stri
208232 this .templateWithConfirmsEnabled .execute (channel -> {
209233 Map <?, ?> listenerMap = TestUtils .getPropertyValue (((ChannelProxy ) channel ).getTargetChannel (),
210234 "listenerForSeq" , Map .class );
211- await ().until (() -> listenerMap . size () == 0 );
235+ await ().until (listenerMap :: isEmpty );
212236 return null ;
213237 });
214238
@@ -227,7 +251,8 @@ public void testPublisherConfirmWithSendAndReceive() throws Exception {
227251 confirmCD .set (correlationData );
228252 latch .countDown ();
229253 });
230- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer (this .connectionFactoryWithConfirmsEnabled );
254+ SimpleMessageListenerContainer container =
255+ new SimpleMessageListenerContainer (this .connectionFactoryWithConfirmsEnabled );
231256 container .setQueueNames (ROUTE );
232257 container .setReceiveTimeout (10 );
233258 container .setMessageListener (
@@ -643,7 +668,6 @@ public void testConcurrentConfirms() throws Exception {
643668 assertThat (waitForAll3AcksLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
644669 assertThat (acks .get ()).isEqualTo (3 );
645670
646-
647671 channel .basicConsume ("foo" , false , (Map ) null , null );
648672 verify (mockChannel ).basicConsume ("foo" , false , (Map ) null , null );
649673
0 commit comments