Skip to content

Commit f08f4be

Browse files
committed
Add requeue support for time-limited jobs
1 parent 689a4a8 commit f08f4be

File tree

5 files changed

+175
-8
lines changed

5 files changed

+175
-8
lines changed

src/backends.jl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ function calibrate(
310310
verbose = false,
311311
hpc_kwargs,
312312
exeflags = "",
313+
reruns = 0,
313314
)
314315
ensemble_size = EKP.get_N_ens(ekp)
315316
@info "Initializing calibration" n_iterations ensemble_size output_dir
@@ -331,6 +332,7 @@ function calibrate(
331332
hpc_kwargs = hpc_kwargs,
332333
verbose = verbose,
333334
exeflags,
335+
reruns,
334336
)
335337
@info "Completed iteration $iter, updating ensemble"
336338
ekp = load_ekp_struct(output_dir, iter)
@@ -354,6 +356,7 @@ function run_hpc_iteration(
354356
hpc_kwargs,
355357
verbose = false,
356358
exeflags = "",
359+
reruns = 0,
357360
)
358361
@info "Iteration $iter"
359362
job_ids = map(1:ensemble_size) do member
@@ -382,7 +385,7 @@ function run_hpc_iteration(
382385
module_load_str;
383386
hpc_kwargs,
384387
verbose,
385-
reruns = 0,
388+
reruns,
386389
)
387390
end
388391
end

src/pbs.jl

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ function generate_pbs_script(
5757
#PBS -l walltime=$walltime
5858
#PBS -l select=$num_nodes:ncpus=$cpus_per_node:ngpus=$gpus_per_node:mpiprocs=$ranks_per_node
5959
60+
$(pbs_trap_block())
61+
6062
$module_load_str
6163
6264
export JULIA_MPI_HAS_CUDA=true
@@ -74,6 +76,38 @@ function generate_pbs_script(
7476
return pbs_script, julia_script
7577
end
7678

79+
"""
80+
pbs_trap_block()
81+
82+
Return the bash snippet that makes PBS jobs requeue on preemption or near-walltime signals.
83+
84+
This is included in the generated PBS scripts and is tested for presence to avoid
85+
duplicating the job snippet in tests.
86+
"""
87+
function pbs_trap_block()
88+
return """
89+
# Self-requeue on preemption or near-walltime signals
90+
# - Many PBS deployments send SIGTERM shortly before walltime or on preemption;
91+
# some may send SIGUSR1 as a warning.
92+
# - We trap these signals and call `qrerun` to requeue the same job ID so it can
93+
# continue later with the same submission parameters.
94+
# - Exiting with status 0 prevents the scheduler from marking the job as failed
95+
# due to the trap.
96+
handle_preterminate() {
97+
sig="\$1"
98+
echo "[ClimaCalibrate] Received \$sig on PBS job \${PBS_JOBID:-unknown}, attempting qrerun"
99+
if command -v qrerun >/dev/null 2>&1; then
100+
qrerun "\${PBS_JOBID}"
101+
else
102+
echo "qrerun not available on this system"
103+
fi
104+
exit 0
105+
}
106+
trap 'handle_preterminate TERM' TERM
107+
trap 'handle_preterminate USR1' USR1
108+
"""
109+
end
110+
77111
"""
78112
pbs_model_run(iter, member, output_dir, experiment_dir, model_interface, module_load_str; hpc_kwargs)
79113
@@ -167,7 +201,7 @@ wait_for_jobs(
167201
module_load_str;
168202
verbose,
169203
hpc_kwargs,
170-
reruns = 1,
204+
reruns = 0,
171205
) = wait_for_jobs(
172206
jobids,
173207
output_dir,

src/slurm.jl

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ kwargs(; kwargs...) = Dict{Symbol, Any}(kwargs...)
1515
1616
Wait for a set of jobs to complete. If a job fails, it will be rerun up to `reruns` times.
1717
18-
This function monitors the status of multiple jobs and handles failures by rerunning the failed jobs up to the specified number of `reruns`. It logs errors and job completion status, ensuring all jobs are completed before proceeding.
18+
In addition to scheduler status, a job is only considered successful when the model writes its
19+
`completed` checkpoint file. This makes restarts robust when jobs exit early due to time limits
20+
and are requeued by the scheduler.
1921
2022
Arguments:
2123
- `jobids`: Vector of job IDs.
@@ -39,7 +41,7 @@ function wait_for_jobs(
3941
model_run_func;
4042
verbose,
4143
hpc_kwargs,
42-
reruns = 1,
44+
reruns = 0,
4345
)
4446
rerun_job_count = zeros(length(jobids))
4547
completed_jobs = Set{Int}()
@@ -68,8 +70,15 @@ function wait_for_jobs(
6870
push!(completed_jobs, m)
6971
end
7072
elseif job_success(jobid)
71-
@info "Ensemble member $m complete"
72-
push!(completed_jobs, m)
73+
# Only mark success if the model has written its completion checkpoint
74+
if model_completed(output_dir, iter, m)
75+
@info "Ensemble member $m complete"
76+
push!(completed_jobs, m)
77+
else
78+
# The scheduler may report COMPLETED for a batch script that requeued itself.
79+
# Wait until the completion file exists.
80+
@debug "Job $jobid completed but checkpoint not found yet for member $m"
81+
end
7382
end
7483
end
7584
sleep(5)
@@ -183,9 +192,26 @@ function generate_sbatch_directives(hpc_kwargs)
183192
@assert haskey(hpc_kwargs, :time) "Slurm kwargs must include key :time"
184193

185194
hpc_kwargs[:time] = format_slurm_time(hpc_kwargs[:time])
195+
# Provide a default pre-timeout signal to the batch script unless user overrides
196+
if !haskey(hpc_kwargs, :signal)
197+
# Send SIGUSR1 to the batch script 5 minutes before time limit
198+
hpc_kwargs[:signal] = "B:USR1@300"
199+
end
200+
if !haskey(hpc_kwargs, :requeue)
201+
hpc_kwargs[:requeue] = true
202+
end
203+
186204
slurm_directives = map(collect(hpc_kwargs)) do (k, v)
187-
"#SBATCH --$(replace(string(k), "_" => "-"))=$(replace(string(v), "_" => "-"))"
205+
key = replace(string(k), "_" => "-")
206+
if v === true
207+
return "#SBATCH --$key"
208+
elseif v === false
209+
return nothing
210+
else
211+
return "#SBATCH --$key=$(replace(string(v), "_" => "-"))"
212+
end
188213
end
214+
slurm_directives = filter(!isnothing, slurm_directives)
189215
return join(slurm_directives, "\n")
190216
end
191217
"""
@@ -221,6 +247,14 @@ function generate_sbatch_script(
221247
#SBATCH --output=$member_log
222248
$slurm_directives
223249
250+
# Self-requeue on pre-timeout or termination signals
251+
# `#SBATCH --signal=B:USR1@300` sends SIGUSR1 to the batch script 300 seconds before
252+
# the job time limit (B means send to the batch script). Sites may also deliver TERM.
253+
# We trap USR1/TERM and call `scontrol requeue $SLURM_JOB_ID` so the job returns to
254+
# the queue and can continue later with the same submission parameters.
255+
# Exiting with status 0 prevents a false failure due to the trap itself.
256+
trap 'echo "[ClimaCalibrate] Pre-timeout/TERM on job $SLURM_JOB_ID, requeuing"; scontrol requeue $SLURM_JOB_ID; exit 0' USR1 TERM
257+
224258
$module_load_str
225259
export CLIMACOMMS_DEVICE="$climacomms_device"
226260
export CLIMACOMMS_CONTEXT="MPI"
@@ -341,14 +375,37 @@ function job_status(job_id::SlurmJobID)
341375
]
342376
running_statuses =
343377
["RUNNING", "COMPLETING", "STAGED", "SUSPENDED", "STOPPED", "RESIZING"]
378+
failed_statuses = [
379+
"FAILED",
380+
"CANCELLED",
381+
"NODE_FAIL",
382+
"TIMEOUT",
383+
"OUT_OF_MEMORY",
384+
"PREEMPTED",
385+
]
344386
invalid_job_err = "slurm_load_jobs error: Invalid job id specified"
345387
@debug job_id status exit_code stderr
346388

347-
status == "" && exit_code == 0 && stderr == "" && return :COMPLETED
389+
if status == ""
390+
# Not in squeue; fall back to sacct for a terminal state
391+
try
392+
acct = readchomp(`sacct -j $job_id --format=State%20 -n -X`)
393+
# sacct may return multiple lines; take the first non-empty token
394+
acct_state = strip(first(split(acct, '\n', keepempty = false), ""))
395+
if any(str -> occursin(str, acct_state), failed_statuses)
396+
return :FAILED
397+
elseif occursin("COMPLETED", acct_state)
398+
return :COMPLETED
399+
end
400+
catch
401+
# Ignore sacct errors and continue to other checks
402+
end
403+
end
348404
exit_code != 0 && contains(stderr, invalid_job_err) && return :COMPLETED
349405

350406
any(str -> contains(status, str), pending_statuses) && return :PENDING
351407
any(str -> contains(status, str), running_statuses) && return :RUNNING
408+
any(str -> contains(status, str), failed_statuses) && return :FAILED
352409

353410
@warn "Job ID $job_id has unknown status `$status`. Marking as completed"
354411
return :COMPLETED

test/pbs_unit_tests.jl

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,27 @@ expected_pbs_contents = """
4646
#PBS -l walltime=01:30:00
4747
#PBS -l select=2:ncpus=16:ngpus=2:mpiprocs=2
4848
49+
# Self-requeue on preemption or near-walltime signals
50+
# - Many PBS deployments send SIGTERM shortly before walltime or on preemption;
51+
# some may send SIGUSR1 as a warning.
52+
# - We trap these signals and call `qrerun` to requeue the same job ID so it can
53+
# continue later with the same submission parameters.
54+
# - Exiting with status 0 prevents the scheduler from marking the job as failed
55+
# due to the trap.
56+
handle_preterminate() {
57+
sig="\$1"
58+
echo "[ClimaCalibrate] Received \$sig on PBS job \${PBS_JOBID:-unknown}, attempting qrerun"
59+
if command -v qrerun >/dev/null 2>&1; then
60+
qrerun "\${PBS_JOBID}"
61+
else
62+
echo "qrerun not available on this system"
63+
fi
64+
exit 0
65+
}
66+
trap 'handle_preterminate TERM' TERM
67+
trap 'handle_preterminate USR1' USR1
68+
69+
4970
export MODULEPATH="/glade/campaign/univ/ucit0011/ClimaModules-Derecho:\$MODULEPATH"
5071
module purge
5172
module load climacommon
@@ -63,6 +84,57 @@ for (generated_str, test_str) in
6384
@test generated_str == test_str
6485
end
6586

87+
# Requeue behavior test (skips automatically if PBS tools are unavailable)
88+
@testset "PBS requeue trap integration (walltime)" begin
89+
testdir = mktempdir()
90+
state_file = joinpath(testdir, "state.txt")
91+
done_file = joinpath(testdir, "done.txt")
92+
93+
pbs_script = """
94+
#!/bin/bash
95+
#PBS -N requeue_test
96+
#PBS -j oe
97+
#PBS -A UCIT0011
98+
#PBS -q main
99+
#PBS -l walltime=00:00:10
100+
#PBS -l select=1:ncpus=1:ngpus=1
101+
102+
$(CAL.pbs_trap_block())
103+
104+
STATE_FILE="$(state_file)"
105+
DONE_FILE="$(done_file)"
106+
107+
if [ ! -f "\$STATE_FILE" ]; then
108+
echo "first" > "\$STATE_FILE"
109+
# Exceed walltime so the scheduler sends TERM; the trap requeues
110+
sleep 120
111+
else
112+
echo "second" >> "\$STATE_FILE"
113+
touch "\$DONE_FILE"
114+
fi
115+
"""
116+
117+
script_path, io = mktemp()
118+
write(io, pbs_script)
119+
close(io)
120+
jobid = CAL.submit_pbs_job(script_path)
121+
122+
# Wait for completion (with timeout)
123+
t_start = time()
124+
timeout_s = 300.0
125+
while !CAL.job_completed(jobid) && (time() - t_start) < timeout_s
126+
sleep(2)
127+
end
128+
@test CAL.job_completed(jobid)
129+
130+
# Validate requeue and done marker
131+
@test isfile(state_file)
132+
@test isfile(done_file)
133+
content = read(state_file, String)
134+
@test occursin("first", content)
135+
@test occursin("second", content)
136+
end
137+
66138
# Helper function for submitting commands and checking job status
67139
function submit_cmd_helper(cmd)
68140
sbatch_filepath, io = mktemp()

test/runtests.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ using SafeTestsets
88
@safetestset "Julia backend" begin include("julia_backend.jl") end
99
@safetestset "Aqua" begin include("aqua.jl") end
1010
@safetestset "Observation recipe" begin include("observation_recipe.jl") end
11+
@safetestset "PBS requeue integration" begin include("pbs_requeue_tests.jl") end
1112
#! format: on
1213

1314
nothing

0 commit comments

Comments
 (0)