Skip to content

Commit 044cc0d

Browse files
authored
feat: add concurrent worker model option (#265)
1 parent da5cfcd commit 044cc0d

File tree

6 files changed

+79
-6
lines changed

6 files changed

+79
-6
lines changed

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
source 'https://rubygems.org'
44
gemspec name: 'redis-cluster-client'
55

6+
gem 'benchmark-ips'
67
gem 'hiredis-client', '~> 0.6'
78
gem 'memory_profiler'
89
gem 'minitest'

Rakefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ Rake::TestTask.new(:bench) do |t|
3737
t.test_files = ARGV.size > 1 ? ARGV[1..] : Dir['test/**/bench_*.rb']
3838
end
3939

40+
Rake::TestTask.new(:ips) do |t|
41+
t.libs << :lib
42+
t.libs << :test
43+
t.options = '-v'
44+
t.warning = false
45+
t.test_files = ARGV.size > 1 ? ARGV[1..] : Dir['test/**/ips_*.rb']
46+
end
47+
4048
Rake::TestTask.new(:prof) do |t|
4149
t.libs << :lib
4250
t.libs << :test

lib/redis_client/cluster.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ class Cluster
1111

1212
attr_reader :config
1313

14-
def initialize(config, pool: nil, **kwargs)
14+
def initialize(config, pool: nil, concurrent_worker_model: nil, **kwargs)
1515
@config = config
16-
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create
16+
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(model: concurrent_worker_model)
1717
@router = ::RedisClient::Cluster::Router.new(config, @concurrent_worker, pool: pool, **kwargs)
1818
@command_builder = config.command_builder
1919
end

lib/redis_client/cluster/concurrent_worker.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def close
6464

6565
def create(model: :on_demand)
6666
case model
67-
when :on_demand then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new
67+
when :on_demand, nil then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new
6868
when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new
6969
else raise ArgumentError, "Unknown model: #{model}"
7070
end

lib/redis_client/cluster_config.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ def initialize(
2727
replica: false,
2828
replica_affinity: :random,
2929
fixed_hostname: '',
30-
client_implementation: Cluster,
30+
concurrent_worker_model: nil,
31+
client_implementation: ::RedisClient::Cluster, # for redis gem
3132
**client_config
3233
)
3334

@@ -38,6 +39,7 @@ def initialize(
3839
client_config = client_config.reject { |k, _| IGNORE_GENERIC_CONFIG_KEYS.include?(k) }
3940
@command_builder = client_config.fetch(:command_builder, ::RedisClient::CommandBuilder)
4041
@client_config = merge_generic_config(client_config, @node_configs)
42+
@concurrent_worker_model = concurrent_worker_model
4143
@client_implementation = client_implementation
4244
@mutex = Mutex.new
4345
end
@@ -48,6 +50,7 @@ def dup
4850
replica: @replica,
4951
replica_affinity: @replica_affinity,
5052
fixed_hostname: @fixed_hostname,
53+
concurrent_worker_model: @concurrent_worker_model,
5154
client_implementation: @client_implementation,
5255
**@client_config
5356
)
@@ -62,11 +65,16 @@ def read_timeout
6265
end
6366

6467
def new_pool(size: 5, timeout: 5, **kwargs)
65-
@client_implementation.new(self, pool: { size: size, timeout: timeout }, **kwargs)
68+
@client_implementation.new(
69+
self,
70+
pool: { size: size, timeout: timeout },
71+
concurrent_worker_model: @concurrent_worker_model,
72+
**kwargs
73+
)
6674
end
6775

6876
def new_client(**kwargs)
69-
@client_implementation.new(self, **kwargs)
77+
@client_implementation.new(self, concurrent_worker_model: @concurrent_worker_model, **kwargs)
7078
end
7179

7280
def per_node_key

test/ips_pipeline.rb

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# frozen_string_literal: true
2+
3+
require 'benchmark/ips'
4+
require 'redis_cluster_client'
5+
require 'testing_constants'
6+
7+
module BenchPipeline
8+
module_function
9+
10+
ATTEMPTS = 100
11+
12+
def run
13+
on_demand = make_client(:on_demand)
14+
pooled = make_client(:pooled)
15+
prepare(on_demand, pooled)
16+
bench(on_demand, pooled)
17+
end
18+
19+
def make_client(model)
20+
::RedisClient.cluster(
21+
nodes: TEST_NODE_URIS,
22+
fixed_hostname: TEST_FIXED_HOSTNAME,
23+
concurrent_worker_model: model,
24+
**TEST_GENERIC_OPTIONS
25+
).new_client
26+
end
27+
28+
def prepare(on_demand, pooled)
29+
on_demand.pipelined do |pi|
30+
ATTEMPTS.times { |i| pi.call('SET', "key#{i}", "val#{i}") }
31+
end
32+
33+
pooled.pipelined do |pi|
34+
ATTEMPTS.times { |i| pi.call('SET', "key#{i}", "val#{i}") }
35+
end
36+
end
37+
38+
def bench(on_demand, pooled)
39+
Benchmark.ips do |x|
40+
x.time = 5
41+
x.warmup = 1
42+
43+
x.report('on_demand') do
44+
on_demand.pipelined { |pi| ATTEMPTS.times { |i| pi.call('GET', "key#{i}") } }
45+
end
46+
47+
x.report('pooled') do
48+
pooled.pipelined { |pi| ATTEMPTS.times { |i| pi.call('GET', "key#{i}") } }
49+
end
50+
51+
x.compare!
52+
end
53+
end
54+
end
55+
56+
BenchPipeline.run

0 commit comments

Comments
 (0)