From 4d329a5143e72ef326944731ae65d92ef241d022 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 4 Apr 2025 21:35:11 -0400 Subject: [PATCH] Further changes for nullability in the config package Signed-off-by: Soby Chacko --- ...AbstractKafkaListenerContainerFactory.java | 5 +- .../config/AbstractKafkaListenerEndpoint.java | 69 +++++++------------ ...ncurrentKafkaListenerContainerFactory.java | 6 +- .../kafka/config/KafkaListenerEndpoint.java | 6 +- .../config/KafkaListenerEndpointAdapter.java | 24 +++---- .../KafkaListenerEndpointRegistrar.java | 24 +++---- .../kafka/config/KafkaStreamsCustomizer.java | 3 +- .../config/MethodKafkaListenerEndpoint.java | 8 +-- .../config/StreamsBuilderFactoryBean.java | 19 ++--- .../kafka/support/JavaUtils.java | 2 +- 10 files changed, 61 insertions(+), 105 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index e95f901264..331161d8c8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -105,8 +105,7 @@ public abstract class AbstractKafkaListenerContainerFactory batchToRecordAdapter; - @SuppressWarnings("NullAway.Init") - private ApplicationContext applicationContext; + private @Nullable ApplicationContext applicationContext; private @Nullable ContainerCustomizer containerCustomizer; @@ -404,7 +403,7 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint aklEndpoint) * @param instance the container instance to configure. * @param endpoint the endpoint. */ - @SuppressWarnings({"deprecation", "NullAway"}) + @SuppressWarnings({"NullAway"}) protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) { ContainerProperties properties = instance.getContainerProperties(); BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern", diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index a9e6f00788..6eb6e1f95b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -82,8 +82,7 @@ public abstract class AbstractKafkaListenerEndpoint private final Collection topicPartitions = new ArrayList<>(); - @SuppressWarnings("NullAway.Init") - private BeanFactory beanFactory; + private @Nullable BeanFactory beanFactory; private @Nullable BeanExpressionResolver resolver; @@ -115,8 +114,7 @@ public abstract class AbstractKafkaListenerEndpoint private @Nullable BatchToRecordAdapter batchToRecordAdapter; - @SuppressWarnings("NullAway.Init") - private byte[] listenerInfo; + private byte @Nullable [] listenerInfo; private @Nullable String correlationHeaderName; @@ -135,23 +133,19 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanResolver = new BeanFactoryResolver(beanFactory); } - @Nullable - protected BeanFactory getBeanFactory() { + protected @Nullable BeanFactory getBeanFactory() { return this.beanFactory; } - @Nullable - protected BeanExpressionResolver getResolver() { + protected @Nullable BeanExpressionResolver getResolver() { return this.resolver; } - @Nullable - protected BeanExpressionContext getBeanExpressionContext() { + protected @Nullable BeanExpressionContext getBeanExpressionContext() { return this.expressionContext; } - @Nullable - protected BeanResolver getBeanResolver() { + protected @Nullable BeanResolver getBeanResolver() { return this.beanResolver; } @@ -164,14 +158,12 @@ public void setMainListenerId(@Nullable String id) { } @Override - @Nullable - public String getMainListenerId() { + public @Nullable String getMainListenerId() { return this.mainListenerId; } - @Nullable @Override - public String getId() { + public @Nullable String getId() { return this.id; } @@ -185,9 +177,8 @@ public void setGroupId(@Nullable String groupId) { this.groupId = groupId; } - @Nullable @Override - public String getGroupId() { + public @Nullable String getGroupId() { return this.groupId; } @@ -233,9 +224,8 @@ public void setTopicPartitions(TopicPartitionOffset... topicPartitions) { * @return the topicPartitions for this endpoint. * @since 2.3 */ - @Nullable @Override - public TopicPartitionOffset[] getTopicPartitionsToAssign() { + public TopicPartitionOffset @Nullable [] getTopicPartitionsToAssign() { return this.topicPartitions.toArray(new TopicPartitionOffset[0]); } @@ -254,15 +244,13 @@ public void setTopicPattern(@Nullable Pattern topicPattern) { * Return the topicPattern for this endpoint. * @return the topicPattern for this endpoint. */ - @Nullable @Override - public Pattern getTopicPattern() { + public @Nullable Pattern getTopicPattern() { return this.topicPattern; } - @Nullable @Override - public String getGroup() { + public @Nullable String getGroup() { return this.group; } @@ -290,8 +278,7 @@ public boolean isBatchListener() { * @since 2.8 */ @Override - @Nullable - public Boolean getBatchListener() { + public @Nullable Boolean getBatchListener() { return this.batchListener; } @@ -313,13 +300,11 @@ public void setReplyTemplate(KafkaTemplate replyTemplate) { this.replyTemplate = replyTemplate; } - @Nullable - protected KafkaTemplate getReplyTemplate() { + protected @Nullable KafkaTemplate getReplyTemplate() { return this.replyTemplate; } - @Nullable - protected RecordFilterStrategy getRecordFilterStrategy() { + protected @Nullable RecordFilterStrategy getRecordFilterStrategy() { return this.recordFilterStrategy; } @@ -344,9 +329,8 @@ public void setAckDiscarded(boolean ackDiscarded) { this.ackDiscarded = ackDiscarded; } - @Nullable @Override - public String getClientIdPrefix() { + public @Nullable String getClientIdPrefix() { return this.clientIdPrefix; } @@ -361,8 +345,7 @@ public void setClientIdPrefix(@Nullable String clientIdPrefix) { } @Override - @Nullable - public Integer getConcurrency() { + public @Nullable Integer getConcurrency() { return this.concurrency; } @@ -376,8 +359,7 @@ public void setConcurrency(@Nullable Integer concurrency) { } @Override - @Nullable - public Boolean getAutoStartup() { + public @Nullable Boolean getAutoStartup() { return this.autoStartup; } @@ -400,8 +382,7 @@ public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu } @Override - @Nullable - public Properties getConsumerProperties() { + public @Nullable Properties getConsumerProperties() { return this.consumerProperties; } @@ -436,9 +417,8 @@ public void setSplitIterables(boolean splitIterables) { } @Override - @SuppressWarnings("NullAway") // Dataflow analysis limitation - public byte[] getListenerInfo() { - return this.listenerInfo; // NOSONAR + public byte @Nullable [] getListenerInfo() { + return this.listenerInfo; } /** @@ -450,8 +430,7 @@ public void setListenerInfo(byte[] listenerInfo) { // NOSONAR this.listenerInfo = listenerInfo; // NOSONAR } - @Nullable - protected BatchToRecordAdapter getBatchToRecordAdapter() { + protected @Nullable BatchToRecordAdapter getBatchToRecordAdapter() { return this.batchToRecordAdapter; } @@ -525,7 +504,7 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer, protected abstract MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container, @Nullable MessageConverter messageConverter); - @SuppressWarnings({"unchecked", "NullAway"}) + @SuppressWarnings("unchecked") private void setupMessageListener(MessageListenerContainer container, @Nullable MessageConverter messageConverter) { @@ -535,8 +514,6 @@ private void setupMessageListener(MessageListenerContainer container, .acceptIfNotNull(this.correlationHeaderName, adapter::setCorrelationHeaderName); adapter.setSplitIterables(this.splitIterables); Object messageListener = adapter; - Assert.state(messageListener != null, - () -> "Endpoint [" + this + "] must provide a non null message listener"); if (this.recordFilterStrategy != null) { if (isBatchListener()) { if (((MessagingMessageListenerAdapter) messageListener).isConsumerRecords()) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java index 72cb58740a..5130825a04 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java @@ -31,10 +31,10 @@ *

* This should be the default for most users and a good transition paths for those that * are used to building such container definitions manually. - * + *

* This factory is primarily for building containers for {@code KafkaListener} annotated * methods but can also be used to create any container. - * + *

* Only containers for {@code KafkaListener} annotated methods are added to the * {@code KafkaListenerEndpointRegistry}. * @@ -62,7 +62,7 @@ public void setConcurrency(Integer concurrency) { @Override protected ConcurrentMessageListenerContainer createContainerInstance(KafkaListenerEndpoint endpoint) { - @Nullable TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign(); + TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign(); if (topicPartitions != null && topicPartitions.length > 0) { ContainerProperties properties = new ContainerProperties(topicPartitions); return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index c8b7e8937b..0e8624ef2d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -74,8 +74,7 @@ public interface KafkaListenerEndpoint { * @return the topicPartitions for this endpoint. * @since 2.3 */ - @Nullable - TopicPartitionOffset[] getTopicPartitionsToAssign(); + TopicPartitionOffset @Nullable [] getTopicPartitionsToAssign(); /** * Return the topicPattern for this endpoint. @@ -152,8 +151,7 @@ void setupListenerContainer(MessageListenerContainer listenerContainer, * @return the info. * @since 2.8.4 */ - @SuppressWarnings("NullAway") // Dataflow analysis limitation - default byte[] getListenerInfo() { + default byte @Nullable [] getListenerInfo() { return null; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java index dcf0945d98..77a5019628 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java @@ -38,21 +38,18 @@ class KafkaListenerEndpointAdapter implements KafkaListenerEndpoint { KafkaListenerEndpointAdapter() { } - @Nullable @Override - public String getId() { + public @Nullable String getId() { return null; } - @Nullable @Override - public String getGroupId() { + public @Nullable String getGroupId() { return null; } - @Nullable @Override - public String getGroup() { + public @Nullable String getGroup() { return null; } @@ -61,33 +58,28 @@ public String getGroup() { return Collections.emptyList(); } - @Nullable @Override - public TopicPartitionOffset[] getTopicPartitionsToAssign() { + public TopicPartitionOffset @Nullable [] getTopicPartitionsToAssign() { return new TopicPartitionOffset[0]; } - @Nullable @Override - public Pattern getTopicPattern() { + public @Nullable Pattern getTopicPattern() { return null; } - @Nullable @Override - public String getClientIdPrefix() { + public @Nullable String getClientIdPrefix() { return null; } - @Nullable @Override - public Integer getConcurrency() { + public @Nullable Integer getConcurrency() { return null; } - @Nullable @Override - public Boolean getAutoStartup() { // NOSONAR + public @Nullable Boolean getAutoStartup() { // NOSONAR return null; // NOSONAR null check by caller } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java index 8b66c37b35..b3b703322e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java @@ -63,8 +63,7 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial private @Nullable String containerFactoryBeanName; - @SuppressWarnings("NullAway.Init") - private BeanFactory beanFactory; + private @Nullable BeanFactory beanFactory; private boolean startImmediately; @@ -84,8 +83,7 @@ public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) * @return the {@link KafkaListenerEndpointRegistry} instance for this * registrar, may be {@code null}. */ - @Nullable - public KafkaListenerEndpointRegistry getEndpointRegistry() { + public @Nullable KafkaListenerEndpointRegistry getEndpointRegistry() { return this.endpointRegistry; } @@ -130,8 +128,7 @@ public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHand * Return the custom {@link MessageHandlerMethodFactory} to use, if any. * @return the custom {@link MessageHandlerMethodFactory} to use, if any. */ - @Nullable - public MessageHandlerMethodFactory getMessageHandlerMethodFactory() { + public @Nullable MessageHandlerMethodFactory getMessageHandlerMethodFactory() { return this.messageHandlerMethodFactory; } @@ -173,8 +170,7 @@ public void setBeanFactory(BeanFactory beanFactory) { * @return the validator. * @since 2.2 */ - @Nullable - public Validator getValidator() { + public @Nullable Validator getValidator() { return this.validator; } @@ -279,13 +275,13 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint) { private record KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory containerFactory) { - private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, - @Nullable KafkaListenerContainerFactory containerFactory) { - - this.endpoint = endpoint; - this.containerFactory = containerFactory; - } + private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, + @Nullable KafkaListenerContainerFactory containerFactory) { + this.endpoint = endpoint; + this.containerFactory = containerFactory; } + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java index 4b492cf768..6df6d203f2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaStreamsCustomizer.java @@ -21,7 +21,6 @@ import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; -import org.jspecify.annotations.Nullable; /** * Callback interface that can be used to configure {@link KafkaStreams} directly. @@ -49,7 +48,7 @@ public interface KafkaStreamsCustomizer { * @since 3.3.0 */ default KafkaStreams initKafkaStreams( - @Nullable Topology topology, + Topology topology, Properties properties, KafkaClientSupplier clientSupplier ) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index d0f3b187d1..80de421ee0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -68,8 +68,7 @@ public class MethodKafkaListenerEndpoint extends AbstractKafkaListenerEndp @SuppressWarnings("NullAway.Init") private Method method; - @SuppressWarnings("NullAway.Init") - private MessageHandlerMethodFactory messageHandlerMethodFactory; + private @Nullable MessageHandlerMethodFactory messageHandlerMethodFactory; private @Nullable KafkaListenerErrorHandler errorHandler; @@ -195,6 +194,8 @@ protected MessagingMessageListenerAdapter createMessageListener(MessageLis * @return the handler adapter. */ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) { + Assert.state(this.messageHandlerMethodFactory != null, + "MessageHandlerMethodFactory must not be null"); InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod()); return new HandlerAdapter(invocableHandlerMethod); @@ -239,8 +240,7 @@ protected MessagingMessageListenerAdapter createMessageListenerInstance( return listener; } - @SuppressWarnings("null") - private String resolve(String value) { + private @Nullable String resolve(String value) { BeanExpressionContext beanExpressionContext = getBeanExpressionContext(); BeanExpressionResolver resolver = getResolver(); if (resolver != null && beanExpressionContext != null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index fc6a612c93..a6c27ae9f0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -87,6 +87,7 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean JavaUtils acceptIfNotNull(@Nullable T value, Consumer consumer) { * @param consumer the consumer. * @return this. */ - public JavaUtils acceptIfHasText(String value, Consumer consumer) { + public JavaUtils acceptIfHasText(@Nullable String value, Consumer consumer) { if (StringUtils.hasText(value)) { consumer.accept(value); }