|
53 | 53 | TEST_SAVED_GENERIC_CLINICAL_CONTROL, |
54 | 54 | TEST_SAVED_GNOMAD_VARIANT, |
55 | 55 | TEST_USER, |
| 56 | + VALID_CLINGEN_CA_ID, |
56 | 57 | ) |
57 | 58 | from tests.helpers.dependency_overrider import DependencyOverrider |
58 | 59 | from tests.helpers.util.common import ( |
@@ -2853,6 +2854,83 @@ def test_download_scores_counts_and_post_mapped_variants_file( |
2853 | 2854 | ) |
2854 | 2855 |
|
2855 | 2856 |
|
| 2857 | +# Additional namespace export tests: VEP, ClinGen, gnomAD |
| 2858 | +def test_download_vep_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): |
| 2859 | + experiment = create_experiment(client) |
| 2860 | + score_set = create_seq_score_set(client, experiment["urn"]) |
| 2861 | + score_set = mock_worker_variant_insertion( |
| 2862 | + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" |
| 2863 | + ) |
| 2864 | + # Create mapped variants with VEP consequence populated |
| 2865 | + create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION) |
| 2866 | + |
| 2867 | + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: |
| 2868 | + published_score_set = publish_score_set(client, score_set["urn"]) |
| 2869 | + worker_queue.assert_called_once() |
| 2870 | + |
| 2871 | + response = client.get( |
| 2872 | + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=vep&include_post_mapped_hgvs=true&drop_na_columns=true" |
| 2873 | + ) |
| 2874 | + assert response.status_code == 200 |
| 2875 | + reader = csv.DictReader(StringIO(response.text)) |
| 2876 | + assert "vep.vep_functional_consequence" in reader.fieldnames |
| 2877 | + # At least one row should contain the test consequence value |
| 2878 | + rows = list(reader) |
| 2879 | + assert any(row.get("vep.vep_functional_consequence") == "missense_variant" for row in rows) |
| 2880 | + |
| 2881 | + |
| 2882 | +def test_download_clingen_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): |
| 2883 | + experiment = create_experiment(client) |
| 2884 | + score_set = create_seq_score_set(client, experiment["urn"]) |
| 2885 | + score_set = mock_worker_variant_insertion( |
| 2886 | + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" |
| 2887 | + ) |
| 2888 | + # Create mapped variants then set ClinGen allele id for first mapped variant |
| 2889 | + create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION) |
| 2890 | + db_score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set["urn"]).one() |
| 2891 | + first_mapped_variant = db_score_set.variants[0].mapped_variants[0] |
| 2892 | + first_mapped_variant.clingen_allele_id = VALID_CLINGEN_CA_ID |
| 2893 | + session.add(first_mapped_variant) |
| 2894 | + session.commit() |
| 2895 | + |
| 2896 | + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: |
| 2897 | + published_score_set = publish_score_set(client, score_set["urn"]) |
| 2898 | + worker_queue.assert_called_once() |
| 2899 | + |
| 2900 | + response = client.get( |
| 2901 | + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&include_post_mapped_hgvs=true&drop_na_columns=true" |
| 2902 | + ) |
| 2903 | + assert response.status_code == 200 |
| 2904 | + reader = csv.DictReader(StringIO(response.text)) |
| 2905 | + assert "clingen.clingen_allele_id" in reader.fieldnames |
| 2906 | + rows = list(reader) |
| 2907 | + assert rows[0].get("clingen.clingen_allele_id") == VALID_CLINGEN_CA_ID |
| 2908 | + |
| 2909 | + |
| 2910 | +def test_download_gnomad_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): |
| 2911 | + experiment = create_experiment(client) |
| 2912 | + score_set = create_seq_score_set(client, experiment["urn"]) |
| 2913 | + score_set = mock_worker_variant_insertion( |
| 2914 | + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" |
| 2915 | + ) |
| 2916 | + # Link a gnomAD variant to the first mapped variant (version may not match export filter) |
| 2917 | + score_set = create_seq_score_set_with_mapped_variants( |
| 2918 | + client, session, data_provider, experiment["urn"], data_files / "scores.csv" |
| 2919 | + ) |
| 2920 | + link_gnomad_variants_to_mapped_variants(session, score_set) |
| 2921 | + |
| 2922 | + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: |
| 2923 | + published_score_set = publish_score_set(client, score_set["urn"]) |
| 2924 | + worker_queue.assert_called_once() |
| 2925 | + |
| 2926 | + response = client.get( |
| 2927 | + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=gnomad&drop_na_columns=true" |
| 2928 | + ) |
| 2929 | + assert response.status_code == 200 |
| 2930 | + reader = csv.DictReader(StringIO(response.text)) |
| 2931 | + assert "gnomad.gnomad_af" in reader.fieldnames |
| 2932 | + |
| 2933 | + |
2856 | 2934 | ######################################################################################################################## |
2857 | 2935 | # Fetching clinical controls and control options for a score set |
2858 | 2936 | ######################################################################################################################## |
|
0 commit comments