diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 0000000..2fb3d69 --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,9 @@ +FROM mcr.microsoft.com/devcontainers/base:jammy + +RUN apt-get update && apt-get upgrade -y && \ + apt-get install -y awscli && \ + apt-get install -y gdal-bin libgdal-dev && \ + apt-get clean -y && \ + rm -rf /var/lib/apt/lists/* + +COPY env.yaml /tmp/env.yaml \ No newline at end of file diff --git a/.devcontainer/README.md b/.devcontainer/README.md new file mode 100644 index 0000000..857b56f --- /dev/null +++ b/.devcontainer/README.md @@ -0,0 +1,16 @@ +# Devcontainer Template +Devcontainers allow interactive development inside of a docker container using VSCode. + + +This devcontainer creates a reproducible environment for python projects using micromamba +environments (faster/more robust version of conda). To add this devcontainer template to your project, copy this .devcontainer folder +into the parent directory of your repository, and copy this [.gitattributes file](https://github.com/Michael-Baker-International-Lakewood/mbi_templates/blob/main/.gitattributes) into the same parent directory. +
+When opening this repository in VSCode, you may be prompted to re-open the project in devcontainer. +Alternatively, you may access this option through the +View menu -> Command Palette -> DevContainers: Reopen in Container. + +Other requirements: +1. An environment file (env.yaml) is required to be placed in the root folder of the +project for a reproducible python environment to be successfully built. +2. docker installed on the local machine (linux) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..0db8646 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,44 @@ +{ + "name": "stormhub-devcontainer", + "build": { + "dockerfile": "Dockerfile", + "context": ".." 
+ }, + "features": { + "ghcr.io/devcontainers/features/git:1": {}, + "ghcr.io/mamba-org/devcontainer-features/micromamba:1": { + "envFile": "/tmp/env.yaml", + "envName": "stormhub-base", + "autoActivate": true, + "version": "1.5.6" + }, + "ghcr.io/devcontainers/features/node:1": { + "version": "lts" + }, + "ghcr.io/devcontainers/features/aws-cli:1": { + "version": "latest" + } + }, + // "runArgs": [ + // "--gpus", + // "all" + // ], + "customizations": { + "vscode": { + "extensions": [ + "ms-python.python", + "charliermarsh.ruff", + "GitHub.copilot", + "ms-toolsai.jupyter" + ], + "settings": { + "python.defaultInterpreterPath": "/opt/conda/envs/stormhub-base/bin/python" + } + } + }, + // be sure to have the workspace folder owned by vscode user + "postCreateCommand": "sudo chown -R vscode:vscode ${containerWorkspaceFolder}", + // start the dev container with the stormhub-base environment activated + // avoid dubious ownership of the workspace folder https://www.kenmuse.com/blog/avoiding-dubious-ownership-in-dev-containers/ + "postStartCommand": "micromamba shell init --shell=bash && echo 'micromamba activate stormhub-base' >> ~/.bashrc && git config --global --add safe.directory ${containerWorkspaceFolder}" +} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 69ab47f..14d2e4f 100644 --- a/.gitignore +++ b/.gitignore @@ -174,4 +174,6 @@ holding .DS_Store bighorn -indian-creek \ No newline at end of file +indian-creek +allegheny +data \ No newline at end of file diff --git a/docs/make.sh b/docs/make.sh index 1136201..9d881bf 100755 --- a/docs/make.sh +++ b/docs/make.sh @@ -1,3 +1,3 @@ rm -rf build -sphinx-apidoc -o source ../../stormhub -sphinx-build -M html source build \ No newline at end of file +sphinx-apidoc -o docs/source stormhub +sphinx-build -M html docs/source docs/build -c docs/source \ No newline at end of file diff --git a/docs/source/user_guide.rst b/docs/source/user_guide.rst index c239036..e0017f0 100644 --- 
a/docs/source/user_guide.rst +++ b/docs/source/user_guide.rst @@ -101,8 +101,18 @@ The following snippet provides an example of how to build and create a storm cat storm_duration_hours, min_precip_threshold, top_n_events, + use_threads=True, #True for Linux/WSL, False for Windows + num_workers=8, # Number of parallel workers check_every_n_hours=6, ) + # Optionally, add DSS files to storm items + add_storm_dss_files(storm_catalog) + # Optionally, create normal precipitation grid + create_normal_precip(storm_catalog, duration_hours=storm_duration_hours) + +.. note:: + If using Windows, set `use_threads` to `False` in order to avoid issues with multiprocessing. On Linux/WSL, set `use_threads` to `True`. + The use of ProcessPoolExecutor on Linux can lead to complications due to the way processes are spawned. More investigation is needed on this. Viewing Results ---------------- diff --git a/env.yaml b/env.yaml new file mode 100644 index 0000000..cb594d9 --- /dev/null +++ b/env.yaml @@ -0,0 +1,29 @@ +name: base +channels: +- conda-forge +dependencies: +- dask=2025.10.0 +- fiona=1.10.1 +- geopandas=1.1.1 +- ipykernel=7.0.1 +- matplotlib=3.10.7 +- numpy=2.3.3 +- pandas=2.3.3 +- pip=25.2 +- pyarrow=21.0.0 +- pytest=8.4.2 +- python=3.12.12 +- requests=2.32.5 +- rioxarray=0.19.0 +- ruff=0.14.1 +- scipy=1.16.2 +- xarray=2025.10.1 +- zarr=3.1.3 +- pip: + - pystac==1.14.1 + - boto3==1.40.54 + - s3fs==0.4.2 + - hecdss==0.1.28 + - python-dotenv==1.1.1 + - shapely==2.1.2 + - contextily==1.6.2 diff --git a/stormhub/met/analysis.py b/stormhub/met/analysis.py index ba23c7f..0b06efb 100644 --- a/stormhub/met/analysis.py +++ b/stormhub/met/analysis.py @@ -3,6 +3,7 @@ import datetime import logging import os +from typing import Union import numpy as np import pandas as pd @@ -101,14 +102,14 @@ def _rank_storm_ids(self) -> list[str]: sorted_indices = np.argsort(self.metrics_df["mean"].values)[::-1] return 
self.metrics_df["storm_date"].dt.strftime("%Y-%m-%dT%H").iloc[sorted_indices].tolist() - def rank_and_save(self, collection_id: str, spm: StacPathManager) -> pd.DataFrame: + def rank_and_save(self, collection_id: str, spm: StacPathManager) -> tuple[pd.DataFrame, str]: """Rank storms and save to csv.""" output_file = os.path.join(spm.collection_dir(collection_id), "ranked-storms.csv") ranked_df = self.rank_and_filter_storms() ranked_df.to_csv(output_file, index=False) ranked_df["storm_date"] = pd.to_datetime(ranked_df["storm_date"]) logging.info("Saved ranked storm data to %s", output_file) - return ranked_df + return ranked_df, output_file class StormFilter: diff --git a/stormhub/met/aorc/aorc.py b/stormhub/met/aorc/aorc.py index 3a66572..0b2f7ec 100644 --- a/stormhub/met/aorc/aorc.py +++ b/stormhub/met/aorc/aorc.py @@ -1,6 +1,7 @@ """AORC Item class.""" import datetime +import gc import json import logging import os @@ -115,6 +116,16 @@ def __init__( self._transposition_transform: Affine | None = None self._stats: dict | None = None + def clear_cached_data(self) -> None: + """Explicitly clear all cached data to free memory. + + Call this after processing is complete to release memory. 
+ """ + self._aorc_source_data = None + self._transpose = None + self._sum_aorc = None + gc.collect() + def _register_extensions(self) -> None: """Register item extensions.""" ProjectionExtension.add_to(self) @@ -145,8 +156,10 @@ def aorc_source_data(self) -> xr.Dataset: - adds ZARR files to assets if they don't exist already """ if self._aorc_source_data is None: - s3_out = s3fs.S3FileSystem(anon=True) + # Increase connection pool and configure for better memory management + s3_out = s3fs.S3FileSystem(anon=True, config_kwargs={"max_pool_connections": 50}) fileset = [s3fs.S3Map(root=aorc_path, s3=s3_out, check=False) for aorc_path in self.aorc_paths] + # Use auto chunks to align with Zarr storage, then rechunk if needed ds = xr.open_mfdataset(fileset, engine="zarr", chunks="auto", consolidated=True) transposition_geom_for_clip = self.transposition_domain_geometry @@ -158,7 +171,20 @@ def aorc_source_data(self) -> xr.Dataset: longitude=slice(bounds[0], bounds[2]), latitude=slice(bounds[1], bounds[3]), ) - self._aorc_source_data = subsection.rio.clip([transposition_geom_for_clip], drop=True, all_touched=True) + + # Clip to geometry + clipped = subsection.rio.clip([transposition_geom_for_clip], drop=True, all_touched=True) + + # Rechunk after loading to optimize memory usage for downstream operations + # This avoids the warning about misaligned chunks + self._aorc_source_data = clipped.chunk({"time": -1, "latitude": "auto", "longitude": "auto"}) + + # Clean up to free memory + del ds + del subsection + del clipped + gc.collect() + for aorc_path in self.aorc_paths: aorc_year = int(os.path.basename(aorc_path).replace(".zarr", "")) aorc_start_datetime = datetime.datetime( @@ -314,7 +340,8 @@ def aorc_thumbnail( def valid_spaces_item(watershed: Item, transposition_region: Item, storm_duration: int = 72) -> Polygon: """Search a sample zarr dataset to identify valid spaces for transposition. 
datetime.datetime(1980, 5, 1) is used as a start time for the search.""" - s3 = s3fs.S3FileSystem(anon=True) + # Increase connection pool to avoid warnings + s3 = s3fs.S3FileSystem(anon=True, config_kwargs={"max_pool_connections": 50}) start_time = datetime.datetime(1980, 5, 1) sample_data = s3fs.S3Map(root=f"{NOAA_AORC_S3_BASE_URL}/{start_time.year}.zarr", s3=s3) ds = xr.open_dataset(sample_data, engine="zarr", chunks="auto", consolidated=True) diff --git a/stormhub/met/storm_catalog.py b/stormhub/met/storm_catalog.py index 9d661b1..930f99d 100644 --- a/stormhub/met/storm_catalog.py +++ b/stormhub/met/storm_catalog.py @@ -6,17 +6,29 @@ import traceback from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed from datetime import datetime, timedelta -from typing import Any, List, Union +from typing import Any, List, Union, Dict +import gc +import re import pandas as pd import pystac from pystac import Asset, Collection, Item, Link, MediaType from shapely.geometry import mapping, shape, Point +import xarray as xr +import geopandas as gpd +import matplotlib from stormhub.hydro_domain import HydroDomain from stormhub.logger import initialize_logger from stormhub.met.analysis import StormAnalyzer from stormhub.met.aorc.aorc import AORCItem, valid_spaces_item +from stormhub.met.zarr_to_dss import ( + get_aorc_paths, + get_s3_zarr_data, + noaa_zarr_to_dss, + NOAADataVariable, + save_da_as_geotiff, +) from stormhub.utils import ( STORMHUB_REF_LINK, StacPathManager, @@ -254,6 +266,26 @@ def max_precip_feature_collection(self, spm: StacPathManager): logging.info("FeatureCollection saved to %s", output_geojson) self.save_object(dest_href=spm.collection_file(self.id), include_self_link=False) + def add_ranked_storms_asset(self, ranked_storms_path: str, spm: StacPathManager): + """ + Add ranked storms CSV as an asset to the collection. + + Args: + ranked_storms_path (str): Path to the ranked storms CSV file. 
+ """ + self.add_asset( + "ranked_storms", + Asset( + href=ranked_storms_path, + title="ranked_storms", + description="CSV file containing ranked storm events.", + media_type="text/csv", + roles=["storm_summary"], + ), + ) + logging.info("Added ranked storms asset from %s to collection %s", ranked_storms_path, self.id) + self.save_object(dest_href=spm.collection_file(self.id), include_self_link=False) + class StormCatalog(pystac.Catalog): """ @@ -637,6 +669,10 @@ def storm_search( logging.debug("Statistics: %s", event_stats) logging.debug("Storm Date: %s", storm_start_date.strftime("%Y-%m-%dT%H")) logging.debug("Destination href: %s", catalog.spm.collection_item(collection_id, event_item.id)) + + # Clear cached data immediately to free memory + event_item.clear_cached_data() + if return_item: if not os.path.exists(item_dir): os.makedirs(item_dir) @@ -645,11 +681,14 @@ def storm_search( event_item.save_object(dest_href=catalog.spm.collection_item(collection_id, event_item.id)) return event_item else: - return { + result = { "storm_date": storm_start_date.strftime("%Y-%m-%dT%H"), "centroid": centroid, "aorc:statistics": event_stats, } + del event_item + gc.collect() + return result def serial_processor( @@ -718,35 +757,50 @@ def multi_processor( with_tb (bool): Whether to include traceback in error logs. 
""" if use_threads: - executor = ThreadPoolExecutor + executor_class = ThreadPoolExecutor else: - executor = ProcessPoolExecutor + executor_class = ProcessPoolExecutor if not os.path.exists(output_csv): - # append_mode=True with open(output_csv, "w", encoding="utf-8") as f: f.write("storm_date,min,mean,max,x,y\n") count = len(event_dates) - with open(output_csv, "a", encoding="utf-8") as f: - with executor(max_workers=num_workers) as executor: - futures = [executor.submit(func, catalog, date, storm_duration) for date in event_dates] - for future in as_completed(futures): - count -= 1 - try: - r = future.result() - f.write(storm_search_results_to_csv_line(r)) - logging.info("%s processed (%d remaining)", r["storm_date"], count) - - except Exception as e: - if with_tb: - tb = traceback.format_exc() - logging.error("Error processing: %s\n%s", e, tb) - continue - else: - logging.error("Error processing: %s", e) - continue + # Process in smaller batches to avoid memory buildup + # Limit concurrent tasks to prevent OOM, especially with threads loading large datasets + batch_size = max(min(num_workers // 3, 10), 2) if use_threads else num_workers + logging.info("Processing in batches of %d", batch_size) + + with executor_class(max_workers=num_workers) as executor: + for i in range(0, len(event_dates), batch_size): + batch = event_dates[i : i + batch_size] + futures = [executor.submit(func, catalog, date, storm_duration) for date in batch] + + # Write results as they complete + with open(output_csv, "a", encoding="utf-8") as f: + for future in as_completed(futures): + count -= 1 + try: + r = future.result() + f.write(storm_search_results_to_csv_line(r)) + f.flush() # Force write to disk + logging.info("%s processed (%d remaining)", r["storm_date"], count) + + # Explicitly delete result to free memory + del r + + except Exception as e: + if with_tb: + tb = traceback.format_exc() + logging.error("Error processing: %s\n%s", e, tb) + else: + logging.error("Error processing: 
%s", e) + + # Clear futures list and force garbage collection between batches + del futures + del batch + gc.collect() def collect_event_stats( @@ -781,12 +835,13 @@ def collect_event_stats( os.makedirs(collection_dir) if not num_workers and not use_threads: + # For processes: limited by CPU cores if os.cpu_count() > 2: num_workers = os.cpu_count() - 2 else: num_workers = 1 elif not num_workers and use_threads: - num_workers = 15 + num_workers = 10 output_csv = os.path.join(collection_dir, "storm-stats.csv") if use_parallel_processing: @@ -819,6 +874,7 @@ def create_items( collection_id: str = None, storm_duration: int = 72, num_workers: int = None, + use_threads: bool = False, with_tb: bool = False, ) -> List: """ @@ -830,6 +886,7 @@ def create_items( collection_id (str, optional): The ID of the collection. storm_duration (int): The duration of the storm. num_workers (int, optional): Number of workers to use. + use_threads (bool): Whether to use threads instead of processes. with_tb (bool): Whether to include traceback in error logs. Returns @@ -848,11 +905,18 @@ def create_items( count = len(event_dates) if not num_workers: - num_workers = os.cpu_count() + num_workers = 4 + + if use_threads: + executor_class = ThreadPoolExecutor + # Force a non‑interactive backend (Agg) when threading. 
+ matplotlib.use("Agg") + else: + executor_class = ProcessPoolExecutor storm_data = [(e["storm_date"], e["por_rank"]) for e in event_dates] - with ProcessPoolExecutor(max_workers=num_workers) as executor: + with executor_class(max_workers=num_workers) as executor: futures = [ executor.submit( storm_search, @@ -1023,6 +1087,201 @@ def find_missing_storm_dates(file_path: str, start_date: str, stop_date: str, ev return missing_datetimes +def find_asset_by_title(collection: Union[pystac.Catalog, pystac.Collection], title: str): + """Find an asset with the given title in a STAC collection.""" + for asset_key, asset in collection.assets.items(): + if asset.title == title: + return asset + return None + + +def parse_duration_from_id(collection_id: str) -> int: + """Parse the front number (duration in hours) from a collection ID.""" + match = re.match(r"(\d+)", collection_id) + if match: + return int(match.group(1)) + else: + return None + + +def get_events_collection(catalog: pystac.Catalog): + """Find storm events collection from given Catalog.""" + for collection in catalog.get_all_collections(): + if "-events" in collection.id: + return collection + + raise ValueError(f"Could not find events collection in catalog: {catalog.id}.") + + +def get_transposition_item(catalog: pystac.Catalog, use_valid_region: bool = False): + """Find transposition region item from given Catalog.""" + for item in catalog.get_all_items(): + if "transpo" in item.id: + if use_valid_region and "valid" in item.id: + return item + elif not use_valid_region and "valid" not in item.id: + return item + + raise ValueError(f"Could not find transposition region item in catalog: {catalog.id}.") + + +def add_storm_dss_files( + catalog: pystac.catalog, + aoi_name: str = None, + use_valid_region: bool = False, + variable_duration_map: Dict[NOAADataVariable, int] = None, + dss_output_dir: str = None, +): + """ + Add dss files containing meteorological data to all storm items in events collection. 
+ + Args: + catalog (Union[str | Catalog]): The storm catalog or path to the catalog file. + aoi_name (str, optional): Optional aoi name for part B of dss file. If None then the catalog ID is used. + use_valid_region (bool, optional): If True, the gridded DSS data will be confined to the valid transposition region. If False, the data uses the entire transposition region. Defaults to False. + variable_duration_map (Dict[NOAADataVariable, int], Optional): Optional variable map to include multiple variables and/or different durations for dss creation. If None is given, only precipitation is used at the duration between the storm item's start and end time. + dss_output_dir (str, optional): Optional output directory for dss files. If None, dss files are saved in the same directory as the associated storm item. + + """ + if isinstance(catalog, str): + catalog = pystac.read_file(catalog) + + events_collection = get_events_collection(catalog) + transpo_item = get_transposition_item(catalog, use_valid_region) + transpo_href = transpo_item.get_self_href() + + if aoi_name is None: + aoi_name = catalog.id + + for item in events_collection.get_items(): + try: + if dss_output_dir: + item_dir = os.path.abspath(dss_output_dir) + os.makedirs(item_dir, exist_ok=True) + else: + item_href = item.get_self_href() + item_dir = os.path.dirname(item_href) + + full_start_date = item.properties["start_datetime"] + start_date_dt = datetime.strptime(full_start_date, "%Y-%m-%dT%H:%M:%SZ") + start_date = start_date_dt.strftime("%Y%m%d") + + dss_fn = f"{start_date}.dss" + dss_output_path = os.path.join(item_dir, dss_fn) + + if variable_duration_map is None: + full_end_date = item.properties["end_datetime"] + end_date_dt = datetime.strptime(full_end_date, "%Y-%m-%dT%H:%M:%SZ") + time_difference = end_date_dt - start_date_dt + duration_hours = time_difference.total_seconds() / 3600 + + variable_duration_map = {NOAADataVariable.APCP: duration_hours} + + noaa_zarr_to_dss(dss_output_path, 
transpo_href, aoi_name, start_date_dt, variable_duration_map) + + item.add_asset( + dss_fn, + Asset( + dss_output_path, + dss_fn, + description="DSS file containing meteorological data for storm period.", + media_type="application/x-dss", + roles="data", + ), + ) + item.save_object() + logging.info(f"Successfully saved storm dss file to: {dss_output_path}") + except Exception as e: + logging.error(f"Could not create dss file for item: {item.id} with error: {e}") + + +def create_normal_precip(catalog: pystac.Catalog, output_path: str = None, duration_hours: int = None): + """ + Create a normal precipitation GeoTIFF by averaging precipitation data over the top annual storms for each year. + + Args: + catalog (pystac.Catalog): The STAC catalog containing storm data. + output_path (str, optional): Path to save the output GeoTIFF. If None is given, it will be saved in the events collection directory. + duration_hours (int, optional): Duration of the storm event in hours. If None, it will be parsed from the collection ID. + """ + events_collection = get_events_collection(catalog) + transpo_item = get_transposition_item(catalog) + transpo_href = transpo_item.get_self_href() + + if duration_hours is None: + collection_id = events_collection.id + duration_hours = parse_duration_from_id(collection_id) + + if duration_hours is None: + logging.warning( + "No duration given and could not determine duration hours from collection ID, using default value of 72." 
+ ) + duration_hours = 72 + + logging.info(f"Using storm event duration of {duration_hours} hours for normal precipitation calculation.") + + ranked_storms_path = events_collection.self_href.replace("collection.json", f"ranked-storms.csv") + try: + ranked_storms = pd.read_csv(ranked_storms_path) + except Exception as e: + logging.error("No ranked storms CSV found.") + return + + top_annual_storms = ranked_storms[ranked_storms["annual_rank"] == 1] + top_annual_storm_dates = top_annual_storms["storm_date"].tolist() + num_years = len(top_annual_storm_dates) + logging.info(f"Number of years for normal precipitation calculation: {num_years}") + + if output_path is None: + output_path = events_collection.self_href.replace("collection.json", f"normal_precip_{num_years}-years.tif") + + # Precipitation variable map with duration + variable_duration_map = {NOAADataVariable.APCP: duration_hours} + + storm_arrays = [] + for storm_date_str in top_annual_storm_dates: + logging.info(f"Processing storm date: {storm_date_str}") + try: + storm_start = datetime.strptime(storm_date_str, "%Y-%m-%dT%H") + + all_variables = list(variable_duration_map.keys()) + min_start = storm_start + timedelta(hours=1) # make exclusive + max_end = storm_start + timedelta(hours=max(variable_duration_map.values())) + aorc_paths = get_aorc_paths(min_start, max_end) + aoi_gdf = gpd.read_file(transpo_href) + voi_keys = [v.value for v in all_variables] + + # get aorc data + aorc_data = get_s3_zarr_data(aorc_paths, aoi_gdf, min_start, max_end, voi_keys) + summed_data = aorc_data.sum(dim="time") + + da = summed_data["APCP_surface"] + da = da.expand_dims(storm=[storm_start]) + da = da.assign_coords(storm_time=("storm", [storm_start])) + storm_arrays.append(da) + except Exception as e: + logging.error(f"Error extracting precip data from storm date {storm_date_str}: {e}") + raise + + all_storms_da = xr.concat(storm_arrays, dim="storm") + mean_precip_da = all_storms_da.mean(dim="storm") + + logging.info(f"Saving 
normal precipitation GeoTIFF to {output_path}") + save_da_as_geotiff(mean_precip_da, output_path) + + events_collection.add_asset( + "normal-precipitation", + pystac.Asset( + output_path, + "normal-precipitation", + description="Normal precipitation data containing the average precipitation over the top storms for each year.", + media_type="application/geotiff", + roles=["data"], + ), + ) + events_collection.save_object() + + def new_catalog( catalog_id: str, config_file: str, @@ -1077,11 +1336,12 @@ def new_collection( start_date: str = "1979-02-01", end_date: str = None, storm_duration: int = 72, - min_precip_threshold: int = 1, + min_precip_threshold: float = 1.0, top_n_events: int = 5, - check_every_n_hours: int = 6, + check_every_n_hours: int = 24, specific_dates: list = None, num_workers: int = None, + use_threads: bool = False, with_tb: bool = False, create_new_items: bool = True, ): @@ -1093,11 +1353,12 @@ def new_collection( start_date (str): The start date for the collection. end_date (str, optional): The end date for the collection. storm_duration (int): The duration of the storm. - min_precip_threshold (int): The minimum precipitation threshold. + min_precip_threshold (float): The minimum precipitation threshold. top_n_events (int): The number of top events to include. check_every_n_hours (int): The interval in hours to check for storms. specific_dates (list, optional): Specific dates to include. num_workers (int, optional): Number of cpu's to use during processing. + use_threads (bool): Whether to use threads instead of processes. with_tb (bool): Whether to include traceback in error logs. 
create_new_items (bool): Create items (or skip if items exist) """ @@ -1133,7 +1394,13 @@ def new_collection( if dates: logging.info("Collecting event stats for %d dates", len(dates)) collect_event_stats( - dates, storm_catalog, collection_id, storm_duration, num_workers=num_workers, with_tb=with_tb + dates, + storm_catalog, + collection_id, + storm_duration, + num_workers=num_workers, + with_tb=with_tb, + use_threads=use_threads, ) stats_csv = os.path.join(storm_catalog.spm.collection_dir(collection_id), "storm-stats.csv") try: @@ -1143,24 +1410,34 @@ def new_collection( logging.error("No events at threshold `min_precip_threshold` %d: %s", min_precip_threshold, e) return - ranked_data = analyzer.rank_and_save(collection_id, storm_catalog.spm) + logging.info("Ranking storm events and selecting top %d events", top_n_events) + ranked_data, ranked_storms_output_path = analyzer.rank_and_save(collection_id, storm_catalog.spm) top_events = ranked_data[ranked_data["por_rank"] <= top_n_events].copy() if create_new_items: + logging.info("Creating items for top %d events", len(top_events)) event_items = create_items( - top_events.to_dict(orient="records"), storm_catalog, storm_duration=storm_duration, with_tb=with_tb + top_events.to_dict(orient="records"), + storm_catalog, + storm_duration=storm_duration, + with_tb=with_tb, + num_workers=num_workers, + use_threads=use_threads, ) collection = storm_catalog.new_collection_from_items(collection_id, event_items) else: collection = storm_catalog.add_rank_to_collection(collection_id, top_events) + logging.info("Adding summary stats and features to collection.") + collection.add_ranked_storms_asset(ranked_storms_output_path, storm_catalog.spm) collection.add_summary_stats(storm_catalog.spm) collection.watershed_centroid_feature_collection(storm_catalog.spm) collection.max_precip_feature_collection(storm_catalog.spm) storm_catalog.add_collection_to_catalog(collection, override=True) + logging.info("Saving catalog and collection.") 
storm_catalog.save_catalog() return collection diff --git a/stormhub/met/zarr_to_dss.py b/stormhub/met/zarr_to_dss.py index 77bd0d0..ba2f384 100644 --- a/stormhub/met/zarr_to_dss.py +++ b/stormhub/met/zarr_to_dss.py @@ -301,6 +301,15 @@ def interpolate_nan_values(ds: xr.DataArray) -> xr.DataArray: return interpolated_combined +def save_da_as_geotiff( + da: xr.DataArray, output_path: str, crs: str = "EPSG:4326", x_dim: str = "longitude", y_dim: str = "latitude" +): + """Save xarray DataArray as GeoTIFF.""" + da = da.rio.write_crs(crs) + da.rio.set_spatial_dims(x_dim=x_dim, y_dim=y_dim, inplace=True) + da.rio.to_raster(output_path, compress="LZW") + + def get_s3_zarr_data( s3_paths: List[str], aoi_gdf: GeoDataFrame, @@ -309,8 +318,17 @@ def get_s3_zarr_data( variables_of_interest: List[str], interp_nan_vals: bool = True, ) -> xr.Dataset: - """Load and processe Zarr datasets from S3, clipping them to a given area of interest (AOI), filter by time and variables, and optionally interpolates missing values.""" - s3 = s3fs.S3FileSystem(anon=True) + """ + Read a multifile dataset from the specified S3 paths, filters it based on the area of interest (AOI) and the time range, extracts only the variables of interest and returns an xarray Dataset. + + Args: + s3_paths: A list of S3 paths where the Zarr data is stored. + aoi_gdf: A GeoDataFrame containing the area of interest. Only the first entry is used, and should be a polygon or multipolygon geometry. + start_dt: The start datetime to filter the data. + end_dt: The end datetime to filter the data. + variables_of_interest: A list of variables to select from the dataset. If empty, all variables will be read. 
+ """ + s3 = s3fs.S3FileSystem(anon=True, config_kwargs={"max_pool_connections": 50}) fileset = [s3fs.S3Map(root=path, s3=s3, check=False) for path in s3_paths] ds = xr.open_mfdataset(fileset, engine="zarr", chunks="auto", consolidated=True) diff --git a/workflows/new.py b/workflows/new.py index 57a4dd2..6c33882 100644 --- a/workflows/new.py +++ b/workflows/new.py @@ -1,5 +1,5 @@ from stormhub.logger import initialize_logger -from stormhub.met.storm_catalog import new_catalog, new_collection +from stormhub.met.storm_catalog import add_storm_dss_files, create_normal_precip, new_catalog, new_collection import json import logging import shutil @@ -79,7 +79,15 @@ def add_config_to_collection(storm_collection, config_filename="params-config.js storm_duration_hours, min_precip_threshold, top_n_events, - check_every_n_hours=24, + use_threads=True, #True for Linux/WSL, False for Windows + num_workers=8, # Number of parallel workers + check_every_n_hours=48, ) # Add config file as a STAC collection asset add_config_to_collection(storm_collection) + + # Optionally, add DSS files to storm items + #add_storm_dss_files(storm_catalog) + + # Optionally, create normal precipitation grid + #create_normal_precip(storm_catalog, duration_hours=storm_duration_hours)