@@ -78,11 +78,13 @@ def call_loop(command, timeout = 0, &block)
78
78
end
79
79
80
80
def call_pipeline ( pipeline )
81
- node_keys , command_keys = extract_keys_in_pipeline ( pipeline )
82
- raise CrossSlotPipeliningError , command_keys if node_keys . size > 1
81
+ node_keys = pipeline . commands . map { |cmd | find_node_key ( cmd , primary_only : true ) } . compact . uniq
82
+ if node_keys . size > 1
83
+ raise ( CrossSlotPipeliningError ,
84
+ pipeline . commands . map { |cmd | @command . extract_first_key ( cmd ) } . reject ( &:empty? ) . uniq )
85
+ end
83
86
84
- node = find_node ( node_keys . first )
85
- try_send ( node , :call_pipeline , pipeline )
87
+ try_send ( find_node ( node_keys . first ) , :call_pipeline , pipeline )
86
88
end
87
89
88
90
def call_with_timeout ( command , timeout , &block )
@@ -253,14 +255,14 @@ def assign_node(command)
253
255
find_node ( node_key )
254
256
end
255
257
256
- def find_node_key ( command )
258
+ def find_node_key ( command , primary_only : false )
257
259
key = @command . extract_first_key ( command )
258
260
return if key . empty?
259
261
260
262
slot = KeySlotConverter . convert ( key )
261
263
return unless @slot . exists? ( slot )
262
264
263
- if @command . should_send_to_master? ( command )
265
+ if @command . should_send_to_master? ( command ) || primary_only
264
266
@slot . find_node_key_of_master ( slot )
265
267
else
266
268
@slot . find_node_key_of_slave ( slot )
@@ -285,11 +287,5 @@ def update_cluster_info!(node_key = nil)
285
287
@node . map ( &:disconnect )
286
288
@node , @slot = fetch_cluster_info! ( @option )
287
289
end
288
-
289
- def extract_keys_in_pipeline ( pipeline )
290
- node_keys = pipeline . commands . map { |cmd | find_node_key ( cmd ) } . compact . uniq
291
- command_keys = pipeline . commands . map { |cmd | @command . extract_first_key ( cmd ) } . reject ( &:empty? )
292
- [ node_keys , command_keys ]
293
- end
294
290
end
295
291
end
0 commit comments