diff --git a/README.md b/README.md index 7488db56..81c38113 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ Streamfall.jl

A graph-based streamflow modelling system written in Julialang.

-[![DOI](https://zenodo.org/badge/345341654.svg)](https://zenodo.org/badge/latestdoi/345341654) +[![DOI](https://zenodo.org/badge/345341654.svg)](https://zenodo.org/badge/latestdoi/345341654) [![](https://img.shields.io/badge/docs-dev-blue.svg)](https://connectedsystems.github.io/Streamfall.jl/dev) + Streamfall leverages the Julia language and ecosystem to provide: @@ -23,10 +24,25 @@ The IHACRES rainfall-runoff model was previously implemented with [ihacres_nim]( [Graphs](https://github.com/JuliaGraphs/Graphs.jl) and [MetaGraphs](https://github.com/JuliaGraphs/MetaGraphs.jl) are used underneath for network traversal/analysis. -Development version of the documentation can be found here: [![](https://img.shields.io/badge/docs-dev-blue.svg)](https://connectedsystems.github.io/Streamfall.jl/dev). - > [NOTE] Streamfall is currently in its early stages and under active development. Although it is fairly usable for small networks and assessing/analyzing single sub-catchments, things may change drastically and unexpectedly. +## Installation + +Streamfall is now registered! The latest release version can be installed with: + +```julia +] add Streamfall +``` + +or the latest development version from GitHub with `dev` () or `add`: + +```julia +# Editable install +] dev Streamfall#main + +] add https://github.com/ConnectedSystems/Streamfall.jl#main +``` + ## Development Local development should follow the usual process of git cloning the repository. diff --git a/docs/make.jl b/docs/make.jl index 05994e1b..ba25bc87 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -27,6 +27,9 @@ makedocs(sitename="Streamfall Documentation", "examples/simple_showcase.md", "examples/model_comparison.md", "examples/simple_multisystem.md", + ], + "Ensemble modeling" => [ + "examples/weighted_ensembles.md" ] ], "API" => [ diff --git a/docs/src/assets/ensemble_bias_corrected.png b/docs/src/assets/ensemble_bias_corrected.png new file mode 100644 index 00000000..b447e794 Binary files /dev/null and b/docs/src/assets/ensemble_bias_corrected.png differ diff --git a/docs/src/assets/ensemble_model_comparison_quickplots.png b/docs/src/assets/ensemble_model_comparison_quickplots.png new file mode 100644 index 00000000..bfcc2192 Binary files /dev/null and b/docs/src/assets/ensemble_model_comparison_quickplots.png differ diff --git a/docs/src/assets/ensemble_xsection.png b/docs/src/assets/ensemble_xsection.png new file mode 100644 index 00000000..a66de81c Binary files /dev/null and b/docs/src/assets/ensemble_xsection.png differ diff --git a/docs/src/examples/weighted_ensembles.md b/docs/src/examples/weighted_ensembles.md new file mode 100644 index 00000000..a5bb5c87 --- /dev/null +++ b/docs/src/examples/weighted_ensembles.md @@ -0,0 +1,122 @@ +Streamfall supports ensemble modeling using individual instances of rainfall-runoff models +as the ensemble constituents. The default ensemble is a normalized weighted sum. + +The usual setup process is shown here, detailed in previous sections of this guide. + +```julia +using YAML, DataFrames, CSV, Plots +using Statistics +using Streamfall + + +climate = Climate("../test/data/campaspe/climate/climate.csv", "_rain", "_evap") + +# Historic flows and dam level data +obs_data = CSV.read( + "../test/data/cotter/climate/CAMELS-AUS_410730.csv", + DataFrame; + comment="#" +) + +Qo = extract_flow(obs_data, "410730") +climate = extract_climate(obs_data) +``` + +Specific node representations can then be created, each representing the same sub-catchment. +A stream network is not considered for this demonstration. + +```julia + +# Create one instance each of IHACRES_CMD and GR4J +ihacres_node = create_node(IHACRESBilinearNode, "410730", 129.2) +gr4j_node = create_node(GR4JNode, "410730", 129.2) + +# Create a weighted ensemble with equal weights +# The default behavior is to combine component predictions with a normalized weighted sum. +ensemble = create_node(WeightedEnsembleNode, [ihacres_node, gr4j_node], [0.5, 0.5]) +``` + +In previous examples, the `calibrate!()` method was used to calibrate nodes to available +data. For WeightedEnsembleNodes, `calibrate!()` optimizes just the weights to allow for +pre-calibrated instances to be provided for use in an ensemble. This may be useful if +it is desired for the component models to be calibrated according to different criteria and +objective functions. + +A separate `calibrate_instances!()` method is available to calibrate individual component +models (and the weights used). + +```julia +# The default behaviour for WeightedEnsembleNodes are to calibrate just the weights. +# res, opt = calibrate!(ensemble, climate, Qo, (obs, sim) -> 1.0 .- Streamfall.NmKGE(obs, sim); MaxTime=180) + +# Here, the component models are uncalibrated so we calibrate these and the weights. +res, opt = calibrate_instances!(ensemble, climate, Qo, (obs, sim) -> 1.0 .- Streamfall.NmKGE(obs, sim); MaxTime=180) +``` + +Running the calibrated models directly, the amount of improvement to model performance can +be assessed. Here, a 1-year burn in period is used. + + +```julia +run_node!(ihacres_node, climate) +run_node!(gr4j_node, climate) +run_node!(ensemble, climate) + +burn_in = 365 +burn_dates = timesteps(climate)[burn_in:end] +burn_obs = Qo[burn_in:end, "410730"] + +ihacres_qp = quickplot(burn_obs, ihacres_node.outflow[burn_in:end], climate, "IHACRES", true) +gr4j_qp = quickplot(burn_obs, gr4j_node.outflow[burn_in:end], climate, "GR4J", true) +ensemble_qp = quickplot(burn_obs, ensemble.outflow[burn_in:end], climate, "Weighted Ensemble", true) + +plot(ihacres_qp, gr4j_qp, ensemble_qp; layout=(3, 1), size=(800, 1200)) +``` + +Below a small improvement to model performance based on the modified Kling-Gupta Efficiency +score can be seen. Comparing the Q-Q plots, IHACRES had a tendency to underestimate low +flows and high flows, whereas GR4J had a tendency to overestimate low flows. + +The weighted ensemble combined characteristics of both, with a tendency to overestimate +low flows as with GR4J. + +![](../assets/ensemble_model_comparison_quickplots.png) + +Comparing the temporal cross section: + +```julia +ihacres_xs = temporal_cross_section(burn_dates, burn_obs, ihacres_node.outflow[burn_in:end]; title="IHACRES") +gr4j_xs = temporal_cross_section(burn_dates, burn_obs, gr4j_node.outflow[burn_in:end]; title="GR4J") +ensemble_xs = temporal_cross_section(burn_dates, burn_obs, ensemble.outflow[burn_in:end]; title="Weighted Ensemble (IHACRES-GR4J)") + +plot(ihacres_xs, gr4j_xs, ensemble_xs; layout=(3, 1), size=(800, 1200)) +``` + +A reduction in the median error can be seen with extreme errors reduced somewhat (according to +the 95% CI). + +![](../assets/ensemble_xsection.png) + +The median error can then be applied to modelled streamflow (on a month-day basis) as a +form of bias correction. + +```julia +q_star = Streamfall.apply_temporal_correction(ensemble, climate, Qo[:, "410730"]) + +bc_ensemble_qp = quickplot(burn_obs, q_star[burn_in:end], climate, "Bias Corrected Ensemble") + +bias_corrected_xs = temporal_cross_section( + burn_dates, + burn_obs, + q_star[burn_in:end]; + title="Bias Corrected Ensemble" +) + +plot(bc_ensemble_qp, bias_corrected_xs; layout=(2,1), size=(800, 800)) +``` + +While the median error has increased, its variance has reduced significantly. At the same +time, performance at the 75 and 95% CI remain steady relative to the original weighted +ensemble results. + +![](../assets/ensemble_bias_corrected.png) diff --git a/src/Analysis/uncertainty.jl b/src/Analysis/uncertainty.jl index 511935b5..da9e4bea 100644 --- a/src/Analysis/uncertainty.jl +++ b/src/Analysis/uncertainty.jl @@ -55,7 +55,7 @@ function cross_section(tr::TemporalCrossSection) boxframe[i, 8:9] .= mean(vi), std(vi) end - cols = [:abs_min, :lower_95, :lower_75, :median, + cols = [:abs_min, :lower_95, :lower_75, :median, :upper_75, :upper_95, :abs_max, :mean, :std] x_section = DataFrame(boxframe, cols) @@ -68,9 +68,9 @@ end """ - offsets(tr::TemporalCrossSection, period::Function=monthday) + offsets(tr::TemporalCrossSection) -Determine (median) offsets, assuming mean error-based temporal cross-sectional analysis +Determine (median) offsets, assuming mean error-based temporal cross-sectional analysis was performed. Only supports daily time series. """ function offsets(tr::TemporalCrossSection) @@ -84,11 +84,11 @@ function offsets(tr::TemporalCrossSection) mds = monthday.(dates) for (idx, (d, md)) in enumerate(zip(doy, mds)) if md == (2, 29) || d == 366 - offset_vals[idx] = offset[d-1] * -1.0 + offset_vals[idx] = -offset[d-1] continue end - offset_vals[idx] = offset[d] * -1.0 + offset_vals[idx] = -offset[d] end return offset_vals diff --git a/src/Nodes/EnsembleNode.jl b/src/Nodes/EnsembleNode.jl deleted file mode 100644 index a23c9fdb..00000000 --- a/src/Nodes/EnsembleNode.jl +++ /dev/null @@ -1,101 +0,0 @@ -using Parameters -using ModelParameters -using Statistics - - -abstract type EnsembleNode <: NetworkNode end - - -Base.@kwdef mutable struct BaseEnsemble{N<:NetworkNode, P, A<:Real} <: EnsembleNode - name::String - area::A - - instances::Array{N} = [] - weights::Array{P} = [ - Param(1.0; bounds=[0.0,1.0]) - ] - - # Default to weighted sum - comb_method::Function = (X, weights) -> sum([x * w for (x, w) in zip(X, weights)]) - - outflow::Array{A} = [] - - obj_func::Function = obj_func -end - - -function param_info(node::BaseEnsemble; kwargs...)::Tuple - values = Float64[w.val for w in node.weights] - bounds = [w.bounds for w in node.weights] - param_names = ["weights_n$(i)" for i in 1:length(bounds)] - - return param_names, values, bounds -end - - -function BaseEnsemble(nodes::Array{<:NetworkNode}; weights::Array{Float64}, - bounds::Union{Nothing, Array{Tuple}}=nothing, - comb_method::Union{Nothing, Function}=nothing)::BaseEnsemble - if isnothing(bounds) - num_nodes = length(nodes) - @assert(length(weights) == num_nodes, "Number of nodes do not match provided number of weights") - - bounds = repeat([(0.0, 1.0)], num_nodes, 1) - else - @assert(length(bounds) == length(weights), "Number of bounds do not match provided number of weights") - end - - p_weights = [Param(w; bounds=b) for (w, b) in zip(weights, bounds)] - - n1 = nodes[1] - if !isnothing(comb_method) - tmp = BaseEnsemble{NetworkNode, Param, Float64}(; name=n1.name, area=n1.area, instances=nodes, weights=p_weights, comb_method) - else - tmp = BaseEnsemble{NetworkNode, Param, Float64}(; name=n1.name, area=n1.area, instances=nodes, weights=p_weights) - end - - return tmp -end - - -function run_node!(ensemble::BaseEnsemble, climate::Climate; inflow=nothing, extraction=nothing, exchange=nothing) - for inst in ensemble.instances - run_node!(inst, climate; inflow=inflow, extraction=extraction, exchange=exchange) - end - - ensemble.outflow = ensemble.comb_method([inst.outflow for inst in ensemble.instances], ensemble.weights) -end - - -function run_timestep!(node::BaseEnsemble, rain, et, ts; inflow=0.0, extraction=0.0, exchange=0.0) - for inst in node.instances - run_timestep!(inst, rain, et, ts; inflow=inflow, extraction=extraction, exchange=exchange) - end - - Qt = node.comb_method([inst.outflow[ts] for inst in node.instances], node.weights) - - append!(node.outflow, Qt) -end - - -function reset!(ensemble::BaseEnsemble) - for inst in ensemble.instances - reset!(inst) - end - - ensemble.outflow = [] -end - - -# Functions to support calibration of weights -function update_params!(node::BaseEnsemble, weights...) - for (i, w) in enumerate(weights) - node.weights[i] = Param(w, bounds=node.weights[i].bounds) - end -end - - -# Calibrate each individual instance? -# function calibrate!(ensemble::BaseEnsemble, climate::Climate) -# ensemble.weights -# end diff --git a/src/Nodes/Ensembles/EnsembleNode.jl b/src/Nodes/Ensembles/EnsembleNode.jl new file mode 100644 index 00000000..df133bf3 --- /dev/null +++ b/src/Nodes/Ensembles/EnsembleNode.jl @@ -0,0 +1,10 @@ +using Parameters +using ModelParameters +using Statistics + + +abstract type EnsembleNode <: NetworkNode end + +include("WeightedEnsembleNode.jl") +# include("StateEnsembleNode.jl") + diff --git a/src/Nodes/StateEnsembleNode.jl b/src/Nodes/Ensembles/StateEnsembleNode.jl similarity index 100% rename from src/Nodes/StateEnsembleNode.jl rename to src/Nodes/Ensembles/StateEnsembleNode.jl diff --git a/src/Nodes/Ensembles/WeightedEnsembleNode.jl b/src/Nodes/Ensembles/WeightedEnsembleNode.jl new file mode 100644 index 00000000..eed17728 --- /dev/null +++ b/src/Nodes/Ensembles/WeightedEnsembleNode.jl @@ -0,0 +1,214 @@ +Base.@kwdef mutable struct WeightedEnsembleNode{N<:NetworkNode, P, A<:Real} <: EnsembleNode + name::String + area::A + + instances::Array{N} = NetworkNode[] + weights::Array{P} = [ + Param(1.0; bounds=[0.0,1.0]) + ] + + # Default to normalized weighted sum + comb_method::Function = (X, weights) -> sum([x * (w / sum(weights)) for (x, w) in zip(X, weights)]) + + outflow::Array{A} = [] + + obj_func::Function = obj_func +end + +function create_node( + node::Type{<:WeightedEnsembleNode}, + nodes::Vector{<:NetworkNode}, + weights::Vector{Float64}, + kwargs... +) + return WeightedEnsembleNode(nodes; weights=weights, kwargs...) +end + +function WeightedEnsembleNode(nodes::Vector{<:NetworkNode}; weights::Vector{Float64}, + bounds::Union{Nothing, Vector{Tuple}}=nothing, + comb_method::Union{Nothing, Function}=nothing)::WeightedEnsembleNode + if isnothing(bounds) + num_nodes = length(nodes) + @assert(length(weights) == num_nodes, "Number of nodes do not match provided number of weights") + + bounds = repeat([(0.0, 1.0)], num_nodes, 1) + else + @assert(length(bounds) == length(weights), "Number of bounds do not match provided number of weights") + end + + p_weights = [Param(w; bounds=b) for (w, b) in zip(weights, bounds)] + + n1 = nodes[1] + tmp = WeightedEnsembleNode{NetworkNode, Param, Float64}(; + name=n1.name, + area=n1.area, + instances=nodes, + weights=p_weights + ) + + if !isnothing(comb_method) + tmp.comb_method = comb_method + end + + return tmp +end + +function prep_state!(node::WeightedEnsembleNode, timesteps::Int64)::Nothing + node.outflow = fill(0.0, timesteps) + + for n in node.instances + prep_state!(n, timesteps) + end + + return nothing +end + + +function param_info(node::WeightedEnsembleNode; kwargs...)::Tuple + values = Float64[w.val for w in node.weights] + bounds = [w.bounds for w in node.weights] + param_names = ["weights_n$(i)" for i in 1:length(bounds)] + + return param_names, values, bounds +end + +function run_node!(ensemble::WeightedEnsembleNode, climate::Climate; inflow=nothing, extraction=nothing, exchange=nothing) + for inst in ensemble.instances + run_node!(inst, climate; inflow=inflow, extraction=extraction, exchange=exchange) + end + + ensemble.outflow = ensemble.comb_method([inst.outflow for inst in ensemble.instances], ensemble.weights) +end + + +function run_timestep!(node::WeightedEnsembleNode, rain, et, ts; inflow=0.0, extraction=0.0, exchange=0.0) + for inst in node.instances + run_timestep!(inst, rain, et, ts; inflow=inflow, extraction=extraction, exchange=exchange) + end + + node.outflow[ts] = node.comb_method([inst.outflow[ts] for inst in node.instances], node.weights) +end + + +function reset!(ensemble::WeightedEnsembleNode) + for inst in ensemble.instances + reset!(inst) + end + + ensemble.outflow = [] +end + + +# Functions to support calibration of weights +function update_params!(node::WeightedEnsembleNode, weights...) + for (i, w) in enumerate(weights) + node.weights[i] = Param(w, bounds=node.weights[i].bounds) + end +end + +""" + calibrate_instances!(ensemble::WeightedEnsembleNode, climate::Climate, calib_data::Union{AbstractArray,DataFrame}, metric::Union{F,AbstractDict{String,F}}; kwargs...) where {F} + +Calibrate individual model instances and then the ensemble weights. + +# Arguments +- `ensemble`: WeightedEnsembleNode containing multiple model instances +- `climate`: Climate data for simulation +- `calib_data`: Calibration data, either as an array or DataFrame with node names as columns +- `metric`: Optimization metric function or Dict mapping node names to metrics +- `kwargs`: Additional arguments passed to BlackBoxOptim + +# Returns +- Tuple of (optimization_result, optimization_setup) from weights calibration +""" +function calibrate_instances!( + ensemble::WeightedEnsembleNode, + climate::Climate, + calib_data::DataFrame, + metric::Union{F,AbstractDict{String,F}}; + kwargs... +) where {F} + + # Calibrate individual instances first + for node in ensemble.instances + calibrate!(node, climate, calib_data, metric; kwargs...) + end + + # Then optimize weights + return calibrate!(ensemble, climate, calib_data, metric; kwargs...) +end + +""" + calibrate!(ensemble::WeightedEnsembleNode, climate::Climate, calib_data::Union{AbstractArray,DataFrame}, metric::Union{F,AbstractDict{String,F}}; kwargs...) where {F} + +Calibrate individual the ensemble weights assuming the component models are pre-calibrated. + +# Arguments +- `ensemble`: WeightedEnsembleNode containing multiple model instances +- `climate`: Climate data for simulation +- `calib_data`: Calibration data, either as an array or DataFrame with node names as columns +- `metric`: Optimization metric function or Dict mapping node names to metrics +- `kwargs`: Additional arguments passed to BlackBoxOptim + +# Returns +- Tuple of (optimization_result, optimization_setup) from weights calibration +""" +function calibrate!( + ensemble::WeightedEnsembleNode, + climate::Climate, + calib_data::DataFrame, + metric::Union{AbstractDict{String, C}, C}; + kwargs... +) where {C<:Function} + return invoke( + calibrate!, + Tuple{NetworkNode, Climate, DataFrame, typeof(metric)}, + ensemble, + climate, + calib_data, + metric; + kwargs... + ) +end + +""" + apply_temporal_correction( + ensemble::WeightedEnsembleNode, + climate::Climate, + obs::Vector{T} + ) where {T<:Real} + +Correct for model bias using median error. +""" +function apply_temporal_correction( + ensemble::WeightedEnsembleNode, + climate::Climate, + obs::Vector{T} +) where {T<:Real} + dates = timesteps(climate) + tcs = TemporalCrossSection(dates, obs, ensemble.outflow) + + return max.(ensemble.outflow .+ Streamfall.Analysis.offsets(tcs), 0.0) +end + +""" + apply_bias_correction( + ensemble::WeightedEnsembleNode, + climate::Climate, + obs::Vector{T}; + period=monthday + ) where {T<:Real} + +Apply bias correction using the median error for a given period (monthday) +""" +function apply_bias_correction( + ensemble::WeightedEnsembleNode, + climate::Climate, + obs::Vector{T}; + period=monthday +) where {T<:Real} + dates = timesteps(climate) + tcs = TemporalCrossSection(dates, obs, ensemble.outflow) + + return ensemble.outflow .+ (-tcs.ts) +end diff --git a/src/Nodes/IHACRES/IHACRESNode.jl b/src/Nodes/IHACRES/IHACRESNode.jl index 700e2231..3c816df5 100644 --- a/src/Nodes/IHACRES/IHACRESNode.jl +++ b/src/Nodes/IHACRES/IHACRESNode.jl @@ -49,7 +49,7 @@ Base.@kwdef mutable struct IHACRESBilinearNode{P, A<:AbstractFloat} <: IHACRESNo end -function prep_state!(node::IHACRESNode, timesteps::Int64)::Nothing +function prep_state!(node::T, timesteps::Int64)::Nothing where {T<:IHACRESNode} resize!(node.storage, timesteps+1) node.storage[2:end] .= 0.0 diff --git a/src/Streamfall.jl b/src/Streamfall.jl index 8399169a..e1a562fc 100644 --- a/src/Streamfall.jl +++ b/src/Streamfall.jl @@ -22,7 +22,6 @@ end const IHACRES = joinpath(MODPATH, "../deps", "usr", "lib", "ihacres$(libext)") - include("Network.jl") include("Nodes/Node.jl") include("Climate.jl") @@ -34,7 +33,7 @@ include("Nodes/GR4J/GR4JNode.jl") include("Nodes/HyMod/HyModNode.jl") include("Nodes/SYMHYD/SYMHYDNode.jl") include("Nodes/DamNode.jl") -include("Nodes/EnsembleNode.jl") +include("Nodes/Ensembles/EnsembleNode.jl") function timestep_value(ts::Int, gauge_id::String, col_partial::String, dataset::DataFrame)::Float64 @@ -297,7 +296,7 @@ include("plotting.jl") export NetworkNode, GenericNode, GenericDirectNode export IHACRES, IHACRESNode, IHACRESBilinearNode, ExpuhNode, DamNode, Climate export create_node, GR4JNode, HyModNode, SimpleHyModNode, SYMHYDNode -export EnsembleNode, BaseEnsemble +export EnsembleNode, WeightedEnsembleNode # Network export find_inlets_and_outlets, inlets, outlets @@ -305,7 +304,7 @@ export climate_values, node_names, get_node, get_node_id, get_prop, set_prop! export param_info, update_params!, sim_length, reset!, prep_state! export run_catchment!, run_basin!, run_node!, run_node_with_temp! export run_timestep! -export calibrate! +export calibrate!, calibrate_instances!, calibrate_weights! export create_network, load_network, save_network export best_candidate, best_fitness, best_params @@ -322,4 +321,6 @@ export timesteps # Analysis export Analysis +export apply_bias_correction + end # end module