|
8 | 8 |
|
9 | 9 | from .utils import gen_basic_wf
|
10 | 10 | from ..core import Workflow
|
| 11 | +from ..task import ShellCommandTask |
11 | 12 | from ..submitter import Submitter
|
12 | 13 | from ... import mark
|
13 | 14 |
|
@@ -261,3 +262,58 @@ def test_slurm_args_2(tmpdir):
|
261 | 262 | with pytest.raises(RuntimeError, match="Error returned from sbatch:"):
|
262 | 263 | with Submitter("slurm", sbatch_args="-N1 --invalid") as sub:
|
263 | 264 | sub(task)
|
| 265 | + |
| 266 | + |
| 267 | +@pytest.mark.skipif(not slurm_available, reason="slurm not installed") |
| 268 | +def test_slurm_cancel_rerun(tmpdir): |
| 269 | + """ testing that tasks run with slurm is re-queue |
| 270 | + Running wf with 2 tasks, one sleeps and the other trying to get |
| 271 | + job_id of the first task and cancel it. |
| 272 | + The first job should be re-queue and finish without problem. |
| 273 | + (possibly has to be improved, in theory cancel job might finish before cancel) |
| 274 | + """ |
| 275 | + |
| 276 | + @mark.task |
| 277 | + def sleep(x): |
| 278 | + time.sleep(x) |
| 279 | + return x |
| 280 | + |
| 281 | + @mark.task |
| 282 | + def cancel(job_name_part): |
| 283 | + import subprocess as sp |
| 284 | + |
| 285 | + # getting the job_id of the first job that sleeps |
| 286 | + job_id = "" |
| 287 | + while job_id == "": |
| 288 | + time.sleep(1) |
| 289 | + id_p1 = sp.Popen(["squeue"], stdout=sp.PIPE) |
| 290 | + id_p2 = sp.Popen( |
| 291 | + ["grep", job_name_part], stdin=id_p1.stdout, stdout=sp.PIPE |
| 292 | + ) |
| 293 | + id_p3 = sp.Popen(["awk", "{print $1}"], stdin=id_p2.stdout, stdout=sp.PIPE) |
| 294 | + job_id = id_p3.communicate()[0].decode("utf-8").strip() |
| 295 | + |
| 296 | + # # canceling the job |
| 297 | + proc1 = sp.run(["scancel", job_id]) |
| 298 | + # checking the status of jobs with the name; returning the last item |
| 299 | + proc2 = sp.run(["sacct", "-j", job_id], stdout=sp.PIPE, stderr=sp.PIPE) |
| 300 | + return proc2.stdout.decode("utf-8").strip() # .split("\n")[-1] |
| 301 | + |
| 302 | + wf = Workflow(name="wf", input_spec=["x", "job_name"], cache_dir=tmpdir) |
| 303 | + wf.add(sleep(name="sleep", x=wf.lzin.x)) |
| 304 | + wf.add(cancel(nane="cancel", job_name_part=wf.lzin.job_name)) |
| 305 | + # this is a job name for x=10, if x is different checksum and jobname would have to be updated |
| 306 | + wf.inputs.x = 20 |
| 307 | + wf.inputs.job_name = "sleep" |
| 308 | + |
| 309 | + wf.set_output([("out", wf.sleep.lzout.out), ("canc_out", wf.cancel.lzout.out)]) |
| 310 | + with Submitter("slurm") as sub: |
| 311 | + sub(wf) |
| 312 | + |
| 313 | + res = wf.result() |
| 314 | + assert res.output.out == 20 |
| 315 | + # checking if indeed the sleep-task job was cancelled by cancel-task |
| 316 | + assert "CANCELLED" in res.output.canc_out |
| 317 | + breakpoint() |
| 318 | + script_dir = tmpdir / "SlurmWorker_scripts" |
| 319 | + assert script_dir.exists() |
0 commit comments