
Commit dd606f8

debugging new syntax in unittests

1 parent bf11151

File tree

6 files changed: +152 −49 lines changed


new-docs/source/tutorial/3-troubleshooting.ipynb

Lines changed: 91 additions & 24 deletions
@@ -6,8 +6,8 @@
    "source": [
     "# Troubleshooting\n",
     "\n",
-    "This tutorial steps through tecnhiques to identify errors and pipeline failures, and\n",
-    "avoid common pitfalls."
+    "This tutorial steps through techniques to identify errors and pipeline failures, as\n",
+    "well as how to avoid common pitfalls when setting up execution over multiple processes."
    ]
   },
   {
@@ -45,9 +45,9 @@
    "source": [
     "### Enclosing multi-process code within `if __name__ == \"__main__\"`\n",
     "\n",
-    "If running a script that executes a workflow with the concurrent futures worker\n",
-    "(i.e. `worker=\"cf\"`) on macOS or Windows, then the submissing/execution call needs to\n",
-    "be enclosed within a `if __name__ == \"__main__\"` blocks, e.g."
+    "When running multi-process Python code on macOS or Windows, as is the case when the\n",
+    "concurrent futures worker is selected (i.e. `worker=\"cf\"`), scripts that execute\n",
+    "the forking code need to be enclosed within an `if __name__ == \"__main__\"` block, e.g."
    ]
   },
   {
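The guard described here can be sketched outside of Pydra with just the standard library; `double` and `run_pool` are illustrative names for this sketch, not part of the tutorial:

```python
import concurrent.futures


def double(x: int) -> int:
    """Work that will be executed in a separate process."""
    return 2 * x


def run_pool() -> list[int]:
    # On macOS and Windows, child processes are started with "spawn", which
    # re-imports this module. Without the __main__ guard below, each child
    # would try to create its own pool and the script would hang or error.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        return list(pool.map(double, [1, 2, 3]))


if __name__ == "__main__":
    print(run_pool())
```

The same structure applies to a script that submits a Pydra workflow with `worker="cf"`: the submission call plays the role of `run_pool()` and must sit under the guard.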
@@ -71,12 +71,24 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "### Remove stray lockfiles\n",
+    "This allows the secondary processes to import the script without executing it. Without\n",
+    "such a block, Pydra will lock up and not process the workflow. On Linux this is not an\n",
+    "issue due to the way that processes are forked, but the guard is good practice in any\n",
+    "case for code portability."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Removing stray lockfiles\n",
     "\n",
-    "During the execution of a task, a lockfile is generated to signify that a task is running.\n",
-    "These lockfiles are released after a task completes, either successfully or with an error,\n",
-    "within a *try/finally* block. However, if a task/workflow is terminated by an interactive\n",
-    "debugger the finally block may not be executed causing stray lockfiles to hang around. This\n",
+    "When a Pydra task is executed, a lockfile is generated to signify that the task is running.\n",
+    "Other processes will wait for this lock to be released before attempting to access the\n",
+    "task's results. The lockfiles are automatically deleted within a *try/finally* block after\n",
+    "a task completes, either successfully or with an error, so they are cleaned up in most\n",
+    "cases. However, if a task/workflow is terminated by an interactive\n",
+    "debugger, the finally block may not be executed, leaving stray lockfiles. This\n",
     "can cause the Pydra to hang waiting for the lock to be released. If you suspect this to be\n",
     "an issue, and there are no other jobs running, then simply remove all lock files from your\n",
     "cache directory (e.g. `rm <your-run-cache-dir>/*.lock`) and re-submit your job.\n",
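The `rm` one-liner can equally be done portably from Python; a small sketch (the function name and return value are illustrative, and the cache path is whatever your run-cache directory is):

```python
from pathlib import Path


def remove_stray_lockfiles(cache_dir: Path) -> list[Path]:
    """Delete every '*.lock' file directly under cache_dir.

    Only do this when no other jobs are running, otherwise a legitimately
    held lock may be removed. Returns the paths that were deleted.
    """
    removed = []
    for lockfile in sorted(cache_dir.glob("*.lock")):
        lockfile.unlink()
        removed.append(lockfile)
    return removed
```

Calling `remove_stray_lockfiles(Path("<your-run-cache-dir>"))` mirrors `rm <your-run-cache-dir>/*.lock`.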
@@ -91,7 +103,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "## Finding errors\n",
+    "## Inspecting errors\n",
     "\n",
     "### Running in *debug* mode\n",
     "\n",
@@ -116,8 +128,9 @@
     "# This workflow will fail because we are trying to divide by 0\n",
     "wf = UnsafeDivisionWorkflow(a=10, b=5).split(denominator=[3, 2 ,0])\n",
     "\n",
-    "with Submitter(worker=\"cf\") as sub:\n",
-    "    result = sub(wf)\n",
+    "if __name__ == \"__main__\":\n",
+    "    with Submitter(worker=\"cf\") as sub:\n",
+    "        result = sub(wf)\n",
     "    \n",
     "if result.errored:\n",
     "    print(\"Workflow failed with errors:\\n\" + str(result.errors))\n",
@@ -129,7 +142,23 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "Work in progress..."
+    "The error pickle files can be loaded using the `cloudpickle` library, noting that it is\n",
+    "important to load the files with the same Python version that was used to run the Pydra\n",
+    "workflow:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import cloudpickle as cp\n",
+    "\n",
+    "with open(\"<your-cache-root>/<task-cache-dir>/_error.pklz\", \"rb\") as f:\n",
+    "    error = cp.load(f)\n",
+    "\n",
+    "print(error)"
    ]
   },
   {
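The round trip can be illustrated in a self-contained way with the stdlib `pickle`, which reads the same stream format that `cloudpickle` writes; the error record below is a made-up stand-in, not the actual schema of Pydra's `_error.pklz`:

```python
import pickle
import tempfile
from pathlib import Path

# Made-up stand-in for the contents of a task's error pickle
error_record = {
    "error": "ZeroDivisionError",
    "messages": ["float division by zero"],
}

pkl_path = Path(tempfile.mkdtemp()) / "_error.pklz"
with open(pkl_path, "wb") as f:
    pickle.dump(error_record, f)

# Later, possibly in another interpreter of the same Python version
with open(pkl_path, "rb") as f:
    loaded = pickle.load(f)

print(loaded["error"])  # ZeroDivisionError
```

Pickle streams are not guaranteed to load across different Python versions, which is why the tutorial stresses matching the interpreter that ran the workflow.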
@@ -147,31 +176,69 @@
    "Currently in Pydra you need to step backwards through the tasks of the workflow, load\n",
    "the saved task object and inspect its inputs to find the preceding nodes. If any of the\n",
    "inputs that have been generated by previous nodes are not ok, then you should check the\n",
-    "tasks that generated them in turn.\n",
+    "tasks that generated them in turn. For file-based inputs, you should be able to find\n",
+    "the path of the preceding task's cache directory from the provided file path. However,\n",
+    "for non-file inputs you may need to exhaustively iterate through all the task dirs\n",
+    "in your cache root to find the issue.\n",
     "\n",
-    "For example, in the following example if we are not happy with the mask brain that has\n",
-    "been generated, we can check the mask to see whether it looks sensible by first loading\n",
-    "the apply mask task and then inspecting its inputs."
+    "For example, in the following workflow, if a division by 0 occurs within the division\n",
+    "node, a `float('inf')` will be returned, which will then propagate through the rest of\n",
+    "the workflow."
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "from pydra.engine.submitter import Submitter\n",
+    "from pydra.tasks.testing import SafeDivisionWorkflow\n",
+    "\n",
+    "wf = SafeDivisionWorkflow(a=10, b=5).split(denominator=[3, 2, 0])\n",
+    "\n",
+    "if __name__ == \"__main__\":\n",
+    "    with Submitter(worker=\"cf\") as sub:\n",
+    "        result = sub(wf)\n",
+    "\n",
+    "print(f\"Workflow completed successfully, results saved in: {result.output_dir}\")"
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "Work in progress..."
+    "To find the task directory where the issue first surfaced, iterate through every task\n",
+    "cache dir and check the results for `float(\"inf\")`s:"
    ]
   },
   {
-   "cell_type": "markdown",
+   "cell_type": "code",
+   "execution_count": null,
    "metadata": {},
-   "source": []
+   "outputs": [],
+   "source": [
+    "import cloudpickle as cp\n",
+    "from pydra.utils import user_cache_dir\n",
+    "\n",
+    "run_cache = user_cache_dir / \"run-cache\"\n",
+    "\n",
+    "for task_cache_dir in run_cache.iterdir():\n",
+    "    with open(task_cache_dir / \"_result.pklz\", \"rb\") as f:\n",
+    "        result = cp.load(f)\n",
+    "    # Crude but effective: flag any result whose repr contains an 'inf'\n",
+    "    if \"inf\" in str(result):\n",
+    "        print(f\"Found 'inf' in the result of {task_cache_dir.name}\")"
    ]
   }
  ],
  "metadata": {
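The scan over the run-cache can be prototyped without Pydra at all; the sketch below fabricates a fake cache with the stdlib `pickle` and flags the directory whose pickled result contains an `inf`. The directory and file names mirror the tutorial but are otherwise arbitrary:

```python
import math
import pickle
import tempfile
from pathlib import Path


def find_inf_results(run_cache: Path) -> list[Path]:
    """Return the task cache dirs whose pickled result contains an inf value."""
    flagged = []
    for task_cache_dir in sorted(p for p in run_cache.iterdir() if p.is_dir()):
        result_file = task_cache_dir / "_result.pklz"
        if not result_file.exists():
            continue
        with open(result_file, "rb") as f:
            result = pickle.load(f)
        if any(isinstance(v, float) and math.isinf(v) for v in result.values()):
            flagged.append(task_cache_dir)
    return flagged


# Fabricate a run-cache with one healthy and one inf-containing result
run_cache = Path(tempfile.mkdtemp())
for name, outputs in [("add-def", {"out": 15.0}), ("divide-abc", {"out": float("inf")})]:
    (run_cache / name).mkdir()
    with open(run_cache / name / "_result.pklz", "wb") as f:
        pickle.dump(outputs, f)

print([d.name for d in find_inf_results(run_cache)])  # ['divide-abc']
```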

new-docs/source/tutorial/tst.py

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
 wf = UnsafeDivisionWorkflow(a=10, b=5, denominator=2)
 
 if __name__ == "__main__":
-    with Submitter(worker="cf") as sub:
+    with Submitter(worker="cf", rerun=True) as sub:
         result = sub(wf)
 
 
pydra/design/python.py

Lines changed: 2 additions & 2 deletions
@@ -166,7 +166,7 @@ def make(wrapped: ty.Callable | type) -> PythonDef:
         for i, output in enumerate(parsed_outputs.values()):
             output.order = i
 
-        interface = make_task_def(
+        defn = make_task_def(
             PythonDef,
             PythonOutputs,
             parsed_inputs,
@@ -177,7 +177,7 @@ def make(wrapped: ty.Callable | type) -> PythonDef:
             outputs_bases=outputs_bases,
         )
 
-        return interface
+        return defn
 
 
     if wrapped is not None:
         if not isinstance(wrapped, (ty.Callable, type)):

pydra/engine/specs.py

Lines changed: 2 additions & 0 deletions
@@ -380,6 +380,8 @@ def _compute_hashes(self) -> ty.Tuple[bytes, ty.Dict[str, bytes]]:
             if getattr(field, "container_path", False):
                 continue
             inp_dict[field.name] = getattr(self, field.name)
+        # Include the outputs class, just in case any names or types have changed
+        inp_dict["Outputs"] = self.Outputs
         hash_cache = Cache()
         field_hashes = {
             k: hash_function(v, cache=hash_cache) for k, v in inp_dict.items()
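Why fold the `Outputs` class into the hashed inputs? A toy illustration: the `hash_function` below is a stand-in for Pydra's (it simply hashes a class by its field annotations), and the `OutputsV1`/`OutputsV2` classes are hypothetical, but it shows how renaming an output field changes the computed hash and hence invalidates a cache entry while the input hashes stay stable:

```python
import hashlib


def hash_function(obj) -> str:
    """Stand-in hasher: classes are hashed by their (name, type) annotations,
    everything else by its repr."""
    if isinstance(obj, type):
        payload = repr(sorted((k, t.__name__) for k, t in obj.__annotations__.items()))
    else:
        payload = repr(obj)
    return hashlib.sha256(payload.encode()).hexdigest()


class OutputsV1:
    sum: int
    sub: int


class OutputsV2:  # 'sub' has been renamed to 'difference'
    sum: int
    difference: int


inp_dict = {"x": 1.0, "y": 2.0}
field_hashes_v1 = {k: hash_function(v) for k, v in {**inp_dict, "Outputs": OutputsV1}.items()}
field_hashes_v2 = {k: hash_function(v) for k, v in {**inp_dict, "Outputs": OutputsV2}.items()}

# Only the 'Outputs' entry differs between the two hash dicts
print(field_hashes_v1["Outputs"] != field_hashes_v2["Outputs"])  # True
print(field_hashes_v1["x"] == field_hashes_v2["x"])  # True
```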

pydra/engine/tests/test_functions.py

Lines changed: 22 additions & 22 deletions
@@ -182,18 +182,17 @@ def Indirect(a):
 
     # Run functions to ensure behavior is unaffected
     a = random.randint(0, (1 << 32) - 3)
-    assert Direct(a=a) == Partial(a=a)
-    assert Direct(a=a) == Indirect(a=a)
+    assert hashes(Direct(a=a)) == hashes(Partial(a=a)) == hashes(Indirect(a=a))
 
     # checking if the annotation is properly converted to output_spec if used in task
     assert list_fields(Direct.Outputs) == [
-        python.arg(name="sum", type=int),
-        python.arg(name="sub", type=int),
+        python.out(name="sum", type=int, order=0),
+        python.out(name="sub", type=int, order=1),
     ]
 
 
 def test_invalid_annotation():
-    with pytest.raises(TypeError):
+    with pytest.raises(ValueError, match="Unrecognised input names"):
 
         @python.define(inputs={"b": int})
         def addtwo(a):
@@ -202,50 +201,51 @@ def addtwo(a):
 
 
 def test_annotated_task():
 
-    def square(in_val: float):
+    @python.define
+    def Square(in_val: float):
         return in_val**2
 
-    res = square(in_val=2.0)()
-    assert res.output.out == 4.0
+    outputs = Square(in_val=2.0)()
+    assert outputs.out == 4.0
 
 
 def test_return_annotated_task():
 
     @python.define(inputs={"in_val": float}, outputs={"squared": float})
-    def square(in_val):
+    def Square(in_val):
         return in_val**2
 
-    res = square(in_val=2.0)()
-    assert res.output.squared == 4.0
+    outputs = Square(in_val=2.0)()
+    assert outputs.squared == 4.0
 
 
 def test_return_halfannotated_annotated_task():
 
     @python.define(inputs={"in_val": float}, outputs={"out": float})
-    def square(in_val):
+    def Square(in_val):
         return in_val**2
 
-    res = square(in_val=2.0)()
-    assert res.output.out == 4.0
+    outputs = Square(in_val=2.0)()
+    assert outputs.out == 4.0
 
 
 def test_return_annotated_task_multiple_output():
 
     @python.define(inputs={"in_val": float}, outputs={"squared": float, "cubed": float})
-    def square(in_val):
+    def Square(in_val):
         return in_val**2, in_val**3
 
-    res = square(in_val=2.0)()
-    assert res.output.squared == 4.0
-    assert res.output.cubed == 8.0
+    outputs = Square(in_val=2.0)()
+    assert outputs.squared == 4.0
+    assert outputs.cubed == 8.0
 
 
 def test_return_halfannotated_task_multiple_output():
 
     @python.define(inputs={"in_val": float}, outputs=(float, float))
-    def square(in_val):
+    def Square(in_val):
         return in_val**2, in_val**3
 
-    res = square(in_val=2.0)()
-    assert res.output.out1 == 4.0
-    assert res.output.out2 == 8.0
+    outputs = Square(in_val=2.0)()
+    assert outputs.out1 == 4.0
+    assert outputs.out2 == 8.0

pydra/tasks/testing/__init__.py

Lines changed: 34 additions & 0 deletions
@@ -11,6 +11,13 @@ def Divide(x: float, y: float) -> float:
     return x / y
 
 
+@python.define
+def SafeDivide(x: float, y: float) -> float:
+    if y == 0:
+        return float("inf")
+    return x / y
+
+
 @python.define
 def Subtract(x: float, y: float) -> float:
     return x - y
@@ -41,3 +48,30 @@ def UnsafeDivisionWorkflow(a: float, b: float, denominator: float) -> float:
     divide = workflow.add(Divide(x=add.out, y=denominator))
     subtract = workflow.add(Subtract(x=divide.out, y=b))
     return subtract.out
+
+
+@workflow.define
+def SafeDivisionWorkflow(a: float, b: float, denominator: float) -> float:
+    """Adds 'a' and 'b' together, divides by 'denominator', and then subtracts 'b' from
+    the result. Division by 0 is guarded against, so if 0 is passed to the 'denominator'
+    parameter the division returns `float("inf")` instead of raising an error.
+
+    Parameters
+    ----------
+    a : float
+        The first number to add.
+    b : float
+        The second number to add.
+    denominator : float
+        The number to divide the sum of 'a' and 'b' by.
+
+    Returns
+    -------
+    out : float
+        The result of subtracting 'b' from the result of dividing the sum of 'a' and
+        'b' by 'denominator'.
+    """
+    add = workflow.add(Add(x=a, y=b))
+    divide = workflow.add(SafeDivide(x=add.out, y=denominator))
+    subtract = workflow.add(Subtract(x=divide.out, y=b))
+    return subtract.out
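The behaviour this workflow is designed to exercise can be checked in plain Python, since an IEEE-754 `inf` propagates through the same add/divide/subtract chain; the functions below are a standalone mirror of the tasks above, not Pydra code:

```python
def safe_divide(x: float, y: float) -> float:
    """Plain-Python mirror of the SafeDivide task above."""
    if y == 0:
        return float("inf")
    return x / y


def safe_division_workflow(a: float, b: float, denominator: float) -> float:
    added = a + b                              # Add node
    divided = safe_divide(added, denominator)  # SafeDivide node
    return divided - b                         # Subtract node


print(safe_division_workflow(10, 5, 2))  # 2.5
print(safe_division_workflow(10, 5, 0))  # inf: propagates instead of erroring
```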
