Skip to content

Commit 903955b

Browse files
authored
fix: cluster client should be able to handle redirection for the watch command (#328)
1 parent c09cd8f commit 903955b

File tree

4 files changed

+101
-63
lines changed

4 files changed

+101
-63
lines changed

lib/redis_client/cluster.rb

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,15 @@ def multi(watch: nil)
9595
if watch.nil? || watch.empty?
9696
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
9797
yield transaction
98+
return transaction.execute
99+
end
100+
101+
::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, resharding|
102+
transaction = ::RedisClient::Cluster::Transaction.new(
103+
@router, @command_builder, node: c, resharding: resharding
104+
)
105+
yield transaction
98106
transaction.execute
99-
else
100-
locking = ::RedisClient::Cluster::OptimisticLocking.new(watch, @router)
101-
locking.watch do |c|
102-
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, c)
103-
yield transaction
104-
transaction.execute
105-
end
106107
end
107108
end
108109

lib/redis_client/cluster/optimistic_locking.rb

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,32 @@
77
class RedisClient
88
class Cluster
99
class OptimisticLocking
10-
def initialize(keys, router)
11-
@node = find_node!(keys, router)
12-
@keys = keys
10+
def initialize(router)
11+
@router = router
1312
end
1413

15-
def watch
16-
@node.with do |c|
17-
c.call('WATCH', *@keys)
18-
reply = yield(c)
19-
c.call('UNWATCH')
20-
reply
14+
def watch(keys)
15+
ensure_safe_keys(keys)
16+
node = find_node(keys)
17+
cnt = 0 # We assume redirects occurred when incrementing it.
18+
19+
@router.handle_redirection(node, retry_count: 1) do |nd|
20+
cnt += 1
21+
nd.with do |c|
22+
c.call('WATCH', *keys)
23+
reply = yield(c, cnt > 1)
24+
c.call('UNWATCH')
25+
reply
26+
end
2127
end
2228
end
2329

2430
private
2531

26-
def find_node!(keys, router)
27-
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}" unless safe?(keys)
32+
def ensure_safe_keys(keys)
33+
return if safe?(keys)
2834

29-
node_key = router.find_primary_node_key(['WATCH', *keys])
30-
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node" if node_key.nil?
31-
32-
router.find_node(node_key)
35+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "unsafe watch: #{keys.join(' ')}"
3336
end
3437

3538
def safe?(keys)
@@ -43,6 +46,13 @@ def safe?(keys)
4346

4447
slots.uniq.size == 1
4548
end
49+
50+
def find_node(keys)
51+
node_key = @router.find_primary_node_key(['WATCH', *keys])
52+
return @router.find_node(node_key) unless node_key.nil?
53+
54+
raise ::RedisClient::Cluster::Transaction::ConsistencyError, "couldn't determine the node"
55+
end
4656
end
4757
end
4858
end

lib/redis_client/cluster/transaction.rb

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@ class RedisClient
88
class Cluster
99
class Transaction
1010
ConsistencyError = Class.new(::RedisClient::Error)
11+
MAX_REDIRECTION = 2
1112

12-
def initialize(router, command_builder, node = nil)
13+
def initialize(router, command_builder, node: nil, resharding: false)
1314
@router = router
1415
@command_builder = command_builder
1516
@retryable = true
1617
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
1718
@pending_commands = []
1819
@node = node
1920
prepare_tx unless @node.nil?
21+
@resharding_state = resharding
2022
end
2123

2224
def call(*command, **kwargs, &block)
@@ -92,7 +94,7 @@ def prepare_tx
9294

9395
def settle
9496
@pipeline.call('EXEC')
95-
send_transaction(@node, redirect: true)
97+
send_transaction(@node, redirect: MAX_REDIRECTION)
9698
end
9799

98100
def send_transaction(client, redirect:)
@@ -109,7 +111,7 @@ def send_pipeline(client, redirect:)
109111
client.middlewares.call_pipelined(commands, client.config) do
110112
connection.call_pipelined(commands, nil)
111113
rescue ::RedisClient::CommandError => e
112-
return handle_command_error!(commands, e) if redirect
114+
return handle_command_error!(client, commands, e, redirect: redirect) unless redirect.zero?
113115

114116
raise
115117
end
@@ -138,39 +140,30 @@ def coerce_results!(results, offset: 1)
138140
results
139141
end
140142

141-
def handle_command_error!(commands, err)
143+
def handle_command_error!(client, commands, err, redirect:) # rubocop:disable Metrics/AbcSize
142144
if err.message.start_with?('CROSSSLOT')
143145
raise ConsistencyError, "#{err.message}: #{err.command}"
144-
elsif err.message.start_with?('MOVED', 'ASK')
145-
ensure_the_same_node!(commands)
146-
handle_redirection(err)
146+
elsif err.message.start_with?('MOVED')
147+
ensure_the_same_node!(client, commands)
148+
node = @router.assign_redirection_node(err.message)
149+
send_transaction(node, redirect: redirect - 1)
150+
elsif err.message.start_with?('ASK')
151+
ensure_the_same_node!(client, commands)
152+
node = @router.assign_asking_node(err.message)
153+
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
147154
else
148155
raise err
149156
end
150157
end
151158

152-
def ensure_the_same_node!(commands)
153-
expected_node_key = NodeKey.build_from_client(@node)
159+
def ensure_the_same_node!(client, commands)
160+
node_keys = commands.map { |command| @router.find_primary_node_key(command) }.compact.uniq
161+
expected_node_key = ::RedisClient::Cluster::NodeKey.build_from_client(client)
154162

155-
commands.each do |command|
156-
node_key = @router.find_primary_node_key(command)
157-
next if node_key.nil?
158-
next if node_key == expected_node_key
159-
160-
raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}"
161-
end
162-
end
163+
return if !@resharding_state && node_keys.size == 1 && node_keys.first == expected_node_key
164+
return if @resharding_state && node_keys.size == 1
163165

164-
def handle_redirection(err)
165-
if err.message.start_with?('MOVED')
166-
node = @router.assign_redirection_node(err.message)
167-
send_transaction(node, redirect: false)
168-
elsif err.message.start_with?('ASK')
169-
node = @router.assign_asking_node(err.message)
170-
try_asking(node) ? send_transaction(node, redirect: false) : err
171-
else
172-
raise err
173-
end
166+
raise(ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}")
174167
end
175168

176169
def try_asking(node)

test/test_against_cluster_state.rb

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,28 @@ def test_the_state_of_cluster_resharding_with_transaction
8181
assert_equal(1, call_cnt)
8282
end
8383

84+
def test_the_state_of_cluster_resharding_with_transaction_and_watch
85+
call_cnt = 0
86+
87+
do_resharding_test do |keys|
88+
@client.multi(watch: keys) do |tx|
89+
call_cnt += 1
90+
keys.each do |key|
91+
tx.call('SET', key, '0')
92+
tx.call('INCR', key)
93+
end
94+
end
95+
96+
keys.each do |key|
97+
want = '1'
98+
got = @client.call('GET', key)
99+
assert_equal(want, got, "Case: GET: #{key}")
100+
end
101+
end
102+
103+
assert_equal(1, call_cnt)
104+
end
105+
84106
def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
85107
# This test is exercising a very delicate race condition; I think the use of @client to set
86108
# the keys in do_resharding_test is actually causing the race condition not to happen, so this
@@ -147,6 +169,20 @@ def new_test_client
147169
end
148170
end
149171

172+
class Pooled < TestingWrapper
173+
include Mixin
174+
175+
private
176+
177+
def new_test_client
178+
::RedisClient.cluster(
179+
nodes: TEST_NODE_URIS,
180+
fixed_hostname: TEST_FIXED_HOSTNAME,
181+
**TEST_GENERIC_OPTIONS
182+
).new_pool(timeout: TEST_TIMEOUT_SEC, size: 2)
183+
end
184+
end
185+
150186
class ScaleReadRandom < TestingWrapper
151187
include Mixin
152188

@@ -162,6 +198,10 @@ def test_the_state_of_cluster_resharding_with_transaction
162198
skip('https://github.com/redis/redis/issues/11312')
163199
end
164200

201+
def test_the_state_of_cluster_resharding_with_transaction_and_watch
202+
skip('https://github.com/redis/redis/issues/11312')
203+
end
204+
165205
private
166206

167207
def new_test_client
@@ -190,6 +230,10 @@ def test_the_state_of_cluster_resharding_with_transaction
190230
skip('https://github.com/redis/redis/issues/11312')
191231
end
192232

233+
def test_the_state_of_cluster_resharding_with_transaction_and_watch
234+
skip('https://github.com/redis/redis/issues/11312')
235+
end
236+
193237
private
194238

195239
def new_test_client
@@ -218,6 +262,10 @@ def test_the_state_of_cluster_resharding_with_transaction
218262
skip('https://github.com/redis/redis/issues/11312')
219263
end
220264

265+
def test_the_state_of_cluster_resharding_with_transaction_and_watch
266+
skip('https://github.com/redis/redis/issues/11312')
267+
end
268+
221269
private
222270

223271
def new_test_client
@@ -230,18 +278,4 @@ def new_test_client
230278
).new_client
231279
end
232280
end
233-
234-
class Pooled < TestingWrapper
235-
include Mixin
236-
237-
private
238-
239-
def new_test_client
240-
::RedisClient.cluster(
241-
nodes: TEST_NODE_URIS,
242-
fixed_hostname: TEST_FIXED_HOSTNAME,
243-
**TEST_GENERIC_OPTIONS
244-
).new_pool(timeout: TEST_TIMEOUT_SEC, size: 2)
245-
end
246-
end
247281
end

0 commit comments

Comments
 (0)