|
1 | 1 | from dateutil import parser
|
2 | 2 | import re
|
3 |
| -import shutil |
4 |
| -import attrs |
5 | 3 | import subprocess as sp
|
6 | 4 | import time
|
7 | 5 |
|
@@ -579,33 +577,41 @@ def test_sge_no_limit_maxthreads(tmpdir):
|
579 | 577 | assert job_1_endtime > job_2_starttime
|
580 | 578 |
|
581 | 579 |
|
582 |
| -@mark.task |
583 |
| -def add_together(x, y): |
584 |
| - return x + y |
585 |
| - |
586 |
| - |
587 | 580 | def test_wf_with_blocked_tasks(tmpdir):
|
588 |
| - dummy_task = add_together(name="dummy", x=1, y=2) |
589 |
| - lf = LazyField(node=dummy_task, attr_type="output") |
590 |
| - lf.field = "out" |
591 | 581 | wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"])
|
592 |
| - wf.add(sleep_add_one(name="taska", x=wf.lzin.x)) |
593 |
| - wf.add(sleep_add_one(name="taskb", x=lf)) # wf.taska.lzout.out)) |
594 |
| - # wf.add(sleep_add_one(name="taskc", x=wf.taska.lzout.out)) |
595 |
| - # wf.add(add_together(name="taskd", x=wf.taskb.lzout.out, y=wf.taskc.lzout.out)) |
| 582 | + wf.add(identity(name="taska", x=wf.lzin.x)) |
| 583 | + wf.add(alter_input(name="taskb", x=wf.taska.lzout.out)) |
| 584 | + wf.add(to_tuple(name="taskc", x=wf.taska.lzout.out, y=wf.taskb.lzout.out)) |
596 | 585 | wf.set_output([("out", wf.taskb.lzout.out)])
|
597 | 586 |
|
598 |
| - wf.inputs.x = 1 |
| 587 | + wf.inputs.x = A(1) |
599 | 588 |
|
600 | 589 | wf.cache_dir = tmpdir
|
601 | 590 |
|
602 |
| - with Submitter("serial") as sub: |
603 |
| - sub(wf) |
| 591 | + with pytest.raises(Exception, match="graph is not empty,"): |
| 592 | + with Submitter("serial") as sub: |
| 593 | + sub(wf) |
| 594 | + |
| 595 | + |
| 596 | +class A: |
| 597 | + def __init__(self, a): |
| 598 | + self.a = a |
| 599 | + |
| 600 | + def __hash__(self): |
| 601 | + return hash(self.a) |
604 | 602 |
|
605 |
| - # with pytest.raises(Exception) as exc: |
606 |
| - # wf_graph = wf.graph.copy() |
607 |
| - # runnable_tasks, _ = get_runnable_tasks(wf_graph) |
608 |
| - # assert runnable_tasks == [] |
609 |
| - # blocked = _list_blocked_tasks(wf_graph) |
610 | 603 |
|
611 |
| - # assert "graph is not empty" in str(exc) |
| 604 | +@mark.task |
| 605 | +def identity(x): |
| 606 | + return x |
| 607 | + |
| 608 | + |
| 609 | +@mark.task |
| 610 | +def alter_input(x): |
| 611 | + x.a = 2 |
| 612 | + return x |
| 613 | + |
| 614 | + |
| 615 | +@mark.task |
| 616 | +def to_tuple(x, y): |
| 617 | + return (x, y) |
0 commit comments