Skip to content

Commit c0eadcd

Browse files
committed
More jspecify nullability changes in spring-kafka-test
1 parent 419b569 commit c0eadcd

File tree

7 files changed

+58
-57
lines changed

7 files changed

+58
-57
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,7 @@ private void start() {
251251
}
252252

253253
createKafkaTopics(this.topics);
254-
if (this.brokerListProperty == null) {
255-
this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
256-
}
257-
if (this.brokerListProperty != null) {
258-
System.setProperty(this.brokerListProperty, getBrokersAsString());
259-
}
254+
System.setProperty(this.brokerListProperty, getBrokersAsString());
260255
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
261256
}
262257

@@ -275,7 +270,7 @@ private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder
275270

276271
@Override
277272
public void destroy() {
278-
AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
273+
AtomicReference<@Nullable Throwable> shutdownFailure = new AtomicReference<>();
279274
Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
280275
if (shutdownFailure.get() != null) {
281276
throw new IllegalStateException("Failed to shut down embedded Kafka cluster", shutdownFailure.get());
@@ -564,11 +559,11 @@ public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsT
564559
public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume) {
565560
List<String> notEmbedded = Arrays.stream(topicsToConsume)
566561
.filter(topic -> !this.topics.contains(topic))
567-
.collect(Collectors.toList());
562+
.toList();
568563
if (!notEmbedded.isEmpty()) {
569564
throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
570565
}
571-
final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
566+
final AtomicReference<@Nullable Collection<TopicPartition>> assigned = new AtomicReference<>();
572567
consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() {
573568

574569
@Override
@@ -586,18 +581,19 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
586581
while (assigned.get() == null && n++ < 600) { // NOSONAR magic #
587582
consumer.poll(Duration.ofMillis(100)); // force assignment NOSONAR magic #
588583
}
589-
if (assigned.get() != null) {
584+
Collection<TopicPartition> topicPartitions = assigned.get();
585+
if (topicPartitions != null) {
590586
LOGGER.debug(() -> "Partitions assigned "
591-
+ assigned.get()
587+
+ topicPartitions
592588
+ "; re-seeking to "
593589
+ (seekToEnd ? "end; " : "beginning"));
594590
if (seekToEnd) {
595-
consumer.seekToEnd(assigned.get());
591+
consumer.seekToEnd(topicPartitions);
596592
// seekToEnd is asynchronous. query the position to force the seek to happen now.
597-
assigned.get().forEach(consumer::position);
593+
topicPartitions.forEach(consumer::position);
598594
}
599595
else {
600-
consumer.seekToBeginning(assigned.get());
596+
consumer.seekToBeginning(topicPartitions);
601597
}
602598
}
603599
else {

spring-kafka-test/src/main/java/org/springframework/kafka/test/assertj/KafkaConditions.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.clients.consumer.ConsumerRecord;
2020
import org.apache.kafka.common.record.TimestampType;
2121
import org.assertj.core.api.Condition;
22+
import org.jspecify.annotations.Nullable;
2223

2324
/**
2425
* AssertJ custom {@link Condition}s.
@@ -92,15 +93,16 @@ public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) {
9293

9394
public static class ConsumerRecordKeyCondition<K> extends Condition<ConsumerRecord<K, ?>> {
9495

95-
private final K key;
96+
private final @Nullable K key;
9697

9798
public ConsumerRecordKeyCondition(K key) {
9899
super("a ConsumerRecord with 'key' " + key);
99100
this.key = key;
100101
}
101102

102103
@Override
103-
public boolean matches(ConsumerRecord<K, ?> value) {
104+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
105+
public boolean matches(@Nullable ConsumerRecord<@Nullable K, ?> value) {
104106
if (value == null) {
105107
return false;
106108
}
@@ -113,15 +115,16 @@ public boolean matches(ConsumerRecord<K, ?> value) {
113115

114116
public static class ConsumerRecordValueCondition<V> extends Condition<ConsumerRecord<?, V>> {
115117

116-
private final V payload;
118+
private final @Nullable V payload;
117119

118120
public ConsumerRecordValueCondition(V payload) {
119121
super("a ConsumerRecord with 'value' " + payload);
120122
this.payload = payload;
121123
}
122124

123125
@Override
124-
public boolean matches(ConsumerRecord<?, V> value) {
126+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
127+
public boolean matches(@Nullable ConsumerRecord<?, @Nullable V> value) {
125128
if (value == null) {
126129
return false;
127130
}
@@ -164,7 +167,7 @@ public ConsumerRecordTimestampCondition(TimestampType type, long ts) {
164167
}
165168

166169
@Override
167-
public boolean matches(ConsumerRecord<?, ?> value) {
170+
public boolean matches(@Nullable ConsumerRecord<?, ?> value) {
168171
return value != null &&
169172
(value.timestampType() == this.type && value.timestamp() == this.ts);
170173
}
@@ -181,7 +184,7 @@ public ConsumerRecordPartitionCondition(int partition) {
181184
}
182185

183186
@Override
184-
public boolean matches(ConsumerRecord<?, ?> value) {
187+
public boolean matches(@Nullable ConsumerRecord<?, ?> value) {
185188
return value != null && value.partition() == this.partition;
186189
}
187190

spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2024 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.Optional;
2222

23+
import org.jspecify.annotations.Nullable;
2324
import org.junit.jupiter.api.extension.AfterAllCallback;
2425
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
2526
import org.junit.jupiter.api.extension.ExecutionCondition;
@@ -54,7 +55,7 @@ public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallb
5455

5556
private static final String EMBEDDED_BROKER = "embedded-kafka";
5657

57-
private static final ThreadLocal<EmbeddedKafkaBroker> BROKERS = new ThreadLocal<>();
58+
private static final ThreadLocal<@Nullable EmbeddedKafkaBroker> BROKERS = new ThreadLocal<>();
5859

5960
@Override
6061
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
@@ -117,9 +118,11 @@ private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
117118
return EmbeddedKafkaBrokerFactory.create(embedded);
118119
}
119120

120-
private EmbeddedKafkaBroker getBrokerFromStore(ExtensionContext context) {
121-
EmbeddedKafkaBroker embeddedKafkaBrokerFromParentStore =
122-
getParentStore(context)
121+
@SuppressWarnings("NullAway") // Dataflow analysis limitation.
122+
private @Nullable EmbeddedKafkaBroker getBrokerFromStore(ExtensionContext context) {
123+
Store parentStore = getParentStore(context);
124+
EmbeddedKafkaBroker embeddedKafkaBrokerFromParentStore = parentStore == null ? null :
125+
parentStore
123126
.get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class);
124127
return embeddedKafkaBrokerFromParentStore == null
125128
? getStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class)
@@ -130,12 +133,12 @@ private Store getStore(ExtensionContext context) {
130133
return context.getStore(Namespace.create(getClass(), context));
131134
}
132135

133-
private Store getParentStore(ExtensionContext context) {
134-
ExtensionContext parent = context.getParent().get();
135-
return parent.getStore(Namespace.create(getClass(), parent));
136+
private @Nullable Store getParentStore(ExtensionContext context) {
137+
ExtensionContext parent = context.getParent().orElse(null);
138+
return parent == null ? null : parent.getStore(Namespace.create(getClass(), parent));
136139
}
137140

138-
public static EmbeddedKafkaBroker getBroker() {
141+
public static @Nullable EmbeddedKafkaBroker getBroker() {
139142
return BROKERS.get();
140143
}
141144

spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import org.hamcrest.Description;
2222
import org.hamcrest.DiagnosingMatcher;
2323
import org.hamcrest.Matcher;
24+
import org.jspecify.annotations.Nullable;
2425

2526
/**
2627
* Hamcrest {@link Matcher}s utilities.
@@ -90,7 +91,7 @@ public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) {
9091
public static class ConsumerRecordKeyMatcher<K>
9192
extends DiagnosingMatcher<ConsumerRecord<K, ?>> {
9293

93-
private final K key;
94+
private final @Nullable K key;
9495

9596
public ConsumerRecordKeyMatcher(K key) {
9697
this.key = key;
@@ -99,13 +100,14 @@ public ConsumerRecordKeyMatcher(K key) {
99100
@Override
100101
public void describeTo(Description description) {
101102
description.appendText("a ConsumerRecord with key ")
102-
.appendText(this.key.toString());
103+
.appendText(this.key == null ? "null" : this.key.toString());
103104
}
104105

105106
@Override
106-
protected boolean matches(Object item, Description mismatchDescription) {
107+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
108+
protected boolean matches(@Nullable Object item, Description mismatchDescription) {
107109
@SuppressWarnings(UNCHECKED)
108-
ConsumerRecord<K, Object> record = (ConsumerRecord<K, Object>) item;
110+
ConsumerRecord<@Nullable K, Object> record = (ConsumerRecord<K, Object>) item;
109111
boolean matches = record != null
110112
&& ((record.key() == null && this.key == null)
111113
|| record.key().equals(this.key));
@@ -132,7 +134,7 @@ public void describeTo(Description description) {
132134
}
133135

134136
@Override
135-
protected boolean matches(Object item, Description mismatchDescription) {
137+
protected boolean matches(@Nullable Object item, Description mismatchDescription) {
136138
@SuppressWarnings(UNCHECKED)
137139
ConsumerRecord<Object, V> record = (ConsumerRecord<Object, V>) item;
138140
boolean matches = record != null && record.value().equals(this.payload);
@@ -159,7 +161,7 @@ public void describeTo(Description description) {
159161
}
160162

161163
@Override
162-
protected boolean matches(Object item, Description mismatchDescription) {
164+
protected boolean matches(@Nullable Object item, Description mismatchDescription) {
163165
@SuppressWarnings(UNCHECKED)
164166
ConsumerRecord<Object, Object> record = (ConsumerRecord<Object, Object>) item;
165167
boolean matches = record != null && record.partition() == this.partition;
@@ -183,7 +185,7 @@ public ConsumerRecordTimestampMatcher(TimestampType type, long ts) {
183185
}
184186

185187
@Override
186-
protected boolean matches(Object item, Description mismatchDescription) {
188+
protected boolean matches(@Nullable Object item, Description mismatchDescription) {
187189
@SuppressWarnings(UNCHECKED)
188190
ConsumerRecord<Object, Object> record = (ConsumerRecord<Object, Object>) item;
189191

spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,8 @@ private int[] ports(String ports) {
165165

166166
@Override
167167
public void testPlanExecutionFinished(TestPlan testPlan) {
168-
if (this.embeddedKafkaBroker != null) {
169-
this.embeddedKafkaBroker.destroy();
170-
this.logger.info("Stopped global Embedded Kafka.");
171-
}
168+
this.embeddedKafkaBroker.destroy();
169+
this.logger.info("Stopped global Embedded Kafka.");
172170
}
173171

174172
}

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.List;
2323
import java.util.concurrent.atomic.AtomicReference;
2424

25+
import org.jspecify.annotations.Nullable;
26+
2527
import org.springframework.util.Assert;
2628
import org.springframework.util.ReflectionUtils;
2729

@@ -119,14 +121,13 @@ private static void waitForSingleContainerAssignment(Object container, int parti
119121
}
120122

121123
private static Method getAssignedPartitionsMethod(Class<?> clazz) {
122-
final AtomicReference<Method> theMethod = new AtomicReference<Method>();
124+
final AtomicReference<@Nullable Method> theMethod = new AtomicReference<>();
123125
ReflectionUtils.doWithMethods(clazz,
124-
method -> theMethod.set(method),
126+
theMethod::set,
125127
method -> method.getName().equals("getAssignedPartitions") && method.getParameterTypes().length == 0);
126-
if (theMethod.get() == null) {
127-
throw new IllegalStateException(clazz + " has no getAssignedPartitions() method");
128-
}
129-
return theMethod.get();
128+
Method method = theMethod.get();
129+
Assert.state(method != null, "No getAssignedPartitions() method");
130+
return method;
130131
}
131132

132133
private static class ContainerTestUtilsException extends RuntimeException {

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,11 @@ public final class KafkaTestUtils {
6666

6767
private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaTestUtils.class)); // NOSONAR
6868

69-
@SuppressWarnings("NullAway.Init")
70-
private static Properties defaults;
69+
private static final Properties defaults = new Properties();
70+
71+
static {
72+
defaults.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
73+
}
7174

7275
private KafkaTestUtils() {
7376
}
@@ -195,7 +198,7 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
195198
reset.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), tp -> rec.offset());
196199
}
197200
});
198-
reset.forEach((tp, off) -> consumer.seek(tp, off));
201+
reset.forEach(consumer::seek);
199202
try {
200203
Thread.sleep(50); // NOSONAR magic#
201204
}
@@ -299,7 +302,7 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
299302
* @see Consumer#endOffsets(Collection, Duration)
300303
*/
301304
public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic,
302-
Integer... partitions) {
305+
Integer @Nullable ... partitions) {
303306

304307
Collection<TopicPartition> tps;
305308
if (partitions == null || partitions.length == 0) {
@@ -440,11 +443,6 @@ else if (i == tokens.length - 1) {
440443
* @since 2.2.5
441444
*/
442445
public static Properties defaultPropertyOverrides() {
443-
if (defaults == null) {
444-
Properties props = new Properties();
445-
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
446-
defaults = props;
447-
}
448446
return defaults;
449447
}
450448
}

0 commit comments

Comments
 (0)