Skip to content

Commit e16b433

Browse files
committed
GH-1370: Opt. commit current offsets on assignment
Resolves #1370 Add an option to control current offset commit behavior on partition assignment. **cherry-pick to 2.3.x**
1 parent 705930c commit e16b433

File tree

4 files changed

+334
-51
lines changed

4 files changed

+334
-51
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,6 +93,36 @@ public enum AckMode {
9393

9494
}
9595

96+
/**
97+
* Offset commit behavior during assignment.
98+
* @since 2.3.6
99+
*/
100+
public enum AssignmentCommitOption {
101+
102+
/**
103+
* Always commit the current offset during partition assignment.
104+
*/
105+
ALWAYS,
106+
107+
/**
108+
* Never commit the current offset during partition assignment.
109+
*/
110+
NEVER,
111+
112+
/**
113+
* Commit the current offset during partition assignment when auto.offset.reset is
114+
* 'latest'; transactional if so configured.
115+
*/
116+
LATEST_ONLY,
117+
118+
/**
119+
* Commit the current offset during partition assignment when auto.offset.reset is
120+
* 'latest'; use consumer commit even when transactions are being used.
121+
*/
122+
LATEST_ONLY_NO_TX
123+
124+
}
125+
96126
/**
97127
* The default {@link #setShutdownTimeout(long) shutDownTimeout} (ms).
98128
*/
@@ -183,6 +213,8 @@ public enum AckMode {
183213

184214
private boolean subBatchPerPartition;
185215

216+
private AssignmentCommitOption assignmentCommitOption = AssignmentCommitOption.ALWAYS;
217+
186218
/**
187219
* Create properties for a container that will subscribe to the specified topics.
188220
* @param topics the topics.
@@ -610,6 +642,21 @@ public void setSubBatchPerPartition(boolean subBatchPerPartition) {
610642
this.subBatchPerPartition = subBatchPerPartition;
611643
}
612644

645+
public AssignmentCommitOption getAssignmentCommitOption() {
646+
return this.assignmentCommitOption;
647+
}
648+
649+
/**
650+
* Set the assignment commit option. Default {@link AssignmentCommitOption#ALWAYS}.
651+
* In a future release it will default to {@link AssignmentCommitOption#LATEST_ONLY}.
652+
* @param assignmentCommitOption the option.
653+
* @since 2.3.6
654+
*/
655+
public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {
656+
Assert.notNull(assignmentCommitOption, "'assignmentCommitOption' cannot be null");
657+
this.assignmentCommitOption = assignmentCommitOption;
658+
}
659+
613660
@Override
614661
public String toString() {
615662
return "ContainerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 72 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
7575
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
7676
import org.springframework.kafka.listener.ContainerProperties.AckMode;
77+
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
7778
import org.springframework.kafka.support.Acknowledgment;
7879
import org.springframework.kafka.support.KafkaUtils;
7980
import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -570,6 +571,10 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
570571
private final Duration authorizationExceptionRetryInterval =
571572
this.containerProperties.getAuthorizationExceptionRetryInterval();
572573

574+
private final AssignmentCommitOption autoCommitOption = this.containerProperties.getAssignmentCommitOption();
575+
576+
private final boolean commitCurrentOnAssignment;
577+
573578
private Map<TopicPartition, OffsetMetadata> definedPartitions;
574579

575580
private int count;
@@ -614,6 +619,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
614619
this.transactionTemplate = determineTransactionTemplate();
615620
this.genericListener = listener;
616621
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
622+
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties);
617623
subscribeOrAssignTopics(this.consumer);
618624
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
619625
if (listener instanceof BatchMessageListener) {
@@ -680,6 +686,20 @@ else if (listener instanceof MessageListener) {
680686
this.micrometerHolder = obtainMicrometerHolder();
681687
}
682688

689+
private boolean determineCommitCurrent(Properties consumerProperties) {
690+
if (AssignmentCommitOption.NEVER.equals(this.autoCommitOption)) {
691+
return false;
692+
}
693+
if (!this.autoCommit && AssignmentCommitOption.ALWAYS.equals(this.autoCommitOption)) {
694+
return true;
695+
}
696+
String autoOffsetReset = consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
697+
return !this.autoCommit
698+
&& (autoOffsetReset == null || autoOffsetReset.equals("latest"))
699+
&& (AssignmentCommitOption.LATEST_ONLY.equals(this.autoCommitOption)
700+
|| AssignmentCommitOption.LATEST_ONLY_NO_TX.equals(this.autoCommitOption));
701+
}
702+
683703
private long obtainMaxPollInterval(Properties consumerProperties) {
684704
Object timeout = consumerProperties.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
685705
if (timeout == null) {
@@ -2223,8 +2243,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
22232243
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
22242244
+ "consumer paused again, so the initial poll() will never return any records");
22252245
}
2226-
ListenerConsumer.this.assignedPartitions = partitions;
2227-
if (!ListenerConsumer.this.autoCommit) {
2246+
ListenerConsumer.this.assignedPartitions = new LinkedList<>(partitions);
2247+
if (ListenerConsumer.this.commitCurrentOnAssignment) {
22282248
// Commit initial positions - this is generally redundant but
22292249
// it protects us from the case when another consumer starts
22302250
// and rebalance would cause it to reset at the end
@@ -2241,51 +2261,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
22412261
return;
22422262
}
22432263
}
2244-
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
2245-
if (ListenerConsumer.this.transactionTemplate != null &&
2246-
ListenerConsumer.this.kafkaTxManager != null) {
2247-
try {
2248-
offsetsToCommit.forEach((partition, offsetAndMetadata) -> {
2249-
TransactionSupport.setTransactionIdSuffix(
2250-
zombieFenceTxIdSuffix(partition.topic(), partition.partition()));
2251-
ListenerConsumer.this.transactionTemplate
2252-
.execute(new TransactionCallbackWithoutResult() {
2253-
2254-
@SuppressWarnings({ UNCHECKED, RAWTYPES })
2255-
@Override
2256-
protected void doInTransactionWithoutResult(TransactionStatus status) {
2257-
KafkaResourceHolder holder =
2258-
(KafkaResourceHolder) TransactionSynchronizationManager
2259-
.getResource(
2260-
ListenerConsumer.this.kafkaTxManager
2261-
.getProducerFactory());
2262-
if (holder != null) {
2263-
holder.getProducer()
2264-
.sendOffsetsToTransaction(
2265-
Collections.singletonMap(partition,
2266-
offsetAndMetadata),
2267-
ListenerConsumer.this.consumerGroupId);
2268-
}
2269-
}
2270-
2271-
});
2272-
});
2273-
}
2274-
finally {
2275-
TransactionSupport.clearTransactionIdSuffix();
2276-
}
2277-
}
2278-
else {
2279-
ContainerProperties containerProps = KafkaMessageListenerContainer.this.getContainerProperties();
2280-
if (containerProps.isSyncCommits()) {
2281-
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
2282-
containerProps.getSyncCommitTimeout());
2283-
}
2284-
else {
2285-
ListenerConsumer.this.consumer.commitAsync(offsetsToCommit,
2286-
containerProps.getCommitCallback());
2287-
}
2288-
}
2264+
commitCurrentOffsets(offsetsToCommit);
22892265
}
22902266
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
22912267
seekPartitions(partitions, false);
@@ -2298,6 +2274,55 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
22982274
}
22992275
}
23002276

2277+
private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
2278+
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
2279+
if (ListenerConsumer.this.transactionTemplate != null
2280+
&& ListenerConsumer.this.kafkaTxManager != null
2281+
&& !AssignmentCommitOption.LATEST_ONLY_NO_TX.equals(ListenerConsumer.this.autoCommitOption)) {
2282+
try {
2283+
offsetsToCommit.forEach((partition, offsetAndMetadata) -> {
2284+
TransactionSupport.setTransactionIdSuffix(
2285+
zombieFenceTxIdSuffix(partition.topic(), partition.partition()));
2286+
ListenerConsumer.this.transactionTemplate
2287+
.execute(new TransactionCallbackWithoutResult() {
2288+
2289+
@SuppressWarnings({ UNCHECKED, RAWTYPES })
2290+
@Override
2291+
protected void doInTransactionWithoutResult(TransactionStatus status) {
2292+
KafkaResourceHolder holder =
2293+
(KafkaResourceHolder) TransactionSynchronizationManager
2294+
.getResource(
2295+
ListenerConsumer.this.kafkaTxManager
2296+
.getProducerFactory());
2297+
if (holder != null) {
2298+
holder.getProducer()
2299+
.sendOffsetsToTransaction(
2300+
Collections.singletonMap(partition,
2301+
offsetAndMetadata),
2302+
ListenerConsumer.this.consumerGroupId);
2303+
}
2304+
}
2305+
2306+
});
2307+
});
2308+
}
2309+
finally {
2310+
TransactionSupport.clearTransactionIdSuffix();
2311+
}
2312+
}
2313+
else {
2314+
ContainerProperties containerProps = KafkaMessageListenerContainer.this.getContainerProperties();
2315+
if (containerProps.isSyncCommits()) {
2316+
ListenerConsumer.this.consumer.commitSync(offsetsToCommit,
2317+
containerProps.getSyncCommitTimeout());
2318+
}
2319+
else {
2320+
ListenerConsumer.this.consumer.commitAsync(offsetsToCommit,
2321+
containerProps.getCommitCallback());
2322+
}
2323+
}
2324+
}
2325+
23012326
}
23022327

23032328
private final class InitialOrIdleSeekCallback implements ConsumerSeekCallback {

0 commit comments

Comments
 (0)