@@ -22,6 +22,13 @@ class Router
2222
2323 attr_reader :config
2424
25+ Action = Struct . new (
26+ 'RedisCommandRoutingAction' ,
27+ :action_method_name ,
28+ :after_action_proc ,
29+ keyword_init : true
30+ )
31+
2532 def initialize ( config , concurrent_worker , pool : nil , **kwargs )
2633 @config = config
2734 @concurrent_worker = concurrent_worker
@@ -31,88 +38,20 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
3138 @node . reload!
3239 @command = ::RedisClient ::Cluster ::Command . load ( @node . replica_clients . shuffle , slow_command_timeout : config . slow_command_timeout )
3340 @command_builder = @config . command_builder
41+ @dedicated_actions = build_dedicated_actions
3442 rescue ::RedisClient ::Cluster ::InitialSetupError => e
3543 e . with_config ( config )
3644 raise
3745 end
3846
39- def send_command ( method , command , *args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength
40- cmd_name = command . first
41-
42- if cmd_name . casecmp ( 'get' ) . zero?
43- node = assign_node ( command )
44- try_send ( node , method , command , args , &block )
45- elsif cmd_name . casecmp ( 'mget' ) . zero?
46- send_multiple_keys_command ( command . first , method , command , args , &block )
47- elsif cmd_name . casecmp ( 'set' ) . zero?
48- node = assign_node ( command )
49- try_send ( node , method , command , args , &block )
50- elsif cmd_name . casecmp ( 'mset' ) . zero?
51- send_multiple_keys_command ( command . first , method , command , args , &block )
52- elsif cmd_name . casecmp ( 'del' ) . zero?
53- send_multiple_keys_command ( command . first , method , command , args , &block )
54- elsif cmd_name . casecmp ( 'ping' ) . zero?
55- @node . send_ping ( method , command , args ) . first . then ( &TSF . call ( block ) )
56- elsif cmd_name . casecmp ( 'wait' ) . zero?
57- send_wait_command ( method , command , args , &block )
58- elsif cmd_name . casecmp ( 'keys' ) . zero?
59- @node . call_replicas ( method , command , args ) . flatten . sort_by ( &:to_s ) . then ( &TSF . call ( block ) )
60- elsif cmd_name . casecmp ( 'dbsize' ) . zero?
61- @node . call_replicas ( method , command , args ) . select { |e | e . is_a? ( Integer ) } . sum . then ( &TSF . call ( block ) )
62- elsif cmd_name . casecmp ( 'scan' ) . zero?
63- scan ( command , seed : 1 )
64- elsif cmd_name . casecmp ( 'lastsave' ) . zero?
65- @node . call_all ( method , command , args ) . sort_by ( &:to_i ) . then ( &TSF . call ( block ) )
66- elsif cmd_name . casecmp ( 'role' ) . zero?
67- @node . call_all ( method , command , args , &block )
68- elsif cmd_name . casecmp ( 'config' ) . zero?
69- send_config_command ( method , command , args , &block )
70- elsif cmd_name . casecmp ( 'client' ) . zero?
71- send_client_command ( method , command , args , &block )
72- elsif cmd_name . casecmp ( 'cluster' ) . zero?
73- send_cluster_command ( method , command , args , &block )
74- elsif cmd_name . casecmp ( 'memory' ) . zero?
75- send_memory_command ( method , command , args , &block )
76- elsif cmd_name . casecmp ( 'script' ) . zero?
77- send_script_command ( method , command , args , &block )
78- elsif cmd_name . casecmp ( 'pubsub' ) . zero?
79- send_pubsub_command ( method , command , args , &block )
80- elsif cmd_name . casecmp ( 'watch' ) . zero?
81- send_watch_command ( command , &block )
82- elsif cmd_name . casecmp ( 'acl' ) . zero?
83- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
84- elsif cmd_name . casecmp ( 'auth' ) . zero?
85- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
86- elsif cmd_name . casecmp ( 'bgrewriteaof' ) . zero?
87- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
88- elsif cmd_name . casecmp ( 'bgsave' ) . zero?
89- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
90- elsif cmd_name . casecmp ( 'quit' ) . zero?
91- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
92- elsif cmd_name . casecmp ( 'save' ) . zero?
93- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
94- elsif cmd_name . casecmp ( 'flushall' ) . zero?
95- @node . call_primaries ( method , command , args ) . first . then ( &TSF . call ( block ) )
96- elsif cmd_name . casecmp ( 'flushdb' ) . zero?
97- @node . call_primaries ( method , command , args ) . first . then ( &TSF . call ( block ) )
98- elsif cmd_name . casecmp ( 'readonly' ) . zero?
99- raise ::RedisClient ::Cluster ::OrchestrationCommandNotSupported . from_command ( command . first ) . with_config ( @config )
100- elsif cmd_name . casecmp ( 'readwrite' ) . zero?
101- raise ::RedisClient ::Cluster ::OrchestrationCommandNotSupported . from_command ( command . first ) . with_config ( @config )
102- elsif cmd_name . casecmp ( 'shutdown' ) . zero?
103- raise ::RedisClient ::Cluster ::OrchestrationCommandNotSupported . from_command ( command . first ) . with_config ( @config )
104- elsif cmd_name . casecmp ( 'discard' ) . zero?
105- raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
106- elsif cmd_name . casecmp ( 'exec' ) . zero?
107- raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
108- elsif cmd_name . casecmp ( 'multi' ) . zero?
109- raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
110- elsif cmd_name . casecmp ( 'unwatch' ) . zero?
111- raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
112- else
113- node = assign_node ( command )
114- try_send ( node , method , command , args , &block )
115- end
47+ def send_command ( method , command , *args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
48+ return assign_node_and_send_command ( method , command , args , &block ) unless @dedicated_actions . key? ( command . first )
49+
50+ action = @dedicated_actions [ command . first ]
51+ return send ( action . action_method_name , method , command , args , &block ) if action . after_action_proc . nil?
52+
53+ reply = send ( action . action_method_name , method , command , args )
54+ action . after_action_proc . call ( reply ) . then ( &TSF . call ( block ) )
11655 rescue ::RedisClient ::CircuitBreaker ::OpenCircuitError
11756 raise
11857 rescue ::RedisClient ::Cluster ::Node ::ReloadNeeded
@@ -138,7 +77,12 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
13877 end
13978
14079 # @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
141- def try_send ( node , method , command , args , retry_count : 3 , &block )
80+ def assign_node_and_send_command ( method , command , args , retry_count : 3 , &block )
81+ node = assign_node ( command )
82+ send_command_to_node ( node , method , command , args , retry_count : retry_count , &block )
83+ end
84+
85+ def send_command_to_node ( node , method , command , args , retry_count : 3 , &block )
14286 handle_redirection ( node , command , retry_count : retry_count ) do |on_node |
14387 if args . empty?
14488 # prevent memory allocation for variable-length args
@@ -313,6 +257,81 @@ def close
313257
314258 private
315259
260+ def build_dedicated_actions # rubocop:disable Metrics/AbcSize
261+ pick_first = -> ( reply ) { reply . first } # rubocop:disable Style/SymbolProc
262+ multiple_key_action = Action . new ( action_method_name : :send_multiple_keys_command )
263+ all_node_first_action = Action . new ( action_method_name : :send_command_to_all_nodes , after_action_proc : pick_first )
264+ primary_first_action = Action . new ( action_method_name : :send_command_to_primaries , after_action_proc : pick_first )
265+ not_supported_action = Action . new ( action_method_name : :fail_not_supported_command )
266+ keyless_action = Action . new ( action_method_name : :fail_keyless_command )
267+ actions = {
268+ 'ping' => Action . new ( action_method_name : :send_ping_command , after_action_proc : pick_first ) ,
269+ 'wait' => Action . new ( action_method_name : :send_wait_command ) ,
270+ 'keys' => Action . new ( action_method_name : :send_command_to_replicas , after_action_proc : -> ( reply ) { reply . flatten . sort_by ( &:to_s ) } ) ,
271+ 'dbsize' => Action . new ( action_method_name : :send_command_to_replicas , after_action_proc : -> ( reply ) { reply . select { |e | e . is_a? ( Integer ) } . sum } ) ,
272+ 'scan' => Action . new ( action_method_name : :send_scan_command ) ,
273+ 'lastsave' => Action . new ( action_method_name : :send_command_to_all_nodes , after_action_proc : -> ( reply ) { reply . sort_by ( &:to_i ) } ) ,
274+ 'role' => Action . new ( action_method_name : :send_command_to_all_nodes ) ,
275+ 'config' => Action . new ( action_method_name : :send_config_command ) ,
276+ 'client' => Action . new ( action_method_name : :send_client_command ) ,
277+ 'cluster' => Action . new ( action_method_name : :send_cluster_command ) ,
278+ 'memory' => Action . new ( action_method_name : :send_memory_command ) ,
279+ 'script' => Action . new ( action_method_name : :send_script_command ) ,
280+ 'pubsub' => Action . new ( action_method_name : :send_pubsub_command ) ,
281+ 'watch' => Action . new ( action_method_name : :send_watch_command ) ,
282+ 'mget' => multiple_key_action ,
283+ 'mset' => multiple_key_action ,
284+ 'del' => multiple_key_action ,
285+ 'acl' => all_node_first_action ,
286+ 'auth' => all_node_first_action ,
287+ 'bgrewriteaof' => all_node_first_action ,
288+ 'bgsave' => all_node_first_action ,
289+ 'quit' => all_node_first_action ,
290+ 'save' => all_node_first_action ,
291+ 'flushall' => primary_first_action ,
292+ 'flushdb' => primary_first_action ,
293+ 'readonly' => not_supported_action ,
294+ 'readwrite' => not_supported_action ,
295+ 'shutdown' => not_supported_action ,
296+ 'discard' => keyless_action ,
297+ 'exec' => keyless_action ,
298+ 'multi' => keyless_action ,
299+ 'unwatch' => keyless_action
300+ } . freeze
301+ actions . each_with_object ( { } ) do |( k , v ) , acc |
302+ acc [ k ] = v
303+ acc [ k . upcase ] = v
304+ end . freeze
305+ end
306+
307+ def send_command_to_all_nodes ( method , command , args , &block )
308+ @node . call_all ( method , command , args , &block )
309+ end
310+
311+ def send_command_to_primaries ( method , command , args , &block )
312+ @node . call_primaries ( method , command , args , &block )
313+ end
314+
315+ def send_command_to_replicas ( method , command , args , &block )
316+ @node . call_replicas ( method , command , args , &block )
317+ end
318+
319+ def send_ping_command ( method , command , args , &block )
320+ @node . send_ping ( method , command , args , &block )
321+ end
322+
323+ def send_scan_command ( _method , command , _args , &_block )
324+ scan ( command , seed : 1 )
325+ end
326+
327+ def fail_not_supported_command ( _method , command , _args , &_block )
328+ raise ::RedisClient ::Cluster ::OrchestrationCommandNotSupported . from_command ( command . first ) . with_config ( @config )
329+ end
330+
331+ def fail_keyless_command ( _method , command , _args , &_block )
332+ raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
333+ end
334+
316335 def send_wait_command ( method , command , args , retry_count : 1 , &block ) # rubocop:disable Metrics/AbcSize
317336 @node . call_primaries ( method , command , args ) . select { |r | r . is_a? ( Integer ) } . sum . then ( &TSF . call ( block ) )
318337 rescue ::RedisClient ::Cluster ::ErrorCollection => e
@@ -363,23 +382,23 @@ def send_client_command(method, command, args, &block) # rubocop:disable Metrics
363382
364383 def send_cluster_command ( method , command , args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
365384 if command [ 1 ] . casecmp ( 'addslots' ) . zero?
366- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
385+ fail_not_supported_command ( method , command , args , & block )
367386 elsif command [ 1 ] . casecmp ( 'delslots' ) . zero?
368- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
387+ fail_not_supported_command ( method , command , args , & block )
369388 elsif command [ 1 ] . casecmp ( 'failover' ) . zero?
370- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
389+ fail_not_supported_command ( method , command , args , & block )
371390 elsif command [ 1 ] . casecmp ( 'forget' ) . zero?
372- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
391+ fail_not_supported_command ( method , command , args , & block )
373392 elsif command [ 1 ] . casecmp ( 'meet' ) . zero?
374- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
393+ fail_not_supported_command ( method , command , args , & block )
375394 elsif command [ 1 ] . casecmp ( 'replicate' ) . zero?
376- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
395+ fail_not_supported_command ( method , command , args , & block )
377396 elsif command [ 1 ] . casecmp ( 'reset' ) . zero?
378- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
397+ fail_not_supported_command ( method , command , args , & block )
379398 elsif command [ 1 ] . casecmp ( 'set-config-epoch' ) . zero?
380- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
399+ fail_not_supported_command ( method , command , args , & block )
381400 elsif command [ 1 ] . casecmp ( 'setslot' ) . zero?
382- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
401+ fail_not_supported_command ( method , command , args , & block )
383402 elsif command [ 1 ] . casecmp ( 'saveconfig' ) . zero?
384403 @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
385404 elsif command [ 1 ] . casecmp ( 'getkeysinslot' ) . zero?
@@ -428,7 +447,7 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
428447 end
429448 end
430449
431- def send_watch_command ( command )
450+ def send_watch_command ( _method , command , _args , & _block )
432451 unless block_given?
433452 msg = 'A block required. And you need to use the block argument as a client for the transaction.'
434453 raise ::RedisClient ::Cluster ::Transaction ::ConsistencyError . new ( msg ) . with_config ( @config )
@@ -443,8 +462,9 @@ def send_watch_command(command)
443462 end
444463 end
445464
446- def send_multiple_keys_command ( cmd , method , command , args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
465+ def send_multiple_keys_command ( method , command , args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
447466 # This implementation is prioritized performance rather than readability or so.
467+ cmd = command . first
448468 if cmd . casecmp ( 'mget' ) . zero?
449469 single_key_cmd = 'get'
450470 keys_step = 1
@@ -458,7 +478,7 @@ def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:dis
458478 raise NotImplementedError , cmd
459479 end
460480
461- return try_send ( assign_node ( command ) , method , command , args , &block ) if command . size <= keys_step + 1 || ::RedisClient ::Cluster ::KeySlotConverter . hash_tag_included? ( command [ 1 ] )
481+ return assign_node_and_send_command ( method , command , args , &block ) if command . size <= keys_step + 1 || ::RedisClient ::Cluster ::KeySlotConverter . hash_tag_included? ( command [ 1 ] )
462482
463483 seed = @config . use_replica? && @config . replica_affinity == :random ? nil : Random . new_seed
464484 pipeline = ::RedisClient ::Cluster ::Pipeline . new ( self , @command_builder , @concurrent_worker , exception : true , seed : seed )
0 commit comments