Skip to content

Commit c226c9e

Browse files
garyrussellartembilan
authored andcommitted
GH-990: Run AfterRollbackProcessor in Tx
Resolves #990 Provide a mechanism to start a new transaction within which to invoke the processor, so if it recovers the failed record, its offset can be sent to the transaction. **cherry-pick to 2.2.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
1 parent 2d8a094 commit c226c9e

File tree

6 files changed

+151
-30
lines changed

6 files changed

+151
-30
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 the original author or authors.
2+
* Copyright 2018-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,11 +44,17 @@ public interface AfterRollbackProcessor<K, V> {
4444
* processing individual records; this allows the processor to recover (skip) the
4545
* failed record rather than re-seeking it. This is not possible with a batch listener
4646
* since only the listener itself knows which record in the batch keeps failing.
47+
* IMPORTANT: If invoked in a transaction when the listener was invoked with a single
48+
* record, the transaction id will be based on the container group.id and the
49+
* topic/partition of the failed record, to avoid issues with zombie fencing. So,
50+
* generally, only its offset should be sent to the transaction. For other behavior
51+
* the process method should manage its own transaction.
4752
* @param records the records.
4853
* @param consumer the consumer.
4954
* @param exception the exception
5055
* @param recoverable the recoverable.
5156
* @since 2.2
57+
* @see #isProcessInTransaction()
5258
*/
5359
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception, boolean recoverable);
5460

@@ -61,4 +67,17 @@ default void clearThreadState() {
6167
// NOSONAR
6268
}
6369

70+
/**
71+
* Return true to invoke {@link #process(List, Consumer, Exception, boolean)} in a new
72+
* transaction. Because the container cannot infer the desired behavior, the processor
73+
* is responsible for sending the offset to the transaction if it decides to skip the
74+
* failing record.
75+
* @return true to run in a transaction; default false.
76+
* @since 2.2.5
77+
* @see #process(List, Consumer, Exception, boolean)
78+
*/
79+
default boolean isProcessInTransaction() {
80+
return false;
81+
}
82+
6483
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 the original author or authors.
2+
* Copyright 2018-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -120,9 +120,15 @@ protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?,
120120
record.key(), record.value(), headers);
121121
}
122122

123-
private void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> template) {
123+
/**
124+
* Override this if you want more than just logging of the send result.
125+
* @param outRecord the record to send.
126+
* @param kafkaTemplate the template.
127+
* @since 2.2.5
128+
*/
129+
protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate) {
124130
try {
125-
template.send(outRecord).addCallback(result -> {
131+
kafkaTemplate.send(outRecord).addCallback(result -> {
126132
if (logger.isDebugEnabled()) {
127133
logger.debug("Successful dead-letter publication: " + result);
128134
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 the original author or authors.
2+
* Copyright 2018-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,14 +16,18 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collections;
1920
import java.util.List;
2021
import java.util.function.BiConsumer;
2122

2223
import org.apache.commons.logging.Log;
2324
import org.apache.commons.logging.LogFactory;
2425
import org.apache.kafka.clients.consumer.Consumer;
2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
27+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
28+
import org.apache.kafka.common.TopicPartition;
2629

30+
import org.springframework.kafka.core.KafkaTemplate;
2731
import org.springframework.kafka.support.SeekUtils;
2832
import org.springframework.lang.Nullable;
2933

@@ -49,6 +53,10 @@ public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcess
4953

5054
private final FailedRecordTracker failureTracker;
5155

56+
private boolean processInTransaction;
57+
58+
private KafkaTemplate<K, V> kafkaTemplate;
59+
5260
/**
5361
* Construct an instance with the default recoverer which simply logs the record after
5462
* {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
@@ -96,8 +104,42 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
96104
@Override
97105
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
98106
boolean recoverable) {
99-
SeekUtils.doSeeks(((List) records),
100-
consumer, exception, recoverable, this.failureTracker::skip, logger);
107+
108+
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable, this.failureTracker::skip, logger)
109+
&& this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
110+
ConsumerRecord<K, V> skipped = records.get(0);
111+
this.kafkaTemplate.sendOffsetsToTransaction(
112+
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
113+
new OffsetAndMetadata(skipped.offset() + 1)));
114+
}
115+
}
116+
117+
@Override
118+
public boolean isProcessInTransaction() {
119+
return this.processInTransaction;
120+
}
121+
122+
/**
123+
* Set to true to run the {@link #process(List, Consumer, Exception, boolean)}
124+
* method in a transaction. Requires a {@link KafkaTemplate}.
125+
* @param processInTransaction true to process in a transaction.
126+
* @since 2.2.5
127+
* @see #process(List, Consumer, Exception, boolean)
128+
* @see #setKafkaTemplate(KafkaTemplate)
129+
*/
130+
public void setProcessInTransaction(boolean processInTransaction) {
131+
this.processInTransaction = processInTransaction;
132+
}
133+
134+
/**
135+
* Set a {@link KafkaTemplate} to use to send the offset of a recovered record
136+
* to a transaction.
137+
* @param kafkaTemplate the template
138+
* @since 2.2.5
139+
* @see #setProcessInTransaction(boolean)
140+
*/
141+
public void setKafkaTemplate(KafkaTemplate<K, V> kafkaTemplate) {
142+
this.kafkaTemplate = kafkaTemplate;
101143
}
102144

103145
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -573,14 +573,14 @@ private Object findDeserializerClass(Map<String, Object> props, boolean isValue)
573573
}
574574
}
575575

576-
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> consumer) {
576+
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
577577
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
578578
ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
579579
if (this.containerProperties.getTopicPattern() != null) {
580-
consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
580+
subscribingConsumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
581581
}
582582
else {
583-
consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
583+
subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
584584
}
585585
}
586586
else {
@@ -592,7 +592,7 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> consum
592592
new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
593593
topicPartition.getPosition()));
594594
}
595-
consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
595+
subscribingConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
596596
}
597597
}
598598

@@ -695,7 +695,7 @@ public void run() {
695695
try {
696696
pollAndInvoke();
697697
}
698-
catch (WakeupException e) {
698+
catch (@SuppressWarnings("unused") WakeupException e) {
699699
// Ignore, we're stopping
700700
}
701701
catch (NoOffsetForPartitionException nofpe) {
@@ -814,7 +814,7 @@ public void wrapUp() {
814814
try {
815815
this.consumer.unsubscribe();
816816
}
817-
catch (WakeupException e) {
817+
catch (@SuppressWarnings("unused") WakeupException e) {
818818
// No-op. Continue process
819819
}
820820
}
@@ -900,7 +900,7 @@ private void processAck(ConsumerRecord<K, V> record) {
900900
try {
901901
ackImmediate(record);
902902
}
903-
catch (WakeupException e) {
903+
catch (@SuppressWarnings("unused") WakeupException e) {
904904
// ignore - not polling
905905
}
906906
}
@@ -950,6 +950,7 @@ private void invokeBatchListener(final ConsumerRecords<K, V> records) {
950950
@SuppressWarnings({ UNCHECKED, RAW_TYPES })
951951
private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records,
952952
final List<ConsumerRecord<K, V>> recordList) {
953+
953954
try {
954955
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
955956

@@ -972,15 +973,34 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
972973
this.logger.error("Transaction rolled back", e);
973974
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse =
974975
(AfterRollbackProcessor<K, V>) getAfterRollbackProcessor();
975-
if (recordList == null) {
976-
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
976+
if (afterRollbackProcessorToUse.isProcessInTransaction() && this.transactionTemplate != null) {
977+
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
978+
979+
@Override
980+
protected void doInTransactionWithoutResult(TransactionStatus status) {
981+
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
982+
}
983+
984+
});
977985
}
978986
else {
979-
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
987+
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
980988
}
981989
}
982990
}
983991

992+
private void batchAfterRollback(final ConsumerRecords<K, V> records,
993+
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
994+
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
995+
996+
if (recordList == null) {
997+
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
998+
}
999+
else {
1000+
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
1001+
}
1002+
}
1003+
9841004
private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
9851005
return StreamSupport.stream(records.spliterator(), false)
9861006
.collect(Collectors.toList());
@@ -1020,7 +1040,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
10201040
throw er;
10211041
}
10221042
}
1023-
catch (InterruptedException e) {
1043+
catch (@SuppressWarnings("unused") InterruptedException e) {
10241044
Thread.currentThread().interrupt();
10251045
}
10261046
return null;
@@ -1101,7 +1121,7 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
11011121
* Invoke the listener with each record in a separate transaction.
11021122
* @param records the records.
11031123
*/
1104-
@SuppressWarnings({ UNCHECKED, RAW_TYPES })
1124+
@SuppressWarnings(RAW_TYPES)
11051125
private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
11061126
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
11071127
while (iterator.hasNext()) {
@@ -1132,20 +1152,40 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
11321152
}
11331153
catch (RuntimeException e) {
11341154
this.logger.error("Transaction rolled back", e);
1135-
List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
1136-
unprocessed.add(record);
1137-
while (iterator.hasNext()) {
1138-
unprocessed.add(iterator.next());
1139-
}
1140-
((AfterRollbackProcessor<K, V>) getAfterRollbackProcessor())
1141-
.process(unprocessed, this.consumer, e, true);
1155+
recordAfterRollback(iterator, record, e);
11421156
}
11431157
finally {
11441158
TransactionSupport.clearTransactionIdSuffix();
11451159
}
11461160
}
11471161
}
11481162

1163+
private void recordAfterRollback(Iterator<ConsumerRecord<K, V>> iterator, final ConsumerRecord<K, V> record,
1164+
RuntimeException e) {
1165+
1166+
List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
1167+
unprocessed.add(record);
1168+
while (iterator.hasNext()) {
1169+
unprocessed.add(iterator.next());
1170+
}
1171+
@SuppressWarnings(UNCHECKED)
1172+
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse =
1173+
(AfterRollbackProcessor<K, V>) getAfterRollbackProcessor();
1174+
if (afterRollbackProcessorToUse.isProcessInTransaction() && this.transactionTemplate != null) {
1175+
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
1176+
1177+
@Override
1178+
protected void doInTransactionWithoutResult(TransactionStatus status) {
1179+
afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer, e, true);
1180+
}
1181+
1182+
});
1183+
}
1184+
else {
1185+
afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true);
1186+
}
1187+
}
1188+
11491189
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
11501190
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
11511191
while (iterator.hasNext()) {
@@ -1499,7 +1539,7 @@ private void commitIfNecessary() {
14991539
this.consumer.commitAsync(commits, this.commitCallback);
15001540
}
15011541
}
1502-
catch (WakeupException e) {
1542+
catch (@SuppressWarnings("unused") WakeupException e) {
15031543
// ignore - not polling
15041544
this.logger.debug("Woken up during commit");
15051545
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyBoolean;
2122
import static org.mockito.ArgumentMatchers.anyLong;
2223
import static org.mockito.ArgumentMatchers.anyMap;
2324
import static org.mockito.ArgumentMatchers.anyString;
@@ -494,6 +495,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
494495
consumer.close();
495496
}
496497

498+
@SuppressWarnings("unchecked")
497499
@Test
498500
public void testMaxFailures() throws Exception {
499501
logger.info("Start testMaxFailures");
@@ -520,14 +522,15 @@ public void testMaxFailures() throws Exception {
520522
latch.countDown();
521523
});
522524

523-
@SuppressWarnings({ "rawtypes", "unchecked" })
525+
@SuppressWarnings({ "rawtypes" })
524526
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
525527
containerProps.setTransactionManager(tm);
526528
KafkaMessageListenerContainer<Integer, String> container =
527529
new KafkaMessageListenerContainer<>(cf, containerProps);
528530
container.setBeanName("testMaxFailures");
529531
final CountDownLatch recoverLatch = new CountDownLatch(1);
530-
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template) {
532+
final KafkaTemplate<Object, Object> dlTemplate = spy(new KafkaTemplate<>(pf));
533+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlTemplate) {
531534

532535
@Override
533536
public void accept(ConsumerRecord<?, ?> record, Exception exception) {
@@ -536,8 +539,10 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
536539
}
537540

538541
};
539-
DefaultAfterRollbackProcessor<Integer, String> afterRollbackProcessor =
542+
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
540543
spy(new DefaultAfterRollbackProcessor<>(recoverer, 3));
544+
afterRollbackProcessor.setProcessInTransaction(true);
545+
afterRollbackProcessor.setKafkaTemplate(dlTemplate);
541546
container.setAfterRollbackProcessor(afterRollbackProcessor);
542547
final CountDownLatch stopLatch = new CountDownLatch(1);
543548
container.setApplicationEventPublisher(e -> {
@@ -579,7 +584,12 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
579584
assertThat(headers.get("baz")).isEqualTo("qux".getBytes());
580585
pf.destroy();
581586
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
587+
verify(afterRollbackProcessor, times(3)).isProcessInTransaction();
588+
verify(afterRollbackProcessor, times(3)).process(any(), any(), any(), anyBoolean());
582589
verify(afterRollbackProcessor).clearThreadState();
590+
verify(dlTemplate).send(any(ProducerRecord.class));
591+
verify(dlTemplate).sendOffsetsToTransaction(
592+
Collections.singletonMap(new TopicPartition(topic3, 0), new OffsetAndMetadata(1L)));
583593
logger.info("Stop testMaxAttempts");
584594
}
585595

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2834,6 +2834,10 @@ In such cases, the application listener must handle a record that keeps failing.
28342834

28352835
See also <<dead-letters>>.
28362836

2837+
Starting with version 2.2.5, the `DefaultAfterRollbackProcessor` can be invoked in a new transaction (started after the failed transaction rolls back).
2838+
Then, if you are using the `DeadLetterPublishingRecoverer` to publish a failed record, the processor will send the recovered record's offset in the original topic/partition to the transaction.
2839+
To enable this feature, set the `processInTransaction` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`.
2840+
28372841
[[dead-letters]]
28382842
===== Publishing Dead-letter Records
28392843

0 commit comments

Comments
 (0)