8 changes: 7 additions & 1 deletion src/backends.jl
@@ -98,6 +98,9 @@ function module_load_string(::Type{DerechoBackend})
module purge
module load climacommon
module list
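# Use scratch for temporary files; the default TMPDIR may be too small for large runs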
export TMPDIR=\${SCRATCH}/temp
mkdir -p \${TMPDIR}
echo \$TMPDIR
"""
end

@@ -310,6 +313,7 @@ function calibrate(
verbose = false,
hpc_kwargs,
exeflags = "",
reruns = 0,
)
ensemble_size = EKP.get_N_ens(ekp)
@info "Initializing calibration" n_iterations ensemble_size output_dir
@@ -331,6 +335,7 @@
hpc_kwargs = hpc_kwargs,
verbose = verbose,
exeflags,
reruns,
)
@info "Completed iteration $iter, updating ensemble"
ekp = load_ekp_struct(output_dir, iter)
@@ -354,6 +359,7 @@ function run_hpc_iteration(
hpc_kwargs,
verbose = false,
exeflags = "",
reruns = 0,
)
@info "Iteration $iter"
job_ids = map(1:ensemble_size) do member
@@ -382,7 +388,7 @@
module_load_str;
hpc_kwargs,
verbose,
reruns = 0,
reruns,
)
end
end
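
A hedged usage sketch of the new keyword; the positional arguments of `calibrate` are elided in this diff, so `backend` and `ekp` below are hypothetical stand-ins (the `kwargs` helper is defined in src/slurm.jl):

# Sketch only: `backend` and `ekp` stand in for the real arguments.
calibrate(
    backend,
    ekp;
    hpc_kwargs = kwargs(time = 90),
    reruns = 2,  # rerun each failed ensemble member up to twice
)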
124 changes: 112 additions & 12 deletions src/pbs.jl
@@ -57,6 +57,8 @@ function generate_pbs_script(
#PBS -l walltime=$walltime
#PBS -l select=$num_nodes:ncpus=$cpus_per_node:ngpus=$gpus_per_node:mpiprocs=$ranks_per_node

$(pbs_trap_block())

$module_load_str

export JULIA_MPI_HAS_CUDA=true
@@ -74,6 +76,35 @@
return pbs_script, julia_script
end

"""
pbs_trap_block()

Return the bash snippet that makes PBS jobs requeue on preemption or near-walltime signals.

The snippet is included in the generated PBS scripts; tests check for its presence
rather than duplicating it.
"""
function pbs_trap_block()
return """
# Self-requeue on preemption or near-walltime signals:
# Trap SIGTERM on job termination and call `qrerun` to requeue the same job ID
# so it can continue later with the same submission parameters.
# Exiting with status 0 prevents the scheduler from marking the job as failed
# due to the trap.
handle_preterminate() {
sig="\$1"
echo "[ClimaCalibrate] Received \$sig on PBS job \${PBS_JOBID:-unknown}, attempting qrerun"
if command -v qrerun >/dev/null 2>&1; then
qrerun "\${PBS_JOBID}"
else
echo "qrerun not available on this system"
fi
exit 0
}
trap 'handle_preterminate TERM' TERM
"""
end
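
A minimal sketch of the presence test the docstring mentions; `pbs_script` stands in for the text returned by `generate_pbs_script`, whose full argument list is elided in this diff:

using Test
pbs_script = "#!/bin/bash\n" * pbs_trap_block() * "\nmodule load climacommon\n"
@test occursin(pbs_trap_block(), pbs_script)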

"""
pbs_model_run(iter, member, output_dir, experiment_dir, model_interface, module_load_str; hpc_kwargs)

@@ -135,13 +166,22 @@ end

Submit a job to the PBS Pro scheduler using qsub, removing unwanted environment variables.

Unset variables: "PBS_MEM_PER_CPU", "PBS_MEM_PER_GPU", "PBS_MEM_PER_NODE"
Unset variables: "PBS_MEM_PER_CPU", "PBS_MEM_PER_GPU", "PBS_MEM_PER_NODE", "PYTHONHOME", "PYTHONPATH", "PYTHONUSERBASE"
"""
function submit_pbs_job(filepath; debug = false, env = deepcopy(ENV))
unset_env_vars = ("PBS_MEM_PER_CPU", "PBS_MEM_PER_GPU", "PBS_MEM_PER_NODE")
# Clean env to avoid user overrides breaking system PBS utilities (e.g., python wrappers)
unset_env_vars = (
"PBS_MEM_PER_CPU",
"PBS_MEM_PER_GPU",
"PBS_MEM_PER_NODE",
"PYTHONHOME",
"PYTHONPATH",
"PYTHONUSERBASE",
)
for k in unset_env_vars
haskey(env, k) && delete!(env, k)
end
env["PYTHONNOUSERSITE"] = "1"
jobid = readchomp(setenv(`qsub $filepath`, env))
return jobid
end
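
A hedged usage sketch (the script path is hypothetical). The sanitized environment matters on systems where `qsub` and `qstat` are Python wrappers that a user's `PYTHONPATH` or `PYTHONHOME` would otherwise break:

jobid = submit_pbs_job("output/iteration_001/member_001/model_run.pbs")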
@@ -158,7 +198,7 @@ wait_for_jobs(
module_load_str;
verbose,
hpc_kwargs,
reruns = 1,
reruns = 0,
) = wait_for_jobs(
jobids,
output_dir,
@@ -172,17 +212,77 @@
reruns,
)

"""
_qstat_output(jobid, env)

Best-effort qstat caller: tries dsv format, then falls back to plain format, with up to
three short-delay attempts. Returns the output `String`, or `nothing` if all attempts fail.
"""
function _qstat_output(jobid::PBSJobID, env)
attempts = 3
delay = 0.25
for i in 1:attempts
try
out = readchomp(setenv(`qstat -f $jobid -x -F dsv`, env))
if isempty(strip(out)) && i < attempts
sleep(delay)
continue
end
return out
catch
try
out = readchomp(setenv(`qstat -f $jobid -x`, env))
if isempty(strip(out)) && i < attempts
sleep(delay)
continue
end
return out
catch
i < attempts && sleep(delay)
end
end
end
return nothing
end
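
For reference, the two qstat output shapes handled above (field values illustrative):

# dsv:   job_state=F|substate=91|... (key=value fields separated by `|`)
# plain: multi-line output with `job_state = F` and `substate = 91` entries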

function job_status(jobid::PBSJobID)
status_str = readchomp(`qstat -f $jobid -x -F dsv`)
job_state_match = match(r"job_state=([^|]+)", status_str)
status = first(job_state_match.captures)
substate_match = match(r"substate=([^|]+)", status_str)
substate_number = parse(Int, (first(substate_match.captures)))
status_dict = Dict("Q" => :RUNNING, "F" => :COMPLETED)
status_symbol = get(status_dict, status, :RUNNING)
# Check for failure in the substate number
# Call qstat with a sanitized environment to avoid user Python interfering with PBS wrappers
clean_env = deepcopy(ENV)
for k in ("PYTHONHOME", "PYTHONPATH", "PYTHONUSERBASE")
haskey(clean_env, k) && delete!(clean_env, k)
end
clean_env["PYTHONNOUSERSITE"] = "1"

status_str = _qstat_output(jobid, clean_env)
if isnothing(status_str)
@warn "qstat failed for job $jobid; assuming job is running"
return :RUNNING
end

# Support both dsv and plain formats
job_state_match = match(r"job_state\s*=\s*([^|\n\r]+)", status_str)
substate_match = match(r"substate\s*=\s*(\d+)", status_str)

status_code = if isnothing(job_state_match)
@warn "Job status for $jobid not found in qstat output. Assuming job is running"
"Q"
else
strip(first(job_state_match.captures))
end

substate_number =
isnothing(substate_match) ? 0 :
parse(Int, first(substate_match.captures))

# Map PBS states to our symbols; default to :RUNNING while job exists
status_symbol = get(
Dict("Q" => :RUNNING, "R" => :RUNNING, "F" => :COMPLETED),
status_code,
:RUNNING,
)

if status_symbol == :COMPLETED && substate_number in (91, 93)
status_symbol = :FAILED
return :FAILED
end
return status_symbol
end
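
An illustrative check (the sample line is synthetic) that the relaxed regexes handle dsv output and that the extracted substate feeds the 91/93 failure mapping above:

sample = "job_state=F|substate=91|exit_status=271"
state = strip(first(match(r"job_state\s*=\s*([^|\n\r]+)", sample).captures))
sub = parse(Int, first(match(r"substate\s*=\s*(\d+)", sample).captures))
@assert state == "F" && sub == 91  # maps to :FAILED above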
69 changes: 63 additions & 6 deletions src/slurm.jl
@@ -15,7 +15,9 @@ kwargs(; kwargs...) = Dict{Symbol, Any}(kwargs...)

Wait for a set of jobs to complete. If a job fails, it will be rerun up to `reruns` times.

This function monitors the status of multiple jobs and handles failures by rerunning the failed jobs up to the specified number of `reruns`. It logs errors and job completion status, ensuring all jobs are completed before proceeding.
In addition to scheduler status, a job is only considered successful when the model writes its
`completed` checkpoint file. This makes restarts robust when jobs exit early due to time limits
and are requeued by the scheduler.

Arguments:
- `jobids`: Vector of job IDs.
@@ -39,7 +41,7 @@ function wait_for_jobs(
model_run_func;
verbose,
hpc_kwargs,
reruns = 1,
reruns = 0,
)
rerun_job_count = zeros(length(jobids))
completed_jobs = Set{Int}()
@@ -68,8 +70,15 @@
push!(completed_jobs, m)
end
elseif job_success(jobid)
@info "Ensemble member $m complete"
push!(completed_jobs, m)
# Only mark success if the model has written its completion checkpoint
if model_completed(output_dir, iter, m)
@info "Ensemble member $m complete"
push!(completed_jobs, m)
else
# The scheduler may report COMPLETED for a batch script that requeued itself.
# Wait until the completion file exists.
@debug "Job $jobid completed but checkpoint not found yet for member $m"
end
end
end
sleep(5)
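
`model_completed` is referenced here but not shown in this diff. A minimal sketch, assuming each member writes a `completed` marker file and a conventional directory layout (both hypothetical):

# Hypothetical layout: output_dir/iteration_XXX/member_XXX/completed
function model_completed(output_dir, iter, member)
    member_dir = joinpath(
        output_dir,
        "iteration_" * lpad(iter, 3, "0"),
        "member_" * lpad(member, 3, "0"),
    )
    return isfile(joinpath(member_dir, "completed"))
end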
@@ -183,9 +192,26 @@ function generate_sbatch_directives(hpc_kwargs)
@assert haskey(hpc_kwargs, :time) "Slurm kwargs must include key :time"

hpc_kwargs[:time] = format_slurm_time(hpc_kwargs[:time])
# Provide a default pre-timeout signal to the batch script unless user overrides
if !haskey(hpc_kwargs, :signal)
# Send SIGUSR1 to the batch script 5 minutes before time limit
hpc_kwargs[:signal] = "B:USR1@300"
end
if !haskey(hpc_kwargs, :requeue)
hpc_kwargs[:requeue] = true
end

slurm_directives = map(collect(hpc_kwargs)) do (k, v)
"#SBATCH --$(replace(string(k), "_" => "-"))=$(replace(string(v), "_" => "-"))"
key = replace(string(k), "_" => "-")
if v === true
return "#SBATCH --$key"
elseif v === false
return nothing
else
return "#SBATCH --$key=$(replace(string(v), "_" => "-"))"
end
end
slurm_directives = filter(!isnothing, slurm_directives)
return join(slurm_directives, "\n")
end
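
An illustrative expansion of the directive generation above (keys hypothetical, and Dict iteration order is not guaranteed):

directives = generate_sbatch_directives(kwargs(time = 60, gpus_per_task = 1))
println(directives)
# Expected shape (time formatted by `format_slurm_time`):
# #SBATCH --time=...
# #SBATCH --signal=B:USR1@300
# #SBATCH --requeue            (boolean true renders as a bare flag)
# #SBATCH --gpus-per-task=1    (underscores in keys become dashes)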
"""
@@ -221,6 +247,14 @@ function generate_sbatch_script(
#SBATCH --output=$member_log
$slurm_directives

# Self-requeue on pre-timeout or termination signals
# `#SBATCH --signal=B:USR1@300` sends SIGUSR1 to the batch script 300 seconds before
# the job time limit (B means send to the batch script). Sites may also deliver TERM.
# We trap USR1/TERM and call `scontrol requeue \$SLURM_JOB_ID` so the job returns to
# the queue and can continue later with the same submission parameters.
# Exiting with status 0 prevents a false failure due to the trap itself.
trap 'echo "[ClimaCalibrate] Pre-timeout/TERM on job \$SLURM_JOB_ID, requeuing"; scontrol requeue \$SLURM_JOB_ID; exit 0' USR1 TERM

$module_load_str
export CLIMACOMMS_DEVICE="$climacomms_device"
export CLIMACOMMS_CONTEXT="MPI"
@@ -341,14 +375,37 @@ function job_status(job_id::SlurmJobID)
]
running_statuses =
["RUNNING", "COMPLETING", "STAGED", "SUSPENDED", "STOPPED", "RESIZING"]
failed_statuses = [
"FAILED",
"CANCELLED",
"NODE_FAIL",
"TIMEOUT",
"OUT_OF_MEMORY",
"PREEMPTED",
]
invalid_job_err = "slurm_load_jobs error: Invalid job id specified"
@debug job_id status exit_code stderr

status == "" && exit_code == 0 && stderr == "" && return :COMPLETED
if status == ""
# Not in squeue; fall back to sacct for a terminal state
try
acct = readchomp(`sacct -j $job_id --format=State%20 -n -X`)
# sacct may return multiple lines; take the first non-empty one
acct_lines = split(acct, '\n', keepempty = false)
acct_state = isempty(acct_lines) ? "" : strip(first(acct_lines))
if any(str -> occursin(str, acct_state), failed_statuses)
return :FAILED
elseif occursin("COMPLETED", acct_state)
return :COMPLETED
end
catch
# Ignore sacct errors and continue to other checks
end
end
exit_code != 0 && contains(stderr, invalid_job_err) && return :COMPLETED

any(str -> contains(status, str), pending_statuses) && return :PENDING
any(str -> contains(status, str), running_statuses) && return :RUNNING
any(str -> contains(status, str), failed_statuses) && return :FAILED

@warn "Job ID $job_id has unknown status `$status`. Marking as completed"
return :COMPLETED
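
Illustrative only: `sacct` may append context to the state (e.g. the cancelling user ID), which the substring checks above still classify correctly:

failed = ["FAILED", "CANCELLED", "NODE_FAIL", "TIMEOUT", "OUT_OF_MEMORY", "PREEMPTED"]
@assert any(s -> occursin(s, "CANCELLED by 1234"), failed)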