Skip to content

Commit 4fea245

Browse files
authored
fix: several issues (#149)
1 parent 3e4ce98 commit 4fea245

File tree

6 files changed

+43
-25
lines changed

6 files changed

+43
-25
lines changed

lib/redis_client/cluster/node.rb

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ class Node
1818
MAX_STARTUP_SAMPLE = 37
1919
MAX_THREADS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5))
2020
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
21+
SLOT_OPTIMIZATION_STRING = '0' * SLOT_SIZE
2122

2223
ReloadNeeded = Class.new(::RedisClient::Error)
24+
2325
Info = Struct.new(
24-
'RedisNode',
26+
'RedisClusterNode',
2527
:id, :node_key, :role, :primary_id, :ping_sent,
2628
:pong_recv, :config_epoch, :link_state, :slots,
2729
keyword_init: true
@@ -35,23 +37,27 @@ def replica?
3537
end
3638
end
3739

38-
SLOT_OPTIMIZATION_MAX_SHARD_SIZE = 256
39-
SLOT_OPTIMIZATION_STRING = '0' * SLOT_SIZE
40-
Slot = Struct.new('RedisSlot', :slots, :primary_node_keys, keyword_init: true) do
41-
def [](slot)
42-
primary_node_keys[slots.getbyte(slot)]
40+
Slot = Struct.new('StringArray', :string, :elements, keyword_init: true) do
41+
def [](index)
42+
raise IndexError if index < 0
43+
return if index >= string.bytesize
44+
45+
elements[string.getbyte(index)]
4346
end
4447

45-
def []=(slot, primary_node_key)
46-
index = primary_node_keys.find_index(primary_node_key)
47-
if index.nil?
48-
raise(::RedisClient::Cluster::Node::ReloadNeeded, primary_node_key) if primary_node_keys.size >= SLOT_OPTIMIZATION_MAX_SHARD_SIZE
48+
def []=(index, element)
49+
raise IndexError if index < 0
50+
return if index >= string.bytesize
51+
52+
pos = elements.find_index(element) # O(N)
53+
if pos.nil?
54+
raise(RangeError, 'full of elements') if elements.size >= 256
4955

50-
index = primary_node_keys.size
51-
primary_node_keys << primary_node_key
56+
pos = elements.size
57+
elements << element
5258
end
5359

54-
slots.setbyte(slot, index)
60+
string.setbyte(index, pos)
5561
end
5662
end
5763

@@ -229,7 +235,12 @@ def any_replica_node_key(seed: nil)
229235
def update_slot(slot, node_key)
230236
return if @mutex.locked?
231237

232-
@mutex.synchronize { @slots[slot] = node_key }
238+
@mutex.synchronize do
239+
@slots[slot] = node_key
240+
rescue RangeError
241+
@slots = Array.new(SLOT_SIZE) { |i| @slots[i] }
242+
@slots[slot] = node_key
243+
end
233244
end
234245

235246
private
@@ -256,11 +267,11 @@ def build_slot_node_mappings(node_info_list)
256267
end
257268

258269
def make_array_for_slot_node_mappings(node_info_list)
259-
return Array.new(SLOT_SIZE) if node_info_list.count(&:primary?) > SLOT_OPTIMIZATION_MAX_SHARD_SIZE
270+
return Array.new(SLOT_SIZE) if node_info_list.count(&:primary?) > 256
260271

261272
::RedisClient::Cluster::Node::Slot.new(
262-
slots: String.new(SLOT_OPTIMIZATION_STRING, encoding: Encoding::BINARY, capacity: SLOT_SIZE),
263-
primary_node_keys: node_info_list.select(&:primary?).map(&:node_key)
273+
string: String.new(SLOT_OPTIMIZATION_STRING, encoding: Encoding::BINARY, capacity: SLOT_SIZE),
274+
elements: node_info_list.select(&:primary?).map(&:node_key)
264275
)
265276
end
266277

lib/redis_client/cluster/node/latency_replica.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def initialize(replications, options, pool, **kwargs)
2121
@replications.each_value { |keys| keys.sort_by! { |k| latencies.fetch(k) } }
2222
@replica_clients = select_replica_clients(@replications, @clients)
2323
@clients_for_scanning = select_clients_for_scanning(@replications, @clients)
24-
@existed_replicas = @replications.reject { |_, v| v.empty? }.values
24+
@existed_replicas = @replications.values.reject(&:empty?)
2525
end
2626

2727
def clients_for_scanning(seed: nil) # rubocop:disable Lint/UnusedMethodArgument

lib/redis_client/cluster/router.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class RedisClient
1212
class Cluster
1313
class Router
1414
ZERO_CURSOR_FOR_SCAN = '0'
15+
METHODS_FOR_BLOCKING_CMD = %i[blocking_call_v blocking_call].freeze
1516

1617
attr_reader :node
1718

@@ -91,7 +92,7 @@ def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disa
9192
raise
9293
end
9394
rescue ::RedisClient::ConnectionError => e
94-
raise if method == :blocking_call_v || (method == :blocking_call && e.is_a?(RedisClient::ReadTimeoutError))
95+
raise if METHODS_FOR_BLOCKING_CMD.include?(method) && e.is_a?(RedisClient::ReadTimeoutError)
9596
raise if retry_count <= 0
9697

9798
update_cluster_info!

test/cluster_controller.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class ClusterController
1010
DEFAULT_TIMEOUT_SEC = 5.0
1111

1212
RedisNodeInfo = Struct.new(
13-
'RedisNodeInfo',
13+
'RedisClusterNodeInfo',
1414
:id, :node_key, :flags, :role, :myself?, :primary_id, :ping_sent, :pong_recv,
1515
:config_epoch, :link_state, :slots, :client, :client_node_key,
1616
keyword_init: true

test/redis_client/cluster/test_node.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ def test_make_array_for_slot_node_mappings_optimized
382382

383383
want = node_info_list.first.node_key
384384
got = @test_node.send(:make_array_for_slot_node_mappings, node_info_list)
385-
assert_instance_of(Struct::RedisSlot, got)
385+
assert_instance_of(Struct::StringArray, got)
386386
::RedisClient::Cluster::Node::SLOT_SIZE.times do |i|
387387
got[i] = want
388388
assert_equal(want, got[i], "Case: #{i}")
@@ -415,13 +415,19 @@ def test_make_array_for_slot_node_mappings_max_shard_size
415415
end
416416

417417
got = @test_node.send(:make_array_for_slot_node_mappings, node_info_list)
418-
assert_instance_of(Struct::RedisSlot, got)
418+
assert_instance_of(Struct::StringArray, got)
419419

420420
::RedisClient::Cluster::Node::SLOT_SIZE.times { |i| got[i] = node_info_list.first.node_key }
421421

422422
got[0] = 'newbie:6379'
423423
assert_equal('newbie:6379', got[0])
424-
assert_raises(::RedisClient::Cluster::Node::ReloadNeeded) { got[0] = 'zombie:6379' }
424+
assert_raises(RangeError) { got[0] = 'zombie:6379' }
425+
426+
assert_raises(IndexError) { got[-1] = 'newbie:6379' }
427+
assert_raises(IndexError) { got[-1] }
428+
429+
got[16_384] = 'newbie:6379'
430+
assert_nil(got[16_384])
425431
end
426432

427433
def test_build_replication_mappings_regular

test/redis_client/test_cluster.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def test_call
3737

3838
assert(@client.call('PING') { |r| r == 'PONG' })
3939

40-
assert_equal(2, @client.call('HSET', 'hash', { foo: 1, bar: 2 }))
40+
assert_equal(2, @client.call('HSET', 'hash', foo: 1, bar: 2))
4141
wait_for_replication
4242
assert_equal(%w[1 2], @client.call('HMGET', 'hash', %w[foo bar]))
4343
end
@@ -53,7 +53,7 @@ def test_call_once
5353

5454
assert(@client.call_once('PING') { |r| r == 'PONG' })
5555

56-
assert_equal(2, @client.call_once('HSET', 'hash', { foo: 1, bar: 2 }))
56+
assert_equal(2, @client.call_once('HSET', 'hash', foo: 1, bar: 2))
5757
wait_for_replication
5858
assert_equal(%w[1 2], @client.call_once('HMGET', 'hash', %w[foo bar]))
5959
end

0 commit comments

Comments
 (0)