Skip to content

Commit f6443e7

Browse files
authored
Merge pull request #507 from VariantEffect/maintenance/bencap/pp-clinvar-script-updates
Calibration script and ClinVar control refresh script improvements
2 parents 613718c + 0aa3836 commit f6443e7

File tree

2 files changed

+179
-1
lines changed

2 files changed

+179
-1
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
from typing import Callable
2+
import json
3+
import math
4+
import click
5+
from typing import List, Dict, Any, Optional
6+
7+
from sqlalchemy.orm import Session
8+
9+
from mavedb.scripts.environment import with_database_session
10+
from mavedb.models.score_set import ScoreSet
11+
from mavedb.view_models.score_range import (
12+
PillarProjectScoreRangeCreate,
13+
PillarProjectScoreRangesCreate,
14+
ScoreSetRangesCreate,
15+
)
16+
17+
# Evidence strength ordering definitions
18+
PATH_STRENGTHS: List[int] = [1, 2, 3, 4, 8]
19+
BENIGN_STRENGTHS: List[int] = [-1, -2, -3, -4, -8]
20+
21+
22+
def _not_nan(v: Any) -> bool:
23+
return v is not None and not (isinstance(v, float) and math.isnan(v))
24+
25+
26+
def _collapse_duplicate_thresholds(m: dict[int, Optional[float]], comparator: Callable) -> dict[int, float]:
27+
collapsed: dict[int, float] = {}
28+
29+
for strength, threshold in m.items():
30+
if threshold is None:
31+
continue
32+
33+
if threshold in collapsed.values():
34+
# If the value is already present, we need to find the key it's associated with
35+
current_strongest_strength = next(s for s, t in collapsed.items() if t == threshold)
36+
37+
# If the keys are different, we need to merge them. Keep the strongest one as decided
38+
# by the provided comparator.
39+
if current_strongest_strength != strength:
40+
new_strongest_evidence = comparator(current_strongest_strength, strength)
41+
collapsed.pop(current_strongest_strength)
42+
collapsed[new_strongest_evidence] = threshold
43+
44+
else:
45+
collapsed[strength] = threshold
46+
47+
return collapsed
48+
49+
50+
def build_pathogenic_ranges(thresholds: List[Optional[float]], inverted: bool) -> List[PillarProjectScoreRangeCreate]:
51+
raw_mapping = {
52+
strength: thresholds[idx]
53+
for idx, strength in enumerate(PATH_STRENGTHS)
54+
if idx < len(thresholds) and _not_nan(thresholds[idx])
55+
}
56+
mapping = _collapse_duplicate_thresholds(raw_mapping, max)
57+
58+
# Only retain strengths if they are in the mapping. In inverted mode, upper is 'more pathogenic', which is
59+
# the opposite of how the pathogenic ranges are given to us. Therefore if the inverted flag is false, we must reverse the
60+
# order in which we handle ranges.
61+
available = [s for s in PATH_STRENGTHS if s in mapping]
62+
ordering = available[::-1] if not inverted else available
63+
64+
ranges: List[PillarProjectScoreRangeCreate] = []
65+
for i, s in enumerate(ordering):
66+
lower: Optional[float]
67+
upper: Optional[float]
68+
69+
if inverted:
70+
lower = mapping[s]
71+
upper = mapping[ordering[i + 1]] if i + 1 < len(ordering) else None
72+
else:
73+
lower = None if i == 0 else mapping[ordering[i - 1]]
74+
upper = mapping[s]
75+
76+
ranges.append(
77+
PillarProjectScoreRangeCreate(
78+
label=str(s),
79+
classification="abnormal",
80+
evidence_strength=s,
81+
range=(lower, upper),
82+
# Whichever bound interacts with infinity will always be exclusive, with the opposite always inclusive.
83+
inclusive_lower_bound=False if not inverted else True,
84+
inclusive_upper_bound=False if inverted else True,
85+
)
86+
)
87+
return ranges
88+
89+
90+
def build_benign_ranges(thresholds: List[Optional[float]], inverted: bool) -> List[PillarProjectScoreRangeCreate]:
91+
raw_mapping = {
92+
strength: thresholds[idx]
93+
for idx, strength in enumerate(BENIGN_STRENGTHS)
94+
if idx < len(thresholds) and _not_nan(thresholds[idx])
95+
}
96+
mapping = _collapse_duplicate_thresholds(raw_mapping, min)
97+
98+
# Only retain strengths if they are in the mapping. In inverted mode, lower is 'more normal', which is
99+
# how the benign ranges are given to us. Therefore if the inverted flag is false, we must reverse the
100+
# order in which we handle ranges.
101+
available = [s for s in BENIGN_STRENGTHS if s in mapping]
102+
ordering = available[::-1] if inverted else available
103+
104+
ranges: List[PillarProjectScoreRangeCreate] = []
105+
for i, s in enumerate(ordering):
106+
lower: Optional[float]
107+
upper: Optional[float]
108+
109+
if not inverted:
110+
lower = mapping[s]
111+
upper = mapping[ordering[i + 1]] if i + 1 < len(ordering) else None
112+
else:
113+
lower = None if i == 0 else mapping[ordering[i - 1]]
114+
upper = mapping[s]
115+
116+
ranges.append(
117+
PillarProjectScoreRangeCreate(
118+
label=str(s),
119+
classification="normal",
120+
evidence_strength=s,
121+
range=(lower, upper),
122+
# Whichever bound interacts with infinity will always be exclusive, with the opposite always inclusive.
123+
inclusive_lower_bound=False if inverted else True,
124+
inclusive_upper_bound=False if not inverted else True,
125+
)
126+
)
127+
return ranges
128+
129+
130+
@click.command()
131+
@with_database_session
132+
@click.argument("json_path", type=click.Path(exists=True, dir_okay=False, readable=True))
133+
@click.argument("score_set_urn", type=str)
134+
@click.option("--overwrite", is_flag=True, default=False, help="Overwrite existing score_ranges if present.")
135+
def main(db: Session, json_path: str, score_set_urn: str, overwrite: bool) -> None:
136+
"""Load pillar project calibration JSON into a score set's pillar_project score ranges."""
137+
score_set: Optional[ScoreSet] = db.query(ScoreSet).filter(ScoreSet.urn == score_set_urn).one_or_none()
138+
if not score_set:
139+
raise click.ClickException(f"Score set with URN {score_set_urn} not found")
140+
141+
if score_set.score_ranges and score_set.score_ranges["pillar_project"] and not overwrite:
142+
raise click.ClickException(
143+
"pillar project score ranges already present for this score set. Use --overwrite to replace them."
144+
)
145+
146+
if not score_set.score_ranges:
147+
existing_score_ranges = ScoreSetRangesCreate()
148+
else:
149+
existing_score_ranges = ScoreSetRangesCreate(**score_set.score_ranges)
150+
151+
with open(json_path, "r") as fh:
152+
data: Dict[str, Any] = json.load(fh)
153+
154+
path_thresholds = data.get("final_pathogenic_thresholds") or []
155+
benign_thresholds = data.get("final_benign_thresholds") or []
156+
# Lower is 'more normal' in inverted mode
157+
inverted = data.get("inverted") == "inverted"
158+
159+
path_ranges = build_pathogenic_ranges(path_thresholds, inverted)
160+
benign_ranges = build_benign_ranges(benign_thresholds, inverted)
161+
162+
if not path_ranges and not benign_ranges:
163+
raise click.ClickException("No valid thresholds found to build ranges.")
164+
165+
existing_score_ranges.pillar_project = PillarProjectScoreRangesCreate(ranges=path_ranges + benign_ranges)
166+
score_set.score_ranges = existing_score_ranges.model_dump(exclude_none=True)
167+
168+
db.add(score_set)
169+
click.echo(
170+
f"Loaded {len(path_ranges)} pathogenic and {len(benign_ranges)} benign ranges into score set {score_set_urn} (inverted={inverted})."
171+
)
172+
173+
174+
if __name__ == "__main__": # pragma: no cover
175+
main()

src/mavedb/scripts/refresh_clinvar_variant_data.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ def refresh_clinvar_variants(db: Session, month: Optional[str], year: str, urns:
8585
select(distinct(MappedVariant.clingen_allele_id))
8686
.join(Variant)
8787
.join(ScoreSet)
88-
.where(MappedVariant.current.is_(True), MappedVariant.post_mapped.is_not(None))
8988
.where(
9089
and_(
9190
MappedVariant.clingen_allele_id.is_not(None),
@@ -105,6 +104,10 @@ def refresh_clinvar_variants(db: Session, month: Optional[str], year: str, urns:
105104
if total_variants_with_clingen_ids > 0 and index % (max(total_variants_with_clingen_ids // 100, 1)) == 0:
106105
logger.info(f"Progress: {index / total_variants_with_clingen_ids:.0%}")
107106

107+
if clingen_id is not None and "," in clingen_id:
108+
logger.debug("Detected a multi-variant ClinGen allele ID, skipping.")
109+
continue
110+
108111
# Guaranteed based on our query filters.
109112
clingen_data = query_clingen_allele_api(clingen_id) # type: ignore
110113
clinvar_allele_id = clingen_data.get("externalRecords", {}).get("ClinVarAlleles", [{}])[0].get("alleleId")

0 commit comments

Comments
 (0)