diff --git a/.gitignore b/.gitignore
index 14d2e4f..49c052d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -169,7 +169,8 @@ cython_debug/
 # PyPI configuration file
 .pypirc
 
-catalogs
+catalogs/*
+!catalogs/example-input-data/
 holding
 .DS_Store
 
diff --git a/catalogs/example-input-data/.gitkeep b/catalogs/example-input-data/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/configs/params-config.json b/configs/params-config.json
new file mode 100644
index 0000000..c95eae5
--- /dev/null
+++ b/configs/params-config.json
@@ -0,0 +1,22 @@
+{
+    "watershed": {
+        "id": "allegheny",
+        "geometry_file": "/workspaces/stormhub/data/0_source/huc04/allegheny_huc.geojson",
+        "description": "Watershed for use in development of storm catalog"
+    },
+    "transposition_region": {
+        "id": "allegheny-transpo-area-v01",
+        "geometry_file": "/workspaces/stormhub/data/0_source/transposition/allegheny_slamsim2_transpo.geojson",
+        "description": "Transposition Domain developed by the ARC team"
+    },
+    "params": {
+        "start_date": "1979-02-01",
+        "end_date": "1979-02-28",
+        "top_n_events": 10,
+        "check_every_n_hours": 24,
+        "num_workers": 12,
+        "storm_duration_hours": 72,
+        "min_precip_threshold_inches": 1.0,
+        "output_resolution_km": 1
+    }
+}
\ No newline at end of file
diff --git a/env.yaml b/env.yaml
index cb594d9..cd12447 100644
--- a/env.yaml
+++ b/env.yaml
@@ -21,7 +21,11 @@ dependencies:
   - zarr=3.1.3
   - pip:
       - pystac==1.14.1
+      - dataretrieval==1.1.2
+      - duckdb==1.4.4
+      - nbconvert==7.17.0
       - boto3==1.40.54
+      - leafmap==0.60.1
       - s3fs==0.4.2
       - hecdss==0.1.28
       - python-dotenv==1.1.1
diff --git a/stormhub/met/storm_catalog.py b/stormhub/met/storm_catalog.py
index 930f99d..82f3eff 100644
--- a/stormhub/met/storm_catalog.py
+++ b/stormhub/met/storm_catalog.py
@@ -544,6 +544,96 @@ def new_collection_from_items(self, collection_id: str, items: List[Item]) -> St
         self.sanitize_catalog_assets()
         return collection
 
+    def new_collection_from_items_on_disk(self, collection_id: str) -> StormCollection:
+        """
+        Create a new collection from item JSON files saved on disk.
+
+        This avoids holding all items in memory at once.
+
+        Args:
+            collection_id (str): The ID of the collection.
+
+        Returns
+        -------
+        StormCollection: The new storm collection.
+
+        Raises
+        ------
+        ValueError: If the collection directory does not exist or contains no item JSON files.
+        """
+        collection_dir = self.spm.collection_dir(collection_id)
+        if not os.path.exists(collection_dir):
+            raise ValueError(f"Collection directory not found: {collection_dir}")
+
+        collection = None
+
+        # List the item directories up front (the context manager closes the scandir handle)
+        # and sort them: os.scandir yields entries in arbitrary order, which would make the
+        # order of item links in the saved collection vary from run to run.
+        with os.scandir(collection_dir) as entries:
+            item_ids = sorted(entry.name for entry in entries if entry.is_dir())
+
+        for item_id in item_ids:
+            item_path = self.spm.collection_item(collection_id, item_id)
+            if not os.path.exists(item_path):
+                continue
+
+            item = pystac.read_file(item_path)
+            if not isinstance(item, Item):
+                continue
+
+            if collection is None:
+                # Placeholder extent; replaced by update_extent_from_items() once all items are added.
+                first_time = item.datetime or datetime(1970, 1, 1)
+                extent = pystac.Extent(
+                    spatial=pystac.SpatialExtent(bboxes=[[0, 0, 0, 0]]),
+                    temporal=pystac.TemporalExtent(intervals=[[first_time, first_time]]),
+                )
+                # Bypass StormCollection.__init__ and initialise only the pystac.Collection base.
+                # NOTE(review): presumably because StormCollection.__init__ needs the full item list - confirm.
+                collection = StormCollection.__new__(StormCollection)
+                pystac.Collection.__init__(
+                    collection,
+                    id=collection_id,
+                    description="STAC collection generated from storm items",
+                    extent=extent,
+                )
+                collection.add_link(STORMHUB_REF_LINK)
+
+            collection.add_item_to_collection(item)
+
+        if collection is None:
+            raise ValueError(f"No item JSON files found in: {collection_dir}")
+
+        collection.update_extent_from_items()
+
+        collection.add_asset(
+            "valid-transposition-region",
+            pystac.Asset(
+                href=self.valid_transposition_region.self_href,
+                title="Valid Transposition Region",
+                description=f"Valid transposition region for {self.watershed.id} watershed",
+                media_type=pystac.MediaType.GEOJSON,
+                roles=["valid_transposition_region"],
+            ),
+        )
+
+        collection.add_asset(
+            "watershed",
+            pystac.Asset(
+                href=self.watershed.self_href,
+                title="Watershed",
+                description=f"{self.watershed.id} watershed",
+                media_type=pystac.MediaType.GEOJSON,
+                roles=["watershed"],
+            ),
+        )
+
+        collection.save_object(dest_href=self.spm.collection_file(collection_id), include_self_link=False)
+        self.add_collection_to_catalog(collection, override=True)
+        self.sanitize_catalog_assets()
+        return collection
+
     def sort_collection(self, collection_id: Collection, property_name: str):
         """
         Sort and save a STAC collection based on a specific property.
@@ -876,7 +966,7 @@ def create_items(
     num_workers: int = None,
     use_threads: bool = False,
     with_tb: bool = False,
-) -> List:
+) -> None:
     """
     Create items for storm events, setting the item ID to `por_rank` instead of storm_date.
 
@@ -891,10 +981,8 @@ def create_items(
 
     Returns
     -------
-    list: List of created event items.
+    None
     """
-    event_items = []
-
     if not collection_id:
         collection_id = catalog.spm.storm_collection_id(storm_duration)
 
@@ -902,8 +990,6 @@ def create_items(
     if not os.path.exists(collection_dir):
         os.makedirs(collection_dir)
 
-    count = len(event_dates)
-
     if not num_workers:
         num_workers = 4
 
@@ -915,6 +1001,29 @@ def create_items(
         executor_class = ProcessPoolExecutor
 
     storm_data = [(e["storm_date"], e["por_rank"]) for e in event_dates]
+    # Resume support: skip storms whose item already exists. An item counts as existing
+    # only when its item JSON is on disk -- a bare directory can be left behind by a run
+    # that failed part-way, and new_collection_from_items_on_disk ignores such directories,
+    # so skipping on the directory alone would drop that storm from the collection for good.
+    existing_item_ids = set()
+    try:
+        with os.scandir(collection_dir) as entries:
+            existing_item_ids = {
+                entry.name
+                for entry in entries
+                if entry.is_dir() and os.path.exists(catalog.spm.collection_item(collection_id, entry.name))
+            }
+    except OSError as exc:
+        logging.warning("Unable to list collection directory '%s': %s", collection_dir, exc)
+
+    if existing_item_ids:
+        pre_filter_count = len(storm_data)
+        storm_data = [(d, r) for d, r in storm_data if f"{r}" not in existing_item_ids]
+        skipped = pre_filter_count - len(storm_data)
+        if skipped > 0:
+            logging.info("Skipping %d existing items in collection '%s'.", skipped, collection_id)
+
+    count = len(storm_data)
 
     with executor_class(max_workers=num_workers) as executor:
         futures = [
@@ -934,8 +1043,10 @@ def create_items(
             count -= 1
             try:
                 r = future.result()
-                logging.info("%s processed (%d remaining)", r.datetime, count)
-                event_items.append(r)
+                if isinstance(r, dict):
+                    logging.info("%s processed (%d remaining)", r.get("storm_date"), count)
+                else:
+                    logging.info("%s processed (%d remaining)", r.datetime, count)
 
             except Exception as e:
                 if with_tb:
@@ -944,7 +1055,7 @@ def create_items(
                 else:
                     logging.error("Error processing: %s", e)
 
-    return event_items
+    return None
 
 
 def init_storm_catalog(
@@ -1131,6 +1242,7 @@ def add_storm_dss_files(
     use_valid_region: bool = False,
     variable_duration_map: Dict[NOAADataVariable, int] = None,
     dss_output_dir: str = None,
+    output_resolution_km: int = 1,
 ):
     """
     Add dss files containing meteorological data to all storm items in events collection.
@@ -1177,7 +1289,9 @@ def add_storm_dss_files(
 
         variable_duration_map = {NOAADataVariable.APCP: duration_hours}
 
-        noaa_zarr_to_dss(dss_output_path, transpo_href, aoi_name, start_date_dt, variable_duration_map)
+        noaa_zarr_to_dss(
+            dss_output_path, transpo_href, aoi_name, start_date_dt, variable_duration_map, output_resolution_km
+        )
 
         item.add_asset(
             dss_fn,
@@ -1417,7 +1531,7 @@ def new_collection(
 
     if create_new_items:
         logging.info("Creating items for top %d events", len(top_events))
-        event_items = create_items(
+        create_items(
             top_events.to_dict(orient="records"),
             storm_catalog,
             storm_duration=storm_duration,
@@ -1425,7 +1539,7 @@ def new_collection(
             num_workers=num_workers,
             use_threads=use_threads,
         )
-        collection = storm_catalog.new_collection_from_items(collection_id, event_items)
+        collection = storm_catalog.new_collection_from_items_on_disk(collection_id)
     else:
         collection = storm_catalog.add_rank_to_collection(collection_id, top_events)
 
@@ -1453,6 +1567,7 @@ def resume_collection(
     num_workers: int = None,
     with_tb: bool = False,
     create_items: bool = True,
+    use_threads: bool = False,
 ):
     """
     Resume a storm collection.
@@ -1467,6 +1582,7 @@ def resume_collection(
         check_every_n_hours (int): The interval in hours to check for storms.
         num_workers (int, optional): Number of cpu's to use during processing.
         with_tb (bool): Whether to include traceback in error logs.
+        use_threads (bool): Whether to use threads instead of processes.
     """
     initialize_logger()
     storm_catalog = StormCatalog.from_file(catalog)
@@ -1492,4 +1608,5 @@ def resume_collection(
         num_workers=num_workers,
         with_tb=with_tb,
         create_new_items=create_items,
+        use_threads=use_threads,
     )
diff --git a/stormhub/met/zarr_to_dss.py b/stormhub/met/zarr_to_dss.py
index ba2f384..3c1f5d4 100644
--- a/stormhub/met/zarr_to_dss.py
+++ b/stormhub/met/zarr_to_dss.py
@@ -467,6 +467,7 @@ def noaa_zarr_to_dss(
     aoi_name: str,
     storm_start: datetime,
     variable_duration_map: Dict[NOAADataVariable, int],
+    output_resolution_km: int = 4,  # default = previously hard-coded value, keeps existing callers working
 ):
     """Given a geometry and datetime information about a storm, writes variables of interest from NOAA dataset to DSS."""
     # arrange parameters
@@ -500,6 +501,6 @@ def noaa_zarr_to_dss(
             param_name=data_variable.dss_variable_title,
             param_measurement_type=data_variable.measurement_type,
             param_measurement_unit=data_variable.measurement_unit,
-            output_resolution_km=4,
+            output_resolution_km=output_resolution_km,
             data_version="AORC",
         )