Skip to content

Commit d47920c

Browse files
committed
Fix: Accumulator overflow during aggregation (#6793)
1 parent ca6fa2d commit d47920c

File tree

12 files changed

+162
-2299
lines changed

12 files changed

+162
-2299
lines changed

src/azul/plugins/metadata/anvil/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,26 @@ def manifest_config(self) -> ManifestConfig:
314314
# Note that there is a brittle coupling that must be maintained between
315315
# the fields listed here and those used in `self._field_mapping`.
316316
fields_to_omit_from_manifest: list[FieldPath] = [
317+
('contents', 'activities', 'activity_id'),
317318
('contents', 'activities', 'activity_table'),
319+
('contents', 'activities', 'document_id'),
320+
('contents', 'activities', 'source_datarepo_row_ids'),
321+
('contents', 'biosamples', 'biosample_id'),
322+
('contents', 'biosamples', 'document_id'),
323+
('contents', 'biosamples', 'source_datarepo_row_ids'),
324+
('contents', 'datasets', 'document_id'),
318325
# We omit the `duos_id` field from manifests since there is only one
319326
# DUOS bundle per dataset, and that bundle only contributes to outer
320327
# entities of the `datasets` type, not to entities of the other
321328
# types, such as files, which the manifest is generated from.
322329
('contents', 'datasets', 'duos_id'),
330+
('contents', 'datasets', 'source_datarepo_row_ids'),
331+
('contents', 'diagnoses', 'diagnosis_id'),
332+
('contents', 'diagnoses', 'document_id'),
333+
('contents', 'diagnoses', 'source_datarepo_row_ids'),
334+
('contents', 'donors', 'document_id'),
335+
('contents', 'donors', 'donor_id'),
336+
('contents', 'donors', 'source_datarepo_row_ids'),
323337
('contents', 'files', 'version'),
324338
]
325339

src/azul/plugins/metadata/anvil/indexer/aggregate.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,20 @@
2323

2424

2525
class ActivityAggregator(SimpleAggregator):
26-
pass
26+
27+
def _accumulator(self, field: str) -> Accumulator | None:
28+
if field in ('activity_id', 'document_id', 'source_datarepo_row_ids'):
29+
return None
30+
else:
31+
return super()._accumulator(field)
2732

2833

2934
class BiosampleAggregator(SimpleAggregator):
3035

3136
def _accumulator(self, field: str) -> Accumulator | None:
32-
if field == 'donor_age_at_collection':
37+
if field in ('biosample_id', 'document_id', 'source_datarepo_row_ids'):
38+
return None
39+
elif field == 'donor_age_at_collection':
3340
return SetOfDictAccumulator(max_size=100,
3441
key=compose_keys(none_safe_tuple_key(none_last=True),
3542
itemgetter('lte', 'gte')))
@@ -38,13 +45,24 @@ def _accumulator(self, field: str) -> Accumulator | None:
3845

3946

4047
class DatasetAggregator(SimpleAggregator):
41-
pass
48+
49+
def _accumulator(self, field: str) -> Accumulator | None:
50+
if field == 'source_datarepo_row_ids':
51+
return None
52+
# Aggregation of datasets.document_id is required for the creation of
53+
# manifests
54+
elif field == 'document_id':
55+
return super()._accumulator(field)
56+
else:
57+
return super()._accumulator(field)
4258

4359

4460
class DiagnosisAggregator(SimpleAggregator):
4561

4662
def _accumulator(self, field: str) -> Accumulator | None:
47-
if field in ('diagnosis_age', 'onset_age'):
63+
if field in ('diagnosis_id', 'document_id', 'source_datarepo_row_ids'):
64+
return None
65+
elif field in ('diagnosis_age', 'onset_age'):
4866
return SetOfDictAccumulator(max_size=100,
4967
key=compose_keys(none_safe_tuple_key(none_last=True),
5068
itemgetter('lte', 'gte')))
@@ -53,7 +71,12 @@ def _accumulator(self, field: str) -> Accumulator | None:
5371

5472

5573
class DonorAggregator(SimpleAggregator):
56-
pass
74+
75+
def _accumulator(self, field: str) -> Accumulator | None:
76+
if field in ('document_id', 'donor_id', 'source_datarepo_row_ids'):
77+
return None
78+
else:
79+
return super()._accumulator(field)
5780

5881

5982
class FileAggregator(GroupingAggregator):
@@ -72,7 +95,17 @@ def _group_keys(self, entity) -> tuple[Any, ...]:
7295
return entity['file_format'],
7396

7497
def _accumulator(self, field: str) -> Accumulator | None:
75-
if field in ('count', 'file_size'):
98+
if field in (
99+
'document_id',
100+
'drs_uri',
101+
'file_id',
102+
'file_md5sum',
103+
'file_name',
104+
'source_datarepo_row_ids',
105+
'uuid',
106+
):
107+
return None
108+
elif field in ('count', 'file_size'):
76109
return DistinctAccumulator(SumAccumulator())
77110
else:
78111
return super()._accumulator(field)

src/azul/plugins/metadata/hca/__init__.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -387,14 +387,10 @@ def manifest_config(self) -> ManifestConfig:
387387
'file_mirror_uri': 'file_mirror_uri',
388388
},
389389
('contents', 'cell_suspensions'): {
390-
'document_id': 'cell_suspension.provenance.document_id',
391-
'biomaterial_id': 'cell_suspension.biomaterial_core.biomaterial_id',
392390
'total_estimated_cells': 'cell_suspension.estimated_cell_count',
393391
'selected_cell_type': 'cell_suspension.selected_cell_type'
394392
},
395-
('contents', 'sequencing_processes'): {
396-
'document_id': 'sequencing_process.provenance.document_id'
397-
},
393+
('contents', 'sequencing_processes'): {},
398394
('contents', 'sequencing_protocols'): {
399395
'instrument_manufacturer_model': 'sequencing_protocol.instrument_manufacturer_model',
400396
'paired_end': 'sequencing_protocol.paired_end'
@@ -420,7 +416,6 @@ def manifest_config(self) -> ManifestConfig:
420416
},
421417
('contents', 'donors'): {
422418
'biological_sex': 'donor_organism.sex',
423-
'biomaterial_id': 'donor_organism.biomaterial_core.biomaterial_id',
424419
'document_id': 'donor_organism.provenance.document_id',
425420
'genus_species': 'donor_organism.genus_species',
426421
'development_stage': 'donor_organism.development_stage',
@@ -439,12 +434,8 @@ def manifest_config(self) -> ManifestConfig:
439434
},
440435
('contents', 'samples'): {
441436
'entity_type': '_entity_type',
442-
'document_id': 'sample.provenance.document_id',
443-
'biomaterial_id': 'sample.biomaterial_core.biomaterial_id'
444437
},
445438
('contents', 'sequencing_inputs'): {
446-
'document_id': 'sequencing_input.provenance.document_id',
447-
'biomaterial_id': 'sequencing_input.biomaterial_core.biomaterial_id',
448439
'sequencing_input_type': 'sequencing_input_type'
449440
}
450441
}

src/azul/plugins/metadata/hca/indexer/aggregate.py

Lines changed: 67 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,32 @@ def _default_accumulator(self) -> Accumulator | None:
116116

117117

118118
class SampleAggregator(SimpleAggregator):
119-
pass
119+
120+
def _accumulator(self, field) -> Accumulator | None:
121+
if field == 'document_id':
122+
return None
123+
# TODO: remove sampleId filter
124+
# Aggregation of samples.biomaterial_id is required for filters
125+
# using the `sampleId` field on non-sample endpoints.
126+
elif field == 'biomaterial_id':
127+
return super()._accumulator(field)
128+
else:
129+
return super()._accumulator(field)
120130

121131

122132
class SpecimenAggregator(SimpleAggregator):
123-
pass
133+
134+
def _accumulator(self, field) -> Accumulator | None:
135+
if field == 'biomaterial_id':
136+
return None
137+
# TODO: use `if` and comment why (high cardinality, only 1 for samples)
138+
# if self.outer_entity_type == samples
139+
# Aggregation of `document_id` is required for the summary response
140+
# field `specimenCount` as it is calculated from the `samples` aggregate
141+
elif field == 'document_id':
142+
return super()._accumulator(field)
143+
else:
144+
return super()._accumulator(field)
124145

125146

126147
class CellSuspensionAggregator(GroupingAggregator):
@@ -143,14 +164,21 @@ def _group_keys(self, entity) -> tuple[Any, ...]:
143164
return frozenset(entity['organ']),
144165

145166
def _accumulator(self, field) -> Accumulator | None:
146-
if field in self.cell_count_fields:
167+
if field in ('document_id', 'biomaterial_id'):
168+
return None
169+
elif field in self.cell_count_fields:
147170
return DistinctAccumulator(SumAccumulator())
148171
else:
149172
return super()._accumulator(field)
150173

151174

152175
class CellLineAggregator(SimpleAggregator):
153-
pass
176+
177+
def _accumulator(self, field) -> Accumulator | None:
178+
if field in ('document_id', 'biomaterial_id'):
179+
return None
180+
else:
181+
return super()._accumulator(field)
154182

155183

156184
class DonorOrganismAggregator(SimpleAggregator):
@@ -162,34 +190,39 @@ def _transform_entity(self, entity: JSON) -> JSON:
162190
}
163191

164192
def _accumulator(self, field) -> Accumulator | None:
165-
if field == 'organism_age_range':
166-
return SetAccumulator(max_size=100)
193+
if field == 'biomaterial_id':
194+
return None
195+
# Aggregation of donors.document_id is required for the summary response
196+
# field `donorCount` which is calculated from the `samples` aggregate.
197+
elif field == 'document_id':
198+
return super()._accumulator(field)
199+
elif field == 'development_stage':
200+
return SetAccumulator(max_size=200)
201+
elif field == 'organism_age_range':
202+
return SetAccumulator(max_size=200)
167203
elif field == 'organism_age':
168-
return SetOfDictAccumulator(max_size=100,
204+
return SetOfDictAccumulator(max_size=200,
169205
key=compose_keys(none_safe_tuple_key(none_last=True),
170206
none_safe_itemgetter('value', 'unit')))
171207
elif field == 'donor_count':
172208
return UniqueValueCountAccumulator()
173-
elif field == 'document_id':
174-
# If any donor IDs are missing from the aggregate, those donors will
175-
# be omitted during the verbatim handover. Donors are a "hot" entity
176-
# type, and we can't track their hubs in replica documents, so we
177-
# rely on the inner entity IDs instead.
178-
#
179-
# FIXME: Enforce that hot entity types are completely aggregated
180-
# https://github.com/DataBiosphere/azul/issues/6793
181-
return SetAccumulator(max_size=100)
182209
else:
183210
return super()._accumulator(field)
184211

185212

186213
class OrganoidAggregator(SimpleAggregator):
187-
pass
188214

215+
def _accumulator(self, field) -> Accumulator | None:
216+
if field in ('document_id', 'biomaterial_id'):
217+
return None
218+
else:
219+
return super()._accumulator(field)
189220

190221
class ProjectAggregator(SimpleAggregator):
191222

192223
def _accumulator(self, field) -> Accumulator | None:
224+
# Aggregation of projects.document_id is required to allow filters using
225+
# the `projectId` field on non-project endpoints.
193226
if field == 'document_id':
194227
return SetAccumulator(max_size=100)
195228
elif field in ('project_description',
@@ -212,17 +245,10 @@ def _accumulator(self, field) -> Accumulator | None:
212245
class ProtocolAggregator(SimpleAggregator):
213246

214247
def _accumulator(self, field) -> Accumulator | None:
215-
if field == 'assay_type':
248+
if field in ('document_id', 'biomaterial_id'):
249+
return None
250+
elif field == 'assay_type':
216251
return FrequencySetAccumulator(max_size=100)
217-
elif field == 'document_id':
218-
# If any protocol IDs are missing from the aggregate, those
219-
# protocols may be omitted during the verbatim handover. Some
220-
# protocols are "hot" entity types, and we can't track their hubs in
221-
# replicas, so we rely on the inner entity IDs instead.
222-
#
223-
# FIXME: Enforce that hot entity types are completely aggregated
224-
# https://github.com/DataBiosphere/azul/issues/6793
225-
return SetAccumulator(max_size=100)
226252
else:
227253
return super()._accumulator(field)
228254

@@ -231,11 +257,22 @@ def _default_accumulator(self) -> Accumulator | None:
231257

232258

233259
class SequencingInputAggregator(SimpleAggregator):
234-
pass
260+
261+
def _accumulator(self, field) -> Accumulator | None:
262+
if field in ('document_id', 'biomaterial_id'):
263+
return None
264+
else:
265+
return super()._accumulator(field)
235266

236267

237268
class SequencingProcessAggregator(SimpleAggregator):
238269

270+
def _accumulator(self, field) -> Accumulator | None:
271+
if field in ('document_id', 'biomaterial_id'):
272+
return None
273+
else:
274+
return super()._accumulator(field)
275+
239276
def _default_accumulator(self) -> Accumulator | None:
240277
return SetAccumulator(max_size=10)
241278

@@ -246,15 +283,15 @@ def _accumulator(self, field) -> Accumulator | None:
246283
if field == 'document_id':
247284
return None
248285
elif field == 'file':
249-
return DictAccumulator(max_size=100, key=itemgetter('uuid'))
286+
return DictAccumulator(max_size=500, key=itemgetter('uuid'))
250287
else:
251288
return SetAccumulator()
252289

253290

254291
class DateAggregator(SimpleAggregator):
255292

256293
def _accumulator(self, field) -> Accumulator | None:
257-
if field == 'document_id':
294+
if field in ('document_id', 'biomaterial_id'):
258295
return None
259296
elif field in ('submission_date', 'aggregate_submission_date'):
260297
return MinAccumulator()

src/azul/plugins/metadata/hca/service/response.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ def make_projects(self, entry) -> MutableJSONs:
362362
'duosId': project.get('duos_id')
363363
}
364364
if self.entity_type == 'projects':
365+
# translated_project['projectId'] = project['document_id']
365366
translated_project['projectDescription'] = project.get('project_description', [])
366367
contributors = project.get('contributors', []) # list of dict
367368
translated_project['contributors'] = contributors
@@ -422,7 +423,7 @@ def make_file(self, file: JSON) -> JSON:
422423

423424
def make_specimen(self, specimen) -> MutableJSON:
424425
return {
425-
'id': specimen['biomaterial_id'],
426+
# 'id': specimen['biomaterial_id'],
426427
'organ': specimen.get('organ', None),
427428
'organPart': specimen.get('organ_part', None),
428429
'disease': specimen.get('disease', None),
@@ -452,7 +453,7 @@ def make_cell_suspensions(self, entry) -> MutableJSONs:
452453

453454
def make_cell_line(self, cell_line) -> MutableJSON:
454455
return {
455-
'id': cell_line['biomaterial_id'],
456+
# 'id': cell_line['biomaterial_id'],
456457
'cellLineType': cell_line.get('cell_line_type', None),
457458
'modelOrgan': cell_line.get('model_organ', None),
458459
}
@@ -462,7 +463,7 @@ def make_cell_lines(self, entry) -> MutableJSONs:
462463

463464
def make_donor(self, donor) -> MutableJSON:
464465
return {
465-
'id': donor['biomaterial_id'],
466+
# 'id': donor['biomaterial_id'],
466467
'donorCount': donor.get('donor_count', None),
467468
'developmentStage': donor.get('development_stage', None),
468469
'genusSpecies': donor.get('genus_species', None),
@@ -477,7 +478,7 @@ def make_donors(self, entry) -> MutableJSONs:
477478

478479
def make_organoid(self, organoid) -> MutableJSON:
479480
return {
480-
'id': organoid['biomaterial_id'],
481+
# 'id': organoid['biomaterial_id'],
481482
'modelOrgan': organoid.get('model_organ', None),
482483
'modelOrganPart': organoid.get('model_organ_part', None)
483484
}
@@ -486,7 +487,7 @@ def make_organoids(self, entry) -> MutableJSONs:
486487
return [self.make_organoid(organoid) for organoid in entry['contents']['organoids']]
487488

488489
def make_sample(self, sample, entity_dict, entity_type) -> MutableJSON:
489-
is_aggregate = isinstance(sample['document_id'], list)
490+
is_aggregate = 'document_id' not in sample
490491
organ_prop = 'organ' if entity_type == 'specimens' else 'model_organ'
491492
return {
492493
'sampleEntityType': [entity_type] if is_aggregate else entity_type,

0 commit comments

Comments
 (0)