Skip to content

Commit cc8bf94

Browse files
authored
GH-10083: Apply Nullability to core store package
Related to: #10083 * Also apply Nullability to `core` `support.locks` package * Add `@Nullable` to generic argument of the `CheckedCallable` and `CheckedFunction` * To not use delegation in the `LockRegistry` to avoid unnecessary nullability carry-on * Remove redundant ctors from the `MessageHolder` and `MessageMetadata`. Use `@JsonCreator` instead on the parameterized one * Remove unnecessary now `Assert.notNull()` in the `MessageStore` methods * * Fix found typos * Improve logic for null message id to use `Assert.state` instead and add message context to the exception message * Remove potentially unnecessary `Assert.notNull` in critical `MessageStore` calls * Use explicit `StringBuilder` in the `AbstractKeyValueMessageStore.doAddMessage()`
1 parent 9d7673d commit cc8bf94

File tree

29 files changed

+186
-235
lines changed

29 files changed

+186
-235
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ protected void forceComplete(MessageGroup group) { // NOSONAR Complexity
831831
}
832832
else {
833833
/*
834-
* By default empty groups are removed on the same schedule as non-empty
834+
* By default, empty groups are removed on the same schedule as non-empty
835835
* groups. A longer timeout for empty groups can be enabled by
836836
* setting minimumTimeoutForEmptyGroups.
837837
*/
@@ -923,9 +923,9 @@ protected void expireGroup(Object correlationKey, MessageGroup group, Lock lock)
923923
}
924924
}
925925

926+
@SuppressWarnings("NullAway") // Never called with an empty group
926927
protected void completeGroup(Object correlationKey, MessageGroup group, Lock lock) {
927-
Message<?> first = group.getOne();
928-
completeGroup(first, correlationKey, group, lock);
928+
completeGroup(group.getOne(), correlationKey, group, lock);
929929
}
930930

931931
@SuppressWarnings("unchecked")

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 41 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.Collection;
21+
import java.util.Collections;
2122
import java.util.HashSet;
2223
import java.util.Iterator;
2324
import java.util.List;
@@ -66,7 +67,6 @@ protected AbstractKeyValueMessageStore() {
6667
* @since 4.3.12
6768
*/
6869
protected AbstractKeyValueMessageStore(String prefix) {
69-
Assert.notNull(prefix, "'prefix' must not be null");
7070
this.messagePrefix = prefix + MESSAGE_KEY_PREFIX;
7171
this.groupPrefix = prefix + MESSAGE_GROUP_KEY_PREFIX;
7272
}
@@ -96,8 +96,7 @@ public String getGroupPrefix() {
9696
// MessageStore methods
9797

9898
@Override
99-
public Message<?> getMessage(UUID messageId) {
100-
Assert.notNull(messageId, "'messageId' must not be null");
99+
public @Nullable Message<?> getMessage(UUID messageId) {
101100
Object object = doRetrieve(this.messagePrefix + messageId);
102101
if (object != null) {
103102
return extractMessage(object);
@@ -122,8 +121,7 @@ else if (object instanceof Message<?> message) {
122121
}
123122

124123
@Override
125-
public MessageMetadata getMessageMetadata(UUID messageId) {
126-
Assert.notNull(messageId, "'messageId' must not be null");
124+
public @Nullable MessageMetadata getMessageMetadata(UUID messageId) {
127125
Object object = doRetrieve(this.messagePrefix + messageId);
128126
if (object != null) {
129127
extractMessage(object);
@@ -135,27 +133,27 @@ public MessageMetadata getMessageMetadata(UUID messageId) {
135133
}
136134

137135
@Override
138-
@SuppressWarnings("unchecked")
139136
public <T> Message<T> addMessage(Message<T> message) {
140137
doAddMessage(message);
141-
return (Message<T>) getMessage(message.getHeaders().getId());
138+
return message;
142139
}
143140

144141
protected void doAddMessage(Message<?> message) {
145142
doAddMessage(message, null);
146143
}
147144

148145
protected void doAddMessage(Message<?> message, @Nullable Object groupId) {
149-
Assert.notNull(message, "'message' must not be null");
150146
UUID messageId = message.getHeaders().getId();
151147
Assert.notNull(messageId, "Cannot store messages without an ID header");
152-
String messageKey = this.messagePrefix + (groupId != null ? groupId.toString() + '_' : "") + messageId;
148+
String messageKey =
149+
new StringBuilder(this.messagePrefix)
150+
.append(groupId != null ? groupId.toString() + '_' : "")
151+
.append(messageId).toString();
153152
doStoreIfAbsent(messageKey, new MessageHolder(message));
154153
}
155154

156155
@Override
157-
public Message<?> removeMessage(UUID id) {
158-
Assert.notNull(id, "'id' must not be null");
156+
public @Nullable Message<?> removeMessage(UUID id) {
159157
Object object = doRemove(this.messagePrefix + id);
160158
if (object != null) {
161159
return extractMessage(object);
@@ -195,9 +193,8 @@ public MessageGroup getMessageGroup(Object groupId) {
195193
}
196194

197195
@Override
198-
public MessageGroupMetadata getGroupMetadata(Object groupId) {
199-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
200-
Object mgm = this.doRetrieve(this.groupPrefix + groupId);
196+
public @Nullable MessageGroupMetadata getGroupMetadata(Object groupId) {
197+
Object mgm = doRetrieve(this.groupPrefix + groupId);
201198
if (mgm != null) {
202199
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
203200
return (MessageGroupMetadata) mgm;
@@ -206,10 +203,8 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) {
206203
}
207204

208205
@Override
206+
@SuppressWarnings("NullAway") // dataflow analysis limitation
209207
protected void doAddMessagesToGroup(Object groupId, Message<?>... messages) {
210-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
211-
Assert.notNull(messages, "'messages' must not be null");
212-
213208
MessageGroupMetadata metadata = getGroupMetadata(groupId);
214209
SimpleMessageGroup group = null;
215210
if (metadata == null) {
@@ -219,7 +214,9 @@ protected void doAddMessagesToGroup(Object groupId, Message<?>... messages) {
219214
for (Message<?> message : messages) {
220215
doAddMessage(message, groupId);
221216
if (metadata != null) {
222-
metadata.add(message.getHeaders().getId());
217+
UUID id = message.getHeaders().getId();
218+
Assert.state(id != null, () -> "Message 'id' must not be null: " + message);
219+
metadata.add(id);
223220
}
224221
else {
225222
group.add(message);
@@ -241,17 +238,16 @@ protected void doAddMessagesToGroup(Object groupId, Message<?>... messages) {
241238

242239
@Override
243240
protected void doRemoveMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
244-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
245-
Assert.notNull(messages, "'messages' must not be null");
246-
247241
Object mgm = doRetrieve(this.groupPrefix + groupId);
248242
if (mgm != null) {
249243
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
250244
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
251245

252246
List<UUID> ids = new ArrayList<>();
253247
for (Message<?> messageToRemove : messages) {
254-
ids.add(messageToRemove.getHeaders().getId());
248+
UUID id = messageToRemove.getHeaders().getId();
249+
Assert.state(id != null, () -> "Message 'id' must not be null: " + messageToRemove);
250+
ids.add(id);
255251
}
256252

257253
messageGroupMetadata.removeAll(ids);
@@ -269,10 +265,7 @@ protected void doRemoveMessagesFromGroup(Object groupId, Collection<Message<?>>
269265
}
270266

271267
@Override
272-
@Nullable
273-
public Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
274-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
275-
Assert.notNull(messageId, "'messageId' must not be null");
268+
public @Nullable Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
276269
Object object = doRetrieve(this.messagePrefix + groupId + '_' + messageId);
277270
if (object != null) {
278271
return extractMessage(object);
@@ -284,8 +277,6 @@ public Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
284277

285278
@Override
286279
protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) {
287-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
288-
Assert.notNull(messageId, "'messageId' must not be null");
289280
Object mgm = doRetrieve(this.groupPrefix + groupId);
290281
if (mgm != null) {
291282
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
@@ -306,7 +297,6 @@ protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) {
306297

307298
@Override
308299
protected void doCompleteGroup(Object groupId) {
309-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
310300
MessageGroupMetadata metadata = getGroupMetadata(groupId);
311301
if (metadata != null) {
312302
metadata.complete();
@@ -320,7 +310,6 @@ protected void doCompleteGroup(Object groupId) {
320310
*/
321311
@Override
322312
protected void doRemoveMessageGroup(Object groupId) {
323-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
324313
Object mgm = doRemove(this.groupPrefix + groupId);
325314
if (mgm != null) {
326315
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
@@ -347,7 +336,6 @@ protected void doSetGroupCondition(Object groupId, String condition) {
347336

348337
@Override
349338
protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
350-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
351339
MessageGroupMetadata metadata = getGroupMetadata(groupId);
352340
if (metadata == null) {
353341
SimpleMessageGroup messageGroup = new SimpleMessageGroup(groupId);
@@ -359,7 +347,7 @@ protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int seque
359347
}
360348

361349
@Override
362-
protected Message<?> doPollMessageFromGroup(Object groupId) {
350+
protected @Nullable Message<?> doPollMessageFromGroup(Object groupId) {
363351
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
364352
if (groupMetadata != null) {
365353
UUID firstId = groupMetadata.firstId();
@@ -373,8 +361,7 @@ protected Message<?> doPollMessageFromGroup(Object groupId) {
373361
return null;
374362
}
375363

376-
private Message<?> removeMessageFromGroup(UUID id, Object groupId) {
377-
Assert.notNull(id, "'id' must not be null");
364+
private @Nullable Message<?> removeMessageFromGroup(UUID id, Object groupId) {
378365
Object object = doRemove(this.messagePrefix + groupId + '_' + id);
379366
if (object != null) {
380367
return extractMessage(object);
@@ -385,27 +372,20 @@ private Message<?> removeMessageFromGroup(UUID id, Object groupId) {
385372
}
386373

387374
@Override
388-
public Message<?> getOneMessageFromGroup(Object groupId) {
375+
public @Nullable Message<?> getOneMessageFromGroup(Object groupId) {
389376
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
390-
if (groupMetadata != null) {
391-
UUID messageId = groupMetadata.firstId();
392-
if (messageId != null) {
393-
return getMessageFromGroup(messageId, groupId);
394-
}
377+
Assert.state(groupMetadata != null, () -> "No group for: " + groupId);
378+
UUID messageId = groupMetadata.firstId();
379+
if (messageId != null) {
380+
return getMessageFromGroup(messageId, groupId);
395381
}
396382
return null;
397383
}
398384

399-
@Nullable
400385
private Message<?> getMessageFromGroup(UUID messageId, Object groupId) {
401-
Assert.notNull(messageId, "'messageId' must not be null");
402386
Object object = doRetrieve(this.messagePrefix + groupId + '_' + messageId);
403-
if (object != null) {
404-
return extractMessage(object);
405-
}
406-
else {
407-
return null;
408-
}
387+
Assert.state(object != null, () -> "No message found for: " + messageId);
388+
return extractMessage(object);
409389
}
410390

411391
@Override
@@ -423,7 +403,11 @@ public Collection<Message<?>> getMessagesForGroup(Object groupId) {
423403

424404
@Override
425405
public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
426-
return getGroupMetadata(groupId)
406+
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
407+
if (groupMetadata == null) {
408+
return Stream.empty();
409+
}
410+
return groupMetadata
427411
.getMessageIds()
428412
.stream()
429413
.map((messageId) -> getMessageFromGroup(messageId, groupId));
@@ -432,9 +416,11 @@ public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
432416
@Override
433417
@SuppressWarnings("unchecked")
434418
public Iterator<MessageGroup> iterator() {
435-
final Iterator<?> idIterator = normalizeKeys(
436-
(Collection<String>) doListKeys(this.groupPrefix + '*'))
437-
.iterator();
419+
Collection<?> objects = doListKeys(this.groupPrefix + '*');
420+
if (objects == null) {
421+
objects = Collections.emptyList();
422+
}
423+
Iterator<?> idIterator = normalizeKeys((Collection<String>) objects).iterator();
438424
return new MessageGroupIterator(idIterator);
439425
}
440426

@@ -464,17 +450,17 @@ public int messageGroupSize(Object groupId) {
464450
}
465451
}
466452

467-
protected abstract Object doRetrieve(Object id);
453+
protected abstract @Nullable Object doRetrieve(Object id);
468454

469455
protected abstract void doStore(Object id, Object objectToStore);
470456

471457
protected abstract void doStoreIfAbsent(Object id, Object objectToStore);
472458

473-
protected abstract Object doRemove(Object id);
459+
protected abstract @Nullable Object doRemove(Object id);
474460

475461
protected abstract void doRemoveAll(Collection<Object> ids);
476462

477-
protected abstract Collection<?> doListKeys(String keyPattern);
463+
protected abstract @Nullable Collection<?> doListKeys(String keyPattern);
478464

479465
private final class MessageGroupIterator implements Iterator<MessageGroup> {
480466

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.apache.commons.logging.Log;
2727
import org.apache.commons.logging.LogFactory;
28+
import org.jspecify.annotations.Nullable;
2829

2930
import org.springframework.integration.support.locks.DefaultLockRegistry;
3031
import org.springframework.integration.support.locks.LockRegistry;
@@ -52,8 +53,6 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG
5253

5354
protected static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock";
5455

55-
protected static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null";
56-
5756
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final
5857

5958
private final Lock lock = new ReentrantLock();
@@ -212,27 +211,20 @@ public int getMessageGroupCount() {
212211
return count;
213212
}
214213

215-
@Override
216-
public MessageGroupMetadata getGroupMetadata(Object groupId) {
217-
throw new UnsupportedOperationException("Not yet implemented for this store");
218-
}
219-
220214
@Override
221215
public void removeMessagesFromGroup(Object key, Message<?>... messages) {
222216
removeMessagesFromGroup(key, Arrays.asList(messages));
223217
}
224218

225219
@Override
226220
public void removeMessagesFromGroup(Object key, Collection<Message<?>> messages) {
227-
Assert.notNull(key, GROUP_ID_MUST_NOT_BE_NULL);
228221
executeLocked(key, () -> doRemoveMessagesFromGroup(key, messages));
229222
}
230223

231224
protected abstract void doRemoveMessagesFromGroup(Object key, Collection<Message<?>> messages);
232225

233226
@Override
234227
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
235-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
236228
executeLocked(groupId, () -> doAddMessagesToGroup(groupId, messages));
237229
}
238230

@@ -246,15 +238,13 @@ public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
246238

247239
@Override
248240
public void removeMessageGroup(Object groupId) {
249-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
250241
executeLocked(groupId, () -> doRemoveMessageGroup(groupId));
251242
}
252243

253244
protected abstract void doRemoveMessageGroup(Object groupId);
254245

255246
@Override
256247
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
257-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
258248
return executeLocked(groupId, () -> doRemoveMessageFromGroupById(groupId, messageId));
259249
}
260250

@@ -264,35 +254,31 @@ protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) {
264254

265255
@Override
266256
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
267-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
268257
executeLocked(groupId, () -> doSetLastReleasedSequenceNumberForGroup(groupId, sequenceNumber));
269258
}
270259

271260
protected abstract void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber);
272261

273262
@Override
274263
public void completeGroup(Object groupId) {
275-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
276264
executeLocked(groupId, () -> doCompleteGroup(groupId));
277265
}
278266

279267
protected abstract void doCompleteGroup(Object groupId);
280268

281269
@Override
282270
public void setGroupCondition(Object groupId, String condition) {
283-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
284271
executeLocked(groupId, () -> doSetGroupCondition(groupId, condition));
285272
}
286273

287274
protected abstract void doSetGroupCondition(Object groupId, String condition);
288275

289276
@Override
290-
public Message<?> pollMessageFromGroup(Object groupId) {
291-
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
292-
return executeLocked(groupId, () -> doPollMessageFromGroup(groupId));
277+
public @Nullable Message<?> pollMessageFromGroup(Object groupId) {
278+
return this.<@Nullable Message<?>, RuntimeException>executeLocked(groupId, () -> doPollMessageFromGroup(groupId));
293279
}
294280

295-
protected abstract Message<?> doPollMessageFromGroup(Object groupId);
281+
protected abstract @Nullable Message<?> doPollMessageFromGroup(Object groupId);
296282

297283
protected <T, E extends RuntimeException> T executeLocked(Object groupId, CheckedCallable<T, E> runnable) {
298284
try {

0 commit comments

Comments
 (0)