Skip to content

Commit 1f6c646

Browse files
committed
assert on number of threads left running after each test in config local test
1 parent e61027d commit 1f6c646

File tree

4 files changed

+78
-54
lines changed

4 files changed

+78
-54
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,9 @@ def __init__(self,
329329
launch_cmd = DEFAULT_LAUNCH_CMD
330330
self.launch_cmd = launch_cmd
331331

332+
self._queue_management_thread_exit = threading.Event()
333+
self._queue_management_thread: Optional[threading.Thread] = None
334+
332335
radio_mode = "htex"
333336

334337
def _warn_deprecated(self, old: str, new: str):
@@ -450,9 +453,9 @@ def _queue_management_worker(self):
450453
"""
451454
logger.debug("Queue management worker starting")
452455

453-
while not self.bad_state_is_set:
456+
while not self.bad_state_is_set and not self._queue_management_thread_exit.is_set():
454457
try:
455-
msgs = self.incoming_q.get()
458+
msgs = self.incoming_q.get(timeout_ms=1000)
456459

457460
except IOError as e:
458461
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
@@ -465,57 +468,55 @@ def _queue_management_worker(self):
465468
else:
466469

467470
if msgs is None:
468-
logger.debug("Got None, exiting")
469-
return
471+
continue
472+
473+
for serialized_msg in msgs:
474+
try:
475+
msg = pickle.loads(serialized_msg)
476+
except pickle.UnpicklingError:
477+
raise BadMessage("Message received could not be unpickled")
470478

471-
else:
472-
for serialized_msg in msgs:
479+
if msg['type'] == 'heartbeat':
480+
continue
481+
elif msg['type'] == 'result':
473482
try:
474-
msg = pickle.loads(serialized_msg)
475-
except pickle.UnpicklingError:
476-
raise BadMessage("Message received could not be unpickled")
483+
tid = msg['task_id']
484+
except Exception:
485+
raise BadMessage("Message received does not contain 'task_id' field")
486+
487+
if tid == -1 and 'exception' in msg:
488+
logger.warning("Executor shutting down due to exception from interchange")
489+
exception = deserialize(msg['exception'])
490+
self.set_bad_state_and_fail_all(exception)
491+
break
477492

478-
if msg['type'] == 'heartbeat':
479-
continue
480-
elif msg['type'] == 'result':
493+
task_fut = self.tasks.pop(tid)
494+
495+
if 'result' in msg:
496+
result = deserialize(msg['result'])
497+
task_fut.set_result(result)
498+
499+
elif 'exception' in msg:
481500
try:
482-
tid = msg['task_id']
483-
except Exception:
484-
raise BadMessage("Message received does not contain 'task_id' field")
485-
486-
if tid == -1 and 'exception' in msg:
487-
logger.warning("Executor shutting down due to exception from interchange")
488-
exception = deserialize(msg['exception'])
489-
self.set_bad_state_and_fail_all(exception)
490-
break
491-
492-
task_fut = self.tasks.pop(tid)
493-
494-
if 'result' in msg:
495-
result = deserialize(msg['result'])
496-
task_fut.set_result(result)
497-
498-
elif 'exception' in msg:
499-
try:
500-
s = deserialize(msg['exception'])
501-
# s should be a RemoteExceptionWrapper... so we can reraise it
502-
if isinstance(s, RemoteExceptionWrapper):
503-
try:
504-
s.reraise()
505-
except Exception as e:
506-
task_fut.set_exception(e)
507-
elif isinstance(s, Exception):
508-
task_fut.set_exception(s)
509-
else:
510-
raise ValueError("Unknown exception-like type received: {}".format(type(s)))
511-
except Exception as e:
512-
# TODO could be a proper wrapped exception?
513-
task_fut.set_exception(
514-
DeserializationError("Received exception, but handling also threw an exception: {}".format(e)))
515-
else:
516-
raise BadMessage("Message received is neither result or exception")
501+
s = deserialize(msg['exception'])
502+
# s should be a RemoteExceptionWrapper... so we can reraise it
503+
if isinstance(s, RemoteExceptionWrapper):
504+
try:
505+
s.reraise()
506+
except Exception as e:
507+
task_fut.set_exception(e)
508+
elif isinstance(s, Exception):
509+
task_fut.set_exception(s)
510+
else:
511+
raise ValueError("Unknown exception-like type received: {}".format(type(s)))
512+
except Exception as e:
513+
# TODO could be a proper wrapped exception?
514+
task_fut.set_exception(
515+
DeserializationError("Received exception, but handling also threw an exception: {}".format(e)))
517516
else:
518-
raise BadMessage("Message received with unknown type {}".format(msg['type']))
517+
raise BadMessage("Message received is neither result or exception")
518+
else:
519+
raise BadMessage("Message received with unknown type {}".format(msg['type']))
519520

520521
logger.info("Queue management worker finished")
521522

@@ -815,13 +816,19 @@ def shutdown(self, timeout: float = 10.0):
815816

816817
logger.info("Attempting HighThroughputExecutor shutdown")
817818

819+
logger.info("Terminating interchange and queue management thread")
820+
self._queue_management_thread_exit.set()
818821
self.interchange_proc.terminate()
819822
try:
820823
self.interchange_proc.wait(timeout=timeout)
821824
except subprocess.TimeoutExpired:
822-
logger.info("Unable to terminate Interchange process; sending SIGKILL")
825+
logger.warning("Unable to terminate Interchange process; sending SIGKILL")
823826
self.interchange_proc.kill()
824827

828+
logger.info("Waiting for queue management thread exit")
829+
if self._queue_management_thread:
830+
self._queue_management_thread.join()
831+
825832
logger.info("closing context sockets")
826833
# this might block if there are outstanding messages (eg if the interchange
827834
# has gone away... probably something to do with zmq.LINGER sockopt to remove

parsl/executors/high_throughput/zmq_pipes.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
205205
self.port = self.results_receiver.bind_to_random_port("tcp://{}".format(ip_address),
206206
min_port=port_range[0],
207207
max_port=port_range[1])
208+
self.poller = zmq.Poller()
209+
self.poller.register(self.results_receiver, zmq.POLLIN)
208210

209-
def get(self):
211+
def get(self, timeout_ms=None):
212+
"""Get a message from the queue, returning None if timeout expires
213+
without a message. timeout_ms is measured in milliseconds; None (the default) waits indefinitely.
214+
"""
210215
logger.debug("Waiting for ResultsIncoming message")
211-
m = self.results_receiver.recv_multipart()
212-
logger.debug("Received ResultsIncoming message")
213-
return m
216+
socks = dict(self.poller.poll(timeout=timeout_ms))
217+
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
218+
m = self.results_receiver.recv_multipart()
219+
logger.debug("Received ResultsIncoming message")
220+
return m
221+
else:
222+
return None
214223

215224
def close(self):
216225
self.results_receiver.close()

parsl/tests/conftest.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
180180
config = pytestconfig.getoption('config')[0]
181181

182182
if config != 'local':
183+
assert threading.active_count() == 1, "precondition: only one thread can be running before this test: " + repr(threading.enumerate())
184+
183185
spec = importlib.util.spec_from_file_location('', config)
184186
module = importlib.util.module_from_spec(spec)
185187
spec.loader.exec_module(module)
@@ -207,6 +209,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
207209
raise RuntimeError("DFK changed unexpectedly during test")
208210
dfk.cleanup()
209211
assert DataFlowKernelLoader._dfk is None
212+
213+
assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
214+
210215
else:
211216
yield
212217

@@ -232,6 +237,7 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
232237
start_fds = this_process.num_fds()
233238
logger.error(f"BENC: start open fds: {start_fds}")
234239

240+
assert threading.active_count() == 1, "precondition: only one thread can be running before this test"
235241
local_setup = getattr(request.module, "local_setup", None)
236242
local_teardown = getattr(request.module, "local_teardown", None)
237243
local_config = getattr(request.module, "local_config", None)
@@ -266,6 +272,8 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
266272
end_fds = this_process.num_fds()
267273
logger.error(f"BENC: end open fds: {end_fds} (vs start {start_fds}")
268274

275+
assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
276+
269277
else:
270278
yield
271279

parsl/tests/test_htex/test_htex.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def kill_interchange(*args, **kwargs):
116116
assert mock_ix_proc.wait.called
117117
assert {"timeout": 10} == mock_ix_proc.wait.call_args[1]
118118
if timeout_expires:
119-
assert "Unable to terminate Interchange" in mock_logs[1][0][0]
119+
assert "Unable to terminate Interchange" in mock_logs[3][0][0]
120120
assert mock_ix_proc.kill.called
121121
assert "Attempting" in mock_logs[0][0][0]
122122
assert "Finished" in mock_logs[-1][0][0]

0 commit comments

Comments
 (0)