Skip to content

Commit 564b1c1

Browse files
committed
Apply code changes from code review
1 parent 2339c93 commit 564b1c1

9 files changed

+23
-31
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.commons.logging.Log;
2323
import org.apache.commons.logging.LogFactory;
24-
import org.jspecify.annotations.Nullable;
2524

2625
import org.springframework.beans.BeansException;
2726
import org.springframework.beans.factory.BeanFactory;
@@ -119,7 +118,6 @@ protected Map<String, Object> aggregateHeaders(MessageGroup group) {
119118
return getHeadersFunction().apply(group);
120119
}
121120

122-
@Nullable
123121
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
124122

125123
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import org.springframework.messaging.MessageHandlingException;
6666
import org.springframework.messaging.core.DestinationResolutionException;
6767
import org.springframework.messaging.support.GenericMessage;
68-
import org.springframework.scheduling.TaskScheduler;
6968
import org.springframework.util.Assert;
7069
import org.springframework.util.CollectionUtils;
7170
import org.springframework.util.ObjectUtils;
@@ -652,10 +651,8 @@ protected boolean isExpireGroupsUponCompletion() {
652651
}
653652

654653
private void removeEmptyGroupAfterTimeout(UUID groupId, long timeout) {
655-
TaskScheduler taskScheduler = getTaskScheduler();
656-
Assert.notNull(taskScheduler, "'taskScheduler' must not be null");
657654
ScheduledFuture<?> scheduledFuture =
658-
taskScheduler
655+
getTaskScheduler()
659656
.schedule(() -> {
660657
Lock lock = this.lockRegistry.obtain(groupId.toString());
661658

@@ -712,10 +709,8 @@ else if ((Long) groupTimeout > 0) {
712709
Object groupId = messageGroup.getGroupId();
713710
long timestamp = messageGroup.getTimestamp();
714711
long lastModified = messageGroup.getLastModified();
715-
TaskScheduler taskScheduler = getTaskScheduler();
716-
Assert.notNull(taskScheduler, "'taskScheduler' must not be null");
717712
ScheduledFuture<?> scheduledFuture =
718-
taskScheduler
713+
getTaskScheduler()
719714
.schedule(() -> {
720715
try {
721716
processForceRelease(groupId, timestamp, lastModified);
@@ -1039,9 +1034,7 @@ public void start() {
10391034
if (this.expireTimeout > 0) {
10401035
purgeOrphanedGroups();
10411036
if (this.expireDuration != null) {
1042-
TaskScheduler taskScheduler = getTaskScheduler();
1043-
Assert.notNull(taskScheduler, "'taskScheduler' must not be null");
1044-
taskScheduler
1037+
getTaskScheduler()
10451038
.scheduleWithFixedDelay(this::purgeOrphanedGroups, this.expireDuration);
10461039
}
10471040
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818

1919
import java.util.Map;
2020

21-
import org.jspecify.annotations.Nullable;
22-
2321
import org.springframework.beans.factory.BeanFactory;
2422
import org.springframework.core.convert.ConversionService;
2523
import org.springframework.integration.store.MessageGroup;
24+
import org.springframework.util.Assert;
2625

2726
/**
2827
* A {@link MessageGroupProcessor} implementation that evaluates a SpEL expression. The SpEL context root is the list of
@@ -60,9 +59,10 @@ public void setExpectedType(Class<?> expectedType) {
6059
* {@link org.springframework.integration.core.MessagingTemplate} to send downstream.
6160
*/
6261
@Override
63-
@Nullable
6462
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
65-
return this.processor.process(group.getMessages());
63+
Object object = this.processor.process(group.getMessages());
64+
Assert.notNull(object, "Result from processor must not be null");
65+
return object;
6666
}
6767

6868
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageListProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,10 @@ public void setExpectedType(Class<?> expectedType) {
104104
* evaluation result Object will be returned.
105105
*/
106106
@Override
107-
@Nullable
108107
public Object process(Collection<? extends Message<?>> messages) {
109-
return this.evaluateExpression(this.expression, messages, this.expectedType);
108+
Object object = this.evaluateExpression(this.expression, messages, this.expectedType);
109+
Assert.state(object != null, "Failed to evaluate expression: " + this.expression);
110+
return object;
110111
}
111112

112113
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.aggregator;
1818

1919
import java.time.Duration;
20+
import java.util.Objects;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.function.Function;
2223
import java.util.function.Predicate;
@@ -67,8 +68,7 @@ public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandle
6768
@Nullable
6869
private Predicate<Message<?>> boundaryTrigger;
6970

70-
@Nullable
71-
public Function<Message<?>, Integer> windowSizeFunction = FluxAggregatorMessageHandler::sequenceSizeHeader;
71+
private Function<Message<?>, @Nullable Integer> windowSizeFunction = FluxAggregatorMessageHandler::sequenceSizeHeader;
7272

7373
@Nullable
7474
private Function<Flux<Message<?>>, Flux<Flux<Message<?>>>> windowConfigurer;
@@ -203,7 +203,7 @@ public void setWindowSize(int windowSize) {
203203
* @see Flux#window(int)
204204
* @see Flux#windowTimeout(int, Duration)
205205
*/
206-
public void setWindowSizeFunction(Function<Message<?>, Integer> windowSizeFunction) {
206+
public void setWindowSizeFunction(Function<Message<?>, @Nullable Integer> windowSizeFunction) {
207207
Assert.notNull(windowSizeFunction, "'windowSizeFunction' must not be null");
208208
this.windowSizeFunction = windowSizeFunction;
209209
}
@@ -289,9 +289,9 @@ private Mono<Message<?>> messageForWindowFlux(Flux<Message<?>> messageFlux) {
289289
.build());
290290
}
291291

292-
@NullUnmarked
292+
293293
private static Integer sequenceSizeHeader(Message<?> message) {
294-
return message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class);
294+
return Objects.requireNonNull(message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class));
295295
}
296296

297297
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageListProcessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
@FunctionalInterface
3030
public interface MessageListProcessor {
3131

32-
@Nullable
3332
Object process(Collection<? extends Message<?>> messages);
3433

3534
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@
2020
import java.util.Collection;
2121
import java.util.Map;
2222

23-
import org.jspecify.annotations.Nullable;
24-
2523
import org.springframework.beans.factory.BeanFactory;
2624
import org.springframework.core.convert.ConversionService;
2725
import org.springframework.integration.annotation.Aggregator;
2826
import org.springframework.integration.store.MessageGroup;
2927
import org.springframework.integration.support.management.ManageableLifecycle;
3028
import org.springframework.messaging.Message;
29+
import org.springframework.util.Assert;
3130

3231
/**
3332
* MessageGroupProcessor that serves as an adapter for the invocation of a POJO method.
@@ -87,10 +86,11 @@ public void setBeanFactory(BeanFactory beanFactory) {
8786
}
8887

8988
@Override
90-
@Nullable
9189
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
9290
final Collection<Message<?>> messagesUpForProcessing = group.getMessages();
93-
return this.processor.process(messagesUpForProcessing, headers);
91+
Object object = this.processor.process(messagesUpForProcessing, headers);
92+
Assert.notNull(object, "Result from processor must not be null");
93+
return object;
9494
}
9595

9696
@Override

spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,10 @@ public static MessageChannel getErrorChannel(BeanFactory beanFactory) {
140140
* @param beanFactory BeanFactory for lookup, must not be null.
141141
* @return The {@link TaskScheduler} bean whose name is "taskScheduler" if available.
142142
*/
143-
@Nullable
144143
public static TaskScheduler getTaskScheduler(BeanFactory beanFactory) {
145-
return getBeanOfType(beanFactory, TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class);
144+
TaskScheduler taskScheduler = getBeanOfType(beanFactory, TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class);
145+
Assert.state(taskScheduler != null, "No such bean '" + TASK_SCHEDULER_BEAN_NAME + "'");
146+
return taskScheduler;
146147
}
147148

148149
/**

spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,11 +280,11 @@ public void setTaskScheduler(TaskScheduler taskScheduler) {
280280
this.taskScheduler = taskScheduler;
281281
}
282282

283-
@Nullable
284283
protected TaskScheduler getTaskScheduler() {
285284
if (this.taskScheduler == null && this.beanFactory != null) {
286285
this.taskScheduler = IntegrationContextUtils.getTaskScheduler(this.beanFactory);
287286
}
287+
Assert.notNull(this.taskScheduler, "'taskScheduler' must not be null");
288288
return this.taskScheduler;
289289
}
290290

0 commit comments

Comments
 (0)