Skip to content

Commit 7cf31ab

Browse files
authored
fix: rename option for concurrency (#270)
1 parent 66b9deb commit 7cf31ab

File tree

9 files changed

+98
-35
lines changed

9 files changed

+98
-35
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ gem 'redis-cluster-client'
2626
| `:replica` | Boolean | `false` | `true` if client should use scale read feature |
2727
| `:replica_affinity` | Symbol or String | `:random` | scale reading strategy, `:random`, `random_with_primary` or `:latency` are valid |
2828
| `:fixed_hostname` | String | `nil` | required if client should connect to single endpoint with SSL |
29+
| `:concurrency` | Hash | `{ model: :on_demand, size: 5}` | concurrency settings, `:on_demand`, `:pooled` and `:none` are valid models, size is a max number of workers, `:none` model is no concurrency, Please choose the one suited your environment if needed. |
2930

3031
Also, [the other generic options](https://github.com/redis-rb/redis-client#configuration) can be passed.
3132
But `:url`, `:host`, `:port` and `:path` are ignored because they conflict with the `:nodes` option.

lib/redis_client/cluster.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ class Cluster
1111

1212
attr_reader :config
1313

14-
def initialize(config, pool: nil, concurrent_worker_model: nil, **kwargs)
14+
def initialize(config, pool: nil, concurrency: nil, **kwargs)
1515
@config = config
16-
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(model: concurrent_worker_model)
16+
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
1717
@router = ::RedisClient::Cluster::Router.new(config, @concurrent_worker, pool: pool, **kwargs)
1818
@command_builder = config.command_builder
1919
end

lib/redis_client/cluster/concurrent_worker.rb

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
require 'redis_client/cluster/concurrent_worker/on_demand'
44
require 'redis_client/cluster/concurrent_worker/pooled'
5+
require 'redis_client/cluster/concurrent_worker/none'
56

67
class RedisClient
78
class Cluster
89
module ConcurrentWorker
9-
MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5))
1010
InvalidNumberOfTasks = Class.new(::RedisClient::Error)
1111

1212
class Group
@@ -30,55 +30,53 @@ def done
3030
end
3131
end
3232

33-
def initialize(worker:, size:)
34-
raise ArgumentError, "the size must be positive: #{size} given" unless size.positive?
35-
33+
def initialize(worker:, queue:, size:)
3634
@worker = worker
37-
@result_queue = SizedQueue.new(size)
35+
@queue = queue
36+
@size = size
3837
@count = 0
3938
end
4039

4140
def push(id, *args, **kwargs, &block)
42-
raise InvalidNumberOfTasks, "max size reached: #{@count}" if @count == @result_queue.max
41+
raise InvalidNumberOfTasks, "max size reached: #{@count}" if @count == @size
4342

44-
task = Task.new(id: id, queue: @result_queue, args: args, kwargs: kwargs, block: block)
43+
task = Task.new(id: id, queue: @queue, args: args, kwargs: kwargs, block: block)
4544
@worker.push(task)
4645
@count += 1
4746
nil
4847
end
4948

5049
def each
51-
raise InvalidNumberOfTasks, "expected: #{@result_queue.max}, actual: #{@count}" if @count != @result_queue.max
50+
raise InvalidNumberOfTasks, "expected: #{@size}, actual: #{@count}" if @count != @size
5251

53-
@result_queue.max.times do
54-
task = @result_queue.pop
52+
@size.times do
53+
task = @queue.pop
5554
yield(task.id, task.result)
5655
end
5756

5857
nil
5958
end
6059

6160
def close
62-
@result_queue.clear
63-
@result_queue.close
61+
@queue.clear
62+
@queue.close if @queue.respond_to?(:close)
6463
@count = 0
6564
nil
6665
end
6766
end
6867

6968
module_function
7069

71-
def create(model: :on_demand)
70+
def create(model: :on_demand, size: 5)
71+
size = size.positive? ? size : 5
72+
7273
case model
73-
when :on_demand, nil then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new
74-
when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new
74+
when :on_demand, nil then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new(size: size)
75+
when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new(size: size)
76+
when :none then ::RedisClient::Cluster::ConcurrentWorker::None.new
7577
else raise ArgumentError, "Unknown model: #{model}"
7678
end
7779
end
78-
79-
def size
80-
MAX_WORKERS.positive? ? MAX_WORKERS : 5
81-
end
8280
end
8381
end
8482
end
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# frozen_string_literal: true
2+
3+
class RedisClient
4+
class Cluster
5+
module ConcurrentWorker
6+
class None
7+
def new_group(size:)
8+
::RedisClient::Cluster::ConcurrentWorker::Group.new(
9+
worker: self,
10+
queue: Array.new(size),
11+
size: size
12+
)
13+
end
14+
15+
def push(task)
16+
task.exec
17+
end
18+
19+
def close; end
20+
end
21+
end
22+
end
23+
end

lib/redis_client/cluster/concurrent_worker/on_demand.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ class RedisClient
44
class Cluster
55
module ConcurrentWorker
66
class OnDemand
7-
def initialize
8-
@q = SizedQueue.new(::RedisClient::Cluster::ConcurrentWorker.size)
7+
def initialize(size:)
8+
@q = SizedQueue.new(size)
99
end
1010

1111
def new_group(size:)
12-
::RedisClient::Cluster::ConcurrentWorker::Group.new(worker: self, size: size)
12+
::RedisClient::Cluster::ConcurrentWorker::Group.new(
13+
worker: self,
14+
queue: SizedQueue.new(size),
15+
size: size
16+
)
1317
end
1418

1519
def push(task)

lib/redis_client/cluster/concurrent_worker/pooled.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,19 @@ module ConcurrentWorker
1010
# It is a fixed size but we can modify the size with some environment variables.
1111
# So it consumes memory 1 MB multiplied a number of workers.
1212
class Pooled
13-
def initialize
13+
def initialize(size:)
14+
@size = size
1415
setup
1516
end
1617

1718
def new_group(size:)
1819
reset if @pid != ::RedisClient::PIDCache.pid
1920
ensure_workers if @workers.first.nil?
20-
::RedisClient::Cluster::ConcurrentWorker::Group.new(worker: self, size: size)
21+
::RedisClient::Cluster::ConcurrentWorker::Group.new(
22+
worker: self,
23+
queue: SizedQueue.new(size),
24+
size: size
25+
)
2126
end
2227

2328
def push(task)
@@ -37,7 +42,7 @@ def close
3742

3843
def setup
3944
@q = Queue.new
40-
@workers = Array.new(::RedisClient::Cluster::ConcurrentWorker.size)
45+
@workers = Array.new(@size)
4146
@pid = ::RedisClient::PIDCache.pid
4247
end
4348

lib/redis_client/cluster_config.rb

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class ClusterConfig
1717
VALID_NODES_KEYS = %i[ssl username password host port db].freeze
1818
MERGE_CONFIG_KEYS = %i[ssl username password].freeze
1919
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
20+
MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5))
2021

2122
InvalidClientConfigError = Class.new(::RedisClient::Error)
2223

@@ -27,7 +28,7 @@ def initialize(
2728
replica: false,
2829
replica_affinity: :random,
2930
fixed_hostname: '',
30-
concurrent_worker_model: nil,
31+
concurrency: nil,
3132
client_implementation: ::RedisClient::Cluster, # for redis gem
3233
**client_config
3334
)
@@ -39,7 +40,7 @@ def initialize(
3940
client_config = client_config.reject { |k, _| IGNORE_GENERIC_CONFIG_KEYS.include?(k) }
4041
@command_builder = client_config.fetch(:command_builder, ::RedisClient::CommandBuilder)
4142
@client_config = merge_generic_config(client_config, @node_configs)
42-
@concurrent_worker_model = concurrent_worker_model
43+
@concurrency = merge_concurrency_option(concurrency)
4344
@client_implementation = client_implementation
4445
@mutex = Mutex.new
4546
end
@@ -50,7 +51,7 @@ def dup
5051
replica: @replica,
5152
replica_affinity: @replica_affinity,
5253
fixed_hostname: @fixed_hostname,
53-
concurrent_worker_model: @concurrent_worker_model,
54+
concurrency: @concurrency,
5455
client_implementation: @client_implementation,
5556
**@client_config
5657
)
@@ -61,20 +62,20 @@ def inspect
6162
end
6263

6364
def read_timeout
64-
@client_config[:read_timeout] || @client_config[:timeout] || RedisClient::Config::DEFAULT_TIMEOUT
65+
@client_config[:read_timeout] || @client_config[:timeout] || ::RedisClient::Config::DEFAULT_TIMEOUT
6566
end
6667

6768
def new_pool(size: 5, timeout: 5, **kwargs)
6869
@client_implementation.new(
6970
self,
7071
pool: { size: size, timeout: timeout },
71-
concurrent_worker_model: @concurrent_worker_model,
72+
concurrency: @concurrency,
7273
**kwargs
7374
)
7475
end
7576

7677
def new_client(**kwargs)
77-
@client_implementation.new(self, concurrent_worker_model: @concurrent_worker_model, **kwargs)
78+
@client_implementation.new(self, concurrency: @concurrency, **kwargs)
7879
end
7980

8081
def per_node_key
@@ -104,6 +105,15 @@ def add_node(host, port)
104105

105106
private
106107

108+
def merge_concurrency_option(option)
109+
case option
110+
when Hash
111+
option = option.transform_keys(&:to_sym)
112+
{ size: MAX_WORKERS }.merge(option)
113+
else { size: MAX_WORKERS }
114+
end
115+
end
116+
107117
def build_node_configs(addrs)
108118
configs = Array[addrs].flatten.filter_map { |addr| parse_node_addr(addr) }
109119
raise InvalidClientConfigError, '`nodes` option is empty' if configs.empty?

test/ips_pipeline.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ module IpsPipeline
1212
def run
1313
on_demand = make_client(:on_demand)
1414
pooled = make_client(:pooled)
15+
none = make_client(:none)
1516
envoy = make_client_for_envoy
1617
cluster_proxy = make_client_for_cluster_proxy
17-
prepare(on_demand, pooled, envoy, cluster_proxy)
18+
prepare(on_demand, pooled, none, envoy, cluster_proxy)
1819
print_letter('pipelined')
1920
bench(
2021
ondemand: on_demand,
2122
pooled: pooled,
23+
none: none,
2224
envoy: envoy,
2325
cproxy: cluster_proxy
2426
)
@@ -30,7 +32,7 @@ def make_client(model)
3032
replica: true,
3133
replica_affinity: :random,
3234
fixed_hostname: TEST_FIXED_HOSTNAME,
33-
concurrent_worker_model: model,
35+
concurrency: { model: model },
3436
**TEST_GENERIC_OPTIONS
3537
).new_client
3638
end
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
require 'testing_helper'
4+
require 'redis_client/cluster/concurrent_worker/mixin'
5+
6+
class RedisClient
7+
class Cluster
8+
module ConcurrentWorker
9+
class TestNone < TestingWrapper
10+
include Mixin
11+
12+
private
13+
14+
def model
15+
:none
16+
end
17+
end
18+
end
19+
end
20+
end

0 commit comments

Comments
 (0)