Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
360 changes: 213 additions & 147 deletions src/mavedb/scripts/load_pp_style_calibration.py
Original file line number Diff line number Diff line change
@@ -1,174 +1,240 @@
"""Load an archive of Zeiberg calibration style calibrations into Score Sets.

This script processes JSON calibration files from an archive directory and applies them
to MaveDB Score Sets based on a dataset mapping file. The script iterates through all
JSON files in the archive directory, extracts dataset names from filenames, looks up
corresponding Score Set URNs in the mapping file, and creates ACMG-style functional
range calibrations for each Score Set.

Args:
archive_path (str): Path to directory containing calibration JSON files
dataset_map (str): Path to JSON file mapping dataset names to Score Set URNs
overwrite (bool): Whether to overwrite existing "Zeiberg calibration" entries

Input File Formats:

1. Archive Directory Structure:
- Contains JSON files named after datasets (e.g., "data_set_name.json")
- May include "_clinvar_2018" variant files (e.g., "data_set_name_clinvar_2018.json")
- Script automatically detects and processes both variants

2. Calibration JSON File Format:
{
"prior": 0.01548246603645654,
"point_ranges": {
"1": [[[0.7222, 0.9017]]], // BS3 Supporting (-1 to 1 points)
"2": [[[0.9017, 1.1315]]], // BS3 Moderate (-2 to 2 points)
"3": [[[1.1315, 5.3892]]], // BS3 Moderate+ (-3 to 3 points)
"4": [], // BS3 Strong (-4 to 4 points)
"-1": [[[-0.6934, -0.3990]]], // PS3 Supporting
"-2": [[[-6.5761, -0.6934]]], // PS3 Moderate
// ... other point values (-8 to 8)
},
"dataset": "data_set_name",
"relax": false,
"n_c": "2c",
"benign_method": "benign",
"clinvar_2018": false
}

3. Dataset Mapping JSON File Format:
{
"data_set_name": "urn:mavedb:00000050-a-1",
"data_set_with_urn_list": "urn:mavedb:00000060-a-1, urn:mavedb:00000060-a-2",
// ... more dataset mappings
}

Behavior:

1. File Discovery: Scans archive directory for all .json files
2. Dataset Name Extraction: Removes .json extension and _clinvar_2018 suffix
3. Mapping Lookup: Finds Score Set URNs for each dataset in mapping file
4. URN Processing: Handles comma-separated URN lists for datasets with multiple Score Sets
5. Calibration Creation: Creates functional ranges with ACMG classifications:
- Positive points (1-8): PS3 classifications for "abnormal" variants
- Negative points (-1 to -8): BS3 classifications for "normal" variants
- Strength labels: Supporting (±1), Moderate (±2), Moderate+ (±3), Strong (±4-8)
6. File Variants: Automatically detects and processes both regular and ClinVar 2018 variants
7. Calibration Naming:
- Regular files: "Zeiberg calibration"
- ClinVar 2018 files: "Zeiberg calibration (ClinVar 2018)"

Skipping Behavior:
- Files with no mapping entry or empty/invalid URNs (N/A, #VALUE!, empty string)
- Score Sets that don't exist in the database
- JSON files that can't be parsed

Output Statistics:
- Total JSON files found in archive
- Number of calibrations created vs updated
- Number of unmapped files
- Number of non-existing Score Sets

Example Usage:
python load_pp_style_calibration.py /path/to/calibrations_archive /path/to/dataset_mapping.json
python load_pp_style_calibration.py /path/to/calibrations_archive /path/to/dataset_mapping.json --overwrite
"""

import asyncio
import json
import math
from typing import Any, Callable, Dict, List, Optional
import os
from typing import Dict, List, Optional

import click
from sqlalchemy.orm import Session

from mavedb.lib.score_calibrations import create_score_calibration_in_score_set
from mavedb.models.score_calibration import ScoreCalibration
from mavedb.models.score_set import ScoreSet
from mavedb.models.user import User
from mavedb.scripts.environment import with_database_session
from mavedb.view_models.acmg_classification import ACMGClassificationCreate
from mavedb.view_models.score_calibration import FunctionalRangeCreate, ScoreCalibrationCreate
from mavedb.view_models import acmg_classification, score_calibration

# Evidence strength ordering definitions
PATH_STRENGTHS: List[int] = [1, 2, 3, 4, 8]
BENIGN_STRENGTHS: List[int] = [-1, -2, -3, -4, -8]
POINT_LABEL_MAPPINGS: Dict[int, str] = {
8: "Very Strong",
7: "Very Strong",
6: "Very Strong",
5: "Very Strong",
4: "Strong",
3: "Moderate+",
2: "Moderate",
1: "Supporting",
}

ALL_POINT_LABEL_MAPPINGS = {**POINT_LABEL_MAPPINGS, **{k * -1: v for k, v in POINT_LABEL_MAPPINGS.items()}}

def _not_nan(v: Any) -> bool:
return v is not None and not (isinstance(v, float) and math.isnan(v))

@click.command()
@with_database_session
@click.argument("archive_path", type=click.Path(exists=True, file_okay=False))
@click.argument("dataset_map", type=click.Path(exists=True, dir_okay=False))
@click.option("--overwrite", is_flag=True, default=False, help="Overwrite existing `Zeiberg calibration` in score set")
def main(db: Session, archive_path: str, dataset_map: str, overwrite: bool) -> None:
"""Load an archive of Zeiberg calibration style calibrations into Score Sets"""
with open(dataset_map, "r") as f:
dataset_mapping: Dict[str, str] = json.load(f)

system_user: User = db.query(User).filter(User.id == 1).one()

# Get all JSON files in the archive directory
json_files = [f for f in os.listdir(archive_path) if f.endswith(".json")]
total_json_files = len(json_files)

created_calibrations = 0
updated_calibrations = 0
non_existing_score_sets = 0
unmapped_files = []

click.echo(f"Found {total_json_files} JSON files in archive directory: {archive_path}")

for json_file in json_files:
with open(os.path.join(archive_path, json_file), "r") as f:
calibration_data = json.load(f)
dataset_name = calibration_data.get("dataset", None)
click.echo(f"Processing calibration file: {json_file} (dataset: {dataset_name})")

if not dataset_name:
click.echo(f" Dataset name not found in calibration file {json_file}, skipping...", err=True)
unmapped_files.append(json_file)
continue

def _collapse_duplicate_thresholds(m: dict[int, Optional[float]], comparator: Callable) -> dict[int, float]:
collapsed: dict[int, float] = {}
# Look up dataset in mapping
if "_clinvar_2018" in json_file:
dataset_name = dataset_name.replace("_clinvar_2018", "")

for strength, threshold in m.items():
if threshold is None:
score_set_urns_str = dataset_mapping.get(dataset_name)
if not score_set_urns_str or score_set_urns_str in ["", "N/A", "#VALUE!"]:
click.echo(f" Dataset {dataset_name} not found in mapping or has no URNs, skipping...", err=True)
unmapped_files.append(json_file)
continue

if threshold in collapsed.values():
# If the value is already present, we need to find the key it's associated with
current_strongest_strength = next(s for s, t in collapsed.items() if t == threshold)

# If the keys are different, we need to merge them. Keep the strongest one as decided
# by the provided comparator.
if current_strongest_strength != strength:
new_strongest_evidence = comparator(current_strongest_strength, strength)
collapsed.pop(current_strongest_strength)
collapsed[new_strongest_evidence] = threshold

else:
collapsed[strength] = threshold

return collapsed


def build_pathogenic_ranges(thresholds: List[Optional[float]], inverted: bool) -> List[FunctionalRangeCreate]:
raw_mapping = {
strength: thresholds[idx]
for idx, strength in enumerate(PATH_STRENGTHS)
if idx < len(thresholds) and _not_nan(thresholds[idx])
}
mapping = _collapse_duplicate_thresholds(raw_mapping, max)

# Only retain strengths if they are in the mapping. In inverted mode, upper is 'more pathogenic', which is
# the opposite of how the pathogenic ranges are given to us. Therefore if the inverted flag is false, we must reverse the
# order in which we handle ranges.
available = [s for s in PATH_STRENGTHS if s in mapping]
ordering = available[::-1] if not inverted else available

ranges: List[FunctionalRangeCreate] = []
for i, s in enumerate(ordering):
lower: Optional[float]
upper: Optional[float]

if inverted:
lower = mapping[s]
upper = mapping[ordering[i + 1]] if i + 1 < len(ordering) else None
else:
lower = None if i == 0 else mapping[ordering[i - 1]]
upper = mapping[s]

ranges.append(
FunctionalRangeCreate(
label=str(s),
classification="abnormal",
range=(lower, upper),
# Whichever bound interacts with infinity will always be exclusive, with the opposite always inclusive.
inclusive_lower_bound=False if not inverted else True,
inclusive_upper_bound=False if inverted else True,
acmg_classification=ACMGClassificationCreate(points=s),
# Handle comma-separated list of score set URNs
score_set_urns = [urn.strip() for urn in score_set_urns_str.split(",") if urn.strip()]

# Process each score set URN for this calibration file
for score_set_urn in score_set_urns:
click.echo(f" Applying calibration to Score Set {score_set_urn}...")

score_set: Optional[ScoreSet] = db.query(ScoreSet).filter(ScoreSet.urn == score_set_urn).one_or_none()
if not score_set:
click.echo(f" Score Set with URN {score_set_urn} not found, skipping...", err=True)
non_existing_score_sets += 1
continue

# Determine calibration name based on file name
if "_clinvar_2018" in json_file:
calibration_name = "Zeiberg calibration (ClinVar 2018)"
else:
calibration_name = "Zeiberg calibration"

existing_calibration = None
if overwrite:
existing_calibration = (
db.query(ScoreCalibration)
.filter(ScoreCalibration.score_set_id == score_set.id)
.filter(ScoreCalibration.title == calibration_name)
.one_or_none()
)

if existing_calibration:
db.delete(existing_calibration)
db.flush()
click.echo(f" Overwriting existing '{calibration_name}' in Score Set {score_set.urn}")

functional_ranges: List[score_calibration.FunctionalRangeCreate] = []
for points, range_data in calibration_data.get("point_ranges", {}).items():
if not range_data:
continue

range_data = tuple(float(bound) for bound in range_data[0])
points = int(points.strip())
ps_or_bs = "PS3" if points > 0 else "BS3"
strength_label = ALL_POINT_LABEL_MAPPINGS.get(points, "Unknown")

functional_range = score_calibration.FunctionalRangeCreate(
label=f"{ps_or_bs} {strength_label} ({points})",
classification="abnormal" if points > 0 else "normal",
range=range_data,
acmg_classification=acmg_classification.ACMGClassificationCreate(
points=int(points),
),
inclusive_lower_bound=True,
inclusive_upper_bound=False,
)
functional_ranges.append(functional_range)

score_calibration_create = score_calibration.ScoreCalibrationCreate(
title=calibration_name,
functional_ranges=functional_ranges,
research_use_only=True,
score_set_urn=score_set.urn,
calibration_metadata={"prior_probability_pathogenicity": calibration_data.get("prior", None)},
)
)
return ranges


def build_benign_ranges(thresholds: List[Optional[float]], inverted: bool) -> List[FunctionalRangeCreate]:
raw_mapping = {
strength: thresholds[idx]
for idx, strength in enumerate(BENIGN_STRENGTHS)
if idx < len(thresholds) and _not_nan(thresholds[idx])
}
mapping = _collapse_duplicate_thresholds(raw_mapping, min)

# Only retain strengths if they are in the mapping. In inverted mode, lower is 'more normal', which is
# how the benign ranges are given to us. Therefore if the inverted flag is false, we must reverse the
# order in which we handle ranges.
available = [s for s in BENIGN_STRENGTHS if s in mapping]
ordering = available[::-1] if inverted else available

ranges: List[FunctionalRangeCreate] = []
for i, s in enumerate(ordering):
lower: Optional[float]
upper: Optional[float]

if not inverted:
lower = mapping[s]
upper = mapping[ordering[i + 1]] if i + 1 < len(ordering) else None
else:
lower = None if i == 0 else mapping[ordering[i - 1]]
upper = mapping[s]

ranges.append(
FunctionalRangeCreate(
label=str(s),
classification="normal",
range=(lower, upper),
# Whichever bound interacts with infinity will always be exclusive, with the opposite always inclusive.
inclusive_lower_bound=False if inverted else True,
inclusive_upper_bound=False if not inverted else True,
acmg_classification=ACMGClassificationCreate(points=s),

new_calibration_object = asyncio.run(
create_score_calibration_in_score_set(db, score_calibration_create, system_user)
)
)
return ranges
new_calibration_object.primary = False
new_calibration_object.private = False
db.add(new_calibration_object)

click.echo(f" Successfully created calibration '{calibration_name}' for Score Set {score_set.urn}")
db.flush()

@click.command()
@with_database_session
@click.argument("json_path", type=click.Path(exists=True, dir_okay=False, readable=True))
@click.argument("score_set_urn", type=str)
@click.argument("public", default=False, type=bool)
@click.argument("user_id", type=int)
def main(db: Session, json_path: str, score_set_urn: str, public: bool, user_id: int) -> None:
"""Load pillar project calibration JSON into a score set's zeiberg_calibration score ranges."""
score_set: Optional[ScoreSet] = db.query(ScoreSet).filter(ScoreSet.urn == score_set_urn).one_or_none()
if not score_set:
raise click.ClickException(f"Score set with URN {score_set_urn} not found")

user: Optional[User] = db.query(User).filter(User.id == user_id).one_or_none()
if not user:
raise click.ClickException(f"User with ID {user_id} not found")

with open(json_path, "r") as fh:
data: Dict[str, Any] = json.load(fh)

path_thresholds = data.get("final_pathogenic_thresholds") or []
benign_thresholds = data.get("final_benign_thresholds") or []
# Lower is 'more normal' in inverted mode
inverted = data.get("inverted") == "inverted"

path_ranges = build_pathogenic_ranges(path_thresholds, inverted)
benign_ranges = build_benign_ranges(benign_thresholds, inverted)

if not path_ranges and not benign_ranges:
raise click.ClickException("No valid thresholds found to build ranges.")

calibration_create = ScoreCalibrationCreate(
title="Zeiberg Calibration",
research_use_only=True,
primary=False,
investigator_provided=False,
private=not public,
functional_ranges=path_ranges + benign_ranges,
score_set_urn=score_set_urn,
if existing_calibration:
updated_calibrations += 1
else:
created_calibrations += 1

click.echo(
"---\n"
f"Created {created_calibrations} calibrations, updated {updated_calibrations} calibrations ({created_calibrations + updated_calibrations} total). Non-existing score sets: {non_existing_score_sets}."
)
calibration = create_score_calibration_in_score_set(db, calibration_create, user)
db.add(calibration)
click.echo(
f"Loaded {len(path_ranges)} pathogenic and {len(benign_ranges)} benign ranges into score set {score_set_urn} (inverted={inverted})."
f"{len(unmapped_files)} unmapped calibration files out of {total_json_files} files in archive. Unmapped files were:"
)
for unmapped_file in unmapped_files:
click.echo(f" - {unmapped_file}")


if __name__ == "__main__": # pragma: no cover
Expand Down