Skip to content

Commit 6bc083a

Browse files
committed
perf: Reduce consumption of threads
1 parent f71a99e commit 6bc083a

File tree

2 files changed

+42
-31
lines changed

2 files changed

+42
-31
lines changed

lib/redis_client/cluster/node.rb

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class Node
1313
MIN_SLOT = 0
1414
MAX_SLOT = SLOT_SIZE - 1
1515
MAX_STARTUP_SAMPLE = 37
16+
MAX_THREADS = Integer(ENV.fetch('MAX_THREADS', 5))
1617
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
1718

1819
ReloadNeeded = Class.new(::RedisClient::Error)
@@ -39,18 +40,21 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl
3940
errors = Array.new(startup_size)
4041
startup_options = options.to_a.sample(MAX_STARTUP_SAMPLE).to_h
4142
startup_nodes = ::RedisClient::Cluster::Node.new(startup_options, **kwargs)
42-
threads = startup_nodes.each_with_index.map do |raw_client, idx|
43-
Thread.new(raw_client, idx) do |cli, i|
44-
Thread.pass
45-
reply = cli.call('CLUSTER', 'NODES')
46-
node_info_list[i] = parse_node_info(reply)
47-
rescue StandardError => e
48-
errors[i] = e
49-
ensure
50-
cli&.close
43+
startup_nodes.each_slice(MAX_THREADS * 2).with_index do |chuncked_startup_nodes, chuncked_idx|
44+
threads = chuncked_startup_nodes.each_with_index.map do |raw_client, idx|
45+
Thread.new(raw_client, (MAX_THREADS * 2 * chuncked_idx) + idx) do |cli, i|
46+
Thread.pass
47+
reply = cli.call('CLUSTER', 'NODES')
48+
node_info_list[i] = parse_node_info(reply)
49+
rescue StandardError => e
50+
errors[i] = e
51+
ensure
52+
cli&.close
53+
end
5154
end
55+
threads.each(&:join)
5256
end
53-
threads.each(&:join)
57+
5458
raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.all?(&:nil?)
5559

5660
grouped = node_info_list.compact.group_by do |rows|
@@ -267,17 +271,20 @@ def build_clients(options, pool: nil, **kwargs)
267271
def try_map # rubocop:disable Metrics/MethodLength
268272
results = {}
269273
errors = {}
270-
threads = @clients.map do |k, v|
271-
Thread.new(k, v) do |node_key, client|
272-
Thread.pass
273-
reply = yield(node_key, client)
274-
results[node_key] = reply unless reply.nil?
275-
rescue StandardError => e
276-
errors[node_key] = e
274+
@clients.each_slice(MAX_THREADS * 2) do |chuncked_clients|
275+
threads = chuncked_clients.map do |k, v|
276+
Thread.new(k, v) do |node_key, client|
277+
Thread.pass
278+
reply = yield(node_key, client)
279+
results[node_key] = reply unless reply.nil?
280+
rescue StandardError => e
281+
errors[node_key] = e
282+
end
277283
end
284+
285+
threads.each(&:join)
278286
end
279287

280-
threads.each(&:join)
281288
[results, errors]
282289
end
283290
end

lib/redis_client/cluster/pipeline.rb

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ class RedisClient
77
class Cluster
88
class Pipeline
99
ReplySizeError = Class.new(::RedisClient::Error)
10+
MAX_THREADS = Integer(ENV.fetch('MAX_THREADS', 5))
1011

1112
def initialize(router, command_builder)
1213
@router = router
@@ -62,27 +63,30 @@ def empty?
6263
end
6364

6465
# TODO: https://github.com/redis-rb/redis-cluster-client/issues/37 handle redirections
65-
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
66+
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
6667
all_replies = Array.new(@size)
6768
errors = {}
68-
threads = @grouped.map do |k, v|
69-
Thread.new(@router, k, v) do |router, node_key, rows|
70-
Thread.pass
71-
replies = router.find_node(node_key).pipelined do |pipeline|
72-
rows.each do |(_size, *row, block)|
73-
pipeline.send(*row, &block)
69+
@grouped.each_slice(MAX_THREADS * 2) do |chuncked_grouped|
70+
threads = chuncked_grouped.map do |k, v|
71+
Thread.new(@router, k, v) do |router, node_key, rows|
72+
Thread.pass
73+
replies = router.find_node(node_key).pipelined do |pipeline|
74+
rows.each do |(_size, *row, block)|
75+
pipeline.send(*row, &block)
76+
end
7477
end
75-
end
7678

77-
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
79+
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
7880

79-
rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] }
80-
rescue StandardError => e
81-
errors[node_key] = e
81+
rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] }
82+
rescue StandardError => e
83+
errors[node_key] = e
84+
end
8285
end
86+
87+
threads.each(&:join)
8388
end
8489

85-
threads.each(&:join)
8690
return all_replies if errors.empty?
8791

8892
raise ::RedisClient::Cluster::ErrorCollection, errors

0 commit comments

Comments
 (0)