Commit 8641f1a

Address race condition in htex priority test (#3998)
This race condition is in the test, not in the main Parsl codebase: the test was not waiting for all tasks to arrive in the interchange before launching a worker. The tasks usually arrived fast enough, but under some extreme race condition testing this broke the test. I'm working towards automating that testing in CI, so this is not a theoretical concern.

If the tasks are not all in the interchange when a worker connects, they won't necessarily be run in priority order: the highest priority task of the subset of tasks that has already arrived will be run first, even if it is not the highest priority task of the entire task set.

# Changed Behaviour

This is a test behaviour change and should not be user-exposed.

## Type of change

- Bug fix
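The ordering problem described above can be reproduced without Parsl. The following is a minimal sketch, not the interchange's actual scheduling code: `run_order` and its `arrived_before_worker` parameter are hypothetical names, and the convention that a lower number runs first is an assumption made only for this illustration.

```python
import heapq


def run_order(priorities, arrived_before_worker):
    """Simulate one worker draining a priority queue.

    Only the first `arrived_before_worker` tasks are queued when the
    worker connects; the rest arrive one at a time, one per completed task.
    Assumption for this sketch: a lower number is run first.
    """
    queue = list(priorities[:arrived_before_worker])
    heapq.heapify(queue)
    pending = list(priorities[arrived_before_worker:])
    order = []
    while queue or pending:
        if not queue:
            # Nothing queued yet: the worker has to wait for the next arrival.
            heapq.heappush(queue, pending.pop(0))
        # The worker takes the best task among those that have arrived.
        order.append(heapq.heappop(queue))
        if pending:
            # One more task arrives while the worker is busy.
            heapq.heappush(queue, pending.pop(0))
    return order


priorities = [3, 2, 5, 1, 4]
all_arrived = run_order(priorities, arrived_before_worker=5)
partial = run_order(priorities, arrived_before_worker=2)
print(all_arrived)
print(partial)
```

When every task is queued before the worker starts, the run order is fully sorted. When only two tasks have arrived, the worker begins with the best of those two, so the overall order is no longer sorted.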
1 parent 9768530 commit 8641f1a

1 file changed, +18 -1 lines changed

parsl/tests/test_htex/test_priority_queue.py

Lines changed: 18 additions & 1 deletion
@@ -18,7 +18,7 @@ def fake_task(parsl_resource_specification=None):


 @pytest.mark.local
-def test_priority_queue():
+def test_priority_queue(try_assert):
     provider = LocalProvider(
         init_blocks=0,
         max_blocks=0,
@@ -30,6 +30,7 @@ def test_priority_queue():
         max_workers_per_node=1,
         manager_selector=RandomManagerSelector(),
         provider=provider,
+        worker_debug=True,  # needed to introspect interchange logs
     )

     config = Config(
@@ -50,6 +51,22 @@ def test_priority_queue():
             spec = {'priority': priority}
             futures[(priority, i)] = fake_task(parsl_resource_specification=spec)

+        # wait for the interchange to have received all tasks
+        # (which happens asynchronously to the main thread, and is otherwise
+        # a race condition which can cause this test to fail)
+
+        n = len(priorities)
+
+        def interchange_logs_task_count():
+            with open(htex.worker_logdir + "/interchange.log", "r") as f:
+                lines = f.readlines()
+                for line in lines:
+                    if f"Fetched {n} tasks so far" in line:
+                        return True
+            return False
+
+        try_assert(interchange_logs_task_count)
+
         provider.max_blocks = 1
         htex.scale_out_facade(1)  # don't wait for the JSP to catch up
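The added hunk polls a log file until a marker line appears, then lets the test continue. A standalone sketch of that pattern follows. It is not the implementation of Parsl's `try_assert` fixture, whose signature is not shown in this commit: `wait_until`, `log_contains`, the timeout values and the temporary log file are all hypothetical stand-ins.

```python
import os
import tempfile
import time


def wait_until(condition, timeout_s=5.0, poll_s=0.02):
    """Poll `condition` until it returns True or the timeout expires.

    Hypothetical helper for illustration only.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if condition():
            return
        if time.monotonic() >= deadline:
            raise AssertionError("condition not met within timeout")
        time.sleep(poll_s)


def log_contains(path, marker):
    """Return True if any line of the file at `path` contains `marker`."""
    try:
        with open(path, "r") as f:
            return any(marker in line for line in f)
    except FileNotFoundError:
        # In this sketch, a log that does not exist yet counts as "not ready".
        return False


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for the interchange log; the real test reads
    # htex.worker_logdir + "/interchange.log".
    log_path = os.path.join(tmp, "interchange.log")
    with open(log_path, "w") as f:
        f.write("Fetched 20 tasks so far\n")

    wait_until(lambda: log_contains(log_path, "Fetched 20 tasks so far"))
    found = log_contains(log_path, "Fetched 20 tasks so far")
    missing = log_contains(os.path.join(tmp, "absent.log"), "anything")
```

Treating a missing file as "not ready" is a choice made for this sketch. The test in the diff opens the log directly and leaves retry behaviour to `try_assert`.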
