diff --git a/.gitignore b/.gitignore index 515eb709..c4dbe66a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__/ .mypy_cache/ .pytest_cache/ .ruff_cache/ +.prek-cache/ .DS_Store dev_no_commit.py diff --git a/data/resstock/Justfile b/data/resstock/Justfile index 1f35a8d3..c518fddd 100644 --- a/data/resstock/Justfile +++ b/data/resstock/Justfile @@ -7,6 +7,8 @@ # just -f data/resstock/Justfile resstock-identify-hp-customers NY # just -f data/resstock/Justfile resstock-identify-hp-customers RI # just -f data/resstock/Justfile assign-utility-ny res_2024_amy2018_2 00 +# just -f data/resstock/Justfile aggregate-hourly-loads NY "00 02" false ./dev_plots/agg coned +# just -f data/resstock/Justfile aggregate-hourly-loads NY "00 02" true ./dev_plots/agg path_local_repo := `git rev-parse --show-toplevel` resstock_release := "res_2024_amy2018_2" @@ -258,6 +260,72 @@ adjust-mf-electricity-RI-upgrade-00: adjust-mf-electricity-RI-upgrade-02: just adjust-mf-electricity RI res_2024_amy2018_2 res_2024_amy2018_2_sb "02" +# ============================================================================= +# Aggregate hourly electric load (MWh) — data/resstock/create_aggregate_hourly_loads.py +# ============================================================================= +# +# Reads ``load_curve_hourly`` on S3 for ``resstock_release_sb`` and +# ``metadata_utility/.../utility_assignment.parquet``. +# +# - customer_class **false**: one file per upgrade per utility (requires +# --electric-utility or ``all`` for every utility in metadata). +# - customer_class **true**: separate HP vs non-HP cohorts per upgrade; omit +# electric_utility for state-wide cohorts, or use ``all`` for every utility. 
+# +# Example: +# just -f data/resstock/Justfile aggregate-hourly-loads NY "00 02" false ./dev_plots/agg coned +# just -f data/resstock/Justfile aggregate-hourly-loads NY "00 02" false ./dev_plots/agg all +# just -f data/resstock/Justfile aggregate-hourly-loads RI 02 true ./dev_plots/agg +# just -f data/resstock/Justfile aggregate-hourly-loads-NY-split-hp-coned ./dev_plots/agg +# state: NY or RI; upgrades: space-separated e.g. 00 02; customer_class: true | false; +# path_output_dir: directory for Parquet outputs (local path); electric_utility: optional + +# when customer_class is true (omit for state-wide HP vs non-HP). +aggregate-hourly-loads state upgrades customer_class path_output_dir electric_utility="": + #!/usr/bin/env bash + set -euo pipefail + cd "{{ path_local_repo }}" + cmd=( + uv run python "{{ path_local_repo }}/data/resstock/create_aggregate_hourly_loads.py" + --resstock-base "{{ path_s3_parquet }}/{{ resstock_release_sb }}" + --state "{{ state }}" + --upgrades "{{ upgrades }}" + --customer-class "{{ customer_class }}" + --path-output-dir "{{ path_output_dir }}" + ) + if [[ -n "{{ electric_utility }}" ]]; then + cmd+=(--electric-utility "{{ electric_utility }}") + fi + "${cmd[@]}" + +# Shortcut: ConEd, utility-only aggregate (customer_class false). +aggregate-hourly-loads-NY-utility-coned path_output_dir upgrades="00 02": + just -f "{{ path_local_repo }}/data/resstock/Justfile" aggregate-hourly-loads NY "{{ upgrades }}" false "{{ path_output_dir }}" coned + +# Shortcut: NY ISO utilities — HP vs non-HP for one utility. +aggregate-hourly-loads-NY-split-hp-coned path_output_dir upgrades="00 02": + just -f "{{ path_local_repo }}/data/resstock/Justfile" aggregate-hourly-loads NY "{{ upgrades }}" true "{{ path_output_dir }}" coned + +# Shortcut: state-wide HP vs non-HP (no electric-utility filter). 
+aggregate-hourly-loads-NY-split-hp-statewide path_output_dir upgrades="00 02": + just -f "{{ path_local_repo }}/data/resstock/Justfile" aggregate-hourly-loads NY "{{ upgrades }}" true "{{ path_output_dir }}" + +# Shortcut: every NY utility — utility-only totals (one parquet per utility per upgrade). +aggregate-hourly-loads-NY-all-utilities-utility-only path_output_dir upgrades="00 02": + just -f "{{ path_local_repo }}/data/resstock/Justfile" aggregate-hourly-loads NY "{{ upgrades }}" false "{{ path_output_dir }}" all + +# Shortcut: every NY utility — HP vs non-HP cohorts per utility. +aggregate-hourly-loads-NY-all-utilities-split-hp path_output_dir upgrades="00 02": + just -f "{{ path_local_repo }}/data/resstock/Justfile" aggregate-hourly-loads NY "{{ upgrades }}" true "{{ path_output_dir }}" all + +# Shortcut: RIE, utility-only aggregate. +aggregate-hourly-loads-RI-utility-rie path_output_dir upgrades="00 02": + just -f "{{ path_local_repo }}/data/resstock/Justfile" aggregate-hourly-loads RI "{{ upgrades }}" false "{{ path_output_dir }}" rie + +# Shortcut: RIE — HP vs non-HP cohorts for that utility. +aggregate-hourly-loads-RI-split-hp-rie path_output_dir upgrades="00 02": + just -f "{{ path_local_repo }}/data/resstock/Justfile" aggregate-hourly-loads RI "{{ upgrades }}" true "{{ path_output_dir }}" rie + path_ebs_parquet := "/ebs/data/nrel/resstock" add-monthly-loads state upgrade_ids: diff --git a/data/resstock/create_aggregate_hourly_loads.py b/data/resstock/create_aggregate_hourly_loads.py new file mode 100644 index 00000000..20ec6008 --- /dev/null +++ b/data/resstock/create_aggregate_hourly_loads.py @@ -0,0 +1,586 @@ +"""Build aggregate hourly electricity profiles (MWh) from ResStock loads. + +Filters buildings using ``utility_assignment.parquet`` (``sb.electric_utility``, +``postprocess_group.has_hp``, ``weight``), then sums hourly loads per upgrade. + +**``--customer-class``** (required): ``true`` or ``false``. 
+ +- **true:** Split aggregation into **HP** vs **non-HP** cohorts (two outputs per + upgrade). Optional ``--electric-utility`` narrows both cohorts to that utility; + if omitted, cohorts are **state-wide**. +- **false:** Single aggregate **by utility only** (all buildings in that utility, + ignoring HP). **Requires** ``--electric-utility`` (use ``all`` to run once per + unique ``sb.electric_utility`` in the metadata file). + +kWh values are converted to MWh by dividing by 1000 after the weighted or +unweighted sum. + +**``--electric-utility all``** runs the same aggregation for **each** distinct +``sb.electric_utility`` in ``utility_assignment.parquet`` (sorted). With +``--customer-class true``, each utility gets separate HP and non-HP outputs. + +Usage:: + + # Utility total (HP + non-HP combined) + uv run python data/resstock/create_aggregate_hourly_loads.py \\ + --nrel-root s3://data.sb/nrel/resstock \\ + --release res_2024_amy2018_2_sb \\ + --state NY \\ + --upgrades 00 02 \\ + --electric-utility coned \\ + --customer-class false \\ + --path-output-dir /tmp/agg_loads + + # Separate HP vs non-HP for one utility + uv run python data/resstock/create_aggregate_hourly_loads.py \\ + --resstock-base s3://data.sb/nrel/resstock/res_2024_amy2018_2_sb \\ + --state NY --upgrades 02 \\ + --electric-utility coned \\ + --customer-class true \\ + --path-output-dir /tmp/agg_loads + + # State-wide HP vs non-HP (no utility filter) + uv run python data/resstock/create_aggregate_hourly_loads.py \\ + --resstock-base s3://data.sb/nrel/resstock/res_2024_amy2018_2_sb \\ + --state RI --upgrades 02 \\ + --customer-class true \\ + --path-output-dir /tmp/agg_loads + + # Every utility in the state (utility totals, one file per utility per upgrade) + uv run python data/resstock/create_aggregate_hourly_loads.py \\ + --state NY --upgrades 00 02 \\ + --electric-utility all \\ + --customer-class false \\ + --path-output-dir /tmp/agg_loads \\ + --nrel-root s3://data.sb/nrel/resstock \\ + 
--release res_2024_amy2018_2_sb +""" + +from __future__ import annotations + +import argparse +import logging +import re +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from typing import cast + +import polars as pl +import s3fs + +from utils import get_aws_region +from utils.loads import ( + BLDG_ID_COL, + ELECTRIC_LOAD_COL, + LOAD_CURVE_HOURLY_SUBDIR, +) + +log = logging.getLogger(__name__) + +_UA_UTILITY_COL = "sb.electric_utility" +_UA_HP_COL = "postprocess_group.has_hp" +_WEIGHT_COL = "weight" + +# Cap parallel parquet readers (I/O bound; avoids huge thread counts on wide folders). +_MAX_PARQUET_READ_WORKERS = 256 + + +def _normalize_upgrade(u: str) -> str: + s = u.strip() + if s.isdigit(): + return s.zfill(2) + return s + + +def _parse_upgrades(s: str) -> list[str]: + return [_normalize_upgrade(x) for x in s.split()] + + +def _upgrade_partition_dir(resstock_base: str, state: str, upgrade: str) -> str: + base = resstock_base.rstrip("/") + st = state.strip().upper() + up = _normalize_upgrade(upgrade) + return f"{base}/{LOAD_CURVE_HOURLY_SUBDIR}state={st}/upgrade={up}" + + +def _glob_parquet_local(partition_dir: str) -> list[str]: + p = Path(partition_dir) + if not p.is_dir(): + return [] + return sorted(str(x) for x in p.glob("*.parquet")) + + +def _glob_parquet_s3(partition_dir: str, storage_options: dict[str, str]) -> list[str]: + """List ``*.parquet`` under an ``s3://`` partition directory.""" + rest = partition_dir.removeprefix("s3://") + bucket, _, key_prefix = rest.partition("/") + key_prefix = key_prefix.rstrip("/") + fs = _s3_fs(storage_options) + pattern = f"{bucket}/{key_prefix}/*.parquet" + keys = fs.glob(pattern) + return sorted(f"s3://{k}" for k in keys) + + +def _s3_fs(storage_options: dict[str, str]) -> s3fs.S3FileSystem: + region = storage_options.get("aws_region") + kwargs: dict = {} + if region: + kwargs["client_kwargs"] = {"region_name": region} + return s3fs.S3FileSystem(**kwargs) + + 
+def _constructed_parquet_paths( + partition_dir: str, bldg_ids: list[int], upgrade: str +) -> list[str]: + """One path per building: ``{bldg_id}-{int(upgrade)}.parquet`` (ResStock on-disk layout).""" + up_int = int(_normalize_upgrade(upgrade)) + root = partition_dir.rstrip("/") + return [f"{root}/{bid}-{up_int}.parquet" for bid in bldg_ids] + + +def _parquet_paths_for_partition( + partition_dir: str, + bldg_ids: list[int], + upgrade: str, + storage_options: dict[str, str], +) -> list[str]: + """Prefer directory glob (chunk files, tests); else per-building filenames (typical S3 layout).""" + if partition_dir.startswith("s3://"): + paths = _glob_parquet_s3(partition_dir, storage_options) + else: + paths = _glob_parquet_local(partition_dir) + if paths: + return paths + return _constructed_parquet_paths(partition_dir, bldg_ids, upgrade) + + +def _storage_options_for_path( + path: str, storage_options: dict[str, str] +) -> dict[str, str] | None: + return storage_options if path.startswith("s3://") else None + + +def _partial_aggregate_one_parquet( + path: str, + *, + allowed_bldg_ids: frozenset[int], + load_col: str, + weighted: bool, + weights_df: pl.DataFrame, + storage_options: dict[str, str], +) -> tuple[pl.DataFrame | None, int]: + """Read one parquet, filter to cohort, return partial ``load_mwh`` and cohort building count. + + The int is the number of distinct ``bldg_id`` values from the cohort present in + this file (for progress reporting). 
+ """ + opts = _storage_options_for_path(path, storage_options) + try: + df = pl.read_parquet( + path, + columns=[BLDG_ID_COL, "timestamp", load_col], + storage_options=opts, + ) + except OSError: + log.debug("Skipping unreadable or missing parquet: %s", path) + return None, 0 + + df = df.filter(pl.col(BLDG_ID_COL).cast(pl.Int64).is_in(list(allowed_bldg_ids))) + if df.height == 0: + return None, 0 + + n_bldg_in_file = int(df[BLDG_ID_COL].n_unique()) + + if weighted: + df = df.join(weights_df, on=BLDG_ID_COL, how="inner") + expr_load = pl.col(load_col).cast(pl.Float64) * pl.col(_WEIGHT_COL).cast( + pl.Float64 + ) + else: + expr_load = pl.col(load_col).cast(pl.Float64) + + out = ( + df.with_columns((expr_load / 1000.0).alias("_mwh")) + .group_by("timestamp") + .agg(pl.col("_mwh").sum().alias("load_mwh")) + ) + return out, n_bldg_in_file + + +def _resstock_base(args: argparse.Namespace) -> str: + if args.resstock_base: + return str(args.resstock_base).rstrip("/") + if not args.release: + msg = "Provide --resstock-base or --release (with --nrel-root)" + raise ValueError(msg) + root = str(args.nrel_root).rstrip("/") + release = str(args.release).strip() + return f"{root}/{release}" + + +def _default_utility_assignment_path(resstock_base: str, state: str) -> str: + st = state.strip().upper() + return f"{resstock_base}/metadata_utility/state={st}/utility_assignment.parquet" + + +def list_unique_electric_utilities( + path_utility_assignment: str, + storage_options: dict[str, str], +) -> list[str]: + """Distinct ``sb.electric_utility`` values in the file, lowercased and sorted.""" + df = pl.read_parquet( + path_utility_assignment, + columns=[_UA_UTILITY_COL], + storage_options=storage_options, + ) + if _UA_UTILITY_COL not in df.columns: + msg = f"utility_assignment missing column {_UA_UTILITY_COL!r}" + raise ValueError(msg) + raw = df[_UA_UTILITY_COL].drop_nulls().unique().to_list() + out = sorted( + {str(v).strip().lower() for v in raw if str(v).strip()}, + ) + if not out: 
+ msg = "No electric utility values found in utility_assignment" + raise ValueError(msg) + return out + + +def electric_utility_passes( + electric_utility: str | None, + *, + path_utility_assignment: str, + storage_options: dict[str, str], +) -> list[str | None]: + """Return one pass per utility, or ``[None]`` when no utility filter (state-wide).""" + if electric_utility is None: + return [None] + s = electric_utility.strip().lower() + if s == "all": + return cast( + list[str | None], + list_unique_electric_utilities( + path_utility_assignment, + storage_options=storage_options, + ), + ) + return [s] + + +def load_filtered_buildings( + path_utility_assignment: str, + *, + electric_utility: str | None, + has_hp_filter: bool | None, + storage_options: dict[str, str], +) -> pl.DataFrame: + """Return ``bldg_id``, ``weight`` filtered by utility and/or HP cohort. + + ``has_hp_filter``: ``None`` = all buildings (subject to utility filter); + ``True`` = HP only; ``False`` = non-HP only. + """ + if electric_utility is None and has_hp_filter is None: + msg = "Provide electric_utility and/or has_hp_filter" + raise ValueError(msg) + + ua = pl.read_parquet(path_utility_assignment, storage_options=storage_options) + for col in (BLDG_ID_COL, _WEIGHT_COL, _UA_UTILITY_COL, _UA_HP_COL): + if col not in ua.columns: + msg = f"utility_assignment missing column {col!r}; have {ua.columns}" + raise ValueError(msg) + + lf = ua.lazy() + if electric_utility is not None: + lf = lf.filter(pl.col(_UA_UTILITY_COL) == electric_utility.strip().lower()) + if has_hp_filter is True: + lf = lf.filter(pl.col(_UA_HP_COL)) + elif has_hp_filter is False: + lf = lf.filter(~pl.col(_UA_HP_COL)) + + out = cast( + pl.DataFrame, + lf.select( + pl.col(BLDG_ID_COL).cast(pl.Int64), + pl.col(_WEIGHT_COL).cast(pl.Float64), + ).collect(), + ) + return out + + +def aggregate_hourly_load_mwh( + resstock_base: str, + state: str, + upgrade: str, + buildings: pl.DataFrame, + *, + load_col: str, + weighted: bool, + 
storage_options: dict[str, str], +) -> pl.DataFrame: + """One row per timestamp: ``timestamp``, ``load_mwh``. + + Reads each hourly parquet under the state/upgrade partition in parallel + (threads). Each worker returns a partial per-timestamp sum; the main thread + concatenates and sums with no shared mutable state (safe for concurrency). + """ + bldg_ids = buildings[BLDG_ID_COL].to_list() + if not bldg_ids: + msg = "No buildings after filters; check utility / cohort" + raise ValueError(msg) + + up = _normalize_upgrade(upgrade) + partition_dir = _upgrade_partition_dir(resstock_base, state, up) + paths = _parquet_paths_for_partition( + partition_dir, + bldg_ids, + up, + storage_options, + ) + if not paths: + msg = f"No parquet paths resolved under {partition_dir}" + raise ValueError(msg) + + allowed = frozenset(bldg_ids) + weights_df = buildings.select( + pl.col(BLDG_ID_COL).cast(pl.Int64), + pl.col(_WEIGHT_COL).cast(pl.Float64), + ) + + n_workers = min(_MAX_PARQUET_READ_WORKERS, max(1, len(paths))) + total_bldgs = len(bldg_ids) + # Log roughly every 2% of the cohort so large runs do not flood INFO. 
+ log_step = max(1, total_bldgs // 50) + + def _worker(path: str) -> tuple[pl.DataFrame | None, int]: + return _partial_aggregate_one_parquet( + path, + allowed_bldg_ids=allowed, + load_col=load_col, + weighted=weighted, + weights_df=weights_df, + storage_options=storage_options, + ) + + partials: list[pl.DataFrame] = [] + processed_bldgs = 0 + next_milestone = log_step + last_logged = 0 + with ThreadPoolExecutor(max_workers=n_workers) as ex: + futures = {ex.submit(_worker, p): p for p in paths} + for fut in as_completed(futures): + part, n_bldg = fut.result() + processed_bldgs += n_bldg + if part is not None and part.height > 0: + partials.append(part) + shown = min(processed_bldgs, total_bldgs) + while next_milestone <= total_bldgs and shown >= next_milestone: + log.info( + "Progress: %d out of %d buildings processed", + next_milestone, + total_bldgs, + ) + last_logged = next_milestone + next_milestone += log_step + final_shown = min(processed_bldgs, total_bldgs) + if final_shown != last_logged: + log.info( + "Progress: %d out of %d buildings processed", + final_shown, + total_bldgs, + ) + + if not partials: + msg = "No load data after reading parquet files; check paths / cohort / load column" + raise ValueError(msg) + + out = ( + pl.concat(partials) + .group_by("timestamp") + .agg(pl.col("load_mwh").sum().alias("load_mwh")) + .sort("timestamp") + ) + return cast(pl.DataFrame, out) + + +def _safe_label(s: str | None) -> str: + if s is None: + return "all" + return re.sub(r"[^\w.\-]+", "_", s) + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Aggregate ResStock hourly electric load to MWh per timestamp.", + ) + p.add_argument( + "--resstock-base", + type=str, + default=None, + help="Full path to ResStock release root (contains load_curve_hourly/).", + ) + p.add_argument( + "--nrel-root", + type=str, + default="s3://data.sb/nrel/resstock", + help="Parent of release folder (used with --release if --resstock-base unset).", + 
) + p.add_argument( + "--release", + type=str, + default=None, + help="Release directory name, e.g. res_2024_amy2018_2_sb (required if no --resstock-base).", + ) + p.add_argument("--state", type=str, required=True, help="State, e.g. NY or RI.") + p.add_argument( + "--upgrades", + type=str, + required=True, + help='Space-separated upgrade IDs, e.g. "00 02".', + ) + p.add_argument( + "--electric-utility", + type=str, + default=None, + help=( + "``sb.electric_utility`` code (e.g. coned, rie), or ``all`` to run once " + "per distinct utility in the metadata file. Omit only when " + "--customer-class is true (state-wide HP vs non-HP)." + ), + ) + p.add_argument( + "--customer-class", + type=str, + choices=("true", "false"), + required=True, + help=( + "If true, aggregate separately for HP vs non-HP (optional utility, or " + "``all`` for every utility). If false, one aggregate per utility " + "(requires --electric-utility or ``all``)." + ), + ) + p.add_argument( + "--path-utility-assignment", + type=str, + default=None, + help="Override path to utility_assignment.parquet.", + ) + p.add_argument( + "--path-output-dir", + type=Path, + required=True, + help="Directory for output Parquet files (created if missing).", + ) + p.add_argument( + "--load-column", + type=str, + default=ELECTRIC_LOAD_COL, + help=f"Hourly kWh column (default: {ELECTRIC_LOAD_COL}).", + ) + p.add_argument( + "--unweighted", + action="store_true", + help="Sum building kWh without sample weights (default: weighted).", + ) + return p.parse_args() + + +def main() -> None: + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") + args = _parse_args() + + split_by_hp = args.customer_class == "true" + if not split_by_hp and not args.electric_utility: + log.error( + "When --customer-class is false, --electric-utility is required " + "(use a code or ``all`` for every utility in the metadata).", + ) + sys.exit(2) + + try: + resstock_base = _resstock_base(args) + except ValueError as e: + 
log.error("%s", e) + sys.exit(2) + if args.resstock_base and args.release: + log.info("Using --resstock-base; --release/--nrel-root ignored") + state_u = args.state.strip().upper() + upgrades = _parse_upgrades(args.upgrades) + if not upgrades: + log.error("No upgrades parsed from --upgrades") + sys.exit(2) + + path_ua = args.path_utility_assignment or _default_utility_assignment_path( + resstock_base, + state_u, + ) + storage_options = {"aws_region": get_aws_region()} + + log.info("ResStock base: %s", resstock_base) + log.info("Utility assignment: %s", path_ua) + log.info("Split by HP cohorts: %s", split_by_hp) + + out_dir: Path = args.path_output_dir + out_dir.mkdir(parents=True, exist_ok=True) + + weighted = not args.unweighted + + try: + util_passes = electric_utility_passes( + args.electric_utility, + path_utility_assignment=path_ua, + storage_options=storage_options, + ) + except ValueError as e: + log.error("%s", e) + sys.exit(2) + + if args.electric_utility and args.electric_utility.strip().lower() == "all": + log.info("Running %d electric-utility passes (all)", len(util_passes)) + + cohorts: list[tuple[str, bool | None]] + if split_by_hp: + cohorts = [("hp", True), ("non_hp", False)] + else: + cohorts = [("all", None)] + + for util_code in util_passes: + util_label = _safe_label(util_code) + for cohort_name, has_hp_filter in cohorts: + buildings = load_filtered_buildings( + path_ua, + electric_utility=util_code, + has_hp_filter=has_hp_filter, + storage_options=storage_options, + ) + log.info( + "Cohort %s: %d buildings (utility=%s, has_hp_filter=%s)", + cohort_name, + buildings.height, + util_code, + has_hp_filter, + ) + for up in upgrades: + df = aggregate_hourly_load_mwh( + resstock_base, + state_u, + up, + buildings, + load_col=args.load_column, + weighted=weighted, + storage_options=storage_options, + ) + if split_by_hp: + name = ( + f"aggregate_hourly_load_{state_u}_{util_label}_{cohort_name}" + f"_upgrade{up}.parquet" + ) + else: + name = 
f"aggregate_hourly_load_{state_u}_{util_label}_upgrade{up}.parquet" + path_out = out_dir / name + df.write_parquet(path_out, storage_options=storage_options) + log.info("Wrote %s (%d rows)", path_out, df.height) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 8a44e1f4..23061283 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,7 +77,8 @@ include = ["data*", "rate_design*", "utils*"] "utils.post" = ["data/*.yaml"] [tool.ty.src] -exclude = [".cache/"] +# prek stores cloned hook repos under .prek-cache/; do not type-check third-party code. +exclude = [".cache/", ".prek-cache/"] [tool.uv.sources] cairo = { git = "https://github.com/natlabrockies/cairo", rev = "09a4d7f" } diff --git a/tests/test_create_aggregate_hourly_loads.py b/tests/test_create_aggregate_hourly_loads.py new file mode 100644 index 00000000..4495b570 --- /dev/null +++ b/tests/test_create_aggregate_hourly_loads.py @@ -0,0 +1,166 @@ +"""Tests for aggregate hourly ResStock load helper functions.""" + +from __future__ import annotations + +import argparse +from datetime import datetime + +import polars as pl +import pytest + +from data.resstock.create_aggregate_hourly_loads import ( + _parse_upgrades, + _resstock_base, + aggregate_hourly_load_mwh, + electric_utility_passes, + list_unique_electric_utilities, + load_filtered_buildings, +) + + +def test_parse_upgrades_zfill() -> None: + assert _parse_upgrades("0 2 00") == ["00", "02", "00"] + + +def test_resstock_base_from_release() -> None: + args = argparse.Namespace( + resstock_base=None, + nrel_root="s3://data.sb/nrel/resstock", + release="res_2024_amy2018_2_sb", + ) + assert _resstock_base(args) == "s3://data.sb/nrel/resstock/res_2024_amy2018_2_sb" + + +def test_resstock_base_explicit() -> None: + args = argparse.Namespace( + resstock_base="s3://bucket/res_x", + nrel_root="ignored", + release="ignored", + ) + assert _resstock_base(args) == "s3://bucket/res_x" + + +def 
test_load_filtered_buildings_utility_only(tmp_path) -> None: + path = tmp_path / "ua.parquet" + pl.DataFrame( + { + "bldg_id": [1, 2, 3], + "weight": [1.0, 2.0, 3.0], + "sb.electric_utility": ["a", "b", "a"], + "postprocess_group.has_hp": [True, False, True], + } + ).write_parquet(path) + df = load_filtered_buildings( + str(path), + electric_utility="a", + has_hp_filter=None, + storage_options={}, + ) + assert set(df["bldg_id"].to_list()) == {1, 3} + + +def test_load_filtered_buildings_hp_only(tmp_path) -> None: + path = tmp_path / "ua.parquet" + pl.DataFrame( + { + "bldg_id": [1, 2], + "weight": [1.0, 2.0], + "sb.electric_utility": ["a", "a"], + "postprocess_group.has_hp": [True, False], + } + ).write_parquet(path) + df = load_filtered_buildings( + str(path), + electric_utility=None, + has_hp_filter=True, + storage_options={}, + ) + assert df["bldg_id"].to_list() == [1] + + +def test_load_filtered_buildings_non_hp_only(tmp_path) -> None: + path = tmp_path / "ua.parquet" + pl.DataFrame( + { + "bldg_id": [1, 2], + "weight": [1.0, 2.0], + "sb.electric_utility": ["a", "a"], + "postprocess_group.has_hp": [True, False], + } + ).write_parquet(path) + df = load_filtered_buildings( + str(path), + electric_utility=None, + has_hp_filter=False, + storage_options={}, + ) + assert df["bldg_id"].to_list() == [2] + + +def test_aggregate_hourly_weighted_mwh(tmp_path) -> None: + """Two buildings, one hour: (10*2 + 5*1) kWh -> 0.025 MWh.""" + resstock_base = str(tmp_path) + st = "NY" + up = "00" + part = tmp_path / "load_curve_hourly" / f"state={st}" / f"upgrade={up}" + part.mkdir(parents=True) + pl.DataFrame( + { + "bldg_id": [1, 2], + "timestamp": [datetime(2018, 1, 1, 0), datetime(2018, 1, 1, 0)], + "out.electricity.net.energy_consumption": [10.0, 5.0], + } + ).write_parquet(part / "chunk.parquet") + + buildings = pl.DataFrame({"bldg_id": [1, 2], "weight": [2.0, 1.0]}) + out = aggregate_hourly_load_mwh( + resstock_base, + st, + up, + buildings, + 
load_col="out.electricity.net.energy_consumption", + weighted=True, + storage_options={}, + ) + assert out.height == 1 + assert out["load_mwh"][0] == pytest.approx(25.0 / 1000.0) + + +def test_load_filtered_buildings_requires_filter() -> None: + with pytest.raises(ValueError, match="electric_utility"): + load_filtered_buildings( + "missing.parquet", + electric_utility=None, + has_hp_filter=None, + storage_options={}, + ) + + +def test_list_unique_electric_utilities_sorted(tmp_path) -> None: + path = tmp_path / "ua.parquet" + pl.DataFrame( + {"sb.electric_utility": ["zebra", "a", "a", None, "Zebra"]}, + ).write_parquet(path) + assert list_unique_electric_utilities(str(path), {}) == ["a", "zebra"] + + +def test_electric_utility_passes_none_and_all(tmp_path) -> None: + path = tmp_path / "ua.parquet" + pl.DataFrame( + { + "sb.electric_utility": ["x", "y"], + }, + ).write_parquet(path) + assert electric_utility_passes( + None, path_utility_assignment=str(path), storage_options={} + ) == [None] + assert electric_utility_passes( + "all", + path_utility_assignment=str(path), + storage_options={}, + ) == ["x", "y"] + assert electric_utility_passes( + "x", + path_utility_assignment=str(path), + storage_options={}, + ) == ["x"] diff --git a/tests/test_validate_ny_mc_data.py b/tests/test_validate_ny_mc_data.py index b66a565e..ee495d97 100644 --- a/tests/test_validate_ny_mc_data.py +++ b/tests/test_validate_ny_mc_data.py @@ -7,7 +7,11 @@ import polars as pl -from utils.post.validate_ny_mc_data import check_mc, load_mc +from utils.post.validate_ny_mc_data import ( + check_mc, + load_mc, + mc_data_to_timeseries_long, +) def _timestamps_8760(year: int) -> list[datetime]: @@ -95,3 +99,17 @@ def test_bulk_tx_expected_nonzero_is_state_aware_for_ri() -> None: "expected 100 nonzero hours" in issue for issue in ri_result["issues"] ) assert any("expected 80 nonzero hours" in issue for issue in ny_result["issues"]) + + +def test_mc_data_to_timeseries_long_shape() -> None: + """Long 
timeseries table has one row per hour per component.""" + year = 2025 + ts = _timestamps_8760(year) + mc_data = { + "supply_energy": pl.DataFrame({"timestamp": ts, "mc_kwh": [0.1] * 8760}), + "bulk_tx": pl.DataFrame({"timestamp": ts, "mc_kwh": [0.0] * 8760}), + } + plot_keys = ["supply_energy", "bulk_tx"] + long_df = mc_data_to_timeseries_long(mc_data, plot_keys) + assert long_df.height == 8760 * 2 + assert long_df["component"].n_unique() == 2 diff --git a/utils/post/validate_ny_mc_data.py b/utils/post/validate_ny_mc_data.py index 0c8f6e5f..7528ce9f 100644 --- a/utils/post/validate_ny_mc_data.py +++ b/utils/post/validate_ny_mc_data.py @@ -5,9 +5,18 @@ marginal cost parquets from S3, runs sanity checks (row counts, nulls, non-negative values, expected nonzero hour counts, year matching), prints a text summary, and generates: - 1. A 5-panel heatmap grid per utility (hour-of-day x day-of-year, magma colormap, - independent color scale per MC type). - 2. A faceted histogram of supply energy MC across all utilities. + 1. A heatmap grid per utility (hour-of-day x day-of-year, plasma colormap, + independent color scale per MC type). Uses all loaded types in + ``HEATMAP_ORDER``; **``supply_ancillary`` is optional** — plots run when the + same **four** components as ``analysis.qmd`` exist (energy, capacity, + bulk TX, dist/sub-TX). + 2. A **total MC** heatmap (sum of all components) styled like + ``reports2/.../analyze_marginal_costs.qmd`` (green gradient). + 3. A **faceted** heatmap (one panel per MC type) like + ``reports2/.../tou_investigation.qmd``. + 4. A faceted histogram of supply energy MC across all utilities. + 5. Per utility: **time series** of each MC component (faceted lines) and of **total** + MC (sum of loaded components). 
Usage: uv run python utils/post/validate_ny_mc_data.py --state ny --output-dir /tmp/mc_validation @@ -23,18 +32,26 @@ from pathlib import Path from typing import cast +import matplotlib + +matplotlib.use("Agg") # headless-safe PNG writes (SSH / CI) + import polars as pl from plotnine import ( aes, + coord_cartesian, element_text, facet_wrap, geom_blank, geom_histogram, + geom_line, geom_tile, ggplot, labs, scale_fill_cmap, + scale_fill_gradient, scale_x_continuous, + scale_x_datetime, scale_y_reverse, theme, theme_minimal, @@ -125,7 +142,7 @@ class StateValidationConfig: SUMMER_MONTHS = {4, 5, 6, 7, 8, 9} WINTER_MONTHS = {1, 2, 3, 10, 11, 12} -# Heatmap panel order (row-major): top row then bottom row. +# Heatmap / validation iteration order (ancillary last — optional on S3 for some vintages). HEATMAP_ORDER = [ "supply_energy", "supply_capacity", @@ -134,6 +151,16 @@ class StateValidationConfig: "dist_sub_tx", ] +# Minimum set to draw heatmaps (matches reports2 NY ``analysis.qmd`` MC_SOURCES; no ancillary). 
# MC components that must all be loaded before any heatmap / time-series
# plots are generated (the four-way stack used in the analysis notebooks;
# ancillary is optional and handled separately in main()).
HEATMAP_REQUIRED_FOR_PLOTS: frozenset[str] = frozenset(
    {
        "supply_energy",
        "supply_capacity",
        "bulk_tx",
        "dist_sub_tx",
    }
)

# ── Data loading ──────────────────────────────────────────────────────────────
# NOTE(review): unchanged code between the patch hunks (data loading through
# print_check_results) is not part of this change set and is elided here.

# ── Heatmap data prep ─────────────────────────────────────────────────────────


def combine_mc_total(mc_data: dict[str, pl.DataFrame], keys: list[str]) -> pl.DataFrame:
    """Join normalized MC types on ``timestamp`` and sum to total $/kWh.

    Parameters
    ----------
    mc_data:
        Mapping of MC-type key to an hourly frame containing ``timestamp``
        and the normalized $/kWh column (``NORMALIZED_COL``).
    keys:
        MC-type keys to combine; must be non-empty.

    Returns
    -------
    Frame with ``timestamp`` and ``mc_total`` (horizontal sum of components).

    Raises
    ------
    ValueError
        If ``keys`` is empty, or if a component frame does not share the
        exact timestamp index of the first component — an inner join would
        otherwise drop the mismatched hours silently and understate totals.
    """
    if not keys:
        msg = "combine_mc_total: at least one mc key required"
        raise ValueError(msg)
    first = mc_data[keys[0]].select("timestamp", pl.col(NORMALIZED_COL).alias("_s0"))
    joined = first
    for i, mc_key in enumerate(keys[1:], start=1):
        joined = joined.join(
            mc_data[mc_key].select(
                "timestamp",
                pl.col(NORMALIZED_COL).alias(f"_s{i}"),
            ),
            on="timestamp",
            how="inner",
        )
        # Guard against misaligned hourly indexes: the inner join silently
        # drops timestamps missing from either side (and duplicates inflate
        # the row count), so any height change signals bad input data.
        if joined.height != first.height:
            msg = (
                f"combine_mc_total: timestamp mismatch between {keys[0]} "
                f"and {mc_key} ({first.height} vs {joined.height} rows after join)"
            )
            raise ValueError(msg)
    sum_cols = [f"_s{i}" for i in range(len(keys))]
    return joined.with_columns(pl.sum_horizontal(sum_cols).alias("mc_total")).select(
        "timestamp",
        "mc_total",
    )


def prepare_total_mc_heatmap_df(df: pl.DataFrame) -> pl.DataFrame:
    """Long table for total-MC tile plot (matches notebook ``mc-heatmap`` cell).

    Expects the output of :func:`combine_mc_total` (``timestamp`` +
    ``mc_total``); returns ``hour`` / ``day_of_year`` / ``value`` columns.
    """
    return df.with_columns(
        pl.col("timestamp").dt.hour().alias("hour"),
        pl.col("timestamp").dt.ordinal_day().alias("day_of_year"),
    ).select("hour", "day_of_year", pl.col("mc_total").alias("value"))


def mc_data_to_long_faceted(
    mc_data: dict[str, pl.DataFrame],
    plot_keys: list[str],
) -> pl.DataFrame:
    """Unpivot MC types for ``facet_wrap`` heatmaps.

    One row per (component, day_of_year, hour); ``component`` carries the
    human-readable label from ``MC_TYPE_DEFS`` for facet titles.
    """
    frames: list[pl.DataFrame] = []
    for mc_key in plot_keys:
        info = MC_TYPE_DEFS[mc_key]
        frames.append(
            mc_data[mc_key]
            .with_columns(
                pl.col("timestamp").dt.hour().alias("hour"),
                pl.col("timestamp").dt.ordinal_day().alias("day_of_year"),
            )
            .select(
                "day_of_year",
                "hour",
                pl.col(NORMALIZED_COL).alias("value"),
                pl.lit(info["label"]).alias("component"),
            )
        )
    return pl.concat(frames)


def mc_data_to_timeseries_long(
    mc_data: dict[str, pl.DataFrame],
    plot_keys: list[str],
) -> pl.DataFrame:
    """Unpivot MC types for time-series line plots (timestamp on x-axis)."""
    frames: list[pl.DataFrame] = []
    for mc_key in plot_keys:
        info = MC_TYPE_DEFS[mc_key]
        frames.append(
            mc_data[mc_key].select(
                "timestamp",
                pl.col(NORMALIZED_COL).alias("value"),
                pl.lit(info["label"]).alias("component"),
            )
        )
    return pl.concat(frames)


# NOTE(review): the pre-existing prepare_heatmap_df / _make_blank_panel
# definitions are unchanged diff context in this hunk and are elided here.


def make_total_mc_heatmap(
    mc_total_df: pl.DataFrame,
    utility: str,
    year: int,
) -> ggplot:
    """Total $/kWh by hour × day — same spirit as ``analyze_marginal_costs.qmd`` ``mc-heatmap``."""
    heat_df = prepare_total_mc_heatmap_df(mc_total_df)
    breaks, labels = _month_breaks(year)
    return (
        ggplot(heat_df, aes(x="day_of_year", y="hour", fill="value"))
        + geom_tile()
        + scale_fill_gradient(low="#f7fcf5", high="#005a32", name="$/kWh")
        + scale_x_continuous(breaks=breaks, labels=labels)
        # Reverse y so hour 0 sits at the top, matching the notebook layout.
        + scale_y_reverse()
        + coord_cartesian(expand=False)
        + labs(
            title=f"{utility}: total hourly MC ({year})",
            x="Day of year",
            y="Hour",
        )
        + theme_minimal()
        + theme(figure_size=(10, 4), legend_position="right")
    )


def make_mc_timeseries_faceted(
    long_df: pl.DataFrame,
    utility: str,
    year: int,
    plot_keys: list[str],
) -> ggplot:
    """One line chart per MC component — hourly $/kWh vs time.

    Facet order follows ``plot_keys`` via a polars Enum cast; figure height
    scales with the panel count but is capped so many components stay legible.
    """
    comp_order = [MC_TYPE_DEFS[k]["label"] for k in plot_keys]
    plot_df = long_df.with_columns(pl.col("component").cast(pl.Enum(comp_order)))
    n_panels = len(plot_keys)
    # 2.8 in per panel, capped at 22 in total so the PNG stays manageable.
    fig_h = min(2.8 * float(n_panels), 22.0)
    return (
        ggplot(plot_df, aes(x="timestamp", y="value"))
        + geom_line(size=0.25, color="#023047", alpha=0.9)
        + facet_wrap("component", ncol=1, scales="free_y")
        + scale_x_datetime(date_breaks="1 month", date_labels="%b")
        + labs(
            title=f"{utility}: marginal cost time series ({year})",
            x="",
            y=NORMALIZED_UNITS,
        )
        + theme_minimal()
        + theme(
            figure_size=(12, fig_h),
            plot_title=element_text(size=10),
            axis_title=element_text(size=8),
            axis_text=element_text(size=7),
            strip_text=element_text(size=8),
        )
    )


def make_mc_timeseries_total(
    mc_total_df: pl.DataFrame,
    utility: str,
    year: int,
) -> ggplot:
    """Single line: sum of loaded MC components vs time.

    Takes the output of :func:`combine_mc_total`.
    """
    return (
        ggplot(mc_total_df, aes(x="timestamp", y="mc_total"))
        + geom_line(size=0.35, color="#005a32", alpha=0.9)
        + scale_x_datetime(date_breaks="1 month", date_labels="%b")
        + labs(
            title=(f"{utility}: total marginal cost (sum of components) ({year})"),
            x="",
            y=NORMALIZED_UNITS,
        )
        + theme_minimal()
        + theme(
            figure_size=(14, 4),
            plot_title=element_text(size=10),
            axis_title=element_text(size=9),
            axis_text=element_text(size=7),
        )
    )


def make_faceted_mc_heatmaps(
    long_df: pl.DataFrame,
    utility: str,
    year: int,
    plot_keys: list[str],
) -> ggplot:
    """One tile heatmap per component — same layout as ``tou_investigation.qmd`` ``fig-coned-mc-heatmaps``."""
    comp_order = [MC_TYPE_DEFS[k]["label"] for k in plot_keys]
    plot_df = long_df.with_columns(pl.col("component").cast(pl.Enum(comp_order)))
    # Quarterly ticks, hard-coded to match the notebook figure.
    # NOTE(review): day 365 is Dec 31 (labelled "Dec"), and the Apr/Jul/Oct
    # breaks shift by one day in leap years — acceptable for tick labels,
    # unlike _month_breaks(year) used by make_total_mc_heatmap. Confirm if
    # exact alignment with the notebook is required.
    breaks_q = [1, 91, 182, 274, 365]
    labels_q = ["Jan", "Apr", "Jul", "Oct", "Dec"]
    return (
        ggplot(plot_df, aes(x="day_of_year", y="hour", fill="value"))
        + geom_tile()
        + facet_wrap("component", ncol=2)
        + scale_fill_gradient(low="#FFFFFF", high="#023047", name="$/kWh")
        + scale_x_continuous(breaks=breaks_q, labels=labels_q)
        + scale_y_reverse()
        + coord_cartesian(expand=False)
        + labs(
            title=f"{utility}: marginal costs by hour and day ({year})",
            x="",
            y="Hour",
            fill="$/kWh",
        )
        + theme_minimal()
        + theme(figure_size=(10.5, 8))
    )
def _compose_heatmap_panels(plots: list[ggplot]) -> ggplot | Compose:
    """Tile plots in rows of two (pad with a blank panel if odd count).

    Parameters
    ----------
    plots:
        Heatmap panels to arrange; must be non-empty.

    Returns
    -------
    A single ggplot (one panel) or a plotnine Compose object (grid).

    Raises
    ------
    ValueError
        If ``plots`` is empty.
    """
    if not plots:
        msg = "_compose_heatmap_panels: empty plot list"
        raise ValueError(msg)
    if len(plots) == 1:
        return plots[0]
    padded = list(plots)
    if len(padded) % 2 == 1:
        # Odd panel count: pad with an invisible panel so every row has two.
        padded.append(_make_blank_panel())
    rows: list[ggplot | Compose] = []
    for i in range(0, len(padded), 2):
        # "|" places two panels side by side (plotnine composition operator).
        rows.append(padded[i] | padded[i + 1])
    out = rows[0]
    for row in rows[1:]:
        # "/" stacks composed rows vertically.
        out = out / row
    return out


# NOTE(review): this patch hunk also rewrites make_heatmap_grid — signature
# becomes (mc_data, check_results, plot_keys), dropping the unused utility/
# year parameters, and the manual 2+2+1 row composition is replaced with
# `return _compose_heatmap_panels(plots)`. The subtitle-building interior of
# make_heatmap_grid falls in elided diff context (@@ -524,11 +754,7 @@) and
# is not reproducible from this view; the subsequent main() hunk begins here
# and continues past this span.
+ plot_keys = [k for k in HEATMAP_ORDER if k in mc_data] + if not HEATMAP_REQUIRED_FOR_PLOTS.issubset(mc_data.keys()): + missing_req = sorted(HEATMAP_REQUIRED_FOR_PLOTS - mc_data.keys()) print( - f"\n Skipping heatmap for {utility}: " - f"only {len(mc_data)}/{expected_panels} MC types loaded" + f"\n Skipping heatmaps for {utility}: missing {missing_req}. " + f"Required for plots: {sorted(HEATMAP_REQUIRED_FOR_PLOTS)}." ) + else: + if "supply_ancillary" not in mc_data: + print( + "\n Note: supply_ancillary not loaded — heatmaps use loaded components only " + "(four-way stack matches analysis.qmd when ancillary is absent)." + ) + try: + grid = make_heatmap_grid(mc_data, all_checks, plot_keys) + path_png = output_dir / f"mc_heatmap_{utility}.png" + grid.save(str(path_png), dpi=150, verbose=False) + print(f"\n Saved heatmap grid: {path_png}") + + mc_total_df = combine_mc_total(mc_data, plot_keys) + path_total = output_dir / f"mc_heatmap_total_{utility}.png" + make_total_mc_heatmap(mc_total_df, utility, year).save( + str(path_total), dpi=150, verbose=False + ) + print(f" Saved total MC heatmap: {path_total}") + + long_faceted = mc_data_to_long_faceted(mc_data, plot_keys) + path_faceted = output_dir / f"mc_heatmap_faceted_{utility}.png" + make_faceted_mc_heatmaps(long_faceted, utility, year, plot_keys).save( + str(path_faceted), dpi=150, verbose=False + ) + print(f" Saved faceted MC heatmap: {path_faceted}") + + ts_long = mc_data_to_timeseries_long(mc_data, plot_keys) + path_ts = output_dir / f"mc_timeseries_{utility}.png" + make_mc_timeseries_faceted(ts_long, utility, year, plot_keys).save( + str(path_ts), dpi=150, verbose=False + ) + print(f" Saved MC time series (by component): {path_ts}") + + path_ts_total = output_dir / f"mc_timeseries_total_{utility}.png" + make_mc_timeseries_total(mc_total_df, utility, year).save( + str(path_ts_total), dpi=150, verbose=False + ) + print(f" Saved MC time series (total): {path_ts_total}") + except Exception as exc: + print( + f"\n 
ERROR saving heatmaps for {utility}: {exc}", + file=sys.stderr, + ) + raise # Faceted energy histogram if energy_frames: