diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 76cc94181c..aec11a82f9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -51,6 +51,7 @@ * @author Andrii Pelesh * @author Antonio Tomac * @author Wang Zhiyang + * @author Sanghyeok An * * @since 2.8 * @@ -113,13 +114,18 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r throw new KafkaException("Woken up during retry", logLevel, we); } try { - ListenerUtils.conditionalSleep( + ListenerUtils.conditionalSleepWithPoll( () -> container.isRunning() && !container.isPauseRequested() && records.partitions().stream().noneMatch(container::isPartitionPauseRequested), - nextBackOff + nextBackOff, + consumer ); } + catch (WakeupException we) { + seeker.handleBatch(thrownException, records, consumer, container, NO_OP); + throw new KafkaException("Woken up during retry", logLevel, we); + } catch (InterruptedException e1) { Thread.currentThread().interrupt(); seeker.handleBatch(thrownException, records, consumer, container, NO_OP); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index 56a44a203f..f4bbd2c6d5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -16,10 +16,12 @@ package org.springframework.kafka.listener; +import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.jspecify.annotations.Nullable; @@ -34,6 +36,7 @@ * @author Francois Rosiere * @author Antonio Tomac * @author Wang Zhiyang + * @author Sanghyeok An * @since 2.0 * */ @@ -147,6 +150,38 @@ public static void conditionalSleep(Supplier shouldSleepCondition, long while (System.currentTimeMillis() < timeout); } + /** + * Sleep for the desired timeout, as long as shouldSleepCondition supplies true. + * This method requires that the consumer is paused; otherwise, ConsumerRecord may be lost. + * Periodically calls {@code Consumer.poll(Duration.ZERO)} to prevent a paused consumer from being rebalanced. + * @param shouldSleepCondition to. + * @param interval the timeout. + * @param consumer the kafka consumer to call poll(). + * @throws InterruptedException if the thread is interrupted. + */ + public static void conditionalSleepWithPoll(Supplier shouldSleepCondition, + long interval, + Consumer consumer) throws InterruptedException { + boolean isFirst = true; + long timeout = System.currentTimeMillis() + interval; + long sleepInterval = interval > SMALL_INTERVAL_THRESHOLD ? DEFAULT_SLEEP_INTERVAL : SMALL_SLEEP_INTERVAL; + do { + Thread.sleep(sleepInterval); + if (!shouldSleepCondition.get()) { + break; + } + + if (isFirst) { + isFirst = false; + } + else { + // To prevent consumer group rebalancing during retry backoff. + consumer.poll(Duration.ZERO); + } + } + while (System.currentTimeMillis() < timeout); + } + /** * Create a new {@link OffsetAndMetadata} using the given container and offset. * @param container a container. @@ -165,4 +200,3 @@ public static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListene return new OffsetAndMetadata(offset); } } -