
Commit c54afa3

ucabc46 authored and Angeladadd committed
replace unoptimized copy_states, put path to vars, set optimize option as default
1 parent 9e19062 commit c54afa3

10 files changed: 31 additions, 91 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -5,3 +5,4 @@ Manifest-v*.toml
 /docs/build/
 /.benchmarkci
 *.h5
+slurm_log/

extra/weak_scaling/Project.toml

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+[deps]
+ParticleDA = "61cd1fb4-f4c4-4bc8-80c6-ea5639a6ca2e"

extra/weak_scaling/kathleen_slurm_copy_states.sh

Lines changed: 8 additions & 5 deletions
@@ -5,15 +5,18 @@
 #SBATCH --cpus-per-task=40
 #SBATCH --nodes=16
 #SBATCH --ntasks-per-node=1
-#SBATCH --chdir=/home/ucabc46/exp/ParticleDA.jl
-#SBATCH --output=test/slurm_log/%x-%j.out
-#SBATCH --error=test/slurm_log/%x-%j.err
+#SBATCH --chdir=/home/ucabc46/ParticleDA.jl
+#SBATCH --output=slurm_log/%x-%j.out
+#SBATCH --error=slurm_log/%x-%j.err
 
 export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
 export JULIA_NUM_THREADS=$OMP_NUM_THREADS
 
 julia --project=. -e 'using Pkg; Pkg.instantiate(); Pkg.precompile()'
 
-/home/ucabc46/.julia/bin/mpiexecjl -n $SLURM_NNODES\
+PARTICLEDA_TEST_DIR=$HOME/ParticleDA.jl/test
+JULIA_DIR=$HOME/.julia
+
+$JULIA_DIR/bin/mpiexecjl -n $SLURM_NNODES\
     julia --project=. \
-    /home/ucabc46/exp/ParticleDA.jl/test/mpi_optimized_copy_states.jl -t /home/ucabc46/exp/ParticleDA.jl/test/output/dedup_threading_optimize_resampling/all_timers_$SLURM_NNODES.h5 -o
+    $PARTICLEDA_TEST_DIR/mpi_optimized_copy_states.jl -t $PARTICLEDA_TEST_DIR/output/all_timers_$SLURM_NNODES.h5 -o

extra/weak_scaling/kathleen_slurm_weak_scaling.sh

Lines changed: 6 additions & 3 deletions
@@ -5,13 +5,16 @@
 #SBATCH --nodes=4
 #SBATCH --ntasks-per-node=1
 #SBATCH --cpus-per-task=40
-#SBATCH --chdir=/home/ucabc46/exp/ParticleDA.jl
+#SBATCH --chdir=/home/ucabc46/ParticleDA.jl/extra/weak_scaling
 #SBATCH --output=slurm_log/%x-%j.out
 #SBATCH --error=slurm_log/%x-%j.err
 
 export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
 export JULIA_NUM_THREADS=$OMP_NUM_THREADS
 
-/home/ucabc46/.julia/bin/mpiexecjl -n $SLURM_NNODES\
+PARTICLEDA_WEAKSCALING_DIR=$HOME/ParticleDA.jl/extra/weak_scaling
+JULIA_DIR=$HOME/.julia
+
+$JULIA_DIR/bin/mpiexecjl -n $SLURM_NNODES\
     julia --project=. \
-    /home/ucabc46/exp/ParticleDA.jl/extra/weak_scaling/run_particleda.jl
+    $PARTICLEDA_WEAKSCALING_DIR/run_particleda.jl

extra/weak_scaling/parametersW1.yaml

Lines changed: 3 additions & 3 deletions
@@ -3,7 +3,7 @@ simulate_observations:
   seed: 123
 model:
   llw2d:
-    station_filename: "/home/ucabc46/exp/ParticleDA.jl/extra/weak_scaling/stationsW1.txt"
+    station_filename: "stationsW1.txt"
     nu: 2.5
     nu_initial_state: 2.5
     peak_height: 30.0
@@ -27,7 +27,7 @@ model:
       - 1
     time_step: 0.5
 filter:
-  optimize_copy_states: true
+  optimize_resampling: true
   output_filename: "llw2d_filtering.h5"
-  nprt: 1000
+  nprt: 2000
   enable_timers: true

extra/weak_scaling/run_particleda.jl

Lines changed: 4 additions & 4 deletions
@@ -18,9 +18,9 @@ mpi_size = MPI.Comm_size(MPI.COMM_WORLD)
 llw2d_src = joinpath(dirname(pathof(ParticleDA)), "..", "test", "models", "llw2d.jl")
 include(llw2d_src)
 using .LLW2d
-observation_file = joinpath(dirname(pathof(ParticleDA)), "..", "extra", "weak_scaling", "test_observations.h5")
-parameters_file = joinpath(dirname(pathof(ParticleDA)), "..", "extra", "weak_scaling", "parametersW1.yaml")
-output_file = joinpath(dirname(pathof(ParticleDA)), "..", "extra", "weak_scaling", "llw2d_filtering.h5")
+observation_file = "test_observations.h5"
+parameters_file = "parametersW1.yaml"
+output_file = "llw2d_filtering.h5"
 #filter_type = OptimalFilter
 filter_type = BootstrapFilter
 summary_stat_type = NaiveMeanSummaryStat
@@ -48,7 +48,7 @@ open(parameters_file, "w") do io
     YAML.write(io, parameters)
 end
 
-println("Optimized copy states enabled: ", parameters["filter"]["optimize_copy_states"])
+println("Optimized resampling enabled: ", parameters["filter"]["optimize_resampling"])
 
 final_states, final_statistics = run_particle_filter(
     LLW2d.init, parameters_file, observation_file, filter_type, summary_stat_type

src/ParticleDA.jl

Lines changed: 2 additions & 3 deletions
@@ -271,7 +271,7 @@ function run_particle_filter(
         @timeit_debug timer "Resample" resample!(
            filter_data.resampling_indices, filter_data.weights, rng
         )
-        if filter_params.optimize_copy_states
+        if filter_params.optimize_resampling
            # Optimize resampling indices to minimize data movement when copying states
            @timeit_debug timer "Optimize Resample" filter_data.resampling_indices .= optimized_resample!(
                filter_data.resampling_indices, my_size
@@ -293,8 +293,7 @@ function run_particle_filter(
            filter_data.resampling_indices,
            my_rank,
            nprt_per_rank,
-           timer,
-           filter_params.optimize_copy_states
+           timer
        )
 
        if filter_params.verbose
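
The optimized_resample! helper called above is not shown in this commit. As a rough, hypothetical sketch of the idea named in the comment (reorder the resampled ancestor indices so each rank keeps as many of its own particles as possible, leaving copy_states! less data to move between ranks), a greedy reassignment might look like the following; the name greedy_reassign and its details are illustrative only, not ParticleDA API.

# Hypothetical sketch: permute the resampled ancestor indices across destination
# slots so that each rank's block of slots reuses, where possible, ancestors the
# rank already stores. The multiset of ancestor indices is preserved.
function greedy_reassign(indices::Vector{Int}, n_ranks::Int)
    nprt_per_rank = div(length(indices), n_ranks)
    owner(i) = div(i - 1, nprt_per_rank)      # rank that stores global particle index i
    remaining = copy(indices)                 # ancestors still waiting for a slot
    out = zeros(Int, length(indices))
    for r in 0:(n_ranks - 1)
        slots = (r * nprt_per_rank + 1):((r + 1) * nprt_per_rank)
        local_pool = [i for i in remaining if owner(i) == r]
        for (s, i) in zip(slots, local_pool)  # keep locally stored ancestors local
            out[s] = i
            deleteat!(remaining, findfirst(==(i), remaining))
        end
    end
    for s in eachindex(out)                   # fill leftover slots from leftover ancestors
        if out[s] == 0
            out[s] = pop!(remaining)
        end
    end
    return out
end

For example, with 2 ranks and 3 particles per rank, the indices [1, 1, 5, 5, 6, 2] would be reassigned to [1, 1, 2, 5, 5, 6], which requires no inter-rank copies at all.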

src/params.jl

Lines changed: 3 additions & 3 deletions
@@ -17,8 +17,8 @@ Parameters for ParticleDA run. Keyword arguments:
   the scheduler to balance load across threads but potentially increase overheads.
   If simulation of the model being filtered use multiple threads then it may be
   beneficial to set the `n_tasks = 1` to avoid too much contention between threads.
-* `optimize_copy_states::Bool`: Flag to control whether to use optimized copy_states
-  function that reduces the number of messages sent during resampling.
+* `optimize_resampling::Bool`: Flag to control whether to optimize resampling indices
+  to minimize data movement when copying states between MPI ranks.
 """
 Base.@kwdef struct FilterParameters{V<:Union{AbstractSet, AbstractVector}}
     master_rank::Int = 0
@@ -29,7 +29,7 @@ Base.@kwdef struct FilterParameters{V<:Union{AbstractSet, AbstractVector}}
     particle_save_time_indices::V = []
     seed::Union{Nothing, Int} = nothing
     n_tasks::Int = -1
-    optimize_copy_states::Bool = false
+    optimize_resampling::Bool = true
 end
 
 
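
With the default flipped to true, the optimization is active unless explicitly disabled. In the weak-scaling setup it is driven by the optimize_resampling key under filter: in parametersW1.yaml; as a minimal sketch of direct construction (assuming FilterParameters is accessed from the ParticleDA module as declared above):

using ParticleDA

params_default  = ParticleDA.FilterParameters()                              # optimize_resampling = true by default
params_disabled = ParticleDA.FilterParameters(optimize_resampling = false)   # fall back to the unpermuted indices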

src/utils.jl

Lines changed: 1 addition & 68 deletions
@@ -83,73 +83,6 @@ function init_states(model, nprt_per_rank::Int, n_tasks::Int, rng::AbstractRNG)
 end
 
 function copy_states!(
-    particles::AbstractMatrix{T},
-    buffer::AbstractMatrix{T},
-    resampling_indices::Vector{Int},
-    my_rank::Int,
-    nprt_per_rank::Int,
-    to::TimerOutputs.TimerOutput = TimerOutputs.TimerOutput(),
-    dedup::Bool = false
-) where T
-
-    if dedup
-        return copy_states_dedup!(particles, buffer, resampling_indices, my_rank, nprt_per_rank, to)
-    end
-
-    # These are the particle indices stored on this rank
-    particles_have = my_rank * nprt_per_rank + 1:(my_rank + 1) * nprt_per_rank
-
-    # These are the particle indices this rank should have after resampling
-    particles_want = resampling_indices[particles_have]
-
-    # These are the ranks that have the particles this rank should have
-    rank_has = floor.(Int, (particles_want .- 1) / nprt_per_rank)
-
-    # We could work out how many sends and receives we have to do and allocate
-    # this appropriately but, lazy
-    reqs = Vector{MPI.Request}(undef, 0)
-
-    # Send particles to processes that want them
-    @timeit_debug to "send loop" begin
-        for (k,id) in enumerate(resampling_indices)
-            rank_wants = floor(Int, (k - 1) / nprt_per_rank)
-            if id in particles_have && rank_wants != my_rank
-                local_id = id - my_rank * nprt_per_rank
-                req = MPI.Isend(view(particles, :, local_id), rank_wants, id, MPI.COMM_WORLD)
-                push!(reqs, req)
-            end
-        end
-    end
-
-    # Receive particles this rank wants from ranks that have them
-    # If I already have them, just do a local copy
-    # Receive into a buffer so we dont accidentally overwrite stuff
-    @timeit_debug to "receive loop" begin
-        for (k,proc,id) in zip(1:nprt_per_rank, rank_has, particles_want)
-            if proc == my_rank
-                @timeit_debug to "local copy" begin
-                    local_id = id - my_rank * nprt_per_rank
-                    buffer[:, k] .= view(particles, :, local_id)
-                end
-            else
-                @timeit_debug to "remote receive" begin
-                    req = MPI.Irecv!(view(buffer, :, k), proc, id, MPI.COMM_WORLD)
-                    push!(reqs,req)
-                end
-            end
-        end
-    end
-
-    # Wait for all comms to complete
-    @timeit_debug to "waitall phase" MPI.Waitall(reqs)
-
-    @timeit_debug to "buffer write-back" particles .= buffer
-
-end
-
-# An optimized version of copy_states that minimizes the number of messages sent
-# by deduplicating particles that need to be sent between ranks.
-function copy_states_dedup!(
     particles::AbstractMatrix{T},
     buffer::AbstractMatrix{T},
     resampling_indices::Vector{Int},
@@ -243,7 +176,7 @@ function _determine_sends(resampling_indices::Vector{Int}, my_rank::Int, nprt_pe
     return sends_to
 end
 
-function _categorize_wants(particles_want, my_rank::Int, nprt_per_rank::Int)
+function _categorize_wants(particles_want::Vector{Int}, my_rank::Int, nprt_per_rank::Int)
     local_copies = Dict{Int, Vector{Int}}()
     remote_copies = Dict{Int, Vector{Int}}()
 

test/mpi_optimized_copy_states.jl

Lines changed: 1 addition & 2 deletions
@@ -124,8 +124,7 @@ for (trial_name, indices_func) in trial_sets
            indices,
            my_rank,
            n_particle_per_rank,
-           timer,
-           !no_dedup
+           timer
        )
    end
end
