Skip to content

Commit 107c9e5

Browse files
committed
Reverted get_runnable_tasks and added a guard against adding task futures multiple times
1 parent 5360c73 commit 107c9e5

File tree

3 files changed

+60
-46
lines changed

3 files changed

+60
-46
lines changed

pydra/engine/submitter.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ async def expand_workflow_async(
341341
workflow_task.return_values = {"workflow": wf, "exec_graph": exec_graph}
342342
# keep track of pending futures
343343
task_futures = set()
344+
futured = set()
344345
tasks = self.get_runnable_tasks(exec_graph)
345346
while tasks or task_futures or any(not n.done for n in exec_graph.nodes):
346347
if not tasks and not task_futures:
@@ -421,10 +422,11 @@ async def expand_workflow_async(
421422
await self.worker.run_async(
422423
task, rerun=rerun and self.propagate_rerun
423424
)
424-
else:
425+
elif task.checksum not in futured:
425426
task_futures.add(
426427
self.worker.run(task, rerun=rerun and self.propagate_rerun)
427428
)
429+
futured.add(task.checksum)
428430
task_futures = await self.worker.fetch_finished(task_futures)
429431
tasks = self.get_runnable_tasks(exec_graph)
430432

@@ -818,7 +820,8 @@ def get_runnable_tasks(self, graph: DiGraph) -> list["Task[DefType]"]:
818820
break
819821
if is_runnable:
820822
runnable.append(self.blocked.pop(index))
821-
return runnable
823+
self.queued.update({t.state_index: t for t in runnable})
824+
return list(self.queued.values())
822825

823826

824827
async def prepare_runnable(runnable):

pydra/engine/tests/test_specs.py

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,15 @@
1616
from pydra.engine.node import Node
1717
from pydra.engine.submitter import Submitter, NodeExecution, DiGraph
1818
from pydra.design import python, workflow
19-
from .utils import Foo, FunAddTwo, FunAddVar, ListSum
19+
from .utils import (
20+
Foo,
21+
FunAddTwo,
22+
FunAddVar,
23+
ListSum,
24+
FileOrIntIdentity,
25+
ListOfListOfFileOrIntIdentity,
26+
ListOfDictOfFileOrIntIdentity,
27+
)
2028

2129

2230
@workflow.define
@@ -95,16 +103,16 @@ def test_input_file_hash_1(tmp_path):
95103
outfile = tmp_path / "test.file"
96104
outfile.touch()
97105

98-
@python.define
99-
def A(in_file: File) -> File:
100-
return in_file
101-
102-
assert A(in_file=outfile)._hash == "e708da65b720212c5ce9ed2c65aae59c"
106+
assert (
107+
FileOrIntIdentity(in_file=outfile)._hash == "e708da65b720212c5ce9ed2c65aae59c"
108+
)
103109

104110
with open(outfile, "w") as fp:
105111
fp.write("test")
106112

107-
assert A(in_file=outfile)._hash == "f726a193430352bb3b92dccf5eccff3a"
113+
assert (
114+
FileOrIntIdentity(in_file=outfile)._hash == "f726a193430352bb3b92dccf5eccff3a"
115+
)
108116

109117

110118
def test_input_file_hash_2(tmp_path):
@@ -113,26 +121,22 @@ def test_input_file_hash_2(tmp_path):
113121
with open(file, "w") as f:
114122
f.write("hello")
115123

116-
@python.define
117-
def A(in_file: File) -> File:
118-
return in_file
119-
120124
# checking specific hash value
121-
hash1 = A(in_file=file)._hash
125+
hash1 = FileOrIntIdentity(in_file=file)._hash
122126
assert hash1 == "eba2fafb8df4bae94a7aa42bb159b778"
123127

124128
# checking if different name doesn't affect the hash
125129
file_diffname = tmp_path / "in_file_2.txt"
126130
with open(file_diffname, "w") as f:
127131
f.write("hello")
128-
hash2 = A(in_file=file_diffname)._hash
132+
hash2 = FileOrIntIdentity(in_file=file_diffname)._hash
129133
assert hash1 == hash2
130134

131135
# checking if different content (the same name) affects the hash
132136
file_diffcontent = tmp_path / "in_file_1.txt"
133137
with open(file_diffcontent, "w") as f:
134138
f.write("hi")
135-
hash3 = A(in_file=file_diffcontent)._hash
139+
hash3 = FileOrIntIdentity(in_file=file_diffcontent)._hash
136140
assert hash1 != hash3
137141

138142

@@ -142,30 +146,26 @@ def test_input_file_hash_2a(tmp_path):
142146
with open(file, "w") as f:
143147
f.write("hello")
144148

145-
@python.define
146-
def A(in_file: ty.Union[File, int]) -> File:
147-
return in_file
148-
149149
# checking specific hash value
150-
hash1 = A(in_file=file)._hash
150+
hash1 = FileOrIntIdentity(in_file=file)._hash
151151
assert hash1 == "eba2fafb8df4bae94a7aa42bb159b778"
152152

153153
# checking if different name doesn't affect the hash
154154
file_diffname = tmp_path / "in_file_2.txt"
155155
with open(file_diffname, "w") as f:
156156
f.write("hello")
157-
hash2 = A(in_file=file_diffname)._hash
157+
hash2 = FileOrIntIdentity(in_file=file_diffname)._hash
158158
assert hash1 == hash2
159159

160160
# checking if string is also accepted
161-
hash3 = A(in_file=str(file))._hash
161+
hash3 = FileOrIntIdentity(in_file=str(file))._hash
162162
assert hash3 == hash1
163163

164164
# checking if different content (the same name) affects the hash
165165
file_diffcontent = tmp_path / "in_file_1.txt"
166166
with open(file_diffcontent, "w") as f:
167167
f.write("hi")
168-
hash4 = A(in_file=file_diffcontent)._hash
168+
hash4 = FileOrIntIdentity(in_file=file_diffcontent)._hash
169169
assert hash1 != hash4
170170

171171

@@ -175,16 +175,12 @@ def test_input_file_hash_3(tmp_path):
175175
with open(file, "w") as f:
176176
f.write("hello")
177177

178-
@python.define
179-
def A(in_file: File, in_int: int) -> File:
180-
return in_file, in_int
181-
182-
a = A(in_file=file, in_int=3)
178+
a = FileOrIntIdentity(in_file=file, in_int=3)
183179
# original hash and files_hash (dictionary contains info about files)
184180
hash1 = a._hash
185181
# files_hash1 = deepcopy(my_inp.files_hash)
186182
# file name should be in files_hash1[in_file]
187-
filename = str(Path(file))
183+
# filename = str(Path(file))
188184
# assert filename in files_hash1["in_file"]
189185

190186
# changing int input
@@ -230,31 +226,27 @@ def test_input_file_hash_4(tmp_path):
230226
with open(file, "w") as f:
231227
f.write("hello")
232228

233-
@python.define
234-
def A(in_file: ty.List[ty.List[ty.Union[int, File]]]) -> File:
235-
return in_file
236-
237229
# checking specific hash value
238-
hash1 = A(in_file=[[file, 3]])._hash
230+
hash1 = ListOfListOfFileOrIntIdentity(in_file=[[file, 3]])._hash
239231
assert hash1 == "b583e0fd5501d3bed9bf510ce2a9e379"
240232

241233
# the same file, but int field changes
242-
hash1a = A(in_file=[[file, 5]])._hash
234+
hash1a = ListOfListOfFileOrIntIdentity(in_file=[[file, 5]])._hash
243235
assert hash1 != hash1a
244236

245237
# checking if different name doesn't affect the hash
246238
file_diffname = tmp_path / "in_file_2.txt"
247239
with open(file_diffname, "w") as f:
248240
f.write("hello")
249-
hash2 = A(in_file=[[file_diffname, 3]])._hash
241+
hash2 = ListOfListOfFileOrIntIdentity(in_file=[[file_diffname, 3]])._hash
250242
assert hash1 == hash2
251243

252244
# checking if different content (the same name) affects the hash
253245
time.sleep(2) # need the mtime to be different
254246
file_diffcontent = tmp_path / "in_file_1.txt"
255247
with open(file_diffcontent, "w") as f:
256248
f.write("hi")
257-
hash3 = A(in_file=[[file_diffcontent, 3]])._hash
249+
hash3 = ListOfListOfFileOrIntIdentity(in_file=[[file_diffcontent, 3]])._hash
258250
assert hash1 != hash3
259251

260252

@@ -264,31 +256,31 @@ def test_input_file_hash_5(tmp_path):
264256
with open(file, "w") as f:
265257
f.write("hello")
266258

267-
@python.define
268-
def A(in_file: ty.List[ty.Dict[ty.Any, ty.Union[File, int]]]) -> File:
269-
return in_file
270-
271259
# checking specific hash value
272-
hash1 = A(in_file=[{"file": file, "int": 3}])._hash
260+
hash1 = ListOfDictOfFileOrIntIdentity(in_file=[{"file": file, "int": 3}])._hash
273261
assert hash1 == "aa2d4b708ed0dd8340582a6514bfd5ce"
274262

275263
# the same file, but int field changes
276-
hash1a = A(in_file=[{"file": file, "int": 5}])._hash
264+
hash1a = ListOfDictOfFileOrIntIdentity(in_file=[{"file": file, "int": 5}])._hash
277265
assert hash1 != hash1a
278266

279267
# checking if different name doesn't affect the hash
280268
file_diffname = tmp_path / "in_file_2.txt"
281269
with open(file_diffname, "w") as f:
282270
f.write("hello")
283-
hash2 = A(in_file=[{"file": file_diffname, "int": 3}])._hash
271+
hash2 = ListOfDictOfFileOrIntIdentity(
272+
in_file=[{"file": file_diffname, "int": 3}]
273+
)._hash
284274
assert hash1 == hash2
285275

286276
# checking if different content (the same name) affects the hash
287277
time.sleep(2) # ensure mtime is different
288278
file_diffcontent = tmp_path / "in_file_1.txt"
289279
with open(file_diffcontent, "w") as f:
290280
f.write("hi")
291-
hash3 = A(in_file=[{"file": file_diffcontent, "int": 3}])._hash
281+
hash3 = ListOfDictOfFileOrIntIdentity(
282+
in_file=[{"file": file_diffcontent, "int": 3}]
283+
)._hash
292284
assert hash1 != hash3
293285

294286

pydra/engine/tests/utils.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,25 @@ def Add2(x: int) -> int:
191191
return x + 2
192192

193193

194+
@python.define
195+
def FileOrIntIdentity(in_file: ty.Union[File, int]) -> File:
196+
return in_file
197+
198+
199+
@python.define
200+
def ListOfListOfFileOrIntIdentity(
201+
in_file: ty.List[ty.List[ty.Union[int, File]]],
202+
) -> ty.List[ty.List[ty.Union[int, File]]]:
203+
return in_file
204+
205+
206+
@python.define
207+
def ListOfDictOfFileOrIntIdentity(
208+
in_file: ty.List[ty.Dict[ty.Any, ty.Union[File, int]]],
209+
) -> ty.List[ty.Dict[ty.Any, ty.Union[File, int]]]:
210+
return in_file
211+
212+
194213
@python.define
195214
def RaiseXeq1(x: int) -> int:
196215
if x == 1:

0 commit comments

Comments
 (0)