Skip to content

Commit 9b68251

Browse files
committed
Add mirror URI column to compact manifest (#7151, PR #7576)
2 parents 88f78a4 + 1221688 commit 9b68251

File tree

9 files changed: +82 -24 lines changed

src/azul/indexer/mirror_file_service.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,17 @@ def _storage(self) -> StorageService:
164164
bucket = aws.mirror_bucket
165165
return StorageService(bucket)
166166

167-
def get_mirror_url(self, file: File) -> str:
167+
def mirror_uri(self, file: File) -> str:
168+
"""
169+
Speculative S3 URI of the given file. No check is performed to see if
170+
the file is currently mirrored, so there is no guarantee that requests
171+
to the URI will succeed.
172+
"""
173+
return str(furl(scheme='s3',
174+
netloc=self._storage.bucket_name,
175+
path=self.mirror_object_key(file)))
176+
177+
def mirror_url(self, file: File) -> str:
168178
return self._storage.get_presigned_url(key=self.mirror_object_key(file),
169179
file_name=file.name,
170180
content_type=file.content_type)

src/azul/indexer/mirror_service.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,19 +131,29 @@ class BaseMirrorService:
131131
def _queues(self) -> Queues:
132132
return Queues()
133133

134+
@classmethod
135+
def may_mirror(cls, catalog: CatalogName, file_size: int = 0) -> bool:
136+
"""
137+
Test whether it makes sense to request the mirroring of files from the
138+
given catalog if they are of the given size or larger. If this method
139+
returns True, such files may or may not be mirrored. If this method
140+
returns False, the service will definitely refuse to mirror such files,
141+
although it may accept smaller files.
142+
"""
143+
if config.enable_mirroring:
144+
max_size = config.catalogs[catalog].mirror_limit
145+
return max_size is None or file_size <= max_size
146+
else:
147+
return False
148+
134149
def mirror_sources(self,
135150
catalog: CatalogName,
136151
sources: Iterable[tuple[SourceRef, SourceConfig]]
137152
):
138-
mirror_limit = config.catalogs[catalog].mirror_limit
139-
if mirror_limit is not None and mirror_limit < 0:
140-
log.info('Not mirroring any files in catalog %r because the file '
141-
'size limit is negative', catalog)
142-
else:
143-
153+
if self.may_mirror(catalog):
144154
def messages():
145-
for source, cfg in sources:
146-
if cfg.mirror:
155+
for source, source_config in sources:
156+
if source_config.mirror:
147157
log.info('Mirroring files in source %r from catalog %r',
148158
str(source.spec), catalog)
149159
yield MirrorSourceAction(catalog=catalog, source=source)
@@ -153,6 +163,9 @@ def messages():
153163
str(source.spec), catalog)
154164

155165
self._queue_messages(messages())
166+
else:
167+
log.info('Not mirroring any files in catalog %r because the file '
168+
'size limit is negative', catalog)
156169

157170
def mirror_file(self, catalog: CatalogName, source: SourceRef, file: File):
158171
self._queue_messages([MirrorFileAction(catalog=catalog,
@@ -238,17 +251,16 @@ def _list_public_source_ids(self, catalog: CatalogName) -> set[str]:
238251
def _(self, a: MirrorPartitionAction) -> Iterable[MirrorAction]:
239252
plugin = self._repository_plugin(a.catalog)
240253
files = plugin.list_files(a.source, a.prefix)
241-
max_size = config.catalogs[a.catalog].mirror_limit
242254
for file in files:
243255
assert file.size is not None, R('File size unknown', file)
244-
if max_size is not None and file.size > max_size:
245-
log.info('Not mirroring file to save cost: %r', file)
246-
else:
256+
if self.may_mirror(a.catalog, file.size):
247257
log.debug('Queueing file %r', file)
248258
yield MirrorFileAction(catalog=a.catalog,
249259
source=a.source,
250260
prefix=a.prefix,
251261
file=file)
262+
else:
263+
log.info('Not mirroring file to save cost: %r', file)
252264
log.info('Queued %d files in partition %r of source %r in catalog %r',
253265
len(files), a.prefix, str(a.source), a.catalog)
254266

src/azul/plugins/metadata/anvil/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,10 @@ def recurse(mapping: MetadataPlugin._FieldMapping, path: FieldPath):
359359
# Above, we already configured these two fields to be omitted from the
360360
# manifest since they are not informative to the user.
361361
result[('contents', 'files')]['file_url'] = 'files.azul_url'
362+
# FIXME: AnVIL uses uncommon encoding for MD5 digests
363+
# https://github.com/DataBiosphere/azul/issues/7154
364+
if False:
365+
result[('contents', 'files')]['file_mirror_uri'] = 'files.azul_mirror_uri'
362366
return result
363367

364368
primary_keys_by_table = {

src/azul/plugins/metadata/hca/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,8 @@ def manifest_config(self) -> ManifestConfig:
381381
'sha256': 'file_sha256',
382382
'content-type': 'file_content_type',
383383
'drs_uri': 'file_drs_uri',
384-
'file_url': 'file_azul_url'
384+
'file_url': 'file_azul_url',
385+
'file_mirror_uri': 'file_mirror_uri',
385386
},
386387
('contents', 'cell_suspensions'): {
387388
'document_id': 'cell_suspension.provenance.document_id',

src/azul/service/manifest_service.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@
107107
FieldTypes,
108108
null_str,
109109
)
110+
from azul.indexer.mirror_file_service import (
111+
BaseMirrorFileService,
112+
)
113+
from azul.indexer.mirror_service import (
114+
BaseMirrorService,
115+
)
110116
from azul.json import (
111117
copy_json,
112118
)
@@ -814,6 +820,10 @@ def repository_plugin(self) -> RepositoryPlugin:
814820
def metadata_plugin(self) -> MetadataPlugin:
815821
return self.service.metadata_plugin(self.catalog)
816822

823+
@cached_property
824+
def mirror_file_service(self) -> BaseMirrorFileService:
825+
return BaseMirrorFileService(catalog=self.catalog)
826+
817827
@classmethod
818828
@abstractmethod
819829
def file_name_extension(cls) -> str:
@@ -1083,7 +1093,7 @@ def convert(field_name, field_value):
10831093
try:
10841094
field_type = field_types[field_name]
10851095
except KeyError:
1086-
if field_name == 'file_url':
1096+
if field_name in ('file_url', 'file_mirror_uri'):
10871097
field_type = null_str
10881098
else:
10891099
raise
@@ -1145,6 +1155,13 @@ def _azul_file_url(self,
11451155
fetch=False,
11461156
**args))
11471157

1158+
def _azul_mirror_uri(self, file: JSON) -> str | None:
1159+
file = self.metadata_plugin.file_class.from_index(file)
1160+
if BaseMirrorService.may_mirror(self.catalog, file.size):
1161+
return self.mirror_file_service.mirror_uri(file)
1162+
else:
1163+
return None
1164+
11481165
@cached_property
11491166
def manifest_content_hash(self) -> int:
11501167
log.debug('Computing content hash for manifest using filters %r ...', self.filters)
@@ -1671,7 +1688,10 @@ def write_page_to(self,
16711688
entities = self._get_entities(field_path, doc)
16721689
if field_path == ('contents', 'files'):
16731690
file = copy_json(one(entities))
1674-
file['file_url'] = self._azul_file_url(file)
1691+
if 'file_url' in column_mapping:
1692+
file['file_url'] = self._azul_file_url(file)
1693+
if 'file_mirror_uri' in column_mapping:
1694+
file['file_mirror_uri'] = self._azul_mirror_uri(file)
16751695
entities = [file]
16761696
self._extract_fields(field_path=field_path,
16771697
entities=entities,
@@ -1684,7 +1704,10 @@ def write_page_to(self,
16841704
for related_file in file['related_files']:
16851705
related_row = {}
16861706
file.update(related_file)
1687-
file['file_url'] = self._azul_file_url(file)
1707+
if 'file_url' in column_mapping:
1708+
file['file_url'] = self._azul_file_url(file)
1709+
if 'file_mirror_uri' in column_mapping:
1710+
file['file_mirror_uri'] = self._azul_mirror_uri(file)
16881711
self._extract_fields(field_path=field_path,
16891712
entities=[file],
16901713
column_mapping=column_mapping,

src/azul/service/repository_controller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def download_file(self,
275275
assert request_index == 0, request_index
276276
download = MirrorFileDownload(
277277
file=file,
278-
location=mirror_service.get_mirror_url(file),
278+
location=mirror_service.mirror_url(file),
279279
replica=replica,
280280
token=token
281281
)

test/indexer/test_mirror_controller.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,13 @@ def test_files_not_mirrored(self):
227227

228228
catalog = config.catalogs[self.catalog]
229229

230-
def patch_max_file_size(size):
230+
def patch_mirror_limit(size):
231231
return patch.dict(config.catalogs, {
232232
self.catalog: attrs.evolve(catalog, mirror_limit=size)
233233
})
234234

235235
with self.subTest(mirror_limit=-1):
236-
with patch_max_file_size(-1):
236+
with patch_mirror_limit(-1):
237237
messages = self._mirror_sources()
238238
self.assertEqual([], messages)
239239

@@ -243,5 +243,5 @@ def patch_max_file_size(size):
243243
size=self._file.size + 1)
244244
source_message = self._test_mirror_sources()
245245
partition_message = self._test_mirror_source(source_message)
246-
with patch_max_file_size(self._file.size):
246+
with patch_mirror_limit(self._file.size):
247247
self._test_mirror_partition(partition_message, [too_big, self._file])

test/integration_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ class Catalog:
468468
# If test_mirroring is run for the catalog, ensure that the
469469
# source is not flagged as no_mirror so that we can test
470470
# downloading a mirrored file
471-
mirror=config.enable_mirroring and catalog.mirror_limit >= 0
471+
mirror=self.azul_client.mirror_service.may_mirror(catalog.name)
472472
)
473473
ma_source = self._select_source(catalog.name, public=False)
474474
if ma_source is not None:
@@ -1744,7 +1744,7 @@ def _test_mirroring(self, *, delete: bool):
17441744
catalogs = [
17451745
c.name
17461746
for c in config.catalogs.values()
1747-
if c.is_integration_test_catalog and c.mirror_limit >= 0
1747+
if c.is_integration_test_catalog and mirror_service.may_mirror(c.name)
17481748
]
17491749
sources_by_catalog = {
17501750
catalog: [self._select_source(catalog, public=True, mirror=True)]

test/service/test_manifest.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@
135135
)
136136
from service import (
137137
DocumentCloningTestCase,
138+
MirrorTestCase,
138139
StorageServiceTestCase,
139140
WebServiceTestCase,
140141
)
@@ -340,7 +341,10 @@ class DCP1ManifestTestCase(ManifestTestCase, DCP1CannedBundleTestCase):
340341
pass
341342

342343

343-
class TestManifests(DCP1ManifestTestCase):
344+
class TestManifests(DCP1ManifestTestCase, MirrorTestCase):
345+
346+
def _mirror_uri(self, sha256: str):
347+
return f's3://{self.mirror_bucket}/file/{sha256}.sha256'
344348

345349
def run(self,
346350
result: Optional[unittest.result.TestResult] = None
@@ -500,6 +504,10 @@ def test_compact_manifest(self):
500504
self._file_url('f2b6c6f0-8d25-4aae-b255-1974cc110cfe',
501505
'2018-09-14T12:33:43.720332Z')),
502506

507+
('file_mirror_uri',
508+
self._mirror_uri('2f6866c4ede92123f90dd15fb180fac56e33309b8fd3f4f52f263ed2f8af2f16'),
509+
self._mirror_uri('3125f2f86092798b85be93fbc66f4e733e9aec0929b558589c06929627115582')),
510+
503511
('cell_suspension.provenance.document_id',
504512
'',
505513
'0037c9eb-8038-432f-8d9d-13ee094e54ab || aaaaaaaa-8038-432f-8d9d-13ee094e54ab'),

0 commit comments

Comments (0)