diff --git a/README.md b/README.md index 612c2881..7e550612 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ gem 'redis-cluster-client' | `:replica_affinity` | Symbol or String | `:random` | scale reading strategy, `:random`, `random_with_primary` or `:latency` are valid | | `:fixed_hostname` | String | `nil` | required if client should connect to single endpoint with SSL | | `:slow_command_timeout` | Integer | `-1` | timeout used for "slow" queries that fetch metdata e.g. CLUSTER NODES, COMMAND | -| `:concurrency` | Hash | `{ model: :on_demand, size: 5}` | concurrency settings, `:on_demand`, `:pooled` and `:none` are valid models, size is a max number of workers, `:none` model is no concurrency, Please choose the one suited your environment if needed. | +| `:concurrency` | Hash | `{ model: :none }` | concurrency settings, `:on_demand`, `:pooled` and `:none` are valid models, size is a max number of workers, `:none` model is no concurrency, Please choose the one suited your environment if needed. | | `:connect_with_original_config` | Boolean | `false` | `true` if client should retry the connection using the original endpoint that was passed in | | `:max_startup_sample` | Integer | `3` | maximum number of nodes to fetch `CLUSTER NODES` information for startup | diff --git a/lib/redis_client/cluster/concurrent_worker.rb b/lib/redis_client/cluster/concurrent_worker.rb index 998ee12b..cd70b9c6 100644 --- a/lib/redis_client/cluster/concurrent_worker.rb +++ b/lib/redis_client/cluster/concurrent_worker.rb @@ -71,14 +71,12 @@ def inspect module_function - def create(model: :on_demand, size: 5) - size = size.positive? ? size : 5 - + def create(model: :none, size: 5) case model - when :on_demand, nil then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new(size: size) - when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new(size: size) when :none then ::RedisClient::Cluster::ConcurrentWorker::None.new - else raise ArgumentError, "Unknown model: #{model}" + when :on_demand then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new(size: size) + when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new(size: size) + else raise ArgumentError, "unknown model: #{model}" end end end diff --git a/lib/redis_client/cluster/concurrent_worker/on_demand.rb b/lib/redis_client/cluster/concurrent_worker/on_demand.rb index 72b885b6..5beae285 100644 --- a/lib/redis_client/cluster/concurrent_worker/on_demand.rb +++ b/lib/redis_client/cluster/concurrent_worker/on_demand.rb @@ -5,6 +5,8 @@ class Cluster module ConcurrentWorker class OnDemand def initialize(size:) + raise ArgumentError, "size must be positive: #{size}" unless size.positive? + @q = SizedQueue.new(size) end diff --git a/lib/redis_client/cluster/concurrent_worker/pooled.rb b/lib/redis_client/cluster/concurrent_worker/pooled.rb index 83e974b3..8677cff8 100644 --- a/lib/redis_client/cluster/concurrent_worker/pooled.rb +++ b/lib/redis_client/cluster/concurrent_worker/pooled.rb @@ -11,6 +11,8 @@ module ConcurrentWorker # So it consumes memory 1 MB multiplied a number of workers. class Pooled def initialize(size:) + raise ArgumentError, "size must be positive: #{size}" unless size.positive? + @size = size setup end diff --git a/lib/redis_client/cluster_config.rb b/lib/redis_client/cluster_config.rb index 6b0504ef..7bcf7670 100644 --- a/lib/redis_client/cluster_config.rb +++ b/lib/redis_client/cluster_config.rb @@ -19,7 +19,7 @@ class ClusterConfig VALID_NODES_KEYS = %i[ssl username password host port db].freeze MERGE_CONFIG_KEYS = %i[ssl username password].freeze IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze - MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5)) + MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', -1)) # for backward compatibility # It's used with slow queries of fetching meta data like CLUSTER NODES, COMMAND and so on. SLOW_COMMAND_TIMEOUT = Float(ENV.fetch('REDIS_CLIENT_SLOW_COMMAND_TIMEOUT', -1)) # It affects to strike a balance between load and stability in initialization or changed states. @@ -110,12 +110,16 @@ def server_url private def merge_concurrency_option(option) - case option - when Hash - option = option.transform_keys(&:to_sym) - { size: MAX_WORKERS }.merge(option) - else { size: MAX_WORKERS } + opts = {} + + if MAX_WORKERS.positive? + opts[:model] = :on_demand + opts[:size] = MAX_WORKERS end + + opts.merge!(option.transform_keys(&:to_sym)) if option.is_a?(Hash) + opts[:model] = :none if opts.empty? + opts.freeze end def build_node_configs(addrs) diff --git a/test/redis_client/test_cluster_config.rb b/test/redis_client/test_cluster_config.rb index 8ecd23aa..a4edc483 100644 --- a/test/redis_client/test_cluster_config.rb +++ b/test/redis_client/test_cluster_config.rb @@ -103,6 +103,18 @@ def test_command_builder assert_equal(::RedisClient::CommandBuilder, ::RedisClient::ClusterConfig.new.command_builder) end + def test_concurrency + [ + { value: nil, want: { model: :none } }, + { value: { model: :none }, want: { model: :none } }, + { value: { model: :on_demand, size: 3 }, want: { model: :on_demand, size: 3 } }, + { value: { model: :pooled, size: 6 }, want: { model: :pooled, size: 6 } } + ].each do |c| + cfg = ::RedisClient::ClusterConfig.new(concurrency: c[:value]) + assert_equal(c[:want], cfg.instance_variable_get(:@concurrency)) + end + end + def test_build_node_configs config = ::RedisClient::ClusterConfig.new [