Skip to content

Commit 2339c93

Browse files
committed
Add nullability to org.springframework.aggregator & org.springframework.acks packages
Note in this commit that there cases where methods are called where the parameters passed in may contain a null. In those cases the attribute that contains the potential null is tested with an `Assert.notNull`. This may not be the best approach, let's discuss. Also note that in FluxAggregatorMessageHandler line 112 `@NullUnmarked` is used on the the applyWindowOptions method. This is because the consumer.apply on line 121 can not be null according to JSpecify. But we can't make that assumption for how the user implemented the fuction. There maybe a better way of handling this besides marking the method as `NullUnmarked`. Let's discuss. Similarly in FluxAggregatorMessageHandler the way in which sequenceSizeHeader is utilized it had to be `@NullUnmarked`. Let's discuss Make sure all annotations are applied consistently
1 parent 1b9ee5d commit 2339c93

24 files changed

+130
-43
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
22
* Provides classes related to message acknowledgment.
33
*/
4-
@org.springframework.lang.NonNullApi
4+
@org.jspecify.annotations.NullMarked
55
package org.springframework.integration.acks;

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121

2222
import org.apache.commons.logging.Log;
2323
import org.apache.commons.logging.LogFactory;
24+
import org.jspecify.annotations.Nullable;
2425

2526
import org.springframework.beans.BeansException;
2627
import org.springframework.beans.factory.BeanFactory;
@@ -56,6 +57,7 @@ public abstract class AbstractAggregatingMessageGroupProcessor implements Messag
5657

5758
private boolean messageBuilderFactorySet;
5859

60+
@SuppressWarnings("NullAway.Init")
5961
private BeanFactory beanFactory;
6062

6163
@Override
@@ -117,6 +119,7 @@ protected Map<String, Object> aggregateHeaders(MessageGroup group) {
117119
return getHeadersFunction().apply(group);
118120
}
119121

122+
@Nullable
120123
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
121124

122125
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.function.BiFunction;
3333

3434
import org.aopalliance.aop.Advice;
35+
import org.jspecify.annotations.Nullable;
3536

3637
import org.springframework.aop.framework.ProxyFactory;
3738
import org.springframework.beans.factory.BeanFactory;
@@ -58,13 +59,13 @@
5859
import org.springframework.integration.support.locks.LockRegistry;
5960
import org.springframework.integration.support.management.ManageableLifecycle;
6061
import org.springframework.integration.util.UUIDConverter;
61-
import org.springframework.lang.Nullable;
6262
import org.springframework.messaging.Message;
6363
import org.springframework.messaging.MessageChannel;
6464
import org.springframework.messaging.MessageDeliveryException;
6565
import org.springframework.messaging.MessageHandlingException;
6666
import org.springframework.messaging.core.DestinationResolutionException;
6767
import org.springframework.messaging.support.GenericMessage;
68+
import org.springframework.scheduling.TaskScheduler;
6869
import org.springframework.util.Assert;
6970
import org.springframework.util.CollectionUtils;
7071
import org.springframework.util.ObjectUtils;
@@ -125,8 +126,10 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
125126

126127
private boolean releaseStrategySet;
127128

129+
@SuppressWarnings("NullAway.Init")
128130
private MessageChannel discardChannel;
129131

132+
@Nullable
130133
private String discardChannelName;
131134

132135
private boolean sendPartialResultOnExpiry;
@@ -143,18 +146,23 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
143146

144147
private boolean releasePartialSequences;
145148

149+
@Nullable
146150
private Expression groupTimeoutExpression;
147151

152+
@Nullable
148153
private List<Advice> forceReleaseAdviceChain;
149154

150155
private long expireTimeout;
151156

157+
@Nullable
152158
private Duration expireDuration;
153159

154160
private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
155161

162+
@SuppressWarnings("NullAway.Init")
156163
private EvaluationContext evaluationContext;
157164

165+
@Nullable
158166
private ApplicationEventPublisher applicationEventPublisher;
159167

160168
private boolean expireGroupsUponTimeout = true;
@@ -165,10 +173,11 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
165173

166174
private volatile boolean running;
167175

176+
@Nullable
168177
private BiFunction<Message<?>, String, String> groupConditionSupplier;
169178

170179
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
171-
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
180+
@Nullable CorrelationStrategy correlationStrategy, @Nullable ReleaseStrategy releaseStrategy) {
172181

173182
Assert.notNull(processor, "'processor' must not be null");
174183
Assert.notNull(store, "'store' must not be null");
@@ -504,6 +513,7 @@ public MessageChannel getDiscardChannel() {
504513
return this.discardChannel;
505514
}
506515

516+
@Nullable
507517
protected String getDiscardChannelName() {
508518
return this.discardChannelName;
509519
}
@@ -532,6 +542,7 @@ protected boolean isReleasePartialSequences() {
532542
return this.releasePartialSequences;
533543
}
534544

545+
@Nullable
535546
protected Expression getGroupTimeoutExpression() {
536547
return this.groupTimeoutExpression;
537548
}
@@ -641,8 +652,10 @@ protected boolean isExpireGroupsUponCompletion() {
641652
}
642653

643654
private void removeEmptyGroupAfterTimeout(UUID groupId, long timeout) {
655+
TaskScheduler taskScheduler = getTaskScheduler();
656+
Assert.notNull(taskScheduler, "'taskScheduler' must not be null");
644657
ScheduledFuture<?> scheduledFuture =
645-
getTaskScheduler()
658+
taskScheduler
646659
.schedule(() -> {
647660
Lock lock = this.lockRegistry.obtain(groupId.toString());
648661

@@ -699,8 +712,10 @@ else if ((Long) groupTimeout > 0) {
699712
Object groupId = messageGroup.getGroupId();
700713
long timestamp = messageGroup.getTimestamp();
701714
long lastModified = messageGroup.getLastModified();
715+
TaskScheduler taskScheduler = getTaskScheduler();
716+
Assert.notNull(taskScheduler, "'taskScheduler' must not be null");
702717
ScheduledFuture<?> scheduledFuture =
703-
getTaskScheduler()
718+
taskScheduler
704719
.schedule(() -> {
705720
try {
706721
processForceRelease(groupId, timestamp, lastModified);
@@ -753,7 +768,7 @@ private void discardMessage(Message<?> message) {
753768
* @param group The group.
754769
* @param completedMessages The completed messages.
755770
*/
756-
protected abstract void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages);
771+
protected abstract void afterRelease(MessageGroup group, @Nullable Collection<Message<?>> completedMessages);
757772

758773
/**
759774
* Subclasses may override if special action is needed because the group was released or discarded
@@ -912,14 +927,12 @@ protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock)
912927
}
913928

914929
protected void completeGroup(Object correlationKey, MessageGroup group, Lock lock) {
915-
Message<?> first = null;
916-
if (group != null) {
917-
first = group.getOne();
918-
}
930+
Message<?> first = group.getOne();
919931
completeGroup(first, correlationKey, group, lock);
920932
}
921933

922934
@SuppressWarnings("unchecked")
935+
@Nullable
923936
protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group,
924937
Lock lock) {
925938

@@ -929,6 +942,7 @@ protected Collection<Message<?>> completeGroup(Message<?> message, Object correl
929942
this.logger.debug(() -> "Completing group with correlationKey [" + correlationKey + "]");
930943

931944
result = this.outputProcessor.processMessageGroup(group);
945+
Assert.notNull(result, "the group returned a null result");
932946
if (isResultCollectionOfMessages(result)) {
933947
partialSequence = (Collection<Message<?>>) result;
934948
}
@@ -988,6 +1002,7 @@ private static boolean isResultCollectionOfMessages(Object result) {
9881002
return false;
9891003
}
9901004

1005+
@Nullable
9911006
protected Object obtainGroupTimeout(MessageGroup group) {
9921007
if (this.groupTimeoutExpression != null) {
9931008
Object timeout = this.groupTimeoutExpression.getValue(this.evaluationContext, group);
@@ -1024,7 +1039,9 @@ public void start() {
10241039
if (this.expireTimeout > 0) {
10251040
purgeOrphanedGroups();
10261041
if (this.expireDuration != null) {
1027-
getTaskScheduler()
1042+
TaskScheduler taskScheduler = getTaskScheduler();
1043+
Assert.notNull(taskScheduler, "'taskScheduler' must not be null");
1044+
taskScheduler
10281045
.scheduleWithFixedDelay(this::purgeOrphanedGroups, this.expireDuration);
10291046
}
10301047
}
@@ -1062,6 +1079,7 @@ public void purgeOrphanedGroups() {
10621079

10631080
protected static class SequenceAwareMessageGroup extends SimpleMessageGroup {
10641081

1082+
@Nullable
10651083
private final SimpleMessageGroup sourceGroup;
10661084

10671085
public SequenceAwareMessageGroup(MessageGroup messageGroup) {
@@ -1124,6 +1142,7 @@ private class ForceReleaseMessageGroupProcessor implements MessageGroupProcessor
11241142
}
11251143

11261144
@Override
1145+
@Nullable
11271146
public Object processMessageGroup(MessageGroup group) {
11281147
forceComplete(group);
11291148
return null;

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import java.util.Collection;
2020

21+
import org.jspecify.annotations.Nullable;
22+
2123
import org.springframework.integration.IntegrationPatternType;
2224
import org.springframework.integration.store.MessageGroup;
2325
import org.springframework.integration.store.MessageGroupStore;
@@ -95,7 +97,7 @@ protected boolean shouldSplitOutput(Iterable<?> reply) {
9597
* @param completedMessages The completed messages. Ignored in this implementation.
9698
*/
9799
@Override
98-
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
100+
protected void afterRelease(MessageGroup messageGroup, @Nullable Collection<Message<?>> completedMessages) {
99101
Object groupId = messageGroup.getGroupId();
100102
MessageGroupStore messageStore = getMessageStore();
101103
messageStore.completeGroup(groupId);

spring-integration-core/src/main/java/org/springframework/integration/aggregator/BarrierMessageHandler.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,13 +21,14 @@
2121
import java.util.concurrent.SynchronousQueue;
2222
import java.util.concurrent.TimeUnit;
2323

24+
import org.jspecify.annotations.Nullable;
25+
2426
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2527
import org.springframework.integration.IntegrationPatternType;
2628
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
2729
import org.springframework.integration.handler.DiscardingMessageHandler;
2830
import org.springframework.integration.handler.MessageTriggerAction;
2931
import org.springframework.integration.store.SimpleMessageGroup;
30-
import org.springframework.lang.Nullable;
3132
import org.springframework.messaging.Message;
3233
import org.springframework.messaging.MessageChannel;
3334
import org.springframework.messaging.MessageHandlingException;
@@ -67,8 +68,10 @@ public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler
6768

6869
private final MessageGroupProcessor messageGroupProcessor;
6970

71+
@Nullable
7072
private String discardChannelName;
7173

74+
@Nullable
7275
private MessageChannel discardChannel;
7376

7477
/**
@@ -158,7 +161,7 @@ public BarrierMessageHandler(long requestTimeout, long triggerTimeout, Correlati
158161
* @since 5.4
159162
*/
160163
public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor,
161-
CorrelationStrategy correlationStrategy) {
164+
@Nullable CorrelationStrategy correlationStrategy) {
162165

163166
Assert.notNull(outputProcessor, "'messageGroupProcessor' cannot be null");
164167
this.messageGroupProcessor = outputProcessor;
@@ -217,6 +220,7 @@ public IntegrationPatternType getIntegrationPatternType() {
217220
}
218221

219222
@Override
223+
@Nullable
220224
protected Object handleRequestMessage(Message<?> requestMessage) {
221225
Object key = this.correlationStrategy.getCorrelationKey(requestMessage);
222226
if (key == null) {
@@ -246,6 +250,7 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
246250
return null;
247251
}
248252

253+
@Nullable
249254
private Object processRelease(Object key, Message<?> requestMessage, Message<?> releaseMessage) {
250255
this.suspensions.remove(key);
251256
if (releaseMessage.getPayload() instanceof Throwable) {
@@ -265,6 +270,7 @@ private Object processRelease(Object key, Message<?> requestMessage, Message<?>
265270
* @param releaseMessage the release message.
266271
* @return the result.
267272
*/
273+
@Nullable
268274
protected Object buildResult(Object key, Message<?> requestMessage, Message<?> releaseMessage) {
269275
SimpleMessageGroup group = new SimpleMessageGroup(key);
270276
group.add(requestMessage);

spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,13 +20,16 @@
2020
import java.util.concurrent.ConcurrentHashMap;
2121
import java.util.concurrent.ConcurrentMap;
2222

23+
import org.jspecify.annotations.Nullable;
24+
2325
import org.springframework.core.log.LogMessage;
2426
import org.springframework.integration.core.MessageSource;
2527
import org.springframework.integration.handler.AbstractMessageHandler;
2628
import org.springframework.integration.store.MessageGroup;
2729
import org.springframework.integration.store.MessageGroupStore;
2830
import org.springframework.integration.store.SimpleMessageStore;
2931
import org.springframework.messaging.Message;
32+
import org.springframework.util.Assert;
3033

3134
/**
3235
* This Endpoint serves as a barrier for messages that should not be processed yet. The decision when a message can be
@@ -58,8 +61,10 @@ public class CorrelatingMessageBarrier extends AbstractMessageHandler implements
5861

5962
private final MessageGroupStore store;
6063

64+
@Nullable
6165
private CorrelationStrategy correlationStrategy;
6266

67+
@Nullable
6368
private ReleaseStrategy releaseStrategy;
6469

6570
public CorrelatingMessageBarrier() {
@@ -88,7 +93,9 @@ public void setReleaseStrategy(ReleaseStrategy releaseStrategy) {
8893

8994
@Override
9095
protected void handleMessageInternal(Message<?> message) {
96+
Assert.notNull(this.correlationStrategy, "'correlationStrategy' must not be null");
9197
Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
98+
Assert.notNull(correlationKey, "The correlation key is required");
9299
Object lock = getLock(correlationKey);
93100
synchronized (lock) {
94101
this.store.addMessagesToGroup(correlationKey, message);
@@ -103,12 +110,14 @@ private Object getLock(Object correlationKey) {
103110

104111
@SuppressWarnings("unchecked")
105112
@Override
113+
@Nullable
106114
public Message<Object> receive() {
107115
for (Object key : this.correlationLocks.keySet()) {
108116
Object lock = getLock(key);
109117
synchronized (lock) {
110118
MessageGroup group = this.store.getMessageGroup(key);
111119
//group might be removed by another thread
120+
Assert.notNull(this.releaseStrategy, "'releaseStrategy' must not be null");
112121
if (group != null && this.releaseStrategy.canRelease(group)) {
113122
Message<?> nextMessage = null;
114123

spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelationStrategy.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.aggregator;
1818

19+
import org.jspecify.annotations.Nullable;
20+
1921
import org.springframework.messaging.Message;
2022

2123
/**
@@ -35,6 +37,7 @@ public interface CorrelationStrategy {
3537
* @param message The message.
3638
* @return The correlation key.
3739
*/
40+
@Nullable
3841
Object getCorrelationKey(Message<?> message);
3942

4043
}

0 commit comments

Comments
 (0)