Skip to content

Commit e390686

Browse files
authored
perf: reduce memory allocations for pipelining (#130)
1 parent 374a4ea commit e390686

File tree

11 files changed

+88
-62
lines changed

11 files changed

+88
-62
lines changed

.rubocop.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,18 @@ Metrics/AbcSize:
1515
- 'test/**/*'
1616

1717
Metrics/ClassLength:
18-
Max: 400
18+
Max: 500
19+
20+
Metrics/ModuleLength:
21+
Max: 500
1922

2023
Metrics/MethodLength:
24+
Max: 50
25+
Exclude:
26+
- 'test/**/*'
27+
28+
Metrics/BlockLength:
29+
Max: 25
2130
Exclude:
2231
- 'test/**/*'
2332

lib/redis_client/cluster.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def pipelined
8181
seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
8282
pipeline = ::RedisClient::Cluster::Pipeline.new(@router, @command_builder, seed: seed)
8383
yield pipeline
84-
return [] if pipeline.empty? == 0
84+
return [] if pipeline.empty?
8585

8686
pipeline.execute
8787
end

lib/redis_client/cluster/command.rb

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class Command
1010
EMPTY_STRING = ''
1111

1212
class << self
13-
def load(nodes) # rubocop:disable Metrics/MethodLength
13+
def load(nodes)
1414
errors = []
1515
cmd = nil
1616
nodes&.each do |node|
@@ -32,10 +32,7 @@ def load(nodes) # rubocop:disable Metrics/MethodLength
3232

3333
def parse_command_details(rows)
3434
rows&.reject { |row| row[0].nil? }.to_h do |row|
35-
[
36-
::RedisClient::Cluster::NormalizedCmdName.instance.get_by_name(row[0]),
37-
{ arity: row[1], flags: row[2], first: row[3], last: row[4], step: row[5] }
38-
]
35+
[row[0].downcase, { arity: row[1], flags: row[2], first: row[3], last: row[4], step: row[5] }]
3936
end
4037
end
4138
end
@@ -85,7 +82,7 @@ def dig_details(command, key)
8582
@details.fetch(name).fetch(key)
8683
end
8784

88-
def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/MethodLength
85+
def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity
8986
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
9087
when 'eval', 'evalsha', 'zinterstore', 'zunionstore' then 3
9188
when 'object' then 2

lib/redis_client/cluster/node.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def build_connection_prelude
3737
end
3838

3939
class << self
40-
def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
40+
def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
4141
startup_size = options.size > MAX_STARTUP_SAMPLE ? MAX_STARTUP_SAMPLE : options.size
4242
node_info_list = errors = nil
4343
startup_options = options.to_a.sample(MAX_STARTUP_SAMPLE).to_h
@@ -83,7 +83,7 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl
8383

8484
# @see https://redis.io/commands/cluster-nodes/
8585
# @see https://github.com/redis/redis/blob/78960ad57b8a5e6af743d789ed8fd767e37d42b8/src/cluster.c#L4660-L4683
86-
def parse_node_info(info) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength
86+
def parse_node_info(info) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
8787
rows = info.split("\n").map(&:split)
8888
rows.each { |arr| arr[2] = arr[2].split(',') }
8989
rows.select! { |arr| arr[7] == 'connected' && (arr[2] & %w[fail? fail handshake noaddr noflags]).empty? }
@@ -242,7 +242,7 @@ def call_multiple_nodes!(clients, method, command, args, &block)
242242
raise ::RedisClient::Cluster::ErrorCollection, errors
243243
end
244244

245-
def try_map(clients) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
245+
def try_map(clients) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
246246
results = errors = nil
247247
clients.each_slice(MAX_THREADS) do |chuncked_clients|
248248
threads = chuncked_clients.map do |k, v|

lib/redis_client/cluster/node/latency_replica.rb

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ def any_replica_node_key(seed: nil)
3939

4040
private
4141

42-
def measure_latencies(clients) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
43-
latencies = nil
44-
clients.each_slice(::RedisClient::Cluster::Node::MAX_THREADS) do |chuncked_clients|
42+
def measure_latencies(clients) # rubocop:disable Metrics/AbcSize
43+
clients.each_slice(::RedisClient::Cluster::Node::MAX_THREADS).each_with_object({}) do |chuncked_clients, acc|
4544
threads = chuncked_clients.map do |k, v|
4645
Thread.new(k, v) do |node_key, client|
4746
Thread.pass
@@ -63,12 +62,9 @@ def measure_latencies(clients) # rubocop:disable Metrics/MethodLength, Metrics/A
6362

6463
threads.each do |t|
6564
t.join
66-
latencies ||= {}
67-
latencies[t.thread_variable_get(:node_key)] = t.thread_variable_get(:latency)
65+
acc[t.thread_variable_get(:node_key)] = t.thread_variable_get(:latency)
6866
end
6967
end
70-
71-
latencies
7268
end
7369

7470
def select_replica_clients(replications, clients)

lib/redis_client/cluster/normalized_cmd_name.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def get_by_name(name)
2828

2929
def clear
3030
@mutex.synchronize { @cache.clear }
31+
true
3132
end
3233

3334
private

lib/redis_client/cluster/pipeline.rb

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
require 'redis_client'
44
require 'redis_client/cluster/errors'
5+
require 'redis_client/middlewares'
6+
require 'redis_client/pooled'
57

68
class RedisClient
79
class Cluster
@@ -12,65 +14,70 @@ class Pipeline
1214
def initialize(router, command_builder, seed: Random.new_seed)
1315
@router = router
1416
@command_builder = command_builder
15-
@grouped = {}
16-
@size = 0
1717
@seed = seed
18+
@pipelines = @indices = nil
19+
@size = 0
1820
end
1921

2022
def call(*args, **kwargs, &block)
2123
command = @command_builder.generate(args, kwargs)
2224
node_key = @router.find_node_key(command, seed: @seed)
23-
add_row(node_key, [@size, :call_v, command, block])
25+
get_pipeline(node_key).call_v(command, &block)
26+
index_pipeline(node_key)
2427
end
2528

2629
def call_v(args, &block)
2730
command = @command_builder.generate(args)
2831
node_key = @router.find_node_key(command, seed: @seed)
29-
add_row(node_key, [@size, :call_v, command, block])
32+
get_pipeline(node_key).call_v(command, &block)
33+
index_pipeline(node_key)
3034
end
3135

3236
def call_once(*args, **kwargs, &block)
3337
command = @command_builder.generate(args, kwargs)
3438
node_key = @router.find_node_key(command, seed: @seed)
35-
add_row(node_key, [@size, :call_once_v, command, block])
39+
get_pipeline(node_key).call_once_v(command, &block)
40+
index_pipeline(node_key)
3641
end
3742

3843
def call_once_v(args, &block)
3944
command = @command_builder.generate(args)
4045
node_key = @router.find_node_key(command, seed: @seed)
41-
add_row(node_key, [@size, :call_once_v, command, block])
46+
get_pipeline(node_key).call_once_v(command, &block)
47+
index_pipeline(node_key)
4248
end
4349

4450
def blocking_call(timeout, *args, **kwargs, &block)
4551
command = @command_builder.generate(args, kwargs)
4652
node_key = @router.find_node_key(command, seed: @seed)
47-
add_row(node_key, [@size, :blocking_call_v, timeout, command, block])
53+
get_pipeline(node_key).blocking_call_v(timeout, command, &block)
54+
index_pipeline(node_key)
4855
end
4956

5057
def blocking_call_v(timeout, args, &block)
5158
command = @command_builder.generate(args)
5259
node_key = @router.find_node_key(command, seed: @seed)
53-
add_row(node_key, [@size, :blocking_call_v, timeout, command, block])
60+
get_pipeline(node_key).blocking_call_v(timeout, command, &block)
61+
index_pipeline(node_key)
5462
end
5563

5664
def empty?
5765
@size.zero?
5866
end
5967

6068
# TODO: https://github.com/redis-rb/redis-cluster-client/issues/37 handle redirections
61-
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
69+
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
6270
all_replies = errors = nil
63-
@grouped.each_slice(MAX_THREADS) do |chuncked_grouped|
64-
threads = chuncked_grouped.map do |k, v|
65-
Thread.new(@router, k, v) do |router, node_key, rows|
71+
@pipelines&.each_slice(MAX_THREADS) do |chuncked_pipelines|
72+
threads = chuncked_pipelines.map do |node_key, pipeline|
73+
Thread.new(node_key, pipeline) do |nk, pl|
6674
Thread.pass
67-
replies = do_pipelining(router.find_node(node_key), rows)
68-
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
75+
Thread.current.thread_variable_set(:node_key, nk)
76+
replies = do_pipelining(@router.find_node(nk), pl)
77+
raise ReplySizeError, "commands: #{pl._size}, replies: #{replies.size}" if pl._size != replies.size
6978

70-
Thread.current.thread_variable_set(:rows, rows)
7179
Thread.current.thread_variable_set(:replies, replies)
7280
rescue StandardError => e
73-
Thread.current.thread_variable_set(:node_key, node_key)
7481
Thread.current.thread_variable_set(:error, e)
7582
end
7683
end
@@ -79,7 +86,7 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
7986
t.join
8087
if t.thread_variable?(:replies)
8188
all_replies ||= Array.new(@size)
82-
t.thread_variable_get(:rows).each_with_index { |r, i| all_replies[r.first] = t.thread_variable_get(:replies)[i] }
89+
@indices[t.thread_variable_get(:node_key)].each_with_index { |gi, i| all_replies[gi] = t.thread_variable_get(:replies)[i] }
8390
elsif t.thread_variable?(:error)
8491
errors ||= {}
8592
errors[t.thread_variable_get(:node_key)] = t.thread_variable_get(:error)
@@ -94,21 +101,35 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
94101

95102
private
96103

97-
def add_row(node_key, row)
98-
@grouped[node_key] = [] unless @grouped.key?(node_key)
99-
@grouped[node_key] << row
104+
def get_pipeline(node_key)
105+
@pipelines ||= {}
106+
@pipelines[node_key] ||= ::RedisClient::Pipeline.new(@command_builder)
107+
end
108+
109+
def index_pipeline(node_key)
110+
@indices ||= {}
111+
@indices[node_key] ||= []
112+
@indices[node_key] << @size
100113
@size += 1
101114
end
102115

103-
def do_pipelining(node, rows)
104-
node.pipelined do |pipeline|
105-
rows.each do |row|
106-
case row.size
107-
when 4 then pipeline.send(row[1], row[2], &row[3])
108-
when 5 then pipeline.send(row[1], row[2], row[3], &row[4])
109-
end
116+
def do_pipelining(client, pipeline)
117+
case client
118+
when ::RedisClient then send_pipeline(client, pipeline)
119+
when ::RedisClient::Pooled then client.with { |cli| send_pipeline(cli, pipeline) }
120+
else raise NotImplementedError, "#{client.class.name}#pipelined for cluster client"
121+
end
122+
end
123+
124+
def send_pipeline(client, pipeline)
125+
results = client.send(:ensure_connected, retryable: pipeline._retryable?) do |connection|
126+
commands = pipeline._commands
127+
::RedisClient::Middlewares.call_pipelined(commands, client.config) do
128+
connection.call_pipelined(commands, pipeline._timeouts)
110129
end
111130
end
131+
132+
pipeline._coerce!(results)
112133
end
113134
end
114135
end

lib/redis_client/cluster/router.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def initialize(config, pool: nil, **kwargs)
2525
@command_builder = @config.command_builder
2626
end
2727

28-
def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
28+
def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
2929
cmd = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
3030
case cmd
3131
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
@@ -65,7 +65,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
6565

6666
# @see https://redis.io/topics/cluster-spec#redirection-and-resharding
6767
# Redirection and resharding
68-
def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
68+
def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
6969
if args.empty?
7070
# prevent memory allocation for variable-length args
7171
node.send(method, command, &block)
@@ -100,7 +100,7 @@ def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disa
100100
retry
101101
end
102102

103-
def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
103+
def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize
104104
node.send(method, *args, **kwargs, &block)
105105
rescue ::RedisClient::CommandError => e
106106
raise if retry_count <= 0
@@ -129,7 +129,7 @@ def try_delegate(node, method, *args, retry_count: 3, **kwargs, &block) # ruboco
129129
retry
130130
end
131131

132-
def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
132+
def scan(*command, seed: nil, **kwargs) # rubocop:disable Metrics/AbcSize
133133
command = @command_builder.generate(command, kwargs)
134134

135135
command[1] = ZERO_CURSOR_FOR_SCAN if command.size == 1
@@ -270,7 +270,7 @@ def assign_asking_node(err_msg)
270270
find_node(node_key)
271271
end
272272

273-
def fetch_cluster_info(config, pool: nil, **kwargs) # rubocop:disable Metrics/MethodLength
273+
def fetch_cluster_info(config, pool: nil, **kwargs)
274274
node_info = ::RedisClient::Cluster::Node.load_info(config.per_node_key, **kwargs)
275275
node_addrs = node_info.map { |info| ::RedisClient::Cluster::NodeKey.hashify(info[:node_key]) }
276276
config.update_node(node_addrs)

lib/redis_client/cluster_config.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def parse_node_addr(addr)
114114
end
115115
end
116116

117-
def parse_node_url(addr) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength
117+
def parse_node_url(addr) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
118118
return if addr.empty?
119119

120120
uri = URI(addr)

test/redis_client/cluster/test_normalized_cmd_name.rb

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,27 +63,29 @@ def test_get_by_name
6363
end
6464

6565
def test_thread_safety
66-
results = Array.new(10, true)
67-
threads = results.each_with_index.map do |_, i|
66+
attempts = Array.new(100, true)
67+
threads = attempts.each_with_index.map do |_, i|
6868
Thread.new do
6969
Thread.pass
7070
Thread.current.thread_variable_set(:index, i)
71-
if i.even?
72-
Thread.current.thread_variable_set(:result, @subject.get_by_command(%w[SET foo bar]) == 'set')
73-
else
74-
@subject.clear
75-
end
71+
got = if i.even?
72+
@subject.get_by_command(%w[SET foo bar]) == 'set'
73+
else
74+
@subject.clear
75+
end
76+
Thread.current.thread_variable_set(:got, got)
7677
rescue StandardError
77-
Thread.current.thread_variable_set(:result, false)
78+
Thread.current.thread_variable_set(:got, false)
7879
end
7980
end
8081

8182
threads.each do |t|
8283
t.join
83-
results[t.thread_variable_get(:index)] = t.thread_variable_get(:result)
84+
attempts[t.thread_variable_get(:index)] = t.thread_variable_get(:got)
8485
end
8586

86-
refute_includes(results, false)
87+
refute_includes(attempts, false)
88+
refute_includes(attempts, nil)
8789
end
8890
end
8991
end

0 commit comments

Comments
 (0)