@@ -5,6 +5,7 @@ class Kredis::Migration
55
66 def initialize ( config = :shared )
77 @redis = Kredis . configured_for config
8+ @pipeline = nil
89 # TODO: Replace script loading with `copy` command once Redis 6.2+ is the minimum supported version.
910 @copy_sha = @redis . script "load" , "redis.call('SETNX', KEYS[2], redis.call('GET', KEYS[1])); return 1;"
1011 end
@@ -23,7 +24,7 @@ def migrate(from:, to:)
2324
2425 if to . present? && from != namespaced_to
2526 log_migration "Migrating key #{ from } to #{ namespaced_to } " do
26- @redis . evalsha @copy_sha , keys : [ from , namespaced_to ]
27+ connection . evalsha @copy_sha , keys : [ from , namespaced_to ]
2728 end
2829 else
2930 log_migration "Skipping blank/unaltered migration key #{ from } → #{ to } "
@@ -32,18 +33,26 @@ def migrate(from:, to:)
3233
3334 def delete_all ( key_pattern )
3435 each_key_batch_matching ( key_pattern ) do |keys |
35- @redis . del *keys
36+ connection . del *keys
3637 end
3738 end
3839
3940 private
4041 SCAN_BATCH_SIZE = 1_000
4142
43+ def connection
44+ @pipeline || @redis
45+ end
46+
4247 def each_key_batch_matching ( key_pattern , &block )
4348 cursor = "0"
4449 begin
4550 cursor , keys = @redis . scan ( cursor , match : key_pattern , count : SCAN_BATCH_SIZE )
46- @redis . pipelined { yield keys }
51+ @redis . pipelined do |pipeline |
52+ @pipeline = pipeline
53+ yield keys
54+ @pipeline = nil
55+ end
4756 end until cursor == "0"
4857 end
4958
0 commit comments