Skip to content

Commit a1ca9b3

Browse files
authored
test: verify occurrence of redirections just in case (#376)
1 parent 8cbcaf4 commit a1ca9b3

12 files changed

+283
-120
lines changed

lib/redis_client/cluster/pipeline.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,26 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
5555
results = Array.new(commands.size)
5656
@pending_reads += size
5757
write_multi(commands)
58-
5958
redirection_indices = nil
6059
first_exception = nil
60+
6161
size.times do |index|
6262
timeout = timeouts && timeouts[index]
63-
result = read(timeout)
63+
result = read(connection_timeout(timeout))
6464
@pending_reads -= 1
65+
6566
if result.is_a?(::RedisClient::Error)
6667
result._set_command(commands[index])
68+
result._set_config(config)
69+
6770
if result.is_a?(::RedisClient::CommandError) && result.message.start_with?('MOVED', 'ASK')
6871
redirection_indices ||= []
6972
redirection_indices << index
7073
elsif exception
7174
first_exception ||= result
7275
end
7376
end
77+
7478
results[index] = result
7579
end
7680

test/command_capture_middleware.rb

Lines changed: 0 additions & 63 deletions
This file was deleted.

test/middlewares/command_capture.rb

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# frozen_string_literal: true
2+
3+
module Middlewares
4+
module CommandCapture
5+
CapturedCommand = Struct.new('CapturedCommand', :server_url, :command, :pipelined, keyword_init: true) do
6+
def inspect
7+
"#<#{self.class.name} [on #{server_url}] #{command.join(' ')} >"
8+
end
9+
end
10+
11+
# The CommandBuffer is what should be set as the :captured_commands custom option.
12+
# It needs to be threadsafe, because redis-cluster-client performs some redis operations on
13+
# multiple nodes in parallel, and in e.g. jruby it's not safe to concurrently manipulate the same array.
14+
class CommandBuffer
15+
def initialize
16+
@array = []
17+
@mutex = Mutex.new
18+
end
19+
20+
def to_a
21+
@mutex.synchronize do
22+
@array.dup
23+
end
24+
end
25+
26+
def <<(command)
27+
@mutex.synchronize do
28+
@array << command
29+
end
30+
end
31+
32+
def count(*cmd)
33+
@mutex.synchronize do
34+
next 0 if @array.empty?
35+
36+
@array.count do |e|
37+
cmd.size.times.all? { |i| cmd[i].downcase == e.command[i]&.downcase }
38+
end
39+
end
40+
end
41+
42+
def clear
43+
@mutex.synchronize do
44+
@array.clear
45+
end
46+
end
47+
end
48+
49+
def call(command, redis_config)
50+
redis_config.custom[:captured_commands] << CapturedCommand.new(
51+
server_url: ::Middlewares::CommandCapture.normalize_captured_url(redis_config.server_url),
52+
command: command,
53+
pipelined: false
54+
)
55+
super
56+
end
57+
58+
def call_pipelined(commands, redis_config)
59+
commands.map do |command|
60+
redis_config.custom[:captured_commands] << CapturedCommand.new(
61+
server_url: ::Middlewares::CommandCapture.normalize_captured_url(redis_config.server_url),
62+
command: command,
63+
pipelined: true
64+
)
65+
end
66+
super
67+
end
68+
69+
def self.normalize_captured_url(url)
70+
URI.parse(url).tap do |u|
71+
u.path = ''
72+
end.to_s
73+
end
74+
end
75+
end

test/middlewares/redirection_count.rb

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# frozen_string_literal: true
2+
3+
module Middlewares
4+
module RedirectionCount
5+
class Counter
6+
Result = Struct.new('RedirectionCountResult', :moved, :ask, keyword_init: true)
7+
8+
def initialize
9+
@moved = 0
10+
@ask = 0
11+
@mutex = Mutex.new
12+
end
13+
14+
def moved
15+
@mutex.synchronize { @moved += 1 }
16+
end
17+
18+
def ask
19+
@mutex.synchronize { @ask += 1 }
20+
end
21+
22+
def get
23+
@mutex.synchronize { Result.new(moved: @moved, ask: @ask) }
24+
end
25+
26+
def zero?
27+
@mutex.synchronize { @moved == 0 && @ask == 0 }
28+
end
29+
30+
def clear
31+
@mutex.synchronize do
32+
@moved = 0
33+
@ask = 0
34+
end
35+
end
36+
end
37+
38+
def call(cmd, cfg)
39+
super
40+
rescue ::RedisClient::CommandError => e
41+
if e.message.start_with?('MOVED')
42+
cfg.custom.fetch(:redirection_count).moved
43+
elsif e.message.start_with?('ASK')
44+
cfg.custom.fetch(:redirection_count).ask
45+
end
46+
47+
raise
48+
end
49+
50+
def call_pipelined(cmd, cfg)
51+
super
52+
rescue ::RedisClient::CommandError => e
53+
if e.message.start_with?('MOVED')
54+
cfg.custom.fetch(:redirection_count).moved
55+
elsif e.message.start_with?('ASK')
56+
cfg.custom.fetch(:redirection_count).ask
57+
end
58+
59+
raise
60+
end
61+
end
62+
end
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# frozen_string_literal: true
2+
3+
module Middlewares
4+
module RedirectionEmulation
5+
Setting = Struct.new(
6+
'RedirectionEmulationMiddlewareSetting',
7+
:slot, :to, :command, keyword_init: true
8+
)
9+
10+
def call(cmd, cfg)
11+
s = cfg.custom.fetch(:redirect)
12+
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command
13+
14+
super
15+
end
16+
17+
def call_pipelined(cmd, cfg)
18+
s = cfg.custom.fetch(:redirect)
19+
raise RedisClient::CommandError, "MOVED #{s.slot} #{s.to}" if cmd == s.command
20+
21+
super
22+
end
23+
end
24+
end

test/redirection_emulation_middleware.rb

Lines changed: 0 additions & 22 deletions
This file was deleted.

test/redis_client/cluster/test_node.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ def teardown
3636
@test_nodes&.each { |n| n&.each(&:close) }
3737
end
3838

39-
def make_node(capture_buffer: CommandCaptureMiddleware::CommandBuffer.new, pool: nil, **kwargs)
39+
def make_node(capture_buffer: ::Middlewares::CommandCapture::CommandBuffer.new, pool: nil, **kwargs)
4040
config = ::RedisClient::ClusterConfig.new(**{
4141
nodes: TEST_NODE_URIS,
4242
fixed_hostname: TEST_FIXED_HOSTNAME,
43-
middlewares: [CommandCaptureMiddleware],
43+
middlewares: [::Middlewares::CommandCapture],
4444
custom: { captured_commands: capture_buffer },
4545
**TEST_GENERIC_OPTIONS
4646
}.merge(kwargs))
@@ -596,7 +596,7 @@ def test_try_map
596596
end
597597

598598
def test_reload
599-
capture_buffer = CommandCaptureMiddleware::CommandBuffer.new
599+
capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new
600600
test_node = make_node(replica: true, capture_buffer: capture_buffer)
601601

602602
capture_buffer.clear
@@ -618,7 +618,7 @@ def test_reload
618618

619619
def test_reload_with_original_config
620620
bootstrap_node = TEST_NODE_URIS.first
621-
capture_buffer = CommandCaptureMiddleware::CommandBuffer.new
621+
capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new
622622
test_node = make_node(
623623
nodes: [bootstrap_node],
624624
replica: true,
@@ -640,7 +640,7 @@ def test_reload_with_original_config
640640
end
641641

642642
def test_reload_with_overriden_sample_size
643-
capture_buffer = CommandCaptureMiddleware::CommandBuffer.new
643+
capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new
644644
test_node = make_node(replica: true, capture_buffer: capture_buffer, max_startup_sample: 1)
645645

646646
capture_buffer.clear
@@ -661,7 +661,7 @@ def test_reload_with_overriden_sample_size
661661
end
662662

663663
def test_reload_concurrently
664-
capture_buffer = CommandCaptureMiddleware::CommandBuffer.new
664+
capture_buffer = ::Middlewares::CommandCapture::CommandBuffer.new
665665
test_node = make_node(replica: true, pool: { size: 2 }, capture_buffer: capture_buffer)
666666

667667
# Simulate refetch_node_info_list taking a long time

0 commit comments

Comments
 (0)