Skip to content

Commit b8a0f1d

Browse files
committed
debugging test_tasks
1 parent 151c325 commit b8a0f1d

File tree

17 files changed

+642
-706
lines changed

17 files changed

+642
-706
lines changed

new-docs/source/tutorial/2-advanced-execution.ipynb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,15 @@
393393
"Work in progress..."
394394
]
395395
},
396+
{
397+
"cell_type": "markdown",
398+
"metadata": {},
399+
"source": [
400+
"## Hooks\n",
401+
"\n",
402+
"Work in progress..."
403+
]
404+
},
396405
{
397406
"cell_type": "markdown",
398407
"metadata": {},

new-docs/source/tutorial/6-workflow.ipynb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@
133133
"def SplitWorkflow(a: list[int], b: list[float]) -> list[float]:\n",
134134
" # Multiply over all combinations of the elements of a and b, then combine the results\n",
135135
" # for each a element into a list over each b element\n",
136-
" mul = workflow.add(Mul()).split(x=a, y=b).combine(\"x\")\n",
136+
" mul = workflow.add(Mul().split(x=a, y=b).combine(\"x\"))\n",
137137
" # Sume the multiplications across all all b elements for each a element\n",
138138
" sum = workflow.add(Sum(x=mul.out))\n",
139139
" return sum.out"
@@ -154,8 +154,8 @@
154154
"source": [
155155
"@workflow.define\n",
156156
"def SplitThenCombineWorkflow(a: list[int], b: list[float], c: float) -> list[float]:\n",
157-
" mul = workflow.add(Mul()).split(x=a, y=b)\n",
158-
" add = workflow.add(Add(x=mul.out, y=c)).combine(\"Mul.x\")\n",
157+
" mul = workflow.add(Mul().split(x=a, y=b))\n",
158+
" add = workflow.add(Add(x=mul.out, y=c).combine(\"Mul.x\"))\n",
159159
" sum = workflow.add(Sum(x=add.out))\n",
160160
" return sum.out"
161161
]

pydra/design/base.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -830,10 +830,16 @@ def extract_function_inputs_and_outputs(
830830
inputs = input_types
831831
for inpt_name, default in input_defaults.items():
832832
inpt = inputs[inpt_name]
833-
if isinstance(inpt, arg_type) and inpt.default is EMPTY:
834-
inpt.default = default
835-
else:
833+
if isinstance(inpt, arg_type):
834+
if inpt.default is EMPTY:
835+
inpt.default = default
836+
elif inspect.isclass(inpt):
836837
inputs[inpt_name] = arg_type(type=inpt, default=default)
838+
else:
839+
raise ValueError(
840+
f"Unrecognised input type ({inpt}) for input {inpt_name} with default "
841+
f"value {default}"
842+
)
837843
return_type = type_hints.get("return", ty.Any)
838844
if outputs and len(outputs) > 1:
839845
if return_type is not ty.Any:

pydra/engine/audit.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Module to keep track of provenance information."""
22

33
import os
4+
import typing as ty
45
import json
56
from pydra.utils.messenger import send_message, make_message, gen_uuid, now, AuditFlag
67
from pydra.engine.helpers import attrs_values
@@ -12,6 +13,9 @@
1213
except ImportError:
1314
import importlib.resources as importlib_resources # type: ignore
1415

16+
if ty.TYPE_CHECKING:
17+
from pydra.engine.task import Task
18+
1519

1620
class Audit:
1721
"""Handle provenance tracking and resource utilization."""
@@ -178,17 +182,19 @@ def audit_check(self, flag):
178182
"""
179183
return self.audit_flags & flag
180184

181-
def audit_task(self, task):
185+
def audit_task(self, task: "Task"):
182186
import subprocess as sp
183-
from .helpers import attrs_fields
187+
from .helpers import list_fields
184188

185189
label = task.name
186190

187-
command = task.cmdline if hasattr(task.inputs, "executable") else None
188-
attr_list = attrs_fields(task.inputs)
191+
command = (
192+
task.definition.cmdline if hasattr(task.definition, "executable") else None
193+
)
194+
attr_list = list_fields(task.definition)
189195
for attrs in attr_list:
190196
input_name = attrs.name
191-
value = getattr(task.inputs, input_name)
197+
value = task.inputs[input_name]
192198
if isinstance(value, FileSet):
193199
input_path = os.path.abspath(value)
194200
file_hash = hash_function(value)

pydra/engine/core.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
TaskHook,
3333
)
3434
from .helpers import (
35-
create_checksum,
3635
attrs_fields,
3736
attrs_values,
3837
load_result,
@@ -203,8 +202,7 @@ def checksum(self):
203202
"""
204203
if self._checksum is not None:
205204
return self._checksum
206-
input_hash = self.definition._hash
207-
self._checksum = create_checksum(self.definition._task_type, input_hash)
205+
self._checksum = self.definition._checksum
208206
return self._checksum
209207

210208
@property

pydra/engine/specs.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ def __call__(
159159
audit_flags: AuditFlag = AuditFlag.NONE,
160160
messengers: ty.Iterable[Messenger] | None = None,
161161
messenger_args: dict[str, ty.Any] | None = None,
162+
name: str | None = None,
162163
**kwargs: ty.Any,
163164
) -> OutputsType:
164165
"""Create a task from this definition and execute it to produce a result.
@@ -183,6 +184,8 @@ def __call__(
183184
Messengers, by default None
184185
messenger_args : dict, optional
185186
Messenger arguments, by default None
187+
name : str
188+
The name of the task, by default None
186189
**kwargs : dict
187190
Keyword arguments to pass on to the worker initialisation
188191
@@ -209,7 +212,7 @@ def __call__(
209212
worker=worker,
210213
**kwargs,
211214
) as sub:
212-
result = sub(self)
215+
result = sub(self, name=name)
213216
except TypeError as e:
214217
if hasattr(e, "__notes__") and WORKER_KWARG_FAIL_NOTE in e.__notes__:
215218
if match := re.match(
@@ -412,6 +415,10 @@ def _hash(self):
412415
hsh, self._hashes = self._compute_hashes()
413416
return hsh
414417

418+
@property
419+
def _checksum(self):
420+
return f"{self._task_type}-{self._hash}"
421+
415422
def _hash_changes(self):
416423
"""Detects any changes in the hashed values between the current inputs and the
417424
previously calculated values"""

pydra/engine/submitter.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,13 @@ def worker(self):
168168
def __call__(
169169
self,
170170
task_def: "TaskDef",
171+
name: str | None = "task",
171172
):
172173
"""Submitter run function."""
173174

175+
if name is None:
176+
name = "task"
177+
174178
task_def._check_rules()
175179
# If the outer task is split, create an implicit workflow to hold the split nodes
176180
if task_def._splitter:
@@ -190,10 +194,10 @@ def Split(defn: TaskDef, output_types: dict):
190194
f"Task {self} is marked for combining, but not splitting. "
191195
"Use the `split` method to split the task before combining."
192196
)
193-
task = Task(task_def, submitter=self, name="task", environment=self.environment)
197+
task = Task(task_def, submitter=self, name=name, environment=self.environment)
194198
try:
195199
self.run_start_time = datetime.now()
196-
if task.is_async: # Only workflow tasks can be async
200+
if self.worker.is_async: # Only workflow tasks can be async
197201
self.loop.run_until_complete(
198202
self.worker.run_async(task, rerun=self.rerun)
199203
)

pydra/engine/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def __init__(
149149
self.inputs_mod_root = {}
150150

151151

152-
class BoshTask(ShellTask):
152+
class BoshTask(ShellDef):
153153

154154
def _command_args_single(self, state_ind=None, index=None):
155155
"""Get command line arguments for a single state"""

pydra/engine/tests/test_dockertask.py

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import pytest
2-
from ..task import ShellTask
3-
from ..submitter import Submitter
2+
from pydra.engine.specs import ShellDef
3+
from pydra.engine.submitter import Submitter
44
from fileformats.generic import File
5-
from ..environments import Docker
5+
from pydra.engine.environments import Docker
66
from pydra.design import shell, workflow
7+
from pydra.engine.core import Task
78
from .utils import no_win, need_docker, result_submitter, result_no_submitter
89

910

@@ -14,13 +15,19 @@ def test_docker_1_nosubm():
1415
no submitter
1516
"""
1617
cmd = "whoami"
17-
docky = shell.define(cmd)(environment=Docker(image="busybox"))
18-
assert docky.environment.image == "busybox"
19-
assert docky.environment.tag == "latest"
20-
assert isinstance(docky.environment, Docker)
21-
assert docky.cmdline == cmd
18+
Docky = shell.define(cmd)
19+
docky = Docky()
20+
docky_task = Task(
21+
definition=docky,
22+
name="docky",
23+
submitter=Submitter(environment=Docker(image="busybox")),
24+
)
25+
assert docky_task.environment.image == "busybox"
26+
assert docky_task.environment.tag == "latest"
27+
assert isinstance(docky_task.environment, Docker)
28+
assert docky_task.cmdline == cmd
2229

23-
res = docky()
30+
res = docky_task()
2431
assert res.output.stdout == "root\n"
2532
assert res.output.return_code == 0
2633

@@ -32,14 +39,14 @@ def test_docker_1(plugin):
3239
using submitter
3340
"""
3441
cmd = "whoami"
35-
docky = shell.define(cmd)(environment=Docker(image="busybox"))
42+
Docky = shell.define(cmd)
43+
docky = Docky()
3644

37-
with Submitter(worker=plugin) as sub:
38-
docky(submitter=sub)
45+
with Submitter(environment=Docker(image="busybox")) as sub:
46+
res = sub(docky)
3947

40-
res = docky.result()
41-
assert res.output.stdout == "root\n"
42-
assert res.output.return_code == 0
48+
assert res.outputs.stdout == "root\n"
49+
assert res.outputs.return_code == 0
4350

4451

4552
@no_win
@@ -50,7 +57,8 @@ def test_docker_2(results_function, plugin):
5057
with and without submitter
5158
"""
5259
cmdline = "echo hail pydra"
53-
docky = shell.define(cmdline)(environment=Docker(image="busybox"))
60+
Docky = shell.define(cmdline)
61+
docky = Docky()
5462
# cmdline doesn't know anything about docker
5563
assert docky.cmdline == cmdline
5664
res = results_function(docky, plugin)
@@ -68,13 +76,9 @@ def test_docker_2a(results_function, plugin):
6876
cmd_exec = "echo"
6977
cmd_args = ["hail", "pydra"]
7078
# separate command into exec + args
71-
docky = ShellTask(
72-
name="docky",
73-
executable=cmd_exec,
74-
args=cmd_args,
75-
environment=Docker(image="busybox"),
76-
)
77-
assert docky.definition.executable == "echo"
79+
Docky = shell.define(" ".join([cmd_exec] + cmd_args))
80+
docky = Docky()
81+
assert docky.executable == "echo"
7882
assert docky.cmdline == f"{cmd_exec} {' '.join(cmd_args)}"
7983

8084
res = results_function(docky, plugin)
@@ -93,9 +97,9 @@ def test_docker_st_1(results_function, plugin):
9397
splitter = executable
9498
"""
9599
cmd = ["pwd", "whoami"]
96-
docky = ShellTask(name="docky", environment=Docker(image="busybox")).split(
97-
"executable", executable=cmd
98-
)
100+
Docky = shell.define("placeholder")
101+
docky = Docky().split(executable=cmd)
102+
99103
assert docky.state.splitter == "docky.executable"
100104

101105
res = results_function(docky, plugin)

0 commit comments

Comments
 (0)