8 changes: 5 additions & 3 deletions .github/workflows/documentation-publish.yml
@@ -1,9 +1,11 @@
 name: build-publish-documentation
 
 on:
-  pull_request:
-    types: [opened, reopened, ready_for_review]
-    branches: [main]
+  workflow_dispatch:
+
+  # pull_request:
+  #   types: [opened, reopened, ready_for_review]
+  #   branches: [main]
 
   push:
     branches: [main]
6 changes: 2 additions & 4 deletions src/teehr/evaluation/fetch.py
@@ -911,8 +911,7 @@ def nwm_operational_points(
             ),
             timeseries_type=timeseries_type,
             write_mode=write_mode,
-            drop_duplicates=drop_duplicates,
-            drop_overlapping_assimilation_values=drop_overlapping_assimilation_values  # noqa
+            drop_duplicates=drop_duplicates
         )
 
     def nwm_operational_grids(
@@ -1215,6 +1214,5 @@ def nwm_operational_grids(
             in_path=Path(self.nwm_cache_dir),
             timeseries_type=timeseries_type,
             write_mode=write_mode,
-            drop_duplicates=drop_duplicates,
-            drop_overlapping_assimilation_values=drop_overlapping_assimilation_values  # noqa
+            drop_duplicates=drop_duplicates
         )
8 changes: 7 additions & 1 deletion src/teehr/fetching/nwm/grid_utils.py
@@ -183,7 +183,8 @@ def fetch_and_format_nwm_grids(
     overwrite_output: bool,
     location_id_prefix: Union[str, None],
     variable_mapper: Dict[str, Dict[str, str]],
-    timeseries_type: TimeseriesTypeEnum
+    timeseries_type: TimeseriesTypeEnum,
+    drop_overlapping_assimilation_values: bool
 ):
     """Compute weighted average, grouping by reference time.
 
@@ -243,6 +244,11 @@ def fetch_and_format_nwm_grids(
         Path(output_parquet_dir), f"{ref_time_str}.parquet"
     )
     z_hour_df.sort_values([LOCATION_ID, VALUE_TIME], inplace=True)
+
+    if drop_overlapping_assimilation_values and "assim" in nwm_configuration_name:
+        # Set reference_time to NaT for assimilation values
+        z_hour_df.loc[:, REFERENCE_TIME] = pd.NaT
+
     write_timeseries_parquet_file(
         filepath=parquet_filepath,
         overwrite_output=overwrite_output,
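For context, the new flag's effect on the gridded path: when `drop_overlapping_assimilation_values` is set and the NWM configuration name contains "assim", the `reference_time` column is blanked to `NaT` before each parquet file is written, so overlapping assimilation values can later be deduplicated on `value_time` alone. A minimal sketch of that masking step (the sample DataFrame and literal column names are hypothetical; the real code uses the `REFERENCE_TIME` constant and the `z_hour_df` produced by the weighted-average step):

import pandas as pd

# Hypothetical stand-in for the z_hour_df produced upstream.
z_hour_df = pd.DataFrame({
    "location_id": ["nwm30-101", "nwm30-101"],
    "value_time": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
    "reference_time": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:00"]),
    "value": [1.2, 1.4],
})

nwm_configuration_name = "analysis_assim"
drop_overlapping_assimilation_values = True

if drop_overlapping_assimilation_values and "assim" in nwm_configuration_name:
    # Blank the reference_time column; rows then differ only by value_time.
    z_hour_df.loc[:, "reference_time"] = pd.NaT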
3 changes: 2 additions & 1 deletion src/teehr/fetching/nwm/nwm_grids.py
@@ -382,5 +382,6 @@ def nwm_grids_to_parquet(
         overwrite_output=overwrite_output,
         location_id_prefix=location_id_prefix,
         variable_mapper=variable_mapper,
-        timeseries_type=timeseries_type
+        timeseries_type=timeseries_type,
+        drop_overlapping_assimilation_values=drop_overlapping_assimilation_values
     )
3 changes: 2 additions & 1 deletion src/teehr/fetching/nwm/nwm_points.py
@@ -348,5 +348,6 @@ def nwm_to_parquet(
         overwrite_output,
         nwm_version,
         variable_mapper,
-        timeseries_type
+        timeseries_type,
+        drop_overlapping_assimilation_values
     )
21 changes: 18 additions & 3 deletions src/teehr/fetching/nwm/point_utils.py
@@ -102,7 +102,8 @@ def process_chunk_of_files(
     overwrite_output: bool,
     nwm_version: str,
     variable_mapper: Dict[str, Dict[str, str]],
-    timeseries_type: TimeseriesTypeEnum
+    timeseries_type: TimeseriesTypeEnum,
+    drop_overlapping_assimilation_values: bool
 ):
     """Assemble a table for a chunk of NWM files."""
     location_ids = np.array(location_ids).astype(int)
@@ -156,6 +157,12 @@ def process_chunk_of_files(
         end = f"{end_json[1]}T{end_json[3][1:3]}F{end_json[6][1:]}"
         filename = f"{start}_{end}.parquet"
 
+        if drop_overlapping_assimilation_values and "assim" in configuration:
+            # Set reference_time to NaT for assimilation values
+            df_output = output_table.to_pandas()
+            df_output.loc[:, REFERENCE_TIME] = pd.NaT
+            output_table = pa.Table.from_pandas(df_output, schema=schema)
+
         write_timeseries_parquet_file(
             Path(output_parquet_dir, filename),
             overwrite_output,
@@ -176,7 +183,8 @@ def fetch_and_format_nwm_points(
     overwrite_output: bool,
     nwm_version: str,
     variable_mapper: Dict[str, Dict[str, str]],
-    timeseries_type: TimeseriesTypeEnum
+    timeseries_type: TimeseriesTypeEnum,
+    drop_overlapping_assimilation_values: bool
 ):
     """Fetch NWM point data and save as parquet files.
 
@@ -211,6 +219,12 @@ def fetch_and_format_nwm_points(
         they already exist. True = overwrite; False = fail.
     nwm_version : str
         Specified NWM version.
+    variable_mapper : Dict[str, Dict[str, str]]
+        A mapping dictionary for variable names and units.
+    timeseries_type : TimeseriesTypeEnum
+        The type of timeseries being processed.
+    drop_overlapping_assimilation_values : bool
+        Whether to drop assimilation values that overlap in value_time.
     """
     output_parquet_dir = Path(output_parquet_dir)
     if not output_parquet_dir.exists():
@@ -241,5 +255,6 @@ def fetch_and_format_nwm_points(
         overwrite_output,
         nwm_version,
         variable_mapper,
-        timeseries_type
+        timeseries_type,
+        drop_overlapping_assimilation_values
     )
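The point path applies the same masking to an in-memory Arrow table via a pandas round-trip, rebuilding the table against the chunk's schema so the parquet output keeps its types. A sketch under assumed names (this `schema` and sample table are hypothetical; `process_chunk_of_files` reuses the schema it already built for the chunk):

import pandas as pd
import pyarrow as pa

# Hypothetical schema and table standing in for the chunk's output_table.
schema = pa.schema([
    ("value_time", pa.timestamp("ms")),
    ("reference_time", pa.timestamp("ms")),
    ("value", pa.float64()),
])
output_table = pa.Table.from_pandas(
    pd.DataFrame({
        "value_time": pd.to_datetime(["2024-01-01 00:00"]),
        "reference_time": pd.to_datetime(["2024-01-01 00:00"]),
        "value": [2.5],
    }),
    schema=schema,
)

# Round-trip through pandas to null out reference_time, then rebuild the
# table with the same schema; only the masked column changes.
df_output = output_table.to_pandas()
df_output.loc[:, "reference_time"] = pd.NaT
output_table = pa.Table.from_pandas(df_output, schema=schema)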
12 changes: 1 addition & 11 deletions src/teehr/loading/timeseries.py
@@ -247,8 +247,7 @@ def validate_and_insert_timeseries(
     timeseries_type: str,
     pattern: str = "**/*.parquet",
     write_mode: TableWriteEnum = "append",
-    drop_duplicates: bool = True,
-    drop_overlapping_assimilation_values: bool = False
+    drop_duplicates: bool = True
 ):
     """Validate and insert primary timeseries data.
 
@@ -272,12 +271,6 @@ def validate_and_insert_timeseries(
     drop_duplicates : bool, optional (default: True)
         Whether to drop duplicates in the dataframe before writing
         to the table.
-    drop_overlapping_assimilation_values: Optional[bool] = True
-        Whether to drop overlapping assimilation values. Default is True.
-        If True, values that overlap in value_time are dropped, keeping those with
-        the most recent reference_time. In this case, all reference_time values
-        are set to None. If False, overlapping values are kept and reference_time
-        is retained.
     """  # noqa
     in_path = Path(in_path)
     logger.info(f"Validating and inserting timeseries data from {in_path}")
@@ -292,9 +285,6 @@ def validate_and_insert_timeseries(
     # Read the converted files to Spark DataFrame
     df = table._read_files(in_path, pattern)
 
-    if drop_overlapping_assimilation_values:
-        df = df.withColumn("reference_time", lit(None))
-
     # Validate using the _validate() method
     validated_df = table._validate(
         df=df,
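With the flag removed here, the loading path no longer nulls `reference_time` itself; that now happens at fetch time, before the parquet files reach `validate_and_insert_timeseries`. For reference, the removed behavior amounted to this PySpark pattern (the session setup and input path are hypothetical; the real DataFrame comes from `table._read_files()`):

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()
# Hypothetical input; stands in for the DataFrame read by _read_files().
df = spark.read.parquet("/path/to/converted/timeseries")
# Overwrite reference_time with nulls for every row, unconditionally.
df = df.withColumn("reference_time", lit(None))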