Skip to content

Commit 313053d

Browse files
committed
feat: refactor pillar project calibration loader for use against simplified json
1 parent a04ccb0 commit 313053d

File tree

1 file changed

+213
-147
lines changed

1 file changed

+213
-147
lines changed

src/mavedb/scripts/load_pp_style_calibration.py

Lines changed: 213 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -1,174 +1,240 @@
1+
"""Load an archive of Zeiberg calibration style calibrations into Score Sets.
2+
3+
This script processes JSON calibration files from an archive directory and applies them
4+
to MaveDB Score Sets based on a dataset mapping file. The script iterates through all
5+
JSON files in the archive directory, extracts dataset names from filenames, looks up
6+
corresponding Score Set URNs in the mapping file, and creates ACMG-style functional
7+
range calibrations for each Score Set.
8+
9+
Args:
10+
archive_path (str): Path to directory containing calibration JSON files
11+
dataset_map (str): Path to JSON file mapping dataset names to Score Set URNs
12+
overwrite (bool): Whether to overwrite existing "Zeiberg calibration" entries
13+
14+
Input File Formats:
15+
16+
1. Archive Directory Structure:
17+
- Contains JSON files named after datasets (e.g., "data_set_name.json")
18+
- May include "_clinvar_2018" variant files (e.g., "data_set_name_clinvar_2018.json")
19+
- Script automatically detects and processes both variants
20+
21+
2. Calibration JSON File Format:
22+
{
23+
"prior": 0.01548246603645654,
24+
"point_ranges": {
25+
"1": [[[0.7222, 0.9017]]], // BS3 Supporting (-1 to 1 points)
26+
"2": [[[0.9017, 1.1315]]], // BS3 Moderate (-2 to 2 points)
27+
"3": [[[1.1315, 5.3892]]], // BS3 Moderate+ (-3 to 3 points)
28+
"4": [], // BS3 Strong (-4 to 4 points)
29+
"-1": [[[-0.6934, -0.3990]]], // PS3 Supporting
30+
"-2": [[[-6.5761, -0.6934]]], // PS3 Moderate
31+
// ... other point values (-8 to 8)
32+
},
33+
"dataset": "data_set_name",
34+
"relax": false,
35+
"n_c": "2c",
36+
"benign_method": "benign",
37+
"clinvar_2018": false
38+
}
39+
40+
3. Dataset Mapping JSON File Format:
41+
{
42+
"data_set_name": "urn:mavedb:00000050-a-1",
43+
"data_set_with_urn_list": "urn:mavedb:00000060-a-1, urn:mavedb:00000060-a-2",
44+
// ... more dataset mappings
45+
}
46+
47+
Behavior:
48+
49+
1. File Discovery: Scans archive directory for all .json files
50+
2. Dataset Name Extraction: Removes .json extension and _clinvar_2018 suffix
51+
3. Mapping Lookup: Finds Score Set URNs for each dataset in mapping file
52+
4. URN Processing: Handles comma-separated URN lists for datasets with multiple Score Sets
53+
5. Calibration Creation: Creates functional ranges with ACMG classifications:
54+
- Positive points (1-8): PS3 classifications for "abnormal" variants
55+
- Negative points (-1 to -8): BS3 classifications for "normal" variants
56+
- Strength labels: Supporting (±1), Moderate (±2), Moderate+ (±3), Strong (±4-8)
57+
6. File Variants: Automatically detects and processes both regular and ClinVar 2018 variants
58+
7. Calibration Naming:
59+
- Regular files: "Zeiberg calibration"
60+
- ClinVar 2018 files: "Zeiberg calibration (ClinVar 2018)"
61+
62+
Skipping Behavior:
63+
- Files with no mapping entry or empty/invalid URNs (N/A, #VALUE!, empty string)
64+
- Score Sets that don't exist in the database
65+
- JSON files that can't be parsed
66+
67+
Output Statistics:
68+
- Total JSON files found in archive
69+
- Number of calibrations created vs updated
70+
- Number of unmapped files
71+
- Number of non-existing Score Sets
72+
73+
Example Usage:
74+
python load_pp_style_calibration.py /path/to/calibrations_archive /path/to/dataset_mapping.json
75+
python load_pp_style_calibration.py /path/to/calibrations_archive /path/to/dataset_mapping.json --overwrite
76+
"""
77+
78+
import asyncio
179
import json
2-
import math
3-
from typing import Any, Callable, Dict, List, Optional
80+
import os
81+
from typing import Dict, List, Optional
482

583
import click
684
from sqlalchemy.orm import Session
785

886
from mavedb.lib.score_calibrations import create_score_calibration_in_score_set
87+
from mavedb.models.score_calibration import ScoreCalibration
988
from mavedb.models.score_set import ScoreSet
1089
from mavedb.models.user import User
1190
from mavedb.scripts.environment import with_database_session
12-
from mavedb.view_models.acmg_classification import ACMGClassificationCreate
13-
from mavedb.view_models.score_calibration import FunctionalRangeCreate, ScoreCalibrationCreate
91+
from mavedb.view_models import acmg_classification, score_calibration
1492

15-
# Evidence strength ordering definitions
16-
PATH_STRENGTHS: List[int] = [1, 2, 3, 4, 8]
17-
BENIGN_STRENGTHS: List[int] = [-1, -2, -3, -4, -8]
93+
POINT_LABEL_MAPPINGS: Dict[int, str] = {
94+
8: "Very Strong",
95+
7: "Very Strong",
96+
6: "Very Strong",
97+
5: "Very Strong",
98+
4: "Strong",
99+
3: "Moderate+",
100+
2: "Moderate",
101+
1: "Supporting",
102+
}
18103

104+
ALL_POINT_LABEL_MAPPINGS = {**POINT_LABEL_MAPPINGS, **{k * -1: v for k, v in POINT_LABEL_MAPPINGS.items()}}
19105

20-
def _not_nan(v: Any) -> bool:
21-
return v is not None and not (isinstance(v, float) and math.isnan(v))
22106

107+
@click.command()
108+
@with_database_session
109+
@click.argument("archive_path", type=click.Path(exists=True, file_okay=False))
110+
@click.argument("dataset_map", type=click.Path(exists=True, dir_okay=False))
111+
@click.option("--overwrite", is_flag=True, default=False, help="Overwrite existing `Zeiberg calibration` in score set")
112+
def main(db: Session, archive_path: str, dataset_map: str, overwrite: bool) -> None:
113+
"""Load an archive of Zeiberg calibration style calibrations into Score Sets"""
114+
with open(dataset_map, "r") as f:
115+
dataset_mapping: Dict[str, str] = json.load(f)
116+
117+
system_user: User = db.query(User).filter(User.id == 1).one()
118+
119+
# Get all JSON files in the archive directory
120+
json_files = [f for f in os.listdir(archive_path) if f.endswith(".json")]
121+
total_json_files = len(json_files)
122+
123+
created_calibrations = 0
124+
updated_calibrations = 0
125+
non_existing_score_sets = 0
126+
unmapped_files = []
127+
128+
click.echo(f"Found {total_json_files} JSON files in archive directory: {archive_path}")
129+
130+
for json_file in json_files:
131+
with open(os.path.join(archive_path, json_file), "r") as f:
132+
calibration_data = json.load(f)
133+
dataset_name = calibration_data.get("dataset", None)
134+
click.echo(f"Processing calibration file: {json_file} (dataset: {dataset_name})")
135+
136+
if not dataset_name:
137+
click.echo(f" Dataset name not found in calibration file {json_file}, skipping...", err=True)
138+
unmapped_files.append(json_file)
139+
continue
23140

24-
def _collapse_duplicate_thresholds(m: dict[int, Optional[float]], comparator: Callable) -> dict[int, float]:
25-
collapsed: dict[int, float] = {}
141+
# Look up dataset in mapping
142+
if "_clinvar_2018" in json_file:
143+
dataset_name = dataset_name.replace("_clinvar_2018", "")
26144

27-
for strength, threshold in m.items():
28-
if threshold is None:
145+
score_set_urns_str = dataset_mapping.get(dataset_name)
146+
if not score_set_urns_str or score_set_urns_str in ["", "N/A", "#VALUE!"]:
147+
click.echo(f" Dataset {dataset_name} not found in mapping or has no URNs, skipping...", err=True)
148+
unmapped_files.append(json_file)
29149
continue
30150

31-
if threshold in collapsed.values():
32-
# If the value is already present, we need to find the key it's associated with
33-
current_strongest_strength = next(s for s, t in collapsed.items() if t == threshold)
34-
35-
# If the keys are different, we need to merge them. Keep the strongest one as decided
36-
# by the provided comparator.
37-
if current_strongest_strength != strength:
38-
new_strongest_evidence = comparator(current_strongest_strength, strength)
39-
collapsed.pop(current_strongest_strength)
40-
collapsed[new_strongest_evidence] = threshold
41-
42-
else:
43-
collapsed[strength] = threshold
44-
45-
return collapsed
46-
47-
48-
def build_pathogenic_ranges(thresholds: List[Optional[float]], inverted: bool) -> List[FunctionalRangeCreate]:
49-
raw_mapping = {
50-
strength: thresholds[idx]
51-
for idx, strength in enumerate(PATH_STRENGTHS)
52-
if idx < len(thresholds) and _not_nan(thresholds[idx])
53-
}
54-
mapping = _collapse_duplicate_thresholds(raw_mapping, max)
55-
56-
# Only retain strengths if they are in the mapping. In inverted mode, upper is 'more pathogenic', which is
57-
# the opposite of how the pathogenic ranges are given to us. Therefore if the inverted flag is false, we must reverse the
58-
# order in which we handle ranges.
59-
available = [s for s in PATH_STRENGTHS if s in mapping]
60-
ordering = available[::-1] if not inverted else available
61-
62-
ranges: List[FunctionalRangeCreate] = []
63-
for i, s in enumerate(ordering):
64-
lower: Optional[float]
65-
upper: Optional[float]
66-
67-
if inverted:
68-
lower = mapping[s]
69-
upper = mapping[ordering[i + 1]] if i + 1 < len(ordering) else None
70-
else:
71-
lower = None if i == 0 else mapping[ordering[i - 1]]
72-
upper = mapping[s]
73-
74-
ranges.append(
75-
FunctionalRangeCreate(
76-
label=str(s),
77-
classification="abnormal",
78-
range=(lower, upper),
79-
# Whichever bound interacts with infinity will always be exclusive, with the opposite always inclusive.
80-
inclusive_lower_bound=False if not inverted else True,
81-
inclusive_upper_bound=False if inverted else True,
82-
acmg_classification=ACMGClassificationCreate(points=s),
151+
# Handle comma-separated list of score set URNs
152+
score_set_urns = [urn.strip() for urn in score_set_urns_str.split(",") if urn.strip()]
153+
154+
# Process each score set URN for this calibration file
155+
for score_set_urn in score_set_urns:
156+
click.echo(f" Applying calibration to Score Set {score_set_urn}...")
157+
158+
score_set: Optional[ScoreSet] = db.query(ScoreSet).filter(ScoreSet.urn == score_set_urn).one_or_none()
159+
if not score_set:
160+
click.echo(f" Score Set with URN {score_set_urn} not found, skipping...", err=True)
161+
non_existing_score_sets += 1
162+
continue
163+
164+
# Determine calibration name based on file name
165+
if "_clinvar_2018" in json_file:
166+
calibration_name = "Zeiberg calibration (ClinVar 2018)"
167+
else:
168+
calibration_name = "Zeiberg calibration"
169+
170+
existing_calibration = None
171+
if overwrite:
172+
existing_calibration = (
173+
db.query(ScoreCalibration)
174+
.filter(ScoreCalibration.score_set_id == score_set.id)
175+
.filter(ScoreCalibration.title == calibration_name)
176+
.one_or_none()
177+
)
178+
179+
if existing_calibration:
180+
db.delete(existing_calibration)
181+
db.flush()
182+
click.echo(f" Overwriting existing '{calibration_name}' in Score Set {score_set.urn}")
183+
184+
functional_ranges: List[score_calibration.FunctionalRangeCreate] = []
185+
for points, range_data in calibration_data.get("point_ranges", {}).items():
186+
if not range_data:
187+
continue
188+
189+
range_data = tuple(float(bound) for bound in range_data[0])
190+
points = int(points.strip())
191+
ps_or_bs = "PS3" if points > 0 else "BS3"
192+
strength_label = ALL_POINT_LABEL_MAPPINGS.get(points, "Unknown")
193+
194+
functional_range = score_calibration.FunctionalRangeCreate(
195+
label=f"{ps_or_bs} {strength_label} ({points})",
196+
classification="abnormal" if points > 0 else "normal",
197+
range=range_data,
198+
acmg_classification=acmg_classification.ACMGClassificationCreate(
199+
points=int(points),
200+
),
201+
inclusive_lower_bound=True,
202+
inclusive_upper_bound=False,
203+
)
204+
functional_ranges.append(functional_range)
205+
206+
score_calibration_create = score_calibration.ScoreCalibrationCreate(
207+
title=calibration_name,
208+
functional_ranges=functional_ranges,
209+
research_use_only=True,
210+
score_set_urn=score_set.urn,
211+
calibration_metadata={"prior_probability_pathogenicity": calibration_data.get("prior", None)},
83212
)
84-
)
85-
return ranges
86-
87-
88-
def build_benign_ranges(thresholds: List[Optional[float]], inverted: bool) -> List[FunctionalRangeCreate]:
89-
raw_mapping = {
90-
strength: thresholds[idx]
91-
for idx, strength in enumerate(BENIGN_STRENGTHS)
92-
if idx < len(thresholds) and _not_nan(thresholds[idx])
93-
}
94-
mapping = _collapse_duplicate_thresholds(raw_mapping, min)
95-
96-
# Only retain strengths if they are in the mapping. In inverted mode, lower is 'more normal', which is
97-
# how the benign ranges are given to us. Therefore if the inverted flag is false, we must reverse the
98-
# order in which we handle ranges.
99-
available = [s for s in BENIGN_STRENGTHS if s in mapping]
100-
ordering = available[::-1] if inverted else available
101-
102-
ranges: List[FunctionalRangeCreate] = []
103-
for i, s in enumerate(ordering):
104-
lower: Optional[float]
105-
upper: Optional[float]
106-
107-
if not inverted:
108-
lower = mapping[s]
109-
upper = mapping[ordering[i + 1]] if i + 1 < len(ordering) else None
110-
else:
111-
lower = None if i == 0 else mapping[ordering[i - 1]]
112-
upper = mapping[s]
113-
114-
ranges.append(
115-
FunctionalRangeCreate(
116-
label=str(s),
117-
classification="normal",
118-
range=(lower, upper),
119-
# Whichever bound interacts with infinity will always be exclusive, with the opposite always inclusive.
120-
inclusive_lower_bound=False if inverted else True,
121-
inclusive_upper_bound=False if not inverted else True,
122-
acmg_classification=ACMGClassificationCreate(points=s),
213+
214+
new_calibration_object = asyncio.run(
215+
create_score_calibration_in_score_set(db, score_calibration_create, system_user)
123216
)
124-
)
125-
return ranges
217+
new_calibration_object.primary = False
218+
new_calibration_object.private = False
219+
db.add(new_calibration_object)
126220

221+
click.echo(f" Successfully created calibration '{calibration_name}' for Score Set {score_set.urn}")
222+
db.flush()
127223

128-
@click.command()
129-
@with_database_session
130-
@click.argument("json_path", type=click.Path(exists=True, dir_okay=False, readable=True))
131-
@click.argument("score_set_urn", type=str)
132-
@click.argument("public", default=False, type=bool)
133-
@click.argument("user_id", type=int)
134-
def main(db: Session, json_path: str, score_set_urn: str, public: bool, user_id: int) -> None:
135-
"""Load pillar project calibration JSON into a score set's zeiberg_calibration score ranges."""
136-
score_set: Optional[ScoreSet] = db.query(ScoreSet).filter(ScoreSet.urn == score_set_urn).one_or_none()
137-
if not score_set:
138-
raise click.ClickException(f"Score set with URN {score_set_urn} not found")
139-
140-
user: Optional[User] = db.query(User).filter(User.id == user_id).one_or_none()
141-
if not user:
142-
raise click.ClickException(f"User with ID {user_id} not found")
143-
144-
with open(json_path, "r") as fh:
145-
data: Dict[str, Any] = json.load(fh)
146-
147-
path_thresholds = data.get("final_pathogenic_thresholds") or []
148-
benign_thresholds = data.get("final_benign_thresholds") or []
149-
# Lower is 'more normal' in inverted mode
150-
inverted = data.get("inverted") == "inverted"
151-
152-
path_ranges = build_pathogenic_ranges(path_thresholds, inverted)
153-
benign_ranges = build_benign_ranges(benign_thresholds, inverted)
154-
155-
if not path_ranges and not benign_ranges:
156-
raise click.ClickException("No valid thresholds found to build ranges.")
157-
158-
calibration_create = ScoreCalibrationCreate(
159-
title="Zeiberg Calibration",
160-
research_use_only=True,
161-
primary=False,
162-
investigator_provided=False,
163-
private=not public,
164-
functional_ranges=path_ranges + benign_ranges,
165-
score_set_urn=score_set_urn,
224+
if existing_calibration:
225+
updated_calibrations += 1
226+
else:
227+
created_calibrations += 1
228+
229+
click.echo(
230+
"---\n"
231+
f"Created {created_calibrations} calibrations, updated {updated_calibrations} calibrations ({created_calibrations + updated_calibrations} total). Non-existing score sets: {non_existing_score_sets}."
166232
)
167-
calibration = create_score_calibration_in_score_set(db, calibration_create, user)
168-
db.add(calibration)
169233
click.echo(
170-
f"Loaded {len(path_ranges)} pathogenic and {len(benign_ranges)} benign ranges into score set {score_set_urn} (inverted={inverted})."
234+
f"{len(unmapped_files)} unmapped calibration files out of {total_json_files} files in archive. Unmapped files were:"
171235
)
236+
for unmapped_file in unmapped_files:
237+
click.echo(f" - {unmapped_file}")
172238

173239

174240
if __name__ == "__main__": # pragma: no cover

0 commit comments

Comments
 (0)