@@ -292,6 +292,24 @@ public boolean isInExpectedState() {
292292 return isRunning () || isStoppedNormally ();
293293 }
294294
295+ @ Override
296+ public void pause () {
297+ super .pause ();
298+ KafkaMessageListenerContainer <K , V >.ListenerConsumer consumer = this .listenerConsumer ;
299+ if (consumer != null ) {
300+ consumer .wakeIfNecessary ();
301+ }
302+ }
303+
304+ @ Override
305+ public void resume () {
306+ super .resume ();
307+ KafkaMessageListenerContainer <K , V >.ListenerConsumer consumer = this .listenerConsumer ;
308+ if (consumer != null ) {
309+ this .listenerConsumer .wakeIfNecessary ();
310+ }
311+ }
312+
295313 @ Override
296314 public Map <String , Map <MetricName , ? extends Metric >> metrics () {
297315 ListenerConsumer listenerConsumerForMetrics = this .listenerConsumer ;
@@ -369,7 +387,7 @@ protected void doStop(final Runnable callback, boolean normal) {
369387 if (isRunning ()) {
370388 this .listenerConsumerFuture .addCallback (new StopCallback (callback ));
371389 setRunning (false );
372- this .listenerConsumer .wakeIfNecessary ();
390+ this .listenerConsumer .wakeIfNecessaryForStop ();
373391 setStoppedNormally (normal );
374392 }
375393 }
@@ -1303,7 +1321,7 @@ protected void pollAndInvoke() {
13031321 ConsumerRecords <K , V > records = doPoll ();
13041322 if (!this .polling .compareAndSet (true , false ) && records != null ) {
13051323 /*
1306- * There is a small race condition where wakeIfNecessary was called between
1324+ * There is a small race condition where wakeIfNecessaryForStop was called between
13071325 * exiting the poll and before we reset the boolean.
13081326 */
13091327 if (records .count () > 0 ) {
@@ -1521,12 +1539,18 @@ private void checkRebalanceCommits() {
15211539 }
15221540 }
15231541
1524- void wakeIfNecessary () {
1542+ void wakeIfNecessaryForStop () {
15251543 if (this .polling .getAndSet (false )) {
15261544 this .consumer .wakeup ();
15271545 }
15281546 }
15291547
1548+ void wakeIfNecessary () {
1549+ if (this .polling .get ()) {
1550+ this .consumer .wakeup ();
1551+ }
1552+ }
1553+
15301554 private void debugRecords (@ Nullable ConsumerRecords <K , V > records ) {
15311555 if (records != null ) {
15321556 this .logger .debug (() -> "Received: " + records .count () + " records" );
@@ -2427,7 +2451,7 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
24272451
24282452 private void nackSleepAndReset () {
24292453 try {
2430- Thread . sleep ( this .nackSleep );
2454+ ListenerUtils . stoppableSleep ( KafkaMessageListenerContainer . this . thisOrParentContainer , this .nackSleep );
24312455 }
24322456 catch (@ SuppressWarnings (UNUSED ) InterruptedException e ) {
24332457 Thread .currentThread ().interrupt ();
0 commit comments