Skip to content

Commit fd36ae3

Browse files
committed
Fix nullability in the FluxAggregatorMessageHandler
* Make `windowSizeFunction` as `Function<Message<?>, @nullable Integer>` because `sequenceSizeHeader()` may return `null` from message headers * Extract local `subscriptionToDispose` in the `stop()` to satisfy null check context * Use `Objects.requireNonNull(signal.get())` to satisfy `Function.apply()` contract. The `if (signal.hasValue()) {` does the trick for us, but currently that is not visible for that `signal.get()` * Remove `@NullUnmarked` since we have just mitigated all the null problems
1 parent 2339c93 commit fd36ae3

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package org.springframework.integration.aggregator;
1818

1919
import java.time.Duration;
20+
import java.util.Objects;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.function.Function;
2223
import java.util.function.Predicate;
2324

24-
import org.jspecify.annotations.NullUnmarked;
2525
import org.jspecify.annotations.Nullable;
2626
import reactor.core.Disposable;
2727
import reactor.core.publisher.Flux;
@@ -67,8 +67,8 @@ public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandle
6767
@Nullable
6868
private Predicate<Message<?>> boundaryTrigger;
6969

70-
@Nullable
71-
public Function<Message<?>, Integer> windowSizeFunction = FluxAggregatorMessageHandler::sequenceSizeHeader;
70+
private Function<Message<?>, @Nullable Integer> windowSizeFunction =
71+
FluxAggregatorMessageHandler::sequenceSizeHeader;
7272

7373
@Nullable
7474
private Function<Flux<Message<?>>, Flux<Flux<Message<?>>>> windowConfigurer;
@@ -109,7 +109,6 @@ private Flux<Message<?>> releaseBy(Flux<Message<?>> groupFlux) {
109109
.flatMap((windowFlux) -> windowFlux.transform(this.combineFunction));
110110
}
111111

112-
@NullUnmarked
113112
private Flux<Flux<Message<?>>> applyWindowOptions(Flux<Message<?>> groupFlux) {
114113
if (this.boundaryTrigger != null) {
115114
return groupFlux.windowUntil(this.boundaryTrigger);
@@ -118,7 +117,7 @@ private Flux<Flux<Message<?>>> applyWindowOptions(Flux<Message<?>> groupFlux) {
118117
.switchOnFirst((signal, group) -> {
119118
if (signal.hasValue()) {
120119
Assert.notNull(this.windowSizeFunction, "'windowSizeFunction' must not be null");
121-
Integer maxSize = this.windowSizeFunction.apply(signal.get());
120+
Integer maxSize = this.windowSizeFunction.apply(Objects.requireNonNull(signal.get()));
122121
if (maxSize != null) {
123122
if (this.windowTimespan != null) {
124123
return group.windowTimeout(maxSize, this.windowTimespan);
@@ -203,7 +202,7 @@ public void setWindowSize(int windowSize) {
203202
* @see Flux#window(int)
204203
* @see Flux#windowTimeout(int, Duration)
205204
*/
206-
public void setWindowSizeFunction(Function<Message<?>, Integer> windowSizeFunction) {
205+
public void setWindowSizeFunction(Function<Message<?>, @Nullable Integer> windowSizeFunction) {
207206
Assert.notNull(windowSizeFunction, "'windowSizeFunction' must not be null");
208207
this.windowSizeFunction = windowSizeFunction;
209208
}
@@ -255,8 +254,9 @@ public void start() {
255254

256255
@Override
257256
public void stop() {
258-
if (this.subscribed.compareAndSet(true, false) && this.subscription != null) {
259-
this.subscription.dispose();
257+
Disposable subscriptionToDispose = this.subscription;
258+
if (this.subscribed.compareAndSet(true, false) && subscriptionToDispose != null) {
259+
subscriptionToDispose.dispose();
260260
}
261261
}
262262

@@ -289,8 +289,7 @@ private Mono<Message<?>> messageForWindowFlux(Flux<Message<?>> messageFlux) {
289289
.build());
290290
}
291291

292-
@NullUnmarked
293-
private static Integer sequenceSizeHeader(Message<?> message) {
292+
private static @Nullable Integer sequenceSizeHeader(Message<?> message) {
294293
return message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class);
295294
}
296295

0 commit comments

Comments
 (0)