@@ -34,7 +34,7 @@ def failover
34
34
sleep 3
35
35
end
36
36
37
- def start_resharding ( slot , src_node_key , dest_node_key )
37
+ def start_resharding ( slot , src_node_key , dest_node_key , slice_size : 10 )
38
38
node_map = hashify_node_map ( @clients . first )
39
39
src_node_id = node_map . fetch ( src_node_key )
40
40
src_client = find_client ( @clients , src_node_key )
@@ -45,11 +45,21 @@ def start_resharding(slot, src_node_key, dest_node_key)
45
45
dest_client . cluster ( :setslot , slot , 'IMPORTING' , src_node_id )
46
46
src_client . cluster ( :setslot , slot , 'MIGRATING' , dest_node_id )
47
47
48
+ keys_count = src_client . cluster ( :countkeysinslot , slot )
48
49
loop do
49
- keys = src_client . cluster ( :getkeysinslot , slot , 100 )
50
+ break if keys_count <= 0
51
+ keys = src_client . cluster ( :getkeysinslot , slot , slice_size )
50
52
break if keys . empty?
51
- keys . each { |k | src_client . migrate ( k , host : dest_host , port : dest_port ) }
52
- sleep 0.1
53
+ keys . each do |k |
54
+ begin
55
+ src_client . migrate ( k , host : dest_host , port : dest_port )
56
+ rescue Redis ::CommandError => err
57
+ raise unless err . message . start_with? ( 'IOERR' )
58
+ src_client . migrate ( k , host : dest_host , port : dest_port , replace : true ) # retry once
59
+ ensure
60
+ keys_count -= 1
61
+ end
62
+ end
53
63
end
54
64
end
55
65
0 commit comments