Skip to content

Commit 1d1a6a7

Browse files
committed
Fix test for previous back ported fix
1 parent 2b6400d commit 1d1a6a7

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2761,7 +2761,8 @@ void testCommitFailsOnRevoke() throws Exception {
27612761
records.put(topicPartition0, Arrays.asList(
27622762
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
27632763
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
2764-
records.put(new TopicPartition("foo", 1), Arrays.asList(
2764+
TopicPartition topicPartition1 = new TopicPartition("foo", 1);
2765+
records.put(topicPartition1, Arrays.asList(
27652766
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
27662767
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
27672768
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
@@ -2779,7 +2780,7 @@ void testCommitFailsOnRevoke() throws Exception {
27792780
}
27802781
else if (call == 1) {
27812782
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
2782-
rebal.get().onPartitionsAssigned(Collections.emptyList());
2783+
rebal.get().onPartitionsAssigned(Collections.singletonList(topicPartition1));
27832784
}
27842785
latch.countDown();
27852786
return first.getAndSet(false) ? consumerRecords : emptyRecords;
@@ -2789,11 +2790,10 @@ else if (call == 1) {
27892790
return null;
27902791
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
27912792
List<Map<TopicPartition, OffsetAndMetadata>> commits = new ArrayList<>();
2792-
AtomicBoolean firstCommit = new AtomicBoolean(true);
27932793
AtomicInteger commitCount = new AtomicInteger();
27942794
willAnswer(invoc -> {
27952795
commits.add(invoc.getArgument(0, Map.class));
2796-
if (!firstCommit.getAndSet(false)) {
2796+
if (commitCount.incrementAndGet() == 2) {
27972797
throw new CommitFailedException();
27982798
}
27992799
return null;
@@ -2804,7 +2804,8 @@ else if (call == 1) {
28042804
containerProps.setClientId("clientId");
28052805
containerProps.setIdleEventInterval(100L);
28062806
AtomicReference<Acknowledgment> acknowledgment = new AtomicReference<>();
2807-
class AckListener implements AcknowledgingMessageListener {
2807+
AtomicBoolean consumerSeekAwareCalled = new AtomicBoolean();
2808+
class AckListener implements AcknowledgingMessageListener, ConsumerSeekAware {
28082809
// not a lambda https://bugs.openjdk.java.net/browse/JDK-8074381
28092810

28102811
@Override
@@ -2816,6 +2817,12 @@ public void onMessage(ConsumerRecord data, Acknowledgment ack) {
28162817
public void onMessage(Object data) {
28172818
}
28182819

2820+
@Override
2821+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
2822+
consumerSeekAwareCalled.set(true);
2823+
}
2824+
2825+
28192826
}
28202827
containerProps.setMessageListener(new AckListener());
28212828
containerProps.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@@ -2837,6 +2844,7 @@ public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
28372844
container.start();
28382845
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
28392846
assertThat(container.getAssignedPartitions()).hasSize(1);
2847+
assertThat(consumerSeekAwareCalled.get()).isTrue();
28402848
container.stop();
28412849
}
28422850

0 commit comments

Comments
 (0)