diff --git a/pixi.toml b/pixi.toml index a76d90c3..49c77d5e 100644 --- a/pixi.toml +++ b/pixi.toml @@ -66,7 +66,6 @@ quarto = "*" quartodoc = "*" rasterstats = "*" requests = "*" -ribasim = "==2025.4.0" ruff = "*" seaborn = "*" shapely = ">=2" @@ -82,3 +81,4 @@ bokeh_helpers = { path = "src/bokeh_helpers", editable = true } hydamo = { path = "src/hydamo", editable = true } peilbeheerst_model = { path = "src/peilbeheerst_model", editable = true } ribasim_nl = { path = "src/ribasim_nl", editable = true } +ribasim = { git = "https://github.com/Deltares/Ribasim.git", branch = "main", subdirectory = "python/ribasim" } diff --git a/src/peilbeheerst_model/Parametrize/AmstelGooienVecht_parametrize.py b/src/peilbeheerst_model/Parametrize/AmstelGooienVecht_parametrize.py index ace113dd..2fedac93 100644 --- a/src/peilbeheerst_model/Parametrize/AmstelGooienVecht_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/AmstelGooienVecht_parametrize.py @@ -113,6 +113,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # check basin area ribasim_param.validate_basin_area(ribasim_model) diff --git a/src/peilbeheerst_model/Parametrize/Delfland_parametrize.py b/src/peilbeheerst_model/Parametrize/Delfland_parametrize.py index b2722f7e..a6538daa 100644 --- a/src/peilbeheerst_model/Parametrize/Delfland_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/Delfland_parametrize.py @@ -11,6 +11,7 @@ import peilbeheerst_model.ribasim_parametrization as ribasim_param from peilbeheerst_model.add_storage_basins import AddStorageBasins from peilbeheerst_model.assign_authorities import AssignAuthorities +from peilbeheerst_model.assign_flushing import Flushing from peilbeheerst_model.assign_parametrization import AssignMetaData from peilbeheerst_model.controle_output import Control from peilbeheerst_model.ribasim_feedback_processor import 
RibasimFeedbackProcessor @@ -114,6 +115,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # check basin area ribasim_param.validate_basin_area(ribasim_model) @@ -338,6 +340,10 @@ ) ribasim_model = assign.assign_authorities() +# Add flushing data +flush = Flushing(ribasim_model) +flush.add_flushing() + # set numerical settings # write model output ribasim_model.use_validation = True diff --git a/src/peilbeheerst_model/Parametrize/HollandsNoorderkwartier_parametrize.py b/src/peilbeheerst_model/Parametrize/HollandsNoorderkwartier_parametrize.py index ddff0442..777f9bf7 100644 --- a/src/peilbeheerst_model/Parametrize/HollandsNoorderkwartier_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/HollandsNoorderkwartier_parametrize.py @@ -11,6 +11,7 @@ import peilbeheerst_model.ribasim_parametrization as ribasim_param from peilbeheerst_model.add_storage_basins import AddStorageBasins from peilbeheerst_model.assign_authorities import AssignAuthorities +from peilbeheerst_model.assign_flushing import Flushing from peilbeheerst_model.assign_parametrization import AssignMetaData from peilbeheerst_model.controle_output import Control from peilbeheerst_model.ribasim_feedback_processor import RibasimFeedbackProcessor @@ -111,6 +112,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # check basin area ribasim_param.validate_basin_area(ribasim_model) @@ -369,6 +371,10 @@ ) ribasim_model = assign.assign_authorities() +# Add flushing data +flush = Flushing(ribasim_model) +flush.add_flushing() + # set numerical settings # write model output ribasim_model.use_validation = True diff --git a/src/peilbeheerst_model/Parametrize/HollandseDelta_parametrize.py 
b/src/peilbeheerst_model/Parametrize/HollandseDelta_parametrize.py index 370ca8f0..43fb4505 100644 --- a/src/peilbeheerst_model/Parametrize/HollandseDelta_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/HollandseDelta_parametrize.py @@ -11,6 +11,7 @@ import peilbeheerst_model.ribasim_parametrization as ribasim_param from peilbeheerst_model.add_storage_basins import AddStorageBasins from peilbeheerst_model.assign_authorities import AssignAuthorities +from peilbeheerst_model.assign_flushing import Flushing from peilbeheerst_model.assign_parametrization import AssignMetaData from peilbeheerst_model.controle_output import Control from peilbeheerst_model.ribasim_feedback_processor import RibasimFeedbackProcessor @@ -112,6 +113,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # model specific tweaks # merge small basins into larger basins for numerical stability @@ -738,6 +740,10 @@ ribasim_model.basin.time.df.precipitation /= meteo_factor # decrease meteo ribasim_model.basin.time.df.potential_evaporation /= meteo_factor # decrease meteo +# Add flushing data +flush = Flushing(ribasim_model) +flush.add_flushing() + # set numerical settings ribasim_model.use_validation = True ribasim_model.starttime = starttime diff --git a/src/peilbeheerst_model/Parametrize/Rijnland_parametrize.py b/src/peilbeheerst_model/Parametrize/Rijnland_parametrize.py index 99df7d9c..c8c0692c 100644 --- a/src/peilbeheerst_model/Parametrize/Rijnland_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/Rijnland_parametrize.py @@ -12,6 +12,7 @@ from peilbeheerst_model import supply from peilbeheerst_model.add_storage_basins import AddStorageBasins from peilbeheerst_model.assign_authorities import AssignAuthorities +from peilbeheerst_model.assign_flushing import Flushing from peilbeheerst_model.assign_parametrization import AssignMetaData from 
peilbeheerst_model.controle_output import Control from peilbeheerst_model.ribasim_feedback_processor import RibasimFeedbackProcessor @@ -110,6 +111,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") inlaat_pump = [] @@ -302,6 +304,10 @@ ribasim_model = assign.assign_authorities() +# Add flushing data +flush = Flushing(ribasim_model) +flush.add_flushing() + # set numerical settings # write model output ribasim_model.use_validation = True diff --git a/src/peilbeheerst_model/Parametrize/Rivierenland_parametrize.py b/src/peilbeheerst_model/Parametrize/Rivierenland_parametrize.py index d539d381..18e53a53 100644 --- a/src/peilbeheerst_model/Parametrize/Rivierenland_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/Rivierenland_parametrize.py @@ -12,6 +12,7 @@ import peilbeheerst_model.ribasim_parametrization as ribasim_param from peilbeheerst_model.add_storage_basins import AddStorageBasins from peilbeheerst_model.assign_authorities import AssignAuthorities +from peilbeheerst_model.assign_flushing import Flushing from peilbeheerst_model.assign_parametrization import AssignMetaData from peilbeheerst_model.controle_output import Control from peilbeheerst_model.ribasim_feedback_processor import RibasimFeedbackProcessor @@ -110,6 +111,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # check basin area ribasim_param.validate_basin_area(ribasim_model) @@ -467,6 +469,10 @@ ) ribasim_model = assign.assign_authorities() +# Add flushing data +flush = Flushing(ribasim_model) +flush.add_flushing() + # set numerical settings # write model output ribasim_model.use_validation = True diff --git a/src/peilbeheerst_model/Parametrize/Scheldestromen_parametrize.py 
b/src/peilbeheerst_model/Parametrize/Scheldestromen_parametrize.py index 68dda6bf..d954c9ff 100644 --- a/src/peilbeheerst_model/Parametrize/Scheldestromen_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/Scheldestromen_parametrize.py @@ -11,6 +11,7 @@ import peilbeheerst_model.ribasim_parametrization as ribasim_param from peilbeheerst_model.add_storage_basins import AddStorageBasins from peilbeheerst_model.assign_authorities import AssignAuthorities +from peilbeheerst_model.assign_flushing import Flushing from peilbeheerst_model.assign_parametrization import AssignMetaData from peilbeheerst_model.controle_output import Control from peilbeheerst_model.ribasim_feedback_processor import RibasimFeedbackProcessor @@ -109,6 +110,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # check basin area ribasim_param.validate_basin_area(ribasim_model) @@ -325,6 +327,9 @@ ) ribasim_model = assign.assign_authorities() +flush = Flushing(ribasim_model) +flush.add_flushing() + # set numerical settings # write model output ribasim_model.use_validation = True diff --git a/src/peilbeheerst_model/Parametrize/SchielandendeKrimpenerwaard.py b/src/peilbeheerst_model/Parametrize/SchielandendeKrimpenerwaard.py index f9b5d323..02e2648b 100644 --- a/src/peilbeheerst_model/Parametrize/SchielandendeKrimpenerwaard.py +++ b/src/peilbeheerst_model/Parametrize/SchielandendeKrimpenerwaard.py @@ -12,6 +12,7 @@ from peilbeheerst_model import supply from peilbeheerst_model.add_storage_basins import AddStorageBasins from peilbeheerst_model.assign_authorities import AssignAuthorities +from peilbeheerst_model.assign_flushing import Flushing from peilbeheerst_model.assign_parametrization import AssignMetaData from peilbeheerst_model.controle_output import Control from peilbeheerst_model.ribasim_feedback_processor import RibasimFeedbackProcessor @@ -113,6 
+114,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # model specific tweaks # change unknown streefpeilen to a default streefpeil @@ -450,6 +452,10 @@ ) ribasim_model = assign.assign_authorities() +# Add flushing data +flush = Flushing(ribasim_model) +flush.add_flushing() + # set numerical settings # write model output ribasim_model.use_validation = True diff --git a/src/peilbeheerst_model/Parametrize/WetterskipFryslan_parametrize.py b/src/peilbeheerst_model/Parametrize/WetterskipFryslan_parametrize.py index 7d64d66b..8a577ddf 100644 --- a/src/peilbeheerst_model/Parametrize/WetterskipFryslan_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/WetterskipFryslan_parametrize.py @@ -12,6 +12,7 @@ from peilbeheerst_model import supply from peilbeheerst_model.add_storage_basins import AddStorageBasins from peilbeheerst_model.assign_authorities import AssignAuthorities +from peilbeheerst_model.assign_flushing import Flushing from peilbeheerst_model.assign_parametrization import AssignMetaData from peilbeheerst_model.controle_output import Control from peilbeheerst_model.ribasim_feedback_processor import RibasimFeedbackProcessor @@ -111,6 +112,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # Uitlaat toevoegen at Ter Schelling level_boundary_node = ribasim_model.level_boundary.add( @@ -580,6 +582,10 @@ ) ribasim_model = assign.assign_authorities() +# Add flushing data +flush = Flushing(ribasim_model) +flush.add_flushing() + # TEMP CHANGES! VERY IMPORTANT TO REMOVE THIS AFTERWARDS! 
#@TODO ################################## reduce_computation_time = False if reduce_computation_time: diff --git a/src/peilbeheerst_model/Parametrize/Zuiderzeeland_parametrize.py b/src/peilbeheerst_model/Parametrize/Zuiderzeeland_parametrize.py index 87f229d4..f6e899f3 100644 --- a/src/peilbeheerst_model/Parametrize/Zuiderzeeland_parametrize.py +++ b/src/peilbeheerst_model/Parametrize/Zuiderzeeland_parametrize.py @@ -111,6 +111,7 @@ with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) ribasim_model = Model(filepath=ribasim_work_dir_model_toml) + ribasim_model.set_crs("EPSG:28992") # merge the smallest basins together ribasim_model.merge_basins(node_id=30, to_node_id=29) # 4363 m2 diff --git a/src/peilbeheerst_model/peilbeheerst_model/assign_flushing.py b/src/peilbeheerst_model/peilbeheerst_model/assign_flushing.py new file mode 100644 index 00000000..4775ada0 --- /dev/null +++ b/src/peilbeheerst_model/peilbeheerst_model/assign_flushing.py @@ -0,0 +1,583 @@ +from pathlib import Path + +import geopandas as gpd +import numpy as np +import pandas as pd +import shapely +from networkx import DiGraph, simple_cycles +from ribasim import Model, Node +from ribasim.geometry.link import NodeData +from ribasim.nodes import flow_demand +from shapely.geometry import MultiPolygon, Point, Polygon + +from ribasim_nl import CloudStorage +from ribasim_nl import Model as ModelNL +from ribasim_nl.case_conversions import pascal_to_snake_case + + +class Flushing: + def __init__( + self, + model: ModelNL | Model | Path | str, + lhm_flushing_path: Path | str = "Basisgegevens/LHM/lsw_flushing.gpkg", + flushing_layer: str = "lsw_flushing_lhm43", + flushing_id: str = "LSWNR", + flushing_col: str = "doorsp_mmj", + flushing_geom_offset: float = 50.0, + significant_overlap: float = 0.5, + convert_to_m3s: float = 1 / (1000 * 365 * 24 * 3600), + dissolve_by_val: bool = True, + debug_output: bool = False, + ): + """Initialize the Flushing class for adding 
flushing information to a Ribasim model. + + Parameters + ---------- + model : ModelNL | Model | Path | str + The Ribasim model to add flushing to, or a path to the model + lhm_flushing_path : Path | str, optional + Path to the flushing geopackage, by default "Basisgegevens/LHM/lsw_flushing.gpkg" + flushing_layer : str, optional + Name of the layer in the geopackage, by default "lsw_flushing_lhm43" + flushing_id : str, optional + Column name with the flushing IDs, by default "LSWNR" + flushing_col : str, optional + Column name with the flushing values, by default "doorsp_mmj" + flushing_geom_offset: float, optional + Horizontal offset used for placing the FlowDemand node (relative to the basin) + significant_overlap : float, optional + Threshold for considering area overlap significant, by default 0.5 + convert_to_m3s : float, optional + Conversion factor to convert flushing_col to m3/s, by default 1 / (1000 * 365 * 24 * 3600) + dissolve_by_val: bool, optional + Dissolve geospatially by the integer value of 'flushing_col', by default True + debug_output: bool, optional + Print debug node - basin choices, by default False + """ + self.cloud = CloudStorage() + self.model = model + self.lhm_flushing_path = self.cloud.joinpath(lhm_flushing_path) + self.flushing_layer = flushing_layer + self.flushing_id = flushing_id + self.flushing_col = flushing_col + self.flushing_geom_offset = flushing_geom_offset + self.significant_overlap = significant_overlap + self.convert_to_m3s = convert_to_m3s + self.dissolve_by_val = dissolve_by_val + self.debug_output = debug_output + + def add_flushing( + self, + ) -> ModelNL | Model: + """Add flushing information to the Ribasim model. 
+ + Returns + ------- + ModelNL | Model + The updated Ribasim model with flushing information added + """ + # Synchronize flushing data and model files + model, df_flushing = self._sync_files() + + # Reduce flushing data to non-null and matching data + df_flushing_subset = df_flushing[~pd.isna(df_flushing[self.flushing_col])].copy() + df_flushing_subset = df_flushing_subset[[self.flushing_id, self.flushing_col, "geometry"]].copy() + + if self.dissolve_by_val: + df_flushing_subset = self._dissolve_flushing_data(df_flushing_subset) + + # Get handles to relevant tables + all_nodes = model.node_table().df[["node_type", "geometry"]].copy() + df_outlet_static = model.outlet.static.df.set_index("node_id").copy() + df_pump_static = model.pump.static.df.set_index("node_id").copy() + + # Get an extended basin table with no 'bergend' basins + df_basin = model.basin.node.df[["meta_categorie"]].join( + model.basin.area.df.set_index("node_id")[["meta_aanvoer", "geometry"]] + ) + df_basin = df_basin[df_basin.meta_categorie != "bergend"] + df_basin = gpd.GeoDataFrame(df_basin, crs=model.basin.area.df.crs) + + # Reset the internal graph to ensure we have the most up-to-date version + model.reset_graph + + # Find matching basins for each flushing geometry + for flushing_row in df_flushing_subset.itertuples(): + flush_id = getattr(flushing_row, self.flushing_id) + flush_val = getattr(flushing_row, self.flushing_col) + + basin_matches = df_basin.iloc[df_basin.sindex.query(flushing_row.geometry, predicate="intersects")].copy() + basin_matches["area_match"] = basin_matches.intersection(flushing_row.geometry).area + basin_matches["rel_area_match"] = basin_matches.area_match / basin_matches.area + basin_matches["rel_area_flush"] = basin_matches.area_match / flushing_row.geometry.area + basin_matches = basin_matches[basin_matches.rel_area_match >= self.significant_overlap] + basin_matches = basin_matches.reset_index(drop=False) + + if len(basin_matches) == 0: + # No (sufficiently) matching 
basins, continue + continue + + # Find all upstream paths from the matching basins, contained by + # the contour of the basins geometry and flushing geometry + upstream_paths = [] + geom = shapely.union_all([flushing_row.geometry.buffer(0.1)] + basin_matches.geometry.buffer(0.1).tolist()) + for match in basin_matches.itertuples(): + # Find the upstream path(s) for this basin + upstream_paths += self._all_upstream_paths(model.graph, match.node_id, all_nodes, limit_geom=geom) + + # Make a DataFrame of the allowed nodes (node type) present in the + # paths found. + dfu = self._find_upstream_nodes(model, upstream_paths, all_nodes, df_outlet_static, df_pump_static) + + # No allowed upstream nodes found at all, log warning and continue + if len(dfu) == 0: + basin_nids = basin_matches.node_id.tolist() + print(f"WARNING: Polygon {flush_id=} with basin node_id's={basin_nids} has no valid upstream nodes") + continue + + # Select the least amount of required flushing upstream nodes + dfu = self._find_upstream_candidates(dfu) + + # The result should contain at least one node + if not dfu.optimal_choice.any(): + print(f"WARNING: Polygon {flush_id=} has upstream nodes, but no valid optimal choice") + continue + + # Determine the contribution of each matching basin + basins_cov = dfu[dfu.optimal_choice].basin.unique().tolist() + df_flush = basin_matches[basin_matches.node_id.isin(basins_cov)].copy() + df_flush["rel_contrib"] = df_flush.area_match / df_flush.area_match.sum() + + # Check if all basins are connected + basins_mis = basin_matches.node_id[~basin_matches.node_id.isin(basins_cov)].tolist() + if len(basins_mis) > 0: + max_cover = basin_matches.rel_area_flush.sum() * 100 + current_cover = df_flush.rel_area_flush.sum() * 100 + print( + f"WARNING: Polygon {flush_id=} missing upstream nodes for basins: {basins_mis}. 
Covered basins: {basins_cov}, {current_cover=:.1f}%, {max_cover=:.1f}%" + ) + + if self.debug_output: + debug_str = [] + for nid, group in dfu[dfu.optimal_choice].groupby("node_id"): + debug_str.append(f"node {nid} connects basins {group.basin.tolist()}") + print(f"Polygon {flush_id=}, {', '.join(debug_str)}") + + for (target_nid, target_type), group in dfu[dfu.optimal_choice].groupby(["node_id", "node_type"]): + # Determine the flushing value and convert to m3/s + group_basins = group.basin.unique().tolist() + contrib = df_flush[df_flush.node_id.isin(group_basins)].rel_contrib.sum() + demand = contrib * flushing_row.geometry.area * flush_val + demand = demand * self.convert_to_m3s + + # Select the target node + target_node = getattr(model, pascal_to_snake_case(target_type))[target_nid] + + # Create and link the flow_demand + metadata = { + f"meta_{self.flushing_id}": flush_id, + "meta_basin_nid": ",".join(map(str, group_basins)), + } + model = self.add_flushing_demand(model, target_node, demand, metadata=metadata) + + return model + + def add_flushing_demand( + self, + model: ModelNL | Model, + target_node: NodeData, + demand: float, + metadata: dict[str, str] | None, + ): + """Add a flushing demand node to the model and connect it to a target node. 
+ + Parameters + ---------- + model : ModelNL | Model + The model to add the flushing demand to + target_node : NodeData + The node to connect the flushing demand to + demand : float + The constant demand value to apply at all timesteps + metadata : dict[str, str] | None + Metadata as column - value pairs; pass None to add no metadata + + Returns + ------- + ModelNL | Model + + """ + df_static = getattr(getattr(model, pascal_to_snake_case(target_node.node_type)), "static").df + max_flow_rate = df_static.set_index("node_id").loc[target_node.node_id, ["flow_rate", "max_flow_rate"]].max() + if max_flow_rate < demand: + print(f"WARNING: {target_node} has {max_flow_rate=:.2e}m3/s, setting a greater {demand=:.2e}m3/s") + + if metadata is None: + metadata = {} + + uniq_times = model.basin.time.df.time.unique() + new_flow_demand = model.flow_demand.add( + Node( + model.node_table().df.index.max() + 1, + Point( + target_node.geometry.x + self.flushing_geom_offset, + target_node.geometry.y, + ), + **metadata, + ), + [ + # @TODO hardcoded demand_priority=1 for now + flow_demand.Time( + time=uniq_times, + demand_priority=1, + demand=len(uniq_times) * [demand], + ), + ], + ) + model.link.add(new_flow_demand, target_node) + + return model + + def _find_upstream_nodes( + self, + model: ModelNL | Model, + paths: list[list[int]], + all_nodes: gpd.GeoDataFrame, + df_outlet_static: pd.DataFrame, + df_pump_static: pd.DataFrame, + ) -> pd.DataFrame: + """Find upstream nodes that can be used for flushing. 
+ + Parameters + ---------- + paths : list[list[int]] + List of paths containing node IDs + all_nodes : gpd.GeoDataFrame + GeoDataFrame containing all nodes and their types + df_outlet_static : pd.DataFrame + DataFrame with static outlet information + df_pump_static : pd.DataFrame + DataFrame with static pump information + + Returns + ------- + pd.DataFrame + DataFrame containing upstream nodes and their properties + """ + # Find unique nodes over all paths + uniq_nodes = np.unique(np.concatenate(paths)) + + # Ignore nodes in cycles + cycles = list(simple_cycles(model.graph.subgraph(uniq_nodes))) + if len(cycles) == 0: + ignore_nodes = np.array([]) + else: + ignore_nodes = np.unique(np.concatenate(cycles)) + uniq_nodes = uniq_nodes[~np.isin(uniq_nodes, ignore_nodes)] + + # Some areas have a lot of paths with duplicate nodes. Improve + # performance by caching the response for unique nodes + df_control_links = model.link.df[model.link.df.link_type == "control"] + df_control_links = df_control_links.set_index("to_node_id") + node_lookup = {} + for nid in uniq_nodes: + nid_type = all_nodes.node_type.at[nid] + if nid_type in ["Outlet", "Pump"]: + if nid in df_control_links.index: + # This node has incoming control links, check if a + # FlowDemand node is present already + incoming_nids = df_control_links.loc[[nid], "from_node_id"].tolist() + incoming_types = all_nodes.loc[incoming_nids, "node_type"].tolist() + allowed = "FlowDemand" not in incoming_types + else: + # No incoming control link, so this node is allowed + allowed = True + + # Skip nodes that are not allowed + if not allowed: + continue + + if all_nodes.node_type.at[nid] == "Outlet": + bool_aanvoer = bool(df_outlet_static.at[nid, "meta_aanvoer"]) + elif all_nodes.node_type.at[nid] == "Pump": + bool_aanvoer = bool(df_pump_static.at[nid, "meta_func_aanvoer"]) + node_lookup[nid] = (nid_type, bool_aanvoer) + + dfu = { + "basin": [], + "path_id": [], + "upstream_index": [], + "node_id": [], + "node_type": [], + 
"aanvoer": [], + } + is_added = {} + for pid, path in enumerate(paths): + basin_nid = path[0] + + for i, nid in enumerate(path): + # We don't need duplicate upstream node_id's originating from + # the same basin or non-applicable nodes + if nid not in node_lookup or (basin_nid, nid) in is_added: + continue + + nid_type, bool_aanvoer = node_lookup[nid] + dfu["basin"].append(basin_nid) + dfu["path_id"].append(pid) + dfu["upstream_index"].append(i) + dfu["node_id"].append(nid) + dfu["node_type"].append(nid_type) + dfu["aanvoer"].append(bool_aanvoer) + is_added[(basin_nid, nid)] = True + dfu = pd.DataFrame(dfu) + + return dfu + + def _find_upstream_candidates(self, dfu: pd.DataFrame) -> pd.DataFrame: + """Find the best upstream candidates for flushing. + + Parameters + ---------- + dfu : pd.DataFrame + DataFrame with upstream nodes and their properties + + Returns + ------- + pd.DataFrame + DataFrame with upstream nodes and their properties + """ + dfu["optimal_choice"] = False + all_basins = set(dfu.basin.tolist()) + covered_basins = set() + best_sources = [] + for df in [dfu[dfu.aanvoer], dfu]: + # Find a minimal coverage set + coversets = self._exact_cover_minimum(df) + if len(coversets) == 1 and len(coversets[0]) == 0: + # No optimal set + continue + elif len(coversets) == 1: + sources = coversets[0] + elif len(coversets) > 1: + # Multiple similar choices, chose the most upstream one + # based on the sum of upstream indices + weights = [] + for coverset in coversets: + weights.append(df[df.node_id.isin(coverset)].upstream_index.sum()) + sources = coversets[np.argmax(weights)] + + # Update the covered basins if this iteration provides better coverage + basins = set(df[df.node_id.isin(sources)].basin.tolist()) + if len(basins) > len(covered_basins): + covered_basins = basins.copy() + best_sources = sources.copy() + + # Stop in case the coverage is complete + if covered_basins == all_basins: + break + + # Update the optimal choice + if len(best_sources) > 0: + 
dfu.loc[dfu.node_id.isin(best_sources), "optimal_choice"] = True + + return dfu + + def _exact_cover_minimum(self, df: pd.DataFrame) -> list[list[int]]: + """Find approximate minimal set of nodes that cover basins using a greedy approach. + + Uses a modified greedy algorithm optimized for large datasets. + + Parameters + ---------- + df : pd.DataFrame + DataFrame with node_id, basin and upstream_index columns + + Returns + ------- + list[list[int]] + List containing single solution as list of node IDs + """ + # Pre-compute node information for efficient access + node_info = {} + for node_id, group in df.groupby("node_id"): + node_info[node_id] = {"basins": set(group.basin), "upstream_index": group.upstream_index.max()} + + # Track basins that still need coverage + all_basins = set(df.basin.unique()) + uncovered = all_basins.copy() + solution = [] + available_nodes = set(node_info.keys()) + + while uncovered and available_nodes: + # Score each node based on new coverage and position + scores = [] + for node_id in available_nodes: + info = node_info[node_id] + new_coverage = info["basins"] & uncovered + if new_coverage: # Only consider nodes that add coverage + scores.append( + ( + len(new_coverage), # Primary: number of new basins covered + info["upstream_index"], # Secondary: upstream position + node_id, # For stable sorting + ) + ) + + # If no nodes provide new coverage, stop + if not scores: + break + + # Select node with best coverage and upstream position + best_coverage, best_upstream, best_node = max(scores) + + # Update tracking + uncovered -= node_info[best_node]["basins"] + solution.append(best_node) + available_nodes.remove(best_node) + + return [sorted(solution)] if solution else [[]] + + def _all_upstream_paths( + self, + graph: DiGraph, + start_node: int, + all_nodes: gpd.GeoDataFrame, + limit_geom: MultiPolygon | Polygon | None = None, + ) -> list[list[int]]: + """Find all upstream paths from a starting node. 
+ + Parameters + ---------- + graph : DiGraph + NetworkX directed graph representing the network + start_node : int + Node ID to start the search from + all_nodes : gpd.GeoDataFrame + GeoDataFrame containing all nodes and their geometries + limit_geom : MultiPolygon | Polygon | None, optional + Geometry to limit the search area, by default None + + Returns + ------- + list[list[int]] + List of paths, where each path is a list of node IDs + """ + # Initialize the return variable + end_paths = [] + + # Precompute nodes that intersect with limit_geom + if limit_geom is not None: + valid_indices = all_nodes.sindex.query(limit_geom, predicate="intersects") + valid_nodes = set(all_nodes.index[valid_indices]) + else: + valid_nodes = set(all_nodes.index) + + # Pre-build adjacency sets for faster lookups during DFS + # Performs a set intersection operation with valid_nodes + predecessors = {node: set(graph.predecessors(node)) & valid_nodes for node in valid_nodes} + + # Recursively fill the paths with a depth-first search + self._dfs(graph, [start_node], end_paths, valid_nodes, predecessors) + + return end_paths + + def _dfs( + self, + graph: DiGraph, + path: list[int], + end_paths: list[list[int]], + valid_nodes: set[int], + predecessors: dict[int, set[int]], + ): + """Perform depth-first search to find upstream paths. 
+ + Parameters + ---------- + graph : DiGraph + NetworkX directed graph representing the network + path : list[int] + Current path being explored + end_paths : list[list[int]] + List to store complete paths + valid_nodes : set[int] + Set of node IDs that are valid for the search + predecessors : dict[int, set[int]] + Pre-computed adjacency sets for faster lookups + """ + last_node = path[-1] + + # Get predecessors that are not already in the path and are in valid_nodes + unvisited_predecessors = predecessors[last_node] - set(path) + + # Stop looking in case of a dead end or a cycle + if not unvisited_predecessors: + end_paths.append(path) + return + + # Recurse in predecessors + for predecessor in sorted(unvisited_predecessors): + self._dfs(graph, path + [predecessor], end_paths, valid_nodes, predecessors) + + def _dissolve_flushing_data(self, df_flushing: pd.DataFrame) -> pd.DataFrame: + # Round flushing_col to nearest integer value + df = df_flushing.copy() + df[self.flushing_col] = df[self.flushing_col].round().astype(int) + + # First step: dissolve by flushing_col and id if these are equal + df = df.dissolve(by=[self.flushing_id, self.flushing_col]) + df = df.reset_index() + + # Second step: iteratively merge overlapping geometries with the + # same flushing_col value + new_rows = [] + for _, group in df.groupby(self.flushing_col): + group["geometry"] = group.buffer(0.1) + visited = [] + for cur_idx, row in group.iterrows(): + if cur_idx in visited: + continue + subgroup = group[~group.index.isin(visited)].copy() + idxs, midxs = [], [cur_idx] + while len(midxs) > len(idxs): + idxs = midxs + midxs = subgroup.sindex.query(subgroup.loc[idxs].union_all(), predicate="intersects") + midxs = subgroup.index[midxs].tolist() + if len(idxs) > 0: + new_row = row.copy() + new_row[self.flushing_id] = ",".join(map(str, subgroup.loc[idxs, self.flushing_id].tolist())) + new_row["geometry"] = subgroup.loc[idxs, "geometry"].union_all() + new_rows.append(new_row) + visited += idxs 
+ df = pd.concat(new_rows, axis=1).T + + return df + + def _sync_files( + self, + ) -> tuple[ModelNL | Model, gpd.GeoDataFrame]: + """Synchronize and load required files. + + Returns + ------- + tuple[ModelNL | Model, gpd.GeoDataFrame] + Tuple containing: + - The loaded Ribasim model + - GeoDataFrame with flushing data + """ + is_model = isinstance(self.model, ModelNL) or isinstance(self.model, Model) + + # Synchronize flushing data and model files + filepaths = [self.lhm_flushing_path] + if not is_model: + filepaths.append(Path(self.model)) + self.cloud.synchronize(filepaths=filepaths) + + # Read the ribasim model + model = self.model + if not is_model: + model = ModelNL.read(self.model) + + # Open the flushing data + df_flushing = gpd.read_file(self.lhm_flushing_path, layer=self.flushing_layer) + + return model, df_flushing diff --git a/src/peilbeheerst_model/peilbeheerst_model/assign_parametrization.py b/src/peilbeheerst_model/peilbeheerst_model/assign_parametrization.py index cc4560a4..9b062764 100644 --- a/src/peilbeheerst_model/peilbeheerst_model/assign_parametrization.py +++ b/src/peilbeheerst_model/peilbeheerst_model/assign_parametrization.py @@ -197,6 +197,10 @@ def add_meta_to_basins( # Add the matching areas to the ribasim model for row in self.model.basin.area.df.itertuples(): + if hasattr(row, "meta_node_id") and row.node_id != row.meta_node_id: + # Skip "bergend" + continue + # Find overlapping area(s) idxs = df_area.sindex.query(row.geometry, predicate="intersects") dfa = df_area.iloc[idxs].copy() @@ -210,10 +214,10 @@ def add_meta_to_basins( dfa = dfa.sort_values(["i_area", "t_area"], ascending=False) if len(dfa) == 0: - print(f" - Warning: Found no matching basin area for {row.node_id=}") + print(f" - Warning: Found no matching area for basin #{row.node_id}") continue elif len(dfa) > 1: - print(f" - Warning: Multiple overlapping areas for {row.node_id=}, using the largest overlap") + print(f" - Warning: Multiple overlapping areas for basin 
#{row.node_id}, using the largest overlap") # Assign metadata matching_row = dfa.iloc[0]