Skip to content

Commit 7bfc6b7

Browse files
garyrussellartembilan
authored andcommitted
GH-1646 Clear lastCommits after fixTxOffset
Resolves #1646 ``` Failed to correct transactional offset(s) java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer. ``` It is not clear how this happened since we remove revoked partitions from `lastCommits`. However, `lastCommits` should be cleared, even if successful, to avoid unnecessary processing if no records are received by the next poll. - add the `lastCommits` to the error log - clear `lastCommits` in a finally block **cherry-pick to 2.5.x** (cherry picked from commit ef6c115)
1 parent fa8cbeb commit 7bfc6b7

File tree

2 files changed

+6
-1
lines changed

2 files changed

+6
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1212,7 +1212,11 @@ private void fixTxOffsetsIfNeeded() {
12121212
}
12131213
}
12141214
catch (Exception e) {
1215-
this.logger.error(e, "Failed to correct transactional offset(s)");
1215+
this.logger.error(e, () -> "Failed to correct transactional offset(s): "
1216+
+ ListenerConsumer.this.lastCommits);
1217+
}
1218+
finally {
1219+
ListenerConsumer.this.lastCommits.clear();
12161220
}
12171221
}
12181222
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,7 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti
711711
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
712712
TopicPartition partition0 = new TopicPartition(topic, 0);
713713
assertThat(committed.get().get(partition0).offset()).isEqualTo(2L);
714+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.lastCommits", Map.class)).isEmpty();
714715
container.stop();
715716
pf.destroy();
716717
}

0 commit comments

Comments
 (0)