Skip to content

Commit e138fc0

Browse files
garyrussellartembilan
authored andcommitted
GH-1370: Opt. commit current offsets on assignment
Resolves #1370 Add an option to control current offset commit behavior on partition assignment. **cherry-pick to 2.3.x**
1 parent 909af16 commit e138fc0

File tree

4 files changed

+333
-50
lines changed

4 files changed

+333
-50
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,6 +93,36 @@ public enum AckMode {
9393

9494
}
9595

96+
/**
97+
* Offset commit behavior during assignment.
98+
* @since 2.3.6
99+
*/
100+
public enum AssignmentCommitOption {
101+
102+
/**
103+
* Always commit the current offset during partition assignment.
104+
*/
105+
ALWAYS,
106+
107+
/**
108+
* Never commit the current offset during partition assignment.
109+
*/
110+
NEVER,
111+
112+
/**
113+
* Commit the current offset during partition assignment when auto.offset.reset is
114+
* 'latest'; transactional if so configured.
115+
*/
116+
LATEST_ONLY,
117+
118+
/**
119+
* Commit the current offset during partition assignment when auto.offset.reset is
120+
* 'latest'; use consumer commit even when transactions are being used.
121+
*/
122+
LATEST_ONLY_NO_TX
123+
124+
}
125+
96126
/**
97127
* The default {@link #setShutdownTimeout(long) shutDownTimeout} (ms).
98128
*/
@@ -183,6 +213,8 @@ public enum AckMode {
183213

184214
private boolean subBatchPerPartition;
185215

216+
private AssignmentCommitOption assignmentCommitOption = AssignmentCommitOption.ALWAYS;
217+
186218
/**
187219
* Create properties for a container that will subscribe to the specified topics.
188220
* @param topics the topics.
@@ -610,6 +642,21 @@ public void setSubBatchPerPartition(boolean subBatchPerPartition) {
610642
this.subBatchPerPartition = subBatchPerPartition;
611643
}
612644

645+
public AssignmentCommitOption getAssignmentCommitOption() {
646+
return this.assignmentCommitOption;
647+
}
648+
649+
/**
650+
* Set the assignment commit option. Default {@link AssignmentCommitOption#ALWAYS}.
651+
* In a future release it will default to {@link AssignmentCommitOption#LATEST_ONLY}.
652+
* @param assignmentCommitOption the option.
653+
* @since 2.3.6
654+
*/
655+
public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {
656+
Assert.notNull(assignmentCommitOption, "'assignmentCommitOption' cannot be null");
657+
this.assignmentCommitOption = assignmentCommitOption;
658+
}
659+
613660
@Override
614661
public String toString() {
615662
return "ContainerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 71 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
7575
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
7676
import org.springframework.kafka.listener.ContainerProperties.AckMode;
77+
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
7778
import org.springframework.kafka.support.Acknowledgment;
7879
import org.springframework.kafka.support.KafkaUtils;
7980
import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -570,6 +571,10 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
570571
private final Duration authorizationExceptionRetryInterval =
571572
this.containerProperties.getAuthorizationExceptionRetryInterval();
572573

574+
private final AssignmentCommitOption autoCommitOption = this.containerProperties.getAssignmentCommitOption();
575+
576+
private final boolean commitCurrentOnAssignment;
577+
573578
private Map<TopicPartition, OffsetMetadata> definedPartitions;
574579

575580
private int count;
@@ -614,6 +619,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
614619
this.transactionTemplate = determineTransactionTemplate();
615620
this.genericListener = listener;
616621
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
622+
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties);
617623
subscribeOrAssignTopics(this.consumer);
618624
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
619625
if (listener instanceof BatchMessageListener) {
@@ -680,6 +686,20 @@ else if (listener instanceof MessageListener) {
680686
this.micrometerHolder = obtainMicrometerHolder();
681687
}
682688

689+
private boolean determineCommitCurrent(Properties consumerProperties) {
690+
if (AssignmentCommitOption.NEVER.equals(this.autoCommitOption)) {
691+
return false;
692+
}
693+
if (!this.autoCommit && AssignmentCommitOption.ALWAYS.equals(this.autoCommitOption)) {
694+
return true;
695+
}
696+
String autoOffsetReset = consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
697+
return !this.autoCommit
698+
&& (autoOffsetReset == null || autoOffsetReset.equals("latest"))
699+
&& (AssignmentCommitOption.LATEST_ONLY.equals(this.autoCommitOption)
700+
|| AssignmentCommitOption.LATEST_ONLY_NO_TX.equals(this.autoCommitOption));
701+
}
702+
683703
private long obtainMaxPollInterval(Properties consumerProperties) {
684704
Object timeout = consumerProperties.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
685705
if (timeout == null) {
@@ -2227,7 +2247,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
22272247
+ "consumer paused again, so the initial poll() will never return any records");
22282248
}
22292249
ListenerConsumer.this.assignedPartitions = new LinkedList<>(partitions);
2230-
if (!ListenerConsumer.this.autoCommit) {
2250+
if (ListenerConsumer.this.commitCurrentOnAssignment) {
22312251
// Commit initial positions - this is generally redundant but
22322252
// it protects us from the case when another consumer starts
22332253
// and rebalance would cause it to reset at the end
@@ -2244,51 +2264,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
22442264
return;
22452265
}
22462266
}
2247-
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
2248-
if (ListenerConsumer.this.transactionTemplate != null &&
2249-
ListenerConsumer.this.kafkaTxManager != null) {
2250-
try {
2251-
offsetsToCommit.forEach((partition, offsetAndMetadata) -> {
2252-
TransactionSupport.setTransactionIdSuffix(
2253-
zombieFenceTxIdSuffix(partition.topic(), partition.partition()));
2254-
ListenerConsumer.this.transactionTemplate
2255-
.execute(new TransactionCallbackWithoutResult() {
2256-
2257-
@SuppressWarnings({ UNCHECKED, RAWTYPES })
2258-
@Override
2259-
protected void doInTransactionWithoutResult(TransactionStatus status) {
2260-
KafkaResourceHolder holder =
2261-
(KafkaResourceHolder) TransactionSynchronizationManager
2262-
.getResource(
2263-
ListenerConsumer.this.kafkaTxManager
2264-
.getProducerFactory());
2265-
if (holder != null) {
2266-
holder.getProducer()
2267-
.sendOffsetsToTransaction(
2268-
Collections.singletonMap(partition,
2269-
offsetAndMetadata),
2270-
ListenerConsumer.this.consumerGroupId);
2271-
}
2272-
}
2273-
2274-
});
2275-
});
2276-
}
2277-
finally {
2278-
TransactionSupport.clearTransactionIdSuffix();
2279-
}
2280-
}
2281-
else {
2282-
ContainerProperties containerProps = KafkaMessageListenerContainer.this.getContainerProperties();
2283-
if (containerProps.isSyncCommits()) {
2284-
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
2285-
containerProps.getSyncCommitTimeout());
2286-
}
2287-
else {
2288-
ListenerConsumer.this.consumer.commitAsync(offsetsToCommit,
2289-
containerProps.getCommitCallback());
2290-
}
2291-
}
2267+
commitCurrentOffsets(offsetsToCommit);
22922268
}
22932269
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
22942270
seekPartitions(partitions, false);
@@ -2301,6 +2277,55 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
23012277
}
23022278
}
23032279

2280+
private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
2281+
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
2282+
if (ListenerConsumer.this.transactionTemplate != null
2283+
&& ListenerConsumer.this.kafkaTxManager != null
2284+
&& !AssignmentCommitOption.LATEST_ONLY_NO_TX.equals(ListenerConsumer.this.autoCommitOption)) {
2285+
try {
2286+
offsetsToCommit.forEach((partition, offsetAndMetadata) -> {
2287+
TransactionSupport.setTransactionIdSuffix(
2288+
zombieFenceTxIdSuffix(partition.topic(), partition.partition()));
2289+
ListenerConsumer.this.transactionTemplate
2290+
.execute(new TransactionCallbackWithoutResult() {
2291+
2292+
@SuppressWarnings({ UNCHECKED, RAWTYPES })
2293+
@Override
2294+
protected void doInTransactionWithoutResult(TransactionStatus status) {
2295+
KafkaResourceHolder holder =
2296+
(KafkaResourceHolder) TransactionSynchronizationManager
2297+
.getResource(
2298+
ListenerConsumer.this.kafkaTxManager
2299+
.getProducerFactory());
2300+
if (holder != null) {
2301+
holder.getProducer()
2302+
.sendOffsetsToTransaction(
2303+
Collections.singletonMap(partition,
2304+
offsetAndMetadata),
2305+
ListenerConsumer.this.consumerGroupId);
2306+
}
2307+
}
2308+
2309+
});
2310+
});
2311+
}
2312+
finally {
2313+
TransactionSupport.clearTransactionIdSuffix();
2314+
}
2315+
}
2316+
else {
2317+
ContainerProperties containerProps = KafkaMessageListenerContainer.this.getContainerProperties();
2318+
if (containerProps.isSyncCommits()) {
2319+
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
2320+
containerProps.getSyncCommitTimeout());
2321+
}
2322+
else {
2323+
ListenerConsumer.this.consumer.commitAsync(offsetsToCommit,
2324+
containerProps.getCommitCallback());
2325+
}
2326+
}
2327+
}
2328+
23042329
@Override
23052330
public void onPartitionsLost(Collection<TopicPartition> partitions) {
23062331
if (this.consumerAwareListener != null) {

0 commit comments

Comments
 (0)