Skip to content

Commit 7b24247

Browse files
committed
Improve logging and test reliability for share consumer implementation
- Replace Thread.sleep() with CountDownLatch assertions in tests for reliable synchronization and faster test execution - Use LogMessage.format() instead of Supplier with String.format() for consistent logging patterns - Fix lambda expression to use effectively final variable in error logging - Change shareConsumer field from nullable Boolean to primitive boolean in AbstractKafkaListenerEndpoint for cleaner null handling - Make utility methods static where appropriate for better code organization Signed-off-by: Soby Chacko <[email protected]>
1 parent 29f415e commit 7b24247

File tree

5 files changed

+49
-41
lines changed

5 files changed

+49
-41
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
9898

9999
private @Nullable Boolean batchListener;
100100

101-
private @Nullable Boolean shareConsumer;
101+
private boolean shareConsumer;
102102

103103
private @Nullable KafkaTemplate<?, ?> replyTemplate;
104104

@@ -303,7 +303,7 @@ public void setShareConsumer(boolean shareConsumer) {
303303
* @since 4.0
304304
*/
305305
public boolean isShareConsumer() {
306-
return this.shareConsumer != null && this.shareConsumer;
306+
return this.shareConsumer;
307307
}
308308

309309
/**

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* @param <V> the value type
5151
*
5252
* @author Soby Chacko
53+
*
5354
* @since 4.0
5455
*/
5556
public class ShareKafkaListenerContainerFactory<K, V>
@@ -179,7 +180,7 @@ else if ("implicit".equals(mode)) {
179180
return containerProperties.isExplicitShareAcknowledgment();
180181
}
181182

182-
private void validateShareConfiguration(KafkaListenerEndpoint endpoint) {
183+
private static void validateShareConfiguration(KafkaListenerEndpoint endpoint) {
183184
// Validate that batch listeners aren't used with share consumers
184185
if (Boolean.TRUE.equals(endpoint.getBatchListener())) {
185186
throw new IllegalArgumentException(

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import org.springframework.context.ApplicationEventPublisher;
3737
import org.springframework.core.log.LogAccessor;
38+
import org.springframework.core.log.LogMessage;
3839
import org.springframework.core.task.AsyncTaskExecutor;
3940
import org.springframework.core.task.SimpleAsyncTaskExecutor;
4041
import org.springframework.kafka.core.ShareConsumerFactory;
@@ -71,7 +72,9 @@
7172
* @param <V> the value type
7273
*
7374
* @author Soby Chacko
75+
*
7476
* @since 4.0
77+
*
7578
* @see ShareConsumer
7679
* @see ShareAcknowledgment
7780
*/
@@ -247,7 +250,7 @@ private class ShareListenerConsumer implements Runnable {
247250
if (this.isExplicitMode) {
248251
// Apply explicit mode configuration to consumer
249252
// Note: This should ideally be done during consumer creation in the factory
250-
this.logger.info(() -> "Share consumer configured for explicit acknowledgment mode");
253+
this.logger.info("Share consumer configured for explicit acknowledgment mode");
251254
}
252255

253256
this.consumer.subscribe(Arrays.asList(containerProperties.getTopics()));
@@ -409,17 +412,18 @@ void onRecordAcknowledged(ConsumerRecord<K, V> record) {
409412
private void processQueuedAcknowledgments() {
410413
PendingAcknowledgment<K, V> pendingAck;
411414
while ((pendingAck = this.acknowledgmentQueue.poll()) != null) {
415+
final PendingAcknowledgment<K, V> ack = pendingAck;
412416
try {
413-
this.consumer.acknowledge(pendingAck.record, pendingAck.type);
417+
this.consumer.acknowledge(ack.record, ack.type);
414418
// Find and notify the acknowledgment object
415-
ShareConsumerAcknowledgment ack = this.pendingAcknowledgments.get(pendingAck.record);
416-
if (ack != null) {
417-
ack.notifyAcknowledged(pendingAck.type);
418-
onRecordAcknowledged(pendingAck.record);
419+
ShareConsumerAcknowledgment acknowledgment = this.pendingAcknowledgments.get(ack.record);
420+
if (acknowledgment != null) {
421+
acknowledgment.notifyAcknowledged(ack.type);
422+
onRecordAcknowledged(ack.record);
419423
}
420424
}
421425
catch (Exception e) {
422-
this.logger.error(e, "Failed to process queued acknowledgment for record: " + pendingAck.record);
426+
this.logger.error(e, () -> "Failed to process queued acknowledgment for record: " + ack.record);
423427
}
424428
}
425429
}
@@ -437,7 +441,7 @@ private void checkAcknowledgmentTimeouts() {
437441
long recordAge = currentTime - entry.getValue();
438442
if (recordAge > this.ackTimeoutMs) {
439443
ConsumerRecord<K, V> record = entry.getKey();
440-
this.logger.warn(() -> String.format(
444+
this.logger.warn(LogMessage.format(
441445
"Record not acknowledged within timeout (%d seconds). " +
442446
"In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), " +
443447
"or ack.reject() for every record. " +

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerConstraintTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public void onShareRecord(ConsumerRecord<String, String> record,
122122
produceTestRecords(bootstrapServers, topic, 2);
123123

124124
// Wait and verify second batch is NOT processed yet
125-
Thread.sleep(3000);
125+
// Using a latch that should NOT count down to verify blocking behavior
126+
assertThat(secondBatchLatch.await(3, TimeUnit.SECONDS)).isFalse();
126127
assertThat(totalProcessed.get()).isEqualTo(3);
127128
assertThat(secondBatchLatch.getCount()).isEqualTo(2);
128129

@@ -198,7 +199,8 @@ public void onShareRecord(ConsumerRecord<String, String> record,
198199
produceTestRecords(bootstrapServers, topic, 1);
199200

200201
// Should not process new records while one is still pending
201-
Thread.sleep(3000);
202+
// Using a latch that should NOT count down to verify blocking behavior
203+
assertThat(nextPollLatch.await(3, TimeUnit.SECONDS)).isFalse();
202204
assertThat(totalProcessed.get()).isEqualTo(4);
203205

204206
// Acknowledge the last pending record

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.CountDownLatch;
2828
import java.util.concurrent.TimeUnit;
29-
import java.util.concurrent.atomic.AtomicBoolean;
3029
import java.util.concurrent.atomic.AtomicInteger;
3130
import java.util.concurrent.atomic.AtomicReference;
3231

@@ -164,7 +163,7 @@ public void onShareRecord(ConsumerRecord<String, String> record,
164163
assertThat(received).hasSize(3);
165164
assertThat(acknowledgments).hasSize(3);
166165
assertThat(acknowledgments).allMatch(Objects::nonNull);
167-
assertThat(acknowledgments).allMatch(this::isAcknowledgedInternal);
166+
assertThat(acknowledgments).allMatch(ShareKafkaMessageListenerContainerIntegrationTests::isAcknowledgedInternal);
168167
assertThat(acknowledgments).allMatch(ack -> getAcknowledgmentTypeInternal(ack) == AcknowledgeType.ACCEPT);
169168
}
170169
finally {
@@ -267,18 +266,19 @@ public void onShareRecord(ConsumerRecord<String, String> record,
267266
assertThat(firstBatchLatch.await(15, TimeUnit.SECONDS)).isTrue();
268267
assertThat(pendingAcks).hasSize(3);
269268

270-
// Wait a bit to ensure no more records are processed while acknowledgments are pending
271-
Thread.sleep(2000);
269+
// Produce more records for second batch while first is pending
270+
produceTestRecords(bootstrapServers, topic, 3);
271+
272+
// Verify second batch is NOT processed yet while acknowledgments are pending
273+
// Using a latch that should NOT count down to verify blocking behavior
274+
assertThat(secondBatchLatch.await(2, TimeUnit.SECONDS)).isFalse();
272275
assertThat(processedCount.get()).isEqualTo(3);
273276

274277
// Acknowledge first batch
275278
for (ShareAcknowledgment ack : pendingAcks) {
276279
ack.acknowledge();
277280
}
278281

279-
// Produce more records for second batch
280-
produceTestRecords(bootstrapServers, topic, 3);
281-
282282
// Now second batch should be processed
283283
assertThat(secondBatchLatch.await(15, TimeUnit.SECONDS)).isTrue();
284284
assertThat(processedCount.get()).isEqualTo(6);
@@ -460,12 +460,21 @@ void shouldHandleContainerLifecycle(EmbeddedKafkaBroker broker) throws Exception
460460
ContainerProperties containerProps = new ContainerProperties(topic);
461461
containerProps.setExplicitShareAcknowledgment(true);
462462

463-
AtomicBoolean listenerCalled = new AtomicBoolean(false);
463+
CountDownLatch firstProcessingLatch = new CountDownLatch(1);
464+
CountDownLatch secondProcessingLatch = new CountDownLatch(1);
465+
AtomicInteger callCount = new AtomicInteger(0);
466+
464467
containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener<String, String>() {
465468
@Override
466469
public void onShareRecord(ConsumerRecord<String, String> record,
467470
@Nullable ShareAcknowledgment acknowledgment, ShareConsumer<?, ?> consumer) {
468-
listenerCalled.set(true);
471+
int count = callCount.incrementAndGet();
472+
if (count == 1) {
473+
firstProcessingLatch.countDown();
474+
}
475+
else if (count == 2) {
476+
secondProcessingLatch.countDown();
477+
}
469478
acknowledgment.acknowledge();
470479
}
471480
});
@@ -474,35 +483,27 @@ public void onShareRecord(ConsumerRecord<String, String> record,
474483
new ShareKafkaMessageListenerContainer<>(factory, containerProps);
475484
container.setBeanName("lifecycleTestContainer");
476485

477-
// Test initial state
478486
assertThat(container.isRunning()).isFalse();
479487

480-
// Test start
481488
container.start();
482489
assertThat(container.isRunning()).isTrue();
483490

484-
// Test processing
485491
produceTestRecords(bootstrapServers, topic, 1);
486-
Thread.sleep(3000); // Give time for processing
487-
assertThat(listenerCalled.get()).isTrue();
492+
assertThat(firstProcessingLatch.await(10, TimeUnit.SECONDS)).isTrue();
488493

489-
// Test stop
490494
container.stop();
491495
assertThat(container.isRunning()).isFalse();
492496

493-
// Test restart
494-
listenerCalled.set(false);
495497
container.start();
496498
assertThat(container.isRunning()).isTrue();
497499

498500
produceTestRecords(bootstrapServers, topic, 1);
499-
Thread.sleep(3000);
500-
assertThat(listenerCalled.get()).isTrue();
501+
assertThat(secondProcessingLatch.await(10, TimeUnit.SECONDS)).isTrue();
501502

502503
container.stop();
503504
}
504505

505-
private void testBasicMessageListener(DefaultShareConsumerFactory<String, String> factory,
506+
private static void testBasicMessageListener(DefaultShareConsumerFactory<String, String> factory,
506507
String topic, String bootstrapServers, String groupId) throws Exception {
507508

508509
setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
@@ -529,7 +530,7 @@ private void testBasicMessageListener(DefaultShareConsumerFactory<String, String
529530
}
530531
}
531532

532-
private void testShareConsumerAwareListener(DefaultShareConsumerFactory<String, String> factory,
533+
private static void testShareConsumerAwareListener(DefaultShareConsumerFactory<String, String> factory,
533534
String topic, String bootstrapServers, String groupId) throws Exception {
534535

535536
setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
@@ -566,7 +567,7 @@ public void onShareRecord(ConsumerRecord<String, String> record,
566567
}
567568
}
568569

569-
private void testAckListenerInImplicitMode(DefaultShareConsumerFactory<String, String> factory,
570+
private static void testAckListenerInImplicitMode(DefaultShareConsumerFactory<String, String> factory,
570571
String topic, String bootstrapServers, String groupId) throws Exception {
571572

572573
setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
@@ -602,7 +603,7 @@ public void onShareRecord(ConsumerRecord<String, String> record,
602603
}
603604

604605
// Utility methods
605-
private Map<String, Object> createConsumerProps(String bootstrapServers, String groupId, boolean explicit) {
606+
private static Map<String, Object> createConsumerProps(String bootstrapServers, String groupId, boolean explicit) {
606607
Map<String, Object> props = new HashMap<>();
607608
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
608609
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -614,23 +615,23 @@ private Map<String, Object> createConsumerProps(String bootstrapServers, String
614615
return props;
615616
}
616617

617-
private void produceTestRecords(String bootstrapServers, String topic, int count) throws Exception {
618+
private static void produceTestRecords(String bootstrapServers, String topic, int count) throws Exception {
618619
try (var producer = createProducer(bootstrapServers)) {
619620
for (int i = 0; i < count; i++) {
620621
producer.send(new ProducerRecord<>(topic, "key" + i, "value" + i)).get();
621622
}
622623
}
623624
}
624625

625-
private KafkaProducer<String, String> createProducer(String bootstrapServers) {
626+
private static KafkaProducer<String, String> createProducer(String bootstrapServers) {
626627
Map<String, Object> producerProps = new HashMap<>();
627628
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
628629
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
629630
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
630631
return new KafkaProducer<>(producerProps);
631632
}
632633

633-
private void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
634+
private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
634635
Map<String, Object> adminProperties = new HashMap<>();
635636
adminProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
636637
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
@@ -645,7 +646,7 @@ private void setShareAutoOffsetResetEarliest(String bootstrapServers, String gro
645646
/**
646647
* Helper method to access internal acknowledgment state for testing.
647648
*/
648-
private boolean isAcknowledgedInternal(ShareAcknowledgment ack) {
649+
private static boolean isAcknowledgedInternal(ShareAcknowledgment ack) {
649650
try {
650651
java.lang.reflect.Method method = ack.getClass().getDeclaredMethod("isAcknowledged");
651652
method.setAccessible(true);
@@ -659,7 +660,7 @@ private boolean isAcknowledgedInternal(ShareAcknowledgment ack) {
659660
/**
660661
* Helper method to access internal acknowledgment type for testing.
661662
*/
662-
private AcknowledgeType getAcknowledgmentTypeInternal(ShareAcknowledgment ack) {
663+
private static AcknowledgeType getAcknowledgmentTypeInternal(ShareAcknowledgment ack) {
663664
try {
664665
java.lang.reflect.Method method = ack.getClass().getDeclaredMethod("getAcknowledgmentType");
665666
method.setAccessible(true);

0 commit comments

Comments
 (0)