
Commit e24d032

Add Nudging decay (including reading LastObs files) (#310)
* added last obs df including discharge and ids
* lastobs nc file folder
* formatting and cleanup
* added both discharge and model discharge
* added prediction delta timesteps
* da decay with exp decay incorporated, unblackened for readability
* added check between obs file results and our fvd output
* removed pdbs and set to run on lastobs
* added verbose statements
* restructuring of lastobs df to simplify process
* generalized last timestep index call to automatically determine from various input sizes
* github-requested fixes to if statements and cleanup
* updated variable
* working ncar da decay prototype; need to bring in real lastobs data inside the equation
* getting da values in order through the classic da assimilation technique
* pushing ids and values to mc reach
* fixed gage id matching; cython is broken and can't compile correctly; need to print values to identify whether da is working properly
* saving changes, trying to fix old DA function
* restructuring da timeslice file read to use datetime; not generalized before
* generalized da naming convention and date timeframe with improved interpolation
* removed extra comments
* remove dependence on not-yet-created flowveldepth
* name "last_obs_file"
* include data_assimilation_parameters in yaml
* include empty dict for data_assimilation_parameters in yaml
* black
* added paths to shared drive locations and blackened files
* quick merge changes
* files working
* sync for merge
* sync for merge
* sync merge to upstream
* move last obs function next to usgs_da function
* add TODOs
* add lastobs to other parallel modes
* move last_obs above usgs_df
* cimports and cdefs for last_obs
* fixed broken usgs_df names; they were changed to "folder" in some areas and "filter" in others
* moved da functions into a single wrapper in nhd_network_utilities; also harmonized inputs a bit
* add da yaml file (DATA NOT YET ADDED)
* drop non-5-minute entries from time_slices; also harmonizing inputs for merge
* add function for finding the tailwater for a given segment
* add masks
* use pandas date_range
* cleanup
* add comment showing possible handling of extended DA
* Revert "add comment showing possible handling of extended DA" (this reverts commit 0455466)
* temporarily disable last_obs
* Update example yaml with inputs that work
* temporarily disable last_obs
* update comment
* adjust DA for perfect match
* removed filter list
* use efficient shape call for usgs_positions_list length
* add gage_maxtime and pseudocode for lastobs
* Identified major hard-code issue in structured- and structured-obj
* update lastobs comments and pseudocode
* update da test yaml file with additional gage options
* use "reindex" to fill/eliminate columns for usgs_df
* functions in place for decay, but the last obs file is behaving incorrectly and changing the parity check even when all uses are turned off
* added decay timestep count
* Use new fields in flowveldepth to simplify initial condition handling; to facilitate this, added a constant qvd_ts_w (flowveldepth timestep width) to define the standard column width
* add two additional segments for parity checking
* reconfigure reach splitting to consider gages
* update diffusive call signature
* yaml updates for test
* black

Co-authored-by: James Halgren <james.halgren@noaa.gov>
1 parent fa93df6 commit e24d032

File tree

8 files changed: +430 −73 lines

src/python_framework_v02/troute/nhd_io.py

Lines changed: 112 additions & 20 deletions
@@ -8,6 +8,9 @@
 import numpy as np
 from toolz import compose
 import dask.array as da
+import sys
+import math
+from datetime import *


 def read_netcdf(geo_file_path):
@@ -186,7 +189,6 @@ def get_ql_from_wrf_hydro_mf(qlat_files, index_col="feature_id", value_col="q_la
     2018-01-01 13:00:00  4186117  41.233807  -75.413895  0.006496
     ```
     """
-    filter_list = None

     with xr.open_mfdataset(
         qlat_files,
@@ -339,6 +341,92 @@ def preprocess_time_station_index(xd):
     )


+def build_last_obs_df(lastobsfile, routelink, wrf_last_obs_flag):
+    # open the routelink file and extract discharges
+
+    ds1 = xr.open_dataset(routelink)
+    df = ds1.to_dataframe()
+    df2 = df.loc[df["gages"] != b" "]
+    df2["gages"] = df2["gages"].astype("int")
+    df2 = df2[["gages", "to"]]
+    df2 = df2.reset_index()
+    df2 = df2.set_index("gages")
+
+    with xr.open_dataset(lastobsfile) as ds:
+        df_model_discharges = ds["model_discharge"].to_dataframe()
+        df_discharges = ds["discharge"].to_dataframe()
+        last_ts = df_model_discharges.index.get_level_values("timeInd")[-1]
+        model_discharge_last_ts = df_model_discharges[
+            df_model_discharges.index.get_level_values("timeInd") == last_ts
+        ]
+        discharge_last_ts = df_discharges[
+            df_discharges.index.get_level_values("timeInd") == last_ts
+        ]
+        df1 = ds["stationId"].to_dataframe()
+        df1 = df1.astype(int)
+        model_discharge_last_ts = model_discharge_last_ts.join(df1)
+        model_discharge_last_ts = model_discharge_last_ts.join(discharge_last_ts)
+        model_discharge_last_ts = model_discharge_last_ts.loc[
+            model_discharge_last_ts["model_discharge"] != -9999.0
+        ]
+        model_discharge_last_ts = model_discharge_last_ts.reset_index().set_index(
+            "stationId"
+        )
+        model_discharge_last_ts = model_discharge_last_ts.drop(
+            ["stationIdInd", "timeInd"], axis=1
+        )
+        model_discharge_last_ts["discharge"] = model_discharge_last_ts[
+            "discharge"
+        ].to_frame()
+        # If predicting from the last_obs file, use the last obs file results
+        # if last_obs_file == "error-based":
+        # elif last_obs_file == "obs-based":  # the wrf-hydro default
+        if wrf_last_obs_flag:
+            model_discharge_last_ts["last_nudge"] = (
+                model_discharge_last_ts["discharge"]
+                - model_discharge_last_ts["model_discharge"]
+            )
+        final_df = df2.join(model_discharge_last_ts["discharge"])
+        final_df = final_df.reset_index()
+        final_df = final_df.set_index("to")
+        final_df = final_df.drop(["feature_id", "gages"], axis=1)
+        final_df = final_df.dropna()
+
+        # Else, predict from the t-route model outputs; if the index doesn't
+        # match, interrupt computation, as the results won't be valid
+        # else:
+        #     fvd_df = fvd_df
+        #     if len(model_discharge_last_ts.index) == len(fvd_df.index):
+        #         model_discharge_last_ts["last_nudge"] = (
+        #             model_discharge_last_ts["discharge"] - fvd_df[fvd_df.columns[0]]
+        #         )
+        #     else:
+        #         print("THE NUDGING FILE IDS DO NOT MATCH THE FLOWVELDEPTH IDS")
+        #         sys.exit()
+
+        # # Predictions created with continuously decreasing deltas until near-zero difference
+        # a = 120
+        # prediction_df = pd.DataFrame(index=model_discharge_last_ts.index)
+
+        # for time in range(0, 720, 5):
+        #     weight = math.exp(time / -a)
+        #     delta = pd.DataFrame(model_discharge_last_ts["last_nudge"] / weight)
+
+        #     if time == 0:
+        #         prediction_df[str(time)] = model_discharge_last_ts["last_nudge"]
+        #         weight_diff = prediction_df[str(time)] - prediction_df[str(time)]
+        #     else:
+        #         if weight > 0.1:
+        #             prediction_df[str(time)] = (
+        #                 delta["last_nudge"] + model_discharge_last_ts["model_discharge"]
+        #             )
+        #         elif weight < -0.1:
+        #             prediction_df[str(time)] = (
+        #                 delta["last_nudge"] + model_discharge_last_ts["model_discharge"]
+        #             )
+        # prediction_df["0"] = model_discharge_last_ts["model_discharge"]
+    return final_df
+
+
 def get_usgs_from_time_slices_csv(routelink_subset_file, usgs_csv):

     df2 = pd.read_csv(usgs_csv, index_col=0)
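The commented-out prototype in `build_last_obs_df` decays the last nudge with `math.exp(time / -a)`. As a hedged illustration of the decay idea only, not the commit's exact code (names here are hypothetical, and note the prototype divides the nudge by the weight, whereas the conventional nudging-decay form multiplies), a minimal runnable sketch:

```python
import math

def decayed_nudge(last_nudge, t_minutes, tau=120.0):
    # Exponentially decay a nudge value t_minutes after the last observation.
    # tau is the e-folding time in minutes (the prototype above uses a = 120).
    return last_nudge * math.exp(-t_minutes / tau)

def predict_series(model_discharge, last_nudge, horizon=720, step=5, tau=120.0):
    # Nudged prediction: model value plus a shrinking fraction of the last nudge.
    return [
        model_discharge + decayed_nudge(last_nudge, t, tau)
        for t in range(0, horizon, step)
    ]

# At t=0 the full nudge applies; far out, the prediction relaxes to the model value.
preds = predict_series(model_discharge=10.0, last_nudge=2.0)
```

The 720-minute horizon and 5-minute step mirror the `range(0, 720, 5)` loop in the commented prototype.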
@@ -421,32 +509,36 @@ def get_usgs_from_time_slices_folder(
     usgs_df = usgs_df.drop(["gages", "ascendingIndex", "to"], axis=1)
     columns_list = usgs_df.columns

-    for i in range(0, (len(columns_list) * 3) - 12, 12):
-        original_string = usgs_df.columns[i]
-        original_string_shortened = original_string[:-5]
-        temp_name1 = original_string_shortened + str("05:00")
-        temp_name2 = original_string_shortened + str("10:00")
-        temp_name3 = original_string_shortened + str("20:00")
-        temp_name4 = original_string_shortened + str("25:00")
-        temp_name5 = original_string_shortened + str("35:00")
-        temp_name6 = original_string_shortened + str("40:00")
-        temp_name7 = original_string_shortened + str("50:00")
-        temp_name8 = original_string_shortened + str("55:00")
-        usgs_df.insert(i + 1, temp_name1, np.nan)
-        usgs_df.insert(i + 2, temp_name2, np.nan)
-        usgs_df.insert(i + 4, temp_name3, np.nan)
-        usgs_df.insert(i + 5, temp_name4, np.nan)
-        usgs_df.insert(i + 7, temp_name5, np.nan)
-        usgs_df.insert(i + 8, temp_name6, np.nan)
-        usgs_df.insert(i + 10, temp_name7, np.nan)
-        usgs_df.insert(i + 11, temp_name8, np.nan)
+    original_string_first = usgs_df.columns[0]
+    date_time_str = original_string_first[:10] + " " + original_string_first[11:]
+    date_time_obj_start = datetime.strptime(date_time_str, "%Y-%m-%d %H:%M:%S")
+
+    original_string_last = usgs_df.columns[-1]
+    date_time_str = original_string_last[:10] + " " + original_string_last[11:]
+    date_time_obj_end = datetime.strptime(date_time_str, "%Y-%m-%d %H:%M:%S")
+
+    dates = []
+    # for j in pd.date_range(date_time_obj_start, date_time_obj_end + timedelta(1), freq="5min"):
+    for j in pd.date_range(date_time_obj_start, date_time_obj_end, freq="5min"):
+        dates.append(j.strftime("%Y-%m-%d_%H:%M:00"))
+
+    """
+    # dates_to_drop = ~usgs_df.columns.isin(dates)
+    OR
+    # dates_to_drop = usgs_df.columns.difference(dates)
+    # dates_to_add = pd.Index(dates).difference(usgs_df.columns)
+    """
+
+    usgs_df = usgs_df.reindex(columns=dates)

     usgs_df = usgs_df.interpolate(method="linear", axis=1)
+    usgs_df = usgs_df.interpolate(method="linear", axis=1, limit_direction="backward")
     usgs_df.drop(usgs_df[usgs_df.iloc[:, 0] == -999999.000000].index, inplace=True)

     return usgs_df


+# TODO: Move channel restart above usgs to keep order with execution script
 def get_channel_restart_from_csv(
     channel_initial_states_file,
     index_col=0,
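The reindex-then-interpolate pattern that replaces the hard-coded column insertions above can be shown on a toy frame (hypothetical data; column labels follow the `%Y-%m-%d_%H:%M:00` timeslice naming used in the hunk):

```python
import pandas as pd

# Observations arrive on a 15-minute grid; the target grid is 5 minutes.
obs = pd.DataFrame(
    [[1.0, 4.0]],
    index=["gage_A"],
    columns=["2020-01-01_00:00:00", "2020-01-01_00:15:00"],
)

start = pd.Timestamp("2020-01-01 00:00:00")
end = pd.Timestamp("2020-01-01 00:15:00")
dates = [
    j.strftime("%Y-%m-%d_%H:%M:00")
    for j in pd.date_range(start, end, freq="5min")
]

# Reindexing inserts NaN columns for the missing 5-minute stamps
# (and would drop any column not on the grid); linear interpolation
# along axis=1 then fills the gaps.
obs = obs.reindex(columns=dates)
obs = obs.interpolate(method="linear", axis=1)
```

This is why the commit can drop the brittle `usgs_df.insert(...)` calls: `reindex` both adds and eliminates columns in one step.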

src/python_framework_v02/troute/nhd_network.py

Lines changed: 25 additions & 0 deletions
@@ -88,6 +88,31 @@ def reverse_network(N):
     return rg


+def find_tw_for_node(reaches_bytw, node):
+    # TODO: extend this function (or write a new one) to handle a list of nodes.
+    # Such functionality might be useful for finding the networks corresponding
+    # to a list of gages, for instance.
+    """
+    reaches_bytw is a dictionary of lists of the form
+
+    tw 1:
+      [ [ seg1, seg2, seg3, ... segn ],  # reach 1
+        [ sega, segb, segc, ... segz ],  # reach 2
+        .
+        .
+        .
+        [ ... ] ]  # reach n
+    tw 2:
+      etc.
+    """
+    for tw, rs in reaches_bytw.items():
+        for r in rs:
+            if node in r:
+                return tw
+
+    return None  # Node not in reach set.
+
+
 def junctions(N):
     c = Counter(chain.from_iterable(N.values()))
     return {k for k, v in c.items() if v > 1}
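How `find_tw_for_node` walks that tailwater-keyed structure can be seen with toy data (the function body mirrors the added code; the segment and tailwater IDs are made up):

```python
def find_tw_for_node(reaches_bytw, node):
    # Scan every reach under every tailwater; return the tailwater
    # whose reach set contains the node.
    for tw, rs in reaches_bytw.items():
        for r in rs:
            if node in r:
                return tw
    return None  # Node not in reach set.

# Two tailwater networks, each a list of reaches (lists of segment IDs).
reaches_bytw = {
    101: [[1, 2, 3], [4, 5, 101]],
    202: [[7, 8], [9, 202]],
}
```

A linear scan is adequate here; as the TODO notes, a batch variant for many nodes (e.g. a gage list) would want an inverted segment-to-tailwater index instead.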

src/python_framework_v02/troute/nhd_network_utilities_v02.py

Lines changed: 50 additions & 7 deletions
@@ -419,10 +419,31 @@ def build_connections(supernetwork_parameters, dt):

     param_df["dt"] = dt
     param_df = param_df.rename(columns=reverse_dict(cols))
+
+    wbodies = {}
+    if "waterbody" in cols:
+        wbodies = build_waterbodies(
+            param_df[["waterbody"]], supernetwork_parameters, "waterbody"
+        )
+        param_df = param_df.drop("waterbody", axis=1)
+
+    gages = {}
+    if "gages" in cols:
+        gages = build_gages(param_df[["gages"]])
+        param_df = param_df.drop("gages", axis=1)
+
     param_df = param_df.astype("float32")

     # datasub = data[['dt', 'bw', 'tw', 'twcc', 'dx', 'n', 'ncc', 'cs', 's0']]
-    return connections, param_df
+    return connections, param_df, wbodies, gages
+
+
+def build_gages(segment_gage_df,):
+    gage_list = list(map(bytes.strip, segment_gage_df.gages.values))
+    gage_mask = list(map(bytes.isdigit, gage_list))
+    gages = segment_gage_df.loc[gage_mask, "gages"].to_dict()
+
+    return gages


 def build_waterbodies(
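`build_gages` masks out segments whose gage field is blank by stripping the fixed-width byte strings and keeping only all-digit results. A standalone sketch with a hypothetical frame (routelink files store gages as padded byte strings, so a blank field strips to `b""`, which fails `isdigit`):

```python
import pandas as pd

# Segment IDs index the frame; only segments 10 and 30 carry real gage codes.
segment_gage_df = pd.DataFrame(
    {"gages": [b"01010000   ", b"           ", b"02020000   "]},
    index=[10, 20, 30],
)

gage_list = list(map(bytes.strip, segment_gage_df.gages.values))
gage_mask = list(map(bytes.isdigit, gage_list))  # blank fields -> False
gages = segment_gage_df.loc[gage_mask, "gages"].to_dict()
```

The resulting `gages` dict maps segment ID to the (unstripped) gage bytes, which is what `build_connections` now returns alongside the waterbody map.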
@@ -569,20 +590,42 @@ def build_data_assimilation(data_assimilation_parameters):
     data_assimilation_csv = data_assimilation_parameters.get(
         "data_assimilation_csv", None
     )
-    data_assimilation_filter = data_assimilation_parameters.get(
-        "data_assimilation_filter", None
+    data_assimilation_folder = data_assimilation_parameters.get(
+        "data_assimilation_timeslices_folder", None
     )
+    # TODO: Fix the logic here according to the following.
+
+    # If there are any observations for data assimilation, there
+    # needs to be a complete set in the first time step or else
+    # there must be a "LastObs". If there is a complete set in
+    # the first time step, the LastObs is optional. If there are
+    # no observations for assimilation, there can be a LastObs
+    # with an empty usgs dataframe.
+
+    last_obs_file = data_assimilation_parameters.get("wrf_hydro_last_obs_file", None)
+    last_obs_type = data_assimilation_parameters.get("wrf_last_obs_type", "error-based")
+    last_obs_crosswalk_file = data_assimilation_parameters.get(
+        "wrf_hydro_da_channel_ID_crosswalk_file", None
+    )
+
+    last_obs_df = pd.DataFrame()
+
+    if last_obs_file:
+        last_obs_df = nhd_io.build_last_obs_df(
+            last_obs_file, last_obs_crosswalk_file, last_obs_type,
+        )
+
     if data_assimilation_csv:
         usgs_df = build_data_assimilation_csv(data_assimilation_parameters)
-    elif data_assimilation_filter:
+    elif data_assimilation_folder:
         usgs_df = build_data_assimilation_folder(data_assimilation_parameters)
-    return usgs_df
+    return usgs_df, last_obs_df


 def build_data_assimilation_csv(data_assimilation_parameters):

     usgs_df = nhd_io.get_usgs_from_time_slices_csv(
-        data_assimilation_parameters["data_assimilation_parameters_file"],
+        data_assimilation_parameters["wrf_hydro_da_channel_ID_crosswalk_file"],
         data_assimilation_parameters["data_assimilation_csv"],
     )
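The `.get(..., None)` calls above make each DA input optional, dispatching between the csv, folder, and LastObs sources. A minimal sketch of that dispatch with the builder functions stubbed out (hypothetical parameter dict; note that the original leaves `usgs_df` unbound when neither csv nor folder is configured, so this sketch initializes it to an empty frame):

```python
import pandas as pd

def build_data_assimilation(params):
    # Each source is optional; .get returns None when the key is absent.
    da_csv = params.get("data_assimilation_csv", None)
    da_folder = params.get("data_assimilation_timeslices_folder", None)
    last_obs_file = params.get("wrf_hydro_last_obs_file", None)

    last_obs_df = pd.DataFrame()  # empty unless a LastObs file is configured
    if last_obs_file:
        # stub standing in for nhd_io.build_last_obs_df
        last_obs_df = pd.DataFrame({"discharge": [1.5]})

    usgs_df = pd.DataFrame()
    if da_csv:
        # stub standing in for build_data_assimilation_csv
        usgs_df = pd.DataFrame({"2020-01-01_00:00:00": [2.0]})
    elif da_folder:
        # stub standing in for build_data_assimilation_folder
        usgs_df = pd.DataFrame({"2020-01-01_00:00:00": [3.0]})

    return usgs_df, last_obs_df
```

With an empty parameter dict, both returned frames are empty, which matches the "no observations, optional LastObs" case described in the TODO comment.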

@@ -597,7 +640,7 @@ def build_data_assimilation_folder(data_assimilation_parameters):
     ).resolve()

     usgs_df = nhd_io.get_usgs_from_time_slices_folder(
-        data_assimilation_parameters["data_assimilation_parameters_file"],
+        data_assimilation_parameters["wrf_hydro_da_channel_ID_crosswalk_file"],
         usgs_timeslices_folder,
         data_assimilation_parameters["data_assimilation_filter"],
     )
