
Commit d56cb88

fix bug with drop overlapping assim flag (#597)
* fix bug with drop overlapping assim flag
* publish docs on merge or manual trigger not PR
1 parent 7a2b8da commit d56cb88
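
The flag's intended behavior, now handled in the fetching utilities rather than at load time, is to blank out reference_time on assimilation timeseries so that values overlapping in value_time can be collapsed later. A minimal pandas sketch of that effect (the sample frame, column names, and configuration string below are illustrative stand-ins, not the TEEHR API):

import pandas as pd

# Two assimilation values for the same location and value_time, issued from
# different reference_time cycles (illustrative data only).
df = pd.DataFrame({
    "location_id": ["loc-1", "loc-1"],
    "value_time": pd.to_datetime(["2024-01-01 01:00", "2024-01-01 01:00"]),
    "reference_time": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
    "value": [1.2, 1.3],
})

drop_overlapping_assimilation_values = True
configuration = "analysis_assim"  # assumed NWM configuration name

if drop_overlapping_assimilation_values and "assim" in configuration:
    # Blank out reference_time so a downstream duplicate-dropping step keyed on
    # location and value_time can keep a single row per overlapping timestamp.
    df.loc[:, "reference_time"] = pd.NaT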


7 files changed: +37 −24 lines changed


.github/workflows/documentation-publish.yml

Lines changed: 5 additions & 3 deletions
@@ -1,9 +1,11 @@
 name: build-publish-documentation

 on:
-  pull_request:
-    types: [opened, reopened, ready_for_review]
-    branches: [main]
+  workflow_dispatch:
+
+  # pull_request:
+  #   types: [opened, reopened, ready_for_review]
+  #   branches: [main]

   push:
     branches: [main]

src/teehr/evaluation/fetch.py

Lines changed: 2 additions & 4 deletions
@@ -911,8 +911,7 @@ def nwm_operational_points(
             ),
             timeseries_type=timeseries_type,
             write_mode=write_mode,
-            drop_duplicates=drop_duplicates,
-            drop_overlapping_assimilation_values=drop_overlapping_assimilation_values  # noqa
+            drop_duplicates=drop_duplicates
         )

     def nwm_operational_grids(
@@ -1215,6 +1214,5 @@ def nwm_operational_grids(
             in_path=Path(self.nwm_cache_dir),
             timeseries_type=timeseries_type,
             write_mode=write_mode,
-            drop_duplicates=drop_duplicates,
-            drop_overlapping_assimilation_values=drop_overlapping_assimilation_values  # noqa
+            drop_duplicates=drop_duplicates
         )

src/teehr/fetching/nwm/grid_utils.py

Lines changed: 7 additions & 1 deletion
@@ -183,7 +183,8 @@ def fetch_and_format_nwm_grids(
     overwrite_output: bool,
     location_id_prefix: Union[str, None],
     variable_mapper: Dict[str, Dict[str, str]],
-    timeseries_type: TimeseriesTypeEnum
+    timeseries_type: TimeseriesTypeEnum,
+    drop_overlapping_assimilation_values: bool
 ):
     """Compute weighted average, grouping by reference time.

@@ -243,6 +244,11 @@
             Path(output_parquet_dir), f"{ref_time_str}.parquet"
         )
         z_hour_df.sort_values([LOCATION_ID, VALUE_TIME], inplace=True)
+
+        if drop_overlapping_assimilation_values and "assim" in nwm_configuration_name:
+            # Set reference_time to NaT for assimilation values
+            z_hour_df.loc[:, REFERENCE_TIME] = pd.NaT
+
         write_timeseries_parquet_file(
             filepath=parquet_filepath,
             overwrite_output=overwrite_output,
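
In the grid path the new branch keys off the NWM configuration name, so only assimilation products are affected. A quick illustration of the substring check (the configuration names here are examples, not an exhaustive list):

# Only names containing "assim" trigger the reference_time blanking.
for name in ["analysis_assim", "analysis_assim_extend", "short_range"]:
    print(name, "->", "assim" in name)
# analysis_assim -> True
# analysis_assim_extend -> True
# short_range -> False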

src/teehr/fetching/nwm/nwm_grids.py

Lines changed: 2 additions & 1 deletion
@@ -382,5 +382,6 @@ def nwm_grids_to_parquet(
         overwrite_output=overwrite_output,
         location_id_prefix=location_id_prefix,
         variable_mapper=variable_mapper,
-        timeseries_type=timeseries_type
+        timeseries_type=timeseries_type,
+        drop_overlapping_assimilation_values=drop_overlapping_assimilation_values
     )

src/teehr/fetching/nwm/nwm_points.py

Lines changed: 2 additions & 1 deletion
@@ -348,5 +348,6 @@ def nwm_to_parquet(
         overwrite_output,
         nwm_version,
         variable_mapper,
-        timeseries_type
+        timeseries_type,
+        drop_overlapping_assimilation_values
     )

src/teehr/fetching/nwm/point_utils.py

Lines changed: 18 additions & 3 deletions
@@ -102,7 +102,8 @@ def process_chunk_of_files(
     overwrite_output: bool,
     nwm_version: str,
     variable_mapper: Dict[str, Dict[str, str]],
-    timeseries_type: TimeseriesTypeEnum
+    timeseries_type: TimeseriesTypeEnum,
+    drop_overlapping_assimilation_values: bool
 ):
     """Assemble a table for a chunk of NWM files."""
     location_ids = np.array(location_ids).astype(int)
@@ -156,6 +157,12 @@
     end = f"{end_json[1]}T{end_json[3][1:3]}F{end_json[6][1:]}"
     filename = f"{start}_{end}.parquet"

+    if drop_overlapping_assimilation_values and "assim" in configuration:
+        # Set reference_time to NaT for assimilation values
+        df_output = output_table.to_pandas()
+        df_output.loc[:, REFERENCE_TIME] = pd.NaT
+        output_table = pa.Table.from_pandas(df_output, schema=schema)
+
     write_timeseries_parquet_file(
         Path(output_parquet_dir, filename),
         overwrite_output,
@@ -176,7 +183,8 @@ def fetch_and_format_nwm_points(
     overwrite_output: bool,
     nwm_version: str,
     variable_mapper: Dict[str, Dict[str, str]],
-    timeseries_type: TimeseriesTypeEnum
+    timeseries_type: TimeseriesTypeEnum,
+    drop_overlapping_assimilation_values: bool
 ):
     """Fetch NWM point data and save as parquet files.

@@ -211,6 +219,12 @@
         they already exist. True = overwrite; False = fail.
     nwm_version : str
         Specified NWM version.
+    variable_mapper : Dict[str, Dict[str, str]]
+        A mapping dictionary for variable names and units.
+    timeseries_type : TimeseriesTypeEnum
+        The type of timeseries being processed.
+    drop_overlapping_assimilation_values : bool
+        Whether to drop assimilation values that overlap in value_time.
     """
     output_parquet_dir = Path(output_parquet_dir)
     if not output_parquet_dir.exists():
@@ -241,5 +255,6 @@
         overwrite_output,
         nwm_version,
         variable_mapper,
-        timeseries_type
+        timeseries_type,
+        drop_overlapping_assimilation_values
     )
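
Because the point path assembles a pyarrow Table rather than a pandas DataFrame, the fix round-trips through pandas to null the column and then rebuilds the Table against the original schema. A self-contained sketch of that round-trip (the schema and data are stand-ins, not the TEEHR point schema):

import pandas as pd
import pyarrow as pa

# Stand-in schema with a nullable reference_time column.
schema = pa.schema([
    ("location_id", pa.string()),
    ("value_time", pa.timestamp("ms")),
    ("reference_time", pa.timestamp("ms")),
    ("value", pa.float64()),
])

output_table = pa.Table.from_pydict({
    "location_id": ["loc-1"],
    "value_time": [pd.Timestamp("2024-01-01 01:00")],
    "reference_time": [pd.Timestamp("2024-01-01 00:00")],
    "value": [1.2],
}, schema=schema)

# Convert to pandas, blank out reference_time, then rebuild the Table so it
# still conforms to the expected schema before the parquet file is written.
df_output = output_table.to_pandas()
df_output.loc[:, "reference_time"] = pd.NaT
output_table = pa.Table.from_pandas(df_output, schema=schema, preserve_index=False)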

src/teehr/loading/timeseries.py

Lines changed: 1 addition & 11 deletions
@@ -247,8 +247,7 @@ def validate_and_insert_timeseries(
     timeseries_type: str,
     pattern: str = "**/*.parquet",
     write_mode: TableWriteEnum = "append",
-    drop_duplicates: bool = True,
-    drop_overlapping_assimilation_values: bool = False
+    drop_duplicates: bool = True
 ):
     """Validate and insert primary timeseries data.

@@ -272,12 +271,6 @@
     drop_duplicates : bool, optional (default: True)
         Whether to drop duplicates in the dataframe before writing
         to the table.
-    drop_overlapping_assimilation_values: Optional[bool] = True
-        Whether to drop overlapping assimilation values. Default is True.
-        If True, values that overlap in value_time are dropped, keeping those with
-        the most recent reference_time. In this case, all reference_time values
-        are set to None. If False, overlapping values are kept and reference_time
-        is retained.
     """  # noqa
     in_path = Path(in_path)
     logger.info(f"Validating and inserting timeseries data from {in_path}")
@@ -292,9 +285,6 @@
     # Read the converted files to Spark DataFrame
     df = table._read_files(in_path, pattern)

-    if drop_overlapping_assimilation_values:
-        df = df.withColumn("reference_time", lit(None))
-
     # Validate using the _validate() method
     validated_df = table._validate(
         df=df,
