@@ -17,8 +17,53 @@ class Cluster
1717 class Router
1818 ZERO_CURSOR_FOR_SCAN = '0'
1919 TSF = -> ( f , x ) { f . nil? ? x : f . call ( x ) } . curry
20+ DEDICATED_ACTIONS = lambda do # rubocop:disable Metrics/BlockLength
21+ pick_first = -> ( reply ) { reply . first } # rubocop:disable Style/SymbolProc
22+ multiple_key_action = Action . new ( method_name : :send_multiple_keys_command )
23+ all_node_first_action = Action . new ( method_name : :send_command_to_all_nodes , reply_transformer : pick_first )
24+ primary_first_action = Action . new ( method_name : :send_command_to_primaries , reply_transformer : pick_first )
25+ not_supported_action = Action . new ( method_name : :fail_not_supported_command )
26+ keyless_action = Action . new ( method_name : :fail_keyless_command )
27+ {
28+ 'ping' => Action . new ( method_name : :send_ping_command , reply_transformer : pick_first ) ,
29+ 'wait' => Action . new ( method_name : :send_wait_command ) ,
30+ 'keys' => Action . new ( method_name : :send_command_to_replicas , reply_transformer : -> ( reply ) { reply . flatten . sort_by ( &:to_s ) } ) ,
31+ 'dbsize' => Action . new ( method_name : :send_command_to_replicas , reply_transformer : -> ( reply ) { reply . select { |e | e . is_a? ( Integer ) } . sum } ) ,
32+ 'scan' => Action . new ( method_name : :send_scan_command ) ,
33+ 'lastsave' => Action . new ( method_name : :send_command_to_all_nodes , reply_transformer : -> ( reply ) { reply . sort_by ( &:to_i ) } ) ,
34+ 'role' => Action . new ( method_name : :send_command_to_all_nodes ) ,
35+ 'config' => Action . new ( method_name : :send_config_command ) ,
36+ 'client' => Action . new ( method_name : :send_client_command ) ,
37+ 'cluster' => Action . new ( method_name : :send_cluster_command ) ,
38+ 'memory' => Action . new ( method_name : :send_memory_command ) ,
39+ 'script' => Action . new ( method_name : :send_script_command ) ,
40+ 'pubsub' => Action . new ( method_name : :send_pubsub_command ) ,
41+ 'watch' => Action . new ( method_name : :send_watch_command ) ,
42+ 'mget' => multiple_key_action ,
43+ 'mset' => multiple_key_action ,
44+ 'del' => multiple_key_action ,
45+ 'acl' => all_node_first_action ,
46+ 'auth' => all_node_first_action ,
47+ 'bgrewriteaof' => all_node_first_action ,
48+ 'bgsave' => all_node_first_action ,
49+ 'quit' => all_node_first_action ,
50+ 'save' => all_node_first_action ,
51+ 'flushall' => primary_first_action ,
52+ 'flushdb' => primary_first_action ,
53+ 'readonly' => not_supported_action ,
54+ 'readwrite' => not_supported_action ,
55+ 'shutdown' => not_supported_action ,
56+ 'discard' => keyless_action ,
57+ 'exec' => keyless_action ,
58+ 'multi' => keyless_action ,
59+ 'unwatch' => keyless_action
60+ } . each_with_object ( { } ) do |( k , v ) , acc |
61+ acc [ k ] = v
62+ acc [ k . upcase ] = v
63+ end
64+ end . call . freeze
2065
21- private_constant :ZERO_CURSOR_FOR_SCAN , :TSF
66+ private_constant :ZERO_CURSOR_FOR_SCAN , :TSF , :DEDICATED_ACTIONS
2267
2368 attr_reader :config
2469
@@ -38,16 +83,15 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
3883 @node . reload!
3984 @command = ::RedisClient ::Cluster ::Command . load ( @node . replica_clients . shuffle , slow_command_timeout : config . slow_command_timeout )
4085 @command_builder = @config . command_builder
41- @dedicated_actions = build_dedicated_actions
4286 rescue ::RedisClient ::Cluster ::InitialSetupError => e
4387 e . with_config ( config )
4488 raise
4589 end
4690
4791 def send_command ( method , command , *args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
48- return assign_node_and_send_command ( method , command , args , &block ) unless @dedicated_actions . key? ( command . first )
92+ return assign_node_and_send_command ( method , command , args , &block ) unless DEDICATED_ACTIONS . key? ( command . first )
4993
50- action = @dedicated_actions [ command . first ]
94+ action = DEDICATED_ACTIONS [ command . first ]
5195 return send ( action . method_name , method , command , args , &block ) if action . reply_transformer . nil?
5296
5397 reply = send ( action . method_name , method , command , args )
@@ -257,53 +301,6 @@ def close
257301
258302 private
259303
260- def build_dedicated_actions # rubocop:disable Metrics/AbcSize
261- pick_first = -> ( reply ) { reply . first } # rubocop:disable Style/SymbolProc
262- multiple_key_action = Action . new ( method_name : :send_multiple_keys_command )
263- all_node_first_action = Action . new ( method_name : :send_command_to_all_nodes , reply_transformer : pick_first )
264- primary_first_action = Action . new ( method_name : :send_command_to_primaries , reply_transformer : pick_first )
265- not_supported_action = Action . new ( method_name : :fail_not_supported_command )
266- keyless_action = Action . new ( method_name : :fail_keyless_command )
267- actions = {
268- 'ping' => Action . new ( method_name : :send_ping_command , reply_transformer : pick_first ) ,
269- 'wait' => Action . new ( method_name : :send_wait_command ) ,
270- 'keys' => Action . new ( method_name : :send_command_to_replicas , reply_transformer : -> ( reply ) { reply . flatten . sort_by ( &:to_s ) } ) ,
271- 'dbsize' => Action . new ( method_name : :send_command_to_replicas , reply_transformer : -> ( reply ) { reply . select { |e | e . is_a? ( Integer ) } . sum } ) ,
272- 'scan' => Action . new ( method_name : :send_scan_command ) ,
273- 'lastsave' => Action . new ( method_name : :send_command_to_all_nodes , reply_transformer : -> ( reply ) { reply . sort_by ( &:to_i ) } ) ,
274- 'role' => Action . new ( method_name : :send_command_to_all_nodes ) ,
275- 'config' => Action . new ( method_name : :send_config_command ) ,
276- 'client' => Action . new ( method_name : :send_client_command ) ,
277- 'cluster' => Action . new ( method_name : :send_cluster_command ) ,
278- 'memory' => Action . new ( method_name : :send_memory_command ) ,
279- 'script' => Action . new ( method_name : :send_script_command ) ,
280- 'pubsub' => Action . new ( method_name : :send_pubsub_command ) ,
281- 'watch' => Action . new ( method_name : :send_watch_command ) ,
282- 'mget' => multiple_key_action ,
283- 'mset' => multiple_key_action ,
284- 'del' => multiple_key_action ,
285- 'acl' => all_node_first_action ,
286- 'auth' => all_node_first_action ,
287- 'bgrewriteaof' => all_node_first_action ,
288- 'bgsave' => all_node_first_action ,
289- 'quit' => all_node_first_action ,
290- 'save' => all_node_first_action ,
291- 'flushall' => primary_first_action ,
292- 'flushdb' => primary_first_action ,
293- 'readonly' => not_supported_action ,
294- 'readwrite' => not_supported_action ,
295- 'shutdown' => not_supported_action ,
296- 'discard' => keyless_action ,
297- 'exec' => keyless_action ,
298- 'multi' => keyless_action ,
299- 'unwatch' => keyless_action
300- } . freeze
301- actions . each_with_object ( { } ) do |( k , v ) , acc |
302- acc [ k ] = v
303- acc [ k . upcase ] = v
304- end . freeze
305- end
306-
307304 def send_command_to_all_nodes ( method , command , args , &block )
308305 @node . call_all ( method , command , args , &block )
309306 end
0 commit comments