
Commit fb1977c

garyrussell authored and artembilan committed
GH-1587: Fix NPE with Foreign TM and fixTxOffsets
Resolves #1587

Code was using the presence of a transaction template instead of the KTM.

**cherry-pick to 2.5.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
#	src/reference/asciidoc/kafka.adoc
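Illustrative only (not part of the commit): a minimal sketch of the configuration that could hit the NPE before this fix - fixTxOffsets enabled together with a "foreign" (non-Kafka) transaction manager. The topic, group id, and helper parameters are placeholders, not names from the project.

import java.util.Map;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.transaction.PlatformTransactionManager;

public class ForeignTmFixTxOffsetsSketch {

	// consumerConfigs and foreignTm stand in for application-specific wiring.
	static KafkaMessageListenerContainer<Integer, String> buildContainer(
			Map<String, Object> consumerConfigs, PlatformTransactionManager foreignTm) {

		ContainerProperties containerProps = new ContainerProperties("someTopic");
		containerProps.setGroupId("someGroup");
		containerProps.setFixTxOffsets(true);
		// Any PlatformTransactionManager other than KafkaTransactionManager
		// (e.g. a JDBC transaction manager) is a "foreign" TM here.
		containerProps.setTransactionManager(foreignTm);
		containerProps.setMessageListener((MessageListener<Integer, String>) record -> {
			// process the record
		});
		return new KafkaMessageListenerContainer<>(
				new DefaultKafkaConsumerFactory<>(consumerConfigs), containerProps);
	}

}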
1 parent 36abb92 commit fb1977c

File tree

4 files changed: +87 -3 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 4 additions & 1 deletion

@@ -410,7 +410,10 @@ public boolean isFixTxOffsets() {
 	 * functionally affect the consumer but some users have expressed concern that the
 	 * "lag" is non-zero. Set this to true and the container will correct such
 	 * mis-reported offsets. The check is performed before the next poll to avoid adding
-	 * significant complexity to the commit processing.
+	 * significant complexity to the commit processing. IMPORTANT: At the time of writing,
+	 * the lag will only be corrected if the consumer is configured with
+	 * {@code isolation.level=read_committed} and {@code max.poll.records} is greater than
+	 * 1. See https://issues.apache.org/jira/browse/KAFKA-10683 for more information.
 	 * @param fixTxOffsets true to correct the offset(s).
 	 * @since 2.5.6
 	 */
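
As a reading aid (not part of the commit), a sketch of consumer settings under which the correction described in the javadoc above can take effect; the bootstrap server, group id, and topic are placeholder values.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.springframework.kafka.listener.ContainerProperties;

Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "txGroup"); // placeholder
// Per the javadoc, both of the following are needed for the lag to be corrected:
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2); // any value greater than 1

ContainerProperties containerProps = new ContainerProperties("txTopic"); // placeholder topic
containerProps.setFixTxOffsets(true);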

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion

@@ -1167,7 +1167,7 @@ private void fixTxOffsetsIfNeeded() {
 			});
 			if (toFix.size() > 0) {
 				this.logger.debug(() -> "Fixing TX offsets: " + toFix);
-				if (this.transactionTemplate == null) {
+				if (this.kafkaTxManager == null) {
 					if (this.syncCommits) {
 						commitSync(toFix);
 					}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 80 additions & 1 deletion

@@ -117,7 +117,8 @@
  *
  */
 @EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2,
-		TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4 },
+		TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
+		TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7 },
 		brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
 public class TransactionalContainerTests {

@@ -133,6 +134,12 @@ public class TransactionalContainerTests {

 	public static final String topic4 = "txTopic4";

+	public static final String topic5 = "txTopic5";
+
+	public static final String topic6 = "txTopic6";
+
+	public static final String topic7 = "txTopic7";
+
 	private static EmbeddedKafkaBroker embeddedKafka;

 	@BeforeAll

@@ -595,6 +602,78 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 		consumer.close();
 	}

+	@Test
+	public void testFixLag() throws InterruptedException {
+		testFixLagGuts(topic5, 0);
+	}
+
+	@Test
+	public void testFixLagKTM() throws InterruptedException {
+		testFixLagGuts(topic6, 1);
+	}
+
+	@Test
+	public void testFixLagOtherTM() throws InterruptedException {
+		testFixLagGuts(topic7, 2);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void testFixLagGuts(String topic, int whichTm) throws InterruptedException {
+		logger.info("Start testFixLag");
+		Map<String, Object> props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka);
+		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+		ContainerProperties containerProps = new ContainerProperties(topic);
+		containerProps.setGroupId("txTest2");
+		containerProps.setPollTimeout(500L);
+		containerProps.setIdleEventInterval(500L);
+		containerProps.setFixTxOffsets(true);
+		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
+		senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
+		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
+		pf.setTransactionIdPrefix("fl.");
+		switch (whichTm) {
+			case 0:
+				break;
+			case 1:
+				containerProps.setTransactionManager(new KafkaTransactionManager<>(pf));
+				break;
+			case 2:
+				containerProps.setTransactionManager(new SomeOtherTransactionManager());
+		}
+
+		final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+		final CountDownLatch latch = new CountDownLatch(1);
+		containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
+		});
+
+		KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+		container.setBeanName("testRollbackRecord");
+		AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
+		container.setApplicationEventPublisher(event -> {
+			if (event instanceof ListenerContainerIdleEvent) {
+				Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
+				committed.set(consumer.committed(Collections.singleton(new TopicPartition(topic, 0))));
+				if (committed.get().get(new TopicPartition(topic, 0)) != null) {
+					latch.countDown();
+				}
+			}
+		});
+		container.start();
+
+		template.setDefaultTopic(topic);
+		template.executeInTransaction(t -> {
+			template.sendDefault(0, 0, "foo");
+			return null;
+		});
+		assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+		TopicPartition partition0 = new TopicPartition(topic, 0);
+		assertThat(committed.get().get(partition0).offset()).isEqualTo(2L);
+		container.stop();
+		pf.destroy();
+	}
+
 	@SuppressWarnings({ "unchecked", "deprecation" })
 	@Test
 	public void testMaxFailures() throws Exception {

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions

@@ -2241,6 +2241,8 @@ The default executor creates threads named `<name>-C-n`; with the `KafkaMessageL
 This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero.
 Set this property to `true` and the container will correct such mis-reported offsets.
 The check is performed before the next poll to avoid adding significant complexity to the commit processing.
+At the time of writing, the lag will only be corrected if the consumer is configured with `isolation.level=read_committed` and `max.poll.records` is greater than 1.
+See https://issues.apache.org/jira/browse/KAFKA-10683[KAFKA-10683] for more information.
 
 |groupId
 |`null`
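
Illustrative follow-up (not part of this commit): one way to confirm the correction is to compare the committed offset with the end offset for the partition; `consumer`, the topic, and the partition below are assumptions, not values from the documentation.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Assumes an existing read_committed Consumer<Integer, String> named "consumer".
TopicPartition tp = new TopicPartition("txTopic", 0); // placeholder topic/partition
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Collections.singleton(tp));
Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
long lag = end.get(tp) - committed.get(tp).offset();
// With fixTxOffsets=true (and the conditions noted above), lag should settle at 0
// once the container corrects the offset after a transactional send.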
