-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix(RedisMessageStore): RedisMessageStore add lock #9680
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
dd83235
af5a737
ab0519d
5f5d40c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| /* | ||
| * Copyright 2002-2023 the original author or authors. | ||
| * Copyright 2002-2024 the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
|
|
@@ -19,30 +19,42 @@ | |
| import java.util.Arrays; | ||
| import java.util.Collection; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.locks.Lock; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
|
|
||
| import org.apache.commons.logging.Log; | ||
| import org.apache.commons.logging.LogFactory; | ||
|
|
||
| import org.springframework.integration.support.locks.DefaultLockRegistry; | ||
| import org.springframework.integration.support.locks.LockRegistry; | ||
| import org.springframework.integration.util.CheckedCallable; | ||
| import org.springframework.integration.util.CheckedRunnable; | ||
| import org.springframework.jmx.export.annotation.ManagedAttribute; | ||
| import org.springframework.jmx.export.annotation.ManagedOperation; | ||
| import org.springframework.jmx.export.annotation.ManagedResource; | ||
| import org.springframework.messaging.Message; | ||
| import org.springframework.messaging.MessagingException; | ||
| import org.springframework.util.Assert; | ||
|
|
||
| /** | ||
| * @author Dave Syer | ||
| * @author Oleg Zhurakousky | ||
| * @author Gary Russell | ||
| * @author Artem Bilan | ||
| * @author Christian Tzolov | ||
| * @author Youbin Wu | ||
| * | ||
| * @since 2.0 | ||
| */ | ||
| @ManagedResource | ||
| public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageGroupStore | ||
| implements MessageGroupStore, Iterable<MessageGroup> { | ||
|
|
||
| protected static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock"; | ||
|
|
||
| protected static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null"; | ||
|
|
||
| protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final | ||
|
|
||
| private final Lock lock = new ReentrantLock(); | ||
|
|
@@ -56,11 +68,15 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG | |
|
|
||
| private boolean timeoutOnIdle; | ||
|
|
||
| protected LockRegistry lockRegistry; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This must be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, even LockRegister does not need to be exposed to subclasses. But SimpleMessageStore is special, it needs lockRegister and has its own semaphore mechanism. I hope you can handle the follow-up work, because I feel there will be many special considerations in it. I will not update this PR again
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please, confirm that you are OK that I'll pick it up whenever you have left. I'll do that, thought, in new year, when I switch to Happy holidays! |
||
|
|
||
| protected AbstractMessageGroupStore() { | ||
| this.lockRegistry = new DefaultLockRegistry(); | ||
| } | ||
|
|
||
| protected AbstractMessageGroupStore(boolean lazyLoadMessageGroups) { | ||
| this.lazyLoadMessageGroups = lazyLoadMessageGroups; | ||
| this.lockRegistry = new DefaultLockRegistry(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -109,6 +125,16 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) { | |
| this.lazyLoadMessageGroups = lazyLoadMessageGroups; | ||
| } | ||
|
|
||
| /** | ||
| * Specify the type of the {@link LockRegistry} to ensure atomic operations | ||
| * @param lockRegistry lockRegistryType | ||
| * @since 6.5 | ||
| */ | ||
| public void setLockRegistry(LockRegistry lockRegistry) { | ||
| Assert.notNull(lockRegistry, "The LockRegistry cannot be null"); | ||
| this.lockRegistry = lockRegistry; | ||
| } | ||
|
|
||
| @Override | ||
| public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) { | ||
| if (callback instanceof UniqueExpiryCallback) { | ||
|
|
@@ -195,12 +221,98 @@ public void removeMessagesFromGroup(Object key, Message<?>... messages) { | |
| removeMessagesFromGroup(key, Arrays.asList(messages)); | ||
| } | ||
|
|
||
| @Override | ||
| public void removeMessagesFromGroup(Object key, Collection<Message<?>> messages) { | ||
| Assert.notNull(key, GROUP_ID_MUST_NOT_BE_NULL); | ||
| lockExecute(key, () -> removeMessagesFromGroupInner(key, messages)); | ||
| } | ||
|
|
||
| protected abstract void removeMessagesFromGroupInner(Object key, Collection<Message<?>> messages); | ||
|
|
||
| @Override | ||
| public void addMessagesToGroup(Object groupId, Message<?>... messages) { | ||
| Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); | ||
| lockExecute(groupId, () -> addMessagesToGroupInner(groupId, messages)); | ||
| } | ||
|
|
||
| protected abstract void addMessagesToGroupInner(Object groupId, Message<?>... messages); | ||
|
|
||
| @Override | ||
| public MessageGroup addMessageToGroup(Object groupId, Message<?> message) { | ||
| addMessagesToGroup(groupId, message); | ||
| return getMessageGroup(groupId); | ||
| } | ||
|
|
||
| @Override | ||
| public void removeMessageGroup(Object groupId) { | ||
| Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); | ||
| lockExecute(groupId, () -> removeMessageGroupInner(groupId)); | ||
| } | ||
|
|
||
| protected abstract void removeMessageGroupInner(Object groupId); | ||
|
|
||
| @Override | ||
| public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { | ||
| Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); | ||
| return lockExecute(groupId, () -> removeMessageFromGroupByIdInner(groupId, messageId)); | ||
| } | ||
|
|
||
| protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { | ||
| throw new UnsupportedOperationException("Not supported for this store"); | ||
| } | ||
|
|
||
| @Override | ||
| public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { | ||
| Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); | ||
| lockExecute(groupId, () -> setLastReleasedSequenceNumberForGroupInner(groupId, sequenceNumber)); | ||
| } | ||
|
|
||
| protected abstract void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber); | ||
|
|
||
| @Override | ||
| public void completeGroup(Object groupId) { | ||
| Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); | ||
| lockExecute(groupId, () -> completeGroupInner(groupId)); | ||
| } | ||
|
|
||
| protected abstract void completeGroupInner(Object groupId); | ||
|
|
||
| @Override | ||
| public void setGroupCondition(Object groupId, String condition) { | ||
| Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); | ||
| lockExecute(groupId, () -> setGroupConditionInner(groupId, condition)); | ||
| } | ||
|
|
||
| protected abstract void setGroupConditionInner(Object groupId, String condition); | ||
|
|
||
| @Override | ||
| public Message<?> pollMessageFromGroup(Object groupId) { | ||
| Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); | ||
| return lockExecute(groupId, () -> pollMessageFromGroupInner(groupId)); | ||
| } | ||
|
|
||
| protected abstract Message<?> pollMessageFromGroupInner(Object groupId); | ||
|
|
||
| protected <T, E extends RuntimeException> T lockExecute(Object groupId, CheckedCallable<T, E> runnable) { | ||
NaccOll marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| try { | ||
| return this.lockRegistry.executeLocked(groupId, runnable); | ||
| } | ||
| catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); | ||
| } | ||
| } | ||
|
|
||
| protected <E extends RuntimeException> void lockExecute(Object groupId, CheckedRunnable<E> runnable) { | ||
| try { | ||
| this.lockRegistry.executeLocked(groupId, runnable); | ||
| } | ||
| catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); | ||
| } | ||
| } | ||
|
|
||
| private void expire(MessageGroup group) { | ||
|
|
||
| RuntimeException exception = null; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.