Skip to content

Commit 4e742a4

Browse files
authored
Merge pull request #97 from supercaracal/reduce-consumption-of-threads
Reduce consumption of threads
2 parents f71a99e + 173aecc commit 4e742a4

File tree

3 files changed

+56
-45
lines changed

3 files changed

+56
-45
lines changed

lib/redis_client/cluster/node.rb

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class Node
1313
MIN_SLOT = 0
1414
MAX_SLOT = SLOT_SIZE - 1
1515
MAX_STARTUP_SAMPLE = 37
16+
MAX_THREADS = Integer(ENV.fetch('MAX_THREADS', 5))
1617
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
1718

1819
ReloadNeeded = Class.new(::RedisClient::Error)
@@ -39,18 +40,21 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl
3940
errors = Array.new(startup_size)
4041
startup_options = options.to_a.sample(MAX_STARTUP_SAMPLE).to_h
4142
startup_nodes = ::RedisClient::Cluster::Node.new(startup_options, **kwargs)
42-
threads = startup_nodes.each_with_index.map do |raw_client, idx|
43-
Thread.new(raw_client, idx) do |cli, i|
44-
Thread.pass
45-
reply = cli.call('CLUSTER', 'NODES')
46-
node_info_list[i] = parse_node_info(reply)
47-
rescue StandardError => e
48-
errors[i] = e
49-
ensure
50-
cli&.close
43+
startup_nodes.each_slice(MAX_THREADS * 2).with_index do |chuncked_startup_nodes, chuncked_idx|
44+
threads = chuncked_startup_nodes.each_with_index.map do |raw_client, idx|
45+
Thread.new(raw_client, (MAX_THREADS * 2 * chuncked_idx) + idx) do |cli, i|
46+
Thread.pass
47+
reply = cli.call('CLUSTER', 'NODES')
48+
node_info_list[i] = parse_node_info(reply)
49+
rescue StandardError => e
50+
errors[i] = e
51+
ensure
52+
cli&.close
53+
end
5154
end
55+
threads.each(&:join)
5256
end
53-
threads.each(&:join)
57+
5458
raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.all?(&:nil?)
5559

5660
grouped = node_info_list.compact.group_by do |rows|
@@ -267,17 +271,20 @@ def build_clients(options, pool: nil, **kwargs)
267271
def try_map # rubocop:disable Metrics/MethodLength
268272
results = {}
269273
errors = {}
270-
threads = @clients.map do |k, v|
271-
Thread.new(k, v) do |node_key, client|
272-
Thread.pass
273-
reply = yield(node_key, client)
274-
results[node_key] = reply unless reply.nil?
275-
rescue StandardError => e
276-
errors[node_key] = e
274+
@clients.each_slice(MAX_THREADS * 2) do |chuncked_clients|
275+
threads = chuncked_clients.map do |k, v|
276+
Thread.new(k, v) do |node_key, client|
277+
Thread.pass
278+
reply = yield(node_key, client)
279+
results[node_key] = reply unless reply.nil?
280+
rescue StandardError => e
281+
errors[node_key] = e
282+
end
277283
end
284+
285+
threads.each(&:join)
278286
end
279287

280-
threads.each(&:join)
281288
[results, errors]
282289
end
283290
end

lib/redis_client/cluster/pipeline.rb

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ class RedisClient
77
class Cluster
88
class Pipeline
99
ReplySizeError = Class.new(::RedisClient::Error)
10+
MAX_THREADS = Integer(ENV.fetch('MAX_THREADS', 5))
1011

1112
def initialize(router, command_builder)
1213
@router = router
@@ -62,27 +63,30 @@ def empty?
6263
end
6364

6465
# TODO: https://github.com/redis-rb/redis-cluster-client/issues/37 handle redirections
65-
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
66+
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
6667
all_replies = Array.new(@size)
6768
errors = {}
68-
threads = @grouped.map do |k, v|
69-
Thread.new(@router, k, v) do |router, node_key, rows|
70-
Thread.pass
71-
replies = router.find_node(node_key).pipelined do |pipeline|
72-
rows.each do |(_size, *row, block)|
73-
pipeline.send(*row, &block)
69+
@grouped.each_slice(MAX_THREADS * 2) do |chuncked_grouped|
70+
threads = chuncked_grouped.map do |k, v|
71+
Thread.new(@router, k, v) do |router, node_key, rows|
72+
Thread.pass
73+
replies = router.find_node(node_key).pipelined do |pipeline|
74+
rows.each do |(_size, *row, block)|
75+
pipeline.send(*row, &block)
76+
end
7477
end
75-
end
7678

77-
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
79+
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
7880

79-
rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] }
80-
rescue StandardError => e
81-
errors[node_key] = e
81+
rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] }
82+
rescue StandardError => e
83+
errors[node_key] = e
84+
end
8285
end
86+
87+
threads.each(&:join)
8388
end
8489

85-
threads.each(&:join)
8690
return all_replies if errors.empty?
8791

8892
raise ::RedisClient::Cluster::ErrorCollection, errors

test/redis_client/test_cluster.rb

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def test_inspect
2929
def test_call
3030
assert_raises(ArgumentError) { @client.call }
3131

32-
(0..9).each do |i|
32+
10.times do |i|
3333
assert_equal('OK', @client.call('SET', "key#{i}", i), "Case: SET: key#{i}")
3434
wait_for_replication
3535
assert_equal(i.to_s, @client.call('GET', "key#{i}"), "Case: GET: key#{i}")
@@ -44,7 +44,7 @@ def test_call
4444
def test_call_once
4545
assert_raises(ArgumentError) { @client.call_once }
4646

47-
(0..9).each do |i|
47+
10.times do |i|
4848
assert_equal('OK', @client.call_once('SET', "key#{i}", i), "Case: SET: key#{i}")
4949
wait_for_replication
5050
assert_equal(i.to_s, @client.call_once('GET', "key#{i}"), "Case: GET: key#{i}")
@@ -72,7 +72,7 @@ def test_blocking_call
7272
def test_scan
7373
assert_raises(ArgumentError) { @client.scan }
7474

75-
(0..9).each { |i| @client.call('SET', "key#{i}", i) }
75+
10.times { |i| @client.call('SET', "key#{i}", i) }
7676
wait_for_replication
7777
want = (0..9).map { |i| "key#{i}" }
7878
got = []
@@ -81,8 +81,8 @@ def test_scan
8181
end
8282

8383
def test_sscan
84-
(0..9).each do |i|
85-
(0..9).each { |j| @client.call('SADD', "key#{i}", "member#{j}") }
84+
10.times do |i|
85+
10.times { |j| @client.call('SADD', "key#{i}", "member#{j}") }
8686
wait_for_replication
8787
want = (0..9).map { |j| "member#{j}" }
8888
got = []
@@ -92,8 +92,8 @@ def test_sscan
9292
end
9393

9494
def test_hscan
95-
(0..9).each do |i|
96-
(0..9).each { |j| @client.call('HSET', "key#{i}", "field#{j}", j) }
95+
10.times do |i|
96+
10.times { |j| @client.call('HSET', "key#{i}", "field#{j}", j) }
9797
wait_for_replication
9898
want = (0..9).map { |j| "field#{j}" }
9999
got = []
@@ -103,8 +103,8 @@ def test_hscan
103103
end
104104

105105
def test_zscan
106-
(0..9).each do |i|
107-
(0..9).each { |j| @client.call('ZADD', "key#{i}", j, "member#{j}") }
106+
10.times do |i|
107+
10.times { |j| @client.call('ZADD', "key#{i}", j, "member#{j}") }
108108
wait_for_replication
109109
want = (0..9).map { |j| "member#{j}" }
110110
got = []
@@ -118,18 +118,18 @@ def test_pipelined
118118

119119
want = (0..9).map { 'OK' } + (1..3).to_a + %w[PONG] + (0..9).map(&:to_s) + [%w[list 2]]
120120
got = @client.pipelined do |pipeline|
121-
(0..9).each { |i| pipeline.call('SET', "string#{i}", i) }
122-
(0..2).each { |i| pipeline.call('RPUSH', 'list', i) }
121+
10.times { |i| pipeline.call('SET', "string#{i}", i) }
122+
3.times { |i| pipeline.call('RPUSH', 'list', i) }
123123
pipeline.call_once('PING')
124-
(0..9).each { |i| pipeline.call('GET', "string#{i}") }
124+
10.times { |i| pipeline.call('GET', "string#{i}") }
125125
pipeline.blocking_call(0.2, 'BRPOP', 'list', '0.1')
126126
end
127127

128128
assert_equal(want, got)
129129
end
130130

131131
def test_pubsub
132-
(0..9).each do |i|
132+
10.times do |i|
133133
pubsub = @client.pubsub
134134
pubsub.call('SUBSCRIBE', "channel#{i}")
135135
assert_equal(['subscribe', "channel#{i}", 1], pubsub.next_event(0.1))
@@ -154,7 +154,7 @@ def test_close
154154
end
155155

156156
def test_dedicated_commands
157-
(0..9).each { |i| @client.call('SET', "key#{i}", i) }
157+
10.times { |i| @client.call('SET', "key#{i}", i) }
158158
wait_for_replication
159159
[
160160
{ command: %w[ACL HELP], is_a: Array },

0 commit comments

Comments
 (0)