@@ -9,54 +9,48 @@ class Pipeline
99 ReplySizeError = Class . new ( ::RedisClient ::Error )
1010 MAX_THREADS = Integer ( ENV . fetch ( 'REDIS_CLIENT_MAX_THREADS' , 5 ) )
1111
12- def initialize ( router , command_builder )
12+ def initialize ( router , command_builder , seed : Random . new_seed )
1313 @router = router
1414 @command_builder = command_builder
15- @grouped = Hash . new ( [ ] . freeze )
15+ @grouped = { }
1616 @size = 0
17- @seed = Random . new_seed
17+ @seed = seed
1818 end
1919
2020 def call ( *args , **kwargs , &block )
2121 command = @command_builder . generate ( args , kwargs )
2222 node_key = @router . find_node_key ( command , seed : @seed )
23- @grouped [ node_key ] += [ [ @size , :call_v , command , block ] ]
24- @size += 1
23+ add_line ( node_key , [ @size , :call_v , command , block ] )
2524 end
2625
2726 def call_v ( args , &block )
2827 command = @command_builder . generate ( args )
2928 node_key = @router . find_node_key ( command , seed : @seed )
30- @grouped [ node_key ] += [ [ @size , :call_v , command , block ] ]
31- @size += 1
29+ add_line ( node_key , [ @size , :call_v , command , block ] )
3230 end
3331
3432 def call_once ( *args , **kwargs , &block )
3533 command = @command_builder . generate ( args , kwargs )
3634 node_key = @router . find_node_key ( command , seed : @seed )
37- @grouped [ node_key ] += [ [ @size , :call_once_v , command , block ] ]
38- @size += 1
35+ add_line ( node_key , [ @size , :call_once_v , command , block ] )
3936 end
4037
4138 def call_once_v ( args , &block )
4239 command = @command_builder . generate ( args )
4340 node_key = @router . find_node_key ( command , seed : @seed )
44- @grouped [ node_key ] += [ [ @size , :call_once_v , command , block ] ]
45- @size += 1
41+ add_line ( node_key , [ @size , :call_once_v , command , block ] )
4642 end
4743
4844 def blocking_call ( timeout , *args , **kwargs , &block )
4945 command = @command_builder . generate ( args , kwargs )
5046 node_key = @router . find_node_key ( command , seed : @seed )
51- @grouped [ node_key ] += [ [ @size , :blocking_call_v , timeout , command , block ] ]
52- @size += 1
47+ add_line ( node_key , [ @size , :blocking_call_v , timeout , command , block ] )
5348 end
5449
5550 def blocking_call_v ( timeout , args , &block )
5651 command = @command_builder . generate ( args )
5752 node_key = @router . find_node_key ( command , seed : @seed )
58- @grouped [ node_key ] += [ [ @size , :blocking_call_v , timeout , command , block ] ]
59- @size += 1
53+ add_line ( node_key , [ @size , :blocking_call_v , timeout , command , block ] )
6054 end
6155
6256 def empty?
@@ -92,6 +86,14 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
9286
9387 raise ::RedisClient ::Cluster ::ErrorCollection , errors
9488 end
89+
90+ private
91+
92+ def add_line ( node_key , line )
93+ @grouped [ node_key ] = [ ] unless @grouped . key? ( node_key )
94+ @grouped [ node_key ] << line
95+ @size += 1
96+ end
9597 end
9698 end
9799end
0 commit comments