@@ -96,7 +96,12 @@ def finish_resharding(slot:, src_node_key:, dest_node_key:)
96
96
dest = find_client_by_natted_node_key ( @clients , dest_node_key )
97
97
src = find_client_by_natted_node_key ( @clients , src_node_key )
98
98
rest = take_masters ( @clients , shard_size : @shard_size ) . reject { |c | c . equal? ( dest ) || c . equal? ( src ) }
99
- ( [ dest , src ] + rest ) . each { |cli | cli . call ( 'CLUSTER' , 'SETSLOT' , slot , 'NODE' , id ) }
99
+ ( [ dest , src ] + rest ) . each do |cli |
100
+ cli . call ( 'CLUSTER' , 'SETSLOT' , slot , 'NODE' , id )
101
+ rescue ::RedisClient ::CommandError => e
102
+ raise if e . message != 'ERR Please use SETSLOT only with masters.'
103
+ # how weird, ignore
104
+ end
100
105
end
101
106
102
107
def scale_out ( primary_url :, replica_url :) # rubocop:disable Metrics/CyclomaticComplexity
@@ -123,7 +128,7 @@ def scale_out(primary_url:, replica_url:) # rubocop:disable Metrics/CyclomaticCo
123
128
124
129
rows = fetch_and_parse_cluster_nodes ( @clients )
125
130
126
- SLOT_SIZE . times . to_a . sample ( SLOT_SIZE / @shard_size ) . each do |slot |
131
+ SLOT_SIZE . times . to_a . sample ( 100 ) . sort . each do |slot |
127
132
src = rows . find do |row |
128
133
next if row [ :slots ] . empty?
129
134
@@ -135,6 +140,50 @@ def scale_out(primary_url:, replica_url:) # rubocop:disable Metrics/CyclomaticCo
135
140
end
136
141
end
137
142
143
+ def scale_in # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
144
+ rows = fetch_and_parse_cluster_nodes ( @clients )
145
+ primary_info = rows . reject { |r | r [ :slots ] . empty? } . min_by { |r | r [ :slots ] . flat_map { |start , last | ( start ..last ) . to_a } . size }
146
+ replica_info = rows . find { |r | r [ :primary_id ] == primary_info [ :id ] }
147
+ rest_primary_node_keys = rows . reject { |r | r [ :id ] == primary_info [ :id ] || r [ :role ] == 'slave' } . map { |r | r [ :node_key ] }
148
+
149
+ primary_info [ :slots ] . each do |start , last |
150
+ ( start ..last ) . each do |slot |
151
+ src = primary_info . fetch ( :node_key )
152
+ dest = rest_primary_node_keys . sample
153
+ start_resharding ( slot : slot , src_node_key : src , dest_node_key : dest )
154
+ finish_resharding ( slot : slot , src_node_key : src , dest_node_key : dest )
155
+ end
156
+ end
157
+
158
+ id2cli = fetch_internal_id_to_client_mappings ( @clients )
159
+ replica = id2cli . fetch ( replica_info [ :id ] )
160
+ primary = id2cli . fetch ( primary_info [ :id ] )
161
+ threads = @clients . map do |cli |
162
+ Thread . new ( cli ) do |c |
163
+ Thread . pass
164
+ c . pipelined do |pi |
165
+ pi . call ( 'CLUSTER' , 'FORGET' , replica_info [ :id ] )
166
+ pi . call ( 'CLUSTER' , 'FORGET' , primary_info [ :id ] )
167
+ end
168
+ rescue ::RedisClient ::Error
169
+ # ignore
170
+ end
171
+ end
172
+ threads . each ( &:join )
173
+ replica . call ( 'CLUSTER' , 'RESET' , 'SOFT' )
174
+ primary . call ( 'CLUSTER' , 'RESET' , 'SOFT' )
175
+ @clients . reject! { |c | c . equal? ( primary ) || c . equal? ( replica ) }
176
+ @shard_size -= 1
177
+ @number_of_replicas = @replica_size * @shard_size
178
+
179
+ wait_for_cluster_to_be_ready
180
+ wait_for_state ( @clients , max_attempts : @max_attempts ) do |client |
181
+ fetch_cluster_nodes ( client ) . size == @shard_size + @number_of_replicas
182
+ rescue ::RedisClient ::ConnectionError
183
+ true
184
+ end
185
+ end
186
+
138
187
def close
139
188
@clients . each ( &:close )
140
189
end
0 commit comments