Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion snakemake_executor_plugin_slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ def __post_init__(self, test_mode: bool = False):
self._fallback_account_arg = None
self._fallback_partition = None
self._preemption_warning = False # no preemption warning has been issued
self._submitted_job_clusters = set() # track clusters of submitted jobs
self.slurm_logdir = (
Path(self.workflow.executor_settings.logdir)
if self.workflow.executor_settings.logdir
Expand Down Expand Up @@ -596,6 +597,14 @@ def run_job(self, job: JobExecutorInterface):
f"Job {job.jobid} has been submitted with SLURM jobid "
f"{slurm_jobid} (log: {slurm_logfile})."
)
# Track cluster specification for later use in cancel_jobs
cluster_val = (
job.resources.get("cluster")
or job.resources.get("clusters")
or job.resources.get("slurm_cluster")
)
if cluster_val:
self._submitted_job_clusters.add(cluster_val)
self.report_job_submission(
SubmittedJobInfo(
job,
Expand Down Expand Up @@ -803,12 +812,18 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
if active_jobs:
# TODO chunk jobids in order to avoid too long command lines
jobids = " ".join([job_info.external_jobid for job_info in active_jobs])

try:
# timeout set to 60, because a scheduler cycle usually is
# about 30 sec, but can be longer in extreme cases.
# Under 'normal' circumstances, 'scancel' is executed in
# virtually no time.
scancel_command = f"scancel {jobids} --clusters=all"
scancel_command = f"scancel {jobids}"

# Add cluster specification if any clusters were found during submission
if self._submitted_job_clusters:
clusters_str = ",".join(sorted(self._submitted_job_clusters))
scancel_command += f" --clusters={clusters_str}"

subprocess.check_output(
scancel_command,
Expand All @@ -823,6 +838,16 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
msg = e.stderr.strip()
if msg:
msg = f": {msg}"
# If we were using --clusters and it failed, provide additional context
if self._submitted_job_clusters:
msg += (
"\nWARNING: Job cancellation failed while using "
"--clusters flag. Your multicluster SLURM setup may not "
"support this feature, or the SLURM database may not be "
"properly configured for multicluster operations. "
"Please verify your SLURM configuration with your "
"HPC administrator."
)
raise WorkflowError(
"Unable to cancel jobs with scancel "
f"(exit code {e.returncode}){msg}"
Expand Down