Skip to content

Commit 0d50498

Browse files
committed
fixing test for rerunning jobs - should be more robust; checking --no-requeue option
1 parent d07be70 commit 0d50498

File tree

2 files changed

+74
-37
lines changed

2 files changed

+74
-37
lines changed

pydra/engine/tests/test_submitter.py

Lines changed: 70 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -264,56 +264,90 @@ def test_slurm_args_2(tmpdir):
264264
sub(task)
265265

266266

267+
@mark.task
def sleep(x, job_name_part):
    """Sleep for ``x`` seconds, then wait until no SLURM job whose ``squeue``
    line matches ``job_name_part`` remains queued/running, and return ``x``.

    Used by the cancel/re-queue tests: after this task's job is cancelled and
    re-queued, the rerun blocks here until the cancelling job has left the
    queue, so the cancel task cannot cancel the rerun as well.
    """
    time.sleep(x)
    import subprocess as sp

    # Poll squeue until no job matching job_name_part is listed anymore.
    # The sentinel is any non-empty STRING (the original used the int 999,
    # which only worked because 999 != "") so the loop runs at least once.
    job_id = "pending"
    while job_id != "":
        time.sleep(3)
        # equivalent of: squeue | grep <job_name_part> | awk '{print $1}'
        id_p1 = sp.Popen(["squeue"], stdout=sp.PIPE)
        id_p2 = sp.Popen(["grep", job_name_part], stdin=id_p1.stdout, stdout=sp.PIPE)
        id_p3 = sp.Popen(["awk", "{print $1}"], stdin=id_p2.stdout, stdout=sp.PIPE)
        job_id = id_p3.communicate()[0].decode("utf-8").strip()

    return x
282+
283+
284+
@mark.task
def cancel(job_name_part):
    """Find the first queued SLURM job whose ``squeue`` line matches
    ``job_name_part``, cancel it with ``scancel``, and return the ``sacct``
    status text for that job (the test asserts it contains "CANCELLED").
    """
    import subprocess as sp

    # Poll squeue until a job matching job_name_part shows up.
    job_id = ""
    while job_id == "":
        time.sleep(1)
        # equivalent of: squeue | grep <job_name_part> | awk '{print $1}'
        id_p1 = sp.Popen(["squeue"], stdout=sp.PIPE)
        id_p2 = sp.Popen(["grep", job_name_part], stdin=id_p1.stdout, stdout=sp.PIPE)
        id_p3 = sp.Popen(["awk", "{print $1}"], stdin=id_p2.stdout, stdout=sp.PIPE)
        job_id = id_p3.communicate()[0].decode("utf-8").strip()

    # canceling the job (return code deliberately ignored — best-effort)
    sp.run(["scancel", job_id])
    # checking the status of the job; the caller inspects the returned text
    proc = sp.run(["sacct", "-j", job_id], stdout=sp.PIPE, stderr=sp.PIPE)
    return proc.stdout.decode("utf-8").strip()
302+
303+
267304
@pytest.mark.skipif(not slurm_available, reason="slurm not installed")
def test_slurm_cancel_rerun_1(tmpdir):
    """Test that a task run with slurm is re-queued after being cancelled.

    Runs a wf with 2 tasks: one sleeps while the other finds the sleeping
    job's id via squeue and cancels it. The sleep job should be re-queued by
    the SlurmWorker and finish without problems; on the rerun the sleep task
    waits until the cancel job has left the queue before returning, so the
    cancel task cannot race against the re-queued job.
    """
    wf = Workflow(
        name="wf",
        input_spec=["x", "job_name_cancel", "job_name_resqueue"],
        cache_dir=tmpdir,
    )
    wf.add(sleep(name="sleep", x=wf.lzin.x, job_name_part=wf.lzin.job_name_cancel))
    # fixed typo: keyword was "nane"; the task must be named "cancel" so that
    # wf.cancel.lzout below resolves
    wf.add(cancel(name="cancel", job_name_part=wf.lzin.job_name_resqueue))
    wf.inputs.x = 10
    wf.inputs.job_name_resqueue = "sleep"
    wf.inputs.job_name_cancel = "cancel"

    wf.set_output([("out", wf.sleep.lzout.out), ("canc_out", wf.cancel.lzout.out)])
    with Submitter("slurm") as sub:
        sub(wf)

    res = wf.result()
    assert res.output.out == 10
    # checking if indeed the sleep-task job was cancelled by cancel-task
    assert "CANCELLED" in res.output.canc_out
    script_dir = tmpdir / "SlurmWorker_scripts"
    assert script_dir.exists()
333+
334+
335+
@pytest.mark.skipif(not slurm_available, reason="slurm not installed")
def test_slurm_cancel_rerun_2(tmpdir):
    """Test that a cancelled slurm task is NOT re-queued under ``--no-requeue``.

    Runs a wf with 2 tasks: one sleeps while the other finds the sleeping
    job's id via squeue and cancels it. Because ``--no-requeue`` is passed to
    sbatch, the worker does not re-queue the cancelled job, so the submission
    is expected to raise.
    """
    wf = Workflow(name="wf", input_spec=["x", "job_name"], cache_dir=tmpdir)
    # sleep requires job_name_part (it waits for the matching job to leave the
    # queue); wire it to the cancel task's name as in test_slurm_cancel_rerun_1
    wf.add(sleep(name="sleep", x=wf.lzin.x, job_name_part="cancel"))
    # fixed typo: keyword was "nane"
    wf.add(cancel(name="cancel", job_name_part=wf.lzin.job_name))

    wf.inputs.x = 10
    wf.inputs.job_name = "sleep"

    wf.set_output([("out", wf.sleep.lzout.out), ("canc_out", wf.cancel.lzout.out)])
    # the cancelled sleep job is not re-queued, so the workflow must fail
    with pytest.raises(Exception):
        with Submitter("slurm", sbatch_args="--no-requeue") as sub:
            sub(wf)

pydra/engine/workers.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,10 @@ async def _submit_job(self, batchscript, name, checksum, cache_dir):
300300
# Exception: Polling / job failure
301301
done = await self._poll_job(jobid)
302302
if done:
303-
if done in ["CANCELLED", "TIMEOUT", "PREEMPTED"]:
303+
if (
304+
done in ["CANCELLED", "TIMEOUT", "PREEMPTED"]
305+
and "--no-requeue" not in self.sbatch_args
306+
):
304307
if (cache_dir / f"{checksum}.lock").exists():
305308
# for pyt3.8 we could you missing_ok=True
306309
(cache_dir / f"{checksum}.lock").unlink()

0 commit comments

Comments
 (0)