Skip to content

Commit 73c7974

Browse files
authored
Add some test cases (#21)
1 parent f455e60 commit 73c7974

File tree

6 files changed

+116
-44
lines changed

6 files changed

+116
-44
lines changed

lib/redis_client/cluster.rb

Lines changed: 74 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# frozen_string_literal: true
22

3+
require 'redis_client'
34
require 'redis_client/cluster/command'
45
require 'redis_client/cluster/errors'
56
require 'redis_client/cluster/key_slot_converter'
@@ -8,6 +9,62 @@
89

910
class RedisClient
1011
class Cluster
12+
class Pipeline
13+
ReplySizeError = Class.new(::RedisClient::Error)
14+
15+
def initialize(client)
16+
@client = client
17+
@grouped = Hash.new([].freeze)
18+
@replies = []
19+
@size = 0
20+
end
21+
22+
def call(*command, **kwargs)
23+
node_key = @client.send(:find_node_key, command, primary_only: true)
24+
@grouped[node_key] += [[@size, :call, command, kwargs]]
25+
@size += 1
26+
end
27+
28+
def call_once(*command, **kwargs)
29+
node_key = @client.send(:find_node_key, command, primary_only: true)
30+
@grouped[node_key] += [[@size, :call_once, command, kwargs]]
31+
@size += 1
32+
end
33+
34+
def blocking_call(timeout, *command, **kwargs)
35+
node_key = @client.send(:find_node_key, command, primary_only: true)
36+
@grouped[node_key] += [[@size, :blocking_call, timeout, command, kwargs]]
37+
@size += 1
38+
end
39+
40+
def empty?
41+
@size.zero?
42+
end
43+
44+
# TODO: use concurrency
45+
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
46+
@grouped.each do |node_key, rows|
47+
node_key = node_key.nil? ? @client.instance_variable_get(:@node).primary_node_keys.sample : node_key
48+
replies = @client.send(:find_node, node_key).pipelined do |pipeline|
49+
rows.each do |row|
50+
case row[1]
51+
when :call then pipeline.call(*row[2], **row[3])
52+
when :call_once then pipeline.call_once(*row[2], **row[3])
53+
when :blocking_call then pipeline.blocking_call(row[2], *row[3], **row[4])
54+
else raise NotImplementedError, row[1]
55+
end
56+
end
57+
end
58+
59+
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
60+
61+
rows.each_with_index { |row, idx| @replies[row.first] = replies[idx] }
62+
end
63+
64+
@replies
65+
end
66+
end
67+
1168
ZERO_CURSOR_FOR_SCAN = '0'
1269

1370
def initialize(config, pool: nil, **kwargs)
@@ -22,17 +79,17 @@ def inspect
2279
"#<#{self.class.name} #{@node.node_keys.join(', ')}>"
2380
end
2481

25-
def call(*command, **kwargs, &block)
26-
send_command(:call, *command, **kwargs, &block)
82+
def call(*command, **kwargs)
83+
send_command(:call, *command, **kwargs)
2784
end
2885

29-
def call_once(*command, **kwargs, &block)
30-
send_command(:call_once, *command, **kwargs, &block)
86+
def call_once(*command, **kwargs)
87+
send_command(:call_once, *command, **kwargs)
3188
end
3289

33-
def blocking_call(timeout, *command, **kwargs, &block)
90+
def blocking_call(timeout, *command, **kwargs)
3491
node = assign_node(*command)
35-
try_send(node, :blocking_call, timeout, *command, **kwargs, &block)
92+
try_send(node, :blocking_call, timeout, *command, **kwargs)
3693
end
3794

3895
def scan(*args, **kwargs, &block)
@@ -61,14 +118,22 @@ def zscan(key, *args, **kwargs, &block)
61118
try_send(node, :zscan, key, *args, **kwargs, &block)
62119
end
63120

64-
def pipelined
121+
def mset
65122
# TODO: impl
66123
end
67124

68-
def multi
125+
def mget
69126
# TODO: impl
70127
end
71128

129+
def pipelined
130+
pipeline = ::RedisClient::Cluster::Pipeline.new(self)
131+
yield pipeline
132+
return [] if pipeline.empty? == 0
133+
134+
pipeline.execute
135+
end
136+
72137
def pubsub
73138
# TODO: impl
74139
end
@@ -82,16 +147,9 @@ def with(options = {})
82147
end
83148
alias then with
84149

85-
def id
86-
@node.flat_map(&:id).sort.join(' ')
87-
end
88-
89-
def connected?
90-
@node.any?(&:connected?)
91-
end
92-
93150
def close
94151
@node.each(&:close)
152+
nil
95153
end
96154

97155
private

lib/redis_client/cluster/errors.rb

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,5 @@ def initialize(command)
5151
super("Cluster client doesn't know which node the #{command} command should be sent to.")
5252
end
5353
end
54-
55-
# Raised when commands in pipelining include cross slot keys.
56-
class CrossSlotPipeliningError < ::RedisClient::Error
57-
def initialize(keys)
58-
super("Cluster client couldn't send pipelining to single node. "\
59-
"The commands include cross slot keys: #{ERR_ARG_NORMALIZATION.call(keys).join(',')}")
60-
end
61-
end
6254
end
6355
end

lib/redis_client/cluster/node.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ def node_keys
9595
@clients.keys.sort
9696
end
9797

98+
def primary_node_keys
99+
@clients.filter_map { |k, _| primary?(k) ? k : nil }.sort
100+
end
101+
98102
def find_by(node_key)
99103
@clients.fetch(node_key)
100104
rescue KeyError
@@ -133,7 +137,9 @@ def scale_reading_clients
133137
replica_disabled? ? primary?(node_key) : replica?(node_key)
134138
end
135139

136-
clients.values.sort_by { |client| "#{client.config.host}:#{client.config.port}" }
140+
clients.values.sort_by do |client|
141+
::RedisClient::Cluster::NodeKey.build_from_host_port(client.config.host, client.config.port)
142+
end
137143
end
138144

139145
def slot_exists?(slot)

test/redis_client/cluster/test_errors.rb

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,6 @@ def test_ambiguous_node_error
7272
assert_equal(e.message, c[:want], "Case: #{idx}")
7373
end
7474
end
75-
76-
def test_cross_slot_pipelining_error
77-
[
78-
{ keys: %w[foo bar baz], want: 'keys: foo,bar,baz' },
79-
{ keys: '', want: 'keys: ' },
80-
{ keys: nil, want: 'keys: ' }
81-
].each_with_index do |c, idx|
82-
raise ::RedisClient::Cluster::CrossSlotPipeliningError, c[:keys]
83-
rescue StandardError => e
84-
assert(e.message.end_with?(c[:want]), "Case: #{idx}")
85-
end
86-
end
8775
end
8876
end
8977
end

test/redis_client/cluster/test_node.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ def test_node_keys
169169
end
170170
end
171171

172+
def test_primary_node_keys
173+
want = @test_node_info.filter_map { |info| info[:role] == 'master' ? info[:node_key] : nil }.sort
174+
got = @test_node.primary_node_keys
175+
assert_equal(want, got)
176+
end
177+
172178
def test_find_by
173179
@test_node_info.each do |info|
174180
msg = "Case: primary only: #{info[:node_key]}"

test/redis_client/test_cluster.rb

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,18 @@ module Mixin
1212
def setup
1313
@client = new_test_client
1414
@client.call('FLUSHDB')
15-
testing_hook
15+
wait_for_replication
1616
end
1717

1818
def teardown
1919
@client.call('FLUSHDB')
20-
testing_hook
20+
wait_for_replication
2121
@client&.close
2222
end
2323

24-
def testing_hook; end
24+
def wait_for_replication
25+
@client.call('WAIT', '1', (TEST_TIMEOUT_SEC * 1000).to_i.to_s)
26+
end
2527

2628
def test_inspect
2729
assert_match(/^#<RedisClient::Cluster [0-9., :]*>$/, @client.inspect)
@@ -30,20 +32,23 @@ def test_inspect
3032
def test_call
3133
(0..9).each do |i|
3234
assert_equal('OK', @client.call('SET', "key#{i}", i), "Case: SET: key#{i}")
35+
wait_for_replication
3336
assert_equal(i.to_s, @client.call('GET', "key#{i}"), "Case: GET: key#{i}")
3437
end
3538
end
3639

3740
def test_call_once
3841
(0..9).each do |i|
3942
assert_equal('OK', @client.call_once('SET', "key#{i}", i), "Case: SET: key#{i}")
43+
wait_for_replication
4044
assert_equal(i.to_s, @client.call_once('GET', "key#{i}"), "Case: GET: key#{i}")
4145
end
4246
end
4347

4448
def test_blocking_call
4549
@client.call(*%w[RPUSH foo hello])
4650
@client.call(*%w[RPUSH foo world])
51+
wait_for_replication
4752
client_side_timeout = 0.2
4853
server_side_timeout = 0.1
4954
assert_equal(%w[foo world], @client.blocking_call(client_side_timeout, 'BRPOP', 'foo', server_side_timeout), 'Case: 1st')
@@ -56,6 +61,7 @@ def test_scan
5661
assert_raises(ArgumentError) { @client.scan }
5762

5863
(0..9).each { |i| @client.call('SET', "key#{i}", i) }
64+
wait_for_replication
5965
want = (0..9).map { |i| "key#{i}" }
6066
got = []
6167
@client.scan('COUNT', '5') { |key| got << key }
@@ -65,6 +71,7 @@ def test_scan
6571
def test_sscan
6672
(0..9).each do |i|
6773
(0..9).each { |j| @client.call('SADD', "key#{i}", "member#{j}") }
74+
wait_for_replication
6875
want = (0..9).map { |j| "member#{j}" }
6976
got = []
7077
@client.sscan("key#{i}", 'COUNT', '5') { |member| got << member }
@@ -75,6 +82,7 @@ def test_sscan
7582
def test_hscan
7683
(0..9).each do |i|
7784
(0..9).each { |j| @client.call('HSET', "key#{i}", "field#{j}", j) }
85+
wait_for_replication
7886
want = (0..9).map { |j| "field#{j}" }
7987
got = []
8088
@client.hscan("key#{i}", 'COUNT', '5') { |field| got << field }
@@ -85,12 +93,30 @@ def test_hscan
8593
def test_zscan
8694
(0..9).each do |i|
8795
(0..9).each { |j| @client.call('ZADD', "key#{i}", j, "member#{j}") }
96+
wait_for_replication
8897
want = (0..9).map { |j| "member#{j}" }
8998
got = []
9099
@client.zscan("key#{i}", 'COUNT', '5') { |member| got << member }
91100
assert_equal(want, got.sort)
92101
end
93102
end
103+
104+
def test_pipelined
105+
want = (0..9).map { 'OK' } + (1..3).to_a + %w[PONG] + (0..9).map(&:to_s) + [%w[list 2]]
106+
got = @client.pipelined do |pipeline|
107+
(0..9).each { |i| pipeline.call('SET', "string#{i}", i) }
108+
(0..2).each { |i| pipeline.call('RPUSH', 'list', i) }
109+
pipeline.call_once('PING')
110+
(0..9).each { |i| pipeline.call('GET', "string#{i}") }
111+
pipeline.blocking_call(0.2, 'BRPOP', 'list', '0.1')
112+
end
113+
114+
assert_equal(want, got)
115+
end
116+
117+
def test_close
118+
assert_nil(@client.close)
119+
end
94120
end
95121

96122
class PrimaryOnly < Minitest::Test
@@ -109,10 +135,6 @@ def new_test_client
109135
config = ::RedisClient::ClusterConfig.new(nodes: TEST_NODE_URIS, replica: true, **TEST_GENERIC_OPTIONS)
110136
::RedisClient::Cluster.new(config)
111137
end
112-
113-
def testing_hook
114-
@client.call('WAIT', '1', '300')
115-
end
116138
end
117139

118140
class Pooled < Minitest::Test

0 commit comments

Comments
 (0)