Skip to content

Commit 9f1050a

Browse files
authored
GH-2891: Fix isAckAfterHandle with No group.id
Resolves #2891 When using manual partition assignment and `AckMode.MANUAL`, it is possible to have a `null` `group.id`, whereby Kafka does not maintain any committed offsets. In this case, we should not attempt to commit the offset after recovery, even if the error handler `ackAfterHandle` property is true. **cherry-pick to 3.0.x, 2.9.x**
1 parent 6270601 commit 9f1050a

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2259,7 +2259,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22592259
try {
22602260
this.batchFailed = true;
22612261
invokeBatchErrorHandler(records, recordList, e);
2262-
commitOffsetsIfNeeded(records);
2262+
commitOffsetsIfNeededAfterHandlingError(records);
22632263
}
22642264
catch (KafkaException ke) {
22652265
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
@@ -2280,8 +2280,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22802280
return null;
22812281
}
22822282

2283-
private void commitOffsetsIfNeeded(final ConsumerRecords<K, V> records) {
2284-
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
2283+
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecords<K, V> records) {
2284+
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null)
22852285
|| this.producer != null) {
22862286
if (this.remainingRecords != null) {
22872287
ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
@@ -2744,7 +2744,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27442744
}
27452745
try {
27462746
invokeErrorHandler(cRecord, iterator, e);
2747-
commitOffsetsIfNeeded(cRecord);
2747+
commitOffsetsIfNeededAfterHandlingError(cRecord);
27482748
}
27492749
catch (KafkaException ke) {
27502750
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
@@ -2763,8 +2763,8 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27632763
});
27642764
}
27652765

2766-
private void commitOffsetsIfNeeded(final ConsumerRecord<K, V> cRecord) {
2767-
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
2766+
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
2767+
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null)
27682768
|| this.producer != null) {
27692769
if (this.isManualAck) {
27702770
this.commitRecovered = true;

0 commit comments

Comments
 (0)