Skip to content

Commit d7cd987

Browse files
committed
Merge branch 'master' of github.com:chasejohnson3/pydra
2 parents 0efcc88 + 447f34b commit d7cd987

File tree

7 files changed

+137
-95
lines changed

7 files changed

+137
-95
lines changed

.github/workflows/testsingularity.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ jobs:
3434
- name: Build
3535
run: |
3636
cd singularity
37-
./mconfig --without-suid -p ${{ runner.tool_cache }}/singularity/${{ env.RELEASE_VERSION }}/x64
37+
./mconfig --without-suid -p /usr/local/
3838
make -C ./builddir
3939
sudo make -C ./builddir install
4040
cd ..
4141
- name: Echo singularity version
4242
run: |
4343
echo ${{ github.ref }}
44-
${{ runner.tool_cache }}/singularity/${{ env.RELEASE_VERSION }}/x64/bin/singularity --version
44+
singularity --version
4545
4646
4747
- name: Set up Python ${{ matrix.python-version }}

pydra/engine/core.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,16 @@ def checksum_states(self, state_index=None):
274274
key.split(".")[1],
275275
getattr(inputs_copy, key.split(".")[1])[ind],
276276
)
277+
# setting files_hash again in case it was cleaned by setting specific element
278+
# that might be important for outer splitter of input variable with big files
279+
# the file can be changed with every single index even if there are only two files
280+
inputs_copy.files_hash = self.inputs.files_hash
277281
input_hash = inputs_copy.hash
282+
# updating self.inputs.files_hash, so big files hashes
283+
# doesn't have to be recompute for the next element
284+
for key, val in inputs_copy.files_hash.items():
285+
if val:
286+
self.inputs.files_hash[key].update(val)
278287
if is_workflow(self):
279288
con_hash = hash_function(self._connections)
280289
hash_list = [input_hash, con_hash]
@@ -392,26 +401,28 @@ def output_dir(self):
392401
return [self._cache_dir / checksum for checksum in self.checksum_states()]
393402
return self._cache_dir / self.checksum
394403

395-
def __call__(self, submitter=None, plugin=None, rerun=False, **kwargs):
404+
def __call__(
405+
self, submitter=None, plugin=None, plugin_kwargs=None, rerun=False, **kwargs
406+
):
396407
"""Make tasks callable themselves."""
397408
from .submitter import Submitter
398409

399410
if submitter and plugin:
400411
raise Exception("Specify submitter OR plugin, not both")
401-
plugin = plugin or self.plugin
402-
if plugin:
403-
submitter = Submitter(plugin=plugin)
404-
elif self.state:
405-
submitter = Submitter()
412+
elif submitter:
413+
pass
414+
# if there is plugin provided or the task is a Workflow or has a state,
415+
# the submitter will be created using provided plugin, self.plugin or "cf"
416+
elif plugin or self.state or is_workflow(self):
417+
plugin = plugin or self.plugin or "cf"
418+
if plugin_kwargs is None:
419+
plugin_kwargs = {}
420+
submitter = Submitter(plugin=plugin, **plugin_kwargs)
406421

407422
if submitter:
408423
with submitter as sub:
409424
res = sub(self)
410-
else:
411-
if is_workflow(self):
412-
raise NotImplementedError(
413-
"TODO: linear workflow execution - assign submitter or plugin for now"
414-
)
425+
else: # tasks without state could be run without a submitter
415426
res = self._run(rerun=rerun, **kwargs)
416427
return res
417428

pydra/engine/tests/test_numpy_examples.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ def arrayout(val):
1717
return np.array([val, val])
1818

1919

20-
def test_multiout(plugin, tmpdir):
20+
def test_multiout(tmpdir):
2121
""" testing a simple function that returns a numpy array"""
2222
wf = Workflow("wf", input_spec=["val"], val=2)
2323
wf.add(arrayout(name="mo", val=wf.lzin.val))
2424

2525
wf.set_output([("array", wf.mo.lzout.b)])
2626
wf.cache_dir = tmpdir
2727

28-
with Submitter(plugin=plugin, n_procs=2) as sub:
28+
with Submitter(plugin="cf", n_procs=2) as sub:
2929
sub(runnable=wf)
3030

3131
results = wf.result(return_inputs=True)
@@ -34,7 +34,7 @@ def test_multiout(plugin, tmpdir):
3434
assert np.array_equal(results[1].output.array, np.array([2, 2]))
3535

3636

37-
def test_multiout_st(plugin, tmpdir):
37+
def test_multiout_st(tmpdir):
3838
""" testing a simple function that returns a numpy array, adding splitter"""
3939
wf = Workflow("wf", input_spec=["val"], val=[0, 1, 2])
4040
wf.add(arrayout(name="mo", val=wf.lzin.val))
@@ -43,7 +43,7 @@ def test_multiout_st(plugin, tmpdir):
4343
wf.set_output([("array", wf.mo.lzout.b)])
4444
wf.cache_dir = tmpdir
4545

46-
with Submitter(plugin=plugin, n_procs=2) as sub:
46+
with Submitter(plugin="cf", n_procs=2) as sub:
4747
sub(runnable=wf)
4848

4949
results = wf.result(return_inputs=True)

pydra/engine/tests/test_singularity.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
shutil.which("singularity") is None, reason="no singularity available"
1818
)
1919

20+
need_slurm = pytest.mark.skipif(
21+
not bool(shutil.which("sbatch")), reason="no singularity available"
22+
)
23+
2024

2125
@need_singularity
2226
def test_singularity_1_nosubm(tmpdir):
@@ -255,6 +259,26 @@ def test_singularity_st_3(plugin, tmpdir):
255259
assert "Ubuntu" in res[3].output.stdout
256260

257261

262+
@need_singularity
263+
@need_slurm
264+
@pytest.mark.xfail(
265+
reason="slurm can complain if the number of submitted jobs exceeds the limit"
266+
)
267+
@pytest.mark.parametrize("n", [10, 50, 100])
268+
def test_singularity_st_4(tmpdir, n):
269+
""" splitter over args (checking bigger splitters if slurm available)"""
270+
args_n = list(range(n))
271+
image = "library://sylabsed/linux/alpine"
272+
singu = SingularityTask(
273+
name="singu", executable="echo", image=image, cache_dir=tmpdir, args=args_n
274+
).split("args")
275+
assert singu.state.splitter == "singu.args"
276+
res = singu(plugin="slurm")
277+
assert "1" in res[1].output.stdout
278+
assert str(n - 1) in res[-1].output.stdout
279+
assert res[0].output.return_code == res[1].output.return_code == 0
280+
281+
258282
@need_singularity
259283
def test_wf_singularity_1(plugin, tmpdir):
260284
""" a workflow with two connected task

pydra/engine/tests/test_submitter.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,30 @@ def sleep_add_one(x):
2323

2424
def test_callable_wf(plugin, tmpdir):
2525
wf = gen_basic_wf()
26+
res = wf()
27+
assert res.output.out == 9
28+
del wf, res
2629

27-
with pytest.raises(NotImplementedError):
28-
wf()
29-
30+
# providing plugin
31+
wf = gen_basic_wf()
3032
res = wf(plugin="cf")
3133
assert res.output.out == 9
3234
del wf, res
3335

36+
# providing plugin_kwargs
3437
wf = gen_basic_wf()
35-
wf.cache_dir = tmpdir
38+
res = wf(plugin="cf", plugin_kwargs={"n_procs": 2})
39+
assert res.output.out == 9
40+
del wf, res
41+
42+
# providing wrong plugin_kwargs
43+
wf = gen_basic_wf()
44+
with pytest.raises(TypeError, match="an unexpected keyword argument"):
45+
wf(plugin="cf", plugin_kwargs={"sbatch_args": "-N2"})
3646

47+
# providing submitter
48+
wf = gen_basic_wf()
49+
wf.cache_dir = tmpdir
3750
sub = Submitter(plugin)
3851
res = wf(submitter=sub)
3952
assert res.output.out == 9

0 commit comments

Comments
 (0)