Skip to content

Commit 9962ee4

Browse files
NaccOllartembilan
authored andcommitted
GH-5123: Add LockRegistry to AbstractMessageGroupStore
Fixes: #5123 When `RedisMessageStore`, for example, adds and removes messages, it operates on two keys separately, which may cause problems in multi-threading due to non-atomic operations. Although using Redis to delay messages is not a good idea, the abnormal loss of messages in the logs alerted me when the number of requests was not large. By comparing the logs, the problem that the message group representing the metadata is not consistent with the actual message. A simple solution is to add lock like in the `SimpleMessageStore`, which is also the approach taken in this pull request. * Add `LockRegistry` to `AbstractMessageGroupStore` * Normalize access levels and method name about the lock of `MessageGroupStore` * Add document about the lock of `AbstractMessageGroupStore`
1 parent 12a643d commit 9962ee4

File tree

14 files changed

+284
-214
lines changed

14 files changed

+284
-214
lines changed

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,13 +38,12 @@
3838
* @author Gary Russell
3939
* @author Artem Bilan
4040
* @author Ngoc Nhan
41+
* @author Youbin Wu
4142
*
4243
* @since 2.1
4344
*/
4445
public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupStore implements MessageStore {
4546

46-
private static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null";
47-
4847
protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_";
4948

5049
protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_";
@@ -206,7 +205,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) {
206205
}
207206

208207
@Override
209-
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
208+
protected void doAddMessagesToGroup(Object groupId, Message<?>... messages) {
210209
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
211210
Assert.notNull(messages, "'messages' must not be null");
212211

@@ -240,7 +239,7 @@ public void addMessagesToGroup(Object groupId, Message<?>... messages) {
240239
}
241240

242241
@Override
243-
public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
242+
protected void doRemoveMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
244243
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
245244
Assert.notNull(messages, "'messages' must not be null");
246245

@@ -283,7 +282,7 @@ public Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
283282
}
284283

285284
@Override
286-
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
285+
protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) {
287286
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
288287
Assert.notNull(messageId, "'messageId' must not be null");
289288
Object mgm = doRetrieve(this.groupPrefix + groupId);
@@ -305,7 +304,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
305304
}
306305

307306
@Override
308-
public void completeGroup(Object groupId) {
307+
protected void doCompleteGroup(Object groupId) {
309308
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
310309
MessageGroupMetadata metadata = getGroupMetadata(groupId);
311310
if (metadata != null) {
@@ -319,7 +318,7 @@ public void completeGroup(Object groupId) {
319318
* Remove the MessageGroup with the provided group ID.
320319
*/
321320
@Override
322-
public void removeMessageGroup(Object groupId) {
321+
protected void doRemoveMessageGroup(Object groupId) {
323322
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
324323
Object mgm = doRemove(this.groupPrefix + groupId);
325324
if (mgm != null) {
@@ -337,7 +336,7 @@ public void removeMessageGroup(Object groupId) {
337336
}
338337

339338
@Override
340-
public void setGroupCondition(Object groupId, String condition) {
339+
protected void doSetGroupCondition(Object groupId, String condition) {
341340
MessageGroupMetadata metadata = getGroupMetadata(groupId);
342341
if (metadata != null) {
343342
metadata.setCondition(condition);
@@ -346,7 +345,7 @@ public void setGroupCondition(Object groupId, String condition) {
346345
}
347346

348347
@Override
349-
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
348+
protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
350349
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
351350
MessageGroupMetadata metadata = getGroupMetadata(groupId);
352351
if (metadata == null) {
@@ -359,7 +358,7 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu
359358
}
360359

361360
@Override
362-
public Message<?> pollMessageFromGroup(Object groupId) {
361+
protected Message<?> doPollMessageFromGroup(Object groupId) {
363362
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
364363
if (groupMetadata != null) {
365364
UUID firstId = groupMetadata.firstId();

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,30 +19,41 @@
1919
import java.util.Arrays;
2020
import java.util.Collection;
2121
import java.util.LinkedHashSet;
22+
import java.util.UUID;
2223
import java.util.concurrent.locks.Lock;
2324
import java.util.concurrent.locks.ReentrantLock;
2425

2526
import org.apache.commons.logging.Log;
2627
import org.apache.commons.logging.LogFactory;
2728

29+
import org.springframework.integration.support.locks.DefaultLockRegistry;
30+
import org.springframework.integration.support.locks.LockRegistry;
31+
import org.springframework.integration.util.CheckedCallable;
32+
import org.springframework.integration.util.CheckedRunnable;
2833
import org.springframework.jmx.export.annotation.ManagedAttribute;
2934
import org.springframework.jmx.export.annotation.ManagedOperation;
3035
import org.springframework.jmx.export.annotation.ManagedResource;
3136
import org.springframework.messaging.Message;
37+
import org.springframework.util.Assert;
3238

3339
/**
3440
* @author Dave Syer
3541
* @author Oleg Zhurakousky
3642
* @author Gary Russell
3743
* @author Artem Bilan
3844
* @author Christian Tzolov
45+
* @author Youbin Wu
3946
*
4047
* @since 2.0
4148
*/
4249
@ManagedResource
4350
public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageGroupStore
4451
implements MessageGroupStore, Iterable<MessageGroup> {
4552

53+
protected static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock";
54+
55+
protected static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null";
56+
4657
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final
4758

4859
private final Lock lock = new ReentrantLock();
@@ -56,6 +67,8 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG
5667

5768
private boolean timeoutOnIdle;
5869

70+
private LockRegistry lockRegistry = new DefaultLockRegistry();
71+
5972
protected AbstractMessageGroupStore() {
6073
}
6174

@@ -109,6 +122,20 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) {
109122
this.lazyLoadMessageGroups = lazyLoadMessageGroups;
110123
}
111124

125+
/**
126+
* Specify the type of the {@link LockRegistry} to ensure atomic operations
127+
* @param lockRegistry lockRegistryType
128+
* @since 6.5
129+
*/
130+
public final void setLockRegistry(LockRegistry lockRegistry) {
131+
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
132+
this.lockRegistry = lockRegistry;
133+
}
134+
135+
protected LockRegistry getLockRegistry() {
136+
return this.lockRegistry;
137+
}
138+
112139
@Override
113140
public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) {
114141
if (callback instanceof UniqueExpiryCallback) {
@@ -195,12 +222,98 @@ public void removeMessagesFromGroup(Object key, Message<?>... messages) {
195222
removeMessagesFromGroup(key, Arrays.asList(messages));
196223
}
197224

225+
@Override
226+
public void removeMessagesFromGroup(Object key, Collection<Message<?>> messages) {
227+
Assert.notNull(key, GROUP_ID_MUST_NOT_BE_NULL);
228+
executeLocked(key, () -> doRemoveMessagesFromGroup(key, messages));
229+
}
230+
231+
protected abstract void doRemoveMessagesFromGroup(Object key, Collection<Message<?>> messages);
232+
233+
@Override
234+
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
235+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
236+
executeLocked(groupId, () -> doAddMessagesToGroup(groupId, messages));
237+
}
238+
239+
protected abstract void doAddMessagesToGroup(Object groupId, Message<?>... messages);
240+
198241
@Override
199242
public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
200243
addMessagesToGroup(groupId, message);
201244
return getMessageGroup(groupId);
202245
}
203246

247+
@Override
248+
public void removeMessageGroup(Object groupId) {
249+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
250+
executeLocked(groupId, () -> doRemoveMessageGroup(groupId));
251+
}
252+
253+
protected abstract void doRemoveMessageGroup(Object groupId);
254+
255+
@Override
256+
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
257+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
258+
return executeLocked(groupId, () -> doRemoveMessageFromGroupById(groupId, messageId));
259+
}
260+
261+
protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) {
262+
throw new UnsupportedOperationException("Not supported for this store");
263+
}
264+
265+
@Override
266+
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
267+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
268+
executeLocked(groupId, () -> doSetLastReleasedSequenceNumberForGroup(groupId, sequenceNumber));
269+
}
270+
271+
protected abstract void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber);
272+
273+
@Override
274+
public void completeGroup(Object groupId) {
275+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
276+
executeLocked(groupId, () -> doCompleteGroup(groupId));
277+
}
278+
279+
protected abstract void doCompleteGroup(Object groupId);
280+
281+
@Override
282+
public void setGroupCondition(Object groupId, String condition) {
283+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
284+
executeLocked(groupId, () -> doSetGroupCondition(groupId, condition));
285+
}
286+
287+
protected abstract void doSetGroupCondition(Object groupId, String condition);
288+
289+
@Override
290+
public Message<?> pollMessageFromGroup(Object groupId) {
291+
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
292+
return executeLocked(groupId, () -> doPollMessageFromGroup(groupId));
293+
}
294+
295+
protected abstract Message<?> doPollMessageFromGroup(Object groupId);
296+
297+
protected <T, E extends RuntimeException> T executeLocked(Object groupId, CheckedCallable<T, E> runnable) {
298+
try {
299+
return this.lockRegistry.executeLocked(groupId, runnable);
300+
}
301+
catch (InterruptedException ex) {
302+
Thread.currentThread().interrupt();
303+
throw new IllegalStateException(INTERRUPTED_WHILE_OBTAINING_LOCK, ex);
304+
}
305+
}
306+
307+
protected <E extends RuntimeException> void executeLocked(Object groupId, CheckedRunnable<E> runnable) {
308+
try {
309+
this.lockRegistry.executeLocked(groupId, runnable);
310+
}
311+
catch (InterruptedException ex) {
312+
Thread.currentThread().interrupt();
313+
throw new IllegalStateException(INTERRUPTED_WHILE_OBTAINING_LOCK, ex);
314+
}
315+
}
316+
204317
private void expire(MessageGroup group) {
205318

206319
RuntimeException exception = null;

0 commit comments

Comments
 (0)