Skip to content

Commit 128a3e9

Browse files
committed
added avoid_clashes option to copy_nested_files to ensure names don't clash when copying files with the same name from multiple node outputs
1 parent 01d50e2 commit 128a3e9

File tree

3 files changed

+63
-26
lines changed

3 files changed

+63
-26
lines changed

new-docs/source/tutorial/1-getting-started.ipynb

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"source": [
3131
"from pathlib import Path\n",
3232
"from tempfile import mkdtemp\n",
33+
"from pprint import pprint\n",
3334
"import json\n",
3435
"\n",
3536
"JSON_CONTENTS = {'a': True, 'b': 'two', 'c': 3, 'd': [7, 0.55, 6]}\n",
@@ -160,7 +161,7 @@
160161
"outputs = mrgrid()\n",
161162
"\n",
162163
"# Print the locations of the output files\n",
163-
"print(\"\\n\".join(str(p) for p in outputs.out_file))"
164+
"pprint(outputs.out_file)"
164165
]
165166
},
166167
{
@@ -184,8 +185,6 @@
184185
"metadata": {},
185186
"outputs": [],
186187
"source": [
187-
"\n",
188-
"\n",
189188
"mrgrid_varying_vox_sizes = MrGrid(operation=\"regrid\").split(\n",
190189
" (\"in_file\", \"voxel\"),\n",
191190
" in_file=nifti_dir.iterdir(),\n",
@@ -205,7 +204,9 @@
205204
" ],\n",
206205
")\n",
207206
"\n",
208-
"print(\"\\n\".join(str(p) for p in outputs.out_file))"
207+
"outputs = mrgrid_varying_vox_sizes()\n",
208+
"\n",
209+
"pprint(outputs.out_file)"
209210
]
210211
},
211212
{
@@ -277,7 +278,9 @@
277278
"metadata": {},
278279
"outputs": [],
279280
"source": [
280-
"outputs = mrgrid(cache_root=Path(\"~/pydra-cache\").expanduser())"
281+
"outputs = mrgrid(cache_dir=Path(\"~/pydra-cache\").expanduser())\n",
282+
"\n",
283+
"pprint(outputs)"
281284
]
282285
},
283286
{
@@ -296,7 +299,7 @@
296299
"source": [
297300
"from pydra.utils import default_run_cache_dir\n",
298301
"\n",
299-
"my_cache_dir = Path(\"~/pydra-cache\").expanduser()\n",
302+
"my_cache_dir = Path(\"~/new-pydra-cache\").expanduser()\n",
300303
"my_cache_dir.mkdir(exist_ok=True)\n",
301304
"\n",
302305
"outputs = mrgrid(\n",

new-docs/source/tutorial/tst.py

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,47 @@
1-
from pydra.tasks.testing import UnsafeDivisionWorkflow
2-
from pydra.engine.submitter import Submitter
1+
from pathlib import Path
2+
from tempfile import mkdtemp
3+
from pprint import pprint
4+
import json
5+
from pydra.utils.hash import hash_function
6+
from pydra.tasks.mrtrix3.v3_0 import MrGrid
7+
from fileformats.medimage import Nifti1
38

4-
# This workflow will fail because we are trying to divide by 0
5-
wf = UnsafeDivisionWorkflow(a=10, b=5, denominator=2)
9+
JSON_CONTENTS = {"a": True, "b": "two", "c": 3, "d": [7, 0.55, 6]}
610

7-
if __name__ == "__main__":
8-
with Submitter(worker="cf", rerun=True) as sub:
9-
result = sub(wf)
11+
test_dir = Path(mkdtemp())
12+
cache_root = Path(mkdtemp())
13+
json_file = test_dir / "test.json"
14+
with open(json_file, "w") as f:
15+
json.dump(JSON_CONTENTS, f)
1016

17+
nifti_dir = test_dir / "nifti"
18+
nifti_dir.mkdir()
1119

12-
# from pydra.tasks.testing import UnsafeDivisionWorkflow
13-
# from pydra.engine.submitter import Submitter
20+
for i in range(10):
21+
Nifti1.sample(nifti_dir, seed=i) # Create a dummy NIfTI file in the dest. directory
1422

15-
# # This workflow will fail because we are trying to divide by 0
16-
# failing_workflow = UnsafeDivisionWorkflow(a=10, b=5).split(denominator=[3, 2, 0])
23+
niftis = list(nifti_dir.iterdir())
24+
pprint([hash_function(nifti) for nifti in niftis])
1725

18-
# if __name__ == "__main__":
19-
# with Submitter(worker="cf") as sub:
20-
# result = sub(failing_workflow)
26+
mrgrid_varying_vox_sizes = MrGrid(operation="regrid").split(
27+
("in_file", "voxel"),
28+
in_file=niftis,
29+
# Define a list of voxel sizes to resample the NIfTI files to,
30+
# the list must be the same length as the list of NIfTI files
31+
voxel=[
32+
(1.0, 1.0, 1.0),
33+
(1.0, 1.0, 1.0),
34+
(1.0, 1.0, 1.0),
35+
(0.5, 0.5, 0.5),
36+
(0.75, 0.75, 0.75),
37+
(0.5, 0.5, 0.5),
38+
(0.5, 0.5, 0.5),
39+
(1.0, 1.0, 1.0),
40+
(1.25, 1.25, 1.25),
41+
(1.25, 1.25, 1.25),
42+
],
43+
)
2144

22-
# if result.errored:
23-
# print("Workflow failed with errors:\n" + str(result.errors))
24-
# else:
25-
# print("Workflow completed successfully :)")
45+
outputs = mrgrid_varying_vox_sizes(cache_dir=cache_root)
46+
47+
pprint(outputs.out_file)

pydra/engine/helpers_file.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import subprocess as sp
1010
from contextlib import contextmanager
1111
import attr
12-
from fileformats.core import FileSet
12+
from fileformats.generic import FileSet
1313
from pydra.engine.helpers import is_lazy, attrs_values, list_fields
1414

1515

@@ -77,6 +77,10 @@ def copy_nested_files(
7777

7878
cache: ty.Dict[FileSet, FileSet] = {}
7979

80+
# Set to keep track of file paths that have already been copied
81+
# to allow FileSet.copy to avoid name clashes
82+
clashes_to_avoid = set()
83+
8084
def copy_fileset(fileset: FileSet):
8185
try:
8286
return cache[fileset]
@@ -89,7 +93,15 @@ def copy_fileset(fileset: FileSet):
8993
MountIndentifier.on_same_mount(p, dest_dir) for p in fileset.fspaths
9094
):
9195
supported -= FileSet.CopyMode.hardlink
92-
copied = fileset.copy(dest_dir=dest_dir, supported_modes=supported, **kwargs)
96+
cp_kwargs = {}
97+
98+
cp_kwargs.update(kwargs)
99+
copied = fileset.copy(
100+
dest_dir=dest_dir,
101+
supported_modes=supported,
102+
avoid_clashes=clashes_to_avoid, # this prevents fname clashes between filesets
103+
**kwargs,
104+
)
93105
cache[fileset] = copied
94106
return copied
95107

0 commit comments

Comments
 (0)