Skip to content

Commit e3418b3

Browse files
committed
got concurrent futures worker to work
1 parent 6a590e9 commit e3418b3

File tree

6 files changed

+150
-207
lines changed

6 files changed

+150
-207
lines changed

new-docs/source/tutorial/tst.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
from pydra.design import python
22

3+
if __name__ == "__main__":
34

4-
@python.define
5-
def TenToThePower(p: int) -> int:
6-
return 10**p
5+
@python.define
6+
def TenToThePower(p: int) -> int:
7+
return 10**p
78

9+
ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])
810

9-
ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])
11+
# Run the 5 tasks in parallel split across 3 processes
12+
outputs = ten_to_the_power(worker="cf", n_procs=3)
1013

11-
# Run the 5 tasks in parallel split across 3 processes
12-
outputs = ten_to_the_power(worker="debug")
14+
p1, p2, p3, p4, p5 = outputs.out
1315

14-
p1, p2, p3, p4, p5 = outputs.out
15-
16-
print(f"10^5 = {p5}")
16+
print(f"10^5 = {p5}")

pydra/engine/core.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ def _populate_filesystem(self):
325325
if not self.can_resume and self.output_dir.exists():
326326
shutil.rmtree(self.output_dir)
327327
self.output_dir.mkdir(parents=False, exist_ok=self.can_resume)
328+
# Save task pkl into the output directory for future reference
329+
save(self.output_dir, task=self)
328330

329331
def run(self, rerun: bool = False):
330332
"""Prepare the task working directory, execute the task definition, and save the
@@ -382,7 +384,7 @@ def run(self, rerun: bool = False):
382384
self._check_for_hash_changes()
383385
return result
384386

385-
async def run_async(self, rerun: bool = False):
387+
async def run_async(self, rerun: bool = False) -> Result:
386388
"""Prepare the task working directory, execute the task definition asynchronously,
387389
and save the results. NB: only workflows are run asynchronously at the moment.
388390

pydra/engine/helpers.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ def load_and_run(task_pkl: Path, rerun: bool = False) -> Path:
463463

464464
resultfile = task.output_dir / "_result.pklz"
465465
try:
466-
task(rerun=rerun)
466+
task.run(rerun=rerun)
467467
except Exception as e:
468468
# creating result and error files if missing
469469
errorfile = task.output_dir / "_error.pklz"
@@ -479,16 +479,16 @@ def load_and_run(task_pkl: Path, rerun: bool = False) -> Path:
479479
return resultfile
480480

481481

482-
async def load_and_run_async(task_pkl):
483-
"""
484-
loading a task from a pickle file, settings proper input
485-
and running the workflow
486-
"""
487-
task = load_task(task_pkl=task_pkl)
488-
await task()
482+
# async def load_and_run_async(task_pkl):
483+
# """
484+
# loading a task from a pickle file, settings proper input
485+
# and running the workflow
486+
# """
487+
# task = load_task(task_pkl=task_pkl)
488+
# await task()
489489

490490

491-
def load_task(task_pkl):
491+
def load_task(task_pkl: Path | str) -> "Task[DefType]":
492492
"""loading a task from a pickle file, settings proper input for the specific ind"""
493493
if isinstance(task_pkl, str):
494494
task_pkl = Path(task_pkl)

pydra/engine/specs.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,11 @@ def _resolve_value(
876876

877877
class ShellDef(TaskDef[ShellOutputsType]):
878878

879+
arguments: ty.List[str] = shell.arg(
880+
default=attrs.Factory(list),
881+
help="Additional arguments to pass to the command.",
882+
)
883+
879884
RESERVED_FIELD_NAMES = TaskDef.RESERVED_FIELD_NAMES + ("cmdline",)
880885

881886
def _run(self, task: "Task[ShellDef]") -> None:
@@ -952,7 +957,9 @@ def _command_args(
952957
# Sort command and arguments by position
953958
cmd_args = position_sort(pos_args)
954959
# pos_args values are each a list of arguments, so concatenate lists after sorting
955-
return sum(cmd_args, [])
960+
command_args = sum(cmd_args, [])
961+
command_args += self.arguments
962+
return command_args
956963

957964
def _command_shelltask_executable(
958965
self, field: shell.arg, value: ty.Any

pydra/engine/submitter.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from copy import copy
1010
from collections import defaultdict
1111
from .workers import Worker, WORKERS
12-
from .core import is_workflow
1312
from .graph import DiGraph
1413
from .helpers import (
1514
get_open_loop,
@@ -153,10 +152,10 @@ def Split(defn: TaskDef) -> tuple:
153152
"Use the `split` method to split the task before combining."
154153
)
155154
task = Task(task_def, submitter=self, name="task", environment=self.environment)
156-
if task.is_async:
157-
self.loop.run_until_complete(task.run_async(rerun=self.rerun))
155+
if task.is_async: # Only workflow tasks can be async
156+
self.loop.run_until_complete(self.worker.run_async(task, rerun=self.rerun))
158157
else:
159-
task.run(rerun=self.rerun)
158+
self.worker.run(rerun=self.rerun)
160159
PersistentCache().clean_up()
161160
result = task.result()
162161
if result is None:
@@ -285,8 +284,8 @@ async def expand_workflow_async(self, workflow_task: "Task[WorkflowDef]") -> Non
285284
)
286285
raise RuntimeError(msg)
287286
for task in tasks:
288-
if is_workflow(task):
289-
await task.run_async(rerun=self.rerun)
287+
if task.is_async:
288+
await self.worker.run_async(task, rerun=self.rerun)
290289
else:
291290
task_futures.add(self.worker.run(task, rerun=self.rerun))
292291
task_futures = await self.worker.fetch_finished(task_futures)

0 commit comments

Comments
 (0)