diff --git a/calibration/experiments/gcm_driven_scm/Project.toml b/calibration/experiments/gcm_driven_scm/Project.toml
index d72e961527..d1968a0dee 100644
--- a/calibration/experiments/gcm_driven_scm/Project.toml
+++ b/calibration/experiments/gcm_driven_scm/Project.toml
@@ -11,6 +11,7 @@ ClimaUtilities = "b3f4f4ca-9299-4f7f-bd9b-81e1242a7513"
 DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
 Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
 EnsembleKalmanProcesses = "aa8a2aa5-91d8-4396-bcef-d4f2ec43552d"
+Flux = "587475ba-b771-5e3f-ad9e-33799f191a9c"
 Glob = "c27321d9-0574-5035-807b-f59d2c89b15c"
 Interpolations = "a98d9a8b-a2ab-59e6-89dd-64a1c18fca59"
 JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
@@ -19,5 +20,5 @@ NCDatasets = "85f8d34a-cbdd-5861-8df4-14fed0d494ab"
 YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6"
 
 [compat]
-ClimaCalibrate = "=0.0.3"
-EnsembleKalmanProcesses = "2"
+ClimaCalibrate = "=0.0.13"
+EnsembleKalmanProcesses = "=2.1.2"
diff --git a/calibration/experiments/gcm_driven_scm/README.md b/calibration/experiments/gcm_driven_scm/README.md
index f9725cfae5..c7e2084996 100644
--- a/calibration/experiments/gcm_driven_scm/README.md
+++ b/calibration/experiments/gcm_driven_scm/README.md
@@ -1,10 +1,33 @@
 # Overview of Calibration Pipeline for GCM-Driven Single Column Model (EDMF)
 
-This setup provide tools for calibrating both prognostic and diagnostic EDMF variants to LES profiles, given the same forcings and boundary conditions. The gcm-driven EDMF setup is employed in single-column mode, which uses both interactive radiation and surface fluxes. Forcing profiles include resolved eddy advection, horizontal advection, subsidence, and GCM state relaxation. The setup is run to the top of the atmosphere to compute radiation, but calibrations statistics are computed only on the lower 4km (`z_max`), where LES output is available.
+## Pipeline Components
+
+### Configuration Files
+- `experiment_config.yml` - Configuration of calibration settings
+  - Defines the spatiotemporal calibration window
+  - Specifies required pipeline file paths
+  - Controls batch processing parameters
+
+- `model_config_**.yml` - Configuration for the ClimaAtmos single-column model
+  - Defines model-specific parameters
+  - Allows for multiple model configuration variants
+
+## Best Practices
+- Ensure `batch_size` matches the available LES configurations
+- Verify the normalization factors for each variable
+- Monitor ensemble convergence using the provided plotting tools
+
+This setup provides tools for calibrating both prognostic and diagnostic EDMF variants to LES profiles, given the same forcings and boundary conditions. The GCM-driven EDMF setup is employed in single-column mode and uses both interactive radiation and surface fluxes. Forcing profiles include resolved eddy advection, horizontal advection, subsidence, and GCM state relaxation. The setup is run to the top of the atmosphere to compute radiation, but calibration statistics are computed only on the calibration grid `z_cal_grid`.
 
 LES profiles are available for different geolocations ("cfsites"), spanning seasons, forcing host models, and climates (AMIP, AMIP4K). A given LES simulation is referred to as a "configuration". Calibrations employ batching by default and stack multiple configurations (a number equal to the `batch_size`) in a given iteration. The observation vector for a single configuration is formed by concatenating profiles across calibration variables, where each geophysical variable is normalized to have approximately unit variance and zero mean.
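+
+As a concrete illustration, the per-variable normalization can be sketched as follows (a hypothetical helper mirroring the steps the observation map applies; see `norm_factors_by_var` and `log_vars` in `experiment_config.yml`):
+
+```julia
+# Sketch: z-score a single profile with precomputed (mean, std) factors.
+# Log-variables (e.g. `clw`) are log10-transformed before normalization.
+function normalize_profile(y, var_name, norm_factors_by_var, log_vars)
+    if var_name in log_vars
+        y = log10.(y .+ 1e-12)  # small offset guards against zeros
+    end
+    μ, σ = norm_factors_by_var[var_name]
+    return (y .- μ) ./ σ
+end
+```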
 These variable-by-variable normalization factors are precomputed (`norm_factors_dict`) and applied to all observations. Following this operation, the spatiotemporal calibration window is applied and temporal means are computed to form the observation vector `y`. Because variables are normalized to have 0 mean and unit variance, a constant diagonal noise matrix is used (configurable as `const_noise`).
+
+### Observation Map
+1. **Time-mean**: Time-mean of profiles taken between [`y_t_start_sec`, `y_t_end_sec`] for `y` and [`g_t_start_sec`, `g_t_end_sec`] for `G`.
+2. **Interpolation**: Case-specific (i.e., "shallow", "deep") interpolation to the calibration grid, defined with stretch-grid parameters in `z_cal_grid`.
+3. **Normalization**: Variable-specific normalization using the mean and standard deviation defined in `norm_factors_by_var`. Optionally, take the log of the variables listed in `log_vars` before normalization.
+4. **Concatenation**: Stack across cases in a batch, forming `y` and `G`.
+
 ## Getting Started
 
 ### Define calibration and model configurations:
@@ -20,6 +43,11 @@ LES profiles are available for different geolocations ("cfsites"), spanning seas
 ### Analyze output with:
 - `julia --project plot_ensemble.jl` - plots vertical profiles of all ensemble members in a given iteration, given path to calibration output
 - `julia --project edmf_ensemble_stats.jl` - computes and plots metrics offline [i.e., root mean squared error (RMSE)] as a function of iteration, given path to calibration output.
-- `julia --project plot_eki.jl` - plot eki metrics [loss, var-weighted loss] and `y`, `g` vectors vs iteration, display best particles
+- `julia --project plot_eki.jl` - plots EKI metrics [loss, variance-weighted loss] and the `y`, `G` vectors vs. iteration, and displays the best particles
+
+## Troubleshooting
+- **Memory Issues**: If you encounter out-of-memory errors, increase the memory allocation in both `run_calibration.sbatch` and `experiment_config.yml`. This is particularly important when working with larger batch sizes.
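+  For example, the relevant keys in `experiment_config.yml` are the Slurm resource settings (the values shown here are illustrative, not tuned recommendations):
+  ```yaml
+  slurm_time: "02:00:00"
+  slurm_mem_per_cpu: "25G"
+  slurm_cpus_per_task: 1
+  ```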
Example error message: + ``` + srun: error: hpc-92-10: task 9: Out Of Memory + ``` \ No newline at end of file diff --git a/calibration/experiments/gcm_driven_scm/edmf_ensemble_stats.jl b/calibration/experiments/gcm_driven_scm/edmf_ensemble_stats.jl index 5c6525ad27..2dc0af5710 100644 --- a/calibration/experiments/gcm_driven_scm/edmf_ensemble_stats.jl +++ b/calibration/experiments/gcm_driven_scm/edmf_ensemble_stats.jl @@ -1,12 +1,8 @@ #!/usr/bin/env julia -import ClimaComms -@static pkgversion(ClimaComms) >= v"0.6" && ClimaComms.@import_required_backends - using ArgParse using Distributed -addprocs() - +addprocs(1) @everywhere begin using EnsembleKalmanProcesses: TOMLInterface @@ -60,6 +56,13 @@ function parse_args() return parse_with_settings(s) end +@everywhere function validate_ensemble_member(iteration_dir, batch_size) + config_dirs = + filter(x -> isdir(joinpath(iteration_dir, x)), readdir(iteration_dir)) + num_configs = count(x -> startswith(x, "config_"), config_dirs) + return num_configs == batch_size +end + function main() args = parse_args() @@ -87,6 +90,7 @@ function main() cal_vars = config_dict["y_var_names"] const_noise_by_var = config_dict["const_noise_by_var"] n_iterations = config_dict["n_iterations"] + batch_size = config_dict["batch_size"] model_config_dict = YAML.load_file(joinpath(output_dir, "configs", "model_config.yml")) @@ -95,9 +99,6 @@ function main() end ref_paths, _ = get_les_calibration_library() - comms_ctx = ClimaComms.SingletonCommsContext() - atmos_config = CA.AtmosConfig(model_config_dict; comms_ctx) - zc_model = get_z_grid(atmos_config, z_max = z_max) @everywhere function calculate_statistics(y_var) non_nan_values = y_var[.!isnan.(y_var)] @@ -124,9 +125,9 @@ function main() cal_vars, const_noise_by_var, ref_paths, - zc_model, reduction, ensemble_size, + batch_size, ) println("Processing Iteration: $iteration") stats_df = DataFrame( @@ -141,13 +142,25 @@ function main() rmse_std = Union{Missing, Float64}[], ) config_indices = get_batch_indicies_in_iteration(iteration, output_dir) + iteration_dir = + joinpath(output_dir, "iteration_$(lpad(iteration, 3, '0'))") + + valid_ensemble_members = filter( + config_i -> validate_ensemble_member( + joinpath(iteration_dir, "member_$(lpad(config_i, 3, '0'))"), + batch_size, + ), + config_indices, + ) + for var_name in var_names means = Float64[] maxs = Float64[] mins = Float64[] sum_squared_errors = zeros(Float64, ensemble_size) - for config_i in config_indices - data = ensemble_data( + + for config_i in valid_ensemble_members + data, zc_model = ensemble_data( process_profile_variable, iteration, config_i, @@ -157,6 +170,7 @@ function main() output_dir = output_dir, z_max = z_max, n_vert_levels = n_vert_levels, + return_z_interp = true, ) for i in 1:size(data, 2) y_var = data[:, i] @@ -166,12 +180,21 @@ function main() push!(mins, col_min) end if in(var_name, cal_vars) + ref_path = ref_paths[config_i] + cfsite_number, _, _, _ = parse_les_path(ref_path) + forcing_type = get_cfsite_type(cfsite_number) + + ti = config_dict["y_t_start_sec"] + ti = isa(ti, AbstractFloat) ? ti : ti[forcing_type] + tf = config_dict["y_t_end_sec"] + tf = isa(tf, AbstractFloat) ? 
tf : tf[forcing_type]
+
                     y_true, Σ_obs, norm_vec_obs = get_obs(
-                        ref_paths[config_i],
+                        ref_path,
                         [var_name],
                         zc_model;
-                        ti = config_dict["y_t_start_sec"],
-                        tf = config_dict["y_t_end_sec"],
+                        ti = ti,
+                        tf = tf,
                         Σ_const = const_noise_by_var,
                         z_score_norm = false,
                     )
@@ -179,12 +202,10 @@
                     compute_ensemble_squared_error(data, y_true)
                 end
             end
+
         if in(var_name, cal_vars)
-            # Compute RMSE per ensemble member
             rmse_per_member = sqrt.(sum_squared_errors / n_vert_levels)
-            # Filter out NaNs (failed simulations)
             valid_rmse = rmse_per_member[.!isnan.(rmse_per_member)]
-            non_nan_simulation_count = length(valid_rmse)
             mean_rmse = mean(valid_rmse)
             min_rmse = minimum(valid_rmse)
             max_rmse = maximum(valid_rmse)
@@ -226,9 +247,9 @@
                 cal_vars,
                 const_noise_by_var,
                 ref_paths,
-                zc_model,
                 reduction,
                 ensemble_size,
+                batch_size,
             ),
             iterations_list,
         )
diff --git a/calibration/experiments/gcm_driven_scm/experiment_config.yml b/calibration/experiments/gcm_driven_scm/experiment_config.yml
index fa1cdea356..53da4d9042 100644
--- a/calibration/experiments/gcm_driven_scm/experiment_config.yml
+++ b/calibration/experiments/gcm_driven_scm/experiment_config.yml
@@ -1,25 +1,61 @@
-prior_path: prior_prognostic_pi_entr.toml
+prior_path: prior_prognostic_pi_entr_smooth_entr_detr_impl_0M_v1.toml
 ensemble_size: 100
-n_iterations: 12
-batch_size: 2 # number of cases per iteration
-model_config : model_config_prognostic.yml # options {model_config_prognostic.yml, model_config_diagnostic.yml}
-output_dir : output/exp_1 # output dir
-y_var_names: [thetaa, hus, clw] # calibration variables clw
+n_iterations: 8
+batch_size: 5 # number of cases per iteration
+# model_config : model_config_prognostic.yml # options {model_config_prognostic.yml, model_config_diagnostic.yml}
+model_config : model_config_prognostic_impl.yml
+output_dir : /central/scratch/cchristo/debug/exp16
+
+# Slurm resource configuration
+slurm_time: "02:00:00"
+slurm_mem_per_cpu: "25G"
+slurm_cpus_per_task: 1
+
+y_var_names: [thetaa, hus, clw] # calibration variables
 log_vars: ["clw"] # take log(var) when forming y, g
-z_max : 4000 # spatial subsetting: use statistics from [0, z_max] (in [m]) for calibration
-dims_per_var : 29 # num dimensions per variable (num cells in vertical profile below z_max)
+# log_vars: []
+
+nice_loc_ug: 0.01
+nice_loc_gg: 0.5
+
+z_max: null
+z_cal_grid: # calibration grid (stretch-grid parameters).
In general, `z_elem` should be the same for all types + shallow: + z_max: 4000.0 + z_elem: 30 + dz_bottom: 30 + deep: + z_max: 15000.0 + z_elem: 30 + dz_bottom: 30 +dims_per_var : 30 # num dimensions per variable (num cells in vertical profile below z_max) # eki_timestep: 0.1 # timestep of eki, if using default -y_t_start_sec : 475200.0 # start time of LES averaging window [s] : 5.5 days -y_t_end_sec : 518400.0 # end time of LES averaging window [s] : 6 days (LES length = 6 days) -g_t_start_sec : 216000.0 # start time of SCM averaging window [s] : 2.5 days -g_t_end_sec : 259200.0 # end time of SCM averaging window [s] : 3 days (SCM length = 3 days) + +y_t_start_sec: # start time of LES averaging window [s] + shallow: 475200.0 # 5.5 days + deep: 302400.0 # 3.5 days +y_t_end_sec: # end time of LES averaging window [s] + shallow: 518400.0 # 6 days (LES length = 6 days) + deep: 345600.0 # 4 days (LES length = 4 days) +g_t_start_sec: 216000.0 # start time of SCM averaging window [s] : 2.5 days +g_t_end_sec: 259200.0 # end time of SCM averaging window [s] : 3 days (SCM length = 3 days) norm_factors_by_var: - thetaa: [298.828, 8.617] - hus: [0.00676, 0.00423] - clw: [-9.808, 3.116] # log norm factors + thetaa: [301.218, 15.235] + hus: [0.00672, 0.00477] + clw: [-9.579, 3.164] # log norm factors + # cli: [-11.697, 1.304] # log norm factors const_noise_by_var: - thetaa: 0.00005 - hus: 0.00005 - clw: 0.00005 \ No newline at end of file + thetaa: 0.0016 + hus: 0.0016 + clw: 0.0045 + # clw: 0.0016 + # cli: 0.01 + +pretrained_nn_path: "/home/cchristo/ml_mixing_length/nn_666p_leaky_relu.jld2" + +# Config files for deep and shallow cases +forcing_toml_files: + shallow: "scm_tomls/gcmdriven_relaxation_shallow_forcing.toml" + deep: "scm_tomls/gcmdriven_relaxation_deep_forcing.toml" \ No newline at end of file diff --git a/calibration/experiments/gcm_driven_scm/get_les_metadata.jl b/calibration/experiments/gcm_driven_scm/get_les_metadata.jl index 8244bb7887..f5aff2f260 100644 --- a/calibration/experiments/gcm_driven_scm/get_les_metadata.jl +++ b/calibration/experiments/gcm_driven_scm/get_les_metadata.jl @@ -3,10 +3,22 @@ using Glob """ """ +# cfSite numbers +CFSITE_TYPES = Dict( + "shallow" => (collect(4:15)..., collect(17:23)...), + "deep" => + (collect(30:33)..., collect(66:70)..., 82, 92, 94, 96, 99, 100), +) + function get_les_calibration_library() les_library = get_shallow_LES_library() - # AMIP4K data: July, NE Pacific - cfsite_numbers = (17, 23) + # AMIP data: July, NE Pacific + # cfsite_numbers = (17, 18, 22, 23, 30, 94) + # cfsite_numbers = (17, 22, 23, 30, 33, 94) + cfsite_numbers = (17, 21, 23, 30, 33)# 94) + # cfsite_numbers = (30, 33,)# 94) + + # cfsite_numbers = (17, 30,)# 94) les_kwargs = (forcing_model = "HadGEM2-A", month = 7, experiment = "amip") ref_paths = [ get_stats_path(get_cfsite_les_dir(cfsite_number; les_kwargs...)) for @@ -15,6 +27,20 @@ function get_les_calibration_library() return (ref_paths, cfsite_numbers) end +function get_cfsite_type(i, cfsite_numbers) + return get_cfsite_type(cfsite_numbers[i]) +end + +function get_cfsite_type(cfsite_number::Int) + if cfsite_number in CFSITE_TYPES["shallow"] + return "shallow" + elseif cfsite_number in CFSITE_TYPES["deep"] + return "deep" + else + @error "cfSite number $(cfsite_number) not found in available sites." + end +end + """ get_LES_library @@ -25,7 +51,18 @@ and experiments. 
""" function get_LES_library() LES_library = get_shallow_LES_library() - deep_sites = (collect(30:33)..., collect(66:70)..., 82, 92, 94, 96, 99, 100) + deep_sites = deepcopy(CFSITE_TYPES["deep"]) + + + # remove <0 ql/cli cases + # sites_07 = deepcopy(setdiff(deep_sites, [92, 99, 100])) + # append!(LES_library["HadGEM2-A"]["07"]["cfsite_numbers"], sites_07) + # sites_01 = deepcopy(setdiff(deep_sites, [99,])) + # append!(LES_library["HadGEM2-A"]["01"]["cfsite_numbers"], sites_01) + # sites_04 = deepcopy(setdiff(deep_sites, [32, 92, 94, 96, 99, 100])) + # append!(LES_library["HadGEM2-A"]["04"]["cfsite_numbers"], sites_04) + # sites_10 = deepcopy(setdiff(deep_sites, [92, 94, 99, 100])) + # append!(LES_library["HadGEM2-A"]["10"]["cfsite_numbers"], sites_10) append!(LES_library["HadGEM2-A"]["07"]["cfsite_numbers"], deep_sites) append!(LES_library["HadGEM2-A"]["01"]["cfsite_numbers"], deep_sites) @@ -34,6 +71,7 @@ function get_LES_library() sites_10 = deepcopy(setdiff(deep_sites, [94, 100])) append!(LES_library["HadGEM2-A"]["10"]["cfsite_numbers"], sites_10) + LES_library_full = deepcopy(LES_library) for model in keys(LES_library_full) for month in keys(LES_library_full[model]) @@ -103,8 +141,7 @@ function get_shallow_LES_library() "CNRM-CM5" => Dict(), "CNRM-CM6-1" => Dict(), ) - Shen_et_al_sites = collect(4:15) - append!(Shen_et_al_sites, collect(17:23)) + Shen_et_al_sites = collect(deepcopy(CFSITE_TYPES["shallow"])) # HadGEM2-A model (76 AMIP-AMIP4K pairs) LES_library["HadGEM2-A"]["10"] = Dict() diff --git a/calibration/experiments/gcm_driven_scm/get_nearest_particle.jl b/calibration/experiments/gcm_driven_scm/get_nearest_particle.jl new file mode 100644 index 0000000000..893acf7728 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/get_nearest_particle.jl @@ -0,0 +1,81 @@ +import TOML, YAML +import JLD2 +using Distributions +import EnsembleKalmanProcesses as EKP +using EnsembleKalmanProcesses.ParameterDistributions +using EnsembleKalmanProcesses.TOMLInterface +import ClimaCalibrate as CAL +using Plots +using LinearAlgebra +using DataFrames +using Statistics + + +include("helper_funcs.jl") + + +# output_dir = "/groups/esm/cchristo/climaatmos_scm_calibrations/output_ml_mix/exp_43" # path to calibration output +output_dir = "/central/scratch/cchristo/edmf_impl_dev3/exp11" # path to calibration output +iteration = 5 +prefix = "" + + +# write_optimal_toml_dir = "./calibrated_tomls/pert_pres" +# param_overrides_path = "./scm_tomls/prognostic_edmfx.toml" + +write_optimal_toml_dir = "./scm_runner/optimal_tomls" +param_overrides_path = "./scm_tomls/prognostic_edmfx.toml" + + +const config_dict = + YAML.load_file(joinpath(output_dir, "configs", "experiment_config.yml")) +const pretrained_nn_path = config_dict["pretrained_nn_path"] + + +prior_path = joinpath(output_dir, "configs", "prior.toml") + +# if no nn +const prior = CAL.get_prior(prior_path) + +# if nn +# prior = create_prior_with_nn(prior_path, pretrained_nn_path) + +iter_path = CAL.path_to_iteration(output_dir, iteration) +eki = JLD2.load_object(joinpath(iter_path, "eki_file.jld2")) +u = EKP.get_u_final(eki) + + +# mean_diff = sum(abs, u .- u_best_mean, dims = 1) +mean_diff = sum(abs, u .- mean(u, dims = 2), dims = 1) +nearest_mean_index = argmin(mean_diff) +col_index = nearest_mean_index[2] + +u_nearest = u[:, col_index] +phi_nearest = EKP.transform_unconstrained_to_constrained(prior, u_nearest) + + +@info "Ensemble member nearest to the mean for iteration $iteration" +@info "Particle Number: $col_index" +# @info "u values: $u_nearest" 
+# @info "phi values: $phi_nearest" + +param_toml_path = joinpath( + CAL.path_to_ensemble_member(output_dir, iteration, col_index), + "parameters.toml", +) +param_nearest = TOML.parsefile(param_toml_path) + +param_overrides = TOML.parsefile(param_overrides_path) + +merged_params = merge(param_nearest, param_overrides) + +# write optimal toml to file +exp_match = match(r"exp_(\d+)", output_dir) +exp_number = + exp_match !== nothing ? "exp" * exp_match.captures[1] : "exp_unknown" +file_name = "parameters_nearest_neig_particle_i$(iteration)_m$(col_index)_$(exp_number)$(prefix).toml" +output_toml_path = joinpath(write_optimal_toml_dir, file_name) +open(output_toml_path, "w") do file + TOML.print(file, merged_params) +end +@info "Merged parameters written to: $output_toml_path" diff --git a/calibration/experiments/gcm_driven_scm/get_optimal_particles.jl b/calibration/experiments/gcm_driven_scm/get_optimal_particles.jl new file mode 100644 index 0000000000..3487af8c66 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/get_optimal_particles.jl @@ -0,0 +1,193 @@ +import TOML, YAML +import JLD2 +using Distributions +import EnsembleKalmanProcesses as EKP +using EnsembleKalmanProcesses.ParameterDistributions +using EnsembleKalmanProcesses.TOMLInterface +import ClimaCalibrate as CAL +using Plots +using LinearAlgebra +using DataFrames +using Statistics + + +include("helper_funcs.jl") +output_dir = "/groups/esm/cchristo/climaatmos_scm_calibrations/output_ml_mix/exp_39" +iteration = 13 # iteration to get optimal particles (iterations = nothing) +final_iter = iteration + 1 + +n_lowest = 200 + + + +const config_dict = + YAML.load_file(joinpath(output_dir, "configs", "experiment_config.yml")) +const pretrained_nn_path = config_dict["pretrained_nn_path"] + + +prior_path = joinpath(output_dir, "configs", "prior.toml") +prior = create_prior_with_nn(prior_path, pretrained_nn_path) + +# const prior = CAL.get_prior(joinpath(output_dir, "configs", "prior.toml")) + + +### print best particles and loss +@info "Best particle in final iteration and loss" +@info "Final Iteration: $final_iter" + +iter_path = CAL.path_to_iteration(output_dir, iteration) +eki = JLD2.load_object(joinpath(iter_path, "eki_file.jld2")) + +iter_path_p1 = CAL.path_to_iteration(output_dir, iteration + 1) +eki_p1 = JLD2.load_object(joinpath(iter_path_p1, "eki_file.jld2")) + +Δt = EKP.get_Δt(eki) +y_i = EKP.get_obs(eki) +gamma_i = EKP.get_obs_noise_cov(eki) + +g = EKP.get_g_final(eki_p1) + +nan_columns_count = sum(any(isnan, g, dims = 1)) + +# Find successful simulations +non_nan_columns_indices = findall(x -> !x, vec(any(isnan, g, dims = 1))) +g_non_nan = g[:, non_nan_columns_indices] + +y_diff = y_i .- g_non_nan + +u = EKP.get_u_final(eki) +N_obs = size(g, 1) +cov_init = cov(u, dims = 2) + +loss_min_inds, loss_min_vals = + get_loss_min(output_dir, iteration, n_lowest = n_lowest) + +u_best = u[:, loss_min_inds] + +df = DataFrame(ParticleNumber = loss_min_inds, Loss = loss_min_vals) +display(df) + +u_best_mean = mean(u_best, dims = 2) +u_best_std = std(u_best, dims = 2) + +phi_best = EKP.transform_unconstrained_to_constrained(prior, u_best) + +phi_best_mean = mean(phi_best, dims = 2) +phi_best_std = std(phi_best, dims = 2) + + +### find ensemble member nearest to be mean + + +# mean_diff = sum(abs, u .- u_best_mean, dims = 1) +mean_diff = sum(abs, u .- mean(u, dims = 2), dims = 1) +nearest_mean_index = argmin(mean_diff) +col_index = nearest_mean_index[2] + +u_nearest = u[:, col_index] +phi_nearest = 
EKP.transform_unconstrained_to_constrained(prior, u_nearest) +g_nearest = g[:, col_index] + +@info "Ensemble member nearest to the mean for iteration $iteration" +@info "Particle Number: $col_index" +@info "u values: $u_nearest" +@info "phi values: $phi_nearest" + + + +names = [] +for i in 1:length(prior.name) + if isa(prior.distribution[i], EKP.Parameterized) + push!(names, prior.name[i]) + elseif isa(prior.distribution[i], EKP.VectorOfParameterized) + for j in 1:length(prior.distribution[i].distribution) + push!(names, prior.name[i] * "_$j") + end + end +end + + +# println("Best Particle Parameters:") +# for ii = 1:length(names) +# println("Parameter: $(names[ii])") +# if occursin("turb_entr_param", names[ii]) +# println("Mean: $(u_best_mean[ii])") +# println("Std: $(u_best_std[ii])") +# end +# println("Mean: $(phi_best_mean[ii])") +# println("Std: $(phi_best_std[ii])") +# end + + +# println("Nearest Particle Parameters for iteration $iteration:") +# for ii = 1:length(names) +# println("Parameter: $(names[ii])") +# if occursin("turb_entr_param", names[ii]) +# println("Mean: $(u_nearest[ii])") +# # println("Std: $(u_best_std[ii])") +# else +# println("Mean: $(phi_nearest[ii])") +# end +# # println("Std: $(phi_best_std[ii])") +# end + +println("Nearest Particle Standard Dev for iteration $iteration:") +for ii in 1:length(names) + println("Parameter: $(names[ii])") + if occursin("turb_entr_param", names[ii]) + println("Std: $(u_best_std[ii])") + else + println("Std: $(phi_best_std[ii])") + end +end + + +const prior_config = + TOML.parsefile(joinpath(output_dir, "configs", "prior.toml")) + +prior_config_out = deepcopy(prior_config) +for (name_i, prior_i) in pairs(prior_config) + # println("Key: $name_i, Value: $prior") + if occursin("VectorOfParameterized", prior_i["prior"]) + contraint_i = get(prior_i, "constraint", nothing) + # @show prior + # @show contraint_i + end + # else occursin("constrained_gaussian", prior_i["prior"]) + # @show prior_i["prior"] +end + +# for prior_param_name in names +# prior_config[prior_param_name] +# end + + + +# if !isnothing(contraint_i) +# if occursin("no_contraint", contraint_i) +# println("Constrained Gaussian") +# end +# end + + + + + +# param_dict = CAL.get_param_dict(prior) +# save_parameter_ensemble( +# u_best_mean, +# prior, +# param_dict, +# output_dir, +# "test_write_parameters.toml", +# iteration) + +# param_dict = get_param_dict(prior) +# save_parameter_ensemble( +# EKP.get_u_final(eki), +# prior, +# param_dict, +# output_dir, +# "test_write_parameters.toml", +# iteration, +# ) diff --git a/calibration/experiments/gcm_driven_scm/helper_funcs.jl b/calibration/experiments/gcm_driven_scm/helper_funcs.jl index a2c3239a97..e0589291fa 100644 --- a/calibration/experiments/gcm_driven_scm/helper_funcs.jl +++ b/calibration/experiments/gcm_driven_scm/helper_funcs.jl @@ -4,7 +4,25 @@ using Statistics using LinearAlgebra import ClimaAtmos as CA import ClimaCalibrate as CAL -import Interpolations +using Interpolations +import EnsembleKalmanProcesses as EKP +using Logging +using TOML +using Flux +using JLD2 + +include("nn_helpers.jl") + +import ClimaComms +@static pkgversion(ClimaComms) >= v"0.6" && ClimaComms.@import_required_backends + + +"""Suppress Info and Warnings for any function""" +function suppress_logs(f, args...; kwargs...) + Logging.with_logger(Logging.SimpleLogger(stderr, Logging.Error)) do + f(args...; kwargs...) 
+    end
+end
 
 "Optional vector"
 const OptVec{T} = Union{Nothing, Vector{T}}
@@ -15,14 +33,22 @@ const OptReal = Union{Real, Nothing}
 "Optional dictionary"
 const OptDict = Union{Nothing, Dict}
 
-CLIMADIAGNOSTICS_LES_NAME_MAP =
-    Dict("thetaa" => "theta_mean", "hus" => "qt_mean", "clw" => "ql_mean")
-
+CLIMADIAGNOSTICS_LES_NAME_MAP = Dict(
+    "thetaa" => "theta_mean",
+    "hus" => "qt_mean",
+    "clw" => "ql_mean",
+    "cli" => "qi_mean",
+    "wap" => "w_core",
+)
 
 """Get z cell centers coordinates for CA run, given config. """
-function get_z_grid(atmos_config; z_max = nothing)
+function get_z_grid(atmos_config::CA.AtmosConfig; z_max = nothing)
     params = CA.ClimaAtmosParameters(atmos_config)
     spaces =
         CA.get_spaces(atmos_config.parsed_args, params, atmos_config.comms_ctx)
     coord = CA.Fields.coordinate_field(spaces.center_space)
@@ -33,6 +59,46 @@ function get_z_grid(atmos_config; z_max = nothing)
     return z_vec
 end
 
+"""Creates a stretched vertical grid using ClimaCore utils, given `z_max`, `z_elem`, and `dz_bottom`.
+
+Output:
+ - `z_vec` :: Vector of `z` coordinates.
+"""
+function create_z_stretch(
+    atmos_config;
+    z_max = nothing,
+    z_elem = nothing,
+    dz_bottom = nothing,
+)
+    config_tmp = deepcopy(atmos_config)
+    params = CA.ClimaAtmosParameters(config_tmp)
+
+    # override the grid-defining parsed args, where provided
+    isnothing(z_max) || (config_tmp.parsed_args["z_max"] = z_max)
+    isnothing(z_elem) || (config_tmp.parsed_args["z_elem"] = z_elem)
+    isnothing(dz_bottom) || (config_tmp.parsed_args["dz_bottom"] = dz_bottom)
+
+    spaces = CA.get_spaces(config_tmp.parsed_args, params, config_tmp.comms_ctx)
+    coord = CA.Fields.coordinate_field(spaces.center_space)
+    z_vec = convert(Vector{Float64}, parent(coord.z)[:])
+    return z_vec
+end
+
+"""Return the case-specific calibration grid for `forcing_type` (e.g., "shallow", "deep"), falling back to the model grid when no stretch parameters are given."""
+function get_cal_z_grid(atmos_config, z_cal_grid::Dict, forcing_type::String)
+    if forcing_type in keys(z_cal_grid)
+        return create_z_stretch(
+            atmos_config;
+            z_max = z_cal_grid[forcing_type]["z_max"],
+            z_elem = z_cal_grid[forcing_type]["z_elem"],
+            dz_bottom = z_cal_grid[forcing_type]["dz_bottom"],
+        )
+    else
+        return get_z_grid(atmos_config)
+    end
+end
+
 """
     is_timeseries(filename::String, var_name::String)
@@ -238,6 +304,11 @@ function fetch_interpolate_transform(
     else
         var_ = nc_fetch_interpolate(var_name, filename, z_scm)
     end
+
+    # LES liquid/ice water means can contain small negative values; clip at zero
+    if var_name == "ql_mean" || var_name == "qi_mean"
+        var_ = max.(var_, 0.0)
+    end
+
     return var_
 end
@@ -422,6 +493,11 @@ function vertical_interpolation(
         end
     end
 
+"""Linearly interpolate the 1D profile `var_data` from levels `z_ref` onto `z_interp`, extrapolating linearly beyond the reference levels."""
+function interp_prof_1D(var_data, z_ref, z_interp)
+    nodes = (z_ref,)
+    var_itp = LinearInterpolation(nodes, var_data; extrapolation_bc = Line())
+    return var_itp(z_interp)
+end
 
 """
     get_profile(
@@ -798,18 +874,36 @@ function ensemble_data(
     output_dir = nothing,
     z_max = nothing,
     n_vert_levels,
+    return_z_interp = false,
 )
     G_ensemble =
         Array{Float64}(undef, n_vert_levels, config_dict["ensemble_size"])
+    z_interp = nothing
     for m in 1:config_dict["ensemble_size"]
         try
             member_path =
                 TOMLInterface.path_to_ensemble_member(output_dir, iteration, m)
-            simdir =
-                SimDir(joinpath(member_path, "config_$config_i", "output_0000"))
+            simulation_dir =
+                joinpath(member_path, "config_$config_i", "output_0000")
+
+            model_config_dict = YAML.load_file(joinpath(simulation_dir, ".yml"))
+            # suppress logs when creating model config, z grids to avoid cluttering output
+            model_config = suppress_logs(
+                CA.AtmosConfig,
+                model_config_dict;
+                comms_ctx = ClimaComms.SingletonCommsContext(),
+            )
+            if !isnothing(config_dict["z_cal_grid"])
+                z_interp = suppress_logs(
+                    get_cal_z_grid,
model_config, + config_dict["z_cal_grid"], + ) + end + + simdir = SimDir(simulation_dir) G_ensemble[:, m] .= process_profile_func( simdir, @@ -818,13 +912,26 @@ function ensemble_data( t_start = config_dict["g_t_start_sec"], t_end = config_dict["g_t_end_sec"], z_max = z_max, + z_interp = z_interp, ) + + # catch file i/o errors -> ensemble member crashed catch err - @info "Error during observation map for ensemble member $m" err - G_ensemble[:, m] .= NaN + err_str = string(err) + if occursin("Simulation failed at:", err_str) || + occursin("opening file", err_str) + @info "Simulation failed at a specific time for ensemble member $m" err + G_ensemble[:, m] .= NaN + elseif occursin("HDF error", err_str) + @info "NetCDF HDF error encountered for ensemble member $m" err + G_ensemble[:, m] .= NaN + else + rethrow(err) + end end + end - return G_ensemble + return return_z_interp ? (G_ensemble, z_interp) : G_ensemble end """Get minimum loss (RMSE) from EKI obj for a given iteration.""" @@ -864,15 +971,92 @@ function lowest_loss_rmse( end function get_forcing_file(i, ref_paths) - return "/central/groups/esm/zhaoyi/GCMForcedLES/forcing/corrected/HadGEM2-A_amip.2004-2008.07.nc" + ref_path = ref_paths[i] + cfsite_info = get_cfsite_info_from_path(ref_path) + + forcing_model = cfsite_info["forcing_model"] + experiment = cfsite_info["experiment"] + month = cfsite_info["month"] + + forcing_file_path = "/central/groups/esm/zhaoyi/GCMForcedLES/forcing/corrected/$(forcing_model)_$(experiment).2004-2008.$(month).nc" + + return forcing_file_path end function get_cfsite_id(i, cfsite_numbers) return string("site", cfsite_numbers[i]) end +function get_cfsite_info_from_path(input_string::String) + pattern = + r"cfsite/(\d+)/([^/]+)/([^/]+)/.*cfsite(\d+)_([^_]+)_([^_]+)_.*\.(\d{2})\..*nc" + m = match(pattern, input_string) + if m !== nothing + return Dict( + "forcing_model" => m.captures[2], + "cfsite_number" => m.captures[4], + "month" => m.captures[7], + "experiment" => m.captures[3], + ) + else + return Dict{String, String}() + end +end + + function get_batch_indicies_in_iteration(iteration, output_dir::AbstractString) iter_path = CAL.path_to_iteration(output_dir, iteration) eki = JLD2.load_object(joinpath(iter_path, "eki_file.jld2")) return EKP.get_current_minibatch(eki) end + + + + +function create_prior_with_nn( + prior_path, + pretrained_nn_path; + arc = [8, 20, 15, 10, 1], +) + + prior_dict = TOML.parsefile(prior_path) + parameter_names = keys(prior_dict) + + prior_vec = + Vector{EKP.ParameterDistribution}(undef, length(parameter_names)) + for (i, n) in enumerate(parameter_names) + prior_vec[i] = CAL.get_parameter_distribution(prior_dict, n) + end + + @load pretrained_nn_path serialized_weights + num_nn_params = length(serialized_weights) + + + # nn_model = construct_fully_connected_nn(arc, deepcopy(serialized_weights); biases_bool = true, output_layer_activation_function = Flux.identity) + nn_model = construct_fully_connected_nn( + arc, + deepcopy(serialized_weights); + biases_bool = true, + activation_function = Flux.leakyrelu, + output_layer_activation_function = Flux.identity, + ) + + # serialized_stds = serialize_std_model(nn_model; std_weight = 0.05, std_bias = 0.005) + serialized_stds = + serialize_std_model(nn_model; std_weight = 0.1, std_bias = 0.00001) + + nn_mean_std = EKP.VectorOfParameterized([ + Normal(serialized_weights[ii], serialized_stds[ii]) for + ii in 1:num_nn_params + ]) + nn_constraint = repeat([EKP.no_constraint()], num_nn_params) + nn_prior = EKP.ParameterDistribution( + nn_mean_std, 
+ nn_constraint, + "mixing_length_param_vec", + ) + push!(prior_vec, nn_prior) + + prior = EKP.combine_distributions(prior_vec) + return prior +end diff --git a/calibration/experiments/gcm_driven_scm/les_quick_plot.jl b/calibration/experiments/gcm_driven_scm/les_quick_plot.jl new file mode 100644 index 0000000000..4bd6441714 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/les_quick_plot.jl @@ -0,0 +1,135 @@ +import ClimaCalibrate as CAL +import ClimaAtmos as CA +import EnsembleKalmanProcesses as EKP +import YAML +using Plots + +include("helper_funcs.jl") +include("get_les_metadata.jl") + +plot_vars = ["thetaa", "hus", "cli", "clw"] + +experiment_dir = dirname(Base.active_project()) +const model_interface = joinpath(experiment_dir, "model_interface.jl") +const experiment_config = + YAML.load_file(joinpath(experiment_dir, "experiment_config.yml")) + + +# unpack experiment_config vars into scope +for (key, value) in experiment_config + @eval const $(Symbol(key)) = $value +end + +const prior = CAL.get_prior(joinpath(experiment_dir, prior_path)) + +# load configs and directories +model_config_dict = YAML.load_file(model_config) +atmos_config = CA.AtmosConfig(model_config_dict) + + +CFSITE_TYPES = Dict( + "shallow" => (collect(4:15)..., collect(17:23)...), + "deep" => + (collect(30:33)..., collect(66:70)..., 82, 92, 94, 96, 99, 100), +) + + +function get_les_calibration_library() + les_library = get_shallow_LES_library() + cfsite_numbers = [ + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 30, + 31, + 32, + 33, + 66, + 67, + 68, + 69, + 70, + 82, + 92, + 94, + 96, + 99, + 100, + ] + + les_kwargs = (forcing_model = "HadGEM2-A", month = 7, experiment = "amip") + ref_paths = [ + get_stats_path(get_cfsite_les_dir(cfsite_number; les_kwargs...)) for + cfsite_number in cfsite_numbers + ] + return (ref_paths, cfsite_numbers) +end + +### get LES obs (Y) and norm factors +ref_paths, _ = get_les_calibration_library() + +for plot_var in plot_vars + # Create directory for each plot_var under prof_plots + plot_dir = joinpath("./prof_plots", plot_var) + if !isdir(plot_dir) + mkdir(plot_dir) # Create the directory if it doesn't exist + end + + obs_vec = [] + for ref_path in ref_paths + cfsite_number, _, _, _ = parse_les_path(ref_path) + forcing_type = get_cfsite_type(cfsite_number) + zc_model = get_cal_z_grid(atmos_config, z_cal_grid, forcing_type) + + ti = experiment_config["y_t_start_sec"] + ti = isa(ti, AbstractFloat) ? ti : ti[forcing_type] + tf = experiment_config["y_t_end_sec"] + tf = isa(tf, AbstractFloat) ? 
tf : tf[forcing_type]
+
+        y_obs, Σ_obs, norm_vec_obs = get_obs(
+            ref_path,
+            [plot_var],
+            zc_model;
+            ti = ti,
+            tf = tf,
+            norm_factors_dict = norm_factors_by_var,
+            z_score_norm = false,
+            log_vars = [],
+            Σ_const = const_noise_by_var,
+            Σ_scaling = "const",
+        )
+
+        # Plot and save the figure
+        plot(y_obs, zc_model)
+        savefig(joinpath(plot_dir, "les_obs_$(cfsite_number).png"))
+
+        push!(
+            obs_vec,
+            EKP.Observation(
+                Dict(
+                    "samples" => y_obs,
+                    "covariances" => Σ_obs,
+                    "names" => split(ref_path, "/")[end],
+                ),
+            ),
+        )
+    end
+end
+
+series_names = [ref_paths[i] for i in 1:length(ref_paths)]
diff --git a/calibration/experiments/gcm_driven_scm/model_config_prognostic.yml b/calibration/experiments/gcm_driven_scm/model_config_prognostic.yml
index 3e4af7ab18..bc98bbe3ad 100644
--- a/calibration/experiments/gcm_driven_scm/model_config_prognostic.yml
+++ b/calibration/experiments/gcm_driven_scm/model_config_prognostic.yml
@@ -3,25 +3,32 @@ external_forcing: "GCM"
 external_forcing_file: "/central/groups/esm/zhaoyi/GCMForcedLES/forcing/corrected/HadGEM2-A_amip.2004-2008.07.nc"
 surface_setup: "GCM"
 turbconv: "prognostic_edmfx"
+viscous_sponge: true
 implicit_diffusion: true
-implicit_sgs_advection: false
 approximate_linear_solve_iters: 2
-max_newton_iters_ode: 1
+max_newton_iters_ode: 3
 edmfx_upwinding: "first_order"
 rayleigh_sponge: true
 edmfx_entr_model: "PiGroups"
-edmfx_detr_model: "PiGroups"
-# precip_model: 1M
+edmfx_detr_model: "SmoothArea"
+precip_model: 0M
+cloud_model: "quadrature_sgs"
+# cloud_model : "grid_scale"
 edmfx_sgs_mass_flux: true
 edmfx_sgs_diffusive_flux: true
 edmfx_nh_pressure: true
-edmfx_filter: false
+# edmfx_nh_pressure_type : physical
+# edmfx_nh_pressure_type : linear
+edmfx_filter: true
 prognostic_tke: true
 moist: "equil"
 config: "column"
 hyperdiff: "false"
 z_max: 40e3
 z_elem: 60
+# mixing_length_model: "nn"
+# mixing_length_model: "physical"
 z_stretch: true
 dz_bottom: 30
 rad: allskywithclear
@@ -31,11 +38,14 @@ perturb_initstate: false
 dt: "10secs"
 dt_rad: "30mins"
 t_end: "72hours"
-cloud_model: "quadrature_sgs"
+
+dt_save_to_disk: "10mins"
 call_cloud_diagnostics_per_stage : true
 toml: [scm_tomls/prognostic_edmfx.toml]
 netcdf_output_at_levels: true
 netcdf_interpolation_num_points: [2, 2, 60]
 diagnostics:
-  - short_name: [hus, thetaa, clw, arup, tke, entr, detr, waup, turbentr]
-    period: 10mins
+- short_name: [thetaa, hus, cl, clw, cli, pr]
+  period: 10mins
+- short_name: [entr, detr, lmix, turbentr, tke, arup, waup, edt]
+  period: 10mins
diff --git a/calibration/experiments/gcm_driven_scm/model_config_prognostic_impl.yml b/calibration/experiments/gcm_driven_scm/model_config_prognostic_impl.yml
new file mode 100644
index 0000000000..cf601a8544
--- /dev/null
+++ b/calibration/experiments/gcm_driven_scm/model_config_prognostic_impl.yml
@@ -0,0 +1,50 @@
+initial_condition: "GCM"
+external_forcing: "GCM"
+external_forcing_file: "/central/groups/esm/zhaoyi/GCMForcedLES/forcing/corrected/HadGEM2-A_amip.2004-2008.07.nc"
+surface_setup: "GCM"
+turbconv: "prognostic_edmfx"
+rayleigh_sponge: true
+viscous_sponge: true
+implicit_diffusion: true
+implicit_sgs_advection: true
+implicit_sgs_mass_flux: true
+implicit_sgs_entr_detr: true
+implicit_sgs_nh_pressure: true
+approximate_linear_solve_iters: 2
+max_newton_iters_ode: 3
+edmfx_upwinding: "first_order"
+edmfx_entr_model : "PiGroups"
+edmfx_detr_model : "SmoothArea"
+precip_model: 0M
+cloud_model : "grid_scale"
+edmfx_sgs_mass_flux: true
+edmfx_sgs_diffusive_flux: true
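+# The implicit_sgs_* flags above treat the SGS advection, mass-flux,
+# entrainment/detrainment, and non-hydrostatic pressure terms implicitly,
+# which is presumably what permits the longer `dt: "70secs"` below, compared
+# with `dt: "10secs"` in the explicit `model_config_prognostic.yml`.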
+edmfx_nh_pressure: true +# edmfx_nh_pressure_type : physical +# edmfx_nh_pressure_type : linear +edmfx_filter: true +prognostic_tke: true +moist: "equil" +config: "column" +hyperdiff: "false" +z_max: 40e3 +z_elem: 60 +z_stretch: true +dz_bottom: 30 +rad: allskywithclear +co2_model: fixed +insolation: "gcmdriven" +perturb_initstate: false +dt: "70secs" +dt_rad: "30mins" +t_end: "72hours" +call_cloud_diagnostics_per_stage : true +toml: [scm_tomls/prognostic_edmfx.toml] +netcdf_output_at_levels: true +netcdf_interpolation_num_points: [2, 2, 60] +diagnostics: +- short_name: [thetaa, hus, cl, clw, cli, pr] + period: 10mins +- short_name: [entr, detr, lmix, turbentr, tke, arup, waup, edt] #nh_pressure] + period: 10mins \ No newline at end of file diff --git a/calibration/experiments/gcm_driven_scm/model_interface.jl b/calibration/experiments/gcm_driven_scm/model_interface.jl index a1b058b169..c5f5b1dd52 100644 --- a/calibration/experiments/gcm_driven_scm/model_interface.jl +++ b/calibration/experiments/gcm_driven_scm/model_interface.jl @@ -3,8 +3,9 @@ import YAML import ClimaComms @static pkgversion(ClimaComms) >= v"0.6" && ClimaComms.@import_required_backends using ClimaUtilities.ClimaArtifacts -import ClimaCalibrate: - set_up_forward_model, run_forward_model, path_to_ensemble_member +import ClimaCalibrate: path_to_ensemble_member + +import ClimaCalibrate as CAL import EnsembleKalmanProcesses as EKP using JLD2 @@ -18,26 +19,36 @@ const output_dir = experiment_config_dict["output_dir"] const model_config = experiment_config_dict["model_config"] const batch_size = experiment_config_dict["batch_size"] +@everywhere function run_atmos_simulation(atmos_config) + simulation = CA.get_simulation(atmos_config) + sol_res = CA.solve_atmos!(simulation) + if sol_res.ret_code == :simulation_crashed + if !isnothing(sol_res.sol) + T = eltype(sol_res.sol) + if T !== Any && isconcretetype(T) + sol_res.sol .= T(NaN) + else + fill!(sol_res.sol, NaN) + end + end + error( + "The ClimaAtmos simulation has crashed. See the stack trace for details.", + ) + end +end -""" - set_up_forward_model(member, iteration, experiment_dir::AbstractString) - set_up_forward_model(member, iteration, config_dict::AbstractDict) - -Return an AtmosConfig object for the given member and iteration. +function CAL.forward_model(iteration, member) + base_config_dict = YAML.load_file(joinpath(@__DIR__, model_config)) -Turns off default diagnostics and sets the TOML parameter file to the member's path. -This assumes that the config dictionary has an `output_dir` key. 
-""" -function set_up_forward_model(member, iteration, experiment_dir::AbstractString) - experiment_config_dict = - YAML.load_file(joinpath(experiment_dir, model_config)) - config_dict = YAML.load_file(joinpath(experiment_dir, model_config)) iter_path = CAL.path_to_iteration(output_dir, iteration) eki = JLD2.load_object(joinpath(iter_path, "eki_file.jld2")) member_path = path_to_ensemble_member(output_dir, iteration, member) + + config_dict = deepcopy(base_config_dict) config_dict["output_dir"] = member_path parameter_path = joinpath(member_path, "parameters.toml") if haskey(config_dict, "toml") + config_dict["toml"] = abspath.(config_dict["toml"]) push!(config_dict["toml"], parameter_path) else config_dict["toml"] = [parameter_path] @@ -50,41 +61,46 @@ function set_up_forward_model(member, iteration, experiment_dir::AbstractString) config["external_forcing_file"] = get_forcing_file(i, ref_paths) config["cfsite_number"] = get_cfsite_id(i, cfsite_numbers) config["output_dir"] = joinpath(member_path, "config_$i") - comms_ctx = ClimaComms.SingletonCommsContext() - CA.AtmosConfig(config; comms_ctx) - end - - return atmos_configs -end + forcing_type = get_cfsite_type(i, cfsite_numbers) - -addprocs(batch_size) -@everywhere begin - import ClimaAtmos as CA - using JLD2 -end - -@everywhere function run_atmos_simulation(atmos_config) - simulation = CA.get_simulation(atmos_config) - sol_res = CA.solve_atmos!(simulation) - if sol_res.ret_code == :simulation_crashed - !isnothing(sol_res.sol) && sol_res.sol .= eltype(sol_res.sol)(NaN) - error( - "The ClimaAtmos simulation has crashed. See the stack trace for details.", + if haskey(experiment_config_dict, "forcing_toml_files") && haskey( + experiment_config_dict["forcing_toml_files"], + forcing_type, ) + forcing_config_file = + experiment_config_dict["forcing_toml_files"][forcing_type] + forcing_config_path = + joinpath(output_dir, "configs", forcing_config_file) + + if isfile(forcing_config_path) + forcing_config_path = abspath(forcing_config_path) + if haskey(config, "toml") + config["toml"] = abspath.(config["toml"]) + push!(config["toml"], forcing_config_path) + @info "Added forcing config: $forcing_config_path for case $i" + else + config["toml"] = [forcing_config_path] + end + else + @warn "Forcing config file not found at $forcing_config_path for case $i." + end + else + @warn "No forcing config file specified for type '$forcing_type' for case $i." + end + + comms_ctx = ClimaComms.SingletonCommsContext() + CA.AtmosConfig(config; comms_ctx) end -end -function run_forward_model(atmos_configs) @info "Preparing to run $(length(atmos_configs)) model simulations in parallel." println("Number of workers: ", nprocs()) start_time = time() - - pmap(run_atmos_simulation, atmos_configs) - + map(run_atmos_simulation, atmos_configs) end_time = time() + elapsed_time = (end_time - start_time) / 60.0 @info "Finished all model simulations. Total time taken: $(elapsed_time) minutes." + end diff --git a/calibration/experiments/gcm_driven_scm/nn_helpers.jl b/calibration/experiments/gcm_driven_scm/nn_helpers.jl new file mode 100644 index 0000000000..1042df6c3a --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/nn_helpers.jl @@ -0,0 +1,203 @@ +using Flux + + +""" + Count number of parameters in fully-connected NN model given Array specifying architecture following + the pattern: [#inputs, #neurons in L1, #neurons in L2, ...., #outputs]. Equal to the number of weights + biases. 
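+
+    For example, the architecture `[8, 20, 15, 10, 1]` used with the pretrained
+    mixing-length network in this experiment has 8*20 + 20*15 + 15*10 + 10*1 = 620
+    weights and 20 + 15 + 10 + 1 = 46 biases, i.e. 666 parameters in total
+    (consistent with the `nn_666p_*` naming of the pretrained model file).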
+""" +num_params_from_arc(nn_arc::AbstractArray{Int}) = + num_weights_from_arc(nn_arc) + num_biases_from_arc(nn_arc) + +""" + Count number of weights in fully-connected NN architecture. +""" +num_weights_from_arc(nn_arc::AbstractArray{Int}) = + sum(i -> nn_arc[i] * nn_arc[i + 1], 1:(length(nn_arc) - 1)) + +""" + Count number of biases in fully-connected NN architecture. +""" +num_biases_from_arc(nn_arc::AbstractArray{Int}) = + sum(i -> nn_arc[i + 1], 1:(length(nn_arc) - 1)) + + +""" + construct_fully_connected_nn( + arc::AbstractArray{Int}, + params::AbstractArray{FT}; + biases_bool::bool = false, + activation_function::Flux.Function = Flux.relu, + output_layer_activation_function::Flux.Function = Flux.relu,) + + Given network architecture and parameter vectors, construct NN model and unpack weights (and biases if `biases_bool` is true). + - `arc` :: vector specifying network architecture + - `params` :: parameter vector containing weights (and biases if `biases_bool` is true) + - `biases_bool` :: bool specifying whether `params` includes biases. + - `activation_function` :: activation function for hidden layers + - `output_layer_activation_function` :: activation function for output layer +""" +function construct_fully_connected_nn( + arc::AbstractArray{Int}, + params::AbstractArray{FT}; + biases_bool::Bool = false, + activation_function::Flux.Function = Flux.relu, + output_layer_activation_function::Flux.Function = Flux.relu, +) where {FT <: Real} + + # check consistency of architecture and parameters + if biases_bool + n_params_nn = num_params_from_arc(arc) + n_params_vect = length(params) + else + n_params_nn = num_weights_from_arc(arc) + n_params_vect = length(params) + end + if n_params_nn != n_params_vect + error( + "Incorrect number of parameters ($n_params_vect) for requested NN architecture ($n_params_nn)!", + ) + end + + layers = [] + parameters_i = 1 + # unpack parameters in parameter vector into network + for layer_i in 1:(length(arc) - 1) + if layer_i == length(arc) - 1 + activation_function = output_layer_activation_function + end + layer_num_weights = arc[layer_i] * arc[layer_i + 1] + + nn_biases = if biases_bool + params[(parameters_i + layer_num_weights):(parameters_i + layer_num_weights + arc[layer_i + 1] - 1)] + else + biases_bool + end + + layer = Flux.Dense( + reshape( + params[parameters_i:(parameters_i + layer_num_weights - 1)], + arc[layer_i + 1], + arc[layer_i], + ), + nn_biases, + activation_function, + ) + parameters_i += layer_num_weights + + if biases_bool + parameters_i += arc[layer_i + 1] + end + push!(layers, layer) + end + + return Flux.Chain(layers...) +end + +""" +serialize_ml_model( + ml_model::Flux.Chain, + ) + + Given Flux NN model, serialize model weights into a vector of parameters. + - `ml_model` - A Flux model instance. +""" + +function serialize_ml_model(ml_model::Flux.Chain) + parameters = [] + param_type = eltype.(Flux.params(ml_model))[1] + for layer in ml_model + for param in Flux.params(layer) + param_flattened = reshape(param, length(param)) + push!(parameters, param_flattened...) + end + end + return convert(Array{param_type}, parameters) +end + + +""" + construct_fully_connected_nn_default( + arc::AbstractArray{Int}; + biases_bool::Bool = false, + activation_function::Flux.Function = Flux.relu, + output_layer_activation_function::Flux.Function = Flux.relu, + ) + +Given network architecture, construct NN model with default `Flux.jl` weight initialization (glorot_uniform). 
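+For example, `construct_fully_connected_nn_default([8, 20, 15, 10, 1]; biases_bool = true)` would build the architecture assumed elsewhere in this experiment, with freshly initialized weights.
+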
+- `arc` :: vector specifying network architecture +- `biases_bool` :: bool specifying whether to include biases. +- `activation_function` :: activation function for hidden layers +- `output_layer_activation_function` :: activation function for output layer +""" + +function construct_fully_connected_nn_default( + arc::AbstractArray{Int}; + biases_bool::Bool = false, + activation_function::Flux.Function = Flux.relu, + output_layer_activation_function::Flux.Function = Flux.relu, +) + + layers = [] + for layer_i in 1:(length(arc) - 1) + if layer_i == length(arc) - 1 + activation_function = output_layer_activation_function + end + + layer = Flux.Dense( + arc[layer_i] => arc[layer_i + 1], + activation_function; + bias = biases_bool, + ) + push!(layers, layer) + end + + return Flux.Chain(layers...) +end + + + +""" + serialize_std_model( + ml_model::Flux.Chain; + std_weight::Real, + std_bias::Real + ) -> Vector{Float64} + +Given a Flux NN model, serialize the standard deviations for weights and biases into a vector. +- `ml_model` :: A Flux.Chain model instance. +- `std_weight` :: Standard deviation for weights. +- `std_bias` :: Standard deviation for biases. + +Returns a vector of stds where weights have `std_weight` and biases have `std_bias`, +aligned with the parameter order from `serialize_ml_model`. +""" +function serialize_std_model( + ml_model::Flux.Chain; + std_weight::Real, + std_bias::Real, +) + stds = Float64[] # Initialize an empty Float64 vector + + for layer in ml_model.layers + # Ensure the layer is a Dense layer + if isa(layer, Flux.Dense) + # Serialize weights + weights = layer.weight + n_weights = length(weights) + push!(stds, fill(Float64(std_weight), n_weights)...) + + # Serialize biases if they exist + if layer.bias !== nothing + biases = layer.bias + n_biases = length(biases) + push!(stds, fill(Float64(std_bias), n_biases)...) + end + else + error( + "serialize_std_model currently supports only Flux.Dense layers. 
Found layer of type $(typeof(layer)).", + ) + end + end + + return stds +end diff --git a/calibration/experiments/gcm_driven_scm/norm_factors.jl b/calibration/experiments/gcm_driven_scm/norm_factors.jl new file mode 100644 index 0000000000..f816cf5919 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/norm_factors.jl @@ -0,0 +1,131 @@ +using Plots +using LinearAlgebra +using YAML +import ClimaAtmos as CA +using Statistics + +include("helper_funcs.jl") +include("get_les_metadata.jl") + + +var_i = "cli" #"clw" + +experiment_dir = dirname(Base.active_project()) +const experiment_config = + YAML.load_file(joinpath(experiment_dir, "experiment_config.yml")) +const model_config = experiment_config["model_config"] +model_config_dict = YAML.load_file(model_config) +atmos_config = CA.AtmosConfig(model_config_dict) +const z_cal_grid = experiment_config["z_cal_grid"] +const const_noise_by_var = experiment_config["const_noise_by_var"] + +function get_local_les_calibration_library() + # les_library = get_shallow_LES_library() + les_library = get_LES_library() + + + ref_paths = [] + for model in keys(les_library) + for month in keys(les_library[model]) + cfsite_numbers = Tuple(les_library[model][month]["cfsite_numbers"]) + les_kwargs = ( + forcing_model = model, + month = parse(Int, month), + experiment = "amip", + ) + append!( + ref_paths, + [ + get_stats_path( + get_cfsite_les_dir( + parse(Int, cfsite_number[1]); + les_kwargs..., + ), + ) for cfsite_number in cfsite_numbers + ], + ) + + end + end + + + return ref_paths +end + + + +ref_paths = get_local_les_calibration_library() + + + +y_obs_all = Float64[] + +for ref_path in ref_paths + + try + cfsite_number, _, _, _ = parse_les_path(ref_path) + forcing_type = get_cfsite_type(cfsite_number) + zc_model = get_cal_z_grid(atmos_config, z_cal_grid, forcing_type) + + ti = experiment_config["y_t_start_sec"] + ti = isa(ti, AbstractFloat) ? ti : ti[forcing_type] + tf = experiment_config["y_t_end_sec"] + tf = isa(tf, AbstractFloat) ? 
tf : tf[forcing_type] + + y_obs, Σ_obs, norm_vec_obs = get_obs( + ref_path, + [var_i], + zc_model; + ti = ti, + tf = tf, + norm_factors_dict = nothing, + z_score_norm = false, + Σ_const = const_noise_by_var, + Σ_scaling = "const", + ) + if (var_i == "clw" || var_i == "cli") && any(<(0), y_obs) + negative_values = filter(<(0), y_obs) + @info "Skipping $ref_path due to negative values: $negative_values" + else + append!(y_obs_all, y_obs) + end + catch e + @info "Error: $e" + end +end + + +# clw/cli +log_y_obs_all = log10.(y_obs_all .+ 1e-12) +log_mean, log_std = mean(log_y_obs_all), std(log_y_obs_all) +log_y_obs_all_norm = (log_y_obs_all .- log_mean) ./ log_std +histogram( + log_y_obs_all_norm, + bins = 30, + xlabel = "Value", + ylabel = "Frequency (log scale)", + title = var_i, +) + + +## hus +# variable_mean, variable_std = mean(y_obs_all), std(y_obs_all) +# y_obs_all_norm = (y_obs_all .- variable_mean) ./ variable_std +# histogram( +# y_obs_all_norm, +# bins = 30, +# xlabel = "Value", +# ylabel = "Frequency (log scale)", +# title = var_i, +# ) + +# @show variable_mean, variable_std + + + + +# # histogram(log_y_obs_all, bins=30, yscale=:log10, xlabel="Value", ylabel="Frequency (log scale)", title=var_i) + +# # λ = 0.1 # Choose an appropriate λ, often determined empirically +# # box_cox_transformed = (y_obs_all .^ λ .- 1) ./ λ +# # histogram(box_cox_transformed, bins=30, xlabel="Value", ylabel="Frequency (log scale)", title=var_i) diff --git a/calibration/experiments/gcm_driven_scm/observation_map.jl b/calibration/experiments/gcm_driven_scm/observation_map.jl index 417638adbc..82bc550f67 100644 --- a/calibration/experiments/gcm_driven_scm/observation_map.jl +++ b/calibration/experiments/gcm_driven_scm/observation_map.jl @@ -1,12 +1,24 @@ import EnsembleKalmanProcesses as EKP -import ClimaCalibrate: - observation_map, ExperimentConfig, path_to_ensemble_member +import ClimaCalibrate as CAL +import ClimaCalibrate: path_to_ensemble_member using ClimaAnalysis using JLD2 using Statistics +using Logging +"""Suppress Info and Warnings for any function""" +function suppress_logs(f, args...; kwargs...) + Logging.with_logger(Logging.SimpleLogger(stderr, Logging.Error)) do + f(args...; kwargs...) 
+ end +end -function observation_map(iteration; config_dict::Dict) +function CAL.observation_map( + iteration; + config_dict = YAML.load_file( + joinpath(dirname(Base.active_project()), "experiment_config.yml"), + ), +) full_dim = config_dict["dims_per_var"] * @@ -16,7 +28,7 @@ function observation_map(iteration; config_dict::Dict) G_ensemble = Array{Float64}(undef, full_dim..., config_dict["ensemble_size"]) - iter_path = CAL.path_to_iteration(output_dir, iteration) + iter_path = CAL.path_to_iteration(config_dict["output_dir"], iteration) eki = JLD2.load_object(joinpath(iter_path, "eki_file.jld2")) for m in 1:config_dict["ensemble_size"] member_path = @@ -29,6 +41,7 @@ function observation_map(iteration; config_dict::Dict) t_start = config_dict["g_t_start_sec"], t_end = config_dict["g_t_end_sec"], z_max = config_dict["z_max"], + z_cal_grid = config_dict["z_cal_grid"], norm_factors_dict = config_dict["norm_factors_by_var"], log_vars = config_dict["log_vars"], ) @@ -48,13 +61,25 @@ function process_member_data( t_start, t_end, z_max = nothing, + z_cal_grid = nothing, norm_factors_dict = nothing, log_vars = [], ) forcing_file_indices = EKP.get_current_minibatch(eki) g = Float64[] for i in forcing_file_indices - simdir = SimDir(joinpath(member_path, "config_$i", "output_0000")) + simulation_dir = joinpath(member_path, "config_$i", "output_0000") + model_config_dict = YAML.load_file(joinpath(simulation_dir, ".yml")) + # suppress logs when creating model config, z grids to avoid cluttering output + model_config = suppress_logs(CA.AtmosConfig, model_config_dict) + + cfsite_number = model_config_dict["cfsite_number"] + site_num = parse(Int, replace(cfsite_number, r"[^0-9]" => "")) + forcing_type = get_cfsite_type(site_num) + + z_interp = get_cal_z_grid(model_config, z_cal_grid, forcing_type) + + simdir = SimDir(simulation_dir) for (i, y_name) in enumerate(y_names) y_var_i = process_profile_variable( simdir, @@ -63,6 +88,7 @@ function process_member_data( t_start, t_end, z_max, + z_interp, norm_factors_dict, log_vars, ) @@ -81,6 +107,7 @@ function process_profile_variable( t_start, t_end, z_max = nothing, + z_interp = nothing, norm_factors_dict = nothing, log_vars = [], ) @@ -92,10 +119,18 @@ function process_profile_variable( z_window = filter(x -> x <= z_max, var_i.dims["z"]) var_i = window(var_i, "z", right = maximum(z_window)) end - sim_t_end = var_i.dims["time"][end] + + sim_t_end = nothing + try + sim_t_end = var_i.dims["time"][end] + catch e + if isa(e, BoundsError) + throw(ErrorException("Simulation failed at: t=0")) + end + end if sim_t_end < 0.95 * t_end - throw(ErrorException("Simulation failed.")) + throw(ErrorException("Simulation failed at: $sim_t_end")) end # take time-mean @@ -103,6 +138,11 @@ function process_profile_variable( average_time(window(var_i, "time", left = t_start, right = sim_t_end)) y_var_i = slice(var_i_ave, x = 1, y = 1).data + # interpolate to calibration grid + if !isnothing(z_interp) + y_var_i = interp_prof_1D(y_var_i, var_i.dims["z"], z_interp) + end + if y_name in log_vars y_var_i = log10.(y_var_i .+ 1e-12) end diff --git a/calibration/experiments/gcm_driven_scm/plot_eki.jl b/calibration/experiments/gcm_driven_scm/plot_eki.jl index 2b48679f75..243961052f 100644 --- a/calibration/experiments/gcm_driven_scm/plot_eki.jl +++ b/calibration/experiments/gcm_driven_scm/plot_eki.jl @@ -9,8 +9,12 @@ using Plots using LinearAlgebra using DataFrames -output_dir = "output/exp_1" -iterations = nothing +# output_dir = "output/exp_1" +# output_dir= 
"/groups/esm/cchristo/climaatmos_scm_calibrations/output_ml_mix/exp_40" +# output_dir= "/central/scratch/cchristo/output_ml_mix2/exp_6" +output_dir = "/central/scratch/cchristo/output_ml_mix2/exp_8" +iterations = 0:5 +# iterations = nothing include("helper_funcs.jl") @@ -31,6 +35,12 @@ end const prior = CAL.get_prior(joinpath(output_dir, "configs", "prior.toml")) + +# const pretrained_nn_path = config_dict["pretrained_nn_path"] +# prior_path = joinpath(output_dir, "configs", "prior.toml") +# prior = create_prior_with_nn(prior_path, pretrained_nn_path) + + function inv_variance_weighted_loss( y_diff::Matrix{Float64}, gamma_i::Matrix{Float64}, @@ -111,10 +121,11 @@ for iteration in iterations fillalpha = 0.2, label = "Noise range", color = :gray, + linewidth = 0.1, ) - ylims!(plt, -3.0, 3.0) - savefig(plt, joinpath(plot_dir_y_vec, "eki_y_vs_g_iter_$(iteration).png")) + ylims!(plt, -5.0, 8.0) + savefig(plt, joinpath(plot_dir_y_vec, "eki_y_vs_g_iter_$(iteration).pdf")) end diff --git a/calibration/experiments/gcm_driven_scm/plot_ensemble.jl b/calibration/experiments/gcm_driven_scm/plot_ensemble.jl index 4d275f4c1e..91023a67c1 100644 --- a/calibration/experiments/gcm_driven_scm/plot_ensemble.jl +++ b/calibration/experiments/gcm_driven_scm/plot_ensemble.jl @@ -1,3 +1,4 @@ +using ArgParse import EnsembleKalmanProcesses: TOMLInterface import EnsembleKalmanProcesses as EKP using EnsembleKalmanProcesses.ParameterDistributions @@ -14,18 +15,95 @@ include("observation_map.jl") include("get_les_metadata.jl") -output_dir = "output/exp_1" # output directory -config_i = 1 # config to plot -ylims = (0, 4000) # y limits for plotting (`z` coord) -iterations = nothing # iterations to plot (i.e., 0:2). default is all iterations -var_names = - ("thetaa", "hus", "clw", "arup", "entr", "detr", "waup", "tke", "turbentr") +function parse_command_line_args() + s = ArgParseSettings() + @add_arg_table s begin + "--output_dir" + help = "Directory for output files" + arg_type = String + default = "output/exp_1" + + "--figs_dir" + help = "Directory for output figs" + arg_type = String + default = "figs/exp_default" + + "--config_i" + help = "Config index to plot" + arg_type = Int + default = 1 + + "--iterations" + help = "Iterations to plot (as i.e., `0:5` or `3`) (default is all iterations)" + arg_type = String + default = nothing + end + return parse_args(s) +end + +# Function to get all config indices from a given iteration and member directory +function get_config_indices(output_dir, iteration) + iteration_dir = joinpath(output_dir, "iteration_$(iteration)") + member_dirs = filter(isdir, readdir(iteration_dir, join = true)) + config_indices = Set{Int}() + + for member_dir in member_dirs + config_dirs = filter(isdir, readdir(member_dir, join = true)) + for config_dir in config_dirs + config_name = basename(config_dir) + if startswith(config_name, "config_") + config_index = parse(Int, split(config_name, "_")[2]) + push!(config_indices, config_index) + end + end + end + + return sort(collect(config_indices)) +end + +# Get command-line arguments +parsed_args = parse_command_line_args() + +# Assign variables based on the parsed arguments +output_dir = parsed_args["output_dir"] +figs_dir = parsed_args["figs_dir"] +config_i = parsed_args["config_i"] + +# ylims = (0, 4000) # y limits for plotting (`z` coord) +ylims = nothing +# iterations = !isnothing(parsed_args["iterations"]) ? 
+if !isnothing(parsed_args["iterations"]) + if occursin(":", parsed_args["iterations"]) + start_iter, end_iter = split(parsed_args["iterations"], ":") + iterations = parse(Int, start_iter):parse(Int, end_iter) + else + iterations = parse(Int, parsed_args["iterations"]) + end +else + iterations = nothing +end + +var_names = ( + "thetaa", + "hus", + "clw", + "entr", + "detr", + "waup", + "tke", + "arup", + "turbentr", + "lmix", + "nh_pressure", +) # "cli" reduction = "inst" config_dict = YAML.load_file(joinpath(output_dir, "configs", "experiment_config.yml")) const z_max = config_dict["z_max"] -const cal_var_names = config_dict["y_var_names"] +const z_cal_grid = config_dict["z_cal_grid"] +cal_var_names = config_dict["y_var_names"] +push!(cal_var_names, "wap") const const_noise_by_var = config_dict["const_noise_by_var"] const n_vert_levels = config_dict["dims_per_var"] model_config_dict = @@ -34,17 +111,30 @@ model_config_dict = ref_paths, _ = get_les_calibration_library() atmos_config = CA.AtmosConfig(model_config_dict) -# get/store LES obs and norm factors -zc_model = get_z_grid(atmos_config, z_max = z_max) +# Initialize config_indices +config_indices = [] if isnothing(iterations) iterations = get_iters_with_config(config_i, config_dict) end +# If config_i is not specified, infer config indices from the directory structure +if isnothing(config_i) + config_indices = get_config_indices(output_dir, iterations[1]) +else + config_indices = [config_i] +end -xlims_dict = Dict("arup" => (-0.1, 0.4), "clw" => "auto") +xlims_dict = Dict( + "arup" => (-0.1, 0.25), + "entr" => (-1e-6, 0.005), + "detr" => (-1e-6, 0.005), + "clw" => "auto", + "cli" => "auto", + "lmix" => (-0.25, 750), +) -function compute_plot_limits(data; margin_ratio = 0.5, fixed_margin = 1.0) +function compute_plot_limits(data; margin_ratio = 2.5, fixed_margin = 1.0) min_val = minimum(data) max_val = maximum(data) @@ -59,76 +149,95 @@ end for iteration in iterations @show "Iter: $iteration" - for var_name in var_names - - data = ensemble_data( - process_profile_variable, - iteration, - config_i, - config_dict; - var_name = var_name, - reduction = reduction, - output_dir = output_dir, - z_max = z_max, - n_vert_levels = n_vert_levels, - ) - - # size(data) = [num vertical levels x ensemble size] - eki_filepath = joinpath( - ClimaCalibrate.path_to_iteration(output_dir, iteration), - "eki_file.jld2", - ) - eki = JLD2.load_object(eki_filepath) - - plot(legend = false) - for i in 1:size(data, 2) - - y_var = data[:, i] - Plots.plot!(y_var, zc_model) - end - if in(var_name, cal_var_names) - y_truth, Σ_obs, norm_vec_obs = get_obs( - ref_paths[config_i], - [var_name], - zc_model; - ti = config_dict["y_t_start_sec"], - tf = config_dict["y_t_end_sec"], - Σ_const = const_noise_by_var, - z_score_norm = false, + for config_i in config_indices + @show "Config: $config_i" + for var_name in var_names + + data, zc_model = ensemble_data( + process_profile_variable, + iteration, + config_i, + config_dict; + var_name = var_name, + reduction = reduction, + output_dir = output_dir, + z_max = z_max, + n_vert_levels = n_vert_levels, + return_z_interp = true, ) + # size(data) = [num vertical levels x ensemble size] + eki_filepath = joinpath( + ClimaCalibrate.path_to_iteration(output_dir, iteration), + "eki_file.jld2", + ) + eki =
JLD2.load_object(eki_filepath) + + plot(legend = false) + for i in 1:size(data, 2) + + y_var = data[:, i] + Plots.plot!(y_var, zc_model) + end + if in(var_name, cal_var_names) + ref_path = ref_paths[config_i] + cfsite_number, _, _, _ = parse_les_path(ref_path) + forcing_type = get_cfsite_type(cfsite_number) + + ti = config_dict["y_t_start_sec"] + ti = isa(ti, AbstractFloat) ? ti : ti[forcing_type] + tf = config_dict["y_t_end_sec"] + tf = isa(tf, AbstractFloat) ? tf : tf[forcing_type] + + y_truth, Σ_obs, norm_vec_obs = get_obs( + ref_path, + [var_name], + zc_model; + ti = ti, + tf = tf, + Σ_const = const_noise_by_var, + z_score_norm = false, + ) + Plots.plot!(y_truth, zc_model, color = :black, label = "LES") + end + + xlims = get(xlims_dict, var_name, nothing) + + if xlims == "auto" + xlims = compute_plot_limits(y_truth) + end + + cfsite_info = get_cfsite_info_from_path(ref_paths[config_i]) + forcing_model = cfsite_info["forcing_model"] + experiment = cfsite_info["experiment"] + month = cfsite_info["month"] + cfsite_number = cfsite_info["cfsite_number"] + + Plots.plot!( + xlabel = var_name, + ylabel = "Height (z)", + title = "$var_name (cfsite: $cfsite_number, month: $month, model: $forcing_model)", + xlims = xlims, + ylims = ylims, + ) - Plots.plot!( - xlabel = var_name, - ylabel = "Height (z)", - title = var_name, - xlims = xlims, - ylims = ylims, - ) - - plot_rel_dir = joinpath( - output_dir, - "plots", - "ensemble_plots", - "config_$(config_i)", - ) - if !isdir(plot_rel_dir) - mkpath(plot_rel_dir) + plot_rel_dir = joinpath( + output_dir, + "plots", + "ensemble_plots", + "config_$(config_i)", + ) + if !isdir(plot_rel_dir) + mkpath(plot_rel_dir) + end + + savefig( + joinpath( + plot_rel_dir, + "ensemble_plot_$(var_name)_i_$(iteration).png", + ), + ) end - - savefig( - joinpath( - plot_rel_dir, - "ensemble_plot_$(var_name)_i_$(iteration).png", - ), - ) end end diff --git a/calibration/experiments/gcm_driven_scm/plot_priors.jl b/calibration/experiments/gcm_driven_scm/plot_priors.jl new file mode 100644 index 0000000000..25146bb5f4 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/plot_priors.jl @@ -0,0 +1,33 @@ +import ClimaCalibrate as CAL +using EnsembleKalmanProcesses +using EnsembleKalmanProcesses.ParameterDistributions +import YAML +import JLD2 +using LinearAlgebra +using Distributions +using Plots + +include("helper_funcs.jl") + + +# Plot a prior TOML directly: +# prior = CAL.get_prior("./prior_prognostic_pi_entr_smooth_entr_detr_impl_0M.toml") +# p1 = plot(prior) +# savefig(p1, "prior.pdf") + +# Useful for specifying bounded parameters in a config, e.g.: +# ParameterDistribution(Parameterized(Normal(10.1871, 0.1)), bounded_below(0.0), "tes2") + +prior_1 = constrained_gaussian("turb_entr_scale", 0.001, 0.0005, 0, Inf) +transformed_mu_1 = prior_1.distribution[1].distribution.μ +prior_2 = constrained_gaussian("turb_entr_power", 5000.0, 1.000, 0, Inf) +transformed_mu_2 = prior_2.distribution[1].distribution.μ + +# p_dist1 = plot(prior_1) +# savefig(p_dist1, "./prior_dist1.pdf") + +p_dist2 = plot(prior_2) +savefig(p_dist2, "./prior_dist2.pdf") + +# p4 = constrained_gaussian("entr_inv_tau", 1e-4, 1e-4, 0, Inf) +# p5 = constrained_gaussian("specific_humidity_precipitation_threshold", 5e-6, 3e-7, 0, Inf)
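`plot_priors.jl` notes how to hand-specify a bounded parameter once its transformed-space mean is known. A concrete version of that round trip (values and the tightened spread are illustrative):

```julia
using EnsembleKalmanProcesses.ParameterDistributions
using Distributions

p = constrained_gaussian("turb_entr_scale", 0.001, 0.0005, 0, Inf)
mu = p.distribution[1].distribution.μ  # mean in unconstrained space

# Re-specify explicitly with a bounded-below constraint and a tighter std:
p_explicit = ParameterDistribution(
    Parameterized(Normal(mu, 0.1)),
    bounded_below(0.0),
    "turb_entr_scale",
)
```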
+1,148 @@ +import EnsembleKalmanProcesses: TOMLInterface +import EnsembleKalmanProcesses as EKP +using EnsembleKalmanProcesses.ParameterDistributions +import ClimaCalibrate: observation_map, ExperimentConfig +using ClimaAnalysis +import ClimaAtmos as CA +import ClimaCalibrate +using Plots +using JLD2 +using Statistics +import YAML + +include("helper_funcs.jl") + +output_dir = "output_prior_and_entr_exp/exp_8" + +model_config_dict = + YAML.load_file(joinpath(output_dir, "configs", "model_config.yml")) +experiment_config_dict = + YAML.load_file(joinpath(output_dir, "configs", "experiment_config.yml")) + +atmos_config = CA.AtmosConfig(model_config_dict) + +reduction = "inst" + +iteration = 0 +iter_formatted = lpad(iteration, 3, '0') + +config_i = 1 + +n_members = 49 +z_max = 3000.0 +ylims = (0.0, z_max) + +var_names = ("thetaa", "hus", "arup", "entr", "detr", "waup", "tke") + +plot_rel_dir = joinpath(output_dir, "plots", "timeseries", "config_$(config_i)") +if !isdir(plot_rel_dir) + mkpath(plot_rel_dir) +end + +clims_map = Dict( + "thetaa" => (290.0, 320.0), + "arup" => (-0.1, 0.1), + "detr" => (0.0, 0.1), + "entr" => (0.0, 1e-2), + "hus" => (0.0, 2e-2), + "waup" => (-3.0, 3.0), + "tke" => (-1.0, 1.0), +) +colormap_map = Dict( + "arup" => :balance, + "waup" => :balance, + "detr" => :dense, + "entr" => :dense, + "tke" => :balance, +) + +for var_name in var_names + plots = [] + for member in 1:n_members + n_rows = ceil(Int, sqrt(n_members)) + n_cols = ceil(Int, n_members / n_rows) + + row = div(member - 1, n_cols) + 1 + col = mod(member - 1, n_cols) + 1 + + member_formatted = lpad(member, 3, '0') + member_path = joinpath( + output_dir, + "iteration_$(iter_formatted)/member_$(member_formatted)/config_$(config_i)", + ) + + simdir = SimDir(joinpath(member_path, "output_active")) + var_i = get(simdir; short_name = var_name, reduction = reduction) + zc_model = var_i.dims["z"] + + t_array = var_i.dims["time"] + timeseries_matrix = slice(var_i, x = 1, y = 1).data + + show_colorbar = (member == n_members) + + if haskey(clims_map, var_name) + clims = clims_map[var_name] + else + clims = nothing + end + + # Hide inner tick labels so the member grid reads as one figure + yticks = nothing + xticks = nothing + if col != 1 + yticks = [] + ylabel = "" + else + ylabel = "Height (m)" + end + if row != n_rows + xticks = [] + xlabel = "" + else + xlabel = "Time (s)" + end + + p = heatmap( + t_array, + zc_model, + timeseries_matrix', + colormap = get(colormap_map, var_name, :darkrainbow), + aspect_ratio = :auto, + xlabel = xlabel, + ylabel = ylabel, + colorbar = show_colorbar, + clims = clims, + yticks = yticks, + xticks = xticks, + ylims =
ylims, + ) + + # Label each subplot with the member number + annotate!(p, (0.1, 0.9), text("M $member", :left, 5, "black")) + + push!(plots, p) + end + + # Calculate the number of rows and columns based on the number of members + n_rows = ceil(Int, sqrt(n_members)) + n_cols = ceil(Int, n_members / n_rows) + + # Combine all plots into a grid + combined_plot = plot( + plots..., + layout = (n_rows, n_cols), + size = (900, 900), + link = :both, + margin = 0.01Plots.mm, + outer_margin = 5Plots.mm, + ) + + # Save the combined plot + savefig( + combined_plot, + joinpath( + plot_rel_dir, + "timeseries_$(var_name)_iter_$(iter_formatted).png", + ), + ) +end diff --git a/calibration/experiments/gcm_driven_scm/prior_nearest_neig_particle_i9_m3_precal_exp2_tighten_std.toml b/calibration/experiments/gcm_driven_scm/prior_nearest_neig_particle_i9_m3_precal_exp2_tighten_std.toml new file mode 100644 index 0000000000..5a65153f5b --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/prior_nearest_neig_particle_i9_m3_precal_exp2_tighten_std.toml @@ -0,0 +1,57 @@ +[entr_param_vec] +prior = "VectorOfParameterized([Normal(0.8232451844623447, 0.5), Normal(0.058776909570673115, 0.5), Normal(0.3812876963832981, 0.5), Normal(1.7521400508808318, 0.5), Normal(-0.32149828410236075, 0.5), Normal(-0.019764881527468917, 0.1), Normal(-0.05926889361439075, 0.5), Normal(0.3610260801159997, 0.5), Normal(0.5411236315642174, 0.5), Normal(-0.4681505113638351, 0.5), Normal(1.0674945161351919, 0.5), Normal(0.38794618444184004, 0.15)])" +constraint = "repeat([no_constraint()], 12)" +type = "float" + +[entr_coeff] +prior = "constrained_gaussian(entr_coeff, 2.0, 2.0, 0, Inf)" +type = "float" + +[turb_entr_param_vec] +prior = "VectorOfParameterized([Normal(-3.6137, 0.554), Normal(8.1871, 0.362)])" +constraint = "[bounded_below(0.0), bounded_below(0.0)]" +type = "float" + +[mixing_length_eddy_viscosity_coefficient] +prior = "constrained_gaussian(mixing_length_eddy_viscosity_coefficient, 0.12938179994984028, 0.07, 0, Inf)" +type = "float" + +[mixing_length_diss_coeff] +prior = "constrained_gaussian(mixing_length_diss_coeff, 0.13515021886991527, 0.07, 0, Inf)" +type = "float" + +[mixing_length_tke_surf_scale] +prior = "constrained_gaussian(mixing_length_tke_surf_scale, 0.5507468907407347, 0.5, 0, Inf)" +type = "float" + +[mixing_length_static_stab_coeff] +prior = "constrained_gaussian(mixing_length_static_stab_coeff, 0.21375280743443664, 0.1, 0, Inf)" +type = "float" + +[mixing_length_Prandtl_number_0] +prior = "constrained_gaussian(mixing_length_Prandtl_number_0, 1.0831106562777666, 0.2, 0, Inf)" +type = "float" + +[pressure_normalmode_buoy_coeff1] +prior = "constrained_gaussian(pressure_normalmode_buoy_coeff1, 0.17622092079471174, 0.08, 0, Inf)" +type = "float" + +[pressure_normalmode_drag_coeff] +prior = "constrained_gaussian(pressure_normalmode_drag_coeff, 25.872050045766862, 10.0, 0, Inf)" +type = "float" + +[diagnostic_covariance_coeff] +prior = "constrained_gaussian(diagnostic_covariance_coeff, 1.858712542728056, 0.3, 0, Inf)" +type =
"float" + +[EDMF_surface_area] +prior = "constrained_gaussian(EDMF_surface_area, 0.09964843547415161, 0.03, 0, Inf)" +type = "float" + +[rain_autoconversion_timescale] +prior = "constrained_gaussian(rain_autoconversion_timescale, 5777.697315577852, 300.0, 0, Inf)" +type = "float" + +[snow_autoconversion_timescale] +prior = "constrained_gaussian(snow_autoconversion_timescale, 145.68541447981468, 12.0, 0, Inf)" +type = "float" diff --git a/calibration/experiments/gcm_driven_scm/prior_prognostic_pi_entr_smooth_entr_detr_impl_0M_v1.toml b/calibration/experiments/gcm_driven_scm/prior_prognostic_pi_entr_smooth_entr_detr_impl_0M_v1.toml new file mode 100644 index 0000000000..cefbfd8c33 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/prior_prognostic_pi_entr_smooth_entr_detr_impl_0M_v1.toml @@ -0,0 +1,61 @@ +[entr_param_vec] +prior = "VectorOfParameterized([Normal(0.0, 10.0), Normal(0.0, 10.0), Normal(0.0, 10.0), Normal(0.0, 10.0), Normal(0.0, 10.0), Normal(0.3, 0.15)])" +constraint = "repeat([no_constraint()], 6)" +type = "float" + +[entr_mult_limiter_coeff] +prior = "constrained_gaussian(entr_mult_limiter_coeff, 0.7, 0.2, 0, Inf)" +type = "float" + +[max_area_limiter_scale] +prior = "constrained_gaussian(max_area_limiter_scale, 0.01, 0.002, 0, Inf)" +type = "float" + +[max_area_limiter_power] +prior = "constrained_gaussian(max_area_limiter_power, 25.0, 6.0, 0, Inf)" +type = "float" + +[turb_entr_param_vec] +prior = "VectorOfParameterized([Normal(-3.6137, 0.554), Normal(8.1871, 0.362)])" +constraint = "[bounded_below(0.0), bounded_below(0.0)]" +type = "float" + +[mixing_length_eddy_viscosity_coefficient] +prior = "constrained_gaussian(mixing_length_eddy_viscosity_coefficient, 0.14, 0.07, 0, Inf)" +type = "float" + +[mixing_length_diss_coeff] +prior = "constrained_gaussian(mixing_length_diss_coeff, 0.22, 0.15, 0, Inf)" +type = "float" + +[mixing_length_tke_surf_flux_coeff] +prior = "constrained_gaussian(mixing_length_tke_surf_flux_coeff, 3.0, 1.5, 0, Inf)" +type = "float" + +[mixing_length_static_stab_coeff] +prior = "constrained_gaussian(mixing_length_static_stab_coeff, 0.4, 0.2, 0, Inf)" +type = "float" + +[mixing_length_Prandtl_number_0] +prior = "constrained_gaussian(mixing_length_Prandtl_number_0, 0.74, 0.2, 0, Inf)" +type = "float" + +[pressure_normalmode_buoy_coeff1] +prior = "constrained_gaussian(pressure_normalmode_buoy_coeff1, 0.12, 0.08, 0, Inf)" +type = "float" + +[pressure_normalmode_drag_coeff] +prior = "constrained_gaussian(pressure_normalmode_drag_coeff, 40.0, 10.0, 0, Inf)" +type = "float" + +[EDMF_surface_area] +prior = "constrained_gaussian(EDMF_surface_area, 0.1, 0.03, 0, Inf)" +type = "float" + +[precipitation_timescale] +prior = "constrained_gaussian(precipitation_timescale, 1000.0, 200.0, 0, Inf)" +type = "float" + +[specific_humidity_precipitation_threshold] +prior = "constrained_gaussian(specific_humidity_precipitation_threshold, 5e-6, 2e-6, 0, Inf)" +type = "float" diff --git a/calibration/experiments/gcm_driven_scm/restart_calibration.jl b/calibration/experiments/gcm_driven_scm/restart_calibration.jl new file mode 100644 index 0000000000..ab221da8eb --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/restart_calibration.jl @@ -0,0 +1,148 @@ +using ArgParse +import ClimaCalibrate as CAL +import ClimaAtmos as CA +import EnsembleKalmanProcesses as EKP +import YAML +import JLD2 +using LinearAlgebra +using Random +using Flux +using TOML +using Distributions + + +include("helper_funcs.jl") +include("observation_map.jl") +include("get_les_metadata.jl") 
+include("nn_helpers.jl") + +function main() + s = ArgParseSettings() + @add_arg_table! s begin + "--output_dir" + help = "The output directory for calibration" + "--restart_iteration" + help = "Optional: specify the iteration to restart from" + arg_type = Int + default = -1 + end + + parsed_args = parse_args(s) + output_dir = parsed_args["output_dir"] + restart_iteration = parsed_args["restart_iteration"] + + experiment_dir = dirname(Base.active_project()) + model_interface = joinpath(experiment_dir, "model_interface.jl") + + experiment_config = + YAML.load_file(joinpath(output_dir, "configs", "experiment_config.yml")) + + for (key, value) in experiment_config + if key != "output_dir" + @eval $(Symbol(key)) = $value + end + end + + model_config_dict = + YAML.load_file(joinpath(output_dir, "configs", "model_config.yml")) + atmos_config = CA.AtmosConfig(model_config_dict) + + if model_config_dict["mixing_length_model"] == "nn" + prior = create_prior_with_nn( + prior_path, + pretrained_nn_path; + arc = [8, 20, 15, 10, 1], + ) + else + prior = CAL.get_prior(joinpath(experiment_dir, prior_path)) + end + + + @info "Restarting calibration" n_iterations ensemble_size output_dir + + current_iterations = sort([ + parse(Int, split(x, "_")[2]) for x in readdir(output_dir) if + isdir(joinpath(output_dir, x)) && occursin("iteration_", x) + ]) + + if restart_iteration == -1 + latest_iteration = current_iterations[end] + else + latest_iteration = restart_iteration + end + + last_iteration_dir = + joinpath(output_dir, "iteration_$(lpad(latest_iteration, 3, '0'))") + @info "Restarting from iteration $latest_iteration" + + if isdir(last_iteration_dir) + member_dirs = + filter(x -> startswith(x, "member_"), readdir(last_iteration_dir)) + for member_dir in member_dirs + config_dirs = filter( + x -> startswith(x, "config_"), + readdir(joinpath(last_iteration_dir, member_dir)), + ) + for config_dir in config_dirs + rm( + joinpath(last_iteration_dir, member_dir, config_dir); + force = true, + recursive = true, + ) + end + end + end + + + eki_path = joinpath( + CAL.path_to_iteration(output_dir, latest_iteration), + "eki_file.jld2", + ) + eki = JLD2.load_object(eki_path) + + hpc_kwargs = CAL.kwargs( + time = 180, + mem_per_cpu = "12G", + cpus_per_task = min(batch_size + 1, 5), + ntasks = 1, + nodes = 1, + # reservation = "clima", + ) + module_load_str = CAL.module_load_string(CAL.CaltechHPCBackend) + for iter in latest_iteration:(n_iterations - 1) + @info "Iteration $iter" + jobids = map(1:ensemble_size) do member + @info "Running ensemble member $member" + + CAL.slurm_model_run( + iter, + member, + output_dir, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, + ) + end + + statuses = CAL.wait_for_jobs( + jobids, + output_dir, + iter, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, + verbose = true, + reruns = 0, + ) + CAL.report_iteration_status(statuses, output_dir, iter) + @info "Completed iteration $iter, updating ensemble" + G_ensemble = CAL.observation_map(iter; config_dict = experiment_config) + CAL.save_G_ensemble(output_dir, iter, G_ensemble) + eki = CAL.update_ensemble(output_dir, iter, prior) + end + +end + +main()
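Both calibration drivers unpack the experiment config into variables with `@eval`; since `@eval` always evaluates at module (global) scope, each config key becomes a global binding even when the loop runs inside `main()`. A self-contained illustration:

```julia
config = Dict("ensemble_size" => 10, "n_iterations" => 5)
for (key, value) in config
    # Defines globals `ensemble_size` and `n_iterations` at module scope.
    @eval $(Symbol(key)) = $value
end
@assert ensemble_size == 10 && n_iterations == 5
```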
diff --git a/calibration/experiments/gcm_driven_scm/restart_calibration.sbatch b/calibration/experiments/gcm_driven_scm/restart_calibration.sbatch new file mode 100644 index 0000000000..720577d67b --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/restart_calibration.sbatch @@ -0,0 +1,16 @@ +#!/bin/bash + +#SBATCH --time=30:00:00 # walltime +#SBATCH --nodes=1 # number of nodes +#SBATCH --ntasks=1 # number of processor cores (i.e. tasks) +#SBATCH --mem=60G # total memory for the job +#SBATCH -J "2scm_cal_restart" # job name +#SBATCH --output=scm_cal_restart.out + +module purge +export MODULEPATH="/groups/esm/modules:$MODULEPATH" +module load climacommon/2024_05_27 + +julia --project restart_calibration.jl "$@" + +echo finished diff --git a/calibration/experiments/gcm_driven_scm/run_calibration.jl b/calibration/experiments/gcm_driven_scm/run_calibration.jl index 6f23ef1736..66ccf1694e 100644 --- a/calibration/experiments/gcm_driven_scm/run_calibration.jl +++ b/calibration/experiments/gcm_driven_scm/run_calibration.jl @@ -1,32 +1,77 @@ +using ClimaCalibrate import ClimaCalibrate as CAL import ClimaAtmos as CA import EnsembleKalmanProcesses as EKP import YAML +import TOML +using Distributions +using Distributed +using Random +using Flux +using Logging -import YAML import JLD2 using LinearAlgebra include("helper_funcs.jl") include("observation_map.jl") include("get_les_metadata.jl") +include("nn_helpers.jl") +# load configs experiment_dir = dirname(Base.active_project()) const model_interface = joinpath(experiment_dir, "model_interface.jl") const experiment_config = YAML.load_file(joinpath(experiment_dir, "experiment_config.yml")) + # unpack experiment_config vars into scope for (key, value) in experiment_config @eval const $(Symbol(key)) = $value end -const prior = CAL.get_prior(joinpath(experiment_dir, prior_path)) - # load configs and directories model_config_dict = YAML.load_file(model_config) atmos_config = CA.AtmosConfig(model_config_dict) -zc_model = get_z_grid(atmos_config; z_max) + +# add workers +@info "Starting $ensemble_size workers." +addprocs( + CAL.SlurmManager(Int(ensemble_size)), + t = experiment_config["slurm_time"], + mem_per_cpu = experiment_config["slurm_mem_per_cpu"], + cpus_per_task = experiment_config["slurm_cpus_per_task"], +) + + +@everywhere begin + using ClimaCalibrate + import ClimaCalibrate as CAL + import ClimaAtmos as CA + import JLD2 + import YAML + + include("observation_map.jl") + + experiment_dir = dirname(Base.active_project()) + const model_interface = joinpath(experiment_dir, "model_interface.jl") + const experiment_config = + YAML.load_file(joinpath(experiment_dir, "experiment_config.yml")) + + include(model_interface) + +end + + +if get(model_config_dict, "mixing_length_model", "") == "nn" + prior = create_prior_with_nn( + prior_path, + pretrained_nn_path; + arc = [8, 20, 15, 10, 1], + ) +else + prior = CAL.get_prior(joinpath(experiment_dir, prior_path)) +end ### create output directories & copy configs mkpath(output_dir) @@ -46,6 +91,43 @@ cp( joinpath(output_dir, "configs", "prior.toml"), force = true, ) + +# Copy TOML files specified in model_config.yml +if haskey(model_config_dict, "toml") && !isnothing(model_config_dict["toml"]) + model_config_dir = dirname(model_config) + for toml_rel_path in model_config_dict["toml"] + source_toml_path = joinpath(model_config_dir, toml_rel_path) + # Preserve the relative path; don't use basename + dest_toml_path = joinpath(output_dir, "configs", toml_rel_path) + if isfile(source_toml_path) + # Ensure the destination directory exists + mkpath(dirname(dest_toml_path)) + cp(source_toml_path, dest_toml_path, force = true) + @info "Copied $source_toml_path to $dest_toml_path" + else + @warn "TOML file specified in model_config not found: $source_toml_path" + end + end +end
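With the worker backend, ensemble members run on Distributed workers provisioned inside the Slurm allocation instead of one sbatch submission per member, which is why `run_calibration.sbatch` now requests `--ntasks` on the order of the ensemble size. A stripped-down sketch of the pattern used above (resource values are placeholders):

```julia
using Distributed
import ClimaCalibrate as CAL

# One worker per ensemble member, sized to fit the Slurm allocation.
addprocs(
    CAL.SlurmManager(4),
    t = "01:00:00",
    mem_per_cpu = "8G",
    cpus_per_task = 1,
)

# Each worker must load the model interface before `CAL.calibrate` runs.
@everywhere include("model_interface.jl")
```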
"forcing_toml_files") + for (forcing_type, config_file) in experiment_config["forcing_toml_files"] + source_path = joinpath(experiment_dir, config_file) + # Use the relative path from config_file, not just the basename + dest_path = joinpath(output_dir, "configs", config_file) + + if isfile(source_path) + # Ensure the destination directory exists + mkpath(dirname(dest_path)) + cp(source_path, dest_path, force = true) + @info "Copied forcing config $source_path to $dest_path" + else + @warn "Forcing config file not found: $source_path" + end + end +end + # save norm factors to output dir JLD2.jldsave( joinpath(output_dir, "norm_factors.jld2"); @@ -58,12 +140,21 @@ obs_vec = [] for ref_path in ref_paths + cfsite_number, _, _, _ = parse_les_path(ref_path) + forcing_type = get_cfsite_type(cfsite_number) + zc_model = get_cal_z_grid(atmos_config, z_cal_grid, forcing_type) + + ti = experiment_config["y_t_start_sec"] + ti = isa(ti, AbstractFloat) ? ti : ti[forcing_type] + tf = experiment_config["y_t_end_sec"] + tf = isa(tf, AbstractFloat) ? tf : tf[forcing_type] + y_obs, Σ_obs, norm_vec_obs = get_obs( ref_path, experiment_config["y_var_names"], zc_model; - ti = experiment_config["y_t_start_sec"], - tf = experiment_config["y_t_end_sec"], + ti = ti, + tf = tf, norm_factors_dict = norm_factors_by_var, z_score_norm = true, log_vars = log_vars, @@ -85,63 +176,42 @@ end series_names = [ref_paths[i] for i in 1:length(ref_paths)] -### define minibatcher +# function create_minibatches_internal(n_indices::Int, batch_size::Int) +# shuffled_indices = shuffle(1:n_indices) +# num_full_batches = div(n_indices, batch_size) +# batches = [collect(shuffled_indices[(i-1)*batch_size + 1 : i*batch_size]) for i in 1:num_full_batches] +# return batches +# end + +# minibatch_inds = create_minibatches_internal(length(series_names), experiment_config["batch_size"]) + +# @show minibatch_inds + rfs_minibatcher = EKP.FixedMinibatcher(collect(1:experiment_config["batch_size"])) + +# rfs_minibatcher = +# EKP.FixedMinibatcher(collect(1:experiment_config["batch_size"])) + +# rfs_minibatcher = EKP.no_minibatcher(experiment_config["batch_size"]) + observations = EKP.ObservationSeries(obs_vec, rfs_minibatcher, series_names) ### EKI hyperparameters/settings @info "Initializing calibration" n_iterations ensemble_size output_dir -CAL.initialize( + +eki = CAL.calibrate( + CAL.WorkerBackend, ensemble_size, + n_iterations, observations, + nothing, # noise alread sprecified in observations prior, output_dir; scheduler = EKP.DataMisfitController(on_terminate = "continue"), localization_method = EKP.Localizers.NoLocalization(), + ## localization_method = EKP.Localizers.SECNice(nice_loc_ug, nice_loc_gg), failure_handler_method = EKP.SampleSuccGauss(), accelerator = EKP.DefaultAccelerator(), + # # accelerator = EKP.NesterovAccelerator(), ) - -eki = nothing -hpc_kwargs = CAL.kwargs( - time = 60, - mem_per_cpu = "12G", - cpus_per_task = batch_size + 1, - ntasks = 1, - nodes = 1, - reservation = "clima", -) -module_load_str = CAL.module_load_string(CAL.CaltechHPCBackend) -for iter in 0:(n_iterations - 1) - @info "Iteration $iter" - jobids = map(1:ensemble_size) do member - @info "Running ensemble member $member" - CAL.slurm_model_run( - iter, - member, - output_dir, - experiment_dir, - model_interface, - module_load_str; - hpc_kwargs, - ) - end - - statuses = CAL.wait_for_jobs( - jobids, - output_dir, - iter, - experiment_dir, - model_interface, - module_load_str; - hpc_kwargs, - verbose = false, - reruns = 0, - ) - 
diff --git a/calibration/experiments/gcm_driven_scm/run_calibration.sbatch b/calibration/experiments/gcm_driven_scm/run_calibration.sbatch index e6eb8acfdc..175d4592d8 100644 --- a/calibration/experiments/gcm_driven_scm/run_calibration.sbatch +++ b/calibration/experiments/gcm_driven_scm/run_calibration.sbatch @@ -1,15 +1,15 @@ #!/bin/bash -#SBATCH --time=06:00:00 # walltime -#SBATCH --nodes=1 # number of nodes -#SBATCH --ntasks=1 # number of processor cores (i.e. tasks) -#SBATCH --mem=5G # memory per CPU core +#SBATCH --time=7:00:00 # walltime #SBATCH -J "scm_calibration" # job name +#SBATCH --ntasks=100 # number of tasks (roughly the number of ensemble members) +#SBATCH --mem-per-cpu=25G # memory per CPU +#SBATCH --cpus-per-task=1 # CPUs per task #SBATCH --output=scm_calibration.out module purge export MODULEPATH="/groups/esm/modules:$MODULEPATH" -module load climacommon/2024_05_27 +module load climacommon/2025_03_18 julia --project run_calibration.jl diff --git a/calibration/experiments/gcm_driven_scm/scm_runner/model_config_prognostic_runner.yml b/calibration/experiments/gcm_driven_scm/scm_runner/model_config_prognostic_runner.yml new file mode 100644 index 0000000000..87fbc450ce --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/scm_runner/model_config_prognostic_runner.yml @@ -0,0 +1,49 @@ +initial_condition: "GCM" +external_forcing: "GCM" +external_forcing_file: "/central/groups/esm/zhaoyi/GCMForcedLES/forcing/corrected/HadGEM2-A_amip.2004-2008.07.nc" +surface_setup: "GCM" +turbconv: "prognostic_edmfx" +implicit_diffusion: true +implicit_sgs_advection: false +approximate_linear_solve_iters: 2 +max_newton_iters_ode: 1 +edmfx_upwinding: "first_order" +rayleigh_sponge: true +edmfx_entr_model: "PiGroups" +edmfx_detr_model: "PiGroups" +precip_model: 1M +edmfx_sgs_mass_flux: true +edmfx_sgs_diffusive_flux: true +edmfx_nh_pressure: true +edmfx_filter: false +prognostic_tke: true +moist: "equil" +config: "column" +hyperdiff: "false" +z_max: 40e3 +z_elem: 60 +mixing_length_model: "nn" # or "physical" +z_stretch: true +dz_bottom: 30 +rad: allskywithclear +insolation: "gcmdriven" +perturb_initstate: false +dt: "10secs" +dt_rad: "30mins" +t_end: "72hours" +dt_save_to_disk: "10mins" +cloud_model: "quadrature_sgs" +call_cloud_diagnostics_per_stage: true +ode_algo: "ARS343" +toml: [scm_tomls/prognostic_edmfx.toml] +netcdf_output_at_levels: true +netcdf_interpolation_num_points: [2, 2, 60] +diagnostics: +- short_name: [ts, ta, thetaa, ha, pfull, rhoa, ua, va, wa, hur, hus, cl, clw, cli, hussfc, evspsbl, pr] + period: 10mins +- short_name: [arup, waup, taup, thetaaup, haup, husup, hurup, clwup, cliup, waen, taen, thetaaen, haen, husen, huren, clwen, clien, tke] + period: 10mins +- short_name: [entr, detr, lmix, bgrad, strain, edt, evu, turbentr, obulen] + period: 10mins +- short_name: [rlut, rlutcs, rsut, rsutcs, rsdt, clwvi, lwp, clivi, dsevi, clvi, prw, hurvi, husv] + period: 10mins \ No newline at end of file diff --git a/calibration/experiments/gcm_driven_scm/scm_runner/parameters_nearest_neig_particle_i9_m3_precal_exp2.toml b/calibration/experiments/gcm_driven_scm/scm_runner/parameters_nearest_neig_particle_i9_m3_precal_exp2.toml new file mode 100644 index 0000000000..0182a87ecb --- /dev/null +++
b/calibration/experiments/gcm_driven_scm/scm_runner/parameters_nearest_neig_particle_i9_m3_precal_exp2.toml @@ -0,0 +1,78 @@ +[rain_autoconversion_timescale] +value = 5777.697315577852 +type = "float" + +[mixing_length_tke_surf_scale] +value = 0.5507468907407347 +type = "float" + +[mixing_length_diss_coeff] +value = 0.13515021886991527 +type = "float" + +[diagnostic_covariance_coeff] +value = 1.858712542728056 +type = "float" + +[entr_param_vec] +value = [0.8232451844623447, 0.058776909570673115, 0.3812876963832981, 1.7521400508808318, -0.32149828410236075, -0.019764881527468917, -0.05926889361439075, 0.3610260801159997, 0.5411236315642174, -0.4681505113638351, 1.0674945161351919, 0.38794618444184004] +type = "float" + +[EDMF_surface_area] +value = 0.09964843547415161 +type = "float" + +[mixing_length_eddy_viscosity_coefficient] +value = 0.12938179994984028 +type = "float" + +[snow_autoconversion_timescale] +value = 145.68541447981468 +type = "float" + +[turb_entr_param_vec] +value = [0.026950324299085678, 3594.4850754174017] +type = "float" + +[pressure_normalmode_drag_coeff] +value = 25.872050045766862 +type = "float" + +[mixing_length_Prandtl_number_0] +value = 1.0831106562777666 +type = "float" + +[mixing_length_static_stab_coeff] +value = 0.21375280743443664 +type = "float" + +[pressure_normalmode_buoy_coeff1] +value = 0.17622092079471174 +type = "float" + +[EDMF_min_area] +value = 1.0e-5 + +[EDMF_max_area] +value = 0.7 + +[min_area_limiter_scale] +value = 0 + +[detr_inv_tau] +value = 0 + +[detr_coeff] +value = 0 + +[detr_buoy_coeff] +value = 0 + +[detr_vertdiv_coeff] +value = 0 + +[max_area_limiter_scale] +value = 0 + +[zd_rayleigh] +value = 20000.0 \ No newline at end of file diff --git a/calibration/experiments/gcm_driven_scm/scm_runner/runner_helper.jl b/calibration/experiments/gcm_driven_scm/scm_runner/runner_helper.jl new file mode 100644 index 0000000000..0719875399 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/scm_runner/runner_helper.jl @@ -0,0 +1,55 @@ + +include("../get_les_metadata.jl") + + +function get_les_calibration_library_runner() # All HADGEM + les_library = get_LES_library() + ref_paths = String[] + cfsite_numbers = Int[] + models = ["HadGEM2-A"] + experiment = "amip" + for model in models + for month in keys(les_library[model]) + cfsite_numbers_i = [ + parse(Int, key) for + key in keys(les_library[model][month]["cfsite_numbers"]) + ] + les_kwargs = ( + forcing_model = model, + month = parse(Int, month), + experiment = experiment, + ) + for cfsite_number in cfsite_numbers_i + try + stats_path = get_stats_path( + get_cfsite_les_dir(cfsite_number; les_kwargs...), + ) + push!(ref_paths, stats_path) + push!(cfsite_numbers, cfsite_number) + catch e + if isa(e, AssertionError) + continue + else + rethrow(e) + end + end + end + end + end + + + return (ref_paths, cfsite_numbers) +end + + +# function get_les_calibration_library_runner() +# les_library = get_shallow_LES_library() +# # AMIP data: July, NE Pacific +# cfsite_numbers = (17, 18, 22, 23, 30, 94) +# les_kwargs = (forcing_model = "HadGEM2-A", month = 7, experiment = "amip") +# ref_paths = [ +# get_stats_path(get_cfsite_les_dir(cfsite_number; les_kwargs...)) for +# cfsite_number in cfsite_numbers +# ] +# return (ref_paths, cfsite_numbers) +# end diff --git a/calibration/experiments/gcm_driven_scm/scm_runner/scm_runner.jl b/calibration/experiments/gcm_driven_scm/scm_runner/scm_runner.jl new file mode 100644 index 0000000000..6a5d2aca1b --- /dev/null +++ 
b/calibration/experiments/gcm_driven_scm/scm_runner/scm_runner.jl @@ -0,0 +1,142 @@ +import ClimaCalibrate as CAL +import ClimaAtmos as CA +import EnsembleKalmanProcesses as EKP +import YAML +import TOML +using Distributions +using JLD2 + +import ClimaComms +@static pkgversion(ClimaComms) >= v"0.6" && ClimaComms.@import_required_backends + +using LinearAlgebra + +include("../helper_funcs.jl") +include("runner_helper.jl") + +# Load experiment configuration to find forcing TOML files +const experiment_config_dict = + YAML.load_file(joinpath(@__DIR__, "..", "experiment_config.yml")) +const calib_output_dir = experiment_config_dict["output_dir"] # base directory for calibration outputs/configs + +using Distributed + +using ArgParse + +NUM_WORKERS = 10 + + +# Example usage: +# sbatch scm_runner.sbatch --run_output_dir=/groups/esm/cchristo/climaatmos_scm_calibrations/scm_runs/nearest_neig_particle_i6_m31_exp51 --parameter_path=./optimal_tomls/parameters_nearest_neig_particle_i6_m31_exp51.toml + +function main() + s = ArgParseSettings() + @add_arg_table! s begin + "--run_output_dir" + help = "Directory to output SCM simulations" + required = true + + "--parameter_path" + help = "Path to parameter TOML file" + required = true + end + + args = parse_args(s) + + base_config_path = "./model_config_prognostic_runner.yml" + run_output_dir = args["run_output_dir"] + parameter_path = args["parameter_path"] + + all_configs = + generate_atmos_configs(base_config_path, parameter_path, run_output_dir) + run_all_simulations(all_configs) +end + +function generate_atmos_configs( + base_config_path::String, + parameter_path::String, + run_output_dir::String, +) + config_dict = YAML.load_file(base_config_path) + config_dict["output_dir"] = run_output_dir + config_dict["toml"] = [abspath(parameter_path)] + config_dict["output_default_diagnostics"] = false + + ref_paths, cfsite_numbers = get_les_calibration_library_runner() + num_cases = length(ref_paths) + atmos_configs = map(collect(1:num_cases)) do i + config = deepcopy(config_dict) + cfsite_info = get_cfsite_info_from_path(ref_paths[i]) + forcing_model = cfsite_info["forcing_model"] + experiment = cfsite_info["experiment"] + month = cfsite_info["month"] + cfsite_number = cfsite_info["cfsite_number"] + + config["external_forcing_file"] = get_forcing_file(i, ref_paths) + config["cfsite_number"] = string("site", cfsite_number) + config["output_dir"] = joinpath( + run_output_dir, + "cfsite_$(cfsite_number)_$(forcing_model)_$(experiment)_$(month)", + ) + + # Add forcing-specific TOML file if specified + forcing_type = get_cfsite_type(i, cfsite_numbers) + if haskey(experiment_config_dict, "forcing_toml_files") && haskey( + experiment_config_dict["forcing_toml_files"], + forcing_type, + ) + forcing_config_file = + experiment_config_dict["forcing_toml_files"][forcing_type] + # Forcing configs are assumed to live in a 'configs' subdir of the main calibration output dir + forcing_config_path = + joinpath(calib_output_dir, "configs", forcing_config_file) + + if isfile(forcing_config_path) + forcing_config_path = abspath(forcing_config_path) + # config["toml"] already contains the parameter TOML + push!(config["toml"], forcing_config_path) + @info "Added forcing config: $forcing_config_path for case $i (type: $forcing_type)" + else + @warn "Forcing config file not found at $forcing_config_path for case $i (type:
$forcing_type)." + else + @warn "No forcing config file specified in experiment_config.yml for type '$forcing_type' for case $i." + end + + comms_ctx = ClimaComms.SingletonCommsContext() + CA.AtmosConfig(config; comms_ctx) + end + + return atmos_configs +end + +addprocs(NUM_WORKERS) +@everywhere begin + import ClimaAtmos as CA + using JLD2 +end + +@everywhere function run_atmos_simulation(atmos_config) + simulation = CA.get_simulation(atmos_config) + sol_res = CA.solve_atmos!(simulation) + if sol_res.ret_code == :simulation_crashed + !isnothing(sol_res.sol) && sol_res.sol .= eltype(sol_res.sol)(NaN) + error( + "The ClimaAtmos simulation has crashed. See the stack trace for details.", + ) + end +end + +function run_all_simulations(atmos_configs) + @info "Preparing to run $(length(atmos_configs)) model simulations in parallel." + println("Number of workers: ", nworkers()) + + start_time = time() + pmap(run_atmos_simulation, atmos_configs) + end_time = time() + elapsed_time = (end_time - start_time) / 60.0 + + @info "Finished all model simulations. Total time taken: $(elapsed_time) minutes." +end + +main() diff --git a/calibration/experiments/gcm_driven_scm/scm_runner/scm_runner.sbatch b/calibration/experiments/gcm_driven_scm/scm_runner/scm_runner.sbatch new file mode 100644 index 0000000000..cdd270baed --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/scm_runner/scm_runner.sbatch @@ -0,0 +1,17 @@ +#!/bin/bash + +#SBATCH --time=6:00:00 # walltime +#SBATCH --nodes=1 # number of nodes +#SBATCH --ntasks=11 # number of processor cores (i.e. tasks) +#SBATCH --mem=100G # memory +#SBATCH --cpus-per-task=1 +#SBATCH -J "scm_runner" # job name +#SBATCH --output=scm_runner_%j.out + +module purge +export MODULEPATH="/groups/esm/modules:$MODULEPATH" +module load climacommon/2024_05_27 + +julia --project scm_runner.jl "$@" + +echo finished \ No newline at end of file diff --git a/calibration/experiments/gcm_driven_scm/scm_tomls/gcmdriven_relaxation_deep_forcing.toml b/calibration/experiments/gcm_driven_scm/scm_tomls/gcmdriven_relaxation_deep_forcing.toml new file mode 100644 index 0000000000..afe4f1dcb5 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/scm_tomls/gcmdriven_relaxation_deep_forcing.toml @@ -0,0 +1,11 @@ +[gcmdriven_momentum_relaxation_timescale] +value = 3600.0 + +[gcmdriven_scalar_relaxation_timescale] +value = 7200.0 + +[gcmdriven_relaxation_minimum_height] +value = 16000.0 + +[gcmdriven_relaxation_maximum_height] +value = 20000.0 \ No newline at end of file diff --git a/calibration/experiments/gcm_driven_scm/scm_tomls/gcmdriven_relaxation_shallow_forcing.toml b/calibration/experiments/gcm_driven_scm/scm_tomls/gcmdriven_relaxation_shallow_forcing.toml new file mode 100644 index 0000000000..a5f77036a5 --- /dev/null +++ b/calibration/experiments/gcm_driven_scm/scm_tomls/gcmdriven_relaxation_shallow_forcing.toml @@ -0,0 +1,11 @@ +[gcmdriven_momentum_relaxation_timescale] +value = 21600.0 + +[gcmdriven_scalar_relaxation_timescale] +value = 86400.0 + +[gcmdriven_relaxation_minimum_height] +value = 3000.0 + +[gcmdriven_relaxation_maximum_height] +value = 3500.0 \ No newline at end of file diff --git a/calibration/experiments/gcm_driven_scm/scm_tomls/prognostic_edmfx.toml b/calibration/experiments/gcm_driven_scm/scm_tomls/prognostic_edmfx.toml index dec7ad8655..363a11c85a 100644 --- a/calibration/experiments/gcm_driven_scm/scm_tomls/prognostic_edmfx.toml +++ b/calibration/experiments/gcm_driven_scm/scm_tomls/prognostic_edmfx.toml @@ -19,8 +19,8 @@ value = 0
[detr_vertdiv_coeff] value = 0 -[max_area_limiter_scale] -value = 0 - [zd_rayleigh] -value = 20000.0 \ No newline at end of file +value = 30000.0 + +[zd_viscous] +value = 30000.0 \ No newline at end of file