|
17 | 17 | import json |
18 | 18 | import re |
19 | 19 | import logging |
20 | | -from functools import lru_cache |
21 | | -from typing import Union, List, Dict, Optional, Any, BinaryIO, cast, Tuple |
| 20 | +import tempfile |
| 21 | +from contextlib import contextmanager |
| 22 | +from functools import lru_cache, partial |
| 23 | +from typing import Any, BinaryIO, Dict, Generator, List, Optional, Tuple, Union, cast |
22 | 24 |
|
23 | 25 | import pandas as pd |
24 | 26 | from pandera import Column, Check, DataFrameSchema |
25 | | -from pandera.errors import SchemaErrors |
| 27 | +from pandera.errors import SchemaError, SchemaErrors |
| 28 | + |
| 29 | +from edvise.dataio.read import read_raw_pdp_cohort_data, read_raw_pdp_course_data |
| 30 | +from edvise.utils.data_cleaning import handling_duplicates |
26 | 31 |
|
27 | 32 | from . import validation_pdp_edvise as pdp_edvise |
28 | 33 |
|
|
38 | 43 |
|
39 | 44 |
|
40 | 45 | def validate_file_reader( |
41 | | - filename: Union[str, os.PathLike[str], BinaryIO, io.TextIOWrapper], |
| 46 | + filename: Union[str, os.PathLike[str], BinaryIO, io.TextIOWrapper, io.StringIO], |
42 | 47 | allowed_schema: list[str], |
43 | 48 | base_schema: dict, |
44 | 49 | inst_schema: Optional[Dict[Any, Any]] = None, |
@@ -168,7 +173,7 @@ def get_extension_model_columns_only( |
168 | 173 | # Encoding sniffing (mypy-friendly) |
169 | 174 | # --------------------------------------------------------------------------- # |
170 | 175 |
|
# Accepted source types for validation readers: a filesystem path, or a
# readable file-like object (binary stream, text wrapper, or in-memory buffer).
Src = Union[str, os.PathLike[str], BinaryIO, io.TextIOWrapper, io.StringIO]
172 | 177 |
|
173 | 178 |
|
174 | 179 | def _read_sample(buf: BinaryIO, n: int) -> bytes: |
@@ -711,6 +716,190 @@ def _compute_model_list_and_merged_specs( |
711 | 716 | return model_list, merged_specs |
712 | 717 |
|
713 | 718 |
|
# --------------------------------------------------------------------------- #
# PDP: use edvise read + validate (single source of truth)
# --------------------------------------------------------------------------- #

# Datetime formats to try for PDP course (same order as pdp_data_audit).
# Tried in order until read_raw_pdp_course_data stops raising ValueError.
PDP_COURSE_DTTM_FORMATS = ("ISO8601", "%Y%m%d.0", "%Y%m%d")
| 725 | + |
| 726 | + |
| 727 | +@contextmanager |
| 728 | +def _path_for_edvise_read(filename: Src, enc: str) -> Generator[str, None, None]: |
| 729 | + """ |
| 730 | + Yield a file path that edvise read_raw_pdp_* can use. |
| 731 | +
|
| 732 | + If filename is a path, yield it. If file-like, read content, write to a temp |
| 733 | + file (utf-8), yield that path; the temp file is always removed on exit. |
| 734 | +
|
| 735 | + Args: |
| 736 | + filename: Path or file-like to read from. |
| 737 | + enc: Encoding used to decode file-like content before writing utf-8 temp. |
| 738 | +
|
| 739 | + Yields: |
| 740 | + Path to a CSV file (original or temp). |
| 741 | +
|
| 742 | + Raises: |
| 743 | + HardValidationError: If file-like read fails (with failure_cases=[str(e)]). |
| 744 | + """ |
| 745 | + if isinstance(filename, (str, os.PathLike)): |
| 746 | + yield str(filename) |
| 747 | + return |
| 748 | + try: |
| 749 | + raw = filename.read() |
| 750 | + except Exception as e: |
| 751 | + # Intentionally broad: any read failure becomes HardValidationError for API. |
| 752 | + logger.error("Could not read file for validation: %s", e, exc_info=True) |
| 753 | + raise HardValidationError( |
| 754 | + schema_errors="Could not read file for validation.", |
| 755 | + failure_cases=[str(e)], |
| 756 | + ) from e |
| 757 | + if isinstance(raw, bytes): |
| 758 | + raw = raw.decode(enc) |
| 759 | + fd, path = tempfile.mkstemp(suffix=".csv") |
| 760 | + try: |
| 761 | + os.write(fd, raw.encode("utf-8")) |
| 762 | + except Exception: |
| 763 | + try: |
| 764 | + os.unlink(path) |
| 765 | + except OSError: |
| 766 | + pass |
| 767 | + raise |
| 768 | + finally: |
| 769 | + os.close(fd) |
| 770 | + try: |
| 771 | + yield path |
| 772 | + finally: |
| 773 | + try: |
| 774 | + os.unlink(path) |
| 775 | + except OSError: |
| 776 | + pass |
| 777 | + |
| 778 | + |
def _read_pdp_course_edvise(path: str) -> pd.DataFrame:
    """
    Read and validate PDP course data via edvise (same as pipeline).

    Tries each datetime format with each converter: first
    handling_duplicates(..., school_type="pdp"), then handling_duplicates(df)
    for older edvise versions that lack the school_type kwarg. Raises
    HardValidationError if all attempts fail.

    Args:
        path: Path to course CSV.

    Returns:
        Validated DataFrame (same as pipeline output).

    Raises:
        HardValidationError: If no (converter, format) pair succeeded.
    """
    # Hoisted out of the loops: the schema is invariant across all attempts,
    # so don't rebuild it for every (converter, format) pair.
    course_schema = pdp_edvise.get_edvise_schema_for_models(["COURSE"])
    converters = (
        partial(handling_duplicates, school_type="pdp"),
        handling_duplicates,
    )
    last_error: Optional[Exception] = None
    for converter in converters:
        for fmt in PDP_COURSE_DTTM_FORMATS:
            try:
                return read_raw_pdp_course_data(
                    file_path=path,
                    schema=course_schema,
                    dttm_format=fmt,
                    converter_func=converter,
                    spark_session=None,
                )
            except ValueError as e:
                # Likely a dttm_format mismatch; remember and try the next format.
                last_error = e
            except TypeError as e:
                if "school_type" in str(e):
                    # Older edvise: converter doesn't accept school_type.
                    # Discard format errors from this converter and fall
                    # through to the plain-converter attempts.
                    last_error = None
                    break
                raise
    error_message = (
        "Course data did not parse with any known datetime format."
        if last_error is not None
        else "Course validation failed (datetime format or schema)."
    )
    validation_error = HardValidationError(
        schema_errors=error_message,
        failure_cases=[str(last_error)] if last_error else [],
    )
    logger.error(
        "PDP course validation failed: path=%s, last_error=%s",
        path,
        last_error,
    )
    if last_error is not None:
        raise validation_error from last_error
    raise validation_error
| 835 | + |
| 836 | + |
def _validate_pdp_with_edvise_read(
    filename: Src,
    enc: str,
    model_list: List[str],
    institution_id: str,
) -> Dict[str, Any]:
    """
    Validate PDP cohort or course via edvise read + schema (same as pipeline).

    Resolves filename to a path (temp file if file-like), then dispatches to
    read_raw_pdp_cohort_data or read_raw_pdp_course_data based on the single
    model requested. Pandera SchemaError(s) are converted to
    HardValidationError for API/formatter consistency.

    Args:
        filename: Path or file-like to CSV.
        enc: Encoding (from sniff_encoding) for file-like decode.
        model_list: Single model, e.g. ["STUDENT"] or ["COURSE"].
        institution_id: Institution schema key (e.g. "pdp").

    Returns:
        Dict with validation_status, schemas, missing_optional,
        unknown_extra_columns, normalized_df.

    Raises:
        HardValidationError: If read/schema fails (or SchemaErrors converted).
    """
    _reset_to_start_if_possible(filename)
    normalized_models = {str(m).strip().upper() for m in model_list if m}

    with _path_for_edvise_read(filename, enc) as csv_path:
        try:
            if normalized_models == {"STUDENT"}:
                validated = read_raw_pdp_cohort_data(
                    file_path=csv_path,
                    schema=pdp_edvise.get_edvise_schema_for_models(["STUDENT"]),
                    converter_func=None,
                    spark_session=None,
                )
            elif normalized_models == {"COURSE"}:
                validated = _read_pdp_course_edvise(csv_path)
            else:
                # Not a Pandera error, so this passes straight through the
                # except clause below.
                raise HardValidationError(
                    schema_errors=f"PDP single-model expected; got models={model_list}",
                    failure_cases=[],
                )
        except (SchemaErrors, SchemaError) as e:
            logger.error(
                "PDP edvise schema validation failed: model_set=%s, error=%s",
                normalized_models,
                e,
                exc_info=True,
            )
            raise pdp_edvise._convert_schema_errors_to_hard_validation_error(
                e, raw_to_canon={}, canon_to_raw={}, merged_specs={}
            ) from e

        # Only the read calls above can raise the caught Pandera errors, so
        # building the success payload lives outside the try block.
        return {
            "validation_status": "passed",
            "schemas": model_list,
            "missing_optional": [],
            "unknown_extra_columns": [],
            "normalized_df": validated,
        }
| 901 | + |
| 902 | + |
714 | 903 | # --------------------------------------------------------------------------- # |
715 | 904 | # Main validation |
716 | 905 | # --------------------------------------------------------------------------- # |
@@ -750,6 +939,10 @@ def validate_dataset( |
750 | 939 | "normalized_df": None, |
751 | 940 | } |
752 | 941 |
|
| 942 | + # PDP single-model: use edvise read + validate (same as pipeline) |
| 943 | + if pdp_edvise.get_edvise_schema_for_upload(institution_id, model_list) is not None: |
| 944 | + return _validate_pdp_with_edvise_read(filename, enc, model_list, institution_id) |
| 945 | + |
753 | 946 | ( |
754 | 947 | raw_to_canon, |
755 | 948 | canon_to_raw, |
|
0 commit comments