Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ gem 'redis-cluster-client'
| `:replica_affinity` | Symbol or String | `:random` | scale reading strategy, `:random`, `random_with_primary` or `:latency` are valid |
| `:fixed_hostname` | String | `nil` | required if client should connect to single endpoint with SSL |
| `:slow_command_timeout` | Integer | `-1` | timeout used for "slow" queries that fetch metdata e.g. CLUSTER NODES, COMMAND |
| `:concurrency` | Hash | `{ model: :on_demand, size: 5}` | concurrency settings, `:on_demand`, `:pooled` and `:none` are valid models, size is a max number of workers, `:none` model is no concurrency, Please choose the one suited your environment if needed. |
| `:concurrency` | Hash | `{ model: :none }` | concurrency settings, `:on_demand`, `:pooled` and `:none` are valid models, size is a max number of workers, `:none` model is no concurrency, Please choose the one suited your environment if needed. |
| `:connect_with_original_config` | Boolean | `false` | `true` if client should retry the connection using the original endpoint that was passed in |
| `:max_startup_sample` | Integer | `3` | maximum number of nodes to fetch `CLUSTER NODES` information for startup |

Expand Down
10 changes: 4 additions & 6 deletions lib/redis_client/cluster/concurrent_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,12 @@ def inspect

module_function

def create(model: :on_demand, size: 5)
size = size.positive? ? size : 5

def create(model: :none, size: 5)
case model
when :on_demand, nil then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new(size: size)
when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new(size: size)
when :none then ::RedisClient::Cluster::ConcurrentWorker::None.new
else raise ArgumentError, "Unknown model: #{model}"
when :on_demand then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new(size: size)
when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new(size: size)
else raise ArgumentError, "unknown model: #{model}"
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/redis_client/cluster/concurrent_worker/on_demand.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ class Cluster
module ConcurrentWorker
class OnDemand
def initialize(size:)
raise ArgumentError, "size must be positive: #{size}" unless size.positive?

@q = SizedQueue.new(size)
end

Expand Down
2 changes: 2 additions & 0 deletions lib/redis_client/cluster/concurrent_worker/pooled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ module ConcurrentWorker
# So it consumes memory 1 MB multiplied a number of workers.
class Pooled
def initialize(size:)
raise ArgumentError, "size must be positive: #{size}" unless size.positive?

@size = size
setup
end
Expand Down
16 changes: 10 additions & 6 deletions lib/redis_client/cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ClusterConfig
VALID_NODES_KEYS = %i[ssl username password host port db].freeze
MERGE_CONFIG_KEYS = %i[ssl username password].freeze
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5))
MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', -1)) # for backward compatibility
# It's used with slow queries of fetching meta data like CLUSTER NODES, COMMAND and so on.
SLOW_COMMAND_TIMEOUT = Float(ENV.fetch('REDIS_CLIENT_SLOW_COMMAND_TIMEOUT', -1))
# It affects to strike a balance between load and stability in initialization or changed states.
Expand Down Expand Up @@ -110,12 +110,16 @@ def server_url
private

def merge_concurrency_option(option)
case option
when Hash
option = option.transform_keys(&:to_sym)
{ size: MAX_WORKERS }.merge(option)
else { size: MAX_WORKERS }
opts = {}

if MAX_WORKERS.positive?
opts[:model] = :on_demand
opts[:size] = MAX_WORKERS
end

opts.merge!(option.transform_keys(&:to_sym)) if option.is_a?(Hash)
opts[:model] = :none if opts.empty?
opts.freeze
end

def build_node_configs(addrs)
Expand Down
12 changes: 12 additions & 0 deletions test/redis_client/test_cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ def test_command_builder
assert_equal(::RedisClient::CommandBuilder, ::RedisClient::ClusterConfig.new.command_builder)
end

def test_concurrency
[
{ value: nil, want: { model: :none } },
{ value: { model: :none }, want: { model: :none } },
{ value: { model: :on_demand, size: 3 }, want: { model: :on_demand, size: 3 } },
{ value: { model: :pooled, size: 6 }, want: { model: :pooled, size: 6 } }
].each do |c|
cfg = ::RedisClient::ClusterConfig.new(concurrency: c[:value])
assert_equal(c[:want], cfg.instance_variable_get(:@concurrency))
end
end

def test_build_node_configs
config = ::RedisClient::ClusterConfig.new
[
Expand Down