Skip to content

Commit ff18811

Browse files
committed
update daskworker
1 parent 29d3d1f commit ff18811

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

.github/workflows/testdask.yml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
name: Dask Tests
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
pull_request:
8+
9+
jobs:
10+
test:
11+
name: Test
12+
runs-on: ubuntu-latest
13+
strategy:
14+
matrix:
15+
python-version: [3.9, 3.10, 3.11]
16+
fail-fast: false
17+
18+
steps:
19+
- name: Set up Python ${{ matrix.python-version }}
20+
uses: actions/setup-python@v2
21+
with:
22+
python-version: ${{ matrix.python-version }}
23+
24+
- name: Install dependencies
25+
run: |
26+
python -m pip install --upgrade pip
27+
28+
- name: Checkout Pydra repo
29+
uses: actions/checkout@v3
30+
with:
31+
repository: ${{ github.repository }}
32+
33+
- name: Install pydra with Dask and test dependencies
34+
run: |
35+
pip install -e ".[test,dask]"
36+
37+
- name: Run tests
38+
run: |
39+
pytest -v --dask pydra/engine --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml
40+
41+
- name: Upload to codecov
42+
run: codecov -f cov.xml -F unittests -e GITHUB_WORKFLOW

pydra/engine/workers.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -879,8 +879,14 @@ async def exec_dask(self, runnable, rerun=False):
879879
from dask.distributed import Client
880880

881881
self.client = await Client(**self.client_args, asynchronous=True)
882-
future = self.client.submit(runnable._run, rerun)
883-
result = await future
882+
883+
if isinstance(runnable, TaskBase):
884+
future = self.client.submit(runnable._run, rerun)
885+
result = await future
886+
else: # it could be tuple that includes pickle files with tasks and inputs
887+
ind, task_main_pkl, task_orig = runnable
888+
future = self.client.submit(load_and_run, task_main_pkl, ind, rerun)
889+
result = await future
884890
return result
885891

886892
def close(self):
@@ -894,4 +900,4 @@ def close(self):
894900
"slurm": SlurmWorker,
895901
"dask": DaskWorker,
896902
"sge": SGEWorker,
897-
}
903+
}

0 commit comments

Comments
 (0)