Skip to content

Commit cc2e48c

Browse files
committed
Refactor shutdown deferral logic based on review feedback
- Move @nullable to type declaration - Use pattern matching for ListenerContainerAware - Skip waiting when replyCounter is null or count is zero - Extract shutdown wait logic into separate method - Add author tag and update whats-new.adoc Signed-off-by: Jeongjun Min <[email protected]>
1 parent 7a732db commit cc2e48c

File tree

5 files changed

+29
-19
lines changed

5 files changed

+29
-19
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@
159159
* @author Alexey Platonov
160160
* @author Leonardo Ferreira
161161
* @author Ngoc Nhan
162+
* @author Jeongjun Min
162163
*
163164
* @since 1.0
164165
*/

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
* @author Jeonggi Kim
8888
* @author Java4ye
8989
* @author Thomas Badie
90+
* @author Jeongjun Min
9091
*
9192
* @since 1.0
9293
*/
@@ -692,22 +693,8 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
692693
Runnable awaitShutdown = () -> {
693694
logger.info("Waiting for workers to finish.");
694695
try {
695-
ActiveObjectCounter<Object> replyCounter = null;
696-
Object listener = getMessageListener();
697-
if (listener instanceof ListenerContainerAware) {
698-
replyCounter = ((ListenerContainerAware) listener).getPendingReplyCounter();
699-
}
700-
701-
if (replyCounter != null) {
702-
if (logger.isInfoEnabled()) {
703-
logger.info("Waiting for pending replies: " + replyCounter.getCount());
704-
}
705-
if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
706-
if (logger.isWarnEnabled()) {
707-
logger.warn("Shutdown timeout expired, but " + replyCounter.getCount()
708-
+ " pending replies still remain.");
709-
}
710-
}
696+
if (getMessageListener() instanceof ListenerContainerAware listenerContainerAware) {
697+
awaitPendingReplies(listenerContainerAware);
711698
}
712699

713700
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
@@ -756,6 +743,22 @@ private void runCallbackIfNotNull(@Nullable Runnable callback) {
756743
}
757744
}
758745

746+
private void awaitPendingReplies(ListenerContainerAware listener) throws InterruptedException {
747+
ActiveObjectCounter<Object> replyCounter = listener.getPendingReplyCounter();
748+
749+
if (replyCounter != null && replyCounter.getCount() > 0) {
750+
if (logger.isInfoEnabled()) {
751+
logger.info("Waiting for pending replies: " + replyCounter.getCount());
752+
}
753+
if (!replyCounter.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
754+
if (logger.isWarnEnabled()) {
755+
logger.warn("Shutdown timeout expired, but " + replyCounter.getCount()
756+
+ " pending replies still remain.");
757+
}
758+
}
759+
}
760+
}
761+
759762
private boolean isActive(BlockingQueueConsumer consumer) {
760763
boolean consumerActive;
761764
this.consumersLock.lock();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/ListenerContainerAware.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* interface can have configuration verified during initialization.
2626
*
2727
* @author Gary Russell
28+
* @author Jeongjun Min
2829
* @since 1.5
2930
*
3031
*/
@@ -44,8 +45,7 @@ public interface ListenerContainerAware {
4445
* @return the counter, or null.
4546
* @since 4.0
4647
*/
47-
@Nullable
48-
default ActiveObjectCounter<Object> getPendingReplyCounter() {
48+
default @Nullable ActiveObjectCounter<Object> getPendingReplyCounter() {
4949
return null;
5050
}
5151
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
* @author Yansong Ren
105105
* @author Tim Bourquin
106106
* @author Jeonggi Kim
107+
* @author Jeongjun Min
107108
*/
108109
public class SimpleMessageListenerContainerTests {
109110

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,9 @@ There is no need to keep out-dated utilities and recommendation is to migrate to
3636
The Jackson 2 has been deprecated for removal in whole Spring portfolio.
3737
Respective new classes have been introduced to support Jackson 3.
3838

39-
See xref:amqp/message-converters.adoc[] for more information.
39+
See xref:amqp/message-converters.adoc[] for more information.
40+
41+
[[x40-enhancements]]
42+
=== Enhancements
43+
44+
Defer `SimpleMessageListenerContainer` shutdown for pending `RabbitTemplate` replies.

0 commit comments

Comments
 (0)