Skip to content

Commit a13c9c0

Browse files
committed
Refactor ShareKafkaListenerContainerFactory and fix compilation warnings
- Change autoStartup and phase from nullable wrappers to primitives with defaults - Add @SuppressWarnings(NullAway.Init) to applicationEventPublisher and applicationContext - Use ShareAcknowledgementMode.fromString() API instead of manual string parsing - Use ArgumentMatchers with explicit type parameters to eliminate unchecked warnings in tests - Remove deprecated imports from ShareRecordMessagingMessageListenerAdapter - Use FQCN for JacksonProjectingMessageConverter and ProjectingMessageConverter - Fix syntax error in kafka-queues.adoc documentation example Signed-off-by: Soby Chacko <[email protected]>
1 parent 43b8c43 commit a13c9c0

File tree

6 files changed

+39
-102
lines changed

6 files changed

+39
-102
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -467,9 +467,10 @@ This feature helps developers quickly identify when acknowledgment calls are mis
467467
else {
468468
acknowledgment.release(); // Temporary failure - retry later
469469
}
470-
else {
471-
acknowledgment.reject(); // Invalid order - don't retry
472-
}
470+
}
471+
else {
472+
acknowledgment.reject(); // Invalid order - don't retry
473+
}
473474
}
474475
catch (Exception e) {
475476
// Exception automatically triggers REJECT

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,7 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
653653
containerFactory, beanName);
654654

655655
if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory<?, ?>) {
656-
endpoint.setShareConsumer(Boolean.TRUE);
656+
endpoint.setShareConsumer(true);
657657
}
658658

659659
this.registrar.registerEndpoint(endpoint, listenerContainerFactory);

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.regex.Pattern;
2222

2323
import org.apache.kafka.clients.consumer.ConsumerConfig;
24-
import org.jspecify.annotations.Nullable;
24+
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
2525

2626
import org.springframework.context.ApplicationContext;
2727
import org.springframework.context.ApplicationContextAware;
@@ -54,17 +54,20 @@
5454
* @since 4.0
5555
*/
5656
public class ShareKafkaListenerContainerFactory<K, V>
57-
implements KafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>>, ApplicationEventPublisherAware, ApplicationContextAware {
57+
implements KafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>>,
58+
ApplicationEventPublisherAware, ApplicationContextAware {
5859

5960
private final ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;
6061

61-
private @Nullable Boolean autoStartup;
62+
private boolean autoStartup = true;
6263

63-
private @Nullable Integer phase;
64+
private int phase = 0;
6465

65-
private @Nullable ApplicationEventPublisher applicationEventPublisher;
66+
@SuppressWarnings("NullAway.Init")
67+
private ApplicationEventPublisher applicationEventPublisher;
6668

67-
private @Nullable ApplicationContext applicationContext;
69+
@SuppressWarnings("NullAway.Init")
70+
private ApplicationContext applicationContext;
6871

6972
/**
7073
* Construct an instance with the provided consumer factory.
@@ -83,15 +86,15 @@ public void setApplicationContext(ApplicationContext applicationContext) {
8386
* Set whether containers created by this factory should auto-start.
8487
* @param autoStartup true to auto-start
8588
*/
86-
public void setAutoStartup(Boolean autoStartup) {
89+
public void setAutoStartup(boolean autoStartup) {
8790
this.autoStartup = autoStartup;
8891
}
8992

9093
/**
9194
* Set the phase in which containers created by this factory should start and stop.
9295
* @param phase the phase
9396
*/
94-
public void setPhase(Integer phase) {
97+
public void setPhase(int phase) {
9598
this.phase = phase;
9699
}
97100

@@ -126,7 +129,7 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> endpoint) {
126129
*/
127130
protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
128131
ContainerProperties properties = instance.getContainerProperties();
129-
Boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup;
132+
boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup;
130133

131134
// Validate share group configuration
132135
validateShareConfiguration(endpoint);
@@ -135,11 +138,12 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
135138
boolean explicitAck = determineExplicitAcknowledgment(properties);
136139
properties.setExplicitShareAcknowledgment(explicitAck);
137140

141+
instance.setAutoStartup(effectiveAutoStartup);
142+
instance.setPhase(this.phase);
143+
instance.setApplicationContext(this.applicationContext);
144+
instance.setApplicationEventPublisher(this.applicationEventPublisher);
145+
138146
JavaUtils.INSTANCE
139-
.acceptIfNotNull(effectiveAutoStartup, instance::setAutoStartup)
140-
.acceptIfNotNull(this.phase, instance::setPhase)
141-
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
142-
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
143147
.acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId)
144148
.acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId);
145149
}
@@ -163,18 +167,8 @@ private boolean determineExplicitAcknowledgment(ContainerProperties containerPro
163167
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
164168

165169
if (clientAckMode != null) {
166-
String mode = clientAckMode.toString().toLowerCase();
167-
if ("explicit".equals(mode)) {
168-
return true;
169-
}
170-
else if ("implicit".equals(mode)) {
171-
return false;
172-
}
173-
else {
174-
throw new IllegalArgumentException(
175-
"Invalid " + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG + ": " + mode +
176-
". Must be 'implicit' or 'explicit'");
177-
}
170+
ShareAcknowledgementMode mode = ShareAcknowledgementMode.fromString(clientAckMode.toString());
171+
return mode == ShareAcknowledgementMode.EXPLICIT;
178172
}
179173
// Default to implicit acknowledgment (false)
180174
return containerProperties.isExplicitShareAcknowledgment();

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import org.springframework.kafka.listener.AcknowledgingShareConsumerAwareMessageListener;
2727
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
2828
import org.springframework.kafka.support.ShareAcknowledgment;
29-
import org.springframework.kafka.support.converter.JacksonProjectingMessageConverter;
30-
import org.springframework.kafka.support.converter.ProjectingMessageConverter;
3129
import org.springframework.kafka.support.converter.RecordMessageConverter;
3230
import org.springframework.messaging.Message;
3331

@@ -50,10 +48,6 @@
5048
public class ShareRecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
5149
implements AcknowledgingShareConsumerAwareMessageListener<K, V> {
5250

53-
public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method) {
54-
this(bean, method, null);
55-
}
56-
5751
public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method,
5852
@Nullable KafkaListenerErrorHandler errorHandler) {
5953
super(bean, method, errorHandler);
@@ -81,8 +75,8 @@ public void onShareRecord(ConsumerRecord<K, V> record, @Nullable ShareAcknowledg
8175
}
8276
if (logger.isDebugEnabled()) {
8377
RecordMessageConverter messageConverter = getMessageConverter();
84-
if (!(messageConverter instanceof JacksonProjectingMessageConverter
85-
|| messageConverter instanceof ProjectingMessageConverter)) {
78+
if (!(messageConverter instanceof org.springframework.kafka.support.converter.JacksonProjectingMessageConverter
79+
|| messageConverter instanceof org.springframework.kafka.support.converter.ProjectingMessageConverter)) {
8680
this.logger.debug("Processing [" + message + "]");
8781
}
8882
}

spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgmentException.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.awaitility.Awaitility;
4949
import org.jspecify.annotations.Nullable;
5050
import org.junit.jupiter.api.Test;
51+
import org.mockito.ArgumentMatchers;
5152

5253
import org.springframework.beans.DirectFieldAccessor;
5354
import org.springframework.core.log.LogAccessor;
@@ -58,7 +59,6 @@
5859
import org.springframework.kafka.test.utils.KafkaTestUtils;
5960

6061
import static org.assertj.core.api.Assertions.assertThat;
61-
import static org.mockito.ArgumentMatchers.any;
6262
import static org.mockito.Mockito.atLeastOnce;
6363
import static org.mockito.Mockito.spy;
6464
import static org.mockito.Mockito.verify;
@@ -150,7 +150,8 @@ void shouldSupportExplicitAcknowledgmentMode(EmbeddedKafkaBroker broker) throws
150150
List<String> received = Collections.synchronizedList(new ArrayList<>());
151151
List<ShareAcknowledgment> acknowledgments = Collections.synchronizedList(new ArrayList<>());
152152

153-
containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener<String, String>) (record, acknowledgment, consumer) -> {
153+
containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener<String, String>) (
154+
record, acknowledgment, consumer) -> {
154155
received.add(record.value());
155156
acknowledgments.add(acknowledgment);
156157

@@ -264,7 +265,8 @@ void shouldEnforceExplicitAcknowledgmentConstraints(EmbeddedKafkaBroker broker)
264265
container.setBeanName("constraintTestContainer");
265266
container.start();
266267

267-
LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", LogAccessor.class));
268+
LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger",
269+
LogAccessor.class));
268270

269271
DirectFieldAccessor accessor = new DirectFieldAccessor(container);
270272
accessor.setPropertyValue("listenerConsumer.logger", logAccessor);
@@ -280,8 +282,9 @@ void shouldEnforceExplicitAcknowledgmentConstraints(EmbeddedKafkaBroker broker)
280282
// Wait for the next poll to be blocked since no explicit acknowledgment has been made yet.
281283
// this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() +
282284
// " acknowledgments");
283-
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(
284-
() -> verify(logAccessor, atLeastOnce()).trace(any(Supplier.class)));
285+
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() ->
286+
verify(logAccessor, atLeastOnce()).trace(ArgumentMatchers.<Supplier<CharSequence>>any())
287+
);
285288

286289
assertThat(processedCount.get()).isEqualTo(3);
287290

@@ -340,7 +343,8 @@ void shouldHandlePartialAcknowledgmentCorrectly(EmbeddedKafkaBroker broker) thro
340343
container.setBeanName("partialAckTestContainer");
341344
container.start();
342345

343-
LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", LogAccessor.class));
346+
LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger",
347+
LogAccessor.class));
344348

345349
DirectFieldAccessor accessor = new DirectFieldAccessor(container);
346350
accessor.setPropertyValue("listenerConsumer.logger", logAccessor);
@@ -363,8 +367,9 @@ void shouldHandlePartialAcknowledgmentCorrectly(EmbeddedKafkaBroker broker) thro
363367
// Wait for the next poll to be blocked since one acknowledgment is still pending.
364368
// this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() +
365369
// " acknowledgments");
366-
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(
367-
() -> verify(logAccessor, atLeastOnce()).trace(any(Supplier.class)));
370+
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() ->
371+
verify(logAccessor, atLeastOnce()).trace(ArgumentMatchers.<Supplier<CharSequence>>any())
372+
);
368373

369374
assertThat(totalProcessed.get()).isEqualTo(4);
370375

0 commit comments

Comments
 (0)