Skip to content

Commit 82c7886

Browse files
authored
Merge pull request #242 from satra/enh-dask
enh: add support for dask
2 parents 9d1e27f + a702e3f commit 82c7886

15 files changed

+197
-384
lines changed

.travis.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ env:
3333
# Useful for testing un-released upstream fixes
3434
matrix:
3535
include:
36+
- python: 3.7
37+
env:
38+
- INSTALL_TYPE="develop"
39+
- CHECK_TYPE="test_dask"
3640
- os: osx
3741
osx_image: xcode11.2
3842
language: generic

ci/none.sh

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,20 @@ function travis_before_script {
4040
# extras don't seem possible with setup.py install, so switch to pip
4141
pip install ".[test]"
4242
fi
43+
elif [ "$CHECK_TYPE" = "test_dask" ]; then
44+
if [ "$INSTALL_TYPE" = "develop" ]; then
45+
pip install -e ".[dask]"
46+
fi
4347
elif [ "$CHECK_TYPE" = "style" ]; then
4448
pip install black==19.3b0
4549
fi
4650
}
4751

4852
function travis_script {
4953
if [ "$CHECK_TYPE" = "test" ]; then
50-
pytest -vs -n auto --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml --doctest-modules pydra tutorial
54+
pytest -vs -n auto --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml --doctest-modules pydra
55+
elif [ "$CHECK_TYPE" = "test_dask" ]; then
56+
pytest -vs -n auto --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml --doctest-modules --dask pydra/engine
5157
elif [ "$CHECK_TYPE" = "style" ]; then
5258
black --check pydra tools setup.py
5359
fi

pydra/conftest.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import shutil
2+
3+
4+
def pytest_addoption(parser):
5+
parser.addoption("--dask", action="store_true", help="run all combinations")
6+
7+
8+
def pytest_generate_tests(metafunc):
9+
if "plugin_dask_opt" in metafunc.fixturenames:
10+
if bool(shutil.which("sbatch")):
11+
Plugins = ["cf", "slurm"]
12+
else:
13+
Plugins = ["cf"]
14+
if metafunc.config.getoption("dask"):
15+
Plugins.append("dask")
16+
metafunc.parametrize("plugin_dask_opt", Plugins)
17+
18+
if "plugin" in metafunc.fixturenames:
19+
if metafunc.config.getoption("dask"):
20+
Plugins = []
21+
elif bool(shutil.which("sbatch")):
22+
Plugins = ["cf", "slurm"]
23+
else:
24+
Plugins = ["cf"]
25+
metafunc.parametrize("plugin", Plugins)

pydra/engine/helpers.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,8 @@ def execute(cmd, strip=False):
338338
strip : :obj:`bool`
339339
TODO
340340
341+
"""
342+
rc, stdout, stderr = read_and_display(*cmd, strip=strip)
341343
"""
342344
loop = get_open_loop()
343345
if loop.is_running():
@@ -346,6 +348,7 @@ def execute(cmd, strip=False):
346348
rc, stdout, stderr = loop.run_until_complete(
347349
read_and_display_async(*cmd, strip=strip)
348350
)
351+
"""
349352
return rc, stdout, stderr
350353

351354

pydra/engine/submitter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Handle execution backends."""
22
import asyncio
3-
from .workers import SerialWorker, ConcurrentFuturesWorker, SlurmWorker
3+
from .workers import SerialWorker, ConcurrentFuturesWorker, SlurmWorker, DaskWorker
44
from .core import is_workflow
55
from .helpers import get_open_loop
66

@@ -33,6 +33,8 @@ def __init__(self, plugin="cf", **kwargs):
3333
self.worker = ConcurrentFuturesWorker(**kwargs)
3434
elif self.plugin == "slurm":
3535
self.worker = SlurmWorker(**kwargs)
36+
elif self.plugin == "dask":
37+
self.worker = DaskWorker(**kwargs)
3638
else:
3739
raise Exception("plugin {} not available".format(self.plugin))
3840
self.worker.loop = self.loop
@@ -176,9 +178,9 @@ def close(self):
176178
Do not close previously running loop.
177179
178180
"""
181+
self.worker.close()
179182
if self._own_loop:
180183
self.loop.close()
181-
self.worker.close()
182184

183185

184186
def get_runnable_tasks(graph):

pydra/engine/tests/test_boutiques.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,21 @@
1111

1212
need_bosh_docker = pytest.mark.skipif(
1313
shutil.which("docker") is None
14-
or sp.call(["docker", "info"] or sp.call(["bosh", "version"])),
14+
or sp.call(["docker", "info"])
15+
or sp.call(["which", "bosh"]),
1516
reason="requires docker and bosh",
1617
)
1718

18-
if bool(shutil.which("sbatch")):
19-
Plugins = ["cf", "slurm"]
20-
else:
21-
Plugins = ["cf"]
22-
2319
Infile = Path(__file__).resolve().parent / "data_tests" / "test.nii.gz"
2420

2521

2622
@no_win
2723
@need_bosh_docker
28-
@pytest.mark.flaky(reruns=2)
24+
@pytest.mark.flaky(reruns=2) # need for travis
2925
@pytest.mark.parametrize(
3026
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
3127
)
3228
@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter])
33-
@pytest.mark.parametrize("plugin", Plugins)
3429
def test_boutiques_1(maskfile, plugin, results_function):
3530
""" simple task to run fsl.bet using BoshTask"""
3631
btask = BoshTask(name="NA", zenodo_id="1482743")
@@ -99,7 +94,6 @@ def test_boutiques_spec_2():
9994
@pytest.mark.parametrize(
10095
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
10196
)
102-
@pytest.mark.parametrize("plugin", Plugins)
10397
def test_boutiques_wf_1(maskfile, plugin):
10498
""" wf with one task that runs fsl.bet using BoshTask"""
10599
wf = Workflow(name="wf", input_spec=["maskfile", "infile"])
@@ -130,7 +124,6 @@ def test_boutiques_wf_1(maskfile, plugin):
130124
@pytest.mark.parametrize(
131125
"maskfile", ["test_brain.nii.gz", "test_brain", "test_brain.nii"]
132126
)
133-
@pytest.mark.parametrize("plugin", Plugins)
134127
def test_boutiques_wf_2(maskfile, plugin):
135128
""" wf with two BoshTasks (fsl.bet and fsl.stats) and one ShellTask"""
136129
wf = Workflow(name="wf", input_spec=["maskfile", "infile"])

0 commit comments

Comments
 (0)