Skip to content

Commit 0027552

Browse files
authored
Merge pull request #460 from jw-96/deadlock_fix
SoftFileLock asyncio workaround (#444)
2 parents c8b0f08 + 77f9cd8 commit 0027552

File tree

5 files changed

+113
-2
lines changed

5 files changed

+113
-2
lines changed

.zenodo.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@
5555
"name": "Johnson, Charles E.",
5656
"orcid": "0000-0001-7814-3501"
5757
},
58+
{
59+
"affiliation": "FCBG, EPFL",
60+
"name": "Wigger, Jeffrey",
61+
"orcid": "0000-0003-0978-4326"
62+
},
5863
{
5964
"affiliation": "MIT, HMS",
6065
"name": "Ghosh, Satrajit",

pydra/engine/core.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
ensure_list,
3737
record_error,
3838
hash_function,
39+
PydraFileLock,
3940
)
4041
from .helpers_file import copyfile_input, template_update
4142
from .graph import DiGraph
@@ -1007,7 +1008,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
10071008
self.create_connections(task)
10081009
lockfile = self.cache_dir / (checksum + ".lock")
10091010
self.hooks.pre_run(self)
1010-
with SoftFileLock(lockfile):
1011+
async with PydraFileLock(lockfile):
10111012
# retrieve cached results
10121013
if not (rerun or self.task_rerun):
10131014
result = self.result()

pydra/engine/helpers.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import attr
55
import cloudpickle as cp
66
from pathlib import Path
7-
from filelock import SoftFileLock
7+
from filelock import SoftFileLock, Timeout
88
import os
99
import sys
1010
from hashlib import sha256
@@ -895,3 +895,29 @@ def argstr_formatting(argstr, inputs, value_updates=None):
895895
.strip()
896896
)
897897
return argstr_formatted
898+
899+
900+
class PydraFileLock:
    """Async-friendly wrapper around filelock's SoftFileLock.

    ``SoftFileLock`` blocks the calling thread while waiting for the lock,
    which would stall the asyncio event loop.  This wrapper instead makes
    single non-blocking acquisition attempts and yields control back to the
    loop with ``asyncio.sleep`` between attempts.
    """

    def __init__(self, lockfile):
        # Path of the lock file to guard.
        self.lockfile = lockfile
        # Initial polling delay in seconds; grows on contention (see __aenter__).
        self.timeout = 0.1

    async def __aenter__(self):
        file_lock = SoftFileLock(self.lockfile)
        while True:
            try:
                # timeout=0 -> one non-blocking attempt, raises Timeout if held.
                file_lock.acquire(timeout=0)
                break
            except Timeout:
                # Lock is held elsewhere: yield to the event loop, then retry
                # with an exponentially growing delay (doubling stops once the
                # delay exceeds 2 seconds).
                await asyncio.sleep(self.timeout)
                if self.timeout <= 2:
                    self.timeout *= 2
        self.lock = file_lock
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.lock.release()
        return None

pydra/engine/tests/test_workflow.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
)
2828
from ..submitter import Submitter
2929
from ..core import Workflow
30+
from ... import mark
3031

3132

3233
def test_wf_name_conflict1():
@@ -4561,3 +4562,79 @@ def test_graph_5(tmpdir):
45614562
if DOT_FLAG:
45624563
name = f"graph_{sys._getframe().f_code.co_name}"
45634564
exporting_graphs(wf=wf, name=name)
4565+
4566+
4567+
@pytest.mark.timeout(20)
def test_duplicate_input_on_split_wf(tmpdir):
    """Check that a workflow does not get stuck when it must run two tasks
    with equal checksums, as happens when splitting over a list that
    contains duplicate values.
    """

    @mark.task
    def printer(a):
        return a

    text = ["test", "test"]

    wf = Workflow(name="wf", input_spec=["text"], cache_dir=tmpdir)
    wf.split("text", text=text)
    wf.add(printer(name="printer1", a=wf.lzin.text))
    wf.set_output([("out1", wf.printer1.lzout.out)])

    with Submitter(plugin="cf", n_procs=6) as sub:
        sub(wf)

    res = wf.result()
    assert res[0].output.out1 == "test"
    assert res[1].output.out1 == "test"
4591+
4592+
4593+
@pytest.mark.timeout(40)
def test_inner_outer_wf_duplicate(tmpdir):
    """Check that execution does not get stuck when an inner and an outer
    workflow each run a node with the exact same inputs.
    """

    @mark.task
    def one_arg(start_number):
        # Increment ten times, i.e. return start_number + 10.
        for _ in range(10):
            start_number += 1
        return start_number

    @mark.task
    def one_arg_inner(start_number):
        # Same computation as one_arg, so both nodes share a checksum.
        for _ in range(10):
            start_number += 1
        return start_number

    start_list = [3]
    task_list = ["First", "Second"]

    # Outer workflow, split over both of its inputs.
    test_outer = Workflow(
        name="test_outer", input_spec=["start_number", "task_name"], cache_dir=tmpdir
    )
    test_outer.split(
        ["start_number", "task_name"], start_number=start_list, task_name=task_list
    )

    # Inner workflow running a single task.
    test_inner = Workflow(name="test_inner", input_spec=["start_number1"])
    test_inner.add(
        one_arg_inner(name="Ilevel1", start_number=test_inner.lzin.start_number1)
    )
    test_inner.set_output([("res", test_inner.Ilevel1.lzout.out)])

    # The outer workflow runs its own node plus the inner workflow,
    # feeding the node's output into the inner workflow's input.
    test_outer.add(one_arg(name="level1", start_number=test_outer.lzin.start_number))
    test_outer.add(test_inner)
    test_inner.inputs.start_number1 = test_outer.level1.lzout.out
    test_outer.set_output([("res2", test_outer.test_inner.lzout.res)])

    with Submitter(plugin="cf") as sub:
        sub(test_outer)

    # 3 + 10 (outer node) + 10 (inner node) = 23 for each split element.
    res = test_outer.result()
    assert res[0].output.res2 == 23
    assert res[1].output.res2 == 23

setup.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ test_requires =
3535
pytest-env
3636
pytest-xdist < 2.0
3737
pytest-rerunfailures
38+
pytest-timeout
3839
codecov
3940
numpy
4041
psutil
@@ -68,6 +69,7 @@ test =
6869
pytest-env
6970
pytest-xdist < 2.0
7071
pytest-rerunfailures
72+
pytest-timeout
7173
codecov
7274
numpy
7375
pyld

0 commit comments

Comments
 (0)