Skip to content

Commit 7a78ae0

Browse files
committed
fix bug with drop overlapping assim flag (#597)
1 parent 797ca00 commit 7a78ae0

File tree

4 files changed

+29
-6
lines changed

4 files changed

+29
-6
lines changed

src/teehr/fetching/nwm/grid_utils.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ def fetch_and_format_nwm_grids(
183183
overwrite_output: bool,
184184
location_id_prefix: Union[str, None],
185185
variable_mapper: Dict[str, Dict[str, str]],
186-
timeseries_type: TimeseriesTypeEnum
186+
timeseries_type: TimeseriesTypeEnum,
187+
drop_overlapping_assimilation_values: bool
187188
):
188189
"""Compute weighted average, grouping by reference time.
189190
@@ -243,6 +244,11 @@ def fetch_and_format_nwm_grids(
243244
Path(output_parquet_dir), f"{ref_time_str}.parquet"
244245
)
245246
z_hour_df.sort_values([LOCATION_ID, VALUE_TIME], inplace=True)
247+
248+
if drop_overlapping_assimilation_values and "assim" in nwm_configuration_name:
249+
# Set reference_time to NaT for assimilation values
250+
z_hour_df.loc[:, REFERENCE_TIME] = pd.NaT
251+
246252
write_timeseries_parquet_file(
247253
filepath=parquet_filepath,
248254
overwrite_output=overwrite_output,

src/teehr/fetching/nwm/nwm_grids.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,5 +382,6 @@ def nwm_grids_to_parquet(
382382
overwrite_output=overwrite_output,
383383
location_id_prefix=location_id_prefix,
384384
variable_mapper=variable_mapper,
385-
timeseries_type=timeseries_type
385+
timeseries_type=timeseries_type,
386+
drop_overlapping_assimilation_values=drop_overlapping_assimilation_values
386387
)

src/teehr/fetching/nwm/nwm_points.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,5 +348,6 @@ def nwm_to_parquet(
348348
overwrite_output,
349349
nwm_version,
350350
variable_mapper,
351-
timeseries_type
351+
timeseries_type,
352+
drop_overlapping_assimilation_values
352353
)

src/teehr/fetching/nwm/point_utils.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ def process_chunk_of_files(
102102
overwrite_output: bool,
103103
nwm_version: str,
104104
variable_mapper: Dict[str, Dict[str, str]],
105-
timeseries_type: TimeseriesTypeEnum
105+
timeseries_type: TimeseriesTypeEnum,
106+
drop_overlapping_assimilation_values: bool
106107
):
107108
"""Assemble a table for a chunk of NWM files."""
108109
location_ids = np.array(location_ids).astype(int)
@@ -156,6 +157,12 @@ def process_chunk_of_files(
156157
end = f"{end_json[1]}T{end_json[3][1:3]}F{end_json[6][1:]}"
157158
filename = f"{start}_{end}.parquet"
158159

160+
if drop_overlapping_assimilation_values and "assim" in configuration:
161+
# Set reference_time to NaT for assimilation values
162+
df_output = output_table.to_pandas()
163+
df_output.loc[:, REFERENCE_TIME] = pd.NaT
164+
output_table = pa.Table.from_pandas(df_output, schema=schema)
165+
159166
write_timeseries_parquet_file(
160167
Path(output_parquet_dir, filename),
161168
overwrite_output,
@@ -176,7 +183,8 @@ def fetch_and_format_nwm_points(
176183
overwrite_output: bool,
177184
nwm_version: str,
178185
variable_mapper: Dict[str, Dict[str, str]],
179-
timeseries_type: TimeseriesTypeEnum
186+
timeseries_type: TimeseriesTypeEnum,
187+
drop_overlapping_assimilation_values: bool
180188
):
181189
"""Fetch NWM point data and save as parquet files.
182190
@@ -211,6 +219,12 @@ def fetch_and_format_nwm_points(
211219
they already exist. True = overwrite; False = fail.
212220
nwm_version : str
213221
Specified NWM version.
222+
variable_mapper : Dict[str, Dict[str, str]]
223+
A mapping dictionary for variable names and units.
224+
timeseries_type : TimeseriesTypeEnum
225+
The type of timeseries being processed.
226+
drop_overlapping_assimilation_values : bool
227+
Whether to drop assimilation values that overlap in value_time.
214228
"""
215229
output_parquet_dir = Path(output_parquet_dir)
216230
if not output_parquet_dir.exists():
@@ -241,5 +255,6 @@ def fetch_and_format_nwm_points(
241255
overwrite_output,
242256
nwm_version,
243257
variable_mapper,
244-
timeseries_type
258+
timeseries_type,
259+
drop_overlapping_assimilation_values
245260
)

0 commit comments

Comments
 (0)