Skip to content

Commit 7dff914

Browse files
committed
Add ClinGen Submission EnvVar to Control Submission
Adds an environment variable `CLIN_GEN_SUBMISSION_ENABLED` that controls whether ClinGen submission jobs are enqueued by the worker. Tests are edited to control the value of this environment variable.
1 parent dc6559a commit 7dff914

File tree

3 files changed

+109
-33
lines changed

3 files changed

+109
-33
lines changed

src/mavedb/lib/clingen/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import os
22

3+
# Feature flag controlling whether the worker enqueues ClinGen submission jobs.
# The documented variable name is CLIN_GEN_SUBMISSION_ENABLED; the legacy SUBMISSION_ENABLED
# name is still honoured as a fallback so existing deployments keep working.
# Only the (case-insensitive) string "true" enables submission; the default is disabled.
CLIN_GEN_SUBMISSION_ENABLED = (
    os.getenv("CLIN_GEN_SUBMISSION_ENABLED", os.getenv("SUBMISSION_ENABLED", "false")).lower() == "true"
)
4+
35
GENBOREE_ACCOUNT_NAME = os.getenv("GENBOREE_ACCOUNT_NAME")
46
GENBOREE_ACCOUNT_PASSWORD = os.getenv("GENBOREE_ACCOUNT_PASSWORD")
57

src/mavedb/worker/jobs.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
DEFAULT_LDH_SUBMISSION_BATCH_SIZE,
2020
LDH_SUBMISSION_URL,
2121
LINKED_DATA_RETRY_THRESHOLD,
22+
CLIN_GEN_SUBMISSION_ENABLED,
2223
)
2324
from mavedb.lib.clingen.content_constructors import construct_ldh_submission
2425
from mavedb.lib.clingen.linked_data_hub import (
@@ -560,20 +561,26 @@ async def map_variants_for_score_set(
560561

561562
new_job_id = None
562563
try:
563-
new_job = await redis.enqueue_job(
564-
"submit_score_set_mappings_to_ldh",
565-
correlation_id,
566-
score_set.id,
567-
)
564+
if CLIN_GEN_SUBMISSION_ENABLED:
565+
new_job = await redis.enqueue_job(
566+
"submit_score_set_mappings_to_ldh",
567+
correlation_id,
568+
score_set.id,
569+
)
568570

569-
if new_job:
570-
new_job_id = new_job.job_id
571+
if new_job:
572+
new_job_id = new_job.job_id
571573

572-
logging_context["submit_clingen_variants_job_id"] = new_job_id
573-
logger.info(msg="Queued a new ClinGen submission job.", extra=logging_context)
574+
logging_context["submit_clingen_variants_job_id"] = new_job_id
575+
logger.info(msg="Queued a new ClinGen submission job.", extra=logging_context)
574576

577+
else:
578+
raise SubmissionEnqueueError()
575579
else:
576-
raise SubmissionEnqueueError()
580+
logger.warning(
581+
msg="ClinGen submission is disabled, skipped submission of mapped variants to LDH.",
582+
extra=logging_context,
583+
)
577584

578585
except Exception as e:
579586
send_slack_error(e)

tests/worker/test_jobs.py

Lines changed: 90 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,19 @@ async def setup_records_files_and_variants_with_mapping(
129129
async def dummy_mapping_job():
130130
return await setup_mapping_output(async_client, session, score_set)
131131

132-
with patch.object(
133-
_UnixSelectorEventLoop,
134-
"run_in_executor",
135-
return_value=dummy_mapping_job(),
132+
with (
133+
patch.object(
134+
_UnixSelectorEventLoop,
135+
"run_in_executor",
136+
return_value=dummy_mapping_job(),
137+
),
138+
patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", False),
136139
):
137140
result = await map_variants_for_score_set(standalone_worker_context, uuid4().hex, score_set.id, 1)
138141

139142
assert result["success"]
140143
assert not result["retried"]
141-
assert result["enqueued_job"] is not None
144+
assert result["enqueued_job"] is None
142145
return session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).one()
143146

144147

@@ -501,6 +504,7 @@ async def dummy_linking_job():
501504
patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"),
502505
patch("mavedb.worker.jobs.MAPPING_BACKOFF_IN_SECONDS", 0),
503506
patch("mavedb.worker.jobs.LINKING_BACKOFF_IN_SECONDS", 0),
507+
patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", True),
504508
):
505509
await arq_redis.enqueue_job("create_variants_for_score_set", uuid4().hex, score_set.id, 1, scores, counts)
506510
await arq_worker.async_run()
@@ -595,10 +599,13 @@ async def dummy_mapping_job():
595599
# We seem unable to mock requests via requests_mock that occur inside another event loop. Work around
596600
# this limitation by instead patching the _UnixSelectorEventLoop's executor function with a coroutine
597601
# object that sets up test mapping output.
598-
with patch.object(
599-
_UnixSelectorEventLoop,
600-
"run_in_executor",
601-
return_value=dummy_mapping_job(),
602+
with (
603+
patch.object(
604+
_UnixSelectorEventLoop,
605+
"run_in_executor",
606+
return_value=dummy_mapping_job(),
607+
),
608+
patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", True),
602609
):
603610
result = await map_variants_for_score_set(standalone_worker_context, uuid4().hex, score_set.id, 1)
604611

@@ -638,10 +645,13 @@ async def dummy_mapping_job():
638645
# We seem unable to mock requests via requests_mock that occur inside another event loop. Work around
639646
# this limitation by instead patching the _UnixSelectorEventLoop's executor function with a coroutine
640647
# object that sets up test mapping output.
641-
with patch.object(
642-
_UnixSelectorEventLoop,
643-
"run_in_executor",
644-
return_value=dummy_mapping_job(),
648+
with (
649+
patch.object(
650+
_UnixSelectorEventLoop,
651+
"run_in_executor",
652+
return_value=dummy_mapping_job(),
653+
),
654+
patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", True),
645655
):
646656
existing_variant = session.scalars(select(Variant)).first()
647657

@@ -1061,10 +1071,13 @@ async def dummy_mapping_job():
10611071
# We seem unable to mock requests via requests_mock that occur inside another event loop. Work around
10621072
# this limitation by instead patching the _UnixSelectorEventLoop's executor function with a coroutine
10631073
# object that sets up test mapping output.
1064-
with patch.object(
1065-
_UnixSelectorEventLoop,
1066-
"run_in_executor",
1067-
return_value=dummy_mapping_job(),
1074+
with (
1075+
patch.object(
1076+
_UnixSelectorEventLoop,
1077+
"run_in_executor",
1078+
return_value=dummy_mapping_job(),
1079+
),
1080+
patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", True),
10681081
):
10691082
result = await map_variants_for_score_set(standalone_worker_context, uuid4().hex, score_set.id, 1)
10701083

@@ -1434,10 +1447,59 @@ async def dummy_linking_job():
14341447
patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"),
14351448
patch("mavedb.worker.jobs.MAPPING_BACKOFF_IN_SECONDS", 0),
14361449
patch("mavedb.worker.jobs.LINKING_BACKOFF_IN_SECONDS", 0),
1450+
patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", True),
14371451
):
1438-
await arq_redis.enqueue_job("variant_mapper_manager", uuid4().hex, 1)
14391452
await arq_worker.async_run()
1440-
await arq_worker.run_check()
1453+
num_completed_jobs = await arq_worker.run_check()
1454+
1455+
# We should have completed all jobs exactly once.
1456+
assert num_completed_jobs == 4
1457+
1458+
score_set = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).one()
1459+
mapped_variants_for_score_set = session.scalars(
1460+
select(MappedVariant).join(Variant).join(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set.urn)
1461+
).all()
1462+
assert (await arq_redis.llen(MAPPING_QUEUE_NAME)) == 0
1463+
assert (await arq_redis.get(MAPPING_CURRENT_ID_NAME)).decode("utf-8") == ""
1464+
assert len(mapped_variants_for_score_set) == score_set.num_variants
1465+
assert score_set.mapping_state == MappingState.complete
1466+
assert score_set.mapping_errors is None
1467+
1468+
1469+
@pytest.mark.asyncio
1470+
async def test_mapping_manager_enqueues_mapping_process_with_successful_mapping_linking_disabled(
1471+
setup_worker_db, standalone_worker_context, session, async_client, data_files, arq_worker, arq_redis
1472+
):
1473+
score_set = await setup_records_files_and_variants(
1474+
session,
1475+
async_client,
1476+
data_files,
1477+
TEST_MINIMAL_SEQ_SCORESET,
1478+
standalone_worker_context,
1479+
)
1480+
1481+
async def dummy_mapping_job():
1482+
return await setup_mapping_output(async_client, session, score_set)
1483+
1484+
# We seem unable to mock requests via requests_mock that occur inside another event loop. Work around
1485+
# this limitation by instead patching the _UnixSelectorEventLoop's executor function with a coroutine
1486+
# object that sets up test mapping output.
1487+
with (
1488+
patch.object(
1489+
_UnixSelectorEventLoop,
1490+
"run_in_executor",
1491+
side_effect=[dummy_mapping_job()],
1492+
),
1493+
patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"),
1494+
patch("mavedb.worker.jobs.MAPPING_BACKOFF_IN_SECONDS", 0),
1495+
patch("mavedb.worker.jobs.LINKING_BACKOFF_IN_SECONDS", 0),
1496+
patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", False),
1497+
):
1498+
await arq_worker.async_run()
1499+
num_completed_jobs = await arq_worker.run_check()
1500+
1501+
# We should have completed the manager and mapping jobs, but not the submission or linking jobs.
1502+
assert num_completed_jobs == 2
14411503

14421504
score_set = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).one()
14431505
mapped_variants_for_score_set = session.scalars(
@@ -1491,10 +1553,13 @@ async def dummy_linking_job():
14911553
patch.object(ClinGenLdhService, "_existing_jwt", return_value="test_jwt"),
14921554
patch("mavedb.worker.jobs.MAPPING_BACKOFF_IN_SECONDS", 0),
14931555
patch("mavedb.worker.jobs.LINKING_BACKOFF_IN_SECONDS", 0),
1556+
patch("mavedb.worker.jobs.CLIN_GEN_SUBMISSION_ENABLED", True),
14941557
):
1495-
await arq_redis.enqueue_job("variant_mapper_manager", uuid4().hex, 1)
14961558
await arq_worker.async_run()
1497-
await arq_worker.run_check()
1559+
num_completed_jobs = await arq_worker.run_check()
1560+
1561+
# We should have completed the mapping manager job twice, the mapping job twice, the submission job, and the linking job.
1562+
assert num_completed_jobs == 6
14981563

14991564
score_set = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).one()
15001565
mapped_variants_for_score_set = session.scalars(
@@ -1533,9 +1598,11 @@ async def failed_mapping_job():
15331598
),
15341599
patch("mavedb.worker.jobs.MAPPING_BACKOFF_IN_SECONDS", 0),
15351600
):
1536-
await arq_redis.enqueue_job("variant_mapper_manager", uuid4().hex, 1)
15371601
await arq_worker.async_run()
1538-
await arq_worker.run_check()
1602+
num_completed_jobs = await arq_worker.run_check()
1603+
1604+
# We should have completed 6 mapping jobs and 6 management jobs.
1605+
assert num_completed_jobs == 12
15391606

15401607
score_set = session.scalars(select(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).one()
15411608
mapped_variants_for_score_set = session.scalars(

0 commit comments

Comments
 (0)