Skip to content

Commit b5506ee

Browse files
GH-2742: Exit batch retries when container is paused (#2743)
* GH-2742: Exit batch retries when container is paused * GH-2742: fix code style spaces * GH-2742: better method name for what it does
1 parent bb34b3e commit b5506ee

File tree

4 files changed

+198
-8
lines changed

4 files changed

+198
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
*/
4646
public final class ErrorHandlingUtils {
4747

48+
static Runnable NO_OP = () -> { };
49+
4850
private ErrorHandlingUtils() {
4951
}
5052

@@ -126,25 +128,35 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
126128
consumer.poll(Duration.ZERO);
127129
}
128130
catch (WakeupException we) {
129-
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
131+
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
130132
throw new KafkaException("Woken up during retry", logLevel, we);
131133
}
132134
try {
133-
ListenerUtils.stoppableSleep(container, nextBackOff);
135+
ListenerUtils.conditionalSleep(
136+
() -> container.isRunning() &&
137+
!container.isPauseRequested() &&
138+
records.partitions().stream().noneMatch(container::isPartitionPauseRequested),
139+
nextBackOff
140+
);
134141
}
135142
catch (InterruptedException e1) {
136143
Thread.currentThread().interrupt();
137-
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
144+
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
138145
throw new KafkaException("Interrupted during retry", logLevel, e1);
139146
}
140147
if (!container.isRunning()) {
141148
throw new KafkaException("Container stopped during retries");
142149
}
150+
if (container.isPauseRequested() ||
151+
records.partitions().stream().anyMatch(container::isPartitionPauseRequested)) {
152+
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
153+
throw new KafkaException("Container paused requested during retries");
154+
}
143155
try {
144156
consumer.poll(Duration.ZERO);
145157
}
146158
catch (WakeupException we) {
147-
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
159+
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
148160
throw new KafkaException("Woken up during retry", logLevel, we);
149161
}
150162
try {
@@ -176,7 +188,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
176188
catch (Exception ex) {
177189
logger.error(ex, () -> "Recoverer threw an exception; re-seeking batch");
178190
retryListeners.forEach(listener -> listener.recoveryFailed(records, thrownException, ex));
179-
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
191+
seeker.handleBatch(thrownException, records, consumer, container, NO_OP);
180192
}
181193
}
182194
finally {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.io.ObjectInputStream;
2222
import java.io.ObjectStreamClass;
23+
import java.util.function.Supplier;
2324

2425
import org.apache.kafka.clients.consumer.ConsumerRecord;
2526
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -39,6 +40,7 @@
3940
*
4041
* @author Gary Russell
4142
* @author Francois Rosiere
43+
* @author Antonio Tomac
4244
* @since 2.0
4345
*
4446
*/
@@ -185,11 +187,22 @@ public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExec
185187
* @since 2.7
186188
*/
187189
public static void stoppableSleep(MessageListenerContainer container, long interval) throws InterruptedException {
190+
conditionalSleep(container::isRunning, interval);
191+
}
192+
193+
/**
194+
* Sleep for the desired timeout, as long as shouldSleepCondition supplies true.
195+
* @param shouldSleepCondition to.
196+
* @param interval the timeout.
197+
* @throws InterruptedException if the thread is interrupted.
198+
* @since 3.0.9
199+
*/
200+
public static void conditionalSleep(Supplier<Boolean> shouldSleepCondition, long interval) throws InterruptedException {
188201
long timeout = System.currentTimeMillis() + interval;
189202
long sleepInterval = interval > SMALL_INTERVAL_THRESHOLD ? DEFAULT_SLEEP_INTERVAL : SMALL_SLEEP_INTERVAL;
190203
do {
191204
Thread.sleep(sleepInterval);
192-
if (!container.isRunning()) {
205+
if (!shouldSleepCondition.get()) {
193206
break;
194207
}
195208
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2017-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
import static org.mockito.BDDMockito.willReturn;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.reset;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
26+
import static org.mockito.Mockito.verifyNoInteractions;
27+
28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.List;
31+
import java.util.function.BiConsumer;
32+
import java.util.stream.Collectors;
33+
34+
import org.apache.commons.logging.LogFactory;
35+
import org.apache.kafka.clients.consumer.Consumer;
36+
import org.apache.kafka.clients.consumer.ConsumerRecord;
37+
import org.apache.kafka.clients.consumer.ConsumerRecords;
38+
import org.apache.kafka.common.TopicPartition;
39+
import org.junit.jupiter.api.BeforeEach;
40+
import org.junit.jupiter.api.Test;
41+
42+
import org.springframework.classify.BinaryExceptionClassifier;
43+
import org.springframework.core.log.LogAccessor;
44+
import org.springframework.kafka.KafkaException;
45+
import org.springframework.util.backoff.BackOff;
46+
import org.springframework.util.backoff.FixedBackOff;
47+
48+
/**
49+
* @author Antonio Tomac
50+
* @since 3.0.9
51+
*
52+
*/
53+
class ErrorHandlingUtilsTest {
54+
55+
private final Exception thrownException = new RuntimeException("initial cause");
56+
private final Consumer<?, ?> consumer = mock(Consumer.class);
57+
private final MessageListenerContainer container = mock(MessageListenerContainer.class);
58+
private final Runnable listener = mock(Runnable.class);
59+
private final BackOff backOff = new FixedBackOff(1000, 3);
60+
private final CommonErrorHandler seeker = mock(CommonErrorHandler.class);
61+
@SuppressWarnings("unchecked")
62+
private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer = mock(BiConsumer.class);
63+
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(ErrorHandlingUtilsTest.class));
64+
private final List<RetryListener> retryListeners = new ArrayList<>();
65+
private final BinaryExceptionClassifier classifier = BinaryExceptionClassifier.defaultClassifier();
66+
67+
private final ConsumerRecords<?, ?> consumerRecords = recordsOf(
68+
new ConsumerRecord<>("foo", 0, 0L, "a", "a"),
69+
new ConsumerRecord<>("foo", 1, 0L, "b", "b")
70+
);
71+
72+
@SafeVarargs
73+
private <K, V> ConsumerRecords<K, V> recordsOf(ConsumerRecord<K, V>... records) {
74+
return new ConsumerRecords<>(
75+
Arrays.stream(records).collect(Collectors.groupingBy(
76+
(cr) -> new TopicPartition(cr.topic(), cr.partition())
77+
))
78+
);
79+
}
80+
81+
@BeforeEach
82+
public void resetMocks() {
83+
reset(consumer, container, listener, seeker, recoverer);
84+
willReturn(true).given(container).isRunning();
85+
willReturn(false).given(container).isPauseRequested();
86+
}
87+
88+
private void doRetries() {
89+
ErrorHandlingUtils.retryBatch(
90+
thrownException, consumerRecords, consumer, container, listener, backOff,
91+
seeker, recoverer, logger, KafkaException.Level.INFO, retryListeners,
92+
classifier, true
93+
);
94+
}
95+
96+
private long execDurationOf(Runnable runnable) {
97+
long start = System.currentTimeMillis();
98+
runnable.run();
99+
long end = System.currentTimeMillis();
100+
return end - start;
101+
}
102+
103+
@Test
104+
void testStopRetriesWhenNotRunning() {
105+
willReturn(false).given(container).isRunning();
106+
assertThatThrownBy(this::doRetries)
107+
.isInstanceOf(KafkaException.class)
108+
.message().isEqualTo("Container stopped during retries");
109+
verifyNoInteractions(seeker, listener, recoverer);
110+
}
111+
112+
@Test
113+
void testOneSuccessfulRetry() {
114+
long duration = execDurationOf(this::doRetries);
115+
assertThat(duration).as("duration of one round of sleep").isGreaterThanOrEqualTo(1000L);
116+
verifyNoInteractions(seeker, recoverer);
117+
verify(listener, times(1)).run();
118+
verifyNoInteractions(seeker, recoverer);
119+
}
120+
121+
@Test
122+
void stopRetriesWhenContainerIsPaused() {
123+
willReturn(true).given(container).isPauseRequested();
124+
long duration = execDurationOf(() ->
125+
assertThatThrownBy(this::doRetries)
126+
.isInstanceOf(KafkaException.class)
127+
.message().isEqualTo("Container paused requested during retries")
128+
);
129+
assertThat(duration)
130+
.as("duration should not be full retry interval")
131+
.isLessThan(1000L);
132+
verify(seeker).handleBatch(thrownException, consumerRecords, consumer, container, ErrorHandlingUtils.NO_OP);
133+
verifyNoInteractions(listener, recoverer);
134+
}
135+
136+
@Test
137+
void stopRetriesWhenPartitionIsPaused() {
138+
willReturn(true).given(container).isPartitionPauseRequested(new TopicPartition("foo", 1));
139+
long duration = execDurationOf(() ->
140+
assertThatThrownBy(this::doRetries)
141+
.isInstanceOf(KafkaException.class)
142+
.message().isEqualTo("Container paused requested during retries")
143+
);
144+
assertThat(duration)
145+
.as("duration should not be full retry interval")
146+
.isLessThan(1000L);
147+
verify(seeker).handleBatch(thrownException, consumerRecords, consumer, container, ErrorHandlingUtils.NO_OP);
148+
verifyNoInteractions(listener, recoverer);
149+
}
150+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
/**
2727
* @author Gary Russell
2828
* @author Francois Rosiere
29+
* @author Antonio Tomac
2930
* @since 2.7.1
3031
*
3132
*/
@@ -40,6 +41,20 @@ void stoppableSleep() throws InterruptedException {
4041
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500);
4142
}
4243

44+
@Test
45+
void conditionalSleepWithConditionTrue() throws InterruptedException {
46+
long t1 = System.currentTimeMillis();
47+
ListenerUtils.conditionalSleep(() -> true, 500);
48+
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500);
49+
}
50+
51+
@Test
52+
void conditionalSleepWithConditionFalse() throws InterruptedException {
53+
long t1 = System.currentTimeMillis();
54+
ListenerUtils.conditionalSleep(() -> false, 500);
55+
assertThat(System.currentTimeMillis() - t1).isLessThan(500);
56+
}
57+
4358
@Test
4459
void testCreationOfOffsetAndMetadataWithoutProvider() {
4560
final MessageListenerContainer container = mock(MessageListenerContainer.class);

0 commit comments

Comments
 (0)