Skip to content

Commit 8345dcb

Browse files
committed
Terminate htex result queue thread by polling for an exit flag
Before this PR, this thread stays running forever. Terminating it *requires* the socket to be closed at exit, and this PR introduces code to do that. Context: see the recent Flux PR #3517 for the same problem. It looks like this is because stopping this thread now allows garbage collection to happen, or something similar. Counts on parsl/tests/test_monitoring/: before this PR, 451 fds and 32 threads; after this PR, 48 fds and 1 thread.
1 parent 4366951 commit 8345dcb

File tree

3 files changed

+44
-7
lines changed

3 files changed

+44
-7
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,9 @@ def __init__(self,
347347
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
348348
self.interchange_launch_cmd = interchange_launch_cmd
349349

350+
self._result_queue_thread_exit = threading.Event()
351+
self._result_queue_thread: Optional[threading.Thread] = None
352+
350353
radio_mode = "htex"
351354

352355
def _warn_deprecated(self, old: str, new: str):
@@ -465,9 +468,11 @@ def _result_queue_worker(self):
465468
"""
466469
logger.debug("Result queue worker starting")
467470

468-
while not self.bad_state_is_set:
471+
while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
469472
try:
470-
msgs = self.incoming_q.get()
473+
msgs = self.incoming_q.get(timeout_ms=1000)
474+
if msgs is None: # timeout
475+
continue
471476

472477
except IOError as e:
473478
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
@@ -504,7 +509,6 @@ def _result_queue_worker(self):
504509
if 'result' in msg:
505510
result = deserialize(msg['result'])
506511
task_fut.set_result(result)
507-
508512
elif 'exception' in msg:
509513
try:
510514
s = deserialize(msg['exception'])
@@ -527,6 +531,8 @@ def _result_queue_worker(self):
527531
else:
528532
raise BadMessage("Message received with unknown type {}".format(msg['type']))
529533

534+
logger.info("Closing result ZMQ pipe")
535+
self.incoming_q.close()
530536
logger.info("Result queue worker finished")
531537

532538
def _start_local_interchange_process(self) -> None:
@@ -827,6 +833,8 @@ def shutdown(self, timeout: float = 10.0):
827833

828834
logger.info("Attempting HighThroughputExecutor shutdown")
829835

836+
logger.info("Terminating interchange and result queue thread")
837+
self._result_queue_thread_exit.set()
830838
self.interchange_proc.terminate()
831839
try:
832840
self.interchange_proc.wait(timeout=timeout)
@@ -851,6 +859,10 @@ def shutdown(self, timeout: float = 10.0):
851859
logger.info("Closing command client")
852860
self.command_client.close()
853861

862+
logger.info("Waiting for result queue thread exit")
863+
if self._result_queue_thread:
864+
self._result_queue_thread.join()
865+
854866
logger.info("Finished HighThroughputExecutor shutdown attempt")
855867

856868
def get_usage_information(self):

parsl/executors/high_throughput/zmq_pipes.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
205205
self.port = self.results_receiver.bind_to_random_port("tcp://{}".format(ip_address),
206206
min_port=port_range[0],
207207
max_port=port_range[1])
208+
self.poller = zmq.Poller()
209+
self.poller.register(self.results_receiver, zmq.POLLIN)
208210

209-
def get(self):
211+
def get(self, timeout_ms=None):
212+
"""Get a message from the queue, returning None if timeout expires
213+
without a message. timeout is measured in milliseconds.
214+
"""
210215
logger.debug("Waiting for ResultsIncoming message")
211-
m = self.results_receiver.recv_multipart()
212-
logger.debug("Received ResultsIncoming message")
213-
return m
216+
socks = dict(self.poller.poll(timeout=timeout_ms))
217+
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
218+
m = self.results_receiver.recv_multipart()
219+
logger.debug("Received ResultsIncoming message")
220+
return m
221+
else:
222+
return None
214223

215224
def close(self):
216225
self.results_receiver.close()

parsl/tests/test_htex/test_disconnected_blocks.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
from parsl.jobs.states import JobState, JobStatus
1010
from parsl.providers import LocalProvider
1111

12+
logger = logging.getLogger(__name__)
13+
1214

1315
def local_config():
1416
"""Config to simulate failing blocks without connecting"""
@@ -41,25 +43,39 @@ def double(x):
4143
@pytest.mark.local
4244
def test_disconnected_blocks():
4345
"""Test reporting of blocks that fail to connect from HTEX"""
46+
logger.info("BENC: 1")
4447
dfk = parsl.dfk()
48+
logger.info("BENC: 2")
4549
executor = dfk.executors["HTEX"]
50+
logger.info("BENC: 3")
4651

4752
connected_blocks = executor.connected_blocks()
53+
logger.info("BENC: 4")
4854
assert not connected_blocks, "Expected 0 blocks"
55+
logger.info("BENC: 5")
4956

5057
future = double(5)
58+
logger.info("BENC: 6")
5159
with pytest.raises(BadStateException):
60+
logger.info("BENC: 7")
5261
future.result()
62+
logger.info("BENC: 8")
63+
logger.info("BENC: 9")
5364

5465
assert isinstance(future.exception(), BadStateException)
66+
logger.info("BENC: 10")
5567
exception_body = str(future.exception())
5668
assert "EXIT CODE: 0" in exception_body
69+
logger.info("BENC: 11")
5770

5871
status_dict = executor.status()
72+
logger.info("BENC: 12")
5973
assert len(status_dict) == 1, "Expected only 1 block"
6074
for status in status_dict.values():
6175
assert isinstance(status, JobStatus)
6276
assert status.state == JobState.MISSING
6377

78+
logger.info("BENC: 13")
6479
connected_blocks = executor.connected_blocks()
80+
logger.info("BENC: 14")
6581
assert not connected_blocks, "Expected 0 blocks"

0 commit comments

Comments
 (0)