Skip to content

Commit 832b1de

Browse files
committed
[WIP] initial commit of bin script to download and fixity check archive zips from preservation cloud buckets
1 parent 469ca58 commit 832b1de

File tree

1 file changed

+200
-0
lines changed

1 file changed

+200
-0
lines changed
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
# This script will pull objects from the preservation cloud archives, and fixity check
5+
# them, using the preservation_catalog Audit::ChecksumValidator class. It will report
6+
# any errors that are found. The list of druids to retrieve is specified by any combo
7+
# of druids listed in a file (one bare druid per line with no druid: prefix), random
8+
# sampling of Moabs smaller than the zip segmenting threshold, and random sampling of
9+
# Moabs larger than the zip segmenting threshold.
10+
#
11+
# WARNING: this does some naive things that assume that the druid lists won't be more than
12+
# a few hundred long, or possibly a few thousand, since druid lists will be plucked from query
13+
# results and held in memory, parsed naively from file and held in memory, made unique with
14+
# Ruby Array#uniq, etc.
15+
16+
# Set default environment if not already set by the command line
17+
ENV["RAILS_ENV"] ||= "production"
18+
19+
require 'digest'
require 'fileutils'
require 'open3'
require 'optparse'

require_relative '../config/environment'
21+
22+
include ActionView::Helpers::NumberHelper
23+
24+
# somewhat duplicative of DruidVersionZip.ZIP_SPLIT_SIZE = '10g', but that's
25+
# zip format, and this is just bytes as an int, which is what the query wants
26+
ZIP_SEGMENT_THRESHOLD_GB = 10
27+
ZIP_SEGMENT_THRESHOLD = ZIP_SEGMENT_THRESHOLD_GB.gigabytes
28+
29+
# Default CLI option values. OptionParser#parse!(into:) overwrites these with any
# values supplied on the command line.
options = {
  druid_list_file: nil,
  single_part_druid_sample_count: 0,
  multipart_druid_sample_count: 0,
  fixity_check_base_location: "/tmp/#{ENV['USER']}/archive_fixity_checking/",
  endpoints_to_audit: 'aws_s3_west_2,aws_s3_east_1,gcp_s3_south_1,ibm_us_south',
  dry_run: false,
  quiet: false
}

parser = OptionParser.new do |option_parser|
  option_parser.banner = "Usage: #{$PROGRAM_NAME} [options]"
  option_parser.on('--druid_list_file DRUID_LIST_FILE',
                   'file with a list of provided druids, e.g. from integration tests, manual tests, your own queries, etc')
  option_parser.on('--fixity_check_base_location FIXITY_CHECK_BASE_LOCATION',
                   'target directory for downloading cloud archived Moabs, where they will be inflated and fixity checked. ensure sufficient free space.')
  # Integer coercion is required on the count options: without it, values supplied on
  # the command line are stored as Strings, and the later `.positive?` calls would
  # raise NoMethodError (the defaults above are Integers, so defaults were unaffected).
  option_parser.on('--single_part_druid_sample_count SINGLE_PART_DRUID_SAMPLE_COUNT', Integer,
                   'number of < 10 GB Moabs to query for and retrieve (default: 0)')
  option_parser.on('--multipart_druid_sample_count MULTIPART_DRUID_SAMPLE_COUNT', Integer,
                   'number of > 10 GB Moabs to query for and retrieve (default: 0)')
  option_parser.on('--endpoints_to_audit ENDPOINTS_TO_AUDIT',
                   'list of cloud endpoints to audit (comma-separated, no spaces, names from config)')
  option_parser.on('--[no-]dry_run', 'Simulate the bag creation (defaults to false)')
  option_parser.on('--[no-]quiet', 'Do not output progress information (defaults to false)')
  option_parser.on('-h', '--help', 'Displays help.') do
    $stdout.puts option_parser
    exit
  end
end

parser.parse!(into: options)
exit_code = 0 # we can update if/when we hit problems
61+
62+
63+
# Log everything to a dedicated audit log file; mirror output to STDOUT unless --quiet.
log_file = Rails.root.join('log', 'audit_archive_zip_checksum_validation.log')
logger = ActiveSupport::BroadcastLogger.new(Logger.new(log_file))
logger.broadcast_to(Logger.new($stdout)) unless options[:quiet]

# Normalize CLI inputs: endpoint names as an array, download target as a Pathname.
endpoints_to_audit = options.fetch(:endpoints_to_audit).split(',')
fixity_check_base_location = Pathname(options.fetch(:fixity_check_base_location))
71+
72+
73+
# Collect the set of druids to audit: explicit list file first, then random samples below.
druids = []

if options[:druid_list_file].present?
  if File.file?(options[:druid_list_file])
    # one bare druid per line, no druid: prefix (see header comment)
    druids += File.readlines(options[:druid_list_file], chomp: true)
  else
    # previously a typo'd path was silently ignored (no druids, no message); surface it
    logger.warn("druid_list_file #{options[:druid_list_file]} does not exist or is not a regular file; ignoring it")
  end
end
78+
79+
if options[:single_part_druid_sample_count].positive?
  # Randomly sample Moabs whose summed zip part size per endpoint is under the
  # segmenting threshold, i.e. Moabs whose versions are each archived as a single zip part.
  # NOTE(review): the raw SQL fragments reference 'zip_endpoint.endpoint_name' (singular),
  # but the table joined via the :zip_endpoint association is presumably named
  # zip_endpoints (plural) -- confirm this query executes against the real schema.
  po_list =
    PreservedObject.joins(
      zipped_moab_versions: [:zip_parts, :zip_endpoint]
    ).group(
      'preserved_objects.druid', 'zip_endpoint.endpoint_name'
    ).having(
      'SUM(zip_parts.size) < :max_size',
      { max_size: ZIP_SEGMENT_THRESHOLD } # we segment zips into 10 GB chunks
    ).order(
      'RANDOM()'
    ).limit(
      options[:single_part_druid_sample_count]
    ).pluck(
      :druid, 'COUNT(zipped_moab_versions.id)', 'zip_endpoint.endpoint_name', 'COUNT(zip_parts.id)', Arel.sql('ARRAY_AGG((version, suffix))'), 'PG_SIZE_PRETTY(SUM(zip_parts.size))', 'SUM(zip_parts.size)'
    )

  # each row's last column is the raw byte total (SUM(zip_parts.size))
  total_size = number_to_human_size(po_list.map { |row| row.last }.sum)
  # log the threshold in GB; interpolating ZIP_SEGMENT_THRESHOLD here printed the raw
  # byte count (e.g. "sub 10737418240 GB"), which is misleading
  logger.info("sub #{ZIP_SEGMENT_THRESHOLD_GB} GB preserved_objects results (#{total_size} total): #{po_list}")
  druids += po_list.map { |row| row.first }.uniq
end
100+
101+
if options[:multipart_druid_sample_count].positive?
  # Randomly sample (druid, version) combos whose summed zip part size exceeds the
  # segmenting threshold, i.e. versions archived as multiple zip segments.
  # NOTE(review): as with the single-part query, confirm the singular
  # 'zip_endpoint.endpoint_name' raw SQL alias matches the actual joined table name.
  multipart_zip_po_list =
    PreservedObject.joins(
      zipped_moab_versions: [:zip_parts, :zip_endpoint]
    ).group(
      'preserved_objects.druid', :version, 'zip_endpoint.endpoint_name'
    ).having(
      'SUM(zip_parts.size) > :min_size',
      { min_size: ZIP_SEGMENT_THRESHOLD } # we segment zips into 10 GB chunks
    ).order(
      'RANDOM()'
    ).limit(
      options[:multipart_druid_sample_count]
    ).pluck(
      :druid, :version, 'COUNT(zipped_moab_versions.id)', 'zip_endpoint.endpoint_name', 'COUNT(zip_parts.id)', 'ARRAY_AGG(suffix)', 'PG_SIZE_PRETTY(SUM(zip_parts.size))', 'SUM(zip_parts.size)'
    )

  # each row's last column is the raw byte total (SUM(zip_parts.size))
  total_size = number_to_human_size(multipart_zip_po_list.map { |row| row.last }.sum)
  # log the threshold in GB; interpolating ZIP_SEGMENT_THRESHOLD here printed the raw
  # byte count (e.g. "over 10737418240 GB"), which is misleading
  logger.info("over #{ZIP_SEGMENT_THRESHOLD_GB} GB preserved_objects results (#{total_size} total): #{multipart_zip_po_list}")
  druids += multipart_zip_po_list.map { |row| row.first }.uniq
end

logger.info("druids to check: #{druids}")
124+
125+
# All zip parts for the selected druids on the endpoints we were asked to audit,
# ordered so downstream log output groups naturally by endpoint, then druid/version/suffix.
zp_relation = ZipPart
              .joins(zipped_moab_version: [:preserved_object, :zip_endpoint])
              .where(preserved_object: { druid: druids })
              .where(zip_endpoint: { endpoint_name: endpoints_to_audit })
              .order(:endpoint_name, :druid, :version, :suffix)
131+
132+
# Download each zip part from each audited endpoint and compare a freshly computed
# MD5 against the md5 recorded in the database at replication time. Parts already
# present on disk are not re-downloaded (the local copy is checksummed as-is).
zp_relation.pluck(:endpoint_name, :druid, :version, :suffix, :md5, :size).each do |endpoint_name, druid, version, suffix, db_md5, db_size|
  s3_key = Replication::DruidVersionZip.new(druid, version).s3_key(suffix)
  download_path = Pathname(fixity_check_base_location.join(endpoint_name, s3_key))
  s3_object = ZipEndpoint.find_by!(endpoint_name:).bucket.object(s3_key)
  logger.info("=== retrieve #{s3_key} from #{endpoint_name}")
  logger.debug("download_path=#{download_path}")
  logger.debug("download_path.exist?=#{download_path.exist?}")
  logger.debug("download_path.dirname=#{download_path.dirname}")
  logger.debug("download_path.dirname.exist?=#{download_path.dirname.exist?}")
  logger.debug("s3_key=#{s3_key}")
  logger.debug("s3_object.exists?=#{s3_object.exists?}")
  logger.debug("db_md5=#{db_md5}")
  logger.debug("db_size=#{db_size}")
  logger.debug("s3_object.metadata=#{s3_object.metadata}")
  if options[:dry_run]
    logger.info("DRY RUN: skipping download and fresh MD5 computation of #{s3_key} from #{endpoint_name}")
  else
    FileUtils.mkdir_p(download_path.dirname) unless download_path.dirname.exist?
    unless download_path.exist?
      logger.info("downloading #{s3_key} from #{endpoint_name} (#{number_to_human_size(db_size)} expected)")
      s3_object.download_file(download_path)
      logger.info("downloaded #{s3_key} from #{endpoint_name} (#{number_to_human_size(File.size(download_path.to_s))} retrieved)")
    else
      logger.info("skipping download of #{s3_key} from #{endpoint_name}, already downloaded")
    end
    fresh_md5 = Digest::MD5.file(download_path)
    logger.info("fresh_md5.hexdigest=#{fresh_md5.hexdigest}")
    if fresh_md5.hexdigest == db_md5
      logger.info("fresh_md5.hexdigest==db_md5: ✅")
    else
      # a mismatch is exactly the error this script exists to catch: previously it was
      # only logged at info level and the script still exited 0. Log it as an error and
      # make sure the process exits non-zero.
      logger.error("fresh_md5.hexdigest==db_md5: 🚨 (fresh=#{fresh_md5.hexdigest}, db=#{db_md5})")
      exit_code = 1
    end
  end
end
162+
163+
# Inflate each downloaded .zip in place so the resulting Moab can be checksum validated.
zp_relation.where(suffix: '.zip').pluck(:endpoint_name, :druid, :version, :suffix).each do |endpoint_name, druid, version, suffix|
  logger.info("=== unzip #{druid} #{version} from #{endpoint_name}")
  s3_key = Replication::DruidVersionZip.new(druid, version).s3_key(suffix)
  download_path = Pathname(fixity_check_base_location.join(endpoint_name, s3_key))
  if options[:dry_run]
    logger.info("DRY RUN, skipping unzipping #{download_path.basename} in #{download_path.dirname}")
  else
    logger.info("unzipping #{download_path.basename} in #{download_path.dirname}")
    # TODO: delete option to unzip so that it cleans up after itself? i don't think that's the default behavior?
    # argv form (no shell) so unexpected characters in the path can't be shell-interpreted;
    # previously the command was a single interpolated string run through a shell, and the
    # exit status was discarded (only the combined output was debug-logged).
    output, status = Open3.capture2e('unzip', download_path.basename.to_s, chdir: download_path.dirname.to_s)
    logger.debug(output)
    unless status.success?
      logger.error("unzip of #{download_path} failed with exit status #{status.exitstatus}")
      exit_code = 1
    end
  end
end
175+
176+
# Run preservation_catalog's checksum validation against each unzipped Moab, once per
# (endpoint, druid) pair. Pairs with no ZippedMoabVersions in the DB are skipped, as are
# all validations during a dry run. Any validation error makes the script exit non-zero.
endpoints_to_audit.each do |endpoint_name|
  druids.each do |druid|
    logger.info("=== fixity check unzipped Moab for #{druid} from #{endpoint_name}")
    storage_location = fixity_check_base_location.join(endpoint_name)

    zmv_exists = ZippedMoabVersion.joins(:zip_endpoint, :preserved_object).where(preserved_object: { druid: }, zip_endpoint: { endpoint_name: }).exists?
    unless zmv_exists
      logger.info("#{endpoint_name} doesn't have any ZMVs for #{druid}, skipping fixity check")
      next
    end

    if options[:dry_run]
      logger.info("DRY RUN, skipping checksum validation for #{druid} in #{storage_location}")
      next
    end

    logger.info "Starting checksum validation for #{druid} in #{storage_location} (NOTE: this may take some time!)"
    validator = Audit::ChecksumValidator.new(
      logger:,
      moab_storage_object: MoabOnStorage.moab(storage_location:, druid:),
      emit_results: true
    )
    validator.validate
    exit_code = 1 if validator.results.error_results.present?
  end
end

exit exit_code

0 commit comments

Comments
 (0)