|
1 | 1 | """Utilities for building and mutating score calibration ORM objects.""" |
2 | 2 |
|
| 3 | +import math |
3 | 4 | from typing import Union |
4 | 5 |
|
| 6 | +from sqlalchemy import Float, and_, select |
5 | 7 | from sqlalchemy.orm import Session |
6 | 8 |
|
7 | 9 | from mavedb.lib.acmg import find_or_create_acmg_classification |
8 | 10 | from mavedb.lib.identifiers import find_or_create_publication_identifier |
| 11 | +from mavedb.lib.validation.utilities import inf_or_float |
9 | 12 | from mavedb.models.enums.score_calibration_relation import ScoreCalibrationRelation |
10 | 13 | from mavedb.models.score_calibration import ScoreCalibration |
11 | 14 | from mavedb.models.score_calibration_functional_classification import ScoreCalibrationFunctionalClassification |
12 | 15 | from mavedb.models.score_calibration_publication_identifier import ScoreCalibrationPublicationIdentifierAssociation |
13 | 16 | from mavedb.models.score_set import ScoreSet |
14 | 17 | from mavedb.models.user import User |
| 18 | +from mavedb.models.variant import Variant |
15 | 19 | from mavedb.view_models import score_calibration |
16 | 20 |
|
17 | 21 |
|
@@ -67,6 +71,9 @@ def create_functional_classification( |
67 | 71 | calibration=containing_calibration, |
68 | 72 | ) |
69 | 73 |
|
| 74 | + contained_variants = variants_for_functional_classification(db, functional_classification, use_sql=True) |
| 75 | + functional_classification.variants = contained_variants |
| 76 | + |
70 | 77 | return functional_classification |
71 | 78 |
|
72 | 79 |
|
@@ -598,3 +605,102 @@ def delete_score_calibration(db: Session, calibration: ScoreCalibration) -> None |
598 | 605 |
|
599 | 606 | db.delete(calibration) |
600 | 607 | return None |
| 608 | + |
| 609 | + |
def variants_for_functional_classification(
    db: Session,
    functional_classification: ScoreCalibrationFunctionalClassification,
    use_sql: bool = False,
) -> list[Variant]:
    """Return variants in the parent score set whose score lies in the classification's range.

    The variant score is read from the JSONB ``Variant.data`` field at
    ``data['score_data']['score']``. The score set is resolved through
    ``functional_classification.calibration.score_set_id``.

    Parameters
    ----------
    db : Session
        Active SQLAlchemy session.
    functional_classification : ScoreCalibrationFunctionalClassification
        The ORM row defining the interval to test against. Its ``range``,
        ``inclusive_lower_bound``, ``inclusive_upper_bound`` and
        ``calibration`` attributes are read.
    use_sql : bool
        When True, filtering is attempted in the database using JSONB
        extraction and range predicates. If that attempt raises, the error is
        logged and the function falls back to Python filtering.

    Returns
    -------
    list[Variant]
        Variants whose score falls within the range. Empty list if the
        classification has no (or an empty) ``range``.

    Notes
    -----
    * With ``use_sql=False`` (default) every variant of the score set is loaded
      and filtered in Python via
      ``functional_classification.score_is_contained_in_range``.
    * Variants lacking a score, or whose score cannot be converted to ``float``,
      are skipped on both paths. The SQL path enforces this with an explicit
      ``IS NOT NULL`` predicate so that a fully unbounded range does not return
      score-less variants.
    * The SQL attempt runs inside a SAVEPOINT (``Session.begin_nested``). A
      failing statement (for example a cast error on a non-numeric score) would
      otherwise leave the enclosing transaction in a failed state on
      PostgreSQL and make the Python fallback query fail as well.
    """
    # Function-scope import keeps this block self-contained; the logger is only
    # needed when the SQL path fails.
    import logging

    if not functional_classification.range:
        return []

    # Resolve score set id from attached calibration (relationship may be lazy)
    score_set_id = functional_classification.calibration.score_set_id  # type: ignore[attr-defined]

    if use_sql:
        try:
            # Score extraction expression: data['score_data']['score']::text::float
            score_expr = Variant.data["score_data"]["score"].astext.cast(Float)

            lower_raw, upper_raw = functional_classification.range

            # Bounds that come back infinite are treated as "no bound" and
            # contribute no predicate.
            lower_bound = inf_or_float(lower_raw, lower=True)
            upper_bound = inf_or_float(upper_raw, lower=False)

            conditions = [
                Variant.score_set_id == score_set_id,
                # Without this, an unbounded range would match variants that
                # have no score at all (NULL only fails *comparisons*).
                score_expr.is_not(None),
            ]
            if not math.isinf(lower_bound):
                if functional_classification.inclusive_lower_bound:
                    conditions.append(score_expr >= lower_bound)
                else:
                    conditions.append(score_expr > lower_bound)
            if not math.isinf(upper_bound):
                if functional_classification.inclusive_upper_bound:
                    conditions.append(score_expr <= upper_bound)
                else:
                    conditions.append(score_expr < upper_bound)

            stmt = select(Variant).where(and_(*conditions))

            # SAVEPOINT: if the statement fails, only the savepoint is rolled
            # back and the session stays usable for the fallback below.
            with db.begin_nested():
                sql_matches = list(db.execute(stmt).scalars())
            return sql_matches

        except Exception:  # noqa: BLE001 - deliberate best-effort; fall back below
            logging.getLogger(__name__).warning(
                "SQL filtering of variants for functional classification failed; "
                "falling back to Python filtering.",
                exc_info=True,
            )

    # Python filtering fallback / default path
    variants = db.execute(select(Variant).where(Variant.score_set_id == score_set_id)).scalars().all()
    matches: list[Variant] = []
    for variant in variants:
        data = variant.data
        if not isinstance(data, dict):
            continue

        container = data.get("score_data")
        if not isinstance(container, dict):
            continue

        raw = container.get("score")
        if raw is None:
            continue

        try:
            score = float(raw)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric score: skip rather than fail the whole lookup.
            continue

        if functional_classification.score_is_contained_in_range(score):
            matches.append(variant)

    return matches
0 commit comments