Skip to content

Commit af6d5a9

Browse files
authored
GH-10471: Add the lockKey to the channel to control Redis cluster slot hashing
Fixes: #10471 As a consequence, change the `ChannelTopic` to a `PatternTopic` to keep receiving subscription events for Pub/Sub lock * Pass `unLockChannelKey` as `KEYS[2]` instead of `ARGV[2]` as values used as keys should be passed as such Signed-off-by: Severin Kistler <[email protected]> **Auto-cherry-pick to `6.5.x` & `6.4.x`**
1 parent 3580804 commit af6d5a9

File tree

1 file changed

+8
-5
lines changed

1 file changed

+8
-5
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ConcurrentModificationException;
2424
import java.util.Date;
2525
import java.util.LinkedHashMap;
26+
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Map.Entry;
2829
import java.util.UUID;
@@ -53,7 +54,7 @@
5354
import org.springframework.data.redis.core.StringRedisTemplate;
5455
import org.springframework.data.redis.core.script.DefaultRedisScript;
5556
import org.springframework.data.redis.core.script.RedisScript;
56-
import org.springframework.data.redis.listener.ChannelTopic;
57+
import org.springframework.data.redis.listener.PatternTopic;
5758
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
5859
import org.springframework.data.redis.listener.Topic;
5960
import org.springframework.integration.support.locks.DistributedLock;
@@ -97,6 +98,7 @@
9798
* @author Alex Peelman
9899
* @author Youbin Wu
99100
* @author Michal Domagala
101+
* @author Severin Kistler
100102
*
101103
* @since 4.0
102104
*
@@ -213,7 +215,7 @@ private void setupUnlockMessageListener(RedisConnectionFactory connectionFactory
213215
"'unlockNotifyMessageListener' must not have been re-initialized.");
214216
RedisLockRegistry.this.redisMessageListenerContainer = new RedisMessageListenerContainer();
215217
RedisLockRegistry.this.unlockNotifyMessageListener = new RedisPubSubLock.RedisUnLockNotifyMessageListener();
216-
final Topic topic = new ChannelTopic(this.unLockChannelKey);
218+
final Topic topic = new PatternTopic(this.unLockChannelKey + ":*");
217219
RedisMessageListenerContainer container = RedisLockRegistry.this.redisMessageListenerContainer;
218220
RedisPubSubLock.RedisUnLockNotifyMessageListener listener = RedisLockRegistry.this.unlockNotifyMessageListener;
219221
container.setConnectionFactory(connectionFactory);
@@ -667,7 +669,7 @@ private final class RedisPubSubLock extends RedisLock {
667669
private static final String UNLINK_UNLOCK_SCRIPT = """
668670
local lockClientId = redis.call('GET', KEYS[1])
669671
if (lockClientId == ARGV[1] and redis.call('UNLINK', KEYS[1]) == 1) then
670-
redis.call('PUBLISH', ARGV[2], KEYS[1])
672+
redis.call('PUBLISH', KEYS[2], KEYS[1])
671673
return true
672674
end
673675
return false
@@ -688,9 +690,10 @@ protected boolean tryRedisLockInner(long time, long expireAfter)
688690

689691
@Override
690692
protected boolean removeLockKeyInnerUnlink() {
693+
final String unLockChannelKey = RedisLockRegistry.this.unLockChannelKey + ":" + this.lockKey;
691694
return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
692-
UNLINK_UNLOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
693-
RedisLockRegistry.this.clientId, RedisLockRegistry.this.unLockChannelKey));
695+
UNLINK_UNLOCK_REDIS_SCRIPT, List.of(this.lockKey, unLockChannelKey),
696+
RedisLockRegistry.this.clientId));
694697
}
695698

696699
private boolean subscribeLock(long time, long expireAfter) throws ExecutionException, InterruptedException {

0 commit comments

Comments
 (0)