|
| 1 | +# frozen_string_literal: true |
| 2 | + |
| 3 | +module Pwb |
| 4 | + class TenantShardMigrator |
| 5 | + class MigrationError < StandardError; end |
| 6 | + |
| 7 | + BATCH_SIZE = ENV.fetch('PWB_TENANT_MIGRATION_BATCH', 500).to_i |
| 8 | + |
| 9 | + attr_reader :website, :target_shard, :logger |
| 10 | + |
| 11 | + def initialize(website:, target_shard:, logger: Rails.logger, dry_run: false) |
| 12 | + @website = website |
| 13 | + @target_shard = normalize_shard(target_shard) |
| 14 | + @logger = logger |
| 15 | + @dry_run = dry_run |
| 16 | + end |
| 17 | + |
| 18 | + def call |
| 19 | + validate_target! |
| 20 | + return if @dry_run |
| 21 | + |
| 22 | + ActsAsTenant.without_tenant do |
| 23 | + website.with_lock do |
| 24 | + migrate_records! |
| 25 | + website.update!(shard_name: target_shard.to_s) |
| 26 | + end |
| 27 | + end |
| 28 | + end |
| 29 | + |
| 30 | + private |
| 31 | + |
| 32 | + def migrate_records! |
| 33 | + source = website.database_shard |
| 34 | + logger.info("[TenantShardMigrator] Moving website ##{website.id} from #{source} → #{target_shard}") |
| 35 | + |
| 36 | + tenant_table_names.each do |table_name| |
| 37 | + migrated = migrate_table(table_name, source: source, target: target_shard) |
| 38 | + logger.info("[TenantShardMigrator] #{table_name}: migrated #{migrated} rows") if migrated.positive? |
| 39 | + end |
| 40 | + end |
| 41 | + |
| 42 | + def migrate_table(table_name, source:, target:) |
| 43 | + total = 0 |
| 44 | + |
| 45 | + loop do |
| 46 | + rows = fetch_rows(table_name, source) |
| 47 | + break if rows.empty? |
| 48 | + |
| 49 | + ids = rows.map { |row| row['id'] } |
| 50 | + ensure_no_conflicts!(table_name, ids, target: target) |
| 51 | + insert_rows(table_name, rows, target) |
| 52 | + delete_rows(table_name, ids, source) |
| 53 | + total += rows.size |
| 54 | + end |
| 55 | + |
| 56 | + total |
| 57 | + end |
| 58 | + |
| 59 | + def fetch_rows(table_name, shard) |
| 60 | + with_connection(shard) do |connection| |
| 61 | + connection.select_all(<<~SQL, 'TenantShardMigrator').to_a |
| 62 | + SELECT * |
| 63 | + FROM #{connection.quote_table_name(table_name)} |
| 64 | + WHERE website_id = #{connection.quote(website.id)} |
| 65 | + ORDER BY id ASC |
| 66 | + LIMIT #{BATCH_SIZE} |
| 67 | + SQL |
| 68 | + end |
| 69 | + end |
| 70 | + |
| 71 | + def insert_rows(table_name, rows, shard) |
| 72 | + return if rows.empty? |
| 73 | + |
| 74 | + with_connection(shard) do |connection| |
| 75 | + connection.insert_all(rows, table_name) |
| 76 | + end |
| 77 | + end |
| 78 | + |
| 79 | + def delete_rows(table_name, ids, shard) |
| 80 | + return if ids.empty? |
| 81 | + |
| 82 | + with_connection(shard) do |connection| |
| 83 | + sql = <<~SQL.squish |
| 84 | + DELETE FROM #{connection.quote_table_name(table_name)} |
| 85 | + WHERE id IN (#{ids.map { |id| connection.quote(id) }.join(', ')}) |
| 86 | + SQL |
| 87 | + connection.execute(sql) |
| 88 | + end |
| 89 | + end |
| 90 | + |
| 91 | + def ensure_no_conflicts!(table_name, ids, target:) |
| 92 | + return if ids.empty? |
| 93 | + |
| 94 | + with_connection(target) do |connection| |
| 95 | + sql = <<~SQL.squish |
| 96 | + SELECT 1 FROM #{connection.quote_table_name(table_name)} |
| 97 | + WHERE id IN (#{ids.map { |id| connection.quote(id) }.join(', ')}) |
| 98 | + LIMIT 1 |
| 99 | + SQL |
| 100 | + conflict = connection.select_value(sql) |
| 101 | + raise MigrationError, "ID conflict detected for #{table_name}" if conflict |
| 102 | + end |
| 103 | + end |
| 104 | + |
| 105 | + def with_connection(shard) |
| 106 | + PwbTenant::ApplicationRecord.connected_to(role: :writing, shard: shard) do |
| 107 | + yield PwbTenant::ApplicationRecord.connection |
| 108 | + end |
| 109 | + end |
| 110 | + |
| 111 | + def tenant_table_names |
| 112 | + @tenant_table_names ||= begin |
| 113 | + base_connection = ActiveRecord::Base.connection |
| 114 | + base_connection.tables |
| 115 | + .reject { |table| table.in?(%w[ar_internal_metadata schema_migrations active_storage_blobs active_storage_attachments]) } |
| 116 | + .select do |table| |
| 117 | + base_connection.columns(table).any? { |column| column.name == 'website_id' } |
| 118 | + end |
| 119 | + end |
| 120 | + end |
| 121 | + |
| 122 | + def validate_target! |
| 123 | + raise MigrationError, 'Website is already on the target shard' if website.database_shard == target_shard |
| 124 | + raise MigrationError, "Shard #{target_shard} is not configured" unless physical_shard_configured?(target_shard) |
| 125 | + end |
| 126 | + |
| 127 | + def physical_shard_configured?(logical_shard) |
| 128 | + Pwb::ShardRegistry.configured?(logical_shard) |
| 129 | + end |
| 130 | + |
| 131 | + def normalize_shard(value) |
| 132 | + value = value.to_sym if value.respond_to?(:to_sym) |
| 133 | + value || :default |
| 134 | + end |
| 135 | + end |
| 136 | +end |
0 commit comments