diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 2696b3d..55cd483 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -64,7 +64,7 @@ def blocking_call_v(timeout, command, &block) def scan(*args, **kwargs, &block) return to_enum(__callee__, *args, **kwargs) unless block_given? - command = @command_builder.generate(['SCAN', ZERO_CURSOR_FOR_SCAN] + args, kwargs) + command = @command_builder.generate(['scan', ZERO_CURSOR_FOR_SCAN] + args, kwargs) seed = Random.new_seed loop do cursor, keys = router.scan(command, seed: seed) @@ -77,21 +77,21 @@ def scan(*args, **kwargs, &block) def sscan(key, *args, **kwargs, &block) return to_enum(__callee__, key, *args, **kwargs) unless block_given? - command = @command_builder.generate(['SSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) + command = @command_builder.generate(['sscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) router.scan_single_key(command, arity: 1, &block) end def hscan(key, *args, **kwargs, &block) return to_enum(__callee__, key, *args, **kwargs) unless block_given? - command = @command_builder.generate(['HSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) + command = @command_builder.generate(['hscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) router.scan_single_key(command, arity: 2, &block) end def zscan(key, *args, **kwargs, &block) return to_enum(__callee__, key, *args, **kwargs) unless block_given? - command = @command_builder.generate(['ZSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) + command = @command_builder.generate(['zscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs) router.scan_single_key(command, arity: 2, &block) end diff --git a/lib/redis_client/cluster/command.rb b/lib/redis_client/cluster/command.rb index f2dc979..3c315bd 100644 --- a/lib/redis_client/cluster/command.rb +++ b/lib/redis_client/cluster/command.rb @@ -90,36 +90,38 @@ def find_command_info(name) end def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/PerceivedComplexity - if command.first.casecmp('get').zero? - find_command_info(command.first)&.first_key_position.to_i - elsif command.first.casecmp('mget').zero? - find_command_info(command.first)&.first_key_position.to_i - elsif command.first.casecmp('set').zero? - find_command_info(command.first)&.first_key_position.to_i - elsif command.first.casecmp('mset').zero? - find_command_info(command.first)&.first_key_position.to_i - elsif command.first.casecmp('del').zero? - find_command_info(command.first)&.first_key_position.to_i - elsif command.first.casecmp('eval').zero? + cmd_name = command.first + + if cmd_name.casecmp('get').zero? + find_command_info(cmd_name)&.first_key_position.to_i + elsif cmd_name.casecmp('mget').zero? + find_command_info(cmd_name)&.first_key_position.to_i + elsif cmd_name.casecmp('set').zero? + find_command_info(cmd_name)&.first_key_position.to_i + elsif cmd_name.casecmp('mset').zero? + find_command_info(cmd_name)&.first_key_position.to_i + elsif cmd_name.casecmp('del').zero? + find_command_info(cmd_name)&.first_key_position.to_i + elsif cmd_name.casecmp('eval').zero? 3 - elsif command.first.casecmp('evalsha').zero? + elsif cmd_name.casecmp('evalsha').zero? 3 - elsif command.first.casecmp('zinterstore').zero? + elsif cmd_name.casecmp('zinterstore').zero? 3 - elsif command.first.casecmp('zunionstore').zero? + elsif cmd_name.casecmp('zunionstore').zero? 3 - elsif command.first.casecmp('object').zero? + elsif cmd_name.casecmp('object').zero? 2 - elsif command.first.casecmp('memory').zero? + elsif cmd_name.casecmp('memory').zero? command[1].to_s.casecmp('usage').zero? ? 2 : 0 - elsif command.first.casecmp('migrate').zero? + elsif cmd_name.casecmp('migrate').zero? command[3].empty? ? determine_optional_key_position(command, 'keys') : 3 - elsif command.first.casecmp('xread').zero? + elsif cmd_name.casecmp('xread').zero? determine_optional_key_position(command, 'streams') - elsif command.first.casecmp('xreadgroup').zero? + elsif cmd_name.casecmp('xreadgroup').zero? determine_optional_key_position(command, 'streams') else - find_command_info(command.first)&.first_key_position.to_i + find_command_info(cmd_name)&.first_key_position.to_i end end diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index b60f5b7..c4ff2d4 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -7,6 +7,7 @@ require 'redis_client/cluster/node/random_replica' require 'redis_client/cluster/node/random_replica_or_primary' require 'redis_client/cluster/node/latency_replica' +require 'redis_client/cluster/node_key' class RedisClient class Cluster @@ -43,6 +44,10 @@ def primary? def replica? role == 'slave' end + + def serialize(str) + str << id << node_key << role << primary_id << config_epoch + end end class CharArray @@ -338,9 +343,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M grouped = node_info_list.compact.group_by do |info_list| info_list.sort_by!(&:id) - info_list.each_with_object(String.new(capacity: 128 * info_list.size)) do |e, a| - a << e.id << e.node_key << e.role << e.primary_id << e.config_epoch - end + info_list.each_with_object(String.new(capacity: 128 * info_list.size)) { |e, a| e.serialize(a) } end grouped.max_by { |_, v| v.size }[1].first @@ -375,6 +378,48 @@ def parse_cluster_node_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/C end end + def parse_cluster_slots_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + reply.group_by { |e| e[2][2] }.each_with_object([]) do |(primary_id, group), acc| + slots = group.map { |e| e[0, 2] }.freeze + + group.first[2..].each do |arr| + ip = arr[0] + next if ip.nil? || ip.empty? || ip == '?' + + id = arr[2] + role = id == primary_id ? 'master' : 'slave' + acc << ::RedisClient::Cluster::Node::Info.new( + id: id, + node_key: NodeKey.build_from_host_port(ip, arr[1]), + role: role, + primary_id: role == 'master' ? nil : primary_id, + slots: role == 'master' ? slots : EMPTY_ARRAY + ) + end + end.freeze + end + + def parse_cluster_shards_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + reply.each_with_object([]) do |shard, acc| + nodes = shard.fetch('nodes') + primary_id = nodes.find { |n| n.fetch('role') == 'master' }.fetch('id') + + nodes.each do |node| + ip = node.fetch('ip') + next if node.fetch('health') != 'online' || ip.nil? || ip.empty? || ip == '?' + + role = node.fetch('role') + acc << ::RedisClient::Cluster::Node::Info.new( + id: node.fetch('id'), + node_key: NodeKey.build_from_host_port(ip, node['port'] || node['tls-port']), + role: role == 'master' ? role : 'slave', + primary_id: role == 'master' ? nil : primary_id, + slots: role == 'master' ? shard.fetch('slots').each_slice(2).to_a.freeze : EMPTY_ARRAY + ) + end + end.freeze + end + # As redirection node_key is dependent on `cluster-preferred-endpoint-type` config, # node_key should use hostname if present in CLUSTER NODES output. # diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 5e0e76d..c299536 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -38,6 +38,7 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength cmd_name = command.first + if cmd_name.casecmp('get').zero? node = assign_node(command) try_send(node, method, command, args, &block) diff --git a/lib/redis_client/cluster_config.rb b/lib/redis_client/cluster_config.rb index 7bcf767..2a6462a 100644 --- a/lib/redis_client/cluster_config.rb +++ b/lib/redis_client/cluster_config.rb @@ -47,7 +47,6 @@ def initialize( # rubocop:disable Metrics/ParameterLists max_startup_sample: MAX_STARTUP_SAMPLE, **client_config ) - @replica = true & replica @replica_affinity = replica_affinity.to_s.to_sym @fixed_hostname = fixed_hostname.to_s diff --git a/test/cluster_controller.rb b/test/cluster_controller.rb index 860efbd..38acaca 100644 --- a/test/cluster_controller.rb +++ b/test/cluster_controller.rb @@ -47,7 +47,6 @@ def initialize(node_addrs, replica_size: DEFAULT_REPLICA_SIZE, state_check_attempts: DEFAULT_MAX_ATTEMPTS, **kwargs) - @shard_size = shard_size @replica_size = replica_size @number_of_replicas = @replica_size * @shard_size diff --git a/test/ips_slot_node_mapping.rb b/test/ips_slot_node_mapping.rb index 5ecacb0..6536f07 100644 --- a/test/ips_slot_node_mapping.rb +++ b/test/ips_slot_node_mapping.rb @@ -12,11 +12,19 @@ module IpsSlotNodeMapping def run ca = ::RedisClient::Cluster::Node::CharArray.new(SIZE, ELEMENTS) arr = Array.new(SIZE) + hs = {} print_letter('Mappings between slots and nodes') fullfill(ca) fullfill(arr) - bench({ ca.class.name.split('::').last => ca, arr.class.name => arr }) + fullfill(hs) + bench( + { + ca.class.name.split('::').last => ca, + arr.class.name => arr, + hs.class.name => hs + } + ) end def print_letter(title) diff --git a/test/redis_client/cluster/test_node.rb b/test/redis_client/cluster/test_node.rb index a91ab66..5db0a6e 100644 --- a/test/redis_client/cluster/test_node.rb +++ b/test/redis_client/cluster/test_node.rb @@ -281,6 +281,141 @@ def test_parse_cluster_node_reply_failure_flags assert_empty(@test_node.send(:parse_cluster_node_reply, info)) end + def test_parse_cluster_slots_reply + reply = [ + [ + 0, + 5460, + ['10.10.1.6', 6379, '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', {}], + ['10.10.1.5', 6379, 'b60c0672f257c01d76f27eacded14b6e6f4f990e', {}] + ], + [ + 5461, + 10_922, + ['10.10.1.4', 6379, '712b9a6656b38a5e002244903853fccb4d1eef4b', {}], + ['10.10.1.7', 6379, '7038691c545e7caa9147030ecfb4acf1eaad0552', {}] + ], + [ + 10_923, + 16_383, + ['10.10.1.8', 6379, 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', {}], + ['10.10.1.3', 6379, 'f2f36b472b187c577ccd93dd296e9045f473ae7a', {}] + ] + ] + + want = [ + { id: '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', node_key: '10.10.1.6:6379', role: 'master', primary_id: nil, + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[0, 5460]] }, + { id: 'b60c0672f257c01d76f27eacded14b6e6f4f990e', node_key: '10.10.1.5:6379', role: 'slave', primary_id: '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] }, + { id: '712b9a6656b38a5e002244903853fccb4d1eef4b', node_key: '10.10.1.4:6379', role: 'master', primary_id: nil, + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[5461, 10_922]] }, + { id: '7038691c545e7caa9147030ecfb4acf1eaad0552', node_key: '10.10.1.7:6379', role: 'slave', primary_id: '712b9a6656b38a5e002244903853fccb4d1eef4b', + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] }, + { id: 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', node_key: '10.10.1.8:6379', role: 'master', primary_id: nil, + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[10_923, 16_383]] }, + { id: 'f2f36b472b187c577ccd93dd296e9045f473ae7a', node_key: '10.10.1.3:6379', role: 'slave', primary_id: 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] } + ] + + got = @test_node.send(:parse_cluster_slots_reply, reply) + + assert_equal(want.sort_by { |e| e.fetch(:id) }, got.sort_by(&:id).map(&:to_h)) + end + + def test_parse_cluster_shards_reply + reply = [ + { + 'slots' => [5461, 10_922], + 'nodes' => [ + { + 'id' => '712b9a6656b38a5e002244903853fccb4d1eef4b', + 'port' => 6379, + 'ip' => '10.10.1.4', + 'endpoint' => '10.10.1.4', + 'role' => 'master', + 'replication-offset' => 98, + 'health' => 'online' + }, + { + 'id' => '7038691c545e7caa9147030ecfb4acf1eaad0552', + 'port' => 6379, + 'ip' => '10.10.1.7', + 'endpoint' => '10.10.1.7', + 'role' => 'replica', + 'replication-offset' => 98, + 'health' => 'online' + } + ] + }, + { + 'slots' => [10_923, 16_383], + 'nodes' => [ + { + 'id' => 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', + 'port' => 6379, + 'ip' => '10.10.1.8', + 'endpoint' => '10.10.1.8', + 'role' => 'master', + 'replication-offset' => 98, + 'health' => 'online' + }, + { + 'id' => 'f2f36b472b187c577ccd93dd296e9045f473ae7a', + 'port' => 6379, + 'ip' => '10.10.1.3', + 'endpoint' => '10.10.1.3', + 'role' => 'replica', + 'replication-offset' => 98, + 'health' => 'online' + } + ] + }, + { + 'slots' => [0, 5460], + 'nodes' => [ + { + 'id' => '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', + 'port' => 6379, + 'ip' => '10.10.1.6', + 'endpoint' => '10.10.1.6', + 'role' => 'master', + 'replication-offset' => 98, + 'health' => 'online' + }, + { + 'id' => 'b60c0672f257c01d76f27eacded14b6e6f4f990e', + 'port' => 6379, + 'ip' => '10.10.1.5', + 'endpoint' => '10.10.1.5', + 'role' => 'replica', + 'replication-offset' => 98, + 'health' => 'online' + } + ] + } + ] + + want = [ + { id: '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', node_key: '10.10.1.6:6379', role: 'master', primary_id: nil, + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[0, 5460]] }, + { id: 'b60c0672f257c01d76f27eacded14b6e6f4f990e', node_key: '10.10.1.5:6379', role: 'slave', primary_id: '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] }, + { id: '712b9a6656b38a5e002244903853fccb4d1eef4b', node_key: '10.10.1.4:6379', role: 'master', primary_id: nil, + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[5461, 10_922]] }, + { id: '7038691c545e7caa9147030ecfb4acf1eaad0552', node_key: '10.10.1.7:6379', role: 'slave', primary_id: '712b9a6656b38a5e002244903853fccb4d1eef4b', + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] }, + { id: 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', node_key: '10.10.1.8:6379', role: 'master', primary_id: nil, + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[10_923, 16_383]] }, + { id: 'f2f36b472b187c577ccd93dd296e9045f473ae7a', node_key: '10.10.1.3:6379', role: 'slave', primary_id: 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', + ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] } + ] + + got = @test_node.send(:parse_cluster_shards_reply, reply) + + assert_equal(want.sort_by { |e| e.fetch(:id) }, got.sort_by(&:id).map(&:to_h)) + end + def test_inspect assert_match(/^#$/, @test_node.inspect) end