Skip to content

Commit 8f158be

Browse files
authored
fix: handle redirection errors in pipelining (#131)
1 parent e390686 commit 8f158be

File tree

6 files changed

+172
-50
lines changed

6 files changed

+172
-50
lines changed

.rubocop.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Metrics/MethodLength:
2626
- 'test/**/*'
2727

2828
Metrics/BlockLength:
29-
Max: 25
29+
Max: 40
3030
Exclude:
3131
- 'test/**/*'
3232

lib/redis_client/cluster/node/latency_replica.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def find_node_key_of_replica(primary_node_key, seed: nil) # rubocop:disable Lint
3434

3535
def any_replica_node_key(seed: nil)
3636
random = seed.nil? ? Random : Random.new(seed)
37-
@existed_replicas.sample(random: random).first
37+
@existed_replicas.sample(random: random)&.first
3838
end
3939

4040
private

lib/redis_client/cluster/pipeline.rb

Lines changed: 144 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,70 +2,141 @@
22

33
require 'redis_client'
44
require 'redis_client/cluster/errors'
5+
require 'redis_client/connection_mixin'
56
require 'redis_client/middlewares'
67
require 'redis_client/pooled'
78

89
class RedisClient
910
class Cluster
1011
class Pipeline
12+
class Extended < ::RedisClient::Pipeline
13+
attr_reader :outer_indices
14+
15+
def initialize(command_builder)
16+
super
17+
@outer_indices = nil
18+
end
19+
20+
def add_outer_index(index)
21+
@outer_indices ||= []
22+
@outer_indices << index
23+
end
24+
25+
def get_inner_index(outer_index)
26+
@outer_indices&.find_index(outer_index)
27+
end
28+
29+
def get_callee_method(inner_index)
30+
if @timeouts.is_a?(Array) && !@timeouts[inner_index].nil?
31+
:blocking_call_v
32+
elsif _retryable?
33+
:call_once_v
34+
else
35+
:call_v
36+
end
37+
end
38+
39+
def get_command(inner_index)
40+
@commands.is_a?(Array) ? @commands[inner_index] : nil
41+
end
42+
43+
def get_timeout(inner_index)
44+
@timeouts.is_a?(Array) ? @timeouts[inner_index] : nil
45+
end
46+
47+
def get_block(inner_index)
48+
@blocks.is_a?(Array) ? @blocks[inner_index] : nil
49+
end
50+
end
51+
52+
::RedisClient::ConnectionMixin.module_eval do
53+
def call_pipelined_aware_of_redirection(commands, timeouts) # rubocop:disable Metrics/AbcSize
54+
size = commands.size
55+
results = Array.new(commands.size)
56+
@pending_reads += size
57+
write_multi(commands)
58+
59+
redirection_indices = nil
60+
size.times do |index|
61+
timeout = timeouts && timeouts[index]
62+
result = read(timeout)
63+
@pending_reads -= 1
64+
if result.is_a?(CommandError)
65+
result._set_command(commands[index])
66+
if result.message.start_with?('MOVED', 'ASK')
67+
redirection_indices ||= []
68+
redirection_indices << index
69+
end
70+
end
71+
72+
results[index] = result
73+
end
74+
75+
return results if redirection_indices.nil?
76+
77+
err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
78+
err.replies = results
79+
err.indices = redirection_indices
80+
raise err
81+
end
82+
end
83+
1184
ReplySizeError = Class.new(::RedisClient::Error)
85+
86+
class RedirectionNeeded < ::RedisClient::Error
87+
attr_accessor :replies, :indices
88+
end
89+
1290
MAX_THREADS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5))
1391

1492
def initialize(router, command_builder, seed: Random.new_seed)
1593
@router = router
1694
@command_builder = command_builder
1795
@seed = seed
18-
@pipelines = @indices = nil
96+
@pipelines = nil
1997
@size = 0
2098
end
2199

22100
def call(*args, **kwargs, &block)
23101
command = @command_builder.generate(args, kwargs)
24102
node_key = @router.find_node_key(command, seed: @seed)
25-
get_pipeline(node_key).call_v(command, &block)
26-
index_pipeline(node_key)
103+
append_pipeline(node_key).call_v(command, &block)
27104
end
28105

29106
def call_v(args, &block)
30107
command = @command_builder.generate(args)
31108
node_key = @router.find_node_key(command, seed: @seed)
32-
get_pipeline(node_key).call_v(command, &block)
33-
index_pipeline(node_key)
109+
append_pipeline(node_key).call_v(command, &block)
34110
end
35111

36112
def call_once(*args, **kwargs, &block)
37113
command = @command_builder.generate(args, kwargs)
38114
node_key = @router.find_node_key(command, seed: @seed)
39-
get_pipeline(node_key).call_once_v(command, &block)
40-
index_pipeline(node_key)
115+
append_pipeline(node_key).call_once_v(command, &block)
41116
end
42117

43118
def call_once_v(args, &block)
44119
command = @command_builder.generate(args)
45120
node_key = @router.find_node_key(command, seed: @seed)
46-
get_pipeline(node_key).call_once_v(command, &block)
47-
index_pipeline(node_key)
121+
append_pipeline(node_key).call_once_v(command, &block)
48122
end
49123

50124
def blocking_call(timeout, *args, **kwargs, &block)
51125
command = @command_builder.generate(args, kwargs)
52126
node_key = @router.find_node_key(command, seed: @seed)
53-
get_pipeline(node_key).blocking_call_v(timeout, command, &block)
54-
index_pipeline(node_key)
127+
append_pipeline(node_key).blocking_call_v(timeout, command, &block)
55128
end
56129

57130
def blocking_call_v(timeout, args, &block)
58131
command = @command_builder.generate(args)
59132
node_key = @router.find_node_key(command, seed: @seed)
60-
get_pipeline(node_key).blocking_call_v(timeout, command, &block)
61-
index_pipeline(node_key)
133+
append_pipeline(node_key).blocking_call_v(timeout, command, &block)
62134
end
63135

64136
def empty?
65137
@size.zero?
66138
end
67139

68-
# TODO: https://github.com/redis-rb/redis-cluster-client/issues/37 handle redirections
69140
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
70141
all_replies = errors = nil
71142
@pipelines&.each_slice(MAX_THREADS) do |chuncked_pipelines|
@@ -77,40 +148,47 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
77148
raise ReplySizeError, "commands: #{pl._size}, replies: #{replies.size}" if pl._size != replies.size
78149

79150
Thread.current.thread_variable_set(:replies, replies)
151+
rescue ::RedisClient::Cluster::Pipeline::RedirectionNeeded => e
152+
Thread.current.thread_variable_set(:redirection_needed, e)
80153
rescue StandardError => e
81154
Thread.current.thread_variable_set(:error, e)
82155
end
83156
end
84157

85158
threads.each do |t|
86159
t.join
160+
87161
if t.thread_variable?(:replies)
88162
all_replies ||= Array.new(@size)
89-
@indices[t.thread_variable_get(:node_key)].each_with_index { |gi, i| all_replies[gi] = t.thread_variable_get(:replies)[i] }
163+
@pipelines[t.thread_variable_get(:node_key)]
164+
.outer_indices
165+
.each_with_index { |outer, inner| all_replies[outer] = t.thread_variable_get(:replies)[inner] }
166+
elsif t.thread_variable?(:redirection_needed)
167+
all_replies ||= Array.new(@size)
168+
pipeline = @pipelines[t.thread_variable_get(:node_key)]
169+
err = t.thread_variable_get(:redirection_needed)
170+
err.indices.each { |i| err.replies[i] = handle_redirection(err.replies[i], pipeline, i) }
171+
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = err.replies[inner] }
90172
elsif t.thread_variable?(:error)
91173
errors ||= {}
92174
errors[t.thread_variable_get(:node_key)] = t.thread_variable_get(:error)
93175
end
94176
end
95177
end
96178

97-
return all_replies if errors.nil?
179+
raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?
98180

99-
raise ::RedisClient::Cluster::ErrorCollection, errors
181+
all_replies
100182
end
101183

102184
private
103185

104-
def get_pipeline(node_key)
186+
def append_pipeline(node_key)
105187
@pipelines ||= {}
106-
@pipelines[node_key] ||= ::RedisClient::Pipeline.new(@command_builder)
107-
end
108-
109-
def index_pipeline(node_key)
110-
@indices ||= {}
111-
@indices[node_key] ||= []
112-
@indices[node_key] << @size
188+
@pipelines[node_key] ||= ::RedisClient::Cluster::Pipeline::Extended.new(@command_builder)
189+
@pipelines[node_key].add_outer_index(@size)
113190
@size += 1
191+
@pipelines[node_key]
114192
end
115193

116194
def do_pipelining(client, pipeline)
@@ -125,12 +203,52 @@ def send_pipeline(client, pipeline)
125203
results = client.send(:ensure_connected, retryable: pipeline._retryable?) do |connection|
126204
commands = pipeline._commands
127205
::RedisClient::Middlewares.call_pipelined(commands, client.config) do
128-
connection.call_pipelined(commands, pipeline._timeouts)
206+
connection.call_pipelined_aware_of_redirection(commands, pipeline._timeouts)
129207
end
130208
end
131209

132210
pipeline._coerce!(results)
133211
end
212+
213+
def handle_redirection(err, pipeline, inner_index)
214+
return err unless err.is_a?(::RedisClient::CommandError)
215+
216+
if err.message.start_with?('MOVED')
217+
node = @router.assign_redirection_node(err.message)
218+
try_redirection(node, pipeline, inner_index)
219+
elsif err.message.start_with?('ASK')
220+
node = @router.assign_asking_node(err.message)
221+
try_asking(node) ? try_redirection(node, pipeline, inner_index) : err
222+
else
223+
err
224+
end
225+
end
226+
227+
def try_redirection(node, pipeline, inner_index)
228+
redirect_command(node, pipeline, inner_index)
229+
rescue StandardError => e
230+
e
231+
end
232+
233+
def redirect_command(node, pipeline, inner_index)
234+
method = pipeline.get_callee_method(inner_index)
235+
command = pipeline.get_command(inner_index)
236+
timeout = pipeline.get_timeout(inner_index)
237+
block = pipeline.get_block(inner_index)
238+
args = timeout.nil? ? [] : [timeout]
239+
240+
if block.nil?
241+
@router.try_send(node, method, command, args)
242+
else
243+
@router.try_send(node, method, command, args, &block)
244+
end
245+
end
246+
247+
def try_asking(node)
248+
node.call('ASKING') == 'OK'
249+
rescue StandardError
250+
false
251+
end
134252
end
135253
end
136254
end

lib/redis_client/cluster/router.rb

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ def find_node_key(command, seed: nil)
172172
def find_node(node_key, retry_count: 3)
173173
@node.find_by(node_key)
174174
rescue ::RedisClient::Cluster::Node::ReloadNeeded
175-
raise ::RedieClient::Cluster::NodeMightBeDown if retry_count <= 0
175+
raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0
176176

177177
update_cluster_info!
178178
retry_count -= 1
@@ -183,6 +183,18 @@ def command_exists?(name)
183183
@command.exists?(name)
184184
end
185185

186+
def assign_redirection_node(err_msg)
187+
_, slot, node_key = err_msg.split
188+
slot = slot.to_i
189+
@node.update_slot(slot, node_key)
190+
find_node(node_key)
191+
end
192+
193+
def assign_asking_node(err_msg)
194+
_, _, node_key = err_msg.split
195+
find_node(node_key)
196+
end
197+
186198
private
187199

188200
def send_wait_command(method, command, args, retry_count: 3, &block)
@@ -258,18 +270,6 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
258270
end
259271
end
260272

261-
def assign_redirection_node(err_msg)
262-
_, slot, node_key = err_msg.split
263-
slot = slot.to_i
264-
@node.update_slot(slot, node_key)
265-
find_node(node_key)
266-
end
267-
268-
def assign_asking_node(err_msg)
269-
_, _, node_key = err_msg.split
270-
find_node(node_key)
271-
end
272-
273273
def fetch_cluster_info(config, pool: nil, **kwargs)
274274
node_info = ::RedisClient::Cluster::Node.load_info(config.per_node_key, **kwargs)
275275
node_addrs = node_info.map { |info| ::RedisClient::Cluster::NodeKey.hashify(info[:node_key]) }

test/redis_client/test_cluster.rb

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,22 @@ def test_pipelined
140140
end
141141

142142
def test_pipelined_with_errors
143-
assert_raises(::RedisClient::Cluster::ErrorCollection) do
144-
@client.pipelined do |pipeline|
145-
10.times do |i|
146-
pipeline.call('SET', "string#{i}", i)
147-
pipeline.call('SET', "string#{i}", i, 'too many args')
148-
pipeline.call('SET', "string#{i}", i + 10)
149-
end
143+
got = @client.pipelined do |pipeline|
144+
10.times do |i|
145+
pipeline.call('SET', "string#{i}", i)
146+
pipeline.call('SET', "string#{i}", i, 'too many args')
147+
pipeline.call('SET', "string#{i}", i + 10)
150148
end
151149
end
152150

151+
assert_equal(30, got.size)
152+
153+
10.times do |i|
154+
assert_equal('OK', got[(3 * i) + 0])
155+
assert_instance_of(::RedisClient::CommandError, got[(3 * i) + 1])
156+
assert_equal('OK', got[(3 * i) + 2])
157+
end
158+
153159
wait_for_replication
154160

155161
10.times { |i| assert_equal((i + 10).to_s, @client.call('GET', "string#{i}")) }

test/test_against_cluster_state.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ def test_the_state_of_cluster_resharding
4747
end
4848

4949
def test_the_state_of_cluster_resharding_with_pipelining
50-
skip('TODO: https://github.com/redis-rb/redis-cluster-client/issues/37')
51-
5250
do_resharding_test do |keys|
5351
values = @client.pipelined do |pipeline|
5452
keys.each { |key| pipeline.call('GET', key) }

0 commit comments

Comments
 (0)