Skip to content

Commit dcd21df

Browse files
committed
removing some tests from dask and marking all tests that run with dask as flaky; fixing the bosh check; fixing test naming
1 parent b26dcfb commit dcd21df

File tree

5 files changed

+69
-64
lines changed

5 files changed

+69
-64
lines changed

pydra/engine/tests/test_boutiques.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,20 @@
1111

1212
need_bosh_docker = pytest.mark.skipif(
1313
shutil.which("docker") is None
14-
or sp.call(["docker", "info"] or sp.call(["bosh", "version"])),
14+
or sp.call(["docker", "info"])
15+
or sp.call(["which", "bosh"]),
1516
reason="requires docker and bosh",
1617
)
1718

18-
if bool(shutil.which("sbatch")):
19-
Plugins = ["cf", "slurm"]
20-
else:
21-
Plugins = ["cf"]
22-
2319
Infile = Path(__file__).resolve().parent / "data_tests" / "test.nii.gz"
2420

2521

2622
@no_win
2723
@need_bosh_docker
28-
@pytest.mark.flaky(reruns=2)
2924
@pytest.mark.parametrize(
3025
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
3126
)
3227
@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter])
33-
@pytest.mark.parametrize("plugin", Plugins)
3428
def test_boutiques_1(maskfile, plugin, results_function):
3529
""" simple task to run fsl.bet using BoshTask"""
3630
btask = BoshTask(name="NA", zenodo_id="1482743")
@@ -99,7 +93,6 @@ def test_boutiques_spec_2():
9993
@pytest.mark.parametrize(
10094
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
10195
)
102-
@pytest.mark.parametrize("plugin", Plugins)
10396
def test_boutiques_wf_1(maskfile, plugin):
10497
""" wf with one task that runs fsl.bet using BoshTask"""
10598
wf = Workflow(name="wf", input_spec=["maskfile", "infile"])
@@ -130,7 +123,6 @@ def test_boutiques_wf_1(maskfile, plugin):
130123
@pytest.mark.parametrize(
131124
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
132125
)
133-
@pytest.mark.parametrize("plugin", Plugins)
134126
def test_boutiques_wf_2(maskfile, plugin):
135127
""" wf with two BoshTasks (fsl.bet and fsl.stats) and one ShellTask"""
136128
wf = Workflow(name="wf", input_spec=["maskfile", "infile"])

pydra/engine/tests/test_node_task.py

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ def test_odir_init():
349349
# Tests for tasks without state (i.e. no splitter)
350350

351351

352+
@pytest.mark.flaky(reruns=2) # when dask
352353
def test_task_nostate_1(plugin_dask_opt):
353354
""" task without splitter"""
354355
nn = fun_addtwo(name="NA", a=3)
@@ -377,6 +378,7 @@ def test_task_nostate_1(plugin_dask_opt):
377378
assert nn.output_dir.exists()
378379

379380

381+
@pytest.mark.flaky(reruns=2) # when dask
380382
def test_task_nostate_1_call_subm(plugin_dask_opt):
381383
""" task without splitter"""
382384
nn = fun_addtwo(name="NA", a=3)
@@ -393,6 +395,7 @@ def test_task_nostate_1_call_subm(plugin_dask_opt):
393395
assert nn.output_dir.exists()
394396

395397

398+
@pytest.mark.flaky(reruns=2) # when dask
396399
def test_task_nostate_1_call_plug(plugin_dask_opt):
397400
""" task without splitter"""
398401
nn = fun_addtwo(name="NA", a=3)
@@ -521,6 +524,7 @@ def test_task_nostate_7():
521524
# Testing caching for tasks without states
522525

523526

527+
@pytest.mark.flaky(reruns=2) # when dask
524528
def test_task_nostate_cachedir(plugin_dask_opt, tmpdir):
525529
""" task with provided cache_dir using pytest tmpdir"""
526530
cache_dir = tmpdir.mkdir("test_task_nostate")
@@ -536,6 +540,7 @@ def test_task_nostate_cachedir(plugin_dask_opt, tmpdir):
536540
assert results.output.out == 5
537541

538542

543+
@pytest.mark.flaky(reruns=2) # when dask
539544
def test_task_nostate_cachedir_relativepath(tmpdir, plugin_dask_opt):
540545
""" task with provided cache_dir as relative path"""
541546
cwd = tmpdir.chdir()
@@ -556,6 +561,7 @@ def test_task_nostate_cachedir_relativepath(tmpdir, plugin_dask_opt):
556561
shutil.rmtree(cache_dir)
557562

558563

564+
@pytest.mark.flaky(reruns=2) # when dask
559565
def test_task_nostate_cachelocations(plugin_dask_opt, tmpdir):
560566
"""
561567
Two identical tasks with provided cache_dir;
@@ -686,6 +692,7 @@ def test_task_nostate_cachelocations_updated(plugin, tmpdir):
686692
# Tests for tasks with states (i.e. with splitter)
687693

688694

695+
@pytest.mark.flaky(reruns=2) # when dask
689696
def test_task_state_1(plugin_dask_opt):
690697
""" task with the simplest splitter"""
691698
nn = fun_addtwo(name="NA").split(splitter="a", a=[3, 5])
@@ -724,7 +731,7 @@ def test_task_state_1(plugin_dask_opt):
724731
assert odir.exists()
725732

726733

727-
def test_task_state_1a(plugin_dask_opt):
734+
def test_task_state_1a(plugin):
728735
""" task with the simplest splitter (inputs set separately)"""
729736
nn = fun_addtwo(name="NA")
730737
nn.split(splitter="a")
@@ -734,7 +741,7 @@ def test_task_state_1a(plugin_dask_opt):
734741
assert nn.state.splitter_rpn == ["NA.a"]
735742
assert (nn.inputs.a == np.array([3, 5])).all()
736743

737-
with Submitter(plugin=plugin_dask_opt) as sub:
744+
with Submitter(plugin=plugin) as sub:
738745
sub(nn)
739746

740747
# checking the results
@@ -744,6 +751,33 @@ def test_task_state_1a(plugin_dask_opt):
744751
assert results[i].output.out == res[1]
745752

746753

754+
def test_task_state_singl_1(plugin):
755+
""" Tasks with two inputs and a splitter (no combiner)
756+
one input is a single value, the other is in the splitter and combiner
757+
"""
758+
nn = fun_addvar(name="NA").split(splitter="a", a=[3, 5], b=10)
759+
760+
assert nn.inputs.a == [3, 5]
761+
assert nn.inputs.b == 10
762+
assert nn.state.splitter == "NA.a"
763+
assert nn.state.splitter_rpn == ["NA.a"]
764+
assert nn.state.splitter_final == "NA.a"
765+
assert nn.state.splitter_rpn_final == ["NA.a"]
766+
767+
with Submitter(plugin=plugin) as sub:
768+
sub(nn)
769+
770+
# checking the results
771+
expected = [({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 5, "NA.b": 10}, 15)]
772+
results = nn.result()
773+
for i, res in enumerate(expected):
774+
assert results[i].output.out == res[1]
775+
# checking the output_dir
776+
assert nn.output_dir
777+
for odir in nn.output_dir:
778+
assert odir.exists()
779+
780+
747781
@pytest.mark.parametrize(
748782
"splitter, state_splitter, state_rpn, expected, expected_ind",
749783
[
@@ -774,7 +808,7 @@ def test_task_state_1a(plugin_dask_opt):
774808
],
775809
)
776810
def test_task_state_2(
777-
plugin_dask_opt, splitter, state_splitter, state_rpn, expected, expected_ind
811+
plugin, splitter, state_splitter, state_rpn, expected, expected_ind
778812
):
779813
""" Tasks with two inputs and a splitter (no combiner)"""
780814
nn = fun_addvar(name="NA").split(splitter=splitter, a=[3, 5], b=[10, 20])
@@ -786,7 +820,7 @@ def test_task_state_2(
786820
assert nn.state.splitter_final == state_splitter
787821
assert nn.state.splitter_rpn_final == state_rpn
788822

789-
with Submitter(plugin=plugin_dask_opt) as sub:
823+
with Submitter(plugin=plugin) as sub:
790824
sub(nn)
791825

792826
# checking the results
@@ -814,34 +848,7 @@ def test_task_state_2(
814848
assert odir.exists()
815849

816850

817-
def test_task_state_singl_1(plugin_dask_opt):
818-
""" Tasks with two inputs and a splitter (no combiner)
819-
one input is a single value, the other is in the splitter and combiner
820-
"""
821-
nn = fun_addvar(name="NA").split(splitter="a", a=[3, 5], b=10)
822-
823-
assert nn.inputs.a == [3, 5]
824-
assert nn.inputs.b == 10
825-
assert nn.state.splitter == "NA.a"
826-
assert nn.state.splitter_rpn == ["NA.a"]
827-
assert nn.state.splitter_final == "NA.a"
828-
assert nn.state.splitter_rpn_final == ["NA.a"]
829-
830-
with Submitter(plugin=plugin_dask_opt) as sub:
831-
sub(nn)
832-
833-
# checking the results
834-
expected = [({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 5, "NA.b": 10}, 15)]
835-
results = nn.result()
836-
for i, res in enumerate(expected):
837-
assert results[i].output.out == res[1]
838-
# checking the output_dir
839-
assert nn.output_dir
840-
for odir in nn.output_dir:
841-
assert odir.exists()
842-
843-
844-
def test_task_state_2(plugin):
851+
def test_task_state_3(plugin):
845852
""" task with the simplest splitter, the input is an empty list"""
846853
nn = fun_addtwo(name="NA").split(splitter="a", a=[])
847854

@@ -861,7 +868,7 @@ def test_task_state_2(plugin):
861868
assert nn.output_dir == []
862869

863870

864-
def test_task_state_3(plugin):
871+
def test_task_state_4(plugin):
865872
""" task with a list as an input, and a simple splitter """
866873
nn = moment(name="NA", n=3, lst=[[2, 3, 4], [1, 2, 3]]).split(splitter="lst")
867874
assert np.allclose(nn.inputs.n, 3)
@@ -881,7 +888,7 @@ def test_task_state_3(plugin):
881888
assert odir.exists()
882889

883890

884-
def test_task_state_3a(plugin):
891+
def test_task_state_4a(plugin):
885892
""" task with a tuple as an input, and a simple splitter """
886893
nn = moment(name="NA", n=3, lst=[(2, 3, 4), (1, 2, 3)]).split(splitter="lst")
887894
assert np.allclose(nn.inputs.n, 3)
@@ -901,7 +908,7 @@ def test_task_state_3a(plugin):
901908
assert odir.exists()
902909

903910

904-
def test_task_state_4(plugin):
911+
def test_task_state_5(plugin):
905912
""" task with a list as an input, and the variable is part of the scalar splitter"""
906913
nn = moment(name="NA", n=[1, 3], lst=[[2, 3, 4], [1, 2, 3]]).split(
907914
splitter=("n", "lst")
@@ -923,7 +930,7 @@ def test_task_state_4(plugin):
923930
assert odir.exists()
924931

925932

926-
def test_task_state_4_exception(plugin):
933+
def test_task_state_5_exception(plugin):
927934
""" task with a list as an input, and the variable is part of the scalar splitter
928935
the shapes are not matching, so exception should be raised
929936
"""
@@ -940,7 +947,7 @@ def test_task_state_4_exception(plugin):
940947
assert "shape" in str(excinfo.value)
941948

942949

943-
def test_task_state_5(plugin_dask_opt):
950+
def test_task_state_6(plugin):
944951
""" ask with a list as an input, and the variable is part of the outer splitter """
945952
nn = moment(name="NA", n=[1, 3], lst=[[2, 3, 4], [1, 2, 3]]).split(
946953
splitter=["n", "lst"]
@@ -949,7 +956,7 @@ def test_task_state_5(plugin_dask_opt):
949956
assert np.allclose(nn.inputs.lst, [[2, 3, 4], [1, 2, 3]])
950957
assert nn.state.splitter == ["NA.n", "NA.lst"]
951958

952-
with Submitter(plugin=plugin_dask_opt) as sub:
959+
with Submitter(plugin=plugin) as sub:
953960
sub(nn)
954961

955962
# checking the results
@@ -962,7 +969,7 @@ def test_task_state_5(plugin_dask_opt):
962969
assert odir.exists()
963970

964971

965-
def test_task_state_5a(plugin):
972+
def test_task_state_6a(plugin):
966973
""" ask with a tuple as an input, and the variable is part of the outer splitter """
967974
nn = moment(name="NA", n=[1, 3], lst=[(2, 3, 4), (1, 2, 3)]).split(
968975
splitter=["n", "lst"]
@@ -1164,7 +1171,7 @@ def test_task_state_comb_2(
11641171
assert odir.exists()
11651172

11661173

1167-
def test_task_state_comb_singl_1(plugin_dask_opt):
1174+
def test_task_state_comb_singl_1(plugin):
11681175
""" Tasks with two inputs;
11691176
one input is a single value, the other is in the splitter and combiner
11701177
"""
@@ -1178,7 +1185,7 @@ def test_task_state_comb_singl_1(plugin_dask_opt):
11781185
assert nn.state.splitter_final == None
11791186
assert nn.state.splitter_rpn_final == []
11801187

1181-
with Submitter(plugin=plugin_dask_opt) as sub:
1188+
with Submitter(plugin=plugin) as sub:
11821189
sub(nn)
11831190

11841191
# checking the results
@@ -1290,7 +1297,7 @@ def test_task_state_cachedir(plugin_dask_opt, tmpdir):
12901297
assert results[i].output.out == res[1]
12911298

12921299

1293-
def test_task_state_cachelocations(plugin_dask_opt, tmpdir):
1300+
def test_task_state_cachelocations(plugin, tmpdir):
12941301
"""
12951302
Two identical tasks with a state and cache_dir;
12961303
the second task has cache_locations and should not recompute the results
@@ -1299,13 +1306,13 @@ def test_task_state_cachelocations(plugin_dask_opt, tmpdir):
12991306
cache_dir2 = tmpdir.mkdir("test_task_nostate2")
13001307

13011308
nn = fun_addtwo(name="NA", a=3, cache_dir=cache_dir).split(splitter="a", a=[3, 5])
1302-
with Submitter(plugin=plugin_dask_opt) as sub:
1309+
with Submitter(plugin=plugin) as sub:
13031310
sub(nn)
13041311

13051312
nn2 = fun_addtwo(
13061313
name="NA", a=3, cache_dir=cache_dir2, cache_locations=cache_dir
13071314
).split(splitter="a", a=[3, 5])
1308-
with Submitter(plugin=plugin_dask_opt) as sub:
1315+
with Submitter(plugin=plugin) as sub:
13091316
sub(nn2)
13101317

13111318
# checking the results

pydra/engine/tests/test_shelltask.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
pytest.skip("SLURM not available in windows", allow_module_level=True)
1919

2020

21+
@pytest.mark.flaky(reruns=2) # when dask
2122
@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter])
2223
def test_shell_cmd_1(plugin_dask_opt, results_function):
2324
""" simple command, no arguments """
@@ -94,6 +95,7 @@ def test_shell_cmd_2b(plugin, results_function):
9495
# tests with State
9596

9697

98+
@pytest.mark.flaky(reruns=2)
9799
def test_shell_cmd_3(plugin_dask_opt):
98100
""" commands without arguments
99101
splitter = executable
@@ -1339,6 +1341,7 @@ def test_shell_cmd_inputspec_copyfile_state_1(plugin, results_function, tmpdir):
13391341
# customised input_spec in Workflow
13401342

13411343

1344+
@pytest.mark.flaky(reruns=2) # when dask
13421345
def test_wf_shell_cmd_2(plugin_dask_opt):
13431346
""" a workflow with input with defined output_file_template (str)
13441347
that requires wf.lzin
@@ -1605,7 +1608,7 @@ def test_wf_shell_cmd_3a(plugin):
16051608
assert res.output.cp_file.exists()
16061609

16071610

1608-
def test_wf_shell_cmd_state_1(plugin_dask_opt):
1611+
def test_wf_shell_cmd_state_1(plugin):
16091612
""" a workflow with 2 tasks and splitter on the wf level,
16101613
first one has input with output_file_template (str, uses wf.lzin),
16111614
that is passed to the second task
@@ -1683,7 +1686,7 @@ def test_wf_shell_cmd_state_1(plugin_dask_opt):
16831686
]
16841687
)
16851688

1686-
with Submitter(plugin=plugin_dask_opt) as sub:
1689+
with Submitter(plugin=plugin) as sub:
16871690
wf(submitter=sub)
16881691

16891692
res_l = wf.result()
@@ -1694,7 +1697,7 @@ def test_wf_shell_cmd_state_1(plugin_dask_opt):
16941697
assert res.output.cp_file.exists()
16951698

16961699

1697-
def test_wf_shell_cmd_ndst_1(plugin_dask_opt):
1700+
def test_wf_shell_cmd_ndst_1(plugin):
16981701
""" a workflow with 2 tasks and a splitter on the node level,
16991702
first one has input with output_file_template (str, uses wf.lzin),
17001703
that is passed to the second task
@@ -1772,7 +1775,7 @@ def test_wf_shell_cmd_ndst_1(plugin_dask_opt):
17721775
]
17731776
)
17741777

1775-
with Submitter(plugin=plugin_dask_opt) as sub:
1778+
with Submitter(plugin=plugin) as sub:
17761779
wf(submitter=sub)
17771780

17781781
res = wf.result()

pydra/engine/tests/test_submitter.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ def test_wf_in_wf(plugin):
103103
assert res.output.out == 7
104104

105105

106+
@pytest.mark.flaky(reruns=2) # when dask
106107
def test_wf2(plugin_dask_opt):
107108
""" workflow as a node
108109
workflow-node with one task and no splitter
@@ -123,6 +124,7 @@ def test_wf2(plugin_dask_opt):
123124
assert res.output.out == 3
124125

125126

127+
@pytest.mark.flaky(reruns=2) # when dask
126128
def test_wf_with_state(plugin_dask_opt):
127129
wf = Workflow(name="wf_with_state", input_spec=["x"])
128130
wf.add(sleep_add_one(name="taska", x=wf.lzin.x))

0 commit comments

Comments (0)