Skip to content

Commit fa1ee1e

Browse files
authored
Enhance performance to use thread and ensure thread safety (#28)
1 parent 8ddc111 commit fa1ee1e

File tree

5 files changed

+85
-34
lines changed

5 files changed

+85
-34
lines changed

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ gem 'redis-cluster-client'
2020
| `:fixed_hostname` | String | `nil` | required if client should connect to single endpoint with SSL |
2121

2222
Also, [the other generic options](https://github.com/redis-rb/redis-client#configuration) can be passed.
23+
But `:url`, `:host`, `:port` and `:path` are ignored because they conflict with the `:nodes` option.
2324

2425
```ruby
2526
# The following examples are Docker containers on localhost.
@@ -88,6 +89,22 @@ cli.call('CLUSTER', 'KEYSLOT', 'key3')
8889
#=> 935
8990
```
9091

92+
Also, you can use the hash tag to bias keys to the same slot.
93+
94+
```ruby
95+
cli.call('CLUSTER', 'KEYSLOT', '{key}1')
96+
#=> 12539
97+
98+
cli.call('CLUSTER', 'KEYSLOT', '{key}2')
99+
#=> 12539
100+
101+
cli.call('CLUSTER', 'KEYSLOT', '{key}3')
102+
#=> 12539
103+
104+
cli.call('MGET', '{key}1', '{key}2', '{key}3')
105+
#=> [nil, nil, nil]
106+
```
107+
91108
## ACL
92109
The cluster client internally calls `COMMAND` and `CLUSTER NODES` commands to operate correctly.
93110
So please permit it like the followings.
@@ -97,6 +114,7 @@ So please permit it like the followings.
97114
cli1 = RedisClient.cluster.new_client
98115

99116
# To create a user with permissions
117+
# Typically, user settings are configured in the config file for the server beforehand.
100118
cli1.call('ACL', 'SETUSER', 'foo', 'ON', '+COMMAND', '+CLUSTER|NODES', '+PING', '>mysecret')
101119

102120
# To initialize client with the user

lib/redis_client/cluster.rb

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ class Pipeline
1515
def initialize(client)
1616
@client = client
1717
@grouped = Hash.new([].freeze)
18-
@replies = []
1918
@size = 0
2019
end
2120

@@ -41,27 +40,32 @@ def empty?
4140
@size.zero?
4241
end
4342

44-
# TODO: use concurrency
45-
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
46-
@grouped.each do |node_key, rows|
47-
node_key = node_key.nil? ? @client.instance_variable_get(:@node).primary_node_keys.sample : node_key
48-
replies = @client.send(:find_node, node_key).pipelined do |pipeline|
49-
rows.each do |row|
50-
case row[1]
51-
when :call then pipeline.call(*row[2], **row[3])
52-
when :call_once then pipeline.call_once(*row[2], **row[3])
53-
when :blocking_call then pipeline.blocking_call(row[2], *row[3], **row[4])
54-
else raise NotImplementedError, row[1]
43+
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
44+
all_replies = []
45+
threads = @grouped.map do |k, v|
46+
Thread.new(@client, k, v) do |client, node_key, rows|
47+
Thread.pass
48+
49+
node_key = node_key.nil? ? client.instance_variable_get(:@node).primary_node_keys.sample : node_key
50+
replies = client.send(:find_node, node_key).pipelined do |pipeline|
51+
rows.each do |row|
52+
case row[1]
53+
when :call then pipeline.call(*row[2], **row[3])
54+
when :call_once then pipeline.call_once(*row[2], **row[3])
55+
when :blocking_call then pipeline.blocking_call(row[2], *row[3], **row[4])
56+
else raise NotImplementedError, row[1]
57+
end
5558
end
5659
end
57-
end
5860

59-
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
61+
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
6062

61-
rows.each_with_index { |row, idx| @replies[row.first] = replies[idx] }
63+
rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] }
64+
end
6265
end
6366

64-
@replies
67+
threads.each(&:join)
68+
all_replies
6569
end
6670
end
6771

@@ -103,6 +107,7 @@ def initialize(config, pool: nil, **kwargs)
103107
@client_kwargs = kwargs
104108
@node = fetch_cluster_info!(@config, pool: @pool, **@client_kwargs)
105109
@command = ::RedisClient::Cluster::Command.load(@node)
110+
@mutex = Mutex.new
106111
end
107112

108113
def inspect
@@ -326,9 +331,13 @@ def assign_node(*command)
326331
find_node(node_key)
327332
end
328333

329-
def find_node_key(*command, primary_only: false)
334+
def find_node_key(*command, primary_only: false) # rubocop:disable Metrics/MethodLength
330335
key = @command.extract_first_key(command)
331-
return if key.empty?
336+
if key.empty?
337+
return @node.primary_node_keys.sample if @command.should_send_to_primary?(command) || primary_only
338+
339+
return @node.replica_node_keys.sample
340+
end
332341

333342
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
334343
return unless @node.slot_exists?(slot)
@@ -350,13 +359,15 @@ def find_node(node_key)
350359
end
351360

352361
def update_cluster_info!(node_key = nil)
353-
unless node_key.nil?
354-
host, port = ::RedisClient::Cluster::NodeKey.split(node_key)
355-
@config.add_node(host, port)
356-
end
362+
@mutex.synchronize do
363+
unless node_key.nil?
364+
host, port = ::RedisClient::Cluster::NodeKey.split(node_key)
365+
@config.add_node(host, port)
366+
end
357367

358-
@node.each(&:close)
359-
@node = fetch_cluster_info!(@config, pool: @pool, **@client_kwargs)
368+
@node.each(&:close)
369+
@node = fetch_cluster_info!(@config, pool: @pool, **@client_kwargs)
370+
end
360371
end
361372
end
362373
end

lib/redis_client/cluster/node.rb

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def initialize(options, node_info: [], pool: nil, with_replica: false, **kwargs)
7979
@slots = build_slot_node_mappings(node_info)
8080
@replications = build_replication_mappings(node_info)
8181
@clients = build_clients(options, pool: pool, **kwargs)
82+
@mutex = Mutex.new
8283
end
8384

8485
def inspect
@@ -101,6 +102,12 @@ def primary_node_keys
101102
@clients.filter_map { |k, _| primary?(k) ? k : nil }.sort
102103
end
103104

105+
def replica_node_keys
106+
return primary_node_keys if replica_disabled?
107+
108+
@clients.filter_map { |k, _| replica?(k) ? k : nil }.sort
109+
end
110+
104111
def find_by(node_key)
105112
@clients.fetch(node_key)
106113
rescue KeyError
@@ -163,7 +170,7 @@ def find_node_key_of_replica(slot)
163170
end
164171

165172
def update_slot(slot, node_key)
166-
@slots[slot] = node_key
173+
@mutex.synchronize { @slots[slot] = node_key }
167174
end
168175

169176
private
@@ -217,15 +224,17 @@ def build_clients(options, pool: nil, **kwargs)
217224
def try_map # rubocop:disable Metrics/MethodLength
218225
errors = {}
219226
results = {}
220-
221-
@clients.each do |node_key, client|
222-
reply = yield(node_key, client)
223-
results[node_key] = reply unless reply.nil?
224-
rescue ::RedisClient::CommandError => e
225-
errors[node_key] = e
226-
next
227+
threads = @clients.map do |k, v|
228+
Thread.new(k, v) do |node_key, client|
229+
Thread.pass
230+
reply = yield(node_key, client)
231+
results[node_key] = reply unless reply.nil?
232+
rescue ::RedisClient::CommandError => e
233+
errors[node_key] = e
234+
end
227235
end
228236

237+
threads.each(&:join)
229238
return results if errors.empty?
230239

231240
raise ::RedisClient::Cluster::CommandErrorCollection, errors

lib/redis_client/cluster_config.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@ class ClusterConfig
1515
VALID_SCHEMES = [DEFAULT_SCHEME, SECURE_SCHEME].freeze
1616
VALID_NODES_KEYS = %i[ssl username password host port db].freeze
1717
MERGE_CONFIG_KEYS = %i[ssl username password].freeze
18+
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
1819

1920
InvalidClientConfigError = Class.new(::RedisClient::Error)
2021

2122
def initialize(nodes: DEFAULT_NODES, replica: false, fixed_hostname: '', **client_config)
2223
@replica = true & replica
2324
@fixed_hostname = fixed_hostname.to_s
2425
@node_configs = build_node_configs(nodes.dup)
26+
client_config = client_config.reject { |k, _| IGNORE_GENERIC_CONFIG_KEYS.include?(k) }
2527
@client_config = merge_generic_config(client_config, @node_configs)
28+
@mutex = Mutex.new
2629
end
2730

2831
def inspect
@@ -51,11 +54,11 @@ def use_replica?
5154
end
5255

5356
def update_node(addrs)
54-
@node_configs = build_node_configs(addrs)
57+
@mutex.synchronize { @node_configs = build_node_configs(addrs) }
5558
end
5659

5760
def add_node(host, port)
58-
@node_configs << { host: host, port: port }
61+
@mutex.synchronize { @node_configs << { host: host, port: port } }
5962
end
6063

6164
def dup

test/redis_client/cluster/test_node.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,16 @@ def test_primary_node_keys
175175
assert_equal(want, got)
176176
end
177177

178+
def test_replica_node_keys
179+
want = @test_node_info.filter_map { |info| info[:role] == 'master' ? info[:node_key] : nil }.sort
180+
got = @test_node.replica_node_keys
181+
assert_equal(want, got)
182+
183+
want = @test_node_info.filter_map { |info| info[:role] == 'slave' ? info[:node_key] : nil }.sort
184+
got = @test_node_with_scale_read.replica_node_keys
185+
assert_equal(want, got)
186+
end
187+
178188
def test_find_by
179189
@test_node_info.each do |info|
180190
msg = "Case: primary only: #{info[:node_key]}"

0 commit comments

Comments
 (0)