|
| 1 | +#!/usr/bin/env ruby |
| 2 | +# frozen_string_literal: true |
| 3 | + |
| 4 | +# This script will pull objects from the preservation cloud archives, and fixity check |
| 5 | +# them, using the preservation_catalog Audit::ChecksumValidator class. It will report |
| 6 | +# any errors that are found. The list of druids to retrieve is specified by any combo |
| 7 | +# of druids listed in a file (one bare druid per line with no druid: prefix), random |
| 8 | +# sampling of Moabs smaller than the zip segmenting threshold, and random sampling of |
| 9 | +# Moabs larger than the zip segmenting threshold. |
| 10 | +# |
| 11 | +# WARNING: this does some naive things that assume that the druid lists won't be more than |
| 12 | +# a few hundred long, or possibly a few thousand, since druid lists will be plucked from query |
| 13 | +# results and held in memory, parsed naively from file and held in memory, made unique with |
| 14 | +# Ruby Array#uniq, etc. |
| 15 | + |
# Set default environment if not already set by the command line
ENV["RAILS_ENV"] ||= "production"

# Boot the full Rails app so the models used below (PreservedObject, ZipPart,
# ZippedMoabVersion, ZipEndpoint) and Audit::ChecksumValidator are loaded.
require_relative '../config/environment'
require 'optparse'

# NOTE: top-level include mixes this into Object; acceptable for a one-off
# script, and gives us number_to_human_size for readable byte counts in logs.
include ActionView::Helpers::NumberHelper

# somewhat duplicative of DruidVersionZip.ZIP_SPLIT_SIZE = '10g', but that's
# zip format, and this is just bytes as an int, which is what the query wants
ZIP_SEGMENT_THRESHOLD_GB = 10
ZIP_SEGMENT_THRESHOLD = ZIP_SEGMENT_THRESHOLD_GB.gigabytes
| 28 | + |
# Defaults for every CLI switch; parser.parse!(into: options) below overwrites
# any key the caller supplies on the command line (keys match long option names).
options = {
  druid_list: '', # comma-separated bare druids, no 'druid:' prefix
  druid_list_file: nil, # optional file of bare druids, one per line
  single_part_druid_sample_count: 0, # random sample size of Moabs below the zip segmenting threshold
  multipart_druid_sample_count: 0, # random sample size of Moabs above the zip segmenting threshold
  fixity_check_base_location: "/tmp/#{ENV['USER']}/archive_fixity_checking/", # scratch space for downloads/unzips
  endpoints_to_audit: 'aws_s3_west_2,aws_s3_east_1,gcp_s3_south_1,ibm_us_south', # names from replication config
  force_part_md5_comparison: false, # re-verify MD5s even for parts already on disk
  dry_run: false, # log what would happen, but skip download/unzip/validate
  quiet: false # when true, log only to file, not stdout
}
| 40 | + |
# Build the CLI. parse!(into: options) stores each switch under a symbol key
# matching its long option name, so keys line up with the defaults hash above.
#
# FIX: the two sample-count options now declare the Integer acceptor so
# OptionParser coerces their values. Without it, parse!(into:) replaced the
# Integer defaults with Strings, and the later
# options[:..._sample_count].positive? calls raised NoMethodError whenever
# either flag was actually used.
parser = OptionParser.new do |option_parser|
  option_parser.banner = "Usage: #{$PROGRAM_NAME} [options]"
  option_parser.on('--druid_list DRUID_LIST',
                   'comma-separated (no spaces) list of bare druids (no prefixes)')
  option_parser.on('--druid_list_file DRUID_LIST_FILE',
                   'file with a list of provided druids, e.g. from integration tests, manual tests, your own queries, etc')
  option_parser.on('--fixity_check_base_location FIXITY_CHECK_BASE_LOCATION',
                   'target directory for downloading cloud archived Moabs, where they will be inflated and fixity checked. ensure sufficient free space.')
  option_parser.on('--single_part_druid_sample_count SINGLE_PART_DRUID_SAMPLE_COUNT', Integer,
                   'number of < 10 GB Moabs to query for and retrieve (default: 0)')
  option_parser.on('--multipart_druid_sample_count MULTIPART_DRUID_SAMPLE_COUNT', Integer,
                   'number of > 10 GB Moabs to query for and retrieve (default: 0)')
  option_parser.on('--endpoints_to_audit ENDPOINTS_TO_AUDIT',
                   'list of cloud endpoints to audit (comma-separated, no spaces, names from config)')
  option_parser.on('--[no-]force_part_md5_comparison', 'Even if the zip parts are not downloaded on this run, compare the previously downloaded MD5 results to what is in the DB')
  option_parser.on('--[no-]dry_run',
                   'Simulate download and fixity check for druid list (defaults to false)')
  option_parser.on('--[no-]quiet', 'Do not output progress information (defaults to false)')
  option_parser.on('-h', '--help', 'Displays help.') do
    $stdout.puts option_parser
    exit
  end
end
| 64 | + |
parser.parse!(into: options)
exit_code = 0 # we can update if/when we hit problems


# Always log to file; also echo to stdout unless --quiet was given.
logger = ActiveSupport::BroadcastLogger.new(
  Logger.new(Rails.root.join('log', 'audit_archive_zip_checksum_validation.log'))
)
logger.broadcast_to(Logger.new($stdout)) unless options[:quiet]


# Derived, parsed forms of the CLI values used throughout the rest of the script.
endpoints_to_audit = options[:endpoints_to_audit].split(',')
fixity_check_base_location = Pathname(options[:fixity_check_base_location])
| 77 | + |
| 78 | + |
# Seed the worklist from --druid_list (empty string yields an empty list) ...
druids = options[:druid_list].split(',')

# ... then append any druids listed one-per-line in --druid_list_file.
# FIX: warn instead of silently skipping when the named file isn't usable —
# a typo here would otherwise quietly shrink the audit with no indication.
if options[:druid_list_file].present?
  if File.file?(options[:druid_list_file])
    druids += File.readlines(options[:druid_list_file], chomp: true)
  else
    logger.warn("druid_list_file '#{options[:druid_list_file]}' is not a readable file; skipping it")
  end
end
| 84 | + |
# Randomly sample druids whose zip parts (summed per druid+endpoint) total
# less than the segmenting threshold, i.e. Moabs archived as single-part zips.
if options[:single_part_druid_sample_count].positive?
  po_list =
    PreservedObject.joins(
      zipped_moab_versions: [:zip_parts, :zip_endpoint]
    ).group(
      'preserved_objects.druid', 'zip_endpoint.endpoint_name'
    ).having(
      'SUM(zip_parts.size) < :max_size',
      { max_size: ZIP_SEGMENT_THRESHOLD } # we segment zips into 10 GB chunks
    ).order(
      'RANDOM()' # random sampling; acceptable cost at the expected scale (see WARNING in header)
    ).limit(
      options[:single_part_druid_sample_count]
    ).pluck(
      :druid, 'COUNT(zipped_moab_versions.id)', 'zip_endpoint.endpoint_name', 'COUNT(zip_parts.id)', Arel.sql('ARRAY_AGG((version, suffix))'), 'PG_SIZE_PRETTY(SUM(zip_parts.size))', 'SUM(zip_parts.size)'
    )

  # last column of each row is the raw byte total
  total_size = number_to_human_size(po_list.map { |row| row.last }.sum)
  # FIX: log the GB constant — ZIP_SEGMENT_THRESHOLD is in bytes and would
  # have printed "sub 10737418240 GB"
  logger.info("sub #{ZIP_SEGMENT_THRESHOLD_GB} GB preserved_objects results (#{total_size} total): #{po_list}")
  druids += po_list.map { |row| row.first }.uniq
end
| 106 | + |
# Randomly sample druids having at least one (version, endpoint) whose zip
# parts total more than the segmenting threshold, i.e. multi-part archives.
if options[:multipart_druid_sample_count].positive?
  multipart_zip_po_list =
    PreservedObject.joins(
      zipped_moab_versions: [:zip_parts, :zip_endpoint]
    ).group(
      'preserved_objects.druid', :version, 'zip_endpoint.endpoint_name'
    ).having(
      'SUM(zip_parts.size) > :min_size',
      { min_size: ZIP_SEGMENT_THRESHOLD } # we segment zips into 10 GB chunks
    ).order(
      'RANDOM()' # random sampling; acceptable cost at the expected scale (see WARNING in header)
    ).limit(
      options[:multipart_druid_sample_count]
    ).pluck(
      :druid, :version, 'COUNT(zipped_moab_versions.id)', 'zip_endpoint.endpoint_name', 'COUNT(zip_parts.id)', 'ARRAY_AGG(suffix)', 'PG_SIZE_PRETTY(SUM(zip_parts.size))', 'SUM(zip_parts.size)'
    )

  # last column of each row is the raw byte total
  total_size = number_to_human_size(multipart_zip_po_list.map { |row| row.last }.sum)
  # FIX: log the GB constant — ZIP_SEGMENT_THRESHOLD is in bytes and would
  # have printed "over 10737418240 GB"
  logger.info("over #{ZIP_SEGMENT_THRESHOLD_GB} GB preserved_objects results (#{total_size} total): #{multipart_zip_po_list}")
  druids += multipart_zip_po_list.map { |row| row.first }.uniq
end
| 128 | + |
| 129 | +logger.info("druids to check: #{druids}") |
| 130 | + |
| 131 | +zp_relation = ZipPart.joins( |
| 132 | + zipped_moab_version: [:preserved_object, :zip_endpoint] |
| 133 | +).where( |
| 134 | + preserved_object: { druid: druids }, |
| 135 | + zip_endpoint: { endpoint_name: endpoints_to_audit } |
| 136 | +).order(:endpoint_name, :druid, :version, :suffix) |
| 137 | + |
# Download each zip part from its cloud endpoint and compare a freshly computed
# MD5 against the md5 recorded in the DB. Parts already on disk are not
# re-downloaded, and their MD5 is only re-checked if --force_part_md5_comparison.
zp_relation.pluck(:endpoint_name, :druid, :version, :suffix, :md5, :size).each do |endpoint_name, druid, version, suffix, db_md5, db_size|
  s3_key = Replication::DruidVersionZip.new(druid, version).s3_key(suffix)
  # mirror the cloud key layout locally, namespaced by endpoint so the same
  # part retrieved from different providers can't collide on disk
  download_path = Pathname(fixity_check_base_location.join(endpoint_name, s3_key))
  s3_object = ZipEndpoint.find_by!(endpoint_name:).bucket.object(s3_key)
  logger.info("=== retrieve #{s3_key} from #{endpoint_name}")
  logger.debug("download_path=#{download_path}")
  logger.debug("download_path.exist?=#{download_path.exist?}")
  logger.debug("download_path.dirname=#{download_path.dirname}")
  logger.debug("download_path.dirname.exist?=#{download_path.dirname.exist?}")
  logger.debug("s3_key=#{s3_key}")
  logger.debug("s3_object.exists?=#{s3_object.exists?}")
  logger.debug("db_md5=#{db_md5}")
  logger.debug("db_size=#{db_size}")
  logger.debug("s3_object.metadata=#{s3_object.metadata}")
  if options[:dry_run]
    logger.info("DRY RUN: skipping download and fresh MD5 computation of #{s3_key} from #{endpoint_name}")
  else
    FileUtils.mkdir_p(download_path.dirname) unless download_path.dirname.exist?
    just_downloaded = false
    unless download_path.exist?
      logger.info("downloading #{s3_key} from #{endpoint_name} (#{number_to_human_size(db_size)} expected)")
      s3_object.download_file(download_path)
      logger.info("downloaded #{s3_key} from #{endpoint_name} (#{number_to_human_size(File.size(download_path.to_s))} retrieved)")
      just_downloaded = true
    else
      # existing file assumed complete from a prior run — NOTE(review): a
      # partial download from an interrupted run would be reused; confirm
      # acceptable, or compare File.size against db_size here
      logger.info("skipping download of #{s3_key} from #{endpoint_name}, already downloaded")
    end
    if just_downloaded || options[:force_part_md5_comparison]
      logger.info("comparing fresh MD5 calculation to DB value for #{download_path}")
      fresh_md5 = Digest::MD5.file(download_path)
      logger.info("fresh_md5.hexdigest=#{fresh_md5.hexdigest}")
      logger.info("fresh_md5.hexdigest==db_md5: #{fresh_md5.hexdigest==db_md5 ? '✅' : '🚨' }")
    else
      logger.info("skipping comparing fresh MD5 calculation to DB value for #{download_path} (already downloaded, comparison not forced)")
    end
  end
end
| 175 | + |
# Reassemble (if multi-part) and unzip each downloaded archive so the Moab
# directory structure can be checksum-validated on disk. Only the '.zip'
# suffix rows drive this loop; sibling .z01/.z02/... segments are detected by
# the glob below when a zip was split.
zp_relation.where(suffix: '.zip').pluck(:endpoint_name, :druid, :version, :suffix).each do |endpoint_name, druid, version, suffix|
  logger.info("=== unzip #{druid} #{version} from #{endpoint_name}")
  s3_key = Replication::DruidVersionZip.new(druid, version).s3_key(suffix)
  download_path = Pathname(fixity_check_base_location.join(endpoint_name, s3_key))
  if options[:dry_run]
    logger.info("DRY RUN, skipping unzipping #{download_path.basename} in #{download_path.dirname}")
  else
    unzip_filename =
      if Dir.glob("#{download_path.to_s.chomp('zip')}*").size > 1
        # more than one file matches "<name>.*" — the zip was segmented
        "#{download_path.basename.to_s.chomp('zip')}combined.zip".tap do |combined_filename|
          logger.info("multi-part zip, combining into one file (#{combined_filename}) so unzip can handle it")
          if File.exist?("#{download_path.dirname}/#{combined_filename}")
            logger.info("#{download_path.dirname}/#{combined_filename} exists, skipping combining")
          else
            # https://unix.stackexchange.com/questions/40480/how-to-unzip-a-multipart-spanned-zip-on-linux
            # FIX: argv form (no shell) so filenames are never subject to
            # shell word-splitting or metacharacter interpretation
            logger.info(Open3.capture2e('zip', '-s', '0', download_path.basename.to_s, '--out', combined_filename, chdir: download_path.dirname))
          end
        end
      else
        download_path.basename
      end
    logger.info("unzipping #{unzip_filename} in #{download_path.dirname}")
    # TODO: delete option to unzip so that it cleans up after itself? i don't think that's the default behavior?
    logger.debug(Open3.capture2e('unzip', unzip_filename.to_s, chdir: download_path.dirname))
  end
end
| 202 | + |
# Run preservation_catalog's checksum validation against each unzipped Moab,
# once per endpoint it was retrieved from. Validation errors flip the script's
# exit code to 1 but do not stop the remaining checks.
endpoints_to_audit.each do |endpoint_name|
  druids.each do |druid|
    logger.info("=== fixity check unzipped Moab for #{druid} from #{endpoint_name}")
    storage_location = fixity_check_base_location.join(endpoint_name)
    # a druid may be archived on only some of the audited endpoints; skip
    # endpoint/druid combos with no ZippedMoabVersions (nothing was downloaded)
    unless ZippedMoabVersion.joins(:zip_endpoint, :preserved_object).where(preserved_object: { druid: }, zip_endpoint: { endpoint_name: }).exists?
      logger.info("#{endpoint_name} doesn't have any ZMVs for #{druid}, skipping fixity check")
      next
    end

    if options[:dry_run]
      logger.info("DRY RUN, skipping checksum validation for #{druid} in #{storage_location}")
    else
      logger.info "Starting checksum validation for #{druid} in #{storage_location} (NOTE: this may take some time!)"
      checksum_validator = Audit::ChecksumValidator.new(
        logger:,
        moab_storage_object: MoabOnStorage.moab(storage_location:, druid:),
        emit_results: true
      )
      checksum_validator.validate
      # remember that at least one object failed validation, but keep going
      exit_code = 1 if checksum_validator.results.error_results.present?
    end
  end
end

exit exit_code
0 commit comments