Commit dbbd17f

Merge pull request #361 from djarecka/mnt/slurm_upd
[mnt] slurm updates: sbatch args and requeuing
2 parents c1573a9 + e94a10c commit dbbd17f
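
The change touches two areas: sbatch_args passed to the Submitter are now forwarded to sbatch (with a RuntimeError raised if sbatch rejects them), and jobs that end up CANCELLED, TIMEOUT, or PREEMPTED are automatically requeued unless --no-requeue is among the sbatch options. A minimal usage sketch, assuming a working Slurm cluster and an already-defined pydra task named task (hypothetical here), could look like:

    from pydra.engine.submitter import Submitter

    # extra sbatch options are forwarded to sbatch as-is
    with Submitter("slurm", sbatch_args="-N1") as sub:
        sub(task)

    # with --no-requeue, a cancelled/preempted job is not rescheduled
    # and the run fails instead of being retried
    with Submitter("slurm", sbatch_args="--no-requeue") as sub:
        sub(task)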

2 files changed: +138, -5 lines changed
pydra/engine/tests/test_submitter.py

Lines changed: 117 additions & 0 deletions
@@ -8,6 +8,7 @@

 from .utils import gen_basic_wf
 from ..core import Workflow
+from ..task import ShellCommandTask
 from ..submitter import Submitter
 from ... import mark

@@ -233,3 +234,119 @@ def test_slurm_max_jobs(tmpdir):
             prev = et
             continue
     assert (prev - et).seconds >= 2
+
+
+@pytest.mark.skipif(not slurm_available, reason="slurm not installed")
+def test_slurm_args_1(tmpdir):
+    """ testing sbatch_args provided to the submitter"""
+    task = sleep_add_one(x=1)
+    task.cache_dir = tmpdir
+    # submit workflow and every task as slurm job
+    with Submitter("slurm", sbatch_args="-N1") as sub:
+        sub(task)
+
+    res = task.result()
+    assert res.output.out == 2
+    script_dir = tmpdir / "SlurmWorker_scripts"
+    assert script_dir.exists()
+
+
+@pytest.mark.skipif(not slurm_available, reason="slurm not installed")
+def test_slurm_args_2(tmpdir):
+    """ testing sbatch_args provided to the submitter;
+    an exception should be raised for invalid options
+    """
+    task = sleep_add_one(x=1)
+    task.cache_dir = tmpdir
+    # submit workflow and every task as slurm job
+    with pytest.raises(RuntimeError, match="Error returned from sbatch:"):
+        with Submitter("slurm", sbatch_args="-N1 --invalid") as sub:
+            sub(task)
+
+
+@mark.task
+def sleep(x, job_name_part):
+    time.sleep(x)
+    import subprocess as sp
+
+    # waiting until no job matching job_name_part is left in the queue
+    job_id = 999
+    while job_id != "":
+        time.sleep(3)
+        id_p1 = sp.Popen(["squeue"], stdout=sp.PIPE)
+        id_p2 = sp.Popen(["grep", job_name_part], stdin=id_p1.stdout, stdout=sp.PIPE)
+        id_p3 = sp.Popen(["awk", "{print $1}"], stdin=id_p2.stdout, stdout=sp.PIPE)
+        job_id = id_p3.communicate()[0].decode("utf-8").strip()
+
+    return x
+
+
+@mark.task
+def cancel(job_name_part):
+    import subprocess as sp
+
+    # getting the job_id of the first job whose name matches job_name_part
+    job_id = ""
+    while job_id == "":
+        time.sleep(1)
+        id_p1 = sp.Popen(["squeue"], stdout=sp.PIPE)
+        id_p2 = sp.Popen(["grep", job_name_part], stdin=id_p1.stdout, stdout=sp.PIPE)
+        id_p3 = sp.Popen(["awk", "{print $1}"], stdin=id_p2.stdout, stdout=sp.PIPE)
+        job_id = id_p3.communicate()[0].decode("utf-8").strip()
+
+    # canceling the job
+    proc1 = sp.run(["scancel", job_id])
+    # checking the status of the job; returning the sacct output
+    proc2 = sp.run(["sacct", "-j", job_id], stdout=sp.PIPE, stderr=sp.PIPE)
+    return proc2.stdout.decode("utf-8").strip()
+
+
+@pytest.mark.skipif(not slurm_available, reason="slurm not installed")
+def test_slurm_cancel_rerun_1(tmpdir):
+    """ testing that a task run with slurm is re-queued:
+    running a wf with 2 tasks, where one sleeps and the other tries to get
+    the job_id of the first task and cancel it.
+    The first job should be re-queued and finish without problems.
+    (possibly has to be improved; in theory the sleep job might finish before being cancelled)
+    """
+    wf = Workflow(
+        name="wf",
+        input_spec=["x", "job_name_cancel", "job_name_resqueue"],
+        cache_dir=tmpdir,
+    )
+    wf.add(sleep(name="sleep", x=wf.lzin.x, job_name_part=wf.lzin.job_name_cancel))
+    wf.add(cancel(name="cancel", job_name_part=wf.lzin.job_name_resqueue))
+    wf.inputs.x = 10
+    wf.inputs.job_name_resqueue = "sleep"
+    wf.inputs.job_name_cancel = "cancel"
+
+    wf.set_output([("out", wf.sleep.lzout.out), ("canc_out", wf.cancel.lzout.out)])
+    with Submitter("slurm") as sub:
+        sub(wf)
+
+    res = wf.result()
+    assert res.output.out == 10
+    # checking that the sleep-task job was indeed cancelled by the cancel-task
+    assert "CANCELLED" in res.output.canc_out
+    script_dir = tmpdir / "SlurmWorker_scripts"
+    assert script_dir.exists()
+
+
+@pytest.mark.skipif(not slurm_available, reason="slurm not installed")
+def test_slurm_cancel_rerun_2(tmpdir):
+    """ testing a task run with slurm with --no-requeue:
+    running a wf with 2 tasks, where one sleeps and the other gets
+    the job_id of the first task and cancels it.
+    The first job is not able to be rescheduled and an error is returned.
+    """
+    wf = Workflow(name="wf", input_spec=["x", "job_name"], cache_dir=tmpdir)
+    wf.add(sleep(name="sleep", x=wf.lzin.x))
+    wf.add(cancel(name="cancel", job_name_part=wf.lzin.job_name))
+
+    wf.inputs.x = 10
+    wf.inputs.job_name = "sleep"
+
+    wf.set_output([("out", wf.sleep.lzout.out), ("canc_out", wf.cancel.lzout.out)])
+    with pytest.raises(Exception):
+        with Submitter("slurm", sbatch_args="--no-requeue") as sub:
+            sub(wf)
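
The sleep/cancel helper tasks above find the target job id by piping squeue through grep and awk. For reference, the same lookup can be written as a single function; this is only an illustrative sketch (the function name and the use of squeue's --noheader option are assumptions, not part of the commit), and it matches on squeue's default NAME column rather than the whole line:

    import subprocess as sp

    def find_job_id(job_name_part):
        # squeue default columns: JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
        out = sp.run(["squeue", "--noheader"], stdout=sp.PIPE).stdout.decode("utf-8")
        for line in out.splitlines():
            fields = line.split()
            if len(fields) > 2 and job_name_part in fields[2]:
                return fields[0]  # the job id
        return ""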

pydra/engine/workers.py

Lines changed: 21 additions & 5 deletions
@@ -1,6 +1,6 @@
 """Execution workers."""
 import asyncio
-import sys
+import sys, os
 import re
 from tempfile import gettempdir
 from pathlib import Path
@@ -280,9 +280,13 @@ async def _submit_job(self, batchscript, name, checksum, cache_dir):
             error_file = None
         sargs.append(str(batchscript))
         # TO CONSIDER: add random sleep to avoid overloading calls
-        _, stdout, _ = await read_and_display_async("sbatch", *sargs, hide_display=True)
+        rc, stdout, stderr = await read_and_display_async(
+            "sbatch", *sargs, hide_display=True
+        )
         jobid = re.search(r"\d+", stdout)
-        if not jobid:
+        if rc:
+            raise RuntimeError(f"Error returned from sbatch: {stderr}")
+        elif not jobid:
             raise RuntimeError("Could not extract job ID")
         jobid = jobid.group()
         if error_file:
@@ -296,7 +300,17 @@ async def _submit_job(self, batchscript, name, checksum, cache_dir):
             # Exception: Polling / job failure
             done = await self._poll_job(jobid)
             if done:
-                return True
+                if (
+                    done in ["CANCELLED", "TIMEOUT", "PREEMPTED"]
+                    and "--no-requeue" not in self.sbatch_args
+                ):
+                    if (cache_dir / f"{checksum}.lock").exists():
+                        # for Python >= 3.8 we could use missing_ok=True
+                        (cache_dir / f"{checksum}.lock").unlink()
+                    cmd_re = ("scontrol", "requeue", jobid)
+                    await read_and_display_async(*cmd_re, hide_display=True)
+                else:
+                    return True
             await asyncio.sleep(self.poll_delay)

     async def _poll_job(self, jobid):
@@ -317,7 +331,9 @@ async def _verify_exit_code(self, jobid):
         m = self._sacct_re.search(stdout)
         error_file = self.error[jobid]
         if int(m.group("exit_code")) != 0 or m.group("status") != "COMPLETED":
-            if m.group("status") in ["RUNNING", "PENDING"]:
+            if m.group("status") in ["CANCELLED", "TIMEOUT", "PREEMPTED"]:
+                return m.group("status")
+            elif m.group("status") in ["RUNNING", "PENDING"]:
                 return False
             # TODO: potential for requeuing
             # parsing the error message
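
Taken together, _verify_exit_code now reports CANCELLED/TIMEOUT/PREEMPTED back to _submit_job, which removes the stale lock file and requeues the job via scontrol unless --no-requeue was requested. A synchronous sketch of that decision, with hypothetical helper names (only the scontrol requeue call and the lock-file removal mirror the diff), assuming the checksum-named lock files that pydra keeps in cache_dir:

    import subprocess as sp
    from pathlib import Path

    RERUN_STATES = ["CANCELLED", "TIMEOUT", "PREEMPTED"]

    def maybe_requeue(status, sbatch_args, jobid, checksum, cache_dir):
        # requeue only interrupted jobs, and only if requeuing is allowed
        if status in RERUN_STATES and "--no-requeue" not in sbatch_args:
            lock = Path(cache_dir) / f"{checksum}.lock"
            if lock.exists():  # Python >= 3.8 could use lock.unlink(missing_ok=True)
                lock.unlink()
            sp.run(["scontrol", "requeue", jobid], check=True)
            return True  # job was resubmitted, keep polling
        return False  # treat the job as finished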
