Skip to content

Commit 606fcfc

Browse files
GH-3712: Make ContainerPausingBackOffHandler working in batch mode.
Signed-off-by: chickenchickenlove <[email protected]>
1 parent 70bb92f commit 606fcfc

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
* @author Andrii Pelesh
5252
* @author Antonio Tomac
5353
* @author Wang Zhiyang
54+
* @author Sanghyeok An
5455
*
5556
* @since 2.8
5657
*
@@ -113,13 +114,18 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
113114
throw new KafkaException("Woken up during retry", logLevel, we);
114115
}
115116
try {
116-
ListenerUtils.conditionalSleep(
117+
ListenerUtils.conditionalSleepWithPoll(
117118
() -> container.isRunning() &&
118119
!container.isPauseRequested() &&
119120
records.partitions().stream().noneMatch(container::isPartitionPauseRequested),
120-
nextBackOff
121+
nextBackOff,
122+
consumer
121123
);
122124
}
125+
catch (WakeupException we) {
126+
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
127+
throw new KafkaException("Woken up during retry", logLevel, we);
128+
}
123129
catch (InterruptedException e1) {
124130
Thread.currentThread().interrupt();
125131
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.time.Duration;
1920
import java.util.Map;
2021
import java.util.Objects;
2122
import java.util.function.Supplier;
2223

24+
import org.apache.kafka.clients.consumer.Consumer;
2325
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2426
import org.jspecify.annotations.Nullable;
2527

@@ -34,6 +36,7 @@
3436
* @author Francois Rosiere
3537
* @author Antonio Tomac
3638
* @author Wang Zhiyang
39+
* @author Sanghyeok An
3740
* @since 2.0
3841
*
3942
*/
@@ -147,6 +150,36 @@ public static void conditionalSleep(Supplier<Boolean> shouldSleepCondition, long
147150
while (System.currentTimeMillis() < timeout);
148151
}
149152

153+
/**
154+
* Sleep for the desired timeout, as long as shouldSleepCondition supplies true.
155+
* @param shouldSleepCondition to.
156+
* @param interval the timeout.
157+
* @param consumer the kafka consumer to call poll().
158+
* @throws InterruptedException if the thread is interrupted.
159+
*/
160+
public static void conditionalSleepWithPoll(Supplier<Boolean> shouldSleepCondition,
161+
long interval,
162+
Consumer<?, ?> consumer) throws InterruptedException {
163+
boolean isFirst = true;
164+
long timeout = System.currentTimeMillis() + interval;
165+
long sleepInterval = interval > SMALL_INTERVAL_THRESHOLD ? DEFAULT_SLEEP_INTERVAL : SMALL_SLEEP_INTERVAL;
166+
do {
167+
Thread.sleep(sleepInterval);
168+
if (!shouldSleepCondition.get()) {
169+
break;
170+
}
171+
172+
if (isFirst) {
173+
isFirst = false;
174+
}
175+
else {
176+
// To prevent consumer group rebalancing during retry backoff.
177+
consumer.poll(Duration.ZERO);
178+
}
179+
}
180+
while (System.currentTimeMillis() < timeout);
181+
}
182+
150183
/**
151184
* Create a new {@link OffsetAndMetadata} using the given container and offset.
152185
* @param container a container.
@@ -165,4 +198,3 @@ public static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListene
165198
return new OffsetAndMetadata(offset);
166199
}
167200
}
168-

0 commit comments

Comments
 (0)