Skip to content

Commit 95759b7

Browse files
committed
added "cf" to test matrix
1 parent 68a2b6e commit 95759b7

File tree

2 files changed

+66
-51
lines changed

2 files changed

+66
-51
lines changed

pydra/conftest.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def pytest_generate_tests(metafunc):
2020
if bool(shutil.which("sbatch")):
2121
Plugins = ["slurm"]
2222
else:
23-
Plugins = ["debug"] # ["debug", "cf"]
23+
Plugins = ["debug", "cf"]
2424
try:
2525
if metafunc.config.getoption("dask"):
2626
Plugins.append("dask")
@@ -50,7 +50,7 @@ def pytest_generate_tests(metafunc):
5050
elif bool(shutil.which("sbatch")):
5151
Plugins = ["slurm"]
5252
else:
53-
Plugins = ["debug"] # ["debug", "cf"]
53+
Plugins = ["debug", "cf"]
5454
try:
5555
if metafunc.config.getoption("psij"):
5656
Plugins.append("psij-" + metafunc.config.getoption("psij"))
@@ -63,6 +63,30 @@ def pytest_generate_tests(metafunc):
6363
pass
6464
metafunc.parametrize("plugin", Plugins)
6565

66+
if "plugin_parallel" in metafunc.fixturenames:
67+
use_dask = False
68+
try:
69+
use_dask = metafunc.config.getoption("dask")
70+
except ValueError:
71+
pass
72+
if use_dask:
73+
Plugins = []
74+
elif bool(shutil.which("sbatch")):
75+
Plugins = ["slurm"]
76+
else:
77+
Plugins = ["cf"]
78+
try:
79+
if metafunc.config.getoption("psij"):
80+
Plugins.append("psij-" + metafunc.config.getoption("psij"))
81+
if (
82+
bool(shutil.which("sbatch"))
83+
and metafunc.config.getoption("psij") == "slurm"
84+
):
85+
Plugins.remove("slurm")
86+
except ValueError:
87+
pass
88+
metafunc.parametrize("plugin_parallel", Plugins)
89+
6690

6791
# For debugging in IDE's don't catch raised exceptions and let the IDE
6892
# break at it

pydra/engine/tests/test_workflow.py

Lines changed: 40 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,7 +1227,7 @@ def Worky(x):
12271227
# workflows with structures A -> B -> C with multiple connections
12281228

12291229

1230-
def test_wf_3nd_8(tmp_path: Path):
1230+
def test_wf_3nd_8(plugin_parallel, tmp_path: Path):
12311231
"""workflow with three tasks A->B->C vs two tasks A->C with multiple connections"""
12321232

12331233
@workflow.define(outputs=["out1", "out2", "out1a", "out2a"])
@@ -1254,7 +1254,7 @@ def Worky(zip):
12541254

12551255
worky = Worky(zip=[["test1", "test3", "test5"], ["test2", "test4", "test6"]])
12561256

1257-
with Submitter(worker="cf") as sub:
1257+
with Submitter(worker=plugin_parallel) as sub:
12581258
res = sub(worky)
12591259

12601260
assert (
@@ -2208,11 +2208,9 @@ def Worky2(x, y):
22082208

22092209

22102210
@pytest.mark.flaky(reruns=3)
2211-
def test_wf_nostate_cachelocations_a(plugin: str, tmp_path: Path):
2211+
def test_wf_nostate_cachelocations_a(plugin_parallel, plugin: str, tmp_path: Path):
22122212
"""
2213-
the same as previous test, but workflows names differ;
2214-
the task should not be run and it should be fast,
2215-
but the worky itself is triggered and the new output dir is created
2213+
the same as previous test, but workflows differ
22162214
"""
22172215
cache_dir1 = tmp_path / "test_wf_cache3"
22182216
cache_dir1.mkdir()
@@ -2238,7 +2236,6 @@ def Worky1(x, y):
22382236

22392237
@workflow.define
22402238
def Worky2(x, y):
2241-
22422239
mult = workflow.add(Divide(x=x, y=y), name="mult")
22432240
add2 = workflow.add(Add2Wait(x=mult.out), name="add2")
22442241
return add2.out
@@ -2252,17 +2249,9 @@ def Worky2(x, y):
22522249
results2 = sub(worky2)
22532250

22542251
assert not results2.errored, "\n".join(results2.errors["error message"])
2255-
t2 = time.time() - t0
22562252

22572253
assert 2 == results2.outputs.out
22582254

2259-
# for win and dask/slurm the time for dir creation etc. might take much longer
2260-
if not sys.platform.startswith("win") and plugin == "cf":
2261-
# checking execution time (second one should be quick)
2262-
assert t1 > 2
2263-
# testing relative values (windows or slurm takes much longer to create worky itself)
2264-
assert t2 < max(1, t1 - 1)
2265-
22662255
# checking if both output_dirs are created
22672256
assert results1.output_dir != results2.output_dir
22682257

@@ -2328,7 +2317,9 @@ def Worky2(x, y):
23282317

23292318

23302319
@pytest.mark.flaky(reruns=3)
2331-
def test_wf_nostate_cachelocations_setoutputchange(plugin: str, tmp_path: Path):
2320+
def test_wf_nostate_cachelocations_setoutputchange(
2321+
plugin_parallel, plugin: str, tmp_path: Path
2322+
):
23322323
"""
23332324
the same as previous test, but worky output names differ,
23342325
the tasks should not be run and it should be fast,
@@ -3601,7 +3592,7 @@ def Worky(x):
36013592
assert val.fspath == outputs._output_dir / file_list[ii]
36023593

36033594

3604-
def test_wf_upstream_error1(tmp_path: Path):
3595+
def test_wf_upstream_error1(plugin_parallel: str, tmp_path: Path):
36053596
"""workflow with two tasks, task2 dependent on an task1 which raised an error"""
36063597

36073598
@workflow.define
@@ -3613,12 +3604,12 @@ def Worky(x):
36133604
worky = Worky(x="hi") # TypeError for adding str and int
36143605

36153606
with pytest.raises(RuntimeError) as excinfo:
3616-
worky(worker="cf", cache_dir=tmp_path)
3607+
worky(worker=plugin_parallel, cache_dir=tmp_path)
36173608
assert "addvar1" in str(excinfo.value)
36183609
assert "failed with errors" in str(excinfo.value)
36193610

36203611

3621-
def test_wf_upstream_error2(tmp_path: Path):
3612+
def test_wf_upstream_error2(plugin_parallel: str, tmp_path: Path):
36223613
"""task2 dependent on task1, task1 errors, workflow-level split on task 1
36233614
goal - workflow finish running, one output errors but the other doesn't
36243615
"""
@@ -3635,13 +3626,13 @@ def Worky(x):
36353626
) # workflow-level split TypeError for adding str and int
36363627

36373628
with pytest.raises(Exception) as excinfo:
3638-
worky(worker="cf", cache_dir=tmp_path)
3629+
worky(worker=plugin_parallel, cache_dir=tmp_path)
36393630
assert "addvar1" in str(excinfo.value)
36403631
assert "failed with errors" in str(excinfo.value)
36413632

36423633

36433634
@pytest.mark.flaky(reruns=2) # when slurm
3644-
def test_wf_upstream_error3(plugin: str, tmp_path: Path):
3635+
def test_wf_upstream_error3(plugin_parallel: str, tmp_path: Path):
36453636
"""task2 dependent on task1, task1 errors, task-level split on task 1
36463637
goal - workflow finish running, one output errors but the other doesn't
36473638
"""
@@ -3655,12 +3646,12 @@ def Worky(x):
36553646

36563647
worky = Worky(x=[1, "hi"]) # TypeError for adding str and int
36573648
with pytest.raises(RuntimeError) as excinfo:
3658-
worky(worker="cf", cache_dir=tmp_path)
3649+
worky(worker=plugin_parallel, cache_dir=tmp_path)
36593650
assert "addvar1" in str(excinfo.value)
36603651
assert "failed with errors" in str(excinfo.value)
36613652

36623653

3663-
def test_wf_upstream_error4(tmp_path: Path):
3654+
def test_wf_upstream_error4(plugin_parallel: str, tmp_path: Path):
36643655
"""workflow with one task, which raises an error"""
36653656

36663657
@workflow.define
@@ -3671,12 +3662,12 @@ def Worky(x):
36713662

36723663
worky = Worky(x="hi") # TypeError for adding str and int
36733664
with pytest.raises(Exception) as excinfo:
3674-
worky(worker="cf", cache_dir=tmp_path)
3665+
worky(worker=plugin_parallel, cache_dir=tmp_path)
36753666
assert "failed with errors" in str(excinfo.value)
36763667
assert "addvar1" in str(excinfo.value)
36773668

36783669

3679-
def test_wf_upstream_error5(tmp_path: Path):
3670+
def test_wf_upstream_error5(plugin_parallel: str, tmp_path: Path):
36803671
"""nested workflow with one task, which raises an error"""
36813672

36823673
@workflow.define
@@ -3692,13 +3683,13 @@ def WfMain(x):
36923683
wf_main = WfMain(x="hi") # TypeError for adding str and int
36933684

36943685
with pytest.raises(Exception) as excinfo:
3695-
wf_main(worker="cf", cache_dir=tmp_path)
3686+
wf_main(worker=plugin_parallel, cache_dir=tmp_path)
36963687

36973688
assert "addvar1" in str(excinfo.value)
36983689
assert "failed with errors" in str(excinfo.value)
36993690

37003691

3701-
def test_wf_upstream_error6(tmp_path: Path):
3692+
def test_wf_upstream_error6(plugin_parallel: str, tmp_path: Path):
37023693
"""nested workflow with two tasks, the first one raises an error"""
37033694

37043695
@workflow.define(outputs=["wf_out"])
@@ -3716,13 +3707,13 @@ def WfMain(x):
37163707
wf_main = WfMain(x="hi") # TypeError for adding str and int
37173708

37183709
with pytest.raises(RuntimeError) as excinfo:
3719-
wf_main(worker="cf", cache_dir=tmp_path)
3710+
wf_main(worker=plugin_parallel, cache_dir=tmp_path)
37203711

37213712
assert "addvar1" in str(excinfo.value)
37223713
assert "failed with errors" in str(excinfo.value)
37233714

37243715

3725-
def test_wf_upstream_error7(tmp_path: Path):
3716+
def test_wf_upstream_error7(plugin_parallel: str, tmp_path: Path):
37263717
"""
37273718
workflow with three sequential tasks, the first task raises an error
37283719
the last task is set as the workflow output
@@ -3738,7 +3729,7 @@ def Worky(x):
37383729

37393730
worky = Worky(x="hi") # TypeError for adding str and int
37403731

3741-
with Submitter(worker="cf", cache_dir=tmp_path) as sub:
3732+
with Submitter(worker=plugin_parallel, cache_dir=tmp_path) as sub:
37423733
results = sub(worky)
37433734
error_message = "".join(results.errors["error message"])
37443735
assert "addvar1" in error_message
@@ -3750,7 +3741,7 @@ def Worky(x):
37503741
assert list(graph["addvar3"].unrunnable.values()) == [[graph["addvar2"]]]
37513742

37523743

3753-
def test_wf_upstream_error7a(tmp_path: Path):
3744+
def test_wf_upstream_error7a(plugin_parallel: str, tmp_path: Path):
37543745
"""
37553746
workflow with three sequential tasks, the first task raises an error
37563747
the second task is set as the workflow output
@@ -3765,7 +3756,7 @@ def Worky(x):
37653756
return addvar3.out
37663757

37673758
worky = Worky(x="hi") # TypeError for adding str and int
3768-
with Submitter(worker="cf", cache_dir=tmp_path) as sub:
3759+
with Submitter(worker=plugin_parallel, cache_dir=tmp_path) as sub:
37693760
results = sub(worky)
37703761
error_message = "".join(results.errors["error message"])
37713762
assert "addvar1" in error_message
@@ -3777,7 +3768,7 @@ def Worky(x):
37773768
assert list(graph["addvar3"].unrunnable.values()) == [[graph["addvar2"]]]
37783769

37793770

3780-
def test_wf_upstream_error7b(tmp_path: Path):
3771+
def test_wf_upstream_error7b(plugin_parallel: str, tmp_path: Path):
37813772
"""
37823773
workflow with three sequential tasks, the first task raises an error
37833774
the second and the third tasks are set as the workflow output
@@ -3792,7 +3783,7 @@ def Worky(x):
37923783
return addvar2.out, addvar3.out #
37933784

37943785
worky = Worky(x="hi") # TypeError for adding str and int
3795-
with Submitter(worker="cf", cache_dir=tmp_path) as sub:
3786+
with Submitter(worker=plugin_parallel, cache_dir=tmp_path) as sub:
37963787
results = sub(worky)
37973788
error_message = "".join(results.errors["error message"])
37983789
assert "addvar1" in error_message
@@ -3804,7 +3795,7 @@ def Worky(x):
38043795
assert list(graph["addvar3"].unrunnable.values()) == [[graph["addvar2"]]]
38053796

38063797

3807-
def test_wf_upstream_error8(tmp_path: Path):
3798+
def test_wf_upstream_error8(plugin_parallel: str, tmp_path: Path):
38083799
"""workflow with three tasks, the first one raises an error, so 2 others are removed"""
38093800

38103801
@workflow.define(outputs=["out1", "out2"])
@@ -3816,7 +3807,7 @@ def Worky(x):
38163807
return addvar2.out, addtwo.out #
38173808

38183809
worky = Worky(x="hi") # TypeError for adding str and int
3819-
with Submitter(worker="cf", cache_dir=tmp_path) as sub:
3810+
with Submitter(worker=plugin_parallel, cache_dir=tmp_path) as sub:
38203811
results = sub(worky)
38213812
error_message = "".join(results.errors["error message"])
38223813
assert "addvar1" in error_message
@@ -3829,7 +3820,7 @@ def Worky(x):
38293820
assert list(graph["addtwo"].unrunnable.values()) == [[graph["addvar1"]]]
38303821

38313822

3832-
def test_wf_upstream_error9(plugin: str, tmp_path: Path):
3823+
def test_wf_upstream_error9(plugin_parallel, tmp_path: Path):
38333824
"""
38343825
workflow with five tasks with two "branches",
38353826
one branch has an error, the second is fine
@@ -3846,7 +3837,7 @@ def Worky(x):
38463837
return follow_err.out # out1
38473838

38483839
worky = Worky(x=2)
3849-
with Submitter(worker="cf", cache_dir=tmp_path) as sub:
3840+
with Submitter(worker=plugin_parallel, cache_dir=tmp_path) as sub:
38503841
results = sub(worky)
38513842
error_message = "".join(results.errors["error message"])
38523843
assert "err" in error_message
@@ -3857,7 +3848,7 @@ def Worky(x):
38573848
assert list(graph["follow_err"].unrunnable.values()) == [[graph["err"]]]
38583849

38593850

3860-
def test_wf_upstream_error9a(plugin: str, tmp_path: Path):
3851+
def test_wf_upstream_error9a(plugin_parallel: str, tmp_path: Path):
38613852
"""
38623853
workflow with five tasks with two "branches",
38633854
one branch has an error, the second is fine
@@ -3876,7 +3867,7 @@ def Worky(x):
38763867

38773868
worky = Worky(x=2)
38783869

3879-
with Submitter(worker="cf", cache_dir=tmp_path) as sub:
3870+
with Submitter(worker=plugin_parallel, cache_dir=tmp_path) as sub:
38803871
results = sub(worky)
38813872
error_message = "".join(results.errors["error message"])
38823873
assert "err" in error_message
@@ -3887,7 +3878,7 @@ def Worky(x):
38873878
assert list(graph["follow_err"].unrunnable.values()) == [[graph["err"]]]
38883879

38893880

3890-
def test_wf_upstream_error9b(plugin: str, tmp_path: Path):
3881+
def test_wf_upstream_error9b(plugin_parallel: str, tmp_path: Path):
38913882
"""
38923883
workflow with five tasks with two "branches",
38933884
one branch has an error, the second is fine
@@ -3905,7 +3896,7 @@ def Worky(x):
39053896

39063897
worky = Worky(x=2)
39073898

3908-
with Submitter(worker="cf", cache_dir=tmp_path) as sub:
3899+
with Submitter(worker=plugin_parallel, cache_dir=tmp_path) as sub:
39093900
results = sub(worky)
39103901
error_message = "".join(results.errors["error message"])
39113902
assert "err" in error_message
@@ -4405,7 +4396,7 @@ def Worky(x, y):
44054396

44064397

44074398
@pytest.mark.timeout(20)
4408-
def test_duplicate_input_on_split_wf(tmp_path: Path):
4399+
def test_duplicate_input_on_split_wf(plugin_parallel, tmp_path: Path):
44094400
"""checking if the workflow gets stuck if it has to run two tasks with equal checksum;
44104401
This can occur when splitting on a list containing duplicate values.
44114402
"""
@@ -4422,13 +4413,13 @@ def Worky(text):
44224413

44234414
worky = Worky().split(text=text)
44244415

4425-
outputs = worky(worker="cf", n_procs=6)
4416+
outputs = worky(worker=plugin_parallel, n_procs=6)
44264417

44274418
assert outputs.out1[0] == "test" and outputs.out1[0] == "test"
44284419

44294420

44304421
@pytest.mark.timeout(40)
4431-
def test_inner_outer_wf_duplicate(tmp_path: Path):
4422+
def test_inner_outer_wf_duplicate(plugin_parallel, tmp_path: Path):
44324423
"""checking if the execution gets stuck if there is an inner and outer workflows
44334424
that run two nodes with the exact same inputs.
44344425
"""
@@ -4466,13 +4457,13 @@ def OuterWf(start_number, task_name, dummy):
44664457
["start_number", "task_name"], start_number=start_list, task_name=task_list
44674458
)
44684459

4469-
with Submitter(worker="cf") as sub:
4460+
with Submitter(worker=plugin_parallel) as sub:
44704461
res = sub(test_outer)
44714462

44724463
assert res.outputs.res2[0] == 23 and res.outputs.res2[1] == 23
44734464

44744465

4475-
def test_rerun_errored(tmp_path, capfd):
4466+
def test_rerun_errored(plugin_parallel, tmp_path, capfd):
44764467
"""Test rerunning a workflow containing errors.
44774468
Only the errored tasks and workflow should be rerun"""
44784469

@@ -4498,11 +4489,11 @@ def WorkyPassOdds(x):
44984489
print("Starting run 1")
44994490
with pytest.raises(RuntimeError):
45004491
# Must be cf to get the error from all tasks, otherwise will only get the first error
4501-
worky(worker="cf", cache_dir=tmp_path, n_procs=5)
4492+
worky(worker=plugin_parallel, cache_dir=tmp_path, n_procs=5)
45024493

45034494
print("Starting run 2")
45044495
with pytest.raises(RuntimeError):
4505-
worky(worker="cf", cache_dir=tmp_path, n_procs=5)
4496+
worky(worker=plugin_parallel, cache_dir=tmp_path, n_procs=5)
45064497

45074498
out, err = capfd.readouterr()
45084499
stdout_lines = out.splitlines()

0 commit comments

Comments
 (0)