
Commit 7edf9f4

garyrussell authored and artembilan committed
GH-960: Seek-to-Current - commit recovered offset (#961)
* GH-960: Seek-to-Current - commit recovered offset

  Resolves #960

  When the container is configured with `AckMode.MANUAL_IMMEDIATE`, the
  `SeekToCurrentErrorHandler` can be configured to commit the offset of a
  recovered record.

* Polishing - PR Comments
1 parent 3d8de78 commit 7edf9f4
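
For orientation, here is a minimal configuration sketch of the option this commit introduces; it is not code from the commit itself. It assumes spring-kafka 2.2.4+, an existing `ConsumerFactory` bean, and a listener that acknowledges manually; class, bean, and recoverer names are illustrative.

	import org.springframework.context.annotation.Bean;
	import org.springframework.context.annotation.Configuration;
	import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
	import org.springframework.kafka.core.ConsumerFactory;
	import org.springframework.kafka.listener.ContainerProperties.AckMode;
	import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

	@Configuration
	public class RecoveredOffsetCommitConfig {

		@Bean
		public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
				ConsumerFactory<String, String> consumerFactory) {

			ConcurrentKafkaListenerContainerFactory<String, String> factory =
					new ConcurrentKafkaListenerContainerFactory<>();
			factory.setConsumerFactory(consumerFactory);
			// commitRecovered is honored only with MANUAL_IMMEDIATE acks;
			// with any other AckMode the handler logs a warning and skips the commit.
			factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

			// The recoverer runs after maxFailures (3) delivery attempts for the same record.
			SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
					(record, exception) -> System.err.println("Giving up on " + record), 3);
			errorHandler.setCommitRecovered(true); // the option added by this commit
			factory.setErrorHandler(errorHandler);
			return factory;
		}

	}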

File tree

7 files changed: +166 -26 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 1 addition & 3 deletions

@@ -194,9 +194,7 @@ public enum AckMode {
 
 	/**
 	 * Whether or not to call consumer.commitSync() or commitAsync() when the
-	 * container is responsible for commits. Default true. See
-	 * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
-	 * writing, async commits are not entirely reliable.
+	 * container is responsible for commits. Default true.
 	 */
 	private boolean syncCommits = true;
 

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 0 additions & 20 deletions

@@ -1741,26 +1741,6 @@ else if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommi
 
 	}
 
-	private static final class LoggingCommitCallback implements OffsetCommitCallback {
-
-		private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class); // NOSONAR
-
-		LoggingCommitCallback() {
-			super();
-		}
-
-		@Override
-		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-			if (exception != null) {
-				logger.error("Commit failed for " + offsets, exception);
-			}
-			else if (logger.isDebugEnabled()) {
-				logger.debug("Commits for " + offsets + " completed");
-			}
-		}
-
-	}
-
 	private static final class OffsetMetadata {
 
 		private final Long offset;
spring-kafka/src/main/java/org/springframework/kafka/listener/LoggingCommitCallback.java

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Logs commit results at DEBUG level for success and ERROR for failures.
+ *
+ * @author Gary Russell
+ * @since 2.2.4
+ */
+public final class LoggingCommitCallback implements OffsetCommitCallback {
+
+	private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class); // NOSONAR
+
+	@Override
+	public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+		if (exception != null) {
+			logger.error("Commit failed for " + offsets, exception);
+		}
+		else if (logger.isDebugEnabled()) {
+			logger.debug("Commits for " + offsets + " completed");
+		}
+	}
+
+}
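
When `commitRecovered` is enabled and the container's `syncCommits` property is `false`, the error handler commits asynchronously using the container's configured commit callback, falling back to this newly public `LoggingCommitCallback` when none is set. A short sketch of supplying a custom callback, using the `ContainerProperties` setters exercised in the diffs below (the topic name is illustrative):

	import org.apache.kafka.clients.consumer.OffsetCommitCallback;

	import org.springframework.kafka.listener.ContainerProperties;
	import org.springframework.kafka.listener.ContainerProperties.AckMode;

	public class CommitCallbackSketch {

		public static ContainerProperties containerProperties() {
			ContainerProperties props = new ContainerProperties("someTopic");
			props.setAckMode(AckMode.MANUAL_IMMEDIATE);
			props.setSyncCommits(false); // recovered offsets are then committed via commitAsync()
			OffsetCommitCallback callback = (offsets, exception) -> {
				if (exception != null) {
					System.err.println("Async commit failed for " + offsets + ": " + exception);
				}
			};
			// Without this, the error handler uses LoggingCommitCallback.
			props.setCommitCallback(callback);
			return props;
		}

	}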

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 56 additions & 3 deletions

@@ -16,15 +16,21 @@
 
 package org.springframework.kafka.listener;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.BiConsumer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
 
 import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.support.SeekUtils;
 import org.springframework.lang.Nullable;
 
@@ -39,10 +45,14 @@
  */
 public class SeekToCurrentErrorHandler implements ContainerAwareErrorHandler {
 
-	private static final Log logger = LogFactory.getLog(SeekToCurrentErrorHandler.class); // NOSONAR
+	protected static final Log LOGGER = LogFactory.getLog(SeekToCurrentErrorHandler.class); // NOSONAR visibility
+
+	private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();
 
 	private final FailedRecordTracker failureTracker;
 
+	private boolean commitRecovered;
+
 	/**
 	 * Construct an instance with the default recoverer which simply logs the record after
 	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
@@ -82,16 +92,59 @@ public SeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> rec
 	 * @since 2.2
 	 */
 	public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures) {
-		this.failureTracker = new FailedRecordTracker(recoverer, maxFailures, logger);
+		this.failureTracker = new FailedRecordTracker(recoverer, maxFailures, LOGGER);
+	}
+
+	/**
+	 * Whether the offset for a recovered record should be committed.
+	 * @return true to commit recovered record offsets.
+	 * @since 2.2.4
+	 */
+	protected boolean isCommitRecovered() {
+		return this.commitRecovered;
+	}
+
+	/**
+	 * Set to true to commit the offset for a recovered record. The container
+	 * must be configured with {@link AckMode#MANUAL_IMMEDIATE}. Whether or not
+	 * the commit is sync or async depends on the container's syncCommits
	 * property.
+	 * @param commitRecovered true to commit.
+	 * @since 2.2.4
+	 * @see #setOffsetCommitCallback(OffsetCommitCallback)
+	 */
+	public void setCommitRecovered(boolean commitRecovered) {
+		this.commitRecovered = commitRecovered;
 	}
 
 	@Override
 	public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
 			Consumer<?, ?> consumer, MessageListenerContainer container) {
 
-		if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, logger)) {
+		if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER)) {
 			throw new KafkaException("Seek to current after exception", thrownException);
 		}
+		else if (this.commitRecovered) {
+			if (container.getContainerProperties().getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
+				ConsumerRecord<?, ?> record = records.get(0);
+				Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
+						new TopicPartition(record.topic(), record.partition()),
+						new OffsetAndMetadata(record.offset() + 1));
+				if (container.getContainerProperties().isSyncCommits()) {
+					consumer.commitSync(offsetToCommit);
+				}
+				else {
+					OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
+					if (commitCallback == null) {
+						commitCallback = LOGGING_COMMIT_CALLBACK;
+					}
+					consumer.commitAsync(offsetToCommit, commitCallback);
+				}
+			}
+			else {
+				LOGGER.warn("'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE");
+			}
+		}
 	}
 
 	@Override
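
The `record.offset() + 1` in the new `handle` logic follows the Kafka convention that a committed offset names the next record to consume, not the record just processed; committing the recovered record's own offset would redeliver it. A standalone sketch of the commit map the handler builds (topic, partition, and offset values are illustrative):

	import java.util.Collections;
	import java.util.Map;

	import org.apache.kafka.clients.consumer.ConsumerRecord;
	import org.apache.kafka.clients.consumer.OffsetAndMetadata;
	import org.apache.kafka.common.TopicPartition;

	public class CommitOffsetSketch {

		public static void main(String[] args) {
			// Suppose the record at offset 41 was recovered (skipped) after maxFailures attempts.
			ConsumerRecord<String, String> recovered = new ConsumerRecord<>("someTopic", 0, 41L, null, "payload");
			// Commit offset 42 so the consumer group resumes *after* the failed record.
			Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
					new TopicPartition(recovered.topic(), recovered.partition()),
					new OffsetAndMetadata(recovered.offset() + 1));
			System.out.println(offsetToCommit);
		}

	}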

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java

Lines changed: 57 additions & 0 deletions

@@ -20,6 +20,7 @@
 import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -28,6 +29,7 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -38,6 +40,8 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.ClassRule;
@@ -48,6 +52,7 @@
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.event.ConsumerStoppedEvent;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -160,6 +165,58 @@ public void seekToCurrentErrorHandlerRecovers() {
 		verify(recoverer).accept(eq(records.get(0)), any());
 	}
 
+	@Test
+	public void seekToCurrentErrorHandlerRecoversManualAcksAsync() {
+		seekToCurrentErrorHandlerRecoversManualAcks(false);
+	}
+
+	@Test
+	public void seekToCurrentErrorHandlerRecoversManualAcksSync() {
+		seekToCurrentErrorHandlerRecoversManualAcks(true);
+	}
+
+	private void seekToCurrentErrorHandlerRecoversManualAcks(boolean syncCommits) {
+		@SuppressWarnings("unchecked")
+		BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
+		SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, 2);
+		eh.setCommitRecovered(true);
+		List<ConsumerRecord<?, ?>> records = new ArrayList<>();
+		records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
+		records.add(new ConsumerRecord<>("foo", 1, 0, null, "bar"));
+		Consumer<?, ?> consumer = mock(Consumer.class);
+		MessageListenerContainer container = mock(MessageListenerContainer.class);
+		ContainerProperties properties = new ContainerProperties("foo");
+		properties.setAckMode(AckMode.MANUAL_IMMEDIATE);
+		properties.setSyncCommits(syncCommits);
+		OffsetCommitCallback commitCallback = (offsets, ex) -> { };
+		properties.setCommitCallback(commitCallback);
+		given(container.getContainerProperties()).willReturn(properties);
+		try {
+			eh.handle(new RuntimeException(), records, consumer, container);
+			fail("Expected exception");
+		}
+		catch (KafkaException e) {
+			// NOSONAR
+		}
+		verify(consumer).seek(new TopicPartition("foo", 0), 0L);
+		verify(consumer).seek(new TopicPartition("foo", 1), 0L);
+		verifyNoMoreInteractions(consumer);
+		eh.handle(new RuntimeException(), records, consumer, container);
+		verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 0L);
+		if (syncCommits) {
+			verify(consumer)
+					.commitSync(Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)));
+		}
+		else {
+			verify(consumer)
+					.commitAsync(
+							Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
+							commitCallback);
+		}
+		verifyNoMoreInteractions(consumer);
+		verify(recoverer).accept(eq(records.get(0)), any());
+	}
+
 	@Test
 	public void testNeverRecover() {
 		@SuppressWarnings("unchecked")

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions

@@ -2770,6 +2770,8 @@ SeekToCurrentErrorHandler errorHandler =
 ----
 ====
 
+Starting with version 2.2.4, when the container is configured with `AckMode.MANUAL_IMMEDIATE`, the error handler can be configured to commit the offset of recovered records; set the `commitRecovered` property to `true`.
+
 See also <<dead-letters>>.
 
 When using transactions, similar functionality is provided by the `DefaultAfterRollbackProcessor`.

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions

@@ -45,6 +45,9 @@ See <<after-rollback>>, <<seek-to-current>>, and <<dead-letters>> for more infor
 The `ConsumerStoppingEvent` has been added.
 See <<events>> for more information.
 
+The `SeekToCurrentErrorHandler` can now be configured to commit the offset of a recovered record when the container is configured with `AckMode.MANUAL_IMMEDIATE` (since 2.2.4).
+See <<seek-to-current>> for more information.
+
 ==== @KafkaListener Changes
 
 You can now override the `concurrency` and `autoStartup` properties of the listener container factory by setting properties on the annotation.
