Skip to content

Commit c0a5986

Browse files
authored
Add regression test for htex command client concurrency fix (#3935)
The concurrency fix is in PR #1321, around 6 years ago. With this long hindsight, the corresponding issue #1146 is a concurrency problem in use of the ZMQ REQ/REP socket used by the HTEX command client; and PR #1321 is a fix for that concurrency. After that realization, it was straightforward to build this concurrency test which shows failure within a couple of seconds when #1321 is removed, on both my laptop and in GitHub CI. I'm adding this test now because I would like to simplify the command client code that this tests in upcoming PRs. ## Type of change - Code maintenance/cleanup
1 parent d6d00f5 commit c0a5986

File tree

1 file changed

+54
-0
lines changed

1 file changed

+54
-0
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import time
2+
from threading import Event, Thread
3+
4+
import pytest
5+
6+
import parsl
7+
from parsl.tests.configs.htex_local import fresh_config as local_config
8+
9+
N_THREADS = 50
10+
DURATION_S = 10
11+
12+
13+
@pytest.mark.local
14+
def test_concurrency_blast():
15+
"""Blast interchange command channel from many threads.
16+
"""
17+
18+
cc = parsl.dfk().executors['htex_local'].command_client
19+
20+
threads = []
21+
22+
ok_so_far = True
23+
24+
for _ in range(N_THREADS):
25+
26+
# This event will be set if the thread reaches the end of its body.
27+
event = Event()
28+
29+
thread = Thread(target=blast, args=(cc, event))
30+
threads.append((thread, event))
31+
32+
for thread, event in threads:
33+
thread.start()
34+
35+
for thread, event in threads:
36+
thread.join()
37+
if not event.is_set():
38+
ok_so_far = False
39+
40+
assert ok_so_far, "at least one thread did not exit normally"
41+
42+
43+
def blast(cc, e):
44+
target_end = time.monotonic() + DURATION_S
45+
46+
while time.monotonic() < target_end:
47+
cc.run("WORKERS")
48+
cc.run("MANGERs_PACKAGES")
49+
cc.run("CONNECTED_BLOCKS")
50+
cc.run("WORKER_BINDS")
51+
52+
# If any of the preceeding cc.run calls raises an exception, the thread
53+
# will not set its successful completion event.
54+
e.set()

0 commit comments

Comments
 (0)