Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ PDMats = "0.11.17"
Random = "<0.0.1, 1"
Statistics = "<0.0.1, 1"
StructArrays = "0.6.15, 0.7"
Test = "<0.0.1, 1"
TimerOutputs = "0.5"
YAML = "0.4"
julia = "1.10"
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
[![Coverage](https://codecov.io/gh/Team-RADDISH/ParticleDA.jl/branch/main/graph/badge.svg)](https://codecov.io/gh/Team-RADDISH/ParticleDA.jl)
[![DOI](https://zenodo.org/badge/232626497.svg)](https://zenodo.org/badge/latestdoi/232626497)
[![DOI:10.5194/gmd-2023-38](https://img.shields.io/badge/Journal_article-10.5194/gmd--2023--38-d4a519.svg)](https://doi.org/10.5194/gmd-2023-38)
[![Aqua QA](https://raw.githubusercontent.com/JuliaTesting/Aqua.jl/master/badge.svg)](https://github.com/JuliaTesting/Aqua.jl)


`ParticleDA.jl` is a Julia package to perform data assimilation with particle filters,
`ParticleDA.jl` is a Julia package to perform data assimilation with particle filters,
supporting both thread-based parallelism and distributed processing using MPI.

This project is developed in collaboration with the
Expand Down
43 changes: 20 additions & 23 deletions src/filters.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function init_filter end
)

Sample new values for two-dimensional array of state vectors `states` from proposal
distribution writing in-place to `states` array and compute logarithm of unnormalized
distribution writing in-place to `states` array and compute logarithm of unnormalized
particle weights writing to `log_weights` given observation vector `observation`, with
state space model described by `model`, named tuple of filter specific data
structures `filter_data`, filter type `T` and random number generator `rng` used to
Expand All @@ -39,27 +39,27 @@ function sample_proposal_and_compute_log_weights! end
"""
BootstrapFilter <: ParticleFilter

Singleton type `BootstrapFilter`. This can be used as argument of
Singleton type `BootstrapFilter`. This can be used as argument of
[`run_particle_filter`](@ref) to select the bootstrap filter.
"""
struct BootstrapFilter <: ParticleFilter end

"""
OptimalFilter <: ParticleFilter

Singleton type `OptimalFilter`. This can be used as argument of
Singleton type `OptimalFilter`. This can be used as argument of
[`run_particle_filter`](@ref) to select the optimal proposal filter (for conditionally
linear-Gaussian models).
"""
struct OptimalFilter <: ParticleFilter end

# Initialize arrays used by the filter
function init_filter(
filter_params::FilterParameters,
model,
filter_params::FilterParameters,
model,
nprt_per_rank::Int,
n_tasks::Int,
::Type{BootstrapFilter},
::Type{BootstrapFilter},
summary_stat_type::Type{<:AbstractSummaryStat}
)
state_dimension = get_state_dimension(model)
Expand All @@ -78,7 +78,7 @@ function init_filter(

# Memory buffer used during copy of the states
copy_buffer = Array{state_eltype, 2}(undef, state_dimension, nprt_per_rank)
return (;
return (;
weights,
resampling_indices,
statistics,
Expand All @@ -90,9 +90,9 @@ end

struct OfflineMatrices{R<:Real, M<:AbstractMatrix{R}, F<:AbstractPDMat{R}}
# Covariance between state X and observations Y given previous state x
cov_X_Y::M
cov_X_Y::M
# Covariance between observations Y given previous state x
cov_Y_Y::F
cov_Y_Y::F
end

struct OnlineMatrices{T<:AbstractMatrix}
Expand All @@ -107,7 +107,7 @@ end
# Allocate and compute matrices that do not depend on time-dependent variables
function init_offline_matrices(model)
return OfflineMatrices(
get_covariance_state_observation_given_previous_state(model),
get_covariance_state_observation_given_previous_state(model),
get_covariance_observation_observation_given_previous_state(model),
)
end
Expand All @@ -130,11 +130,11 @@ end

# Initialize arrays used by the filter
function init_filter(
filter_params::FilterParameters,
filter_params::FilterParameters,
model,
nprt_per_rank::Int,
n_tasks::Int,
::Type{OptimalFilter},
::Type{OptimalFilter},
summary_stat_type::Type{<:AbstractSummaryStat}
)
filter_data = init_filter(
Expand Down Expand Up @@ -184,17 +184,14 @@ function get_log_density_observation_given_previous_state(
get_observation_mean_given_state!(observation_mean, pre_noise_state, model, task_index)
return -invquad(
filter_data.offline_matrices.cov_Y_Y, observation - observation_mean
) / 2
) / 2
end

# ldiv! not currently defined for PDMat so define here
LinearAlgebra.ldiv!(A::PDMat, B::AbstractMatrix) = ldiv!(A.chol, B)

function update_states_given_observations!(
states::AbstractMatrix,
observation::AbstractVector,
model,
filter_data,
states::AbstractMatrix,
observation::AbstractVector,
model,
filter_data,
rng::Random.AbstractRNG
)
observation_buffer = filter_data.online_matrices.observation_buffer
Expand All @@ -217,7 +214,7 @@ function update_states_given_observations!(
# To allow for only a subset of state components being correlated to observations
# (given previous state) and so needing to be updated as part of optimal proposal
# the model can specify the relevant indices to update. This avoids computing a
# zero update for such state components
# zero update for such state components
update_indices = get_state_indices_correlated_to_observations(model)
# Update particles to account for observations, X = X - QHᵀ(HQHᵀ + R)⁻¹(Y − y)
# The following lines are equivalent to the single statement version
Expand All @@ -227,7 +224,7 @@ function update_states_given_observations!(
# but we stage across multiple statements to allow using in-place operations to
# avoid unnecessary allocations.
observation_buffer .-= observation
ldiv!(cov_Y_Y, observation_buffer)
ldiv!(cholesky(cov_Y_Y), observation_buffer)
mul!(state_buffer, cov_X_Y, observation_buffer)
@view(states[update_indices, :]) .-= state_buffer
end
Expand Down Expand Up @@ -258,6 +255,6 @@ function sample_proposal_and_compute_log_weights!(
end
# Update to account for conditioning on observations can be performed using matrix-
# matrix level 3 BLAS operations therefore perform outside of threaded loop over
# particles
# particles
update_states_given_observations!(states, observation, model, filter_data, rng)
end
2 changes: 2 additions & 0 deletions test/Project.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[deps]
Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595"
ChunkSplitters = "ae650224-84b6-46f8-82ea-d812ca08434e"
DelimitedFiles = "8bb1440f-4735-579b-a4ab-409b98df4dab"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
Expand All @@ -15,6 +16,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6"

[compat]
Aqua = "0.8"
ChunkSplitters = "3.1"
DelimitedFiles = "<0.0.1, 1"
Distributions = "0.22, 0.23, 0.24, 0.25"
Expand Down
91 changes: 49 additions & 42 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
using ParticleDA
using ParticleDA.Kalman
using HDF5, LinearAlgebra, MPI, PDMats, Random, StableRNGs, Statistics, Test, YAML
using HDF5, LinearAlgebra, MPI, PDMats, Random, StableRNGs, Statistics, Test, YAML

include(joinpath(@__DIR__, "models", "llw2d.jl"))
include(joinpath(@__DIR__, "models", "lorenz63.jl"))
include(joinpath(@__DIR__, "models", "lineargaussian.jl"))

using .LLW2d
using .Lorenz63
using Aqua

# We purposefully add, but not use, OpenMPI_jll:
# https://github.com/Team-RADDISH/ParticleDA.jl/pull/296. So let's run all the Aqua tests,
# excluding the stale dep one, and then run that one alone and ignore `OpenMPI_jll`.
Aqua.test_all(ParticleDA; stale_deps=false)
Aqua.test_stale_deps(ParticleDA; ignore=[:OpenMPI_jll])

@testset "LLW2d model unit tests" begin
dx = dy = 2e3
Expand Down Expand Up @@ -93,7 +100,7 @@ end
check_hdf5_group_valid(file, group, group_name)
@test read(group, dataset_name) == test_array
@test all([
read_attribute(group[dataset_name], k) == test_attributes[k]
read_attribute(group[dataset_name], k) == test_attributes[k]
for k in keys(test_attributes)
])
# test writing to existing dataset name results in warning and does not update
Expand All @@ -108,9 +115,9 @@ end
# test writing timer data
timer_strings = ["ab", "cde", "fg", "hij"]
ParticleDA.write_timers(
map(length, timer_strings),
length(timer_strings),
codeunits(join(timer_strings)),
map(length, timer_strings),
length(timer_strings),
codeunits(join(timer_strings)),
output_filename
)
h5open(output_filename, "cw") do file
Expand All @@ -125,7 +132,7 @@ end


@testset (
"Generic model interface unit tests - $(parentmodule(typeof(model)))"
"Generic model interface unit tests - $(parentmodule(typeof(model)))"
) for model in (
LLW2d.init(Dict()),
Lorenz63.init(Dict()),
Expand All @@ -152,7 +159,7 @@ end
"x_length" => 100e3,
"y_length" => 100e3,
"station_boundary_x" => 30e3,
"station_boundary_y" => 30e3,
"station_boundary_y" => 30e3,
)
)
),
Expand All @@ -175,7 +182,7 @@ end
config.model,
seed,
estimate_bound_constant,
config.estimate_n_samples;
config.estimate_n_samples;
RNGType=StableRNG
)
end
Expand Down Expand Up @@ -216,7 +223,7 @@ end
avg=mean(states; dims=2), var=var(states, corrected=true; dims=2)
)
for statistics_type in (
ParticleDA.NaiveMeanSummaryStat,
ParticleDA.NaiveMeanSummaryStat,
ParticleDA.NaiveMeanAndVarSummaryStat,
ParticleDA.MeanSummaryStat,
ParticleDA.MeanAndVarSummaryStat
Expand Down Expand Up @@ -272,12 +279,12 @@ end
new_states = copy(states)
Random.seed!(rng, seed)
ParticleDA.sample_proposal_and_compute_log_weights!(
new_states,
log_weights,
observation,
time_index,
model,
filter_data,
new_states,
log_weights,
observation,
time_index,
model,
filter_data,
filter_type,
rng
)
Expand All @@ -292,12 +299,12 @@ end
# Check filter update gives deterministic updates when rng state is fixed
Random.seed!(rng, seed)
ParticleDA.sample_proposal_and_compute_log_weights!(
new_states_2,
log_weights_2,
observation,
time_index,
model,
filter_data,
new_states_2,
log_weights_2,
observation,
time_index,
model,
filter_data,
filter_type,
rng
)
Expand All @@ -311,10 +318,10 @@ end
n_task = 1
model_params_dict = Dict(
"llw2d" => Dict(
"nx" => 32,
"ny" => 32,
"n_stations_x" => 4,
"n_stations_y" => 4,
"nx" => 32,
"ny" => 32,
"n_stations_x" => 4,
"n_stations_y" => 4,
"padding" => 0
)
)
Expand All @@ -333,8 +340,8 @@ end
online_matrices = ParticleDA.init_online_matrices(model, 1)
for f in nfields(online_matrices)
matrix = getfield(online_matrices, f)
@test isa(matrix, AbstractMatrix)
end
@test isa(matrix, AbstractMatrix)
end
state_dimension = ParticleDA.get_state_dimension(model)
updated_indices = ParticleDA.get_state_indices_correlated_to_observations(
model
Expand Down Expand Up @@ -374,10 +381,10 @@ end
# X = F(x) + U and Y = HX + V where x is the previous state value, F the forward
# operator for the deterministic state dynamics, U ~ Normal(0, Q) the additive
# state noise, X the state at the next time step, H the linear observation
# operator, V ~ Normal(0, R) the additive observation noise and Y the modelled
# observations, is Normal(m, C) where
# operator, V ~ Normal(0, R) the additive observation noise and Y the modelled
# observations, is Normal(m, C) where
# m = F(x) + QHᵀ(HQHᵀ + R)⁻¹(y − HF(x))
# = F(x) + cov(X, Y) @ cov(Y, Y)⁻¹ (y − HF(x))
# = F(x) + cov(X, Y) @ cov(Y, Y)⁻¹ (y − HF(x))
# and C = Q − QHᵀ(HQHᵀ + R)⁻¹HQ = cov(X, X) - cov(X, Y) cov(Y, Y)⁻¹ cov(X, Y)ᵀ
analytic_mean = copy(state)
@view(analytic_mean[updated_indices]) .+= (
Expand All @@ -397,7 +404,7 @@ end
# Create set of state 'particles' all equal to propagated state
states = Matrix{ParticleDA.get_state_eltype(model)}(
undef, (state_dimension, nprt)
)
)
states .= state
updated_states = copy(states)
for state in eachcol(updated_states)
Expand All @@ -407,21 +414,21 @@ end
# Mean of noise added by update_particle_noise! should be zero in all components
# and empirical mean should therefore be zero to within Monte Carlo error. The
# constant in the tolerance below was set by looking at scale of typical
# deviation, the point of check is that errors scale at expected O(1/√N) rate.
@test maximum(abs.(mean(noise, dims=2))) < (10. / sqrt(nprt))
# deviation, the point of check is that errors scale at expected O(1/√N) rate.
@test maximum(abs.(mean(noise, dims=2))) < (10. / sqrt(nprt))
# Covariance of noise added by update_particle_noise! to observed state
# components should be cov_X_X as computed above and empirical covariance of
# these components should therefore be within Monte Carlo error of cov_X_X. The
# constant in tolerance below was set by looking at scale of typical deviations,
# the point of check is that errors scale at expected O(1/√N) rate.
# the point of check is that errors scale at expected O(1/√N) rate.
noise_cov = cov(noise, dims=2)
@test maximum(abs.(noise_cov .- cov_X_X)) < (10. / sqrt(nprt))
@test maximum(abs.(noise_cov .- cov_X_X)) < (10. / sqrt(nprt))
ParticleDA.update_states_given_observations!(
updated_states, observation, model, filter_data, rng
)
updated_mean = mean(updated_states, dims=2)
updated_cov = cov(updated_states, dims=2)
# Mean and covariance of updated particles should be within O(1/√N) Monte Carlo
# Mean and covariance of updated particles should be within O(1/√N) Monte Carlo
# error of analytic values - constants in tolerances were set by looking at
# scale of typical deviations, main point of checks are that errors scale at
# expected O(1/√N) rate.
Expand Down Expand Up @@ -466,24 +473,24 @@ end
end


@testset "Integration test -- $(input_file) with $(filter_type) and $(stat_type)" for
@testset "Integration test -- $(input_file) with $(filter_type) and $(stat_type)" for
filter_type in (ParticleDA.BootstrapFilter, ParticleDA.OptimalFilter),
stat_type in (ParticleDA.MeanSummaryStat, ParticleDA.MeanAndVarSummaryStat),
input_file in ["integration_test_$i.yaml" for i in 1:6]
observation_file_path = tempname()
ParticleDA.simulate_observations_from_model(
LLW2d.init,
joinpath(@__DIR__, input_file),
LLW2d.init,
joinpath(@__DIR__, input_file),
observation_file_path
)
observation_sequence = h5open(
ParticleDA.read_observation_sequence, observation_file_path, "r"
)
@test !any(isnan.(observation_sequence))
states, statistics = ParticleDA.run_particle_filter(
LLW2d.init,
joinpath(@__DIR__, input_file),
observation_file_path,
LLW2d.init,
joinpath(@__DIR__, input_file),
observation_file_path,
filter_type,
stat_type,
)
Expand Down
Loading