Skip to content

Commit a649dbc

Browse files
committed
split up environments module
1 parent b0586e9 commit a649dbc

File tree

17 files changed

+344
-238
lines changed

17 files changed

+344
-238
lines changed

pydra/engine/submitter.py

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from datetime import datetime
1212
from collections import defaultdict
1313
import attrs
14-
from pydra.workers.base import Worker, WORKERS
14+
import logging
1515
from pydra.engine.graph import DiGraph
1616
from pydra.utils.general import (
1717
task_fields,
@@ -25,7 +25,8 @@
2525
from pydra.utils.general import default_run_cache_dir
2626
from pydra.compose import workflow
2727
from pydra.engine.state import State
28-
import logging
28+
from pydra.workers.base import Worker
29+
2930

3031
logger = logging.getLogger("pydra.submitter")
3132

@@ -143,24 +144,16 @@ def __init__(
143144
self._own_loop = not self.loop.is_running()
144145
if isinstance(worker, Worker):
145146
self._worker = worker
146-
self.worker_name = worker.plugin_name
147147
else:
148-
if isinstance(worker, str):
149-
self.worker_name = worker
150-
try:
151-
worker_cls = WORKERS[self.worker_name]
152-
except KeyError:
153-
raise NotImplementedError(
154-
f"No worker for '{self.worker_name}' worker"
155-
)
156-
else:
157-
try:
158-
self.worker_name = worker.plugin_name
159-
except AttributeError:
160-
raise ValueError(
161-
"Worker class must have a 'plugin_name' str attribute"
162-
)
148+
if issubclass(worker, Worker):
163149
worker_cls = worker
150+
elif isinstance(worker, str):
151+
worker_cls = Worker.plugin(worker)
152+
else:
153+
raise TypeError(
154+
"Worker must be a Worker object, name of a worker or a Worker "
155+
f"class, not {type(worker)}"
156+
)
164157
try:
165158
self._worker = worker_cls(**kwargs)
166159
except TypeError as e:
@@ -170,7 +163,7 @@ def __init__(
170163
self.clean_stale_locks = (
171164
clean_stale_locks
172165
if clean_stale_locks is not None
173-
else (self.worker_name == "debug")
166+
else (self.worker.plugin_name() == "debug")
174167
)
175168
self.worker_kwargs = kwargs
176169
self._worker.loop = self.loop
@@ -216,7 +209,7 @@ def __call__(
216209
from pydra.environments.base import Environment
217210

218211
if raise_errors is None:
219-
raise_errors = self.worker_name == "debug"
212+
raise_errors = self.worker.plugin_name() == "debug"
220213
if not isinstance(raise_errors, bool):
221214
raise TypeError(
222215
f"'raise_errors' must be a boolean or None, not {type(raise_errors)}"
@@ -317,14 +310,12 @@ def __getstate__(self):
317310
# Remove the unpicklable entries or those that should not be pickled
318311
# When unpickled (in another process) the submitter can't be called
319312
state["loop"] = None
320-
state["_worker"] = None
321313
return state
322314

323315
def __setstate__(self, state):
324316
self.__dict__.update(state)
325317
# Restore the loop and worker
326318
self.loop = get_open_loop()
327-
self._worker = WORKERS[self.worker_name](**self.worker_kwargs)
328319
self.worker.loop = self.loop
329320

330321
def expand_workflow(self, workflow_task: "Job[workflow.Task]", rerun: bool) -> None:

pydra/engine/tests/test_dockertask.py

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import pytest
33
from pydra.engine.submitter import Submitter
44
from fileformats.generic import File
5-
from pydra.environments.docker import Docker
5+
from pydra.environments import docker
66
from pydra.compose import shell, workflow
77
from pydra.engine.job import Job
88
from .utils import no_win, need_docker, run_submitter, run_no_submitter
@@ -20,11 +20,13 @@ def test_docker_1_nosubm(tmp_path):
2020
docky_task = Job(
2121
task=docky,
2222
name="docky",
23-
submitter=Submitter(environment=Docker(image="busybox"), cache_dir=tmp_path),
23+
submitter=Submitter(
24+
environment=docker.Environment(image="busybox"), cache_dir=tmp_path
25+
),
2426
)
2527
assert docky_task.environment.image == "busybox"
2628
assert docky_task.environment.tag == "latest"
27-
assert isinstance(docky_task.environment, Docker)
29+
assert isinstance(docky_task.environment, docker.Environment)
2830
assert docky.cmdline == cmd
2931

3032
res = docky_task.run()
@@ -42,7 +44,9 @@ def test_docker_1(worker, tmp_path):
4244
Docky = shell.define(cmd)
4345
docky = Docky()
4446

45-
with Submitter(cache_dir=tmp_path, environment=Docker(image="busybox")) as sub:
47+
with Submitter(
48+
cache_dir=tmp_path, environment=docker.Environment(image="busybox")
49+
) as sub:
4650
res = sub(docky)
4751

4852
assert res.outputs.stdout == "root\n"
@@ -61,7 +65,9 @@ def test_docker_2(run_function, worker, tmp_path):
6165
docky = Docky()
6266
# cmdline doesn't know anything about docker
6367
assert docky.cmdline == cmdline
64-
outputs = run_function(docky, tmp_path, worker, environment=Docker(image="busybox"))
68+
outputs = run_function(
69+
docky, tmp_path, worker, environment=docker.Environment(image="busybox")
70+
)
6571
assert outputs.stdout.strip() == " ".join(cmdline.split()[1:])
6672
assert outputs.return_code == 0
6773

@@ -80,7 +86,9 @@ def test_docker_2a(run_function, worker, tmp_path):
8086
assert docky.executable == cmd
8187
assert docky.cmdline == " ".join(cmd)
8288

83-
outputs = run_function(docky, tmp_path, worker, environment=Docker(image="busybox"))
89+
outputs = run_function(
90+
docky, tmp_path, worker, environment=docker.Environment(image="busybox")
91+
)
8492
assert outputs.stdout.strip() == " ".join(cmd[1:])
8593
assert outputs.return_code == 0
8694

@@ -99,7 +107,9 @@ def test_docker_st_1(run_function, worker, tmp_path):
99107
Docky = shell.define("docky") # cmd is just a placeholder
100108
docky = Docky().split(executable=cmd)
101109

102-
outputs = run_function(docky, tmp_path, worker, environment=Docker(image="busybox"))
110+
outputs = run_function(
111+
docky, tmp_path, worker, environment=docker.Environment(image="busybox")
112+
)
103113
assert (
104114
outputs.stdout[0]
105115
== f"/mnt/pydra{tmp_path}/{attrs.evolve(docky, executable=cmd[0])._checksum}\n"
@@ -121,7 +131,7 @@ def test_docker_outputspec_1(worker, tmp_path):
121131
Docky = shell.define("touch <out|newfile$newfile_tmp.txt>")
122132
docky = Docky()
123133

124-
outputs = docky(worker=worker, environment=Docker(image="ubuntu"))
134+
outputs = docky(worker=worker, environment=docker.Environment(image="ubuntu"))
125135
assert outputs.stdout == ""
126136

127137

@@ -154,7 +164,9 @@ def test_docker_inputspec_1(tmp_path, worker):
154164
docky = Docky(file=filename)
155165

156166
outputs = docky(
157-
cache_dir=tmp_path, worker=worker, environment=Docker(image="busybox")
167+
cache_dir=tmp_path,
168+
worker=worker,
169+
environment=docker.Environment(image="busybox"),
158170
)
159171
assert outputs.stdout.strip() == "hello from pydra"
160172

@@ -187,7 +199,7 @@ def test_docker_inputspec_1a(tmp_path):
187199

188200
docky = Docky()
189201

190-
outputs = docky(cache_dir=tmp_path, environment=Docker(image="busybox"))
202+
outputs = docky(cache_dir=tmp_path, environment=docker.Environment(image="busybox"))
191203
assert outputs.stdout.strip() == "hello from pydra"
192204

193205

@@ -231,7 +243,7 @@ def test_docker_inputspec_2(worker, tmp_path):
231243

232244
outputs = docky(
233245
name="docky",
234-
environment=Docker(image="busybox"),
246+
environment=docker.Environment(image="busybox"),
235247
)
236248
assert outputs.stdout.strip() == "hello from pydra\nhave a nice one"
237249

@@ -278,7 +290,9 @@ def test_docker_inputspec_2a_except(worker, tmp_path):
278290
assert docky.file2.fspath == filename_2
279291

280292
outputs = docky(
281-
cache_dir=tmp_path, worker=worker, environment=Docker(image="busybox")
293+
cache_dir=tmp_path,
294+
worker=worker,
295+
environment=docker.Environment(image="busybox"),
282296
)
283297
assert outputs.stdout.strip() == "hello from pydra\nhave a nice one"
284298

@@ -323,7 +337,9 @@ def test_docker_inputspec_2a(worker, tmp_path):
323337
docky = Docky(file2=filename_2)
324338

325339
outputs = docky(
326-
cache_dir=tmp_path, worker=worker, environment=Docker(image="busybox")
340+
cache_dir=tmp_path,
341+
worker=worker,
342+
environment=docker.Environment(image="busybox"),
327343
)
328344
assert outputs.stdout.strip() == "hello from pydra\nhave a nice one"
329345

@@ -350,7 +366,7 @@ def test_docker_inputspec_3(worker, tmp_path):
350366
]
351367

352368
docky = shell.define(cmd, inputs=inputs)(
353-
environment=Docker(image="busybox"),
369+
environment=docker.Environment(image="busybox"),
354370
file=filename,
355371
strip=True,
356372
)
@@ -391,7 +407,9 @@ class Outputs(shell.Outputs):
391407
docky = Docky(orig_file=str(file))
392408

393409
outputs = docky(
394-
cache_dir=tmp_path, worker=worker, environment=Docker(image="busybox")
410+
cache_dir=tmp_path,
411+
worker=worker,
412+
environment=docker.Environment(image="busybox"),
395413
)
396414
assert outputs.stdout == ""
397415
out_file = outputs.out_file.fspath
@@ -436,7 +454,9 @@ def test_docker_inputspec_state_1(worker, tmp_path):
436454
docky = Docky().split(file=[str(filename_1), str(filename_2)])
437455

438456
outputs = docky(
439-
worker=worker, cache_dir=tmp_path, environment=Docker(image="busybox")
457+
worker=worker,
458+
cache_dir=tmp_path,
459+
environment=docker.Environment(image="busybox"),
440460
)
441461
assert outputs.stdout[0].strip() == "hello from pydra"
442462
assert outputs.stdout[1].strip() == "have a nice one"
@@ -473,7 +493,9 @@ def test_docker_inputspec_state_1b(worker, tmp_path):
473493
docky = Docky().split(file=[str(file_1), str(file_2)])
474494

475495
outputs = docky(
476-
cache_dir=tmp_path, worker=worker, environment=Docker(image="busybox")
496+
cache_dir=tmp_path,
497+
worker=worker,
498+
environment=docker.Environment(image="busybox"),
477499
)
478500
assert outputs.stdout[0].strip() == "hello from pydra"
479501
assert outputs.stdout[1].strip() == "have a nice one"
@@ -507,7 +529,7 @@ def Workflow(file):
507529

508530
docky = workflow.add(
509531
Docky(file=file),
510-
environment=Docker(image="busybox"),
532+
environment=docker.Environment(image="busybox"),
511533
)
512534

513535
return docky.stdout
@@ -549,7 +571,7 @@ def Workflow(file):
549571

550572
docky = workflow.add(
551573
Docky(file=file),
552-
environment=Docker(image="busybox"),
574+
environment=docker.Environment(image="busybox"),
553575
)
554576

555577
return docky.stdout
@@ -593,7 +615,7 @@ def Workflow(file):
593615

594616
docky = workflow.add(
595617
Docky(file=file),
596-
environment=Docker(image="busybox"),
618+
environment=docker.Environment(image="busybox"),
597619
)
598620

599621
return docky.stdout

0 commit comments

Comments
 (0)