Skip to content

Commit 9e4a6e2

Browse files
kistlersspring-builds
authored andcommitted
GH-10471: Add the lockKey to the channel to control Redis cluster slot hashing
Fixes: #10471 As a consequence, change the `ChannelTopic` to a `PatternTopic` to keep receiving subscription events for Pub/Sub lock * Pass `unLockChannelKey` as `KEYS[2]` instead of `ARGV[2]` as values used as keys should be passed as such Signed-off-by: Severin Kistler <[email protected]> # Conflicts: # spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java (cherry picked from commit 00fa656)
1 parent 9a3e7da commit 9e4a6e2

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ConcurrentModificationException;
2424
import java.util.Date;
2525
import java.util.LinkedHashMap;
26+
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Map.Entry;
2829
import java.util.UUID;
@@ -52,7 +53,7 @@
5253
import org.springframework.data.redis.core.StringRedisTemplate;
5354
import org.springframework.data.redis.core.script.DefaultRedisScript;
5455
import org.springframework.data.redis.core.script.RedisScript;
55-
import org.springframework.data.redis.listener.ChannelTopic;
56+
import org.springframework.data.redis.listener.PatternTopic;
5657
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
5758
import org.springframework.data.redis.listener.Topic;
5859
import org.springframework.integration.support.locks.ExpirableLockRegistry;
@@ -94,6 +95,7 @@
9495
* @author Roman Zabaluev
9596
* @author Alex Peelman
9697
* @author Youbin Wu
98+
* @author Severin Kistler
9799
*
98100
* @since 4.0
99101
*
@@ -199,7 +201,7 @@ private void setupUnlockMessageListener(RedisConnectionFactory connectionFactory
199201
"'unlockNotifyMessageListener' must not have been re-initialized.");
200202
RedisLockRegistry.this.redisMessageListenerContainer = new RedisMessageListenerContainer();
201203
RedisLockRegistry.this.unlockNotifyMessageListener = new RedisPubSubLock.RedisUnLockNotifyMessageListener();
202-
final Topic topic = new ChannelTopic(this.unLockChannelKey);
204+
final Topic topic = new PatternTopic(this.unLockChannelKey + ":*");
203205
this.redisMessageListenerContainer.setConnectionFactory(connectionFactory);
204206
this.redisMessageListenerContainer.setTaskExecutor(this.executor);
205207
this.redisMessageListenerContainer.setSubscriptionExecutor(this.executor);
@@ -665,7 +667,7 @@ private final class RedisPubSubLock extends RedisLock {
665667
private static final String UNLINK_UNLOCK_SCRIPT = """
666668
local lockClientId = redis.call('GET', KEYS[1])
667669
if (lockClientId == ARGV[1] and redis.call('UNLINK', KEYS[1]) == 1) then
668-
redis.call('PUBLISH', ARGV[2], KEYS[1])
670+
redis.call('PUBLISH', KEYS[2], KEYS[1])
669671
return true
670672
end
671673
return false
@@ -674,7 +676,7 @@ private final class RedisPubSubLock extends RedisLock {
674676
private static final String DELETE_UNLOCK_SCRIPT = """
675677
local lockClientId = redis.call('GET', KEYS[1])
676678
if (lockClientId == ARGV[1] and redis.call('DEL', KEYS[1]) == 1) then
677-
redis.call('PUBLISH', ARGV[2], KEYS[1])
679+
redis.call('PUBLISH', KEYS[2], KEYS[1])
678680
return true
679681
end
680682
return false
@@ -706,9 +708,10 @@ protected boolean removeLockKeyInnerDelete() {
706708
}
707709

708710
private boolean removeLockKeyWithScript(RedisScript<Boolean> redisScript) {
711+
String unLockChannelKeyToUse = RedisLockRegistry.this.unLockChannelKey + ":" + this.lockKey;
709712
return Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute(
710-
redisScript, Collections.singletonList(this.lockKey),
711-
RedisLockRegistry.this.clientId, RedisLockRegistry.this.unLockChannelKey));
713+
redisScript, List.of(this.lockKey, unLockChannelKeyToUse),
714+
RedisLockRegistry.this.clientId));
712715
}
713716

714717
private boolean subscribeLock(long time) throws ExecutionException, InterruptedException {

0 commit comments

Comments
 (0)