Skip to content

Commit f3d72ed

Browse files
committed
import legacy co-assemblies
1 parent d46279b commit f3d72ed

File tree

3 files changed

+262
-0
lines changed

3 files changed

+262
-0
lines changed

workflows/flows/legacy/flows/import_v5_analyses.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
sync_study_metadata_from_ena,
1010
sync_sample_metadata_from_ena,
1111
)
12+
from workflows.flows.legacy.tasks.make_assembly_from_legacy_emg_db import (
13+
make_assembly_from_legacy_emg_db,
14+
)
1215
from workflows.flows.legacy.tasks.make_run_from_legacy_emg_db import (
1316
make_run_from_legacy_emg_db,
1417
)
@@ -71,6 +74,15 @@ def import_v5_analyses(mgys: str):
7174
sync_sample_metadata_from_ena(sample.ena_sample)
7275
run = make_run_from_legacy_emg_db(legacy_analysis.run, study)
7376

77+
assembly = None
78+
if legacy_analysis.experiment_type_id in (4, 7, 8): # Assembly types
79+
assembly = make_assembly_from_legacy_emg_db(
80+
legacy_analysis.secondary_accession,
81+
legacy_analysis.result_directory,
82+
study,
83+
sample,
84+
)
85+
7486
analysis, created = Analysis.objects.update_or_create(
7587
id=legacy_analysis.job_id,
7688
defaults={
@@ -80,6 +92,7 @@ def import_v5_analyses(mgys: str):
8092
"ena_study": study.ena_study,
8193
"pipeline_version": Analysis.PipelineVersions.v5,
8294
"run": run,
95+
"assembly": assembly,
8396
},
8497
)
8598
analysis.inherit_experiment_type()
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
from prefect import task, get_run_logger
2+
from sqlalchemy import select
3+
4+
from analyses.models import Study, Sample, Assembly
5+
from workflows.data_io_utils.legacy_emg_dbs import (
6+
LegacyAssembly,
7+
LegacySample,
8+
LegacyAssemblySample,
9+
LegacyRun,
10+
LegacyAssemblyRun,
11+
)
12+
from workflows.ena_utils.ena_api_requests import sync_sample_metadata_from_ena
13+
from workflows.flows.legacy.tasks.make_run_from_legacy_emg_db import (
14+
make_run_from_legacy_emg_db,
15+
)
16+
from workflows.flows.legacy.tasks.make_sample_from_legacy_emg_db import (
17+
make_sample_from_legacy_emg_db,
18+
)
19+
20+
21+
@task
22+
def make_assembly_from_legacy_emg_db(
23+
legacy_analysis_secondary_accession: str,
24+
legacy_analysis_result_directory: str,
25+
study: Study,
26+
sample: Sample,
27+
) -> Assembly | None:
28+
from workflows.data_io_utils.legacy_emg_dbs import legacy_emg_db_session
29+
30+
logger = get_run_logger()
31+
32+
with legacy_emg_db_session() as session:
33+
# Try to find a co-assembly/assembly in the legacy DB
34+
legacy_assembly_stmt = select(LegacyAssembly).where(
35+
LegacyAssembly.accession == legacy_analysis_secondary_accession
36+
)
37+
legacy_assembly: LegacyAssembly = session.scalar(legacy_assembly_stmt)
38+
39+
if not legacy_assembly:
40+
return None
41+
42+
# In this new schema, Assembly has a sample field and runs ManyToMany.
43+
# A co-assembly might have multiple runs and samples.
44+
# For now we use the analysis job's primary sample for the assembly.
45+
assembly, created = Assembly.objects.get_or_create(
46+
ena_study=study.ena_study,
47+
dir=legacy_analysis_result_directory,
48+
sample=sample,
49+
defaults={
50+
"ena_accessions": [legacy_assembly.accession],
51+
"reads_study": study,
52+
},
53+
)
54+
if created:
55+
logger.info(f"Created new Assembly object {assembly}")
56+
57+
# Ensure all samples linked to this assembly exist in the new DB
58+
# before creating the runs.
59+
legacy_samples_stmt = (
60+
select(LegacySample)
61+
.join(
62+
LegacyAssemblySample,
63+
LegacySample.sample_id == LegacyAssemblySample.sample_id,
64+
)
65+
.where(LegacyAssemblySample.assembly_id == legacy_assembly.assembly_id)
66+
)
67+
legacy_samples = session.scalars(legacy_samples_stmt).unique().all()
68+
for leg_sample in legacy_samples:
69+
s = make_sample_from_legacy_emg_db(leg_sample, study)
70+
sync_sample_metadata_from_ena(s.ena_sample)
71+
72+
# Link all runs associated with this legacy assembly
73+
legacy_runs_stmt = (
74+
select(LegacyRun)
75+
.join(
76+
LegacyAssemblyRun,
77+
LegacyRun.run_id == LegacyAssemblyRun.run_id,
78+
)
79+
.where(LegacyAssemblyRun.assembly_id == legacy_assembly.assembly_id)
80+
)
81+
legacy_runs = session.scalars(legacy_runs_stmt).unique().all()
82+
for leg_run in legacy_runs:
83+
r = make_run_from_legacy_emg_db(leg_run, study)
84+
assembly.runs.add(r)
85+
86+
return assembly
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import re
2+
3+
import pytest
4+
5+
from analyses.models import Analysis, Run
6+
from workflows.data_io_utils.legacy_emg_dbs import (
7+
LegacyStudy,
8+
LegacySample,
9+
LegacyRun,
10+
LegacyAnalysisJob,
11+
LegacyAssembly,
12+
LegacyAssemblyRun,
13+
LegacyAssemblySample,
14+
)
15+
from workflows.flows.legacy.flows.import_v5_analyses import import_v5_analyses
16+
from workflows.prefect_utils.testing_utils import (
17+
run_flow_and_capture_logs,
18+
should_not_mock_httpx_requests_to_prefect_server,
19+
)
20+
21+
22+
@pytest.fixture
23+
def coassembly_legacy_db(in_memory_legacy_emg_db):
24+
with in_memory_legacy_emg_db as session:
25+
# Add a co-assembly study (ID 6000)
26+
study = LegacyStudy(
27+
id=6000,
28+
centre_name="COASSEMBLY",
29+
study_name="Co-assembly study",
30+
ext_study_id="ERP6000",
31+
is_private=False,
32+
submission_account_id="Webin-6000",
33+
project_id="PRJ6000",
34+
is_suppressed=False,
35+
biome_id=1,
36+
)
37+
session.add(study)
38+
39+
# Two samples
40+
sample1 = LegacySample(
41+
sample_id=6001, ext_sample_id="ERS6001", primary_accession="SAMEA6001"
42+
)
43+
sample2 = LegacySample(
44+
sample_id=6002, ext_sample_id="ERS6002", primary_accession="SAMEA6002"
45+
)
46+
session.add_all([sample1, sample2])
47+
48+
# Two runs
49+
run1 = LegacyRun(
50+
run_id=6001,
51+
sample_id=6001,
52+
accession="ERR6001",
53+
experiment_type_id=4,
54+
secondary_accession="ERR6001",
55+
study_id=6000,
56+
instrument_platform="Illumina",
57+
instrument_model="HiSeq",
58+
)
59+
run2 = LegacyRun(
60+
run_id=6002,
61+
sample_id=6002,
62+
accession="ERR6002",
63+
experiment_type_id=4,
64+
secondary_accession="ERR6002",
65+
study_id=6000,
66+
instrument_platform="Illumina",
67+
instrument_model="HiSeq",
68+
)
69+
session.add_all([run1, run2])
70+
71+
# One assembly
72+
assembly = LegacyAssembly(
73+
assembly_id=6001, accession="ERZ6001", study_id=6000, experiment_type_id=4
74+
)
75+
session.add(assembly)
76+
77+
# Links
78+
session.add(
79+
LegacyAssemblyRun(assembly_run_id=6001, assembly_id=6001, run_id=6001)
80+
)
81+
session.add(
82+
LegacyAssemblyRun(assembly_run_id=6002, assembly_id=6001, run_id=6002)
83+
)
84+
session.add(
85+
LegacyAssemblySample(
86+
assembly_sample_id=6001, assembly_id=6001, sample_id=6001
87+
)
88+
)
89+
session.add(
90+
LegacyAssemblySample(
91+
assembly_sample_id=6002, assembly_id=6001, sample_id=6002
92+
)
93+
)
94+
95+
# Analysis job for the co-assembly
96+
analysis = LegacyAnalysisJob(
97+
job_id=66666,
98+
sample_id=6001,
99+
run_id=6001,
100+
study_id=6000,
101+
pipeline_id=6,
102+
result_directory="coassembly/results",
103+
external_run_ids="ERR6001,ERR6002",
104+
secondary_accession="ERZ6001",
105+
experiment_type_id=4,
106+
analysis_status_id=3,
107+
)
108+
session.add(analysis)
109+
session.commit()
110+
return in_memory_legacy_emg_db
111+
112+
113+
@pytest.mark.httpx_mock(should_mock=should_not_mock_httpx_requests_to_prefect_server)
114+
@pytest.mark.django_db(transaction=True)
115+
def test_import_coassembly(
116+
prefect_harness,
117+
coassembly_legacy_db,
118+
mock_legacy_emg_db_session,
119+
mock_mongo_client_for_taxonomy_and_protein_functions,
120+
httpx_mock,
121+
):
122+
httpx_mock.add_response(
123+
url=re.compile(r".*result=study.*ERP6000.*"),
124+
json=[{"study_accession": "ERP6000"}],
125+
is_reusable=True,
126+
)
127+
# Mock ENA sample metadata
128+
httpx_mock.add_response(
129+
url=re.compile(r".*result=sample.*SAMEA6001.*"),
130+
json=[{"sample_accession": "SAMEA6001"}],
131+
is_reusable=True,
132+
)
133+
httpx_mock.add_response(
134+
url=re.compile(r".*result=sample.*SAMEA6002.*"),
135+
json=[{"sample_accession": "SAMEA6002"}],
136+
is_reusable=True,
137+
)
138+
139+
run_flow_and_capture_logs(
140+
import_v5_analyses,
141+
mgys="MGYS00006000",
142+
)
143+
144+
analysis = Analysis.objects.get(id=66666)
145+
assert analysis.assembly is not None
146+
# ERZ is usually in ena_accessions
147+
assert "ERZ6001" in analysis.assembly.ena_accessions
148+
149+
# Check that the assembly is linked to both runs
150+
runs = analysis.assembly.runs.all()
151+
assert runs.count() == 2
152+
run_accessions = {r.first_accession for r in runs}
153+
assert run_accessions == {"ERR6001", "ERR6002"}
154+
155+
# Check that the assembly is linked to correct samples...
156+
# At the moment we expect only just first sample, as assemblies are transitioning to single virtual samples
157+
assert analysis.assembly.sample.first_accession == "SAMEA6001"
158+
assert not analysis.assembly.sample.related_samples.exists()
159+
# The other sample should still be linked via its run though
160+
assert (
161+
Run.objects.get(ena_accessions__contains=["ERR6002"]).sample.first_accession
162+
== "SAMEA6002"
163+
)

0 commit comments

Comments
 (0)