Commit 374cdec

Merge pull request #623 from tclose/list-blocked-tasks-bug
FIX: Type error in blocking task list, detailed exception message
2 parents: b5fe4c0 + c8a2b96

2 files changed: +56 −7 lines

pydra/engine/submitter.py

Lines changed: 14 additions & 5 deletions
@@ -169,13 +169,22 @@ async def expand_workflow(self, wf, rerun=False):
                     await asyncio.sleep(1)
                     if ii > 60:
                         blocked = _list_blocked_tasks(graph_copy)
-                        get_runnable_tasks(graph_copy)
+                        # get_runnable_tasks(graph_copy)  # Uncomment to debug `get_runnable_tasks`
                         raise Exception(
                             "graph is not empty, but not able to get more tasks "
                             "- something may have gone wrong when retrieving the results "
-                            "of predecessor tasks caused by a file-system error or a bug "
-                            "in the internal workflow logic.\n\nBlocked tasks\n-------------\n"
-                            + "\n".join(blocked)
+                            "of predecessor tasks. This could be caused by a file-system "
+                            "error or a bug in the internal workflow logic, but is likely "
+                            "to be caused by the hash of an upstream node being unstable."
+                            "\n\nHash instability can be caused by an input of the node being "
+                            "modified in place, or by pseudo-random ordering of `set` or "
+                            "`frozenset` inputs (or nested attributes of inputs) in the hash "
+                            "calculation. To ensure that sets are hashed consistently you can "
+                            "try setting the environment variable PYTHONHASHSEED=0 for "
+                            "all processes, but it is best to try to identify where the set "
+                            "objects are occurring and manually hash their sorted elements "
+                            "(or use list objects instead)."
+                            "\n\nBlocked tasks\n-------------\n" + "\n".join(blocked)
                         )
             for task in tasks:
                 # grab inputs if needed
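
Note: the expanded message above points at unstable hashes as the most common cause of blocked tasks. As a rough standalone illustration of that failure mode (the `unstable_digest`/`stable_digest` helpers below are hypothetical, not pydra's own hashing code): a digest built by iterating a `set` of strings depends on the per-process hash seed (PYTHONHASHSEED), so it can change between interpreter runs, whereas hashing the sorted elements is stable.

    # Hypothetical sketch (not pydra code): why hashing a set by iteration order
    # is unstable across processes, and why sorting the elements fixes it.
    import hashlib

    def unstable_digest(values: set) -> str:
        # Iteration order of a set of str depends on the per-process hash seed
        # (PYTHONHASHSEED), so this digest may differ between interpreter runs.
        h = hashlib.sha256()
        for v in values:
            h.update(v.encode())
        return h.hexdigest()

    def stable_digest(values: set) -> str:
        # Sorting first removes the dependence on set iteration order.
        h = hashlib.sha256()
        for v in sorted(values):
            h.update(v.encode())
        return h.hexdigest()

    inputs = {"sub-01", "sub-02", "sub-03"}
    print(unstable_digest(inputs))  # may change from run to run
    print(stable_digest(inputs))    # identical on every run
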
@@ -307,7 +316,7 @@ def _list_blocked_tasks(graph):
                                 matching_name.append(
                                     f"{saved_tsk.name} ({tsk_work_dir.name})"
                                 )
-                blocking.append(pred, ", ".join(matching_name))
+                blocking.append((pred, ", ".join(matching_name)))
         if blocking:
             blocked.append(
                 f"\n{tsk.name} ({tsk.checksum}) is blocked by "

pydra/engine/tests/test_submitter.py

Lines changed: 42 additions & 2 deletions
@@ -1,6 +1,5 @@
 from dateutil import parser
 import re
-import shutil
 import subprocess as sp
 import time
 
@@ -14,10 +13,11 @@
     gen_basic_wf_with_threadcount_concurrent,
 )
 from ..core import Workflow
-from ..submitter import Submitter
+from ..submitter import Submitter, get_runnable_tasks, _list_blocked_tasks
 from ... import mark
 from pathlib import Path
 from datetime import datetime
+from pydra.engine.specs import LazyField
 
 
 @mark.task
@@ -575,3 +575,43 @@ def test_sge_no_limit_maxthreads(tmpdir):
         out_job2_dict["start_time"][0], f"%a %b %d %H:%M:%S %Y"
     )
     assert job_1_endtime > job_2_starttime
+
+
+def test_wf_with_blocked_tasks(tmpdir):
+    wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"])
+    wf.add(identity(name="taska", x=wf.lzin.x))
+    wf.add(alter_input(name="taskb", x=wf.taska.lzout.out))
+    wf.add(to_tuple(name="taskc", x=wf.taska.lzout.out, y=wf.taskb.lzout.out))
+    wf.set_output([("out", wf.taskb.lzout.out)])
+
+    wf.inputs.x = A(1)
+
+    wf.cache_dir = tmpdir
+
+    with pytest.raises(Exception, match="graph is not empty,"):
+        with Submitter("serial") as sub:
+            sub(wf)
+
+
+class A:
+    def __init__(self, a):
+        self.a = a
+
+    def __hash__(self):
+        return hash(self.a)
+
+
+@mark.task
+def identity(x):
+    return x
+
+
+@mark.task
+def alter_input(x):
+    x.a = 2
+    return x
+
+
+@mark.task
+def to_tuple(x, y):
+    return (x, y)
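
The new test appears to provoke the blocked-task path deliberately: `alter_input` mutates its input in place, and because `A.__hash__` depends on the mutated attribute, the hash of `taska`'s output changes after `taskb` runs, so `taskc` can no longer match its predecessor's cached result and the submitter eventually raises the new exception. A minimal sketch of that mechanism, reusing the test's `A` but with plain `hash()` as a stand-in for pydra's checksum logic:

    # In-place mutation changes the object's hash, so anything keyed on the
    # old hash (e.g. a cached-result lookup) will no longer find it.
    class A:
        def __init__(self, a):
            self.a = a

        def __hash__(self):
            return hash(self.a)

    obj = A(1)
    before = hash(obj)
    obj.a = 2               # what alter_input does to its input
    after = hash(obj)
    print(before == after)  # False
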
