7
7
import pytest
8
8
9
9
from .utils import (
10
+ need_sge ,
11
+ need_slurm ,
10
12
gen_basic_wf ,
11
13
gen_basic_wf_with_threadcount ,
12
14
gen_basic_wf_with_threadcount_concurrent ,
13
15
)
14
16
from ..core import Workflow
15
- from ..task import ShellCommandTask
16
17
from ..submitter import Submitter
17
18
from ... import mark
18
19
from pathlib import Path
19
- import uuid
20
20
from datetime import datetime
21
21
22
- slurm_available = bool (shutil .which ("sbatch" ))
23
- sge_available = bool (shutil .which ("qsub" ))
24
-
25
22
26
23
@mark .task
27
24
def sleep_add_one (x ):
@@ -182,7 +179,7 @@ def test_serial_wf():
182
179
assert res .output .out == 9
183
180
184
181
185
- @pytest . mark . skipif ( not slurm_available , reason = "slurm not installed" )
182
+ @need_slurm
186
183
def test_slurm_wf (tmpdir ):
187
184
wf = gen_basic_wf ()
188
185
wf .cache_dir = tmpdir
@@ -198,7 +195,7 @@ def test_slurm_wf(tmpdir):
198
195
assert len ([sd for sd in script_dir .listdir () if sd .isdir ()]) == 2
199
196
200
197
201
- @pytest . mark . skipif ( not slurm_available , reason = "slurm not installed" )
198
+ @need_slurm
202
199
def test_slurm_wf_cf (tmpdir ):
203
200
# submit entire workflow as single job executing with cf worker
204
201
wf = gen_basic_wf ()
@@ -217,7 +214,7 @@ def test_slurm_wf_cf(tmpdir):
217
214
assert sdirs [0 ].basename == wf .uid
218
215
219
216
220
- @pytest . mark . skipif ( not slurm_available , reason = "slurm not installed" )
217
+ @need_slurm
221
218
def test_slurm_wf_state (tmpdir ):
222
219
wf = gen_basic_wf ()
223
220
wf .split ("x" )
@@ -234,7 +231,7 @@ def test_slurm_wf_state(tmpdir):
234
231
assert len (sdirs ) == 2 * len (wf .inputs .x )
235
232
236
233
237
- @pytest . mark . skipif ( not slurm_available , reason = "slurm not installed" )
234
+ @need_slurm
238
235
@pytest .mark .flaky (reruns = 3 )
239
236
def test_slurm_max_jobs (tmpdir ):
240
237
wf = Workflow ("new_wf" , input_spec = ["x" , "y" ], cache_dir = tmpdir )
@@ -274,7 +271,7 @@ def test_slurm_max_jobs(tmpdir):
274
271
assert (prev - et ).seconds >= 2
275
272
276
273
277
- @pytest . mark . skipif ( not slurm_available , reason = "slurm not installed" )
274
+ @need_slurm
278
275
def test_slurm_args_1 (tmpdir ):
279
276
"""testing sbatch_args provided to the submitter"""
280
277
task = sleep_add_one (x = 1 )
@@ -289,7 +286,7 @@ def test_slurm_args_1(tmpdir):
289
286
assert script_dir .exists ()
290
287
291
288
292
- @pytest . mark . skipif ( not slurm_available , reason = "slurm not installed" )
289
+ @need_slurm
293
290
def test_slurm_args_2 (tmpdir ):
294
291
"""testing sbatch_args provided to the submitter
295
292
exception should be raised for invalid options
@@ -339,7 +336,7 @@ def cancel(job_name_part):
339
336
340
337
341
338
@pytest .mark .flaky (reruns = 1 )
342
- @pytest . mark . skipif ( not slurm_available , reason = "slurm not installed" )
339
+ @need_slurm
343
340
def test_slurm_cancel_rerun_1 (tmpdir ):
344
341
"""testing that tasks run with slurm is re-queue
345
342
Running wf with 2 tasks, one sleeps and the other trying to get
@@ -372,7 +369,7 @@ def test_slurm_cancel_rerun_1(tmpdir):
372
369
373
370
374
371
@pytest .mark .flaky (reruns = 1 )
375
- @pytest . mark . skipif ( not slurm_available , reason = "slurm not installed" )
372
+ @need_slurm
376
373
def test_slurm_cancel_rerun_2 (tmpdir ):
377
374
"""testing that tasks run with slurm that has --no-requeue
378
375
Running wf with 2 tasks, one sleeps and the other gets
@@ -392,7 +389,7 @@ def test_slurm_cancel_rerun_2(tmpdir):
392
389
sub (wf )
393
390
394
391
395
- @pytest . mark . skipif ( not sge_available , reason = "sge not installed" )
392
+ @need_sge
396
393
def test_sge_wf (tmpdir ):
397
394
"""testing that a basic workflow can be run with the SGEWorker"""
398
395
wf = gen_basic_wf ()
@@ -412,7 +409,7 @@ def test_sge_wf(tmpdir):
412
409
assert len ([sd for sd in script_dir .listdir () if sd .isdir ()]) == 2
413
410
414
411
415
- @pytest . mark . skipif ( not sge_available , reason = "sge not installed" )
412
+ @need_sge
416
413
def test_sge_wf_cf (tmpdir ):
417
414
"""testing the SGEWorker can submit SGE tasks while the workflow
418
415
uses the concurrent futures plugin"""
@@ -433,7 +430,7 @@ def test_sge_wf_cf(tmpdir):
433
430
assert Path (sdirs [0 ]).name == wf .uid
434
431
435
432
436
- @pytest . mark . skipif ( not sge_available , reason = "sge not installed" )
433
+ @need_sge
437
434
def test_sge_wf_state (tmpdir ):
438
435
"""testing the SGEWorker can be used with a workflow with state"""
439
436
wf = gen_basic_wf ()
@@ -466,7 +463,7 @@ def qacct_output_to_dict(qacct_output):
466
463
return stdout_dict
467
464
468
465
469
- @pytest . mark . skipif ( not sge_available , reason = "sge not installed" )
466
+ @need_sge
470
467
def test_sge_set_threadcount (tmpdir ):
471
468
"""testing the number of threads for an SGEWorker task can be set
472
469
using the input_spec variable sgeThreads"""
@@ -496,7 +493,7 @@ def test_sge_set_threadcount(tmpdir):
496
493
assert int (out_job1_dict ["slots" ][0 ]) == 1
497
494
498
495
499
- @pytest . mark . skipif ( not sge_available , reason = "sge not installed" )
496
+ @need_sge
500
497
def test_sge_limit_maxthreads (tmpdir ):
501
498
"""testing the ability to limit the number of threads used by the SGE
502
499
at one time with the max_threads argument to SGEWorker"""
@@ -540,7 +537,7 @@ def test_sge_limit_maxthreads(tmpdir):
540
537
assert job_1_endtime < job_2_starttime
541
538
542
539
543
- @pytest . mark . skipif ( not sge_available , reason = "sge not installed" )
540
+ @need_sge
544
541
def test_sge_no_limit_maxthreads (tmpdir ):
545
542
"""testing unlimited threads can be used at once by SGE
546
543
when max_threads is not set"""
0 commit comments