Skip to content

Commit dc121df

Browse files
authored
Merge pull request #516 from nipype/rf/environments
NF: Add Environment class, with initial Native/Docker implementations
2 parents 4f389cd + 0b0c71b commit dc121df

14 files changed

+954
-1497
lines changed

pydra/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@
1515
import attr
1616

1717
from . import mark
18-
from .engine import AuditFlag, DockerTask, ShellCommandTask, Submitter, Workflow, specs
18+
from .engine import AuditFlag, ShellCommandTask, Submitter, Workflow, specs
1919

2020
__all__ = (
2121
"Submitter",
2222
"Workflow",
2323
"AuditFlag",
2424
"ShellCommandTask",
25-
"DockerTask",
2625
"specs",
2726
"mark",
2827
)

pydra/engine/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
"""The core of the workflow engine."""
22
from .submitter import Submitter
33
from .core import Workflow
4-
from .task import AuditFlag, ShellCommandTask, DockerTask
4+
from .task import AuditFlag, ShellCommandTask
55
from . import specs
66

77
__all__ = [
88
"AuditFlag",
9-
"DockerTask",
109
"ShellCommandTask",
1110
"Submitter",
1211
"Workflow",

pydra/engine/core.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,13 @@ def cont_dim(self, cont_dim):
429429
self._cont_dim = cont_dim
430430

431431
def __call__(
432-
self, submitter=None, plugin=None, plugin_kwargs=None, rerun=False, **kwargs
432+
self,
433+
submitter=None,
434+
plugin=None,
435+
plugin_kwargs=None,
436+
rerun=False,
437+
environment=None,
438+
**kwargs,
433439
):
434440
"""Make tasks callable themselves."""
435441
from .submitter import Submitter
@@ -449,9 +455,9 @@ def __call__(
449455
if submitter:
450456
with submitter as sub:
451457
self.inputs = attr.evolve(self.inputs, **kwargs)
452-
res = sub(self)
458+
res = sub(self, environment=environment)
453459
else: # tasks without state could be run without a submitter
454-
res = self._run(rerun=rerun, **kwargs)
460+
res = self._run(rerun=rerun, environment=environment, **kwargs)
455461
return res
456462

457463
def _modify_inputs(self):
@@ -501,7 +507,7 @@ def _populate_filesystem(self, checksum, output_dir):
501507
shutil.rmtree(output_dir)
502508
output_dir.mkdir(parents=False, exist_ok=self.can_resume)
503509

504-
def _run(self, rerun=False, **kwargs):
510+
def _run(self, rerun=False, environment=None, **kwargs):
505511
self.inputs = attr.evolve(self.inputs, **kwargs)
506512
self.inputs.check_fields_input_spec()
507513

@@ -518,6 +524,7 @@ def _run(self, rerun=False, **kwargs):
518524
return result
519525
cwd = os.getcwd()
520526
self._populate_filesystem(checksum, output_dir)
527+
os.chdir(output_dir)
521528
orig_inputs = self._modify_inputs()
522529
result = Result(output=None, runtime=None, errored=False)
523530
self.hooks.pre_run_task(self)
@@ -526,7 +533,7 @@ def _run(self, rerun=False, **kwargs):
526533
self.audit.audit_task(task=self)
527534
try:
528535
self.audit.monitor()
529-
self._run_task()
536+
self._run_task(environment=environment)
530537
result.output = self._collect_outputs(output_dir=output_dir)
531538
except Exception:
532539
etype, eval, etr = sys.exc_info()
@@ -538,7 +545,6 @@ def _run(self, rerun=False, **kwargs):
538545
self.hooks.post_run_task(self, result)
539546
self.audit.finalize_audit(result)
540547
save(output_dir, result=result, task=self)
541-
self.output_ = None
542548
# removing the additional file with the checksum
543549
(self.cache_dir / f"{self.uid}_info.json").unlink()
544550
# # function etc. shouldn't change anyway, so removing
@@ -551,15 +557,14 @@ def _run(self, rerun=False, **kwargs):
551557
return result
552558

553559
def _collect_outputs(self, output_dir):
554-
run_output = self.output_
555560
output_klass = make_klass(self.output_spec)
556561
output = output_klass(
557562
**{f.name: attr.NOTHING for f in attr.fields(output_klass)}
558563
)
559564
other_output = output.collect_additional_outputs(
560-
self.inputs, output_dir, run_output
565+
self.inputs, output_dir, self.output_
561566
)
562-
return attr.evolve(output, **run_output, **other_output)
567+
return attr.evolve(output, **self.output_, **other_output)
563568

564569
def split(
565570
self,

pydra/engine/environments.py

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
from .helpers import execute
2+
3+
from pathlib import Path
4+
5+
6+
class Environment:
7+
"""
8+
Base class for environments that are used to execute tasks.
9+
Right now it is assumed that the environment, including container images,
10+
are available and are not removed at the end
11+
TODO: add setup and teardown methods
12+
"""
13+
14+
def setup(self):
15+
pass
16+
17+
def execute(self, task):
18+
"""
19+
Execute the task in the environment.
20+
21+
Parameters
22+
----------
23+
task : TaskBase
24+
the task to execute
25+
26+
Returns
27+
-------
28+
output
29+
Output of the task.
30+
"""
31+
raise NotImplementedError
32+
33+
def teardown(self):
34+
pass
35+
36+
37+
class Native(Environment):
38+
"""
39+
Native environment, i.e. the tasks are executed in the current Python environment.
40+
"""
41+
42+
def execute(self, task):
43+
keys = ["return_code", "stdout", "stderr"]
44+
values = execute(task.command_args(), strip=task.strip)
45+
output = dict(zip(keys, values))
46+
if output["return_code"]:
47+
msg = f"Error running '{task.name}' task with {task.command_args()}:"
48+
if output["stderr"]:
49+
msg += "\n\nstderr:\n" + output["stderr"]
50+
if output["stdout"]:
51+
msg += "\n\nstdout:\n" + output["stdout"]
52+
raise RuntimeError(msg)
53+
return output
54+
55+
56+
class Container(Environment):
57+
"""
58+
Base class for container environments used by Docker and Singularity.
59+
60+
Parameters
61+
----------
62+
image : str
63+
Name of the container image
64+
tag : str
65+
Tag of the container image
66+
root : str
67+
Base path for mounting host directories into the container
68+
xargs : Union[str, List[str]]
69+
Extra arguments to be passed to the container
70+
"""
71+
72+
def __init__(self, image, tag="latest", root="/mnt/pydra", xargs=None):
73+
self.image = image
74+
self.tag = tag
75+
if xargs is None:
76+
xargs = []
77+
elif isinstance(xargs, str):
78+
xargs = xargs.split()
79+
self.xargs = xargs
80+
self.root = root
81+
82+
def bind(self, loc, mode="ro"):
83+
loc_abs = Path(loc).absolute()
84+
return f"{loc_abs}:{self.root}{loc_abs}:{mode}"
85+
86+
87+
class Docker(Container):
88+
"""Docker environment."""
89+
90+
def execute(self, task):
91+
docker_img = f"{self.image}:{self.tag}"
92+
# mounting all input locations
93+
mounts = task.get_bindings(root=self.root)
94+
95+
docker_args = [
96+
"docker",
97+
"run",
98+
"-v",
99+
self.bind(task.cache_dir, "rw"),
100+
*self.xargs,
101+
]
102+
docker_args.extend(
103+
" ".join(
104+
[f"-v {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
105+
).split()
106+
)
107+
docker_args.extend(["-w", f"{self.root}{task.output_dir}"])
108+
keys = ["return_code", "stdout", "stderr"]
109+
110+
values = execute(
111+
docker_args + [docker_img] + task.command_args(root=self.root),
112+
strip=task.strip,
113+
)
114+
output = dict(zip(keys, values))
115+
if output["return_code"]:
116+
if output["stderr"]:
117+
raise RuntimeError(output["stderr"])
118+
else:
119+
raise RuntimeError(output["stdout"])
120+
return output
121+
122+
123+
class Singularity(Container):
124+
"""Singularity environment."""
125+
126+
def execute(self, task):
127+
singularity_img = f"{self.image}:{self.tag}"
128+
# mounting all input locations
129+
mounts = task.get_bindings(root=self.root)
130+
131+
# TODO: adding xargs etc.
132+
singularity_args = [
133+
"singularity",
134+
"exec",
135+
"-B",
136+
self.bind(task.cache_dir, "rw"),
137+
*self.xargs,
138+
]
139+
singularity_args.extend(
140+
" ".join(
141+
[f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
142+
).split()
143+
)
144+
singularity_args.extend(["--pwd", f"{self.root}{task.output_dir}"])
145+
keys = ["return_code", "stdout", "stderr"]
146+
147+
values = execute(
148+
singularity_args + [singularity_img] + task.command_args(root=self.root),
149+
strip=task.strip,
150+
)
151+
output = dict(zip(keys, values))
152+
if output["return_code"]:
153+
if output["stderr"]:
154+
raise RuntimeError(output["stderr"])
155+
else:
156+
raise RuntimeError(output["stdout"])
157+
return output

pydra/engine/specs.py

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -676,37 +676,6 @@ def _check_requires(self, fld, inputs):
676676
return False
677677

678678

679-
@attr.s(auto_attribs=True, kw_only=True)
680-
class ContainerSpec(ShellSpec):
681-
"""Refine the generic command-line specification to container execution."""
682-
683-
image: ty.Union[File, str] = attr.ib(
684-
metadata={"help_string": "image", "mandatory": True}
685-
)
686-
"""The image to be containerized."""
687-
container: ty.Union[File, str, None] = attr.ib(
688-
metadata={"help_string": "container"}
689-
)
690-
"""The container."""
691-
container_xargs: ty.Optional[ty.List[str]] = attr.ib(
692-
default=None, metadata={"help_string": "todo"}
693-
)
694-
695-
696-
@attr.s(auto_attribs=True, kw_only=True)
697-
class DockerSpec(ContainerSpec):
698-
"""Particularize container specifications to the Docker engine."""
699-
700-
container: str = attr.ib("docker", metadata={"help_string": "container"})
701-
702-
703-
@attr.s(auto_attribs=True, kw_only=True)
704-
class SingularitySpec(ContainerSpec):
705-
"""Particularize container specifications to Singularity."""
706-
707-
container: str = attr.ib("singularity", metadata={"help_string": "container type"})
708-
709-
710679
@attr.s
711680
class LazyInterface:
712681
_task: "core.TaskBase" = attr.ib()

pydra/engine/submitter.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,16 @@ def __init__(self, plugin="cf", **kwargs):
3535
raise NotImplementedError(f"No worker for {self.plugin}")
3636
self.worker.loop = self.loop
3737

38-
def __call__(self, runnable, cache_locations=None, rerun=False):
38+
def __call__(self, runnable, cache_locations=None, rerun=False, environment=None):
3939
"""Submitter run function."""
4040
if cache_locations is not None:
4141
runnable.cache_locations = cache_locations
42-
self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
42+
self.loop.run_until_complete(
43+
self.submit_from_call(runnable, rerun, environment)
44+
)
4345
return runnable.result()
4446

45-
async def submit_from_call(self, runnable, rerun):
47+
async def submit_from_call(self, runnable, rerun, environment):
4648
"""
4749
This coroutine should only be called once per Submitter call,
4850
and serves as the bridge between sync/async lands.
@@ -56,7 +58,7 @@ async def submit_from_call(self, runnable, rerun):
5658
Once Python 3.10 is the minimum, this should probably be refactored into using
5759
structural pattern matching.
5860
"""
59-
if is_workflow(runnable):
61+
if is_workflow(runnable): # TODO: env to wf
6062
# connect and calculate the checksum of the graph before running
6163
runnable._connect_and_propagate_to_tasks(override_task_caches=True)
6264
# 0
@@ -74,10 +76,11 @@ async def submit_from_call(self, runnable, rerun):
7476
# 2
7577
if runnable.state is None:
7678
# run_el should always return a coroutine
77-
await self.worker.run_el(runnable, rerun=rerun)
79+
print("in SUBM", environment)
80+
await self.worker.run_el(runnable, rerun=rerun, environment=environment)
7881
# 3
7982
else:
80-
await self.expand_runnable(runnable, wait=True, rerun=rerun)
83+
await self.expand_runnable(runnable, wait=True, rerun=rerun) # TODO
8184
return True
8285

8386
async def expand_runnable(self, runnable, wait=False, rerun=False):

0 commit comments

Comments
 (0)