diff --git a/.buildkite/longruns/pipeline.yml b/.buildkite/longruns/pipeline.yml index be38a7a8ce..7847a6603a 100644 --- a/.buildkite/longruns/pipeline.yml +++ b/.buildkite/longruns/pipeline.yml @@ -184,21 +184,15 @@ steps: - group: "Calibration experiments" steps: - - label: "Perfect model calibration test" - key: "amip_pm_calibration" + - label: "Subseasonal calibration test" + key: "subseasonal_calibration" command: - - "julia --color=yes --project=experiments/ClimaEarth experiments/calibration/run_calibration.jl" - artifact_paths: "experiments/calibration/output/*" - env: - CLIMACOMMS_DEVICE: "CUDA" - CLIMACOMMS_CONTEXT: "MPI" + - julia --color=yes --project=experiments/ClimaEarth experiments/calibration/subseasonal/generate_observations.jl + - julia --color=yes --project=experiments/ClimaEarth experiments/calibration/subseasonal/run_calibration.jl + artifact_paths: "experiments/calibration/subseasonal/output/*" agents: queue: clima - slurm_mem: 96GB - slurm_ntasks: 3 - slurm_gpus_per_task: 1 - slurm_cpus_per_task: 4 - slurm_time: 05:00:00 + slurm_time: 24:00:00 - wait diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index ef313e22ba..ad3aeaecdb 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -395,7 +395,7 @@ steps: - label: "Perfect model calibration test" key: "amip_pm_calibration" command: - - "julia --color=yes --project=experiments/ClimaEarth experiments/calibration/run_calibration.jl" + - "julia --color=yes --project=experiments/ClimaEarth experiments/calibration/test/run_calibration.jl" artifact_paths: "experiments/calibration/output/*" env: CLIMACOMMS_DEVICE: "CUDA" diff --git a/.gitignore b/.gitignore index 4f4dd88cd6..715d882ce1 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ docs/src/generated/ # Experiments !experiments/ClimaEarth/**/Manifest.toml !experiments/ClimaCore/**/Manifest.toml +experiments/calibration/coarse_amip/output*/ # Output output/ diff --git a/config/atmos_configs/climaatmos_wx_diagedmf.yml b/config/atmos_configs/climaatmos_wx_diagedmf.yml index 5e60b80dab..2e28bcb73f 100644 --- a/config/atmos_configs/climaatmos_wx_diagedmf.yml +++ b/config/atmos_configs/climaatmos_wx_diagedmf.yml @@ -15,15 +15,16 @@ insolation: "timevarying" rad: allskywithclear dt_cloud_fraction: "1hours" dt_rad: "1hours" -# time_varying_trace_gases: ["O3"] -# prescribed_aerosols: ["CB1", "CB2", "DST01", "OC1", "OC2", "SO4", "SSLT01"] +aerosol_radiation: true +time_varying_trace_gases: ["CO2", "O3"] +prescribed_aerosols: ["CB1", "CB2", "DST01", "DST02", "DST03", "DST04", "DST05", "OC1", "OC2", "SO4", "SSLT01", "SSLT02", "SSLT03", "SSLT04", "SSLT05"] turbconv: diagnostic_edmfx implicit_diffusion: true approximate_linear_solve_iters: 2 prognostic_tke: true edmfx_upwinding: first_order -h_elem: 15 -z_max: 48000.0 +h_elem: 16 +z_max: 60000.0 z_elem: 63 dz_bottom: 30.0 edmfx_entr_model: "Generalized" diff --git a/config/subseasonal_configs/wxquest_diagedmf.yml b/config/subseasonal_configs/wxquest_diagedmf.yml index 5316cd18e2..4976fb10f1 100644 --- a/config/subseasonal_configs/wxquest_diagedmf.yml +++ b/config/subseasonal_configs/wxquest_diagedmf.yml @@ -2,36 +2,34 @@ FLOAT_TYPE: "Float32" albedo_model: "CouplerAlbedo" atmos_config_file: "config/atmos_configs/climaatmos_wx_diagedmf.yml" coupler_toml: ["toml/amip_diagedmf.toml"] -mode_name: "subseasonal" -era5_initial_condition_dir: "/net/sampo/data1/wxquest_data/initial_conditions" initial_condition: "WeatherModel" checkpoint_dt: "7days" -dt: "30secs" -dt_cpl: "30secs" +dt: "120secs" +dt_cpl: "120secs" +dt_cloud_fraction: "1hours" +dt_rad: "1hours" energy_check: false h_elem: 16 land_fraction_source: "era5" binary_area_fraction: false - -### integrated land ### -# land_model: "integrated" - -### bucket land ### +mode_name: "subseasonal" +era5_initial_condition_dir: "/net/sampo/data1/wxquest_data/initial_conditions" land_model: "bucket" bucket_albedo_type: "map_temporal" -bucket_initial_condition: "/net/sampo/data1/wxquest_data/initial_conditions/era5_bucket_processed_20250907_0000.nc" - land_spun_up_ic: false land_temperature_anomaly: "nothing" -use_land_diagnostics: true radiation_reset_rng_seed: true -start_date: "20250907" +start_date: "20180901" surface_setup: "PrescribedSurface" topo_smoothing: true topography: "Earth" -t_end: "7days" netcdf_output_at_levels: true +output_default_diagnostics: false +use_coupler_diagnostics: false +use_land_diagnostics: false +strict_params: false +insolation: "timevarying" extra_atmos_diagnostics: - - short_name: [tas, mslp, pr, ua, va, rhoa, pfull, hur, hus, clw, cli, clivi, clwvi, cl, arup, tke] - period: 1hours - reduction_time: average \ No newline at end of file + - short_name: [hfls, hfss, rsus, rlus] + period: 1months + reduction_time: average diff --git a/experiments/ClimaEarth/Artifacts.toml b/experiments/ClimaEarth/Artifacts.toml index d9be2d46e3..593f61efc6 100644 --- a/experiments/ClimaEarth/Artifacts.toml +++ b/experiments/ClimaEarth/Artifacts.toml @@ -45,3 +45,10 @@ git-tree-sha1 = "7f14ef1c7859da1363b8973743b2a376c008f29f" [[era5_land_fraction.download]] sha256 = "1d6acf98656b50f1a324ee3daa267281fb017314d2169dbfda25208299f44870" url = "https://caltech.box.com/shared/static/glqlyb626j3szcjltyuwp534yzzhf770.gz" + +[era5_monthly_averages_surface_single_level_1979_2024] +git-tree-sha1 = "6cddb07eeee2dd46dc5a19e9b2f706302ddba2c9" + + [[era5_monthly_averages_surface_single_level_1979_2024.download]] + sha256 = "46b422722d98c89c6bc0b8641bff259db1caee253f45389ddf9eb9c2d31ed605" + url = "https://caltech.box.com/shared/static/jbgtyt6oq9lxvk8il5zzck6k581q7f3k.gz" diff --git a/experiments/ClimaEarth/Manifest-v1.11.toml b/experiments/ClimaEarth/Manifest-v1.11.toml index da68d1de36..90c9312545 100644 --- a/experiments/ClimaEarth/Manifest-v1.11.toml +++ b/experiments/ClimaEarth/Manifest-v1.11.toml @@ -2,7 +2,7 @@ julia_version = "1.11.8" manifest_format = "2.0" -project_hash = "3fd2c8b8c3ebb502a89202b3f7285e40572f38a0" +project_hash = "c4f04d4e37057155b5f15db984f87880058617e1" [[deps.ADTypes]] git-tree-sha1 = "f7304359109c768cf32dc5fa2d371565bb63b68a" @@ -453,9 +453,9 @@ version = "0.34.0" [[deps.ClimaCalibrate]] deps = ["Dates", "Distributed", "Distributions", "EnsembleKalmanProcesses", "JLD2", "Logging", "Random", "TOML", "YAML"] -git-tree-sha1 = "0d7ff225f8cfe2f6adad34ed7ed6f32438d4a772" +git-tree-sha1 = "a0c8232f78f6136f223629cfc4abf7df5c292374" uuid = "4347a170-ebd6-470c-89d3-5c705c0cacc2" -version = "0.1.4" +version = "0.2.0" [deps.ClimaCalibrate.extensions] CESExt = "CalibrateEmulateSample" diff --git a/experiments/ClimaEarth/Project.toml b/experiments/ClimaEarth/Project.toml index 7f91d26775..b81b4995b9 100644 --- a/experiments/ClimaEarth/Project.toml +++ b/experiments/ClimaEarth/Project.toml @@ -38,7 +38,7 @@ YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6" [compat] CUDA = "5" -ClimaCalibrate = "0.1" +ClimaCalibrate = "0.2" ClimaLand = "1.4" ClimaOcean = "0.8.6" ClimaParams = "1.0" diff --git a/experiments/calibration/README.md b/experiments/calibration/README.md index 7f626b5109..936b16f587 100644 --- a/experiments/calibration/README.md +++ b/experiments/calibration/README.md @@ -1,18 +1,13 @@ # ClimaCoupler Calibration Experiments -This folder contains a trivial perfect-model calibration of the atmosphere coupled with the bucket model. -The calibration uses 30-day and lat/lon averages of top-of-atmosphere shortwave -radiation to calibrate the `total_solar_irradiance` parameter in a perfect model setting. -The current run script uses the `ClimaCalibrate.SlurmManager` to add Slurm workers - which run each ensemble member in parallel. -To run this calibration on a Slurm cluster, ensure that `run_calibration.sh` is -configured for your cluster and run `sbatch run_calibration.sh`. The output will -be generated in `experiments/calibration/output`. +This folder contains pipelines to reproduce coupled calibration experiments. +Each pipeline has its own subfolder: -Components: -- run_calibration.sh: SBATCH script used to instantiate the project and run the calibration on a Slurm cluster. -- run_calibration.jl: Julia script for the overall calibration and postprocessing. Contains the expriment configuration, such as ensemble size and number of iterations. -- model_interface.jl: Contains `forward_model`, the function that gets run during calibration. This basically just uses the `setup_run` function. -- model_config.yml: Contains the configuration for the coupler -- \ No newline at end of file +- perfect_model: A trivial perfect-model calibration of the atmosphere coupled +with the bucket model. The calibration uses 30-day and lat/lon averages of +top-of-atmosphere shortwave radiation to calibrate the `total_solar_irradiance` +parameter in a perfect model setting. The current run script uses the + `ClimaCalibrate.SlurmManager` to add Slurm workers which run each ensemble + member in parallel. +- subseasonal: Calibrates the inverse entrainment timescale to ERA5 October monthly surface fluxes and surface temperature from 2018 to 2024. diff --git a/experiments/calibration/api.jl b/experiments/calibration/api.jl new file mode 100644 index 0000000000..73699efc7c --- /dev/null +++ b/experiments/calibration/api.jl @@ -0,0 +1,154 @@ +import Dates + +""" + struct CalibrateConfig{SPINUP <: Dates.Period, EXTEND <: Dates.Period} + short_names::Vector{String} + minibatch_size::Int64 + n_iterations::Int64 + sample_date_ranges::Vector{NTuple{2, DATE}} + extend::EXTEND + spinup::SPINUP + nelements::Tuple{Int64, Int64} + output_dir::String + rng_seed::Int64 + end + +A configuration struct for keeping track of multiple fields that are of interest +to a user running calibration, or that are needed in multiple places (e.g., for +ensemble members and generating observations). +""" +struct CalibrateConfig{SPINUP <: Dates.Period, EXTEND <: Dates.Period} + "Configuration file to use for ClimaCoupler simulation" + config_file::String + + "The short names of the observations used for calibration. The short names + should match the same names used for the diagnostics." + short_names::Vector{String} + + "The size of the minibatch for each iteration" + minibatch_size::Int64 + + "The number of iterations to run the calibration for" + n_iterations::Int64 + + "The date ranges of the samples for calibration and used to determine the + start and end dates of a simulation for each iteration of calibration" + sample_date_ranges::Vector{NTuple{2, Dates.DateTime}} + + "The amount of time to run a simulation after the last date of the + minibatch" + extend::EXTEND + + "The amount of time to run a simulation before the first date of the + minibatch" + spinup::SPINUP + + "The directory to store the iterations and members of the calibration." + output_dir::String + + "An integer value for ensuring calibrations are the same between multiple + calibrations with the same settings" + rng_seed::Int64 +end + +""" + CalibrateConfig(; + config_file, + short_names, + sample_date_ranges, + extend, + spinup = Dates.Month(3), + minibatch_size, + n_iterations, + output_dir = "calibration/weatherquest",, + rng_seed = 42, + ) + +Initializes a CalibrateConfig, which is of interest to a user running +calibration or contains values needed in multiple places during calibration. + +Keyword arguments +===================== + +- `config_file`: Configuration file to use for ClimaCoupler simulation. + +- `short_names`: Short names of the observations. The currently supported short + names are `pr`, `tas`, and `mslp`. + +- `minibatch_size`: The size of the minibatch for each iteration. + +- `n_iterations`: The number of iterations to run the calibration for. + +- `sample_date_ranges`: The date ranges for each sample. The dates should be the + same as found in the time series data of the observations. + +- `extend`: The amount of time to run the simulation after the end date + determined by `sample_date_ranges`. For seasonal averages, `extend` should be + `Dates.Month(3)` and for monthly averages, `extend` should be + `Dates.Month(1)`. + +- `spinup`: The amount of time to run the simulation before the start date + determined by `sample_date_ranges`. + +- `nelements`: The resolution of the model. This is also used to determine the + mask of the observations. + +- `output_dir`: The location to save the calibration at. + +- `rng_seed`: An integer to ensure that calibration runs with the same settings + are the same. +""" +function CalibrateConfig(; + config_file, + short_names, + minibatch_size, + n_iterations, + sample_date_ranges, + extend, + spinup = Dates.Month(3), + output_dir = "calibration/weatherquest", + rng_seed = 42, +) + isempty(short_names) && error("Cannot run calibration with no short names") + isempty(sample_date_ranges) && + error("Cannot run calibration with no date ranges for the samples") + + sample_date_ranges = [ + (Dates.DateTime(date_pair[1]), Dates.DateTime(date_pair[2])) for + date_pair in sample_date_ranges + ] + + for (start_date, stop_date) in sample_date_ranges + start_date <= stop_date || error( + "The start date ($start_date) should be before the stop date ($stop_date)", + ) + end + issorted(sample_date_ranges) || + error("The samples in $sample_date_ranges should be sorted") + + minibatch_size > 0 || error("The minibatch size ($minibatch_size) should be positive") + n_iterations > 0 || error("The number of iterations ($n_iterations) should be positive") + + num_samples = length(sample_date_ranges) + minibatch_size > num_samples && error( + "The minibatch size is $minibatch_size, but the number of samples is $num_samples", + ) + + remaining = num_samples % minibatch_size + remaining == 0 || @warn( + "Number of samples is not divisible by the minibatch size; the last $remaining samples may be missing when running the calibration" + ) + + return CalibrateConfig( + config_file, + short_names, + minibatch_size, + n_iterations, + sample_date_ranges, + extend, + spinup, + output_dir, + rng_seed, + ) + +end diff --git a/experiments/calibration/run_calibration.sh b/experiments/calibration/run_calibration.sh deleted file mode 100644 index 2f8e6ebb8a..0000000000 --- a/experiments/calibration/run_calibration.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -#SBATCH --partition=a3 -#SBATCH --output="run_calibration.txt" -#SBATCH --time=05:00:00 -#SBATCH --ntasks=10 -#SBATCH --gpus-per-task=1 -#SBATCH --cpus-per-task=4 - -julia --project=experiments/ClimaEarth -e 'using Pkg; Pkg.develop(;path="."); Pkg.instantiate(;verbose=true)' - -julia --project=experiments/ClimaEarth experiments/calibration/run_calibration.jl - diff --git a/experiments/calibration/subseasonal/generate_observations.jl b/experiments/calibration/subseasonal/generate_observations.jl new file mode 100644 index 0000000000..7c492ddd7d --- /dev/null +++ b/experiments/calibration/subseasonal/generate_observations.jl @@ -0,0 +1,158 @@ +import ClimaAnalysis +import ClimaAnalysis: OutputVar +import ClimaCalibrate +import ClimaCoupler +import Dates +import EnsembleKalmanProcesses as EKP +import JLD2 +import ClimaDiagnostics +import ClimaCore +import ClimaUtilities.ClimaArtifacts.@clima_artifact +import Pkg.Artifacts +# Access CalibrateConfig +include(joinpath(@__DIR__, "run_calibration.jl")) +include(joinpath(@__DIR__, "observation_utils.jl")) + +# Path to the ClimaEarth Artifacts.toml file +const CLIMAEARTH_ARTIFACTS_TOML = + joinpath(pkgdir(ClimaCoupler), "experiments", "ClimaEarth", "Artifacts.toml") + +""" + load_var(filepath, short_name; varname=nothing, flip_sign=false, transform_dates=false) + +Helper function to load and process an ERA5 variable with common transformations. +""" +function load_var( + filepath, + short_name; + varname = nothing, + flip_sign = false, + transform_dates = nothing, +) + # TODO: Don't assume monthly data + var = + isnothing(varname) ? OutputVar(filepath) : + OutputVar(filepath, varname; shift_by = Dates.firstdayofmonth) + + flip_sign && (var = -var) + + var.attributes["short_name"] = short_name + + if !isnothing(transform_dates) + var = ClimaAnalysis.Var._shift_by(var, date -> date - transform_dates) + end + if !issorted(latitudes(var)) + var = reverse_dim(var, latitude_name(var)) + end + + return var +end + +""" + load_vars(obsdir, short_names) + +Load NetCDF files belonging to `short_names` in `obsdir` as `OutputVar`s. +""" +function load_vars() + # Use Artifacts API directly with explicit path to ClimaEarth Artifacts.toml + # This is necessary because @clima_artifact searches upward from the source + # file, which won't find experiments/ClimaEarth/Artifacts.toml + + artifact_path = Artifacts.ensure_artifact_installed( + "era5_monthly_averages_surface_single_level_1979_2024", + CLIMAEARTH_ARTIFACTS_TOML, + ) + flux_file = joinpath( + artifact_path, + "era5_monthly_averages_surface_single_level_197901-202410.nc", + ) + + lhf = load_var(flux_file, "hfls"; varname = "mslhf", flip_sign = true) + shf = load_var(flux_file, "hfss"; varname = "msshf", flip_sign = true) + rsus = load_var(flux_file, "rsus"; varname = "msuwswrf") + rlus = load_var(flux_file, "rlus"; varname = "msuwlwrf") + + return [lhf, shf, rsus, rlus] +end + +""" + preprocess_vars(vars) + +Preprocess each OutputVar in `vars` by keeping the relevant dates in +`sample_date_ranges`. +""" +function preprocess_vars(vars) + resample_var = resampled_lonlat(CALIBRATE_CONFIG.config_file) + vars = map(vars) do var + var = resample_var(var) + var = set_units(var, var_units[short_name(var)]) + end + + return vars +end + +function make_observation_vector(vars, sample_date_ranges) + covar_estimator = ClimaCalibrate.ObservationRecipe.ScalarCovariance(; + scalar = 5.0, + use_latitude_weights = true, + ) + obs_vec = map(sample_date_ranges) do sample_date_range + ClimaCalibrate.ObservationRecipe.observation( + covar_estimator, + vars, + first(sample_date_range), + last(sample_date_range), + ) + end + return obs_vec +end + +""" + resampled_lonlat(config_file) + +Return a function to resample longitude and latitudes according to the model +grid specified by `config_file`. +""" +function resampled_lonlat(config_file) + config_dict = ClimaCoupler.Input.get_coupler_config_dict(CALIBRATE_CONFIG.config_file) + if !isnothing(config_dict["netcdf_interpolation_num_points"]) + (nlon, nlat, nlev) = tuple(config_dict["netcdf_interpolation_num_points"]...) + else + cs = CoupledSimulation(config_file) + center_space = cs.model_sims.atmos_sim.domain.center_space + (nlon, nlat, nlev) = ClimaDiagnostics.Writers.default_num_points(center_space) + stretch = center_space.grid.vertical_grid.topology.mesh.stretch + dz_bottom = center_space.grid.vertical_grid.topology.mesh.faces[2].z + z = range(dz_bottom, ClimaCore.Spaces.z_max(center_space), nlev) + end + lon = range(-180, 180, nlon) + lat = range(-90, 90, nlat) + # TODO: Generalize to 3D vars, account for stretch for 3D variables + return var -> resampled_as(var; lon, lat) +end + +if abspath(PROGRAM_FILE) == @__FILE__ + ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON" + sample_date_ranges = CALIBRATE_CONFIG.sample_date_ranges + short_names = CALIBRATE_CONFIG.short_names + config_file = CALIBRATE_CONFIG.config_file + @info "Generating observations for $short_names" + @info "The number of samples is $(length(sample_date_ranges)) over $sample_date_ranges" + + unprocessed_vars = load_vars() + + preprocessed_vars = preprocess_vars(unprocessed_vars) + + JLD2.save_object( + joinpath( + pkgdir(ClimaCoupler), + "experiments/calibration/subseasonal/preprocessed_vars.jld2", + ), + preprocessed_vars, + ) + observation_vector = make_observation_vector(preprocessed_vars, sample_date_ranges) + JLD2.save_object( + joinpath(pkgdir(ClimaCoupler), "experiments/calibration/subseasonal/obs_vec.jld2"), + observation_vector, + ) +end diff --git a/experiments/calibration/subseasonal/model_interface.jl b/experiments/calibration/subseasonal/model_interface.jl new file mode 100644 index 0000000000..a6bf87e91f --- /dev/null +++ b/experiments/calibration/subseasonal/model_interface.jl @@ -0,0 +1,52 @@ +ENV["CLIMACOMMS_DEVICE"] = "CUDA" +ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON" +import ClimaCoupler +import ClimaCalibrate +import CUDA +import EnsembleKalmanProcesses as EKP +include(joinpath(pkgdir(ClimaCoupler), "experiments", "ClimaEarth", "setup_run.jl")) +include( + joinpath( + pkgdir(ClimaCoupler), + "experiments", + "calibration", + "subseasonal", + "run_calibration.jl", + ), +) +using Pkg +function ClimaCalibrate.forward_model(iter, member) + Pkg.status() + config_dict = ClimaCoupler.Input.get_coupler_config_dict(CALIBRATE_CONFIG.config_file) + output_dir_root = CALIBRATE_CONFIG.output_dir + start_date = + first(CALIBRATE_CONFIG.sample_date_ranges[iter + 1]) - CALIBRATE_CONFIG.spinup + start_date_str = replace(string(Date(start_date)), "-" => "") + end_date = last(CALIBRATE_CONFIG.sample_date_ranges[iter + 1]) + CALIBRATE_CONFIG.extend + sim_length = Second(end_date - start_date) + + config_dict["start_date"] = start_date_str + config_dict["bucket_initial_condition"] = "/net/sampo/data1/wxquest_data/initial_conditions/era5_bucket_processed_$(start_date_str)_0000.nc" + config_dict["t_end"] = "$(sim_length.value)secs" + config_dict["checkpoint_dt"] = "900days" + config_dict["dt"] = "90secs" + config_dict["dt_cpl"] = "90secs" + + # Set member parameter file + sampled_parameter_file = ClimaCalibrate.parameter_path(output_dir_root, iter, member) + if haskey(config_dict, "coupler_toml") + config_dict["coupler_toml"] = + [config_dict["coupler_toml"]..., sampled_parameter_file] + else + config_dict["coupler_toml"] = [sampled_parameter_file] + end + # Set member output directory + member_output_dir = + ClimaCalibrate.path_to_ensemble_member(output_dir_root, iter, member) + config_dict["coupler_output_dir"] = member_output_dir + + @info "Simulation dates" start_date end_date + setup_and_run(config_dict) + @info "Completed member $member" + return nothing +end diff --git a/experiments/calibration/subseasonal/observation_map.jl b/experiments/calibration/subseasonal/observation_map.jl new file mode 100644 index 0000000000..376f9fcac2 --- /dev/null +++ b/experiments/calibration/subseasonal/observation_map.jl @@ -0,0 +1,278 @@ +using Statistics +import JLD2 +# import GeoMakie +# import Makie +import Dates +using ClimaAnalysis +import ClimaCalibrate +import ClimaAnalysis.Utils: kwargs as ca_kwargs +import ClimaCoupler +import ClimaCalibrate: EnsembleBuilder + +include(joinpath(@__DIR__, "observation_utils.jl")) + +""" + ClimaCalibrate.observation_map(iteration) + +Return G ensemble for an `iteration`. + +G ensemble represents the concatenated forward model evaluations from all +ensemble members, arranged horizontally. Each individual forward model +evaluation corresponds to preprocessed, flattened simulation data from a single +ensemble member that has been matched to the corresponding observational data. +""" +function ClimaCalibrate.observation_map(iteration) + output_dir = CALIBRATE_CONFIG.output_dir + ekp = JLD2.load_object(ClimaCalibrate.ekp_path(output_dir, iteration)) + + g_ens_builder = EnsembleBuilder.GEnsembleBuilder(ekp) + + for m in 1:EKP.get_N_ens(ekp) + member_path = ClimaCalibrate.path_to_ensemble_member(output_dir, iteration, m) + simdir_path = joinpath(member_path, "wxquest_diagedmf/output_active") + @info "Processing member $m: $simdir_path" + try + process_member_data!(g_ens_builder, simdir_path, m, iteration) + catch e + @error "Ensemble member $m failed" exception = (e, catch_backtrace()) + EnsembleBuilder.fill_g_ens_col!(g_ens_builder, m, NaN) + end + end + if count(isnan, g_ens_builder.g_ens) > 0.9 * length(g_ens_builder.g_ens) + error("Too many NaNs") + end + return g_ens_builder.g_ens +end + +""" + process_member_data!(diagnostics_folder_path, short_names, current_minibatch) + +Process the data of a single ensemble member and return a single column of the +G ensemble matrix. +""" +function process_member_data!(g_ens_builder, diagnostics_folder_path, col_idx, iteration) + short_names = EnsembleBuilder.missing_short_names(g_ens_builder, col_idx) + sample_date_ranges = CALIBRATE_CONFIG.sample_date_ranges[iteration + 1] + @info "Short names: $short_names" + + simdir = ClimaAnalysis.SimDir(diagnostics_folder_path) + for short_name in short_names + var = get_var(short_name, simdir) + var = preprocess_var(var, sample_date_ranges) + + EnsembleBuilder.fill_g_ens_col!(g_ens_builder, col_idx, var; verbose = true) + end + + return nothing +end + +function largest_period(sample_date_range) + span = maximum(sample_date_range) - minimum(sample_date_range) + span = Millisecond(span) + span.value == 0 && return Month(1) + day_in_ms = 8.64e7 + period = + span.value >= day_in_ms * 365 ? Year(1) : + span.value >= day_in_ms * 30 ? Month(1) : + span.value >= day_in_ms * 7 ? Week(1) : Day(1) + return period +end + +function get_var(short_name, simdir) + if short_name == "tas - ta" + tas = get(simdir; short_name = "tas") + ta = get(simdir; short_name = "ta") + # TODO: figure out why this doesn't work + # pfull = get(simdir; short_name = "pfull") + # ta_900 = ClimaAnalysis.Atmos.to_pressure_coordinates(ta, pfull; target_pressure=[900]) + ta_900hpa = slice(ta; z = 1000) + var = tas - ta_900hpa + else + # TODO: support multiple periods/reductions of same variable + var = get(simdir; short_name) + end + + var.attributes["short_name"] = short_name + return var +end + +""" + preprocess_var(var::ClimaAnalysis.OutputVar, reference_date) + +Preprocess `var` before flattening for G ensemble matrix. + +For "pr", weekly sums are computed. For "tas" and "mslp", weekly means are +computed from daily means. The daily means are computing starting from +`reference_date`. + +This function assumes that the data is monthly. +""" +function preprocess_var(var, sample_date_range) + period = largest_period(sample_date_range) + var = ClimaAnalysis.Var._shift_by(var, date -> date - period) + var = set_units(var, var_units[short_name(var)]) + # TODO: Match dates instead of just windowing + var = window(var, "time"; left = sample_date_range[1], right = sample_date_range[2]) + return var +end + +""" + ClimaCalibrate.analyze_iteration(ekp, + g_ensemble, + prior, + output_dir, + iteration) + +Analyze an iteration by plotting the bias plots, constrained parameters over +iterations, and errors over iterations and time. +""" +function ClimaCalibrate.analyze_iteration(ekp, g_ensemble, prior, output_dir, iteration) + plot_output_path = ClimaCalibrate.path_to_iteration(output_dir, iteration) + plot_constrained_params_and_errors(plot_output_path, ekp, prior) + + simdir = SimDir(ClimaCalibrate.path_to_ensemble_member(output_dir, iteration, 1)) + plot_bias(ekp, simdir, iteration; output_dir = plot_output_path) + + @info "Ensemble spread: $(scalar_spread(ekp))" +end + +""" + plot_constrained_params_and_errors(output_dir, ekp, prior) + +Plot the constrained parameters and errors from `ekp` and `prior` and save +them to `output_dir`. +""" +function plot_constrained_params_and_errors(output_dir, ekp, prior) + dim_size = sum(length.(EKP.batch(prior))) + fig = CairoMakie.Figure(size = ((dim_size + 1) * 500, 500)) + for i in 1:dim_size + EKP.Visualize.plot_ϕ_over_iters(fig[1, i], ekp, prior, i) + end + EKP.Visualize.plot_error_over_iters(fig[1, dim_size + 1], ekp, error_metric = "loss") + EKP.Visualize.plot_error_over_time(fig[1, dim_size + 2], ekp, error_metric = "loss") + CairoMakie.save(joinpath(output_dir, "constrained_params_and_error.png"), fig) + return nothing +end + +bias_plot_extrema = Dict( + "tas" => (-6, 6), + "tas - ta" => (-6, 6), + "hfls" => (-50, 50), + "hfss" => (-25, 25), + "rsus" => (-50, 50), + "rlus" => (-50, 50), + "mslp" => (-1000, 1000), + "pr" => (-1e-4, 1e-4), +) + +""" + plot_bias(ekp, simdir, iteration; output_dir) + +Plot bias maps comparing simulation output to observations for all variables in +`CALIBRATE_CONFIG.short_names`. + +Uses observations from the EKP object via `reconstruct_vars` and compares +them against simulation variables for each date in the sample date ranges. +""" +function plot_bias(ekp, simdir, iteration; output_dir = simdir.simulation_path) + # Get observations for this iteration + sample_date_ranges = CALIBRATE_CONFIG.sample_date_ranges[iteration + 1] + obs_series = EKP.get_observation_series(ekp) + minibatch_obs = ClimaCalibrate.ObservationRecipe.get_observations_for_nth_iteration( + obs_series, + iteration + 1, + ) + + # Reconstruct OutputVars from observations (ERA5 data) + era5_vars = [] + for obs in minibatch_obs + obs_vars = ClimaCalibrate.ObservationRecipe.reconstruct_vars(obs) + append!(era5_vars, obs_vars) + end + + # Get simulation variables for all short_names + sim_vars = [] + for short_name in CALIBRATE_CONFIG.short_names + var = get_var(short_name, simdir) + var = preprocess_var(var, sample_date_ranges) + push!(sim_vars, var) + end + + # Match sim_vars with era5_vars by short_name + var_pairs = [] + for sim_var in sim_vars + sim_short_name = ClimaAnalysis.short_name(sim_var) + era5_idx = findfirst(v -> ClimaAnalysis.short_name(v) == sim_short_name, era5_vars) + if !isnothing(era5_idx) + push!(var_pairs, (sim_var, era5_vars[era5_idx])) + else + @warn "No ERA5 data found for $(sim_short_name)" + end + end + + if isempty(var_pairs) + @warn "No matching variable pairs found for bias plotting" + return nothing + end + + # Get sample dates for this iteration + sample_dates = unique(CALIBRATE_CONFIG.sample_date_ranges[iteration + 1]) + + # Create figure + fig = GeoMakie.Figure(size = (1500, 500 * length(var_pairs))) + for (j, date) in enumerate(sample_dates) + for (i, (sim_var, era5_var)) in enumerate(var_pairs) + sim_var_t = slice(sim_var, time = date) + era5_var_t = slice(era5_var, time = date) + + # Calculate biases + global_bias = ClimaAnalysis.global_bias(sim_var_t, era5_var_t) + global_mean = weighted_average_lonlat(sim_var_t).data[1] + relative_global_bias = global_bias / global_mean + + land_bias = ClimaAnalysis.global_bias( + sim_var_t, + era5_var_t; + mask = ClimaAnalysis.apply_oceanmask, + ) + land_mean = + weighted_average_lonlat(ClimaAnalysis.apply_oceanmask(sim_var_t)).data[1] + relative_land_bias = land_bias / land_mean + + ocean_bias = ClimaAnalysis.global_bias( + sim_var_t, + era5_var_t; + mask = ClimaAnalysis.apply_landmask, + ) + ocean_mean = + weighted_average_lonlat(ClimaAnalysis.apply_landmask(sim_var_t)).data[1] + relative_ocean_bias = ocean_bias / ocean_mean + + @info short_name(sim_var_t) relative_global_bias global_bias global_mean + @info short_name(sim_var_t) relative_land_bias land_bias land_mean + @info short_name(sim_var_t) relative_ocean_bias ocean_bias ocean_mean + + cmap_extrema = + get(bias_plot_extrema, short_name(sim_var_t), extrema(sim_var_t.data)) + + # Plot bias + ax = ClimaAnalysis.Visualize.plot_bias_on_globe!( + fig[i, j], + sim_var_t, + era5_var_t; + cmap_extrema, + ) + end + end + + GeoMakie.save(joinpath(output_dir, "bias_sample_dates.png"), fig) + return nothing +end + + +function scalar_spread(ekp) + g_mean_final = EKP.get_g_mean_final(ekp) + g_final = EKP.get_g_final(ekp) + sq_dists = [sum((col .- g_mean_final) .^ 2) for col in eachcol(g_final)] + return mean(sq_dists) +end diff --git a/experiments/calibration/subseasonal/observation_utils.jl b/experiments/calibration/subseasonal/observation_utils.jl new file mode 100644 index 0000000000..ef15ba77b6 --- /dev/null +++ b/experiments/calibration/subseasonal/observation_utils.jl @@ -0,0 +1,25 @@ +import ClimaCoupler +using Statistics +import Dates +using ClimaAnalysis + +include(joinpath(pkgdir(ClimaCoupler), "experiments/ClimaEarth/setup_run.jl")) +ext = Base.get_extension(ClimaCalibrate, :ClimaAnalysisExt) + + +var_units = Dict( + "pr" => "kg m^-2 s^-1", + "mslp" => "Pa", + "tas" => "K", + "tas - ta" => "K", + "hfls" => "W m^-2", + "hfss" => "W m^-2", + "rsus" => "W m^-2", + "rlus" => "W m^-2", +) + +function remove_global_mean(var) + mean_var = ClimaAnalysis.Var.average_lonlat(var; weighted = true) + mean_data = mean_var.data + return ClimaAnalysis.replace(val -> val - mean_data[1], var) +end diff --git a/experiments/calibration/subseasonal/run_calibration.jl b/experiments/calibration/subseasonal/run_calibration.jl new file mode 100644 index 0000000000..1934d76ea4 --- /dev/null +++ b/experiments/calibration/subseasonal/run_calibration.jl @@ -0,0 +1,94 @@ +using Dates +using Distributed +import Random +import ClimaCalibrate +import ClimaAnalysis +import ClimaComms +import ClimaCoupler +import EnsembleKalmanProcesses as EKP +import EnsembleKalmanProcesses.ParameterDistributions as PD +import JLD2 + + +include(joinpath(pkgdir(ClimaCoupler), "experiments", "calibration", "api.jl")) +include( + joinpath( + pkgdir(ClimaCoupler), + "experiments/calibration/subseasonal/observation_map.jl", + ), +) +model_interface = joinpath( + pkgdir(ClimaCoupler), + "experiments", + "calibration", + "subseasonal", + "model_interface.jl", +) + +years = 2018:2021 +sample_date_ranges = [(DateTime(yr, 9, 1), DateTime(yr, 9, 1)) for yr in years] +const CALIBRATE_CONFIG = CalibrateConfig(; + config_file = joinpath( + pkgdir(ClimaCoupler), + "config/subseasonal_configs/wxquest_diagedmf.yml", + ), + short_names = ["hfls", "hfss", "rsus", "rlus"], + minibatch_size = 1, + n_iterations = 3, + sample_date_ranges, + extend = Dates.Month(1), + spinup = Dates.Month(0), + output_dir = "output/subseasonal", + rng_seed = 42, +) + +if abspath(PROGRAM_FILE) == @__FILE__ + priors = [PD.constrained_gaussian("precipitation_timescale", 600, 300, 100, 1000)] + prior = EKP.combine_distributions(priors) + + observation_vector = JLD2.load_object( + joinpath(pkgdir(ClimaCoupler), "experiments/calibration/subseasonal/obs_vec.jld2"), + ) + + sample_date_ranges = CALIBRATE_CONFIG.sample_date_ranges + minibatch_size = CALIBRATE_CONFIG.minibatch_size + # Structure observations into an ObservationSeries + obs_series = EKP.ObservationSeries( + Dict( + "observations" => observation_vector, + "names" => [ + string(Dates.year(start_date)) for + (start_date, stop_date) in sample_date_ranges + ], + "minibatcher" => ClimaCalibrate.minibatcher_over_samples( + length(observation_vector), + minibatch_size, + ), + ), + ) + + rng_seed = CALIBRATE_CONFIG.rng_seed + rng = Random.MersenneTwister(rng_seed) + + ekp = EKP.EnsembleKalmanProcess( + obs_series, + EKP.TransformUnscented(prior, impose_prior = true); + verbose = true, + rng, + scheduler = EKP.DataMisfitController(terminate_at = 100), + ) + + backend = ClimaCalibrate.ClimaGPUBackend(; + hpc_kwargs = ClimaCalibrate.kwargs(gpus = 1, time = 60 * 6), + model_interface, + verbose = true, + ) + + eki = ClimaCalibrate.calibrate( + backend, + ekp, + CALIBRATE_CONFIG.n_iterations, + prior, + CALIBRATE_CONFIG.output_dir; + ) +end diff --git a/experiments/calibration/model_config.yml b/experiments/calibration/test/model_config.yml similarity index 100% rename from experiments/calibration/model_config.yml rename to experiments/calibration/test/model_config.yml diff --git a/experiments/calibration/model_interface.jl b/experiments/calibration/test/model_interface.jl similarity index 79% rename from experiments/calibration/model_interface.jl rename to experiments/calibration/test/model_interface.jl index ecaf9674af..ac39c6ba10 100644 --- a/experiments/calibration/model_interface.jl +++ b/experiments/calibration/test/model_interface.jl @@ -3,9 +3,14 @@ import ClimaCalibrate include(joinpath(pkgdir(ClimaCoupler), "experiments", "ClimaEarth", "setup_run.jl")) function ClimaCalibrate.forward_model(iter, member) - config_file = - joinpath(pkgdir(ClimaCoupler), "experiments", "calibration", "model_config.yml") - config_dict = Input.get_coupler_config_dict(config_file) + config_file = joinpath( + pkgdir(ClimaCoupler), + "experiments", + "calibration", + "test", + "model_config.yml", + ) + config_dict = ClimaCoupler.Input.get_coupler_config_dict(config_file) # Run for a shorter time if SHORT_RUN is set if SHORT_RUN diff --git a/experiments/calibration/run_calibration.jl b/experiments/calibration/test/run_calibration.jl similarity index 93% rename from experiments/calibration/run_calibration.jl rename to experiments/calibration/test/run_calibration.jl index b9879c7d51..b0c3215d39 100644 --- a/experiments/calibration/run_calibration.jl +++ b/experiments/calibration/test/run_calibration.jl @@ -55,7 +55,7 @@ addprocs(CAL.SlurmManager()) ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON" experiment_dir = joinpath(pkgdir(ClimaCoupler), "experiments", "calibration") - include(joinpath(experiment_dir, "model_interface.jl")) + include(joinpath(experiment_dir, "test", "model_interface.jl")) output_dir = joinpath(experiment_dir, "output") obs_path = joinpath(experiment_dir, "observations.jld2") end @@ -91,16 +91,9 @@ ensemble_size = EKP.get_N_ens(eki) # Allow 100% failure rate for short run testing if SHORT_RUN - eki = CAL.calibrate( - CAL.WorkerBackend, - eki, - n_iterations, - prior, - output_dir; - failure_rate = 1, - ) + eki = CAL.calibrate(CAL.WorkerBackend(), eki, n_iterations, prior, output_dir) else - eki = CAL.calibrate(CAL.WorkerBackend, eki, n_iterations, prior, output_dir) + eki = CAL.calibrate(CAL.WorkerBackend(), eki, n_iterations, prior, output_dir) end # Postprocessing