@@ -1157,51 +1157,45 @@ public <T> T receiveAndConvert(String queueName, long timeoutMillis, Parameteriz
11571157 }
11581158
11591159 @ Override
1160- @ Nullable
11611160 public <R , S > boolean receiveAndReply (ReceiveAndReplyCallback <R , S > callback ) throws AmqpException {
11621161 return receiveAndReply (this .getRequiredQueue (), callback );
11631162 }
11641163
11651164 @ Override
11661165 @ SuppressWarnings (UNCHECKED )
1167- @ Nullable
11681166 public <R , S > boolean receiveAndReply (final String queueName , ReceiveAndReplyCallback <R , S > callback ) throws AmqpException {
11691167 return receiveAndReply (queueName , callback , (ReplyToAddressCallback <S >) this .defaultReplyToAddressCallback );
11701168
11711169 }
11721170
11731171 @ Override
1174- @ Nullable
11751172 public <R , S > boolean receiveAndReply (ReceiveAndReplyCallback <R , S > callback , final String exchange , final String routingKey )
11761173 throws AmqpException {
11771174 return receiveAndReply (this .getRequiredQueue (), callback , exchange , routingKey );
11781175 }
11791176
11801177 @ Override
1181- @ Nullable
11821178 public <R , S > boolean receiveAndReply (final String queueName , ReceiveAndReplyCallback <R , S > callback , final String replyExchange ,
11831179 final String replyRoutingKey ) throws AmqpException {
11841180 return receiveAndReply (queueName , callback ,
11851181 (request , reply ) -> new Address (replyExchange , replyRoutingKey ));
11861182 }
11871183
11881184 @ Override
1189- @ Nullable
11901185 public <R , S > boolean receiveAndReply (ReceiveAndReplyCallback <R , S > callback , ReplyToAddressCallback <S > replyToAddressCallback )
11911186 throws AmqpException {
11921187 return receiveAndReply (this .getRequiredQueue (), callback , replyToAddressCallback );
11931188 }
11941189
11951190 @ Override
1196- @ Nullable
11971191 public <R , S > boolean receiveAndReply (String queueName , ReceiveAndReplyCallback <R , S > callback ,
11981192 ReplyToAddressCallback <S > replyToAddressCallback ) throws AmqpException {
11991193 return doReceiveAndReply (queueName , callback , replyToAddressCallback );
12001194 }
12011195
1202- @ Nullable
12031196 private <R , S > boolean doReceiveAndReply (final String queueName , final ReceiveAndReplyCallback <R , S > callback ,
12041197 final ReplyToAddressCallback <S > replyToAddressCallback ) throws AmqpException {
1198+
12051199 return execute (channel -> {
12061200 Message receiveMessage = receiveForReply (queueName , channel );
12071201 if (receiveMessage != null ) {
@@ -2006,17 +2000,7 @@ private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionF
20062000 invokeScope = true ;
20072001 }
20082002 try {
2009- if (this .confirmsOrReturnsCapable == null ) {
2010- determineConfirmsReturnsCapability (connectionFactory );
2011- }
2012- if (this .confirmsOrReturnsCapable ) {
2013- addListener (channel );
2014- }
2015- if (logger .isDebugEnabled ()) {
2016- logger .debug (
2017- "Executing callback " + action .getClass ().getSimpleName () + " on RabbitMQ Channel: " + channel );
2018- }
2019- return action .doInRabbit (channel );
2003+ return invokeAction (action , connectionFactory , channel );
20202004 }
20212005 catch (Exception ex ) {
20222006 if (isChannelLocallyTransacted (channel )) {
@@ -2025,18 +2009,40 @@ private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionF
20252009 throw convertRabbitAccessException (ex );
20262010 }
20272011 finally {
2028- if (!invokeScope ) {
2029- if (resourceHolder != null ) {
2030- ConnectionFactoryUtils .releaseResources (resourceHolder );
2031- }
2032- else {
2033- RabbitUtils .closeChannel (channel );
2034- RabbitUtils .closeConnection (connection );
2035- }
2012+ cleanUpAfterAction (channel , invokeScope , resourceHolder , connection );
2013+ }
2014+ }
2015+
2016+ private void cleanUpAfterAction (Channel channel , boolean invokeScope , RabbitResourceHolder resourceHolder ,
2017+ Connection connection ) {
2018+
2019+ if (!invokeScope ) {
2020+ if (resourceHolder != null ) {
2021+ ConnectionFactoryUtils .releaseResources (resourceHolder );
2022+ }
2023+ else {
2024+ RabbitUtils .closeChannel (channel );
2025+ RabbitUtils .closeConnection (connection );
20362026 }
20372027 }
20382028 }
20392029
2030+ private <T > T invokeAction (ChannelCallback <T > action , ConnectionFactory connectionFactory , Channel channel )
2031+ throws Exception {
2032+
2033+ if (this .confirmsOrReturnsCapable == null ) {
2034+ determineConfirmsReturnsCapability (connectionFactory );
2035+ }
2036+ if (this .confirmsOrReturnsCapable ) {
2037+ addListener (channel );
2038+ }
2039+ if (logger .isDebugEnabled ()) {
2040+ logger .debug (
2041+ "Executing callback " + action .getClass ().getSimpleName () + " on RabbitMQ Channel: " + channel );
2042+ }
2043+ return action .doInRabbit (channel );
2044+ }
2045+
20402046 @ Override
20412047 @ Nullable
20422048 public <T > T invoke (OperationsCallback <T > action , @ Nullable com .rabbitmq .client .ConfirmCallback acks ,
0 commit comments