Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private @Nullable BatchToRecordAdapter<K, V> batchToRecordAdapter;

@SuppressWarnings("NullAway.Init")
private ApplicationContext applicationContext;
private @Nullable ApplicationContext applicationContext;

private @Nullable ContainerCustomizer<K, V, C> containerCustomizer;

Expand Down Expand Up @@ -404,7 +403,7 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint)
* @param instance the container instance to configure.
* @param endpoint the endpoint.
*/
@SuppressWarnings({"deprecation", "NullAway"})
@SuppressWarnings({"NullAway"})
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
ContainerProperties properties = instance.getContainerProperties();
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private final Collection<TopicPartitionOffset> topicPartitions = new ArrayList<>();

@SuppressWarnings("NullAway.Init")
private BeanFactory beanFactory;
private @Nullable BeanFactory beanFactory;

private @Nullable BeanExpressionResolver resolver;

Expand Down Expand Up @@ -115,8 +114,7 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private @Nullable BatchToRecordAdapter<K, V> batchToRecordAdapter;

@SuppressWarnings("NullAway.Init")
private byte[] listenerInfo;
private byte @Nullable [] listenerInfo;

private @Nullable String correlationHeaderName;

Expand All @@ -135,23 +133,19 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanResolver = new BeanFactoryResolver(beanFactory);
}

@Nullable
protected BeanFactory getBeanFactory() {
protected @Nullable BeanFactory getBeanFactory() {
return this.beanFactory;
}

@Nullable
protected BeanExpressionResolver getResolver() {
protected @Nullable BeanExpressionResolver getResolver() {
return this.resolver;
}

@Nullable
protected BeanExpressionContext getBeanExpressionContext() {
protected @Nullable BeanExpressionContext getBeanExpressionContext() {
return this.expressionContext;
}

@Nullable
protected BeanResolver getBeanResolver() {
protected @Nullable BeanResolver getBeanResolver() {
return this.beanResolver;
}

Expand All @@ -164,14 +158,12 @@ public void setMainListenerId(@Nullable String id) {
}

@Override
@Nullable
public String getMainListenerId() {
public @Nullable String getMainListenerId() {
return this.mainListenerId;
}

@Nullable
@Override
public String getId() {
public @Nullable String getId() {
return this.id;
}

Expand All @@ -185,9 +177,8 @@ public void setGroupId(@Nullable String groupId) {
this.groupId = groupId;
}

@Nullable
@Override
public String getGroupId() {
public @Nullable String getGroupId() {
return this.groupId;
}

Expand Down Expand Up @@ -233,9 +224,8 @@ public void setTopicPartitions(TopicPartitionOffset... topicPartitions) {
* @return the topicPartitions for this endpoint.
* @since 2.3
*/
@Nullable
@Override
public TopicPartitionOffset[] getTopicPartitionsToAssign() {
public TopicPartitionOffset @Nullable [] getTopicPartitionsToAssign() {
return this.topicPartitions.toArray(new TopicPartitionOffset[0]);
}

Expand All @@ -254,15 +244,13 @@ public void setTopicPattern(@Nullable Pattern topicPattern) {
* Return the topicPattern for this endpoint.
* @return the topicPattern for this endpoint.
*/
@Nullable
@Override
public Pattern getTopicPattern() {
public @Nullable Pattern getTopicPattern() {
return this.topicPattern;
}

@Nullable
@Override
public String getGroup() {
public @Nullable String getGroup() {
return this.group;
}

Expand Down Expand Up @@ -290,8 +278,7 @@ public boolean isBatchListener() {
* @since 2.8
*/
@Override
@Nullable
public Boolean getBatchListener() {
public @Nullable Boolean getBatchListener() {
return this.batchListener;
}

Expand All @@ -313,13 +300,11 @@ public void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) {
this.replyTemplate = replyTemplate;
}

@Nullable
protected KafkaTemplate<?, ?> getReplyTemplate() {
protected @Nullable KafkaTemplate<?, ?> getReplyTemplate() {
return this.replyTemplate;
}

@Nullable
protected RecordFilterStrategy<? super K, ? super V> getRecordFilterStrategy() {
protected @Nullable RecordFilterStrategy<? super K, ? super V> getRecordFilterStrategy() {
return this.recordFilterStrategy;
}

Expand All @@ -344,9 +329,8 @@ public void setAckDiscarded(boolean ackDiscarded) {
this.ackDiscarded = ackDiscarded;
}

@Nullable
@Override
public String getClientIdPrefix() {
public @Nullable String getClientIdPrefix() {
return this.clientIdPrefix;
}

Expand All @@ -361,8 +345,7 @@ public void setClientIdPrefix(@Nullable String clientIdPrefix) {
}

@Override
@Nullable
public Integer getConcurrency() {
public @Nullable Integer getConcurrency() {
return this.concurrency;
}

Expand All @@ -376,8 +359,7 @@ public void setConcurrency(@Nullable Integer concurrency) {
}

@Override
@Nullable
public Boolean getAutoStartup() {
public @Nullable Boolean getAutoStartup() {
return this.autoStartup;
}

Expand All @@ -400,8 +382,7 @@ public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu
}

@Override
@Nullable
public Properties getConsumerProperties() {
public @Nullable Properties getConsumerProperties() {
return this.consumerProperties;
}

Expand Down Expand Up @@ -436,9 +417,8 @@ public void setSplitIterables(boolean splitIterables) {
}

@Override
@SuppressWarnings("NullAway") // Dataflow analysis limitation
public byte[] getListenerInfo() {
return this.listenerInfo; // NOSONAR
public byte @Nullable [] getListenerInfo() {
return this.listenerInfo;
}

/**
Expand All @@ -450,8 +430,7 @@ public void setListenerInfo(byte[] listenerInfo) { // NOSONAR
this.listenerInfo = listenerInfo; // NOSONAR
}

@Nullable
protected BatchToRecordAdapter<K, V> getBatchToRecordAdapter() {
protected @Nullable BatchToRecordAdapter<K, V> getBatchToRecordAdapter() {
return this.batchToRecordAdapter;
}

Expand Down Expand Up @@ -525,7 +504,7 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer,
protected abstract MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
@Nullable MessageConverter messageConverter);

@SuppressWarnings({"unchecked", "NullAway"})
@SuppressWarnings("unchecked")
private void setupMessageListener(MessageListenerContainer container,
@Nullable MessageConverter messageConverter) {

Expand All @@ -535,8 +514,6 @@ private void setupMessageListener(MessageListenerContainer container,
.acceptIfNotNull(this.correlationHeaderName, adapter::setCorrelationHeaderName);
adapter.setSplitIterables(this.splitIterables);
Object messageListener = adapter;
Assert.state(messageListener != null,
() -> "Endpoint [" + this + "] must provide a non null message listener");
if (this.recordFilterStrategy != null) {
if (isBatchListener()) {
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
* <p>
* This should be the default for most users and a good transition paths for those that
* are used to building such container definitions manually.
*
* <p>
* This factory is primarily for building containers for {@code KafkaListener} annotated
* methods but can also be used to create any container.
*
* <p>
* Only containers for {@code KafkaListener} annotated methods are added to the
* {@code KafkaListenerEndpointRegistry}.
*
Expand Down Expand Up @@ -62,7 +62,7 @@ public void setConcurrency(Integer concurrency) {

@Override
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
@Nullable TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
if (topicPartitions != null && topicPartitions.length > 0) {
ContainerProperties properties = new ContainerProperties(topicPartitions);
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public interface KafkaListenerEndpoint {
* @return the topicPartitions for this endpoint.
* @since 2.3
*/
@Nullable
TopicPartitionOffset[] getTopicPartitionsToAssign();
TopicPartitionOffset @Nullable [] getTopicPartitionsToAssign();

/**
* Return the topicPattern for this endpoint.
Expand Down Expand Up @@ -152,8 +151,7 @@ void setupListenerContainer(MessageListenerContainer listenerContainer,
* @return the info.
* @since 2.8.4
*/
@SuppressWarnings("NullAway") // Dataflow analysis limitation
default byte[] getListenerInfo() {
default byte @Nullable [] getListenerInfo() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,18 @@ class KafkaListenerEndpointAdapter implements KafkaListenerEndpoint {
KafkaListenerEndpointAdapter() {
}

@Nullable
@Override
public String getId() {
public @Nullable String getId() {
return null;
}

@Nullable
@Override
public String getGroupId() {
public @Nullable String getGroupId() {
return null;
}

@Nullable
@Override
public String getGroup() {
public @Nullable String getGroup() {
return null;
}

Expand All @@ -61,33 +58,28 @@ public String getGroup() {
return Collections.emptyList();
}

@Nullable
@Override
public TopicPartitionOffset[] getTopicPartitionsToAssign() {
public TopicPartitionOffset @Nullable [] getTopicPartitionsToAssign() {
return new TopicPartitionOffset[0];
}

@Nullable
@Override
public Pattern getTopicPattern() {
public @Nullable Pattern getTopicPattern() {
return null;
}

@Nullable
@Override
public String getClientIdPrefix() {
public @Nullable String getClientIdPrefix() {
return null;
}

@Nullable
@Override
public Integer getConcurrency() {
public @Nullable Integer getConcurrency() {
return null;
}

@Nullable
@Override
public Boolean getAutoStartup() { // NOSONAR
public @Nullable Boolean getAutoStartup() { // NOSONAR
return null; // NOSONAR null check by caller
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial

private @Nullable String containerFactoryBeanName;

@SuppressWarnings("NullAway.Init")
private BeanFactory beanFactory;
private @Nullable BeanFactory beanFactory;

private boolean startImmediately;

Expand All @@ -84,8 +83,7 @@ public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry)
* @return the {@link KafkaListenerEndpointRegistry} instance for this
* registrar, may be {@code null}.
*/
@Nullable
public KafkaListenerEndpointRegistry getEndpointRegistry() {
public @Nullable KafkaListenerEndpointRegistry getEndpointRegistry() {
return this.endpointRegistry;
}

Expand Down Expand Up @@ -130,8 +128,7 @@ public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHand
* Return the custom {@link MessageHandlerMethodFactory} to use, if any.
* @return the custom {@link MessageHandlerMethodFactory} to use, if any.
*/
@Nullable
public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
public @Nullable MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
return this.messageHandlerMethodFactory;
}

Expand Down Expand Up @@ -173,8 +170,7 @@ public void setBeanFactory(BeanFactory beanFactory) {
* @return the validator.
* @since 2.2
*/
@Nullable
public Validator getValidator() {
public @Nullable Validator getValidator() {
return this.validator;
}

Expand Down Expand Up @@ -279,13 +275,13 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint) {
private record KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
@Nullable KafkaListenerContainerFactory<?> containerFactory) {

private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
@Nullable KafkaListenerContainerFactory<?> containerFactory) {

this.endpoint = endpoint;
this.containerFactory = containerFactory;
}
private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
@Nullable KafkaListenerContainerFactory<?> containerFactory) {

this.endpoint = endpoint;
this.containerFactory = containerFactory;
}

}

}
Loading