Skip to content

Commit 4f20673

Browse files
committed
Address Sonar Issues
1 parent df9b014 commit 4f20673

12 files changed

+64
-42
lines changed

spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaAvroBeanRegistrationAotProcessor.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,13 @@ private static void checkType(@Nullable Type paramType, Set<Class<?>> avroTypes)
105105
avroTypes.add((Class<?>) paramType);
106106
}
107107
}
108-
else if (container) {
109-
if (paramType instanceof ParameterizedType) {
110-
Type[] generics = ((ParameterizedType) paramType).getActualTypeArguments();
111-
if (generics.length > 0) {
112-
checkAvro(generics[0], avroTypes);
113-
}
114-
if (generics.length == 2) {
115-
checkAvro(generics[1], avroTypes);
116-
}
108+
else if (container && paramType instanceof ParameterizedType) {
109+
Type[] generics = ((ParameterizedType) paramType).getActualTypeArguments();
110+
if (generics.length > 0) {
111+
checkAvro(generics[0], avroTypes);
112+
}
113+
if (generics.length == 2) {
114+
checkAvro(generics[1], avroTypes);
117115
}
118116
}
119117
}

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,11 @@ protected BeanResolver getBeanResolver() {
148148
return this.beanResolver;
149149
}
150150

151-
public void setId(String id) {
151+
public void setId(@Nullable String id) {
152152
this.id = id;
153153
}
154154

155-
public void setMainListenerId(String id) {
155+
public void setMainListenerId(@Nullable String id) {
156156
this.mainListenerId = id;
157157
}
158158

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.kafka.event.ContainerStoppedEvent;
4949
import org.springframework.kafka.support.KafkaHeaders;
5050
import org.springframework.kafka.support.TopicPartitionOffset;
51+
import org.springframework.lang.NonNull;
5152
import org.springframework.lang.Nullable;
5253
import org.springframework.util.Assert;
5354
import org.springframework.util.StringUtils;
@@ -67,6 +68,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
6768
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
6869
ApplicationContextAware {
6970

71+
private static final String VERSION_2_8 = "2.8";
72+
7073
/**
7174
* The default {@link org.springframework.context.SmartLifecycle} phase for listener
7275
* containers {@value #DEFAULT_PHASE}.
@@ -85,7 +88,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
8588

8689
private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();
8790

88-
private String beanName;
91+
@NonNull
92+
private String beanName = "noBeanNameSet";
8993

9094
private ApplicationEventPublisher applicationEventPublisher;
9195

@@ -217,7 +221,7 @@ public ApplicationEventPublisher getApplicationEventPublisher() {
217221
* @deprecated in favor of {@link #setCommonErrorHandler(CommonErrorHandler)}
218222
* @see #setCommonErrorHandler(CommonErrorHandler)
219223
*/
220-
@Deprecated(since = "2.8", forRemoval = true) // in 3.1
224+
@Deprecated(since = VERSION_2_8, forRemoval = true) // in 3.1
221225
public void setErrorHandler(ErrorHandler errorHandler) {
222226
this.errorHandler = errorHandler;
223227
}
@@ -229,7 +233,7 @@ public void setErrorHandler(ErrorHandler errorHandler) {
229233
* @deprecated in favor of {@link #setCommonErrorHandler(CommonErrorHandler)}
230234
* @see #setCommonErrorHandler(CommonErrorHandler)
231235
*/
232-
@Deprecated(since = "2.8", forRemoval = true) // in 3.1
236+
@Deprecated(since = VERSION_2_8, forRemoval = true) // in 3.1
233237
public void setGenericErrorHandler(@Nullable GenericErrorHandler<?> errorHandler) {
234238
this.errorHandler = errorHandler;
235239
}
@@ -241,7 +245,7 @@ public void setGenericErrorHandler(@Nullable GenericErrorHandler<?> errorHandler
241245
* @deprecated in favor of {@link #setCommonErrorHandler(CommonErrorHandler)}
242246
* @see #setCommonErrorHandler(CommonErrorHandler)
243247
*/
244-
@Deprecated(since = "2.8", forRemoval = true) // in 3.1
248+
@Deprecated(since = VERSION_2_8, forRemoval = true) // in 3.1
245249
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
246250
this.errorHandler = errorHandler;
247251
}
@@ -253,7 +257,7 @@ public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
253257
* @deprecated in favor of {@link #getCommonErrorHandler()}
254258
* @see #getCommonErrorHandler()
255259
*/
256-
@Deprecated(since = "2.8", forRemoval = true) // in 3.1
260+
@Deprecated(since = VERSION_2_8, forRemoval = true) // in 3.1
257261
@Nullable
258262
public GenericErrorHandler<?> getGenericErrorHandler() {
259263
return this.errorHandler;
@@ -374,7 +378,6 @@ public String getGroupId() {
374378
}
375379

376380
@Override
377-
@Nullable
378381
public String getListenerId() {
379382
return this.beanName; // the container factory sets the bean name to the id attribute
380383
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
614614
: new LoggingCommitCallback();
615615

616616
private final OffsetAndMetadataProvider offsetAndMetadataProvider = this.containerProperties.getOffsetAndMetadataProvider() == null
617-
? (listenerMetadata, offset) -> new OffsetAndMetadata(offset)
617+
? (metadata, offset) -> new OffsetAndMetadata(offset)
618618
: this.containerProperties.getOffsetAndMetadataProvider();
619619

620620
private final ListenerMetadata listenerMetadata = new DefaultListenerMetadata(KafkaMessageListenerContainer.this);
@@ -1327,14 +1327,7 @@ public boolean isLongLived() {
13271327

13281328
@Override // NOSONAR complexity
13291329
public void run() {
1330-
publishConsumerStartingEvent();
1331-
this.consumerThread = Thread.currentThread();
1332-
setupSeeks();
1333-
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
1334-
this.count = 0;
1335-
this.last = System.currentTimeMillis();
1336-
initAssignedPartitions();
1337-
publishConsumerStartedEvent();
1330+
initialize();
13381331
Throwable exitThrowable = null;
13391332
boolean failedAuthRetry = false;
13401333
while (isRunning()) {
@@ -1400,6 +1393,17 @@ public void run() {
14001393
wrapUp(exitThrowable);
14011394
}
14021395

1396+
protected void initialize() {
1397+
publishConsumerStartingEvent();
1398+
this.consumerThread = Thread.currentThread();
1399+
setupSeeks();
1400+
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
1401+
this.count = 0;
1402+
this.last = System.currentTimeMillis();
1403+
initAssignedPartitions();
1404+
publishConsumerStartedEvent();
1405+
}
1406+
14031407
private void setupSeeks() {
14041408
if (this.consumerSeekAwareListener != null) {
14051409
this.consumerSeekAwareListener.registerSeekCallback(this);

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ default String getGroupId() {
191191
* @return the id or bean name.
192192
* @since 2.2.5
193193
*/
194-
@Nullable
195194
default String getListenerId() {
196195
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
197196
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.kafka.listener.SeekUtils;
4242
import org.springframework.kafka.listener.TimestampedException;
4343
import org.springframework.kafka.support.KafkaHeaders;
44+
import org.springframework.lang.Nullable;
4445
import org.springframework.util.Assert;
4546

4647
/**
@@ -336,7 +337,7 @@ private long getFailureTimestamp(Exception e) {
336337
: Instant.now().toEpochMilli();
337338
}
338339

339-
private TimestampedException getTimestampedException(Throwable e) {
340+
private TimestampedException getTimestampedException(@Nullable Throwable e) {
340341
if (e == null) {
341342
throw new IllegalArgumentException("Provided exception does not contain a "
342343
+ TimestampedException.class.getSimpleName() + " cause.");
@@ -360,6 +361,7 @@ private long getOriginalTimestampHeaderLong(ConsumerRecord<?, ?> consumerRecord)
360361
: consumerRecord.timestamp();
361362
}
362363

364+
@Nullable
363365
private Header getOriginaTimeStampHeader(ConsumerRecord<?, ?> consumerRecord) {
364366
return consumerRecord.headers()
365367
.lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ public static class Properties {
137137

138138
private final long timeout;
139139

140+
@Nullable
140141
private final Boolean autoStartDltHandler;
141142

142143
/**

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.springframework.classify.BinaryExceptionClassifier;
2626
import org.springframework.kafka.core.KafkaOperations;
27+
import org.springframework.lang.Nullable;
2728
import org.springframework.util.StringUtils;
2829

2930
/**
@@ -60,6 +61,7 @@ public class DestinationTopicPropertiesFactory {
6061

6162
private final long timeout;
6263

64+
@Nullable
6365
private Boolean autoStartDltHandler;
6466

6567
public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List<Long> backOffValues,
@@ -89,7 +91,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
8991
* @return this factory.
9092
* @since 2.8
9193
*/
92-
public DestinationTopicPropertiesFactory autoStartDltHandler(Boolean autoStart) {
94+
public DestinationTopicPropertiesFactory autoStartDltHandler(@Nullable Boolean autoStart) {
9395
this.autoStartDltHandler = autoStart;
9496
return this;
9597
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,14 @@ private class RetryTopicListenerContainerFactoryDecorator
208208
implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<?, ?>> {
209209

210210
private final ConcurrentKafkaListenerContainerFactory<?, ?> delegate;
211+
211212
private final Configuration configuration;
213+
212214
private final boolean isSetContainerProperties;
213215

214216
RetryTopicListenerContainerFactoryDecorator(ConcurrentKafkaListenerContainerFactory<?, ?> delegate,
215-
Configuration configuration,
216-
boolean isSetContainerProperties) {
217+
Configuration configuration, boolean isSetContainerProperties) {
218+
217219
this.delegate = delegate;
218220
this.configuration = configuration;
219221
this.isSetContainerProperties = isSetContainerProperties;
@@ -226,6 +228,7 @@ private class RetryTopicListenerContainerFactoryDecorator
226228

227229
private ConcurrentMessageListenerContainer<?, ?> decorate(
228230
ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
231+
229232
String mainListenerId = listenerContainer.getMainListenerId();
230233
if (mainListenerId == null) {
231234
mainListenerId = listenerContainer.getListenerId();

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryResolver.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,29 +81,31 @@ public ListenerContainerFactoryResolver(BeanFactory beanFactory) {
8181

8282
ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactoryForMainEndpoint(
8383
@Nullable KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
84-
String defaultContainerFactoryBeanName,
85-
Configuration config) {
84+
String defaultContainerFactoryBeanName, Configuration config) {
85+
8686
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
8787
getFactoryFromKLA(factoryFromKafkaListenerAnnotationInstance, defaultContainerFactoryBeanName);
8888
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = resolveFactory(this.mainEndpointResolvers,
8989
factoryFromKafkaListenerAnnotation, config);
90-
return this.mainEndpointCache.addIfAbsent(factoryFromKafkaListenerAnnotation, config, resolvedFactory);
90+
return this.mainEndpointCache.addIfAbsent(factoryFromKafkaListenerAnnotation, config, resolvedFactory); // NOSONAR
9191
}
9292

9393
ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactoryForRetryEndpoint(
9494
@Nullable KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
95-
String defaultContainerFactoryBeanName,
96-
Configuration config) {
95+
String defaultContainerFactoryBeanName, Configuration config) {
96+
9797
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
9898
getFactoryFromKLA(factoryFromKafkaListenerAnnotationInstance, defaultContainerFactoryBeanName);
9999
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = resolveFactory(this.retryEndpointResolvers,
100100
factoryFromKafkaListenerAnnotation, config);
101-
return this.retryEndpointCache.addIfAbsent(factoryFromKafkaListenerAnnotation, config, resolvedFactory);
101+
return this.retryEndpointCache.addIfAbsent(factoryFromKafkaListenerAnnotation, config, resolvedFactory); // NOSONAR
102102
}
103103

104104
@Nullable
105-
private KafkaListenerContainerFactory<?> getFactoryFromKLA(KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
106-
String defaultContainerFactoryBeanName) {
105+
private KafkaListenerContainerFactory<?> getFactoryFromKLA(
106+
@Nullable KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
107+
String defaultContainerFactoryBeanName) {
108+
107109
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
108110
factoryFromKafkaListenerAnnotationInstance;
109111
if (factoryFromKafkaListenerAnnotation == null) {
@@ -113,7 +115,7 @@ private KafkaListenerContainerFactory<?> getFactoryFromKLA(KafkaListenerContaine
113115
}
114116

115117
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactory(List<FactoryResolver> factoryResolvers,
116-
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation,
118+
@Nullable KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation,
117119
Configuration config) {
118120

119121
ConcurrentKafkaListenerContainerFactory<?, ?> verifiedFactoryFromKafkaListenerAnnotation = verifyClass(
@@ -132,7 +134,9 @@ private KafkaListenerContainerFactory<?> getFactoryFromKLA(KafkaListenerContaine
132134
}
133135

134136
@Nullable
135-
private ConcurrentKafkaListenerContainerFactory<?, ?> verifyClass(KafkaListenerContainerFactory<?> fromKafkaListenerAnnotationFactory) {
137+
private ConcurrentKafkaListenerContainerFactory<?, ?> verifyClass(
138+
@Nullable KafkaListenerContainerFactory<?> fromKafkaListenerAnnotationFactory) {
139+
136140
return fromKafkaListenerAnnotationFactory != null
137141
&& ConcurrentKafkaListenerContainerFactory.class.isAssignableFrom(fromKafkaListenerAnnotationFactory.getClass())
138142
? (ConcurrentKafkaListenerContainerFactory<?, ?>) fromKafkaListenerAnnotationFactory

0 commit comments

Comments
 (0)