Skip to content

Commit 9b6163e

Browse files
committed
Refactors replication.
1 parent 57c8c2f commit 9b6163e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1386
-53
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# frozen_string_literal: true
2+
3+
module Audit
4+
# Job to audit replication for a PreservedObject
5+
class ReplicationAuditJob < ApplicationJob
6+
queue_as :moab_replication_audit
7+
delegate :logger, to: Audit::ReplicationSupport
8+
9+
include UniqueJob
10+
11+
def perform(preserved_object)
12+
@preserved_object = preserved_object
13+
14+
preserved_object.populate_zipped_moab_versions!
15+
16+
::Replication::AuditService.call(preserved_object: preserved_object).each do |audit_results|
17+
AuditResultsReporter.report_results(audit_results: audit_results, logger: logger)
18+
end
19+
20+
preserved_object.update!(last_archive_audit: Time.current)
21+
22+
start_replication
23+
end
24+
25+
attr_reader :preserved_object
26+
27+
def start_replication
28+
return unless preserved_object.zipped_moab_versions.created.exists? || preserved_object.zipped_moab_versions.incomplete.exists?
29+
::ReplicationJob.perform_later(preserved_object)
30+
end
31+
end
32+
end

app/jobs/replication_job.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
# Job to replicate a PreservedObject to cloud endpoints
4+
class ReplicationJob < ApplicationJob
5+
queue_as :replication
6+
include UniqueJob
7+
8+
def perform(preserved_object)
9+
preserved_object.populate_zipped_moab_versions!
10+
11+
::Replication::AuditService.call(preserved_object: preserved_object)
12+
13+
(1..preserved_object.current_version).each do |version|
14+
Replication::ReplicateVersionService.call(preserved_object:, version:)
15+
end
16+
end
17+
end

app/models/moab_record.rb

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@ class MoabRecord < ApplicationRecord
1414
'validity_unknown' => 6
1515
}
1616

17-
after_create :create_zipped_moab_versions!
18-
# hook for creating archive zips is here and on PreservedObject, because version and current_version must be in sync, and
19-
# even though both fields will usually be updated together in a single transaction, one has to be updated first. latter
20-
# of the two updates will actually trigger replication.
21-
after_update :create_zipped_moab_versions!, if: :saved_change_to_version? # an ActiveRecord dynamic method
2217
after_save :validate_checksums!, if: proc { |moab_record| moab_record.saved_change_to_status? && moab_record.validity_unknown? }
2318

2419
# NOTE: Since Rails 5.0, belongs_to adds the presence validator automatically, and explicit presence validation

app/models/preserved_object.rb

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@
88
class PreservedObject < ApplicationRecord
99
PREFIX_RE = /druid:/i
1010

11-
# hook for creating archive zips is here and on MoabRecord, because version and current_version must be in sync, and
12-
# even though both fields will usually be updated together in a single transaction, one has to be updated first. latter
13-
# of the two updates will actually trigger replication.
14-
after_update :create_zipped_moab_versions!, if: :saved_change_to_current_version? # an ActiveRecord dynamic method
15-
1611
has_one :moab_record, dependent: :restrict_with_exception, autosave: true
1712
has_many :zipped_moab_versions, dependent: :restrict_with_exception, inverse_of: :preserved_object
1813

@@ -51,14 +46,27 @@ def create_zipped_moab_versions!
5146
end
5247
end
5348

49+
# Creates ZippedMoabVersion for each version for each ZipEndpoint for which a ZippedMoabVersion does not already exist.
50+
# @return [Array<ZippedMoabVersion>] the ZippedMoabVersions that were created
51+
def populate_zipped_moab_versions!
52+
[].tap do |new_zipped_moab_versions|
53+
(1..current_version).each do |version|
54+
ZipEndpoint.find_each do |zip_endpoint|
55+
next if zipped_moab_versions.exists?(version: version, zip_endpoint: zip_endpoint)
56+
new_zipped_moab_versions << zipped_moab_versions.create!(version: version, zip_endpoint: zip_endpoint)
57+
end
58+
end
59+
end
60+
end
61+
5462
def as_json(*)
5563
super.except('id')
5664
end
5765

5866
# Queue a job that will check to see whether this PreservedObject has been
5967
# fully replicated to all target ZipEndpoints
6068
def audit_moab_version_replication!
61-
Audit::MoabReplicationAuditJob.perform_later(self)
69+
Audit::ReplicationAuditJob.perform_later(self)
6270
end
6371

6472
def total_size_of_moab_version(version)

app/models/replication/druid_version_zip.rb

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,28 @@ def base_key
3636
# create it.
3737
# @raise [StandardError] if there's a zip file for this druid-version, but it looks too small to be complete.
3838
def find_or_create_zip!
39-
if File.exist?(file_path)
39+
if exist?
4040
raise "zip already exists, but size (#{total_part_size}) is smaller than the moab version size (#{moab_version_size})!" unless zip_size_ok?
4141
FileUtils.touch(file_path)
4242
else
4343
create_zip!
4444
end
4545
end
4646

47+
def exist?
48+
File.exist?(file_path)
49+
end
50+
51+
# @return [Boolean] true if there is a match between the zip part files and their md5 sidecar files
52+
def complete?
53+
# There is at least one part file
54+
return false if part_paths.empty?
55+
# The set of md5 sidecar files matches the set of part files
56+
return false unless part_keys.to_set == part_keys_from_md5_sidecars.to_set
57+
# Check each md5 sidecar file against the zip part file
58+
druid_version_zip_parts.all?(&:md5_match?)
59+
end
60+
4761
# Creates a zip of Druid-Version content.
4862
# Changes directory so that the storage root (and druid tree) are not part of
4963
# the archival directory structure, just the object, e.g. starting at 'ab123cd4567/...' directory,
@@ -54,7 +68,7 @@ def create_zip!
5468
combined, status = Open3.capture2e(zip_command, chdir: work_dir.to_s)
5569
raise "zipmaker failure #{combined}" unless status.success?
5670
unless zip_size_ok?
57-
part_cleanup_errors = cleanup_zip_parts!
71+
part_cleanup_errors = cleanup_zip_parts_with_rescue!
5872
part_cleanup_err_msg = "\n-- errors cleaning up zip parts: #{part_cleanup_errors.map(&:inspect)}" if part_cleanup_errors.present?
5973
raise "zip size (#{total_part_size}) is smaller than the moab version size (#{moab_version_size})! zipmaker failure #{combined}#{part_cleanup_err_msg}"
6074
end
@@ -172,6 +186,18 @@ def moab_version_size
172186
moab_version_files.sum { |f| File.size(f) }
173187
end
174188

189+
# Deletes all zip part files and their md5 sidecar files from local zip storage
190+
def cleanup_zip_parts!
191+
parts_and_checksums_paths.each { |filepath| File.delete(filepath) }
192+
end
193+
194+
# @return [Array<DruidVersionZipPart>] all parts for this DruidVersionZip
195+
def druid_version_zip_parts
196+
part_keys.map do |part_key|
197+
Replication::DruidVersionZipPart.new(self, part_key)
198+
end
199+
end
200+
175201
private
176202

177203
# Throws an error if any of the files in the moab are not yet readable. For example due to
@@ -184,7 +210,7 @@ def check_moab_version_readability!
184210
moab_version_files.map { |f| File.stat(f) }
185211
end
186212

187-
def cleanup_zip_parts!
213+
def cleanup_zip_parts_with_rescue!
188214
errors = []
189215
parts_and_checksums_paths.map do |p|
190216
File.delete(p)
@@ -219,5 +245,14 @@ def fetch_zip_version
219245
def zip_version_regexp
220246
/This is (Zip \d+(\.\d)+\s*(\(.*\d{4}\))?)/
221247
end
248+
249+
# @return [Array<String>] relative paths, i.e. s3_part_keys for existing parts based on the md5 sidecar files
250+
def part_keys_from_md5_sidecars
251+
md5_sidecar_paths.map { |md5_path| md5_path.relative_path_from(zip_storage).to_s.delete_suffix('.md5') }
252+
end
253+
254+
def md5_sidecar_paths
255+
Pathname.glob(File.join(zip_storage, s3_key('.*.md5')))
256+
end
222257
end
223258
end

app/models/replication/druid_version_zip_part.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,10 @@ def write_md5
7373
def read_md5
7474
File.read(md5_path)
7575
end
76+
77+
# @return [Boolean] whether the md5 from the md5 sidecar file matches the computed md5
78+
def md5_match?
79+
read_md5 == hexdigest
80+
end
7681
end
7782
end

app/models/zip_endpoint.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ def audit_class
6363
raise "Failed to return audit class based on setting for #{endpoint_name}. Check setting string for accuracy."
6464
end
6565

66+
# @return [Aws::S3::Bucket] S3 bucket object for this zip endpoint
67+
def bucket
68+
@bucket ||= Replication::ProviderFactory.create(zip_endpoint: self).bucket
69+
end
70+
6671
def to_s
6772
endpoint_name
6873
end

app/models/zip_part.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# This model's data is populated by Replication::DeliveryDispatcherJob.
66
class ZipPart < ApplicationRecord
77
belongs_to :zipped_moab_version, inverse_of: :zip_parts
8-
delegate :zip_endpoint, :preserved_object, to: :zipped_moab_version
8+
delegate :zip_endpoint, :preserved_object, :druid_version_zip, to: :zipped_moab_version
99

1010
# @note Hash values cannot be modified without migrating any associated persisted data.
1111
# @see [enum docs] http://api.rubyonrails.org/classes/ActiveRecord/Enum.html
@@ -37,11 +37,15 @@ def suffixes_in_set
3737
druid_version_zip.expected_part_keys(parts_count).map { |key| File.extname(key) }
3838
end
3939

40-
def druid_version_zip
41-
@druid_version_zip ||= Replication::DruidVersionZip.new(preserved_object.druid, zipped_moab_version.version)
40+
def druid_version_zip_part
41+
@druid_version_zip_part ||= Replication::DruidVersionZipPart.new(druid_version_zip, s3_key)
4242
end
4343

4444
def s3_key
4545
druid_version_zip.s3_key(suffix)
4646
end
47+
48+
def s3_part
49+
@s3_part ||= zip_endpoint.bucket.object(s3_key)
50+
end
4751
end

app/models/zipped_moab_version.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,22 @@ class ZippedMoabVersion < ApplicationRecord
1414

1515
validates :version, presence: true
1616

17+
# If zip_parts_count is present, must be greater than zero
18+
validates :zip_parts_count, numericality: { only_integer: true, greater_than: 0 }, allow_nil: true
19+
20+
enum :status, {
21+
'ok' => 0,
22+
'incomplete' => 1,
23+
'created' => 2, # DB-level default
24+
'failed' => 3
25+
}
26+
1727
scope :by_druid, lambda { |druid|
1828
joins(:preserved_object).where(preserved_objects: { druid: druid })
1929
}
2030

31+
before_save :update_status_updated_at
32+
2133
# ideally, there should be only one distinct parts_count value among a set of sibling
2234
# zip_parts. if there's variation in the count, that implies the zip was remade, and that
2335
# the part count differed between the zip invocations (which may imply a zip implementation
@@ -41,4 +53,15 @@ def all_parts_replicated?
4153
def total_part_size
4254
zip_parts.sum(&:size)
4355
end
56+
57+
def update_status_updated_at
58+
self.status_updated_at = Time.current if status_changed?
59+
end
60+
61+
def druid_version_zip
62+
@druid_version_zip ||= Replication::DruidVersionZip.new(
63+
# In tests, a PreservedObject may not have a MoabRecord, hence the safe navigation.
64+
preserved_object.druid, version, preserved_object&.moab_record&.moab_storage_root&.storage_location # rubocop:disable Style/SafeNavigationChainLength
65+
)
66+
end
4467
end

app/services/audit/replication_support.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def self.check_child_zip_part_attributes(zmv, results)
4141
if unreplicated_parts.any?
4242
results.add_result(
4343
Audit::Results::ZIP_PARTS_NOT_ALL_REPLICATED,
44-
base_hash.merge(unreplicated_parts_list: unreplicated_parts.to_a)
44+
base_hash
4545
)
4646
end
4747

0 commit comments

Comments
 (0)