Skip to content

Commit 6f60ae1

Browse files
committed
debugged advanced execution tutorial
1 parent 128a3e9 commit 6f60ae1

File tree

5 files changed

+109
-124
lines changed

5 files changed

+109
-124
lines changed

new-docs/source/tutorial/2-advanced-execution.ipynb

Lines changed: 79 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111
"executed (e.g. on the cloud, on a HPC cluster, ...). This tutorial steps you through\n",
1212
"some of the available options for executing a task.\n",
1313
"\n",
14-
"[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/nipype/pydra-tutorial/develop/notebooks/tutorial/advanced_execution.ipynb)"
14+
"[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/nipype/pydra-tutorial/develop/notebooks/tutorial/advanced_execution.ipynb)\n",
15+
"\n",
16+
"Remember that before attempting to run multi-process code in Jupyter notebooks, the\n",
17+
"following snippet must be called"
1518
]
1619
},
1720
{
@@ -30,20 +33,8 @@
3033
"source": [
3134
"## Submitter\n",
3235
"\n",
33-
"If you want to access a richer `Result` object you can use a Submitter object to execute the following task"
34-
]
35-
},
36-
{
37-
"cell_type": "code",
38-
"execution_count": null,
39-
"metadata": {},
40-
"outputs": [],
41-
"source": [
42-
"from pydra.design import python\n",
43-
"\n",
44-
"@python.define\n",
45-
"def TenToThePower(p: int) -> int:\n",
46-
" return 10 ** p"
36+
"If you want to access a richer `Result` object you can use a Submitter object to initiate\n",
37+
"the task execution. For example, using the `TenToThePower` task from the testing package"
4738
]
4839
},
4940
{
@@ -53,6 +44,8 @@
5344
"outputs": [],
5445
"source": [
5546
"from pydra.engine.submitter import Submitter\n",
47+
"from pydra.tasks.testing import TenToThePower\n",
48+
"\n",
5649
"\n",
5750
"ten_to_the_power = TenToThePower(p=3)\n",
5851
"\n",
@@ -110,7 +103,11 @@
110103
"class itself. Additional parameters can be passed to the worker initialisation as keyword\n",
111104
"arguments to the execution call. For example, if we wanted to run five tasks using the\n",
112105
"ConcurrentFutures worker but only use three CPUs, we can pass `n_procs=3` to the execution\n",
113-
"call."
106+
"call.\n",
107+
"\n",
108+
"Remember that when calling multi-process code in a top level script the call must be\n",
109+
"enclosed within an `if __name__ == \"__main__\"` block to allow the worker processes to\n",
110+
"import the module without re-executing it."
114111
]
115112
},
116113
{
@@ -119,14 +116,16 @@
119116
"metadata": {},
120117
"outputs": [],
121118
"source": [
122-
"from pydra.design import python\n",
119+
"import tempfile\n",
120+
"\n",
121+
"cache_root = tempfile.mkdtemp()\n",
123122
"\n",
124123
"if __name__ == \"__main__\":\n",
125124
"\n",
126125
" ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])\n",
127126
"\n",
128127
" # Run the 5 tasks in parallel split across 3 processes\n",
129-
" outputs = ten_to_the_power(worker=\"cf\", n_procs=3)\n",
128+
" outputs = ten_to_the_power(worker=\"cf\", n_procs=3, cache_dir=cache_root)\n",
130129
"\n",
131130
" p1, p2, p3, p4, p5 = outputs.out\n",
132131
"\n",
@@ -168,7 +167,9 @@
168167
"as long as exactly the hashes of the inputs provided to the task are the same. Here we\n",
169168
"go through some of the practicalities of this caching and hashing (see\n",
170169
"[Caches and hashes](../explanation/hashing-caching.html) for more details and issues\n",
171-
"to consider)."
170+
"to consider).\n",
171+
"\n",
172+
"First we import the functions and classes we need and create some sample NIfTI files to work with"
172173
]
173174
},
174175
{
@@ -179,37 +180,18 @@
179180
"source": [
180181
"from pathlib import Path\n",
181182
"import tempfile\n",
183+
"from pprint import pprint\n",
182184
"from fileformats.medimage import Nifti1\n",
183185
"from pydra.engine.submitter import Submitter\n",
184186
"from pydra.tasks.mrtrix3.v3_0 import MrGrid\n",
185187
"\n",
186-
"# Make directory filled with nifti files\n",
188+
"# Make a temporary directory\n",
187189
"test_dir = Path(tempfile.mkdtemp())\n",
188190
"nifti_dir = test_dir / \"nifti\"\n",
189191
"nifti_dir.mkdir()\n",
190-
"for i in range(10):\n",
191-
" Nifti1.sample(nifti_dir, seed=i)\n",
192-
"\n",
193-
"# Instantiate the task definition, \"splitting\" over all NIfTI files in the test directory\n",
194-
"# by splitting the \"input\" input field over all files in the directory\n",
195-
"mrgrid = MrGrid(operation=\"regrid\", voxel=(0.5, 0.5, 0.5)).split(\n",
196-
" in_file=nifti_dir.iterdir()\n",
197-
")\n",
198-
"\n",
199-
"# Run the task to resample all NIfTI files\n",
200-
"outputs = mrgrid()\n",
201-
"\n",
202-
"# Create a new custom directory\n",
203-
"cache_dir = test_dir / \"cache\"\n",
204-
"cache_dir.mkdir()\n",
205-
"\n",
206-
"submitter = Submitter(cache_dir=cache_dir)\n",
207-
"\n",
208-
"# Run the task to resample all NIfTI files with different voxel sizes\n",
209-
"with submitter:\n",
210-
" result1 = submitter(mrgrid)\n",
211192
"\n",
212-
"print(result1)"
193+
"# Generate some random NIfTI files to work with\n",
194+
"nifti_files = [Nifti1.sample(nifti_dir, seed=i) for i in range(10)]"
213195
]
214196
},
215197
{
@@ -243,21 +225,20 @@
243225
"\n",
244226
"mrgrid_varying_vox = MrGrid(operation=\"regrid\").split(\n",
245227
" (\"in_file\", \"voxel\"),\n",
246-
" in_file=nifti_dir.iterdir(),\n",
228+
" in_file=nifti_files,\n",
247229
" voxel=VOX_SIZES,\n",
248230
")\n",
249231
"\n",
250232
"submitter = Submitter(cache_dir=test_dir / \"cache\")\n",
251233
"\n",
252234
"\n",
253-
"# Result from previous run is reused as the task and inputs are identical\n",
254235
"with submitter:\n",
255236
" result1 = submitter(mrgrid_varying_vox)\n",
256237
"\n",
257238
"\n",
258239
"mrgrid_varying_vox2 = MrGrid(operation=\"regrid\").split(\n",
259240
" (\"in_file\", \"voxel\"),\n",
260-
" in_file=nifti_dir.iterdir(),\n",
241+
" in_file=nifti_files,\n",
261242
" voxel=copy(VOX_SIZES),\n",
262243
")\n",
263244
"\n",
@@ -298,30 +279,30 @@
298279
"outputs": [],
299280
"source": [
300281
"# Rename a NIfTI file within the test directory\n",
301-
"first_file = next(nifti_dir.iterdir())\n",
302-
"new_name = first_file.with_name(\"first.nii\")\n",
303-
"first_file.rename(new_name)\n",
282+
"nifti_files[0] = Nifti1(\n",
283+
" nifti_files[0].fspath.rename(nifti_files[0].fspath.with_name(\"first.nii\"))\n",
284+
")\n",
304285
"\n",
305286
"mrgrid_varying_vox3 = MrGrid(operation=\"regrid\").split(\n",
306287
" (\"in_file\", \"voxel\"),\n",
307-
" in_file=nifti_dir.iterdir(),\n",
288+
" in_file=nifti_files,\n",
308289
" voxel=VOX_SIZES,\n",
309290
")\n",
310291
"\n",
311-
"# Result from previous run is reused as the task and inputs are identical\n",
292+
"# Result from previous run is reused as contents of the files have not changed, despite\n",
293+
"# the file names changing\n",
312294
"with submitter:\n",
313-
" result3 = submitter(mrgrid_varying_vox3)\n",
295+
" result4 = submitter(mrgrid_varying_vox3)\n",
314296
"\n",
315-
"assert result3.output_dir == result1.output_dir\n",
297+
"assert result4.output_dir == result1.output_dir\n",
316298
"\n",
317299
"# Replace the first NIfTI file with a new file\n",
318-
"new_name.unlink()\n",
319-
"Nifti1.sample(nifti_dir, seed=100)\n",
300+
"nifti_files[0] = Nifti1.sample(nifti_dir, seed=100)\n",
320301
"\n",
321302
"# Update the in_file input field to include the new file\n",
322303
"mrgrid_varying_vox4 = MrGrid(operation=\"regrid\").split(\n",
323304
" (\"in_file\", \"voxel\"),\n",
324-
" in_file=nifti_dir.iterdir(),\n",
305+
" in_file=nifti_files,\n",
325306
" voxel=VOX_SIZES,\n",
326307
")\n",
327308
"\n",
@@ -333,19 +314,14 @@
333314
"assert result4.output_dir != result1.output_dir"
334315
]
335316
},
336-
{
337-
"cell_type": "markdown",
338-
"metadata": {},
339-
"source": []
340-
},
341317
{
342318
"cell_type": "markdown",
343319
"metadata": {},
344320
"source": [
345-
"## Environments\n",
321+
"## Environments and hooks\n",
346322
"\n",
347323
"For shell tasks, it is possible to specify that the command runs within a specific\n",
348-
"software environment, such as those provided by software containers (e.g. Docker or Apptainer).\n",
324+
"software environment, such as those provided by software containers (e.g. Docker or Singularity/Apptainer).\n",
349325
"This is done by providing the environment to the submitter/execution call,"
350326
]
351327
},
@@ -371,7 +347,7 @@
371347
"outputs = mrgrid(environment=Docker(image=\"mrtrix3/mrtrix3\", tag=\"latest\"))\n",
372348
"\n",
373349
"# Print the locations of the output files\n",
374-
"print(\"\\n\".join(str(p) for p in outputs.out_file))"
350+
"pprint(outputs.out_file)"
375351
]
376352
},
377353
{
@@ -381,31 +357,61 @@
381357
"Of course for this to work Docker needs to work and be configured for\n",
382358
"[sudo-less execution](https://docs.docker.com/engine/install/linux-postinstall/).\n",
383359
"See [Containers and Environments](../explanation/environments.rst) for more details on\n",
384-
"how to utilise containers and add support for other software environments."
360+
"how to utilise containers and add support for other software environments.\n",
361+
"\n",
362+
"It is also possible to specify functions to run at hooks that are immediately before and after\n",
363+
"the task is executed by passing a `pydra.engine.specs.TaskHooks` object to the `hooks`\n",
364+
"keyword arg. The callable should take the `pydra.engine.core.Task` object as its only\n",
365+
"argument and return None. The available hooks to attach functions are:\n",
366+
"\n",
367+
"* pre_run: before the task cache directory is created\n",
368+
"* pre_run_task: after the cache directory has been created and the inputs resolved but before the task is executed\n",
369+
"* post_run_task: after the task has been run and the outputs collected\n",
370+
"* post_run: after the cache directory has been finalised\n",
371+
"\n",
372+
"\n",
373+
"QUESTION: What are these hooks intended for? Should the post_run_task hook be run before the outputs have been\n",
374+
"collected?"
385375
]
386376
},
387377
{
388-
"cell_type": "markdown",
378+
"cell_type": "code",
379+
"execution_count": null,
389380
"metadata": {},
381+
"outputs": [],
390382
"source": [
391-
"## Provenance and auditing\n",
383+
"from pydra.engine.core import Task\n",
384+
"from pydra.engine.specs import TaskHooks, Result\n",
385+
"import os\n",
386+
"import platform\n",
387+
"\n",
388+
"def notify_task_completion(task: Task, result: Result):\n",
389+
" # Print a message to the terminal\n",
390+
" print(f\"Task completed! Results are stored in {str(task.output_dir)!r}\")\n",
391+
"\n",
392+
" # Platform-specific notifications\n",
393+
" if platform.system() == \"Darwin\": # macOS\n",
394+
" os.system('osascript -e \\'display notification \"Task has completed successfully!\" with title \"Task Notification\"\\'')\n",
395+
" elif platform.system() == \"Linux\": # Linux\n",
396+
" os.system('notify-send \"Task Notification\" \"Task has completed successfully!\"')\n",
397+
" elif platform.system() == \"Windows\": # Windows\n",
398+
" os.system('msg * \"Task has completed successfully!\"')\n",
392399
"\n",
393-
"Work in progress..."
400+
"# Run the task to resample all NIfTI files\n",
401+
"outputs = mrgrid(hooks=TaskHooks(post_run=notify_task_completion), cache_dir=tempfile.mkdtemp())\n",
402+
"\n",
403+
"# Print the locations of the output files\n",
404+
"pprint(outputs.out_file)"
394405
]
395406
},
396407
{
397408
"cell_type": "markdown",
398409
"metadata": {},
399410
"source": [
400-
"## Hooks\n",
411+
"## Provenance and auditing\n",
401412
"\n",
402413
"Work in progress..."
403414
]
404-
},
405-
{
406-
"cell_type": "markdown",
407-
"metadata": {},
408-
"source": []
409415
}
410416
],
411417
"metadata": {

new-docs/source/tutorial/tst.py

Lines changed: 21 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,21 @@
1-
from pathlib import Path
2-
from tempfile import mkdtemp
3-
from pprint import pprint
4-
import json
5-
from pydra.utils.hash import hash_function
6-
from pydra.tasks.mrtrix3.v3_0 import MrGrid
7-
from fileformats.medimage import Nifti1
8-
9-
JSON_CONTENTS = {"a": True, "b": "two", "c": 3, "d": [7, 0.55, 6]}
10-
11-
test_dir = Path(mkdtemp())
12-
cache_root = Path(mkdtemp())
13-
json_file = test_dir / "test.json"
14-
with open(json_file, "w") as f:
15-
json.dump(JSON_CONTENTS, f)
16-
17-
nifti_dir = test_dir / "nifti"
18-
nifti_dir.mkdir()
19-
20-
for i in range(10):
21-
Nifti1.sample(nifti_dir, seed=i) # Create a dummy NIfTI file in the dest. directory
22-
23-
niftis = list(nifti_dir.iterdir())
24-
pprint([hash_function(nifti) for nifti in niftis])
25-
26-
mrgrid_varying_vox_sizes = MrGrid(operation="regrid").split(
27-
("in_file", "voxel"),
28-
in_file=niftis,
29-
# Define a list of voxel sizes to resample the NIfTI files to,
30-
# the list must be the same length as the list of NIfTI files
31-
voxel=[
32-
(1.0, 1.0, 1.0),
33-
(1.0, 1.0, 1.0),
34-
(1.0, 1.0, 1.0),
35-
(0.5, 0.5, 0.5),
36-
(0.75, 0.75, 0.75),
37-
(0.5, 0.5, 0.5),
38-
(0.5, 0.5, 0.5),
39-
(1.0, 1.0, 1.0),
40-
(1.25, 1.25, 1.25),
41-
(1.25, 1.25, 1.25),
42-
],
43-
)
44-
45-
outputs = mrgrid_varying_vox_sizes(cache_dir=cache_root)
46-
47-
pprint(outputs.out_file)
1+
from pydra.design import python
2+
import shutil
3+
4+
5+
@python.define
6+
def TenToThePower(p: int) -> int:
7+
return 10**p
8+
9+
10+
if __name__ == "__main__":
11+
12+
shutil.rmtree("/Users/tclose/Library/Caches/pydra/0.25.dev190+g6a726571/run-cache")
13+
14+
ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])
15+
16+
# Run the 5 tasks in parallel split across 3 processes
17+
outputs = ten_to_the_power(worker="cf", n_procs=3)
18+
19+
p1, p2, p3, p4, p5 = outputs.out
20+
21+
print(f"10^5 = {p5}")

pydra/engine/specs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ def __call__(
261261
raise
262262
if result.errored:
263263
if isinstance(self, WorkflowDef) or self._splitter:
264-
raise RuntimeError(f"Workflow {self} failed with errors:")
264+
raise RuntimeError(f"Workflow {self} failed with errors")
265265
else:
266266
errors = result.errors
267267
raise RuntimeError(

pydra/engine/submitter.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,9 @@ def __init__(
114114
)
115115
if cache_dir is None:
116116
cache_dir = default_run_cache_dir
117-
cache_dir.mkdir(parents=True, exist_ok=True)
118-
elif not cache_dir.exists():
119-
raise ValueError(f"Cache directory {str(cache_dir)!r} does not exist")
117+
cache_dir = Path(cache_dir).resolve()
118+
cache_dir.mkdir(parents=True, exist_ok=True)
119+
120120
self.cache_dir = cache_dir
121121
self.cache_locations = cache_locations
122122
self.environment = environment

pydra/tasks/testing/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,8 @@ def SafeDivisionWorkflow(a: float, b: float, denominator: float) -> float:
7575
divide = workflow.add(SafeDivide(x=add.out, y=denominator))
7676
subtract = workflow.add(Subtract(x=divide.out, y=b))
7777
return subtract.out
78+
79+
80+
@python.define
81+
def TenToThePower(p: int) -> int:
82+
return 10**p

0 commit comments

Comments
 (0)