Skip to content

Commit 678d9d5

Browse files
committed
Fix test for previous back ported fix
1 parent 5fa11d3 commit 678d9d5

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,8 +1755,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
17551755
commitPendingAcks();
17561756
}
17571757
catch (Exception e) {
1758-
ListenerConsumer.this.logger.error(e, () -> "Fatal commit error after revocation "
1759-
+ partitions);
1758+
ListenerConsumer.this.logger.error("Fatal commit error after revocation " + partitions, e);
17601759
}
17611760
if (this.consumerAwareListener != null) {
17621761
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2260,7 +2260,7 @@ public void testCommitErrorHandlerCalled() throws Exception {
22602260

22612261
@SuppressWarnings({ "unchecked", "rawtypes" })
22622262
@Test
2263-
void testCommitFailsOnRevoke() throws Exception {
2263+
public void testCommitFailsOnRevoke() throws Exception {
22642264
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
22652265
Consumer<Integer, String> consumer = mock(Consumer.class);
22662266
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
@@ -2271,7 +2271,8 @@ void testCommitFailsOnRevoke() throws Exception {
22712271
records.put(topicPartition0, Arrays.asList(
22722272
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
22732273
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
2274-
records.put(new TopicPartition("foo", 1), Arrays.asList(
2274+
TopicPartition topicPartition1 = new TopicPartition("foo", 1);
2275+
records.put(topicPartition1, Arrays.asList(
22752276
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
22762277
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
22772278
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
@@ -2289,7 +2290,7 @@ void testCommitFailsOnRevoke() throws Exception {
22892290
}
22902291
else if (call == 1) {
22912292
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
2292-
rebal.get().onPartitionsAssigned(Collections.emptyList());
2293+
rebal.get().onPartitionsAssigned(Collections.singletonList(topicPartition1));
22932294
}
22942295
latch.countDown();
22952296
return first.getAndSet(false) ? consumerRecords : emptyRecords;
@@ -2299,11 +2300,10 @@ else if (call == 1) {
22992300
return null;
23002301
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
23012302
List<Map<TopicPartition, OffsetAndMetadata>> commits = new ArrayList<>();
2302-
AtomicBoolean firstCommit = new AtomicBoolean(true);
23032303
AtomicInteger commitCount = new AtomicInteger();
23042304
willAnswer(invoc -> {
23052305
commits.add(invoc.getArgument(0, Map.class));
2306-
if (!firstCommit.getAndSet(false)) {
2306+
if (commitCount.incrementAndGet() == 2) {
23072307
throw new CommitFailedException();
23082308
}
23092309
return null;
@@ -2314,6 +2314,7 @@ else if (call == 1) {
23142314
containerProps.setClientId("clientId");
23152315
containerProps.setIdleEventInterval(100L);
23162316
AtomicReference<Acknowledgment> acknowledgment = new AtomicReference<>();
2317+
AtomicBoolean listenerCalled = new AtomicBoolean();
23172318
class AckListener implements AcknowledgingMessageListener {
23182319
// not a lambda https://bugs.openjdk.java.net/browse/JDK-8074381
23192320

@@ -2339,14 +2340,18 @@ public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
23392340
}
23402341
}
23412342

2343+
@Override
2344+
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
2345+
listenerCalled.set(true);
2346+
}
2347+
23422348
});
2343-
Properties consumerProps = new Properties();
2344-
containerProps.setKafkaConsumerProperties(consumerProps);
23452349
KafkaMessageListenerContainer<Integer, String> container =
23462350
new KafkaMessageListenerContainer<>(cf, containerProps);
23472351
container.start();
23482352
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
23492353
assertThat(container.getAssignedPartitions()).hasSize(1);
2354+
assertThat(listenerCalled.get()).isTrue();
23502355
container.stop();
23512356
}
23522357

0 commit comments

Comments
 (0)