Skip to content

Commit d755c09

Browse files
committed
Prototype htex result queue shutdown
Before this PR, this thread stays running forever, until it is terminated by Python process exit. After this PR, this thread polls an flag once per second, then properly closes the ZMQ socket that it is using. pytest parsl/tests/test_monitoring/ --config local Before this PR, at end of test: 32 threads, 451 fds open. After this PR, at end of test: 1 thread, 48 fds open.
1 parent 7421628 commit d755c09

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,9 @@ def __init__(self,
317317
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
318318
self.interchange_launch_cmd = interchange_launch_cmd
319319

320+
self._result_queue_thread_exit = threading.Event()
321+
self._result_queue_thread: Optional[threading.Thread] = None
322+
320323
radio_mode = "htex"
321324
enable_mpi_mode: bool = False
322325
mpi_launcher: str = "mpiexec"
@@ -439,9 +442,11 @@ def _result_queue_worker(self):
439442
"""
440443
logger.debug("Result queue worker starting")
441444

442-
while not self.bad_state_is_set:
445+
while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
443446
try:
444-
msgs = self.incoming_q.get()
447+
msgs = self.incoming_q.get(timeout_ms=1000)
448+
if msgs is None: # timeout
449+
continue
445450

446451
except IOError as e:
447452
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
@@ -478,7 +483,6 @@ def _result_queue_worker(self):
478483
if 'result' in msg:
479484
result = deserialize(msg['result'])
480485
task_fut.set_result(result)
481-
482486
elif 'exception' in msg:
483487
try:
484488
s = deserialize(msg['exception'])
@@ -501,6 +505,8 @@ def _result_queue_worker(self):
501505
else:
502506
raise BadMessage("Message received with unknown type {}".format(msg['type']))
503507

508+
logger.info("Closing result ZMQ pipe")
509+
self.incoming_q.close()
504510
logger.info("Result queue worker finished")
505511

506512
def _start_local_interchange_process(self) -> None:
@@ -803,6 +809,8 @@ def shutdown(self, timeout: float = 10.0):
803809

804810
logger.info("Attempting HighThroughputExecutor shutdown")
805811

812+
logger.info("Terminating interchange and result queue thread")
813+
self._result_queue_thread_exit.set()
806814
self.interchange_proc.terminate()
807815
try:
808816
self.interchange_proc.wait(timeout=timeout)
@@ -827,6 +835,10 @@ def shutdown(self, timeout: float = 10.0):
827835
logger.info("Closing command client")
828836
self.command_client.close()
829837

838+
logger.info("Waiting for result queue thread exit")
839+
if self._result_queue_thread:
840+
self._result_queue_thread.join()
841+
830842
logger.info("Finished HighThroughputExecutor shutdown attempt")
831843

832844
def get_usage_information(self):

parsl/executors/high_throughput/zmq_pipes.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
205205
self.port = self.results_receiver.bind_to_random_port("tcp://{}".format(ip_address),
206206
min_port=port_range[0],
207207
max_port=port_range[1])
208+
self.poller = zmq.Poller()
209+
self.poller.register(self.results_receiver, zmq.POLLIN)
208210

209-
def get(self):
211+
def get(self, timeout_ms=None):
212+
"""Get a message from the queue, returning None if timeout expires
213+
without a message. timeout is measured in milliseconds.
214+
"""
210215
logger.debug("Waiting for ResultsIncoming message")
211-
m = self.results_receiver.recv_multipart()
212-
logger.debug("Received ResultsIncoming message")
213-
return m
216+
socks = dict(self.poller.poll(timeout=timeout_ms))
217+
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
218+
m = self.results_receiver.recv_multipart()
219+
logger.debug("Received ResultsIncoming message")
220+
return m
221+
else:
222+
return None
214223

215224
def close(self):
216225
self.results_receiver.close()

0 commit comments

Comments
 (0)