Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ cython_debug/

# PyPI configuration file
.pypirc
catalogs
catalogs/*
!catalogs/example-input-data/
holding
.DS_Store

Expand Down
Empty file.
22 changes: 22 additions & 0 deletions configs/params-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"watershed": {
"id": "allegheny",
"geometry_file": "/workspaces/stormhub/data/0_source/huc04/allegheny_huc.geojson",
"description": "Watershed for use in development of storm catalog"
},
"transposition_region": {
"id": "allegheny-transpo-area-v01",
"geometry_file": "/workspaces/stormhub/data/0_source/transposition/allegheny_slamsim2_transpo.geojson",
"description": "Transposition Domain developed by the ARC team"
},
"params": {
"start_date": "1979-02-01",
"end_date": "1979-02-28",
"top_n_events": 10,
"check_every_n_hours": 24,
"num_workers": 12,
"storm_duration_hours": 72,
"min_precip_threshold_inches": 1.0,
"output_resolution_km": 1
}
}
4 changes: 4 additions & 0 deletions env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ dependencies:
- zarr=3.1.3
- pip:
- pystac==1.14.1
- dataretrieval==1.1.2
- duckdb==1.4.4
- nbconvert==7.17.0
- boto3==1.40.54
- leafmap==0.60.1
- s3fs==0.4.2
- hecdss==0.1.28
- python-dotenv==1.1.1
Expand Down
126 changes: 114 additions & 12 deletions stormhub/met/storm_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,86 @@ def new_collection_from_items(self, collection_id: str, items: List[Item]) -> St
self.sanitize_catalog_assets()
return collection

def new_collection_from_items_on_disk(self, collection_id: str) -> StormCollection:
    """
    Create a new collection from item JSON files saved on disk.

    Items are read from the collection directory one at a time, so the full
    list of items never has to be held in memory at once.

    Args:
        collection_id (str): The ID of the collection whose on-disk item
            directory is scanned.

    Returns
    -------
    StormCollection: The new storm collection, saved to disk and registered
        in the catalog.

    Raises:
        ValueError: If the collection directory does not exist, or if it
            contains no readable item JSON files.
    """
    collection_dir = self.spm.collection_dir(collection_id)
    if not os.path.exists(collection_dir):
        raise ValueError(f"Collection directory not found: {collection_dir}")

    # Built lazily: created on the first item found so we never construct an
    # empty collection when the directory has no usable items.
    collection = None

    # Each subdirectory of the collection dir is expected to hold one item;
    # spm.collection_item maps the directory name to its item JSON path.
    for entry in os.scandir(collection_dir):
        if not entry.is_dir():
            continue

        item_path = self.spm.collection_item(collection_id, entry.name)
        if not os.path.exists(item_path):
            continue

        # Skip anything pystac parses as a non-Item STAC object
        # (e.g. a stray catalog/collection JSON inside the directory).
        item = pystac.read_file(item_path)
        if not isinstance(item, Item):
            continue

        if collection is None:
            # Placeholder extent seeded from the first item; the real extent
            # is recomputed below by update_extent_from_items() once every
            # item has been added.
            first_time = item.datetime or datetime(1970, 1, 1)
            extent = pystac.Extent(
                spatial=pystac.SpatialExtent(bboxes=[[0, 0, 0, 0]]),
                temporal=pystac.TemporalExtent(intervals=[[first_time, first_time]]),
            )
            # Bypass StormCollection.__init__ and initialize via the base
            # pystac.Collection instead — presumably StormCollection.__init__
            # expects the full item list up front, which this streaming path
            # avoids. TODO(review): confirm against StormCollection.__init__.
            collection = StormCollection.__new__(StormCollection)
            pystac.Collection.__init__(
                collection,
                id=collection_id,
                description="STAC collection generated from storm items",
                extent=extent,
            )
            collection.add_link(STORMHUB_REF_LINK)

        collection.add_item_to_collection(item)

    if collection is None:
        raise ValueError(f"No item JSON files found in: {collection_dir}")

    # Replace the placeholder extent with one derived from the added items.
    collection.update_extent_from_items()

    collection.add_asset(
        "valid-transposition-region",
        pystac.Asset(
            href=self.valid_transposition_region.self_href,
            title="Valid Transposition Region",
            description=f"Valid transposition region for {self.watershed.id} watershed",
            media_type=pystac.MediaType.GEOJSON,
            roles=["valid_transposition_region"],
        ),
    )

    collection.add_asset(
        "watershed",
        pystac.Asset(
            href=self.watershed.self_href,
            title="Watershed",
            description=f"{self.watershed.id} watershed",
            media_type=pystac.MediaType.GEOJSON,
            roles=["watershed"],
        ),
    )

    # Persist the collection JSON, then register it in the catalog,
    # overriding any previously registered collection with the same ID.
    collection.save_object(dest_href=self.spm.collection_file(collection_id), include_self_link=False)
    self.add_collection_to_catalog(collection, override=True)
    self.sanitize_catalog_assets()
    return collection

def sort_collection(self, collection_id: Collection, property_name: str):
"""
Sort and save a STAC collection based on a specific property.
Expand Down Expand Up @@ -876,7 +956,7 @@ def create_items(
num_workers: int = None,
use_threads: bool = False,
with_tb: bool = False,
) -> List:
) -> None:
"""
Create items for storm events, setting the item ID to `por_rank` instead of storm_date.

Expand All @@ -891,19 +971,15 @@ def create_items(

Returns
-------
list: List of created event items.
None
"""
event_items = []

if not collection_id:
collection_id = catalog.spm.storm_collection_id(storm_duration)

collection_dir = catalog.spm.collection_dir(collection_id)
if not os.path.exists(collection_dir):
os.makedirs(collection_dir)

count = len(event_dates)

if not num_workers:
num_workers = 4

Expand All @@ -915,6 +991,24 @@ def create_items(
executor_class = ProcessPoolExecutor

storm_data = [(e["storm_date"], e["por_rank"]) for e in event_dates]
existing_item_ids = set()
if os.path.exists(collection_dir):
try:
existing_item_ids = {
name for name in os.listdir(collection_dir) if os.path.isdir(os.path.join(collection_dir, name))
}
except OSError as exc:
logging.warning("Unable to list collection directory '%s': %s", collection_dir, exc)
existing_item_ids = set()

if existing_item_ids:
pre_filter_count = len(storm_data)
storm_data = [(d, r) for d, r in storm_data if f"{r}" not in existing_item_ids]
skipped = pre_filter_count - len(storm_data)
if skipped > 0:
logging.info("Skipping %d existing items in collection '%s'.", skipped, collection_id)

count = len(storm_data)

with executor_class(max_workers=num_workers) as executor:
futures = [
Expand All @@ -934,8 +1028,10 @@ def create_items(
count -= 1
try:
r = future.result()
logging.info("%s processed (%d remaining)", r.datetime, count)
event_items.append(r)
if isinstance(r, dict):
logging.info("%s processed (%d remaining)", r.get("storm_date"), count)
else:
logging.info("%s processed (%d remaining)", r.datetime, count)

except Exception as e:
if with_tb:
Expand All @@ -944,7 +1040,7 @@ def create_items(
else:
logging.error("Error processing: %s", e)

return event_items
return None


def init_storm_catalog(
Expand Down Expand Up @@ -1131,6 +1227,7 @@ def add_storm_dss_files(
use_valid_region: bool = False,
variable_duration_map: Dict[NOAADataVariable, int] = None,
dss_output_dir: str = None,
output_resolution_km: int = 1,
):
"""
Add dss files containing meteorological data to all storm items in events collection.
Expand Down Expand Up @@ -1177,7 +1274,9 @@ def add_storm_dss_files(

variable_duration_map = {NOAADataVariable.APCP: duration_hours}

noaa_zarr_to_dss(dss_output_path, transpo_href, aoi_name, start_date_dt, variable_duration_map)
noaa_zarr_to_dss(
dss_output_path, transpo_href, aoi_name, start_date_dt, variable_duration_map, output_resolution_km
)

item.add_asset(
dss_fn,
Expand Down Expand Up @@ -1417,15 +1516,15 @@ def new_collection(

if create_new_items:
logging.info("Creating items for top %d events", len(top_events))
event_items = create_items(
create_items(
top_events.to_dict(orient="records"),
storm_catalog,
storm_duration=storm_duration,
with_tb=with_tb,
num_workers=num_workers,
use_threads=use_threads,
)
collection = storm_catalog.new_collection_from_items(collection_id, event_items)
collection = storm_catalog.new_collection_from_items_on_disk(collection_id)

else:
collection = storm_catalog.add_rank_to_collection(collection_id, top_events)
Expand Down Expand Up @@ -1453,6 +1552,7 @@ def resume_collection(
num_workers: int = None,
with_tb: bool = False,
create_items: bool = True,
use_threads: bool = False,
):
"""
Resume a storm collection.
Expand All @@ -1467,6 +1567,7 @@ def resume_collection(
check_every_n_hours (int): The interval in hours to check for storms.
num_workers (int, optional): Number of CPUs to use during processing.
with_tb (bool): Whether to include traceback in error logs.
use_threads (bool): Whether to use threads instead of processes.
"""
initialize_logger()
storm_catalog = StormCatalog.from_file(catalog)
Expand All @@ -1492,4 +1593,5 @@ def resume_collection(
num_workers=num_workers,
with_tb=with_tb,
create_new_items=create_items,
use_threads=use_threads,
)
3 changes: 2 additions & 1 deletion stormhub/met/zarr_to_dss.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ def noaa_zarr_to_dss(
aoi_name: str,
storm_start: datetime,
variable_duration_map: Dict[NOAADataVariable, int],
output_resolution_km: int,
):
"""Given a geometry and datetime information about a storm, writes variables of interest from NOAA dataset to DSS."""
# arrange parameters
Expand Down Expand Up @@ -500,6 +501,6 @@ def noaa_zarr_to_dss(
param_name=data_variable.dss_variable_title,
param_measurement_type=data_variable.measurement_type,
param_measurement_unit=data_variable.measurement_unit,
output_resolution_km=4,
output_resolution_km=output_resolution_km,
data_version="AORC",
)