|
1 | 1 | from dateutil import parser
|
2 | 2 | import re
|
3 | 3 | import shutil
|
| 4 | +import attrs |
4 | 5 | import subprocess as sp
|
5 | 6 | import time
|
6 | 7 |
|
|
14 | 15 | gen_basic_wf_with_threadcount_concurrent,
|
15 | 16 | )
|
16 | 17 | from ..core import Workflow
|
17 |
| -from ..submitter import Submitter |
| 18 | +from ..submitter import Submitter, get_runnable_tasks, _list_blocked_tasks |
18 | 19 | from ... import mark
|
19 | 20 | from pathlib import Path
|
20 | 21 | from datetime import datetime
|
| 22 | +from pydra.engine.specs import LazyField |
21 | 23 |
|
22 | 24 |
|
23 | 25 | @mark.task
|
@@ -575,3 +577,35 @@ def test_sge_no_limit_maxthreads(tmpdir):
|
575 | 577 | out_job2_dict["start_time"][0], f"%a %b %d %H:%M:%S %Y"
|
576 | 578 | )
|
577 | 579 | assert job_1_endtime > job_2_starttime
|
| 580 | + |
| 581 | + |
| 582 | +@mark.task |
| 583 | +def add_together(x, y): |
| 584 | + return x + y |
| 585 | + |
| 586 | + |
| 587 | +def test_wf_with_blocked_tasks(tmpdir): |
| 588 | + dummy_task = add_together(name="dummy", x=1, y=2) |
| 589 | + lf = LazyField(node=dummy_task, attr_type="output") |
| 590 | + lf.field = "out" |
| 591 | + wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"]) |
| 592 | + wf.add(sleep_add_one(name="taska", x=wf.lzin.x)) |
| 593 | + wf.add(sleep_add_one(name="taskb", x=lf)) # wf.taska.lzout.out)) |
| 594 | + # wf.add(sleep_add_one(name="taskc", x=wf.taska.lzout.out)) |
| 595 | + # wf.add(add_together(name="taskd", x=wf.taskb.lzout.out, y=wf.taskc.lzout.out)) |
| 596 | + wf.set_output([("out", wf.taskb.lzout.out)]) |
| 597 | + |
| 598 | + wf.inputs.x = 1 |
| 599 | + |
| 600 | + wf.cache_dir = tmpdir |
| 601 | + |
| 602 | + with Submitter("serial") as sub: |
| 603 | + sub(wf) |
| 604 | + |
| 605 | + # with pytest.raises(Exception) as exc: |
| 606 | + # wf_graph = wf.graph.copy() |
| 607 | + # runnable_tasks, _ = get_runnable_tasks(wf_graph) |
| 608 | + # assert runnable_tasks == [] |
| 609 | + # blocked = _list_blocked_tasks(wf_graph) |
| 610 | + |
| 611 | + # assert "graph is not empty" in str(exc) |
0 commit comments