
Commit 633f566

garyrussell authored and artembilan committed
GH-1273: Fix offset commit after recovery

Resolves #1273

Previously, when transactions were being used, offsets were only committed if an error handler "handled" an error (did not throw an exception). Now, if an error handler successfully recovers, the offset is committed automatically. Since this is a behavior change, a new boolean `ackAfterHandle()` is added to the `GenericErrorHandler`, defaulting to `false`; the `SeekToCurrentErrorHandler` returns `true`, as it is the only framework handler that can recover from failures. In a future release, this boolean will default to `true`.

* Fix default ackAfterHandle=true
* Add docs
1 parent e671eea commit 633f566
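For orientation, here is a minimal sketch of how an application might pick up the new behavior; the configuration class, factory bean, generic types, and back-off values are illustrative assumptions, not part of this commit:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Retry the failed delivery, then recover (the default recoverer just logs);
        // with this commit, the recovered record's offset is then committed automatically,
        // because SeekToCurrentErrorHandler.isAckAfterHandle() returns true by default.
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 2L)));
        return factory;
    }

}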

File tree

6 files changed: +195 −18 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java

Lines changed: 11 additions & 0 deletions
@@ -56,4 +56,15 @@ default void clearThreadState() {
         // NOSONAR
     }
 
+    /**
+     * Return true if the offset should be committed for a handled error (no exception
+     * thrown).
+     * @return true to commit.
+     * @since 2.3.2
+     */
+    default boolean isAckAfterHandle() {
+        // TODO: Default true in the next release.
+        return false;
+    }
+
 }
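To make the new hook concrete, a brief sketch of a custom record error handler that opts in; the class name and log output are hypothetical, not framework code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ErrorHandler;

public class RecoveringLoggingErrorHandler implements ErrorHandler {

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        // "Recover" by logging and dropping the record; returning normally signals
        // that the error was handled, throwing would signal that it was not.
        System.err.println("Skipping " + record.topic() + "-" + record.partition()
                + "@" + record.offset() + ": " + thrownException.getMessage());
    }

    @Override
    public boolean isAckAfterHandle() {
        return true; // ask the container to commit the offset after a successful recovery
    }

}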

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 18 additions & 14 deletions
@@ -1264,14 +1264,24 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
             }
             catch (RuntimeException e) {
                 failureTimer(sample);
-                if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
+                boolean acked = this.containerProperties.isAckOnError() && !this.autoCommit && producer == null;
+                if (acked) {
                     this.acks.addAll(getHighestOffsetRecords(records));
                 }
                 if (this.batchErrorHandler == null) {
                     throw e;
                 }
                 try {
-                    invokeBatchErrorHandler(records, producer, e);
+                    invokeBatchErrorHandler(records, e);
+                    // unlikely, but possible, that a batch error handler "handles" the error
+                    if ((!acked && !this.autoCommit && this.batchErrorHandler.isAckAfterHandle()) || producer != null) {
+                        if (!acked) {
+                            this.acks.addAll(getHighestOffsetRecords(records));
+                        }
+                        if (producer != null) {
+                            sendOffsetsToTransaction(producer);
+                        }
+                    }
                 }
                 catch (RuntimeException ee) {
                     this.logger.error(ee, "Error handler threw an exception");
@@ -1376,21 +1386,14 @@ private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
             }
         }
 
-        private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
-                @SuppressWarnings(RAW_TYPES) @Nullable Producer producer, RuntimeException e) {
-
+        private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records, RuntimeException e) {
             if (this.batchErrorHandler instanceof ContainerAwareBatchErrorHandler) {
                 this.batchErrorHandler.handle(decorateException(e), records, this.consumer,
                         KafkaMessageListenerContainer.this.container);
             }
             else {
                 this.batchErrorHandler.handle(decorateException(e), records, this.consumer);
             }
-            // if the handler handled the error (no exception), go ahead and commit
-            if (producer != null) {
-                this.acks.addAll(getHighestOffsetRecords(records));
-                sendOffsetsToTransaction(producer);
-            }
         }
 
         private void invokeRecordListener(final ConsumerRecords<K, V> records) {
@@ -1536,14 +1539,18 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
             }
             catch (RuntimeException e) {
                 failureTimer(sample);
-                if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
+                boolean acked = this.containerProperties.isAckOnError() && !this.autoCommit && producer == null;
+                if (acked) {
                     ackCurrent(record);
                 }
                 if (this.errorHandler == null) {
                     throw e;
                 }
                 try {
                     invokeErrorHandler(record, producer, iterator, e);
+                    if ((!acked && !this.autoCommit && this.errorHandler.isAckAfterHandle()) || producer != null) {
+                        ackCurrent(record, producer);
+                    }
                 }
                 catch (RuntimeException ee) {
                     this.logger.error(ee, "Error handler threw an exception");
@@ -1629,9 +1636,6 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
             else {
                 this.errorHandler.handle(decorateException(e), record, this.consumer);
             }
-            if (producer != null) {
-                ackCurrent(record, producer);
-            }
         }
 
         private Exception decorateException(RuntimeException e) {

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 17 additions & 0 deletions
@@ -54,6 +54,8 @@ public class SeekToCurrentErrorHandler extends FailedRecordProcessor implements
 
     private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();
 
+    private boolean ackAfterHandle = true;
+
     /**
      * Construct an instance with the default recoverer which simply logs the record after
      * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
@@ -166,6 +168,7 @@ public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced ja
      * @since 2.3
      * @deprecated in favor of {@link #setClassifications(Map, boolean)}.
      */
+    @Override
     @Deprecated
     public void setClassifier(BinaryExceptionClassifier classifier) {
         Assert.notNull(classifier, "'classifier' + cannot be null");
@@ -205,4 +208,18 @@ public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records
         }
     }
 
+    @Override
+    public boolean isAckAfterHandle() {
+        return this.ackAfterHandle;
+    }
+
+    /**
+     * Set to false to tell the container to NOT commit the offset for a recovered record.
+     * @param ackAfterHandle false to suppress committing the offset.
+     * @since 2.3.2
+     */
+    public void setAckAfterHandle(boolean ackAfterHandle) {
+        this.ackAfterHandle = ackAfterHandle;
+    }
+
 }
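A short sketch of opting out of the new default via the setter added above; the helper class, method name, and back-off values are illustrative:

import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class LegacyAckBehavior {

    // Restore the pre-2.3.2 behavior of NOT committing the offset
    // of a recovered record.
    static SeekToCurrentErrorHandler legacyHandler() {
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 3L));
        handler.setAckAfterHandle(false);
        return handler;
    }

}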

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 126 additions & 0 deletions
@@ -99,6 +99,7 @@
 import org.springframework.kafka.test.utils.ContainerTestUtils;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.util.backoff.FixedBackOff;
 
 /**
  * Tests for the listener container.
@@ -615,6 +616,71 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
         container.stop();
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testRecordAckAfterRecoveryMock() throws Exception {
+        ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
+        Consumer<Integer, String> consumer = mock(Consumer.class);
+        given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
+        final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
+        records.put(new TopicPartition("foo", 0), Arrays.asList(
+                new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
+                new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
+        ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
+        given(consumer.poll(any(Duration.class))).willAnswer(i -> {
+            Thread.sleep(50);
+            return consumerRecords;
+        });
+        TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
+                new TopicPartitionOffset("foo", 0) };
+        ContainerProperties containerProps = new ContainerProperties(topicPartition);
+        containerProps.setGroupId("grp");
+        containerProps.setAckMode(AckMode.RECORD);
+        containerProps.setMissingTopicsFatal(false);
+        final CountDownLatch latch = new CountDownLatch(2);
+        MessageListener<Integer, String> messageListener = spy(
+                new MessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes
+
+                    @Override
+                    public void onMessage(ConsumerRecord<Integer, String> data) {
+                        latch.countDown();
+                        if (latch.getCount() == 0) {
+                            records.clear();
+                        }
+                        if (data.offset() == 1L) {
+                            throw new IllegalStateException();
+                        }
+                    }
+
+                });
+
+        final CountDownLatch commitLatch = new CountDownLatch(2);
+
+        willAnswer(i -> {
+                    commitLatch.countDown();
+                    return null;
+                }
+        ).given(consumer).commitSync(anyMap(), any());
+
+        containerProps.setMessageListener(messageListener);
+        containerProps.setClientId("clientId");
+        KafkaMessageListenerContainer<Integer, String> container =
+                new KafkaMessageListenerContainer<>(cf, containerProps);
+        SeekToCurrentErrorHandler errorHandler = spy(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 0)));
+        container.setErrorHandler(errorHandler);
+        container.start();
+        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+        assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        InOrder inOrder = inOrder(messageListener, consumer, errorHandler);
+        inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+        inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
+        inOrder.verify(consumer).commitSync(anyMap(), any());
+        inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
+        inOrder.verify(errorHandler).handle(any(), any(), any(), any());
+        inOrder.verify(consumer).commitSync(anyMap(), any());
+        container.stop();
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testRecordAckAfterStop() throws Exception {
@@ -1121,6 +1187,66 @@ public void testBatchListenerErrors() throws Exception {
         logger.info("Stop batch listener errors");
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testBatchListenerAckAfterRecoveryMock() throws Exception {
+        ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
+        Consumer<Integer, String> consumer = mock(Consumer.class);
+        given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
+        final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
+        records.put(new TopicPartition("foo", 0), Arrays.asList(
+                new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
+                new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
+        ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
+        given(consumer.poll(any(Duration.class))).willAnswer(i -> {
+            Thread.sleep(50);
+            return consumerRecords;
+        });
+        TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
+                new TopicPartitionOffset("foo", 0) };
+        ContainerProperties containerProps = new ContainerProperties(topicPartition);
+        containerProps.setGroupId("grp");
+        containerProps.setMissingTopicsFatal(false);
+        final CountDownLatch latch = new CountDownLatch(1);
+        BatchMessageListener<Integer, String> messageListener = spy(
+                new BatchMessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes
+
+                    @Override
+                    public void onMessage(List<ConsumerRecord<Integer, String>> data) {
+                        latch.countDown();
+                        throw new IllegalStateException();
+                    }
+
+                });
+
+        final CountDownLatch commitLatch = new CountDownLatch(1);
+
+        willAnswer(i -> {
+                    commitLatch.countDown();
+                    records.clear();
+                    return null;
+                }
+        ).given(consumer).commitSync(anyMap(), any());
+
+        containerProps.setMessageListener(messageListener);
+        containerProps.setClientId("clientId");
+        KafkaMessageListenerContainer<Integer, String> container =
+                new KafkaMessageListenerContainer<>(cf, containerProps);
+        BatchErrorHandler errorHandler = mock(BatchErrorHandler.class);
+        given(errorHandler.isAckAfterHandle()).willReturn(true);
+        container.setBatchErrorHandler(errorHandler);
+        container.start();
+        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+        assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        InOrder inOrder = inOrder(messageListener, consumer, errorHandler);
+        inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+        inOrder.verify(messageListener).onMessage(any());
+        inOrder.verify(errorHandler).handle(any(), any(), any());
+        inOrder.verify(consumer).commitSync(anyMap(), any());
+        container.stop();
+    }
+
     @Test
     public void testSeek() throws Exception {
         Map<String, Object> props = KafkaTestUtils.consumerProps("test11", "false", embeddedKafka);

src/reference/asciidoc/kafka.adoc

Lines changed: 21 additions & 4 deletions
@@ -1736,11 +1736,9 @@ IMPORTANT: The `FilteringBatchMessageListenerAdapter` is ignored if your `@Kafka
 [[retrying-deliveries]]
 ===== Retrying Deliveries
 
-If your listener throws an exception, the default behavior is to invoke the `ErrorHandler`, if configured, or logged otherwise.
-
-NOTE: Two error handler interfaces (`ErrorHandler` and `BatchErrorHandler`) are provided.
-You must configure the appropriate type to match the <<message-listeners,message listener>>.
+If your listener throws an exception, the default behavior is to invoke the <<error-handlers>>, if configured, or logged otherwise.
 
+NOTE:
 To retry deliveries, a convenient listener adapter `RetryingMessageListenerAdapter` is provided.
 
 You can configure it with a `RetryTemplate` and `RecoveryCallback<Void>` - see the https://github.com/spring-projects/spring-retry[spring-retry] project for information about these components.
@@ -3400,8 +3398,22 @@ This resets each topic/partition in the batch to the lowest offset in the batch.
 
 NOTE: The preceding two examples are simplistic implementations, and you would probably want more checking in the error handler.
 
+[[error-handlers]]
 ===== Container Error Handlers
 
+Two error handler interfaces (`ErrorHandler` and `BatchErrorHandler`) are provided.
+You must configure the appropriate type to match the <<message-listeners,message listener>>.
+
+By default, errors are simply logged when transactions are not being used.
+When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction.
+If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back.
+
+Starting with version 2.3.2, these interfaces have a default method `isAckAfterHandle()` which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception.
+This returns false by default, for backwards compatibility.
+In most cases, however, we expect that the offset should be committed.
+For example, the <<seek-to-current, `SeekToCurrentErrorHandler`>> returns `true` if a record is recovered (after any retries, if so configured).
+In a future release, we expect to change this default to `true`.
+
 You can specify a global error handler to be used for all listeners in the container factory.
 The following example shows how to do so:
 
@@ -3441,6 +3453,8 @@ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,
 
 By default, if an annotated listener method throws an exception, it is thrown to the container, and the message is handled according to the container configuration.
 
+If you are using Spring Boot, you simply need to add the error handler as a `@Bean` and boot will add it to the auto-configured factory.
+
 ===== Consumer-Aware Container Error Handlers
 
 The container-level error handlers (`ErrorHandler` and `BatchErrorHandler`) have sub-interfaces called `ConsumerAwareErrorHandler` and `ConsumerAwareBatchErrorHandler`.
@@ -3593,6 +3607,9 @@ Again, the maximum delay must be less than the `max.poll.interval.ms` consumer p
 
 IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
 
+Starting with version 2.3.2, after a record has been recovered, its offset will be committed (if one of the container `AckMode` s is configured).
+To revert to the previous behavior, set the error handler's `ackAfterHandle` property to false.
+
 ===== Container Stopping Error Handlers
 
 The `ContainerStoppingErrorHandler` (used with record listeners) stops the container if the listener throws an exception.
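A hedged sketch of the Spring Boot approach mentioned in the docs above; the configuration class name and back-off values are assumptions for illustration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class BootErrorHandlerConfig {

    // Per the docs above, Spring Boot adds this bean to its auto-configured
    // listener container factory; no manual factory wiring is needed.
    @Bean
    public SeekToCurrentErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L));
    }

}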

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
@@ -70,6 +70,8 @@ The `SeekToCurrentErrorHandler` now treats certain exceptions as fatal and disab
 
 The `SeekToCurrentErrorHandler` and `SeekToCurrentBatchErrorHandler` can now be configured to apply a `BackOff` (thread sleep) between delivery attempts.
 
+Starting with version 2.3.2, recovered records' offsets will be committed when the error handler returns after recovering a failed record.
+
 See <<seek-to-current>> for more information.
 
 The `DeadLetterPublishingRecoverer`, when used in conjunction with an `ErrorHandlingDeserializer2`, now sets the payload of the message sent to the dead-letter topic, to the original value that could not be deserialized.
