8
8
9
9
class RedisClient
10
10
class Cluster
11
+ ZERO_CURSOR_FOR_SCAN = '0'
12
+
11
13
def initialize ( config , pool : nil , **kwargs )
12
14
@config = config . dup
13
15
@pool = pool
@@ -34,7 +36,14 @@ def blocking_call(timeout, *command, **kwargs, &block)
34
36
end
35
37
36
38
def scan ( *args , **kwargs , &block )
37
- _scan ( :scan , *args , **kwargs , &block )
39
+ raise ArgumentError , 'block required' unless block
40
+
41
+ cursor = ZERO_CURSOR_FOR_SCAN
42
+ loop do
43
+ cursor , keys = _scan ( 'SCAN' , cursor , *args , **kwargs )
44
+ keys . each ( &block )
45
+ break if cursor == ZERO_CURSOR_FOR_SCAN
46
+ end
38
47
end
39
48
40
49
def sscan ( key , *args , **kwargs , &block )
@@ -83,32 +92,6 @@ def connected?
83
92
84
93
def close
85
94
@node . each ( &:close )
86
- true
87
- end
88
-
89
- # TODO: remove
90
- def call_pipeline ( pipeline )
91
- node_keys = pipeline . commands . filter_map { |cmd | find_node_key ( cmd , primary_only : true ) } . uniq
92
- if node_keys . size > 1
93
- raise ( CrossSlotPipeliningError ,
94
- pipeline . commands . map { |cmd | @command . extract_first_key ( cmd ) } . reject ( &:empty? ) . uniq )
95
- end
96
-
97
- try_send ( find_node ( node_keys . first ) , :call_pipeline , pipeline )
98
- end
99
-
100
- # TODO: remove
101
- def process ( commands , &block )
102
- if commands . size == 1 &&
103
- %w[ unsubscribe punsubscribe ] . include? ( commands . first . first . to_s . downcase ) &&
104
- commands . first . size == 1
105
-
106
- # Node is indeterminate. We do just a best-effort try here.
107
- @node . process_all ( commands , &block )
108
- else
109
- node = assign_node ( commands . first )
110
- try_send ( node , :process , commands , &block )
111
- end
112
95
end
113
96
114
97
private
@@ -131,7 +114,7 @@ def send_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/A
131
114
when 'wait' then @node . call_primary ( method , *command , **kwargs , &block ) . sum
132
115
when 'keys' then @node . call_replica ( method , *command , **kwargs , &block ) . flatten . sort
133
116
when 'dbsize' then @node . call_replica ( method , *command , **kwargs , &block ) . sum
134
- when 'scan' then _scan ( method , *command , **kwargs , & block )
117
+ when 'scan' then _scan ( *command , **kwargs )
135
118
when 'lastsave' then @node . call_all ( method , *command , **kwargs , &block ) . sort
136
119
when 'role' then @node . call_all ( method , *command , **kwargs , &block )
137
120
when 'config' then send_config_command ( method , *command , **kwargs , &block )
@@ -233,7 +216,8 @@ def try_send(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:di
233
216
raise
234
217
end
235
218
236
- def _scan ( method , *command , **kwargs , &block ) # rubocop:disable Metrics/MethodLength
219
+ def _scan ( *command , **kwargs ) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
220
+ command [ 1 ] = ZERO_CURSOR_FOR_SCAN if command . size == 1
237
221
input_cursor = Integer ( command [ 1 ] )
238
222
239
223
client_index = input_cursor % 256
@@ -242,11 +226,11 @@ def _scan(method, *command, **kwargs, &block) # rubocop:disable Metrics/MethodLe
242
226
clients = @node . scale_reading_clients
243
227
244
228
client = clients [ client_index ]
245
- return [ '0' , [ ] ] unless client
229
+ return [ ZERO_CURSOR_FOR_SCAN , [ ] ] unless client
246
230
247
231
command [ 1 ] = raw_cursor . to_s
248
232
249
- result_cursor , result_keys = client . send ( method , *command , **kwargs , & block )
233
+ result_cursor , result_keys = client . call ( *command , **kwargs )
250
234
result_cursor = Integer ( result_cursor )
251
235
252
236
client_index += 1 if result_cursor == 0
0 commit comments