Skip to content

Commit 206f50c

Browse files
authored
Merge pull request #789 from supercaracal/fix-resharding-helper
Fix resharding test helper method for cluster mode and enhance migrate command method
2 parents ae5a1e1 + 1b8984c commit 206f50c

File tree

3 files changed

+42
-12
lines changed

3 files changed

+42
-12
lines changed

lib/redis.rb

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -499,22 +499,27 @@ def restore(key, ttl, serialized_value, options = {})
499499

500500
# Transfer a key from the connected instance to another instance.
501501
#
502-
# @param [String] key
502+
# @param [String, Array<String>] key
503503
# @param [Hash] options
504504
# - `:host => String`: host of instance to migrate to
505505
# - `:port => Integer`: port of instance to migrate to
506506
# - `:db => Integer`: database to migrate to (default: same as source)
507507
# - `:timeout => Integer`: timeout (default: same as connection timeout)
508+
# - `:copy => Boolean`: Do not remove the key from the local instance.
509+
# - `:replace => Boolean`: Replace existing key on the remote instance.
508510
# @return [String] `"OK"`
509511
def migrate(key, options)
510-
host = options[:host] || raise(RuntimeError, ":host not specified")
511-
port = options[:port] || raise(RuntimeError, ":port not specified")
512-
db = (options[:db] || @client.db).to_i
513-
timeout = (options[:timeout] || @client.timeout).to_i
512+
args = [:migrate]
513+
args << (options[:host] || raise(':host not specified'))
514+
args << (options[:port] || raise(':port not specified'))
515+
args << (key.is_a?(String) ? key : '')
516+
args << (options[:db] || @client.db).to_i
517+
args << (options[:timeout] || @client.timeout).to_i
518+
args << 'COPY' if options[:copy]
519+
args << 'REPLACE' if options[:replace]
520+
args += ['KEYS', *key] if key.is_a?(Array)
514521

515-
synchronize do |client|
516-
client.call([:migrate, host, port, key, db, timeout])
517-
end
522+
synchronize { |client| client.call(args) }
518523
end
519524

520525
# Delete one or more keys.

test/commands_on_value_types_test.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,21 @@ def test_migrate
202202
actual = redis.migrate("foo", options.merge(:timeout => default_timeout + 1))
203203
expected = ["127.0.0.1", "1234", "foo", default_db.to_s, (default_timeout + 1).to_s]
204204
assert_equal expected, actual
205+
206+
# Test copy override
207+
actual = redis.migrate('foo', options.merge(copy: true))
208+
expected = ['127.0.0.1', '1234', 'foo', default_db.to_s, default_timeout.to_s, 'COPY']
209+
assert_equal expected, actual
210+
211+
# Test replace override
212+
actual = redis.migrate('foo', options.merge(replace: true))
213+
expected = ['127.0.0.1', '1234', 'foo', default_db.to_s, default_timeout.to_s, 'REPLACE']
214+
assert_equal expected, actual
215+
216+
# Test multiple keys
217+
actual = redis.migrate(%w[foo bar baz], options)
218+
expected = ['127.0.0.1', '1234', '', default_db.to_s, default_timeout.to_s, 'KEYS', 'foo', 'bar', 'baz']
219+
assert_equal expected, actual
205220
end
206221
end
207222
end

test/support/cluster/orchestrator.rb

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def failover
3939
wait_cluster_recovering(@clients)
4040
end
4141

42-
def start_resharding(slot, src_node_key, dest_node_key)
42+
def start_resharding(slot, src_node_key, dest_node_key, slice_size: 10)
4343
node_map = hashify_node_map(@clients.first)
4444
src_node_id = node_map.fetch(src_node_key)
4545
src_client = find_client(@clients, src_node_key)
@@ -50,11 +50,21 @@ def start_resharding(slot, src_node_key, dest_node_key)
5050
dest_client.cluster(:setslot, slot, 'IMPORTING', src_node_id)
5151
src_client.cluster(:setslot, slot, 'MIGRATING', dest_node_id)
5252

53+
keys_count = src_client.cluster(:countkeysinslot, slot)
5354
loop do
54-
keys = src_client.cluster(:getkeysinslot, slot, 100)
55+
break if keys_count <= 0
56+
keys = src_client.cluster(:getkeysinslot, slot, slice_size)
5557
break if keys.empty?
56-
keys.each { |k| src_client.migrate(k, host: dest_host, port: dest_port) }
57-
sleep 0.1
58+
keys.each do |k|
59+
begin
60+
src_client.migrate(k, host: dest_host, port: dest_port)
61+
rescue Redis::CommandError => err
62+
raise unless err.message.start_with?('IOERR')
63+
src_client.migrate(k, host: dest_host, port: dest_port, replace: true) # retry once
64+
ensure
65+
keys_count -= 1
66+
end
67+
end
5868
end
5969
end
6070

0 commit comments

Comments
 (0)