Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def blocking_call_v(timeout, command, &block)
def scan(*args, **kwargs, &block)
return to_enum(__callee__, *args, **kwargs) unless block_given?

command = @command_builder.generate(['SCAN', ZERO_CURSOR_FOR_SCAN] + args, kwargs)
command = @command_builder.generate(['scan', ZERO_CURSOR_FOR_SCAN] + args, kwargs)
seed = Random.new_seed
loop do
cursor, keys = router.scan(command, seed: seed)
Expand All @@ -77,21 +77,21 @@ def scan(*args, **kwargs, &block)
def sscan(key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['SSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
command = @command_builder.generate(['sscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 1, &block)
end

def hscan(key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['HSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
command = @command_builder.generate(['hscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 2, &block)
end

def zscan(key, *args, **kwargs, &block)
return to_enum(__callee__, key, *args, **kwargs) unless block_given?

command = @command_builder.generate(['ZSCAN', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
command = @command_builder.generate(['zscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
router.scan_single_key(command, arity: 2, &block)
end

Expand Down
42 changes: 22 additions & 20 deletions lib/redis_client/cluster/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,36 +90,38 @@ def find_command_info(name)
end

def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/PerceivedComplexity
if command.first.casecmp('get').zero?
find_command_info(command.first)&.first_key_position.to_i
elsif command.first.casecmp('mget').zero?
find_command_info(command.first)&.first_key_position.to_i
elsif command.first.casecmp('set').zero?
find_command_info(command.first)&.first_key_position.to_i
elsif command.first.casecmp('mset').zero?
find_command_info(command.first)&.first_key_position.to_i
elsif command.first.casecmp('del').zero?
find_command_info(command.first)&.first_key_position.to_i
elsif command.first.casecmp('eval').zero?
cmd_name = command.first

if cmd_name.casecmp('get').zero?
find_command_info(cmd_name)&.first_key_position.to_i
elsif cmd_name.casecmp('mget').zero?
find_command_info(cmd_name)&.first_key_position.to_i
elsif cmd_name.casecmp('set').zero?
find_command_info(cmd_name)&.first_key_position.to_i
elsif cmd_name.casecmp('mset').zero?
find_command_info(cmd_name)&.first_key_position.to_i
elsif cmd_name.casecmp('del').zero?
find_command_info(cmd_name)&.first_key_position.to_i
elsif cmd_name.casecmp('eval').zero?
3
elsif command.first.casecmp('evalsha').zero?
elsif cmd_name.casecmp('evalsha').zero?
3
elsif command.first.casecmp('zinterstore').zero?
elsif cmd_name.casecmp('zinterstore').zero?
3
elsif command.first.casecmp('zunionstore').zero?
elsif cmd_name.casecmp('zunionstore').zero?
3
elsif command.first.casecmp('object').zero?
elsif cmd_name.casecmp('object').zero?
2
elsif command.first.casecmp('memory').zero?
elsif cmd_name.casecmp('memory').zero?
command[1].to_s.casecmp('usage').zero? ? 2 : 0
elsif command.first.casecmp('migrate').zero?
elsif cmd_name.casecmp('migrate').zero?
command[3].empty? ? determine_optional_key_position(command, 'keys') : 3
elsif command.first.casecmp('xread').zero?
elsif cmd_name.casecmp('xread').zero?
determine_optional_key_position(command, 'streams')
elsif command.first.casecmp('xreadgroup').zero?
elsif cmd_name.casecmp('xreadgroup').zero?
determine_optional_key_position(command, 'streams')
else
find_command_info(command.first)&.first_key_position.to_i
find_command_info(cmd_name)&.first_key_position.to_i
end
end

Expand Down
51 changes: 48 additions & 3 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'redis_client/cluster/node/random_replica'
require 'redis_client/cluster/node/random_replica_or_primary'
require 'redis_client/cluster/node/latency_replica'
require 'redis_client/cluster/node_key'

class RedisClient
class Cluster
Expand Down Expand Up @@ -43,6 +44,10 @@ def primary?
def replica?
role == 'slave'
end

def serialize(str)
str << id << node_key << role << primary_id << config_epoch
end
end

class CharArray
Expand Down Expand Up @@ -338,9 +343,7 @@ def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, M

grouped = node_info_list.compact.group_by do |info_list|
info_list.sort_by!(&:id)
info_list.each_with_object(String.new(capacity: 128 * info_list.size)) do |e, a|
a << e.id << e.node_key << e.role << e.primary_id << e.config_epoch
end
info_list.each_with_object(String.new(capacity: 128 * info_list.size)) { |e, a| e.serialize(a) }
end

grouped.max_by { |_, v| v.size }[1].first
Expand Down Expand Up @@ -375,6 +378,48 @@ def parse_cluster_node_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/C
end
end

def parse_cluster_slots_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
reply.group_by { |e| e[2][2] }.each_with_object([]) do |(primary_id, group), acc|
slots = group.map { |e| e[0, 2] }.freeze

group.first[2..].each do |arr|
ip = arr[0]
next if ip.nil? || ip.empty? || ip == '?'

id = arr[2]
role = id == primary_id ? 'master' : 'slave'
acc << ::RedisClient::Cluster::Node::Info.new(
id: id,
node_key: NodeKey.build_from_host_port(ip, arr[1]),
role: role,
primary_id: role == 'master' ? nil : primary_id,
slots: role == 'master' ? slots : EMPTY_ARRAY
)
end
end.freeze
end

def parse_cluster_shards_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
reply.each_with_object([]) do |shard, acc|
nodes = shard.fetch('nodes')
primary_id = nodes.find { |n| n.fetch('role') == 'master' }.fetch('id')

nodes.each do |node|
ip = node.fetch('ip')
next if node.fetch('health') != 'online' || ip.nil? || ip.empty? || ip == '?'

role = node.fetch('role')
acc << ::RedisClient::Cluster::Node::Info.new(
id: node.fetch('id'),
node_key: NodeKey.build_from_host_port(ip, node['port'] || node['tls-port']),
role: role == 'master' ? role : 'slave',
primary_id: role == 'master' ? nil : primary_id,
slots: role == 'master' ? shard.fetch('slots').each_slice(2).to_a.freeze : EMPTY_ARRAY
)
end
end.freeze
end

# As redirection node_key is dependent on `cluster-preferred-endpoint-type` config,
# node_key should use hostname if present in CLUSTER NODES output.
#
Expand Down
1 change: 1 addition & 0 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)

def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength
cmd_name = command.first

if cmd_name.casecmp('get').zero?
node = assign_node(command)
try_send(node, method, command, args, &block)
Expand Down
1 change: 0 additions & 1 deletion lib/redis_client/cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def initialize( # rubocop:disable Metrics/ParameterLists
max_startup_sample: MAX_STARTUP_SAMPLE,
**client_config
)

@replica = true & replica
@replica_affinity = replica_affinity.to_s.to_sym
@fixed_hostname = fixed_hostname.to_s
Expand Down
1 change: 0 additions & 1 deletion test/cluster_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def initialize(node_addrs,
replica_size: DEFAULT_REPLICA_SIZE,
state_check_attempts: DEFAULT_MAX_ATTEMPTS,
**kwargs)

@shard_size = shard_size
@replica_size = replica_size
@number_of_replicas = @replica_size * @shard_size
Expand Down
10 changes: 9 additions & 1 deletion test/ips_slot_node_mapping.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@ module IpsSlotNodeMapping
def run
ca = ::RedisClient::Cluster::Node::CharArray.new(SIZE, ELEMENTS)
arr = Array.new(SIZE)
hs = {}

print_letter('Mappings between slots and nodes')
fullfill(ca)
fullfill(arr)
bench({ ca.class.name.split('::').last => ca, arr.class.name => arr })
fullfill(hs)
bench(
{
ca.class.name.split('::').last => ca,
arr.class.name => arr,
hs.class.name => hs
}
)
end

def print_letter(title)
Expand Down
135 changes: 135 additions & 0 deletions test/redis_client/cluster/test_node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,141 @@ def test_parse_cluster_node_reply_failure_flags
assert_empty(@test_node.send(:parse_cluster_node_reply, info))
end

def test_parse_cluster_slots_reply
reply = [
[
0,
5460,
['10.10.1.6', 6379, '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', {}],
['10.10.1.5', 6379, 'b60c0672f257c01d76f27eacded14b6e6f4f990e', {}]
],
[
5461,
10_922,
['10.10.1.4', 6379, '712b9a6656b38a5e002244903853fccb4d1eef4b', {}],
['10.10.1.7', 6379, '7038691c545e7caa9147030ecfb4acf1eaad0552', {}]
],
[
10_923,
16_383,
['10.10.1.8', 6379, 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', {}],
['10.10.1.3', 6379, 'f2f36b472b187c577ccd93dd296e9045f473ae7a', {}]
]
]

want = [
{ id: '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', node_key: '10.10.1.6:6379', role: 'master', primary_id: nil,
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[0, 5460]] },
{ id: 'b60c0672f257c01d76f27eacded14b6e6f4f990e', node_key: '10.10.1.5:6379', role: 'slave', primary_id: '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb',
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] },
{ id: '712b9a6656b38a5e002244903853fccb4d1eef4b', node_key: '10.10.1.4:6379', role: 'master', primary_id: nil,
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[5461, 10_922]] },
{ id: '7038691c545e7caa9147030ecfb4acf1eaad0552', node_key: '10.10.1.7:6379', role: 'slave', primary_id: '712b9a6656b38a5e002244903853fccb4d1eef4b',
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] },
{ id: 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', node_key: '10.10.1.8:6379', role: 'master', primary_id: nil,
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[10_923, 16_383]] },
{ id: 'f2f36b472b187c577ccd93dd296e9045f473ae7a', node_key: '10.10.1.3:6379', role: 'slave', primary_id: 'ba85d0807043bb40f72bb4e1e8352b029c6e0082',
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] }
]

got = @test_node.send(:parse_cluster_slots_reply, reply)

assert_equal(want.sort_by { |e| e.fetch(:id) }, got.sort_by(&:id).map(&:to_h))
end

def test_parse_cluster_shards_reply
reply = [
{
'slots' => [5461, 10_922],
'nodes' => [
{
'id' => '712b9a6656b38a5e002244903853fccb4d1eef4b',
'port' => 6379,
'ip' => '10.10.1.4',
'endpoint' => '10.10.1.4',
'role' => 'master',
'replication-offset' => 98,
'health' => 'online'
},
{
'id' => '7038691c545e7caa9147030ecfb4acf1eaad0552',
'port' => 6379,
'ip' => '10.10.1.7',
'endpoint' => '10.10.1.7',
'role' => 'replica',
'replication-offset' => 98,
'health' => 'online'
}
]
},
{
'slots' => [10_923, 16_383],
'nodes' => [
{
'id' => 'ba85d0807043bb40f72bb4e1e8352b029c6e0082',
'port' => 6379,
'ip' => '10.10.1.8',
'endpoint' => '10.10.1.8',
'role' => 'master',
'replication-offset' => 98,
'health' => 'online'
},
{
'id' => 'f2f36b472b187c577ccd93dd296e9045f473ae7a',
'port' => 6379,
'ip' => '10.10.1.3',
'endpoint' => '10.10.1.3',
'role' => 'replica',
'replication-offset' => 98,
'health' => 'online'
}
]
},
{
'slots' => [0, 5460],
'nodes' => [
{
'id' => '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb',
'port' => 6379,
'ip' => '10.10.1.6',
'endpoint' => '10.10.1.6',
'role' => 'master',
'replication-offset' => 98,
'health' => 'online'
},
{
'id' => 'b60c0672f257c01d76f27eacded14b6e6f4f990e',
'port' => 6379,
'ip' => '10.10.1.5',
'endpoint' => '10.10.1.5',
'role' => 'replica',
'replication-offset' => 98,
'health' => 'online'
}
]
}
]

want = [
{ id: '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb', node_key: '10.10.1.6:6379', role: 'master', primary_id: nil,
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[0, 5460]] },
{ id: 'b60c0672f257c01d76f27eacded14b6e6f4f990e', node_key: '10.10.1.5:6379', role: 'slave', primary_id: '00c0d00f2a5eda22b2c8a8929ba27b454c4400fb',
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] },
{ id: '712b9a6656b38a5e002244903853fccb4d1eef4b', node_key: '10.10.1.4:6379', role: 'master', primary_id: nil,
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[5461, 10_922]] },
{ id: '7038691c545e7caa9147030ecfb4acf1eaad0552', node_key: '10.10.1.7:6379', role: 'slave', primary_id: '712b9a6656b38a5e002244903853fccb4d1eef4b',
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] },
{ id: 'ba85d0807043bb40f72bb4e1e8352b029c6e0082', node_key: '10.10.1.8:6379', role: 'master', primary_id: nil,
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [[10_923, 16_383]] },
{ id: 'f2f36b472b187c577ccd93dd296e9045f473ae7a', node_key: '10.10.1.3:6379', role: 'slave', primary_id: 'ba85d0807043bb40f72bb4e1e8352b029c6e0082',
ping_sent: nil, pong_recv: nil, config_epoch: nil, link_state: nil, slots: [] }
]

got = @test_node.send(:parse_cluster_shards_reply, reply)

assert_equal(want.sort_by { |e| e.fetch(:id) }, got.sort_by(&:id).map(&:to_h))
end

def test_inspect
assert_match(/^#<RedisClient::Cluster::Node [0-9., :]*>$/, @test_node.inspect)
end
Expand Down
Loading