Skip to content

Commit e742836

Browse files
authored
Merge pull request #787 from nipype/touch-ups
Touch-ups
2 parents 322f175 + 21f2ef7 commit e742836

File tree

9 files changed

+137
-40
lines changed

9 files changed

+137
-40
lines changed

pydra/compose/base/task.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,16 @@ class Outputs:
3939
RESERVED_FIELD_NAMES = ("inputs",)
4040

4141
_cache_dir: Path = attrs.field(default=None, init=False, repr=False)
42+
_node = attrs.field(default=None, init=False, repr=False)
4243

4344
@property
4445
def inputs(self):
4546
"""The inputs object associated with a lazy-outputs object"""
46-
return self._get_node().inputs
47+
if self._node is None:
48+
raise AttributeError(
49+
f"{self} outputs object is not a lazy output of a workflow node"
50+
)
51+
return self._node.inputs
4752

4853
@classmethod
4954
def _from_task(cls, job: "Job[TaskType]") -> Self:
@@ -81,14 +86,6 @@ def _results(self) -> "Result[Self]":
8186
with open(results_path, "rb") as f:
8287
return cp.load(f)
8388

84-
def _get_node(self):
85-
try:
86-
return self._node
87-
except AttributeError:
88-
raise AttributeError(
89-
f"{self} outputs object is not a lazy output of a workflow node"
90-
) from None
91-
9289
def __iter__(self) -> ty.Generator[str, None, None]:
9390
"""The names of the fields in the output object"""
9491
return iter(sorted(f.name for f in attrs_fields(self)))

pydra/compose/python.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import inspect
33
from typing import dataclass_transform
44
import attrs
5-
from pydra.utils.general import task_fields, attrs_values
5+
from pydra.utils.general import task_fields, task_dict
66
from pydra.compose import base
77
from pydra.compose.base import (
88
ensure_field_objects,
@@ -231,7 +231,7 @@ class PythonTask(base.Task[PythonOutputsType]):
231231

232232
def _run(self, job: "Job[PythonTask]", rerun: bool = True) -> None:
233233
# Prepare the inputs to the function
234-
inputs = attrs_values(self)
234+
inputs = task_dict(self)
235235
del inputs["function"]
236236
# Run the actual function
237237
returned = self.function(**inputs)

pydra/compose/tests/test_workflow_fields.py

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from operator import attrgetter
2+
from pathlib import Path
23
from copy import copy
34
from unittest.mock import Mock
45
import pytest
@@ -15,17 +16,17 @@
1516

1617

1718
@python.define
18-
def Add(a, b):
19+
def Add(a: int | float, b: int | float) -> int | float:
1920
return a + b
2021

2122

2223
@python.define
23-
def Mul(a, b):
24+
def Mul(a: int | float, b: int | float) -> int | float:
2425
return a * b
2526

2627

2728
@python.define(outputs=["divided"])
28-
def Divide(x, y):
29+
def Divide(x: int | float, y: int | float) -> float:
2930
return x / y
3031

3132

@@ -68,7 +69,9 @@ def MyTestWorkflow(a, b):
6869
wf = Workflow.construct(workflow_spec)
6970
assert wf.inputs.a == 1
7071
assert wf.inputs.b == 2.0
71-
assert wf.outputs.out == LazyOutField(node=wf["Mul"], field="out", type=ty.Any)
72+
assert wf.outputs.out == LazyOutField(
73+
node=wf["Mul"], field="out", type=int | float, type_checked=True
74+
)
7275

7376
# Nodes are named after the specs by default
7477
assert list(wf.node_names) == ["Add", "Mul"]
@@ -185,7 +188,9 @@ class Outputs(workflow.Outputs):
185188
wf = Workflow.construct(workflow_spec)
186189
assert wf.inputs.a == 1
187190
assert wf.inputs.b == 2.0
188-
assert wf.outputs.out == LazyOutField(node=wf["Mul"], field="out", type=ty.Any)
191+
assert wf.outputs.out == LazyOutField(
192+
node=wf["Mul"], field="out", type=int | float, type_checked=True
193+
)
189194

190195
# Nodes are named after the specs by default
191196
assert list(wf.node_names) == ["Add", "Mul"]
@@ -323,7 +328,7 @@ def MyTestWorkflow(a: int, b: float) -> tuple[float, float]:
323328
node=wf["Mul"], field="out", type=float, type_checked=True
324329
)
325330
assert wf.outputs.out2 == LazyOutField(
326-
node=wf["division"], field="divided", type=ty.Any
331+
node=wf["division"], field="divided", type=float, type_checked=True
327332
)
328333
assert list(wf.node_names) == ["addition", "Mul", "division"]
329334

@@ -362,8 +367,12 @@ def MyTestWorkflow(a: int, b: float):
362367
wf = Workflow.construct(workflow_spec)
363368
assert wf.inputs.a == 1
364369
assert wf.inputs.b == 2.0
365-
assert wf.outputs.out1 == LazyOutField(node=wf["Mul"], field="out", type=ty.Any)
366-
assert wf.outputs.out2 == LazyOutField(node=wf["Add"], field="out", type=ty.Any)
370+
assert wf.outputs.out1 == LazyOutField(
371+
node=wf["Mul"], field="out", type=int | float, type_checked=True
372+
)
373+
assert wf.outputs.out2 == LazyOutField(
374+
node=wf["Add"], field="out", type=int | float, type_checked=True
375+
)
367376
assert list(wf.node_names) == ["Add", "Mul"]
368377

369378

@@ -500,3 +509,51 @@ def RecursiveNestedWorkflow(a: float, depth: int) -> float:
500509
type=float,
501510
type_checked=True,
502511
)
512+
513+
514+
def test_workflow_lzout_inputs1(tmp_path: Path):
515+
516+
@workflow.define
517+
def InputAccessWorkflow(a, b, c):
518+
add = workflow.add(Add(a=a, b=b))
519+
add.inputs.a = c
520+
mul = workflow.add(Mul(a=add.out, b=b))
521+
return mul.out
522+
523+
input_access_workflow = InputAccessWorkflow(a=1, b=2.0, c=3.0)
524+
outputs = input_access_workflow(cache_root=tmp_path)
525+
assert outputs.out == 10.0
526+
527+
528+
def test_workflow_lzout_inputs2(tmp_path: Path):
529+
530+
@workflow.define
531+
def InputAccessWorkflow2(a, b, c):
532+
add = workflow.add(Add(a=a, b=b))
533+
add.inputs.a = c
534+
mul = workflow.add(Mul(a=add.out, b=b))
535+
return mul.out
536+
537+
input_access_workflow = InputAccessWorkflow2(a=1, b=2.0, c=3.0)
538+
outputs = input_access_workflow(cache_root=tmp_path)
539+
assert outputs.out == 10.0
540+
541+
542+
def test_workflow_lzout_inputs_state_change_fail(tmp_path: Path):
543+
"""Set the inputs of the 'mul' node after its outputs have been accessed
544+
with an upstream lazy field that has a different state than the original.
545+
This changes the type of the input and is therefore not permitted"""
546+
547+
@workflow.define
548+
def InputAccessWorkflow3(a, b, c):
549+
add1 = workflow.add(Add(a=a, b=b), name="add1")
550+
add2 = workflow.add(Add(a=a).split(b=c), name="add2")
551+
mul1 = workflow.add(Mul(a=add1.out, b=b), name="mul1")
552+
workflow.add(Mul(a=mul1.out, b=b), name="mul2")
553+
mul1.inputs.a = add2.out
554+
555+
input_access_workflow = InputAccessWorkflow3(a=1, b=2.0, c=[3.0, 4.0])
556+
with pytest.raises(
557+
RuntimeError, match="have already been accessed and therefore cannot set"
558+
):
559+
input_access_workflow.construct()

pydra/conftest.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ def pytest_generate_tests(metafunc):
4343

4444
# For debugging in IDE's don't catch raised exceptions and let the IDE
4545
# break at it
46-
if os.getenv("_PYTEST_RAISE", "0") != "0":
46+
if os.getenv("_PYTEST_RAISE", "0") != "0": # pragma: no cover
4747

48-
@pytest.hookimpl(tryfirst=True)
49-
def pytest_exception_interact(call):
50-
raise call.excinfo.value
48+
@pytest.hookimpl(tryfirst=True) # pragma: no cover
49+
def pytest_exception_interact(call): # pragma: no cover
50+
raise call.excinfo.value # pragma: no cover
5151

52-
@pytest.hookimpl(tryfirst=True)
53-
def pytest_internalerror(excinfo):
54-
raise excinfo.value
52+
@pytest.hookimpl(tryfirst=True) # pragma: no cover
53+
def pytest_internalerror(excinfo): # pragma: no cover
54+
raise excinfo.value # pragma: no cover
5555

5656

5757
# Example VSCode launch configuration for debugging unittests

pydra/engine/lazy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class LazyField(ty.Generic[T], metaclass=abc.ABCMeta):
3030

3131
def __bytes_repr__(self, cache):
3232
yield type(self).__name__.encode() + b"("
33-
yield from bytes(hash_single(self.source, cache))
33+
yield b"source=" + bytes(hash_single(self._source, cache))
3434
yield b"field=" + self._field.encode()
3535
yield b"type=" + bytes(hash_single(self._type, cache))
3636
yield b"cast_from=" + bytes(hash_single(self._cast_from, cache))

pydra/engine/node.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from enum import Enum
44
import attrs
55
from pydra.engine import lazy
6-
from pydra.utils.general import attrs_values
6+
from pydra.utils.general import attrs_values, task_dict
77
from pydra.utils.typing import is_lazy
88
from pydra.engine.state import State, add_name_splitter, add_name_combiner
99

@@ -79,7 +79,7 @@ def __setattr__(self, name: str, value: ty.Any) -> None:
7979
f"cannot set {name!r} input to {value} because it changes the "
8080
f"state"
8181
)
82-
self._set_state()
82+
self._node._set_state()
8383

8484
@property
8585
def inputs(self) -> Inputs:
@@ -144,6 +144,7 @@ def lzout(self) -> OutputType:
144144
# output of an upstream node with additional state variables.
145145
outpt._type_checked = False
146146
self._lzout = outputs
147+
outputs._node = self
147148
return outputs
148149

149150
@property
@@ -161,10 +162,8 @@ def combiner(self):
161162
def _check_if_outputs_have_been_used(self, msg):
162163
used = []
163164
if self._lzout:
164-
for outpt_name, outpt_val in attrs.asdict(
165-
self._lzout, recurse=False
166-
).items():
167-
if outpt_val.type_checked:
165+
for outpt_name, outpt_val in task_dict(self._lzout).items():
166+
if outpt_val._type_checked:
168167
used.append(outpt_name)
169168
if used:
170169
raise RuntimeError(

pydra/engine/workflow.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,20 +95,20 @@ def construct(
9595
non_lazy_keys = frozenset(non_lazy_vals)
9696
hash_cache = Cache() # share the hash cache to avoid recalculations
9797
non_lazy_hash = hash_function(non_lazy_vals, cache=hash_cache)
98-
defn_hash = hash_function(type(task), cache=hash_cache)
98+
task_hash = hash_function(type(task), cache=hash_cache)
9999
# Check for same non-lazy inputs
100100
try:
101-
defn_cache = cls._constructed_cache[defn_hash]
101+
cached_tasks = cls._constructed_cache[task_hash]
102102
except KeyError:
103103
pass
104104
else:
105105
if (
106-
non_lazy_keys in defn_cache
107-
and non_lazy_hash in defn_cache[non_lazy_keys]
106+
non_lazy_keys in cached_tasks
107+
and non_lazy_hash in cached_tasks[non_lazy_keys]
108108
):
109-
return defn_cache[non_lazy_keys][non_lazy_hash]
109+
return cached_tasks[non_lazy_keys][non_lazy_hash]
110110
# Check for supersets of lazy inputs
111-
for key_set, key_set_cache in defn_cache.items():
111+
for key_set, key_set_cache in cached_tasks.items():
112112
if key_set.issubset(non_lazy_keys):
113113
subset_vals = {
114114
k: v for k, v in non_lazy_vals.items() if k in key_set
@@ -193,7 +193,7 @@ def construct(
193193
f"constructor of {workflow!r}"
194194
)
195195
if not dont_cache:
196-
cls._constructed_cache[defn_hash][non_lazy_keys][non_lazy_hash] = workflow
196+
cls._constructed_cache[task_hash][non_lazy_keys][non_lazy_hash] = workflow
197197

198198
return workflow
199199

pydra/utils/tests/test_hash.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import random
2222
from fileformats.generic import Directory, File
2323
from pydra.utils.hash import hash_function
24+
from pydra.utils.tests.utils import Concatenate
2425

2526

2627
def test_hash_file(tmpdir):
@@ -558,3 +559,14 @@ def __repr__(self):
558559
),
559560
):
560561
hash_object(A())
562+
563+
564+
def test_hash_task(tmp_path):
565+
"""
566+
Test that the hash of a task is consistent across runs
567+
"""
568+
569+
concatenate1 = Concatenate()
570+
concatenate2 = Concatenate()
571+
572+
assert hash_function(concatenate1) == hash_function(concatenate2)

pydra/utils/tests/utils.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import typing as ty
2+
from pathlib import Path
13
from fileformats.generic import File, BinaryFile
24
from fileformats.core.mixin import WithSeparateHeader, WithMagicNumber
35
from pydra.compose import shell, python
@@ -94,3 +96,33 @@ class Outputs(shell.Outputs):
9496
)
9597

9698
executable = "echo"
99+
100+
101+
@python.define(outputs=["out_file"])
102+
def Concatenate(
103+
in_file1: File,
104+
in_file2: File,
105+
out_file: ty.Optional[Path] = None,
106+
duplicates: int = 1,
107+
) -> File:
108+
"""Concatenates the contents of two files and writes them to a third
109+
110+
Parameters
111+
----------
112+
in_file1 : Path
113+
A text file
114+
in_file2 : Path
115+
Another text file
116+
out_file : Path
117+
The path to write the output file to
118+
119+
Returns
120+
-------
121+
out_file: Path
122+
A text file made by concatenating the two inputs
123+
"""
124+
if out_file is None:
125+
out_file = Path("out_file.txt").absolute()
126+
contents = [Path(fname).read_text() for fname in (in_file1, in_file2)]
127+
out_file.write_text("\n".join(contents * duplicates))
128+
return out_file

0 commit comments

Comments
 (0)