Skip to content

Commit a1d56d3

Browse files
mikael-carlstedtgaryrussell
authored andcommitted
GH-2489: Retry Commits If Necessary on Revoke
Closes #2489 * Retry commits that have failed temporarily due to rebalance in progress when onPartitionsRevoked is called. * Adjust expectations for unit tests where commits are retried (previous expectation accepted the defect that failed commits for subsequently revoked partitions were not retried)
1 parent 4ac63fa commit a1d56d3

File tree

2 files changed

+3
-2
lines changed

2 files changed

+3
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3446,6 +3446,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
34463446
}
34473447
try {
34483448
// Wait until now to commit, in case the user listener added acks
3449+
checkRebalanceCommits();
34493450
commitPendingAcks();
34503451
fixTxOffsetsIfNeeded();
34513452
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3351,7 +3351,7 @@ void testCommitRebalanceInProgressBatch() throws Exception {
33513351
assertThat(commits).hasSize(3);
33523352
assertThat(commits.get(0)).hasSize(2); // assignment
33533353
assertThat(commits.get(1)).hasSize(2); // batch commit
3354-
assertThat(commits.get(2)).hasSize(1); // re-commit
3354+
assertThat(commits.get(2)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
33553355
});
33563356
}
33573357

@@ -3364,7 +3364,7 @@ void testCommitRebalanceInProgressRecord() throws Exception {
33643364
assertThat(commits.get(2)).hasSize(1);
33653365
assertThat(commits.get(3)).hasSize(1);
33663366
assertThat(commits.get(4)).hasSize(1);
3367-
assertThat(commits.get(5)).hasSize(1); // re-commit
3367+
assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
33683368
assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
33693369
.isNotNull()
33703370
.extracting(om -> om.offset())

0 commit comments

Comments
 (0)