@@ -22,6 +22,13 @@ class Router
22
22
23
23
attr_reader :config
24
24
25
+ Action = Struct . new (
26
+ 'RedisCommandRoutingAction' ,
27
+ :action_method_name ,
28
+ :after_action_proc ,
29
+ keyword_init : true
30
+ )
31
+
25
32
def initialize ( config , concurrent_worker , pool : nil , **kwargs )
26
33
@config = config
27
34
@concurrent_worker = concurrent_worker
@@ -31,88 +38,20 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
31
38
@node . reload!
32
39
@command = ::RedisClient ::Cluster ::Command . load ( @node . replica_clients . shuffle , slow_command_timeout : config . slow_command_timeout )
33
40
@command_builder = @config . command_builder
41
+ @dedicated_actions = build_dedicated_actions
34
42
rescue ::RedisClient ::Cluster ::InitialSetupError => e
35
43
e . with_config ( config )
36
44
raise
37
45
end
38
46
39
- def send_command ( method , command , *args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength
40
- cmd_name = command . first
41
-
42
- if cmd_name . casecmp ( 'get' ) . zero?
43
- node = assign_node ( command )
44
- try_send ( node , method , command , args , &block )
45
- elsif cmd_name . casecmp ( 'mget' ) . zero?
46
- send_multiple_keys_command ( command . first , method , command , args , &block )
47
- elsif cmd_name . casecmp ( 'set' ) . zero?
48
- node = assign_node ( command )
49
- try_send ( node , method , command , args , &block )
50
- elsif cmd_name . casecmp ( 'mset' ) . zero?
51
- send_multiple_keys_command ( command . first , method , command , args , &block )
52
- elsif cmd_name . casecmp ( 'del' ) . zero?
53
- send_multiple_keys_command ( command . first , method , command , args , &block )
54
- elsif cmd_name . casecmp ( 'ping' ) . zero?
55
- @node . send_ping ( method , command , args ) . first . then ( &TSF . call ( block ) )
56
- elsif cmd_name . casecmp ( 'wait' ) . zero?
57
- send_wait_command ( method , command , args , &block )
58
- elsif cmd_name . casecmp ( 'keys' ) . zero?
59
- @node . call_replicas ( method , command , args ) . flatten . sort_by ( &:to_s ) . then ( &TSF . call ( block ) )
60
- elsif cmd_name . casecmp ( 'dbsize' ) . zero?
61
- @node . call_replicas ( method , command , args ) . select { |e | e . is_a? ( Integer ) } . sum . then ( &TSF . call ( block ) )
62
- elsif cmd_name . casecmp ( 'scan' ) . zero?
63
- scan ( command , seed : 1 )
64
- elsif cmd_name . casecmp ( 'lastsave' ) . zero?
65
- @node . call_all ( method , command , args ) . sort_by ( &:to_i ) . then ( &TSF . call ( block ) )
66
- elsif cmd_name . casecmp ( 'role' ) . zero?
67
- @node . call_all ( method , command , args , &block )
68
- elsif cmd_name . casecmp ( 'config' ) . zero?
69
- send_config_command ( method , command , args , &block )
70
- elsif cmd_name . casecmp ( 'client' ) . zero?
71
- send_client_command ( method , command , args , &block )
72
- elsif cmd_name . casecmp ( 'cluster' ) . zero?
73
- send_cluster_command ( method , command , args , &block )
74
- elsif cmd_name . casecmp ( 'memory' ) . zero?
75
- send_memory_command ( method , command , args , &block )
76
- elsif cmd_name . casecmp ( 'script' ) . zero?
77
- send_script_command ( method , command , args , &block )
78
- elsif cmd_name . casecmp ( 'pubsub' ) . zero?
79
- send_pubsub_command ( method , command , args , &block )
80
- elsif cmd_name . casecmp ( 'watch' ) . zero?
81
- send_watch_command ( command , &block )
82
- elsif cmd_name . casecmp ( 'acl' ) . zero?
83
- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
84
- elsif cmd_name . casecmp ( 'auth' ) . zero?
85
- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
86
- elsif cmd_name . casecmp ( 'bgrewriteaof' ) . zero?
87
- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
88
- elsif cmd_name . casecmp ( 'bgsave' ) . zero?
89
- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
90
- elsif cmd_name . casecmp ( 'quit' ) . zero?
91
- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
92
- elsif cmd_name . casecmp ( 'save' ) . zero?
93
- @node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
94
- elsif cmd_name . casecmp ( 'flushall' ) . zero?
95
- @node . call_primaries ( method , command , args ) . first . then ( &TSF . call ( block ) )
96
- elsif cmd_name . casecmp ( 'flushdb' ) . zero?
97
- @node . call_primaries ( method , command , args ) . first . then ( &TSF . call ( block ) )
98
- elsif cmd_name . casecmp ( 'readonly' ) . zero?
99
- raise ::RedisClient ::Cluster ::OrchestrationCommandNotSupported . from_command ( command . first ) . with_config ( @config )
100
- elsif cmd_name . casecmp ( 'readwrite' ) . zero?
101
- raise ::RedisClient ::Cluster ::OrchestrationCommandNotSupported . from_command ( command . first ) . with_config ( @config )
102
- elsif cmd_name . casecmp ( 'shutdown' ) . zero?
103
- raise ::RedisClient ::Cluster ::OrchestrationCommandNotSupported . from_command ( command . first ) . with_config ( @config )
104
- elsif cmd_name . casecmp ( 'discard' ) . zero?
105
- raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
106
- elsif cmd_name . casecmp ( 'exec' ) . zero?
107
- raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
108
- elsif cmd_name . casecmp ( 'multi' ) . zero?
109
- raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
110
- elsif cmd_name . casecmp ( 'unwatch' ) . zero?
111
- raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
112
- else
113
- node = assign_node ( command )
114
- try_send ( node , method , command , args , &block )
115
- end
47
+ def send_command ( method , command , *args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
48
+ return assign_node_and_send_command ( method , command , args , &block ) unless @dedicated_actions . key? ( command . first )
49
+
50
+ action = @dedicated_actions [ command . first ]
51
+ return send ( action . action_method_name , method , command , args , &block ) if action . after_action_proc . nil?
52
+
53
+ reply = send ( action . action_method_name , method , command , args )
54
+ action . after_action_proc . call ( reply ) . then ( &TSF . call ( block ) )
116
55
rescue ::RedisClient ::CircuitBreaker ::OpenCircuitError
117
56
raise
118
57
rescue ::RedisClient ::Cluster ::Node ::ReloadNeeded
@@ -138,7 +77,12 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
138
77
end
139
78
140
79
# @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
141
- def try_send ( node , method , command , args , retry_count : 3 , &block )
80
+ def assign_node_and_send_command ( method , command , args , retry_count : 3 , &block )
81
+ node = assign_node ( command )
82
+ send_command_to_node ( node , method , command , args , retry_count : retry_count , &block )
83
+ end
84
+
85
+ def send_command_to_node ( node , method , command , args , retry_count : 3 , &block )
142
86
handle_redirection ( node , command , retry_count : retry_count ) do |on_node |
143
87
if args . empty?
144
88
# prevent memory allocation for variable-length args
@@ -313,6 +257,81 @@ def close
313
257
314
258
private
315
259
260
+ def build_dedicated_actions # rubocop:disable Metrics/AbcSize
261
+ pick_first = -> ( reply ) { reply . first } # rubocop:disable Style/SymbolProc
262
+ multiple_key_action = Action . new ( action_method_name : :send_multiple_keys_command )
263
+ all_node_first_action = Action . new ( action_method_name : :send_command_to_all_nodes , after_action_proc : pick_first )
264
+ primary_first_action = Action . new ( action_method_name : :send_command_to_primaries , after_action_proc : pick_first )
265
+ not_supported_action = Action . new ( action_method_name : :fail_not_supported_command )
266
+ keyless_action = Action . new ( action_method_name : :fail_keyless_command )
267
+ actions = {
268
+ 'ping' => Action . new ( action_method_name : :send_ping_command , after_action_proc : pick_first ) ,
269
+ 'wait' => Action . new ( action_method_name : :send_wait_command ) ,
270
+ 'keys' => Action . new ( action_method_name : :send_command_to_replicas , after_action_proc : -> ( reply ) { reply . flatten . sort_by ( &:to_s ) } ) ,
271
+ 'dbsize' => Action . new ( action_method_name : :send_command_to_replicas , after_action_proc : -> ( reply ) { reply . select { |e | e . is_a? ( Integer ) } . sum } ) ,
272
+ 'scan' => Action . new ( action_method_name : :send_scan_command ) ,
273
+ 'lastsave' => Action . new ( action_method_name : :send_command_to_all_nodes , after_action_proc : -> ( reply ) { reply . sort_by ( &:to_i ) } ) ,
274
+ 'role' => Action . new ( action_method_name : :send_command_to_all_nodes ) ,
275
+ 'config' => Action . new ( action_method_name : :send_config_command ) ,
276
+ 'client' => Action . new ( action_method_name : :send_client_command ) ,
277
+ 'cluster' => Action . new ( action_method_name : :send_cluster_command ) ,
278
+ 'memory' => Action . new ( action_method_name : :send_memory_command ) ,
279
+ 'script' => Action . new ( action_method_name : :send_script_command ) ,
280
+ 'pubsub' => Action . new ( action_method_name : :send_pubsub_command ) ,
281
+ 'watch' => Action . new ( action_method_name : :send_watch_command ) ,
282
+ 'mget' => multiple_key_action ,
283
+ 'mset' => multiple_key_action ,
284
+ 'del' => multiple_key_action ,
285
+ 'acl' => all_node_first_action ,
286
+ 'auth' => all_node_first_action ,
287
+ 'bgrewriteaof' => all_node_first_action ,
288
+ 'bgsave' => all_node_first_action ,
289
+ 'quit' => all_node_first_action ,
290
+ 'save' => all_node_first_action ,
291
+ 'flushall' => primary_first_action ,
292
+ 'flushdb' => primary_first_action ,
293
+ 'readonly' => not_supported_action ,
294
+ 'readwrite' => not_supported_action ,
295
+ 'shutdown' => not_supported_action ,
296
+ 'discard' => keyless_action ,
297
+ 'exec' => keyless_action ,
298
+ 'multi' => keyless_action ,
299
+ 'unwatch' => keyless_action
300
+ } . freeze
301
+ actions . each_with_object ( { } ) do |( k , v ) , acc |
302
+ acc [ k ] = v
303
+ acc [ k . upcase ] = v
304
+ end . freeze
305
+ end
306
+
307
+ def send_command_to_all_nodes ( method , command , args , &block )
308
+ @node . call_all ( method , command , args , &block )
309
+ end
310
+
311
+ def send_command_to_primaries ( method , command , args , &block )
312
+ @node . call_primaries ( method , command , args , &block )
313
+ end
314
+
315
+ def send_command_to_replicas ( method , command , args , &block )
316
+ @node . call_replicas ( method , command , args , &block )
317
+ end
318
+
319
+ def send_ping_command ( method , command , args , &block )
320
+ @node . send_ping ( method , command , args , &block )
321
+ end
322
+
323
+ def send_scan_command ( _method , command , _args , &_block )
324
+ scan ( command , seed : 1 )
325
+ end
326
+
327
+ def fail_not_supported_command ( _method , command , _args , &_block )
328
+ raise ::RedisClient ::Cluster ::OrchestrationCommandNotSupported . from_command ( command . first ) . with_config ( @config )
329
+ end
330
+
331
+ def fail_keyless_command ( _method , command , _args , &_block )
332
+ raise ::RedisClient ::Cluster ::AmbiguousNodeError . from_command ( command . first ) . with_config ( @config )
333
+ end
334
+
316
335
def send_wait_command ( method , command , args , retry_count : 1 , &block ) # rubocop:disable Metrics/AbcSize
317
336
@node . call_primaries ( method , command , args ) . select { |r | r . is_a? ( Integer ) } . sum . then ( &TSF . call ( block ) )
318
337
rescue ::RedisClient ::Cluster ::ErrorCollection => e
@@ -363,23 +382,23 @@ def send_client_command(method, command, args, &block) # rubocop:disable Metrics
363
382
364
383
def send_cluster_command ( method , command , args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
365
384
if command [ 1 ] . casecmp ( 'addslots' ) . zero?
366
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
385
+ fail_not_supported_command ( method , command , args , & block )
367
386
elsif command [ 1 ] . casecmp ( 'delslots' ) . zero?
368
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
387
+ fail_not_supported_command ( method , command , args , & block )
369
388
elsif command [ 1 ] . casecmp ( 'failover' ) . zero?
370
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
389
+ fail_not_supported_command ( method , command , args , & block )
371
390
elsif command [ 1 ] . casecmp ( 'forget' ) . zero?
372
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
391
+ fail_not_supported_command ( method , command , args , & block )
373
392
elsif command [ 1 ] . casecmp ( 'meet' ) . zero?
374
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
393
+ fail_not_supported_command ( method , command , args , & block )
375
394
elsif command [ 1 ] . casecmp ( 'replicate' ) . zero?
376
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
395
+ fail_not_supported_command ( method , command , args , & block )
377
396
elsif command [ 1 ] . casecmp ( 'reset' ) . zero?
378
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
397
+ fail_not_supported_command ( method , command , args , & block )
379
398
elsif command [ 1 ] . casecmp ( 'set-config-epoch' ) . zero?
380
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
399
+ fail_not_supported_command ( method , command , args , & block )
381
400
elsif command [ 1 ] . casecmp ( 'setslot' ) . zero?
382
- raise :: RedisClient :: Cluster :: OrchestrationCommandNotSupported . from_command ( [ 'cluster' , command [ 1 ] ] ) . with_config ( @config )
401
+ fail_not_supported_command ( method , command , args , & block )
383
402
elsif command [ 1 ] . casecmp ( 'saveconfig' ) . zero?
384
403
@node . call_all ( method , command , args ) . first . then ( &TSF . call ( block ) )
385
404
elsif command [ 1 ] . casecmp ( 'getkeysinslot' ) . zero?
@@ -428,7 +447,7 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
428
447
end
429
448
end
430
449
431
- def send_watch_command ( command )
450
+ def send_watch_command ( _method , command , _args , & _block )
432
451
unless block_given?
433
452
msg = 'A block required. And you need to use the block argument as a client for the transaction.'
434
453
raise ::RedisClient ::Cluster ::Transaction ::ConsistencyError . new ( msg ) . with_config ( @config )
@@ -443,8 +462,9 @@ def send_watch_command(command)
443
462
end
444
463
end
445
464
446
- def send_multiple_keys_command ( cmd , method , command , args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
465
+ def send_multiple_keys_command ( method , command , args , &block ) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
447
466
# This implementation is prioritized performance rather than readability or so.
467
+ cmd = command . first
448
468
if cmd . casecmp ( 'mget' ) . zero?
449
469
single_key_cmd = 'get'
450
470
keys_step = 1
@@ -458,7 +478,7 @@ def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:dis
458
478
raise NotImplementedError , cmd
459
479
end
460
480
461
- return try_send ( assign_node ( command ) , method , command , args , &block ) if command . size <= keys_step + 1 || ::RedisClient ::Cluster ::KeySlotConverter . hash_tag_included? ( command [ 1 ] )
481
+ return assign_node_and_send_command ( method , command , args , &block ) if command . size <= keys_step + 1 || ::RedisClient ::Cluster ::KeySlotConverter . hash_tag_included? ( command [ 1 ] )
462
482
463
483
seed = @config . use_replica? && @config . replica_affinity == :random ? nil : Random . new_seed
464
484
pipeline = ::RedisClient ::Cluster ::Pipeline . new ( self , @command_builder , @concurrent_worker , exception : true , seed : seed )
0 commit comments