Skip to content

Commit 0964f5a

Browse files
committed
Prototype htex result queue shutdown
Before this PR, this thread stays running forever, until it is terminated by Python process exit. After this PR, this thread polls an flag once per second, then properly closes the ZMQ socket that it is using. pytest parsl/tests/test_monitoring/ --config local Before this PR, at end of test: 32 threads, 451 fds open. After this PR, at end of test: 1 thread, 48 fds open.
1 parent b857453 commit 0964f5a

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,9 @@ def __init__(self,
331331
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
332332
self.interchange_launch_cmd = interchange_launch_cmd
333333

334+
self._result_queue_thread_exit = threading.Event()
335+
self._result_queue_thread: Optional[threading.Thread] = None
336+
334337
radio_mode = "htex"
335338
enable_mpi_mode: bool = False
336339
mpi_launcher: str = "mpiexec"
@@ -455,9 +458,11 @@ def _result_queue_worker(self):
455458
"""
456459
logger.debug("Result queue worker starting")
457460

458-
while not self.bad_state_is_set:
461+
while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
459462
try:
460-
msgs = self.incoming_q.get()
463+
msgs = self.incoming_q.get(timeout_ms=1000)
464+
if msgs is None: # timeout
465+
continue
461466

462467
except IOError as e:
463468
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
@@ -492,7 +497,6 @@ def _result_queue_worker(self):
492497
if 'result' in msg:
493498
result = deserialize(msg['result'])
494499
task_fut.set_result(result)
495-
496500
elif 'exception' in msg:
497501
try:
498502
s = deserialize(msg['exception'])
@@ -515,6 +519,8 @@ def _result_queue_worker(self):
515519
else:
516520
raise BadMessage("Message received with unknown type {}".format(msg['type']))
517521

522+
logger.info("Closing result ZMQ pipe")
523+
self.incoming_q.close()
518524
logger.info("Result queue worker finished")
519525

520526
def _start_local_interchange_process(self) -> None:
@@ -817,6 +823,8 @@ def shutdown(self, timeout: float = 10.0):
817823

818824
logger.info("Attempting HighThroughputExecutor shutdown")
819825

826+
logger.info("Terminating interchange and result queue thread")
827+
self._result_queue_thread_exit.set()
820828
self.interchange_proc.terminate()
821829
try:
822830
self.interchange_proc.wait(timeout=timeout)
@@ -841,6 +849,10 @@ def shutdown(self, timeout: float = 10.0):
841849
logger.info("Closing command client")
842850
self.command_client.close()
843851

852+
logger.info("Waiting for result queue thread exit")
853+
if self._result_queue_thread:
854+
self._result_queue_thread.join()
855+
844856
logger.info("Finished HighThroughputExecutor shutdown attempt")
845857

846858
def get_usage_information(self):

parsl/executors/high_throughput/zmq_pipes.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
206206
self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address),
207207
min_port=port_range[0],
208208
max_port=port_range[1])
209+
self.poller = zmq.Poller()
210+
self.poller.register(self.results_receiver, zmq.POLLIN)
209211

210-
def get(self):
212+
def get(self, timeout_ms=None):
213+
"""Get a message from the queue, returning None if timeout expires
214+
without a message. timeout is measured in milliseconds.
215+
"""
211216
logger.debug("Waiting for ResultsIncoming message")
212-
m = self.results_receiver.recv_multipart()
213-
logger.debug("Received ResultsIncoming message")
214-
return m
217+
socks = dict(self.poller.poll(timeout=timeout_ms))
218+
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
219+
m = self.results_receiver.recv_multipart()
220+
logger.debug("Received ResultsIncoming message")
221+
return m
222+
else:
223+
return None
215224

216225
def close(self):
217226
self.results_receiver.close()

0 commit comments

Comments
 (0)