Skip to content

Commit b776437

Browse files
committed
assert on number of threads left running after each test in config local test
1 parent 6dfaa10 commit b776437

File tree

4 files changed

+79
-53
lines changed

4 files changed

+79
-53
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 57 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,9 @@ def __init__(self,
335335
launch_cmd = DEFAULT_LAUNCH_CMD
336336
self.launch_cmd = launch_cmd
337337

338+
self._queue_management_thread_exit = threading.Event()
339+
self._queue_management_thread: Optional[threading.Thread] = None
340+
338341
radio_mode = "htex"
339342

340343
def _warn_deprecated(self, old: str, new: str):
@@ -456,9 +459,9 @@ def _queue_management_worker(self):
456459
"""
457460
logger.debug("Queue management worker starting")
458461

459-
while not self.bad_state_is_set:
462+
while not self.bad_state_is_set and not self._queue_management_thread_exit.is_set():
460463
try:
461-
msgs = self.incoming_q.get()
464+
msgs = self.incoming_q.get(timeout_ms=1000)
462465

463466
except IOError as e:
464467
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
@@ -471,57 +474,55 @@ def _queue_management_worker(self):
471474
else:
472475

473476
if msgs is None:
474-
logger.debug("Got None, exiting")
475-
return
477+
continue
478+
479+
for serialized_msg in msgs:
480+
try:
481+
msg = pickle.loads(serialized_msg)
482+
except pickle.UnpicklingError:
483+
raise BadMessage("Message received could not be unpickled")
476484

477-
else:
478-
for serialized_msg in msgs:
485+
if msg['type'] == 'heartbeat':
486+
continue
487+
elif msg['type'] == 'result':
479488
try:
480-
msg = pickle.loads(serialized_msg)
481-
except pickle.UnpicklingError:
482-
raise BadMessage("Message received could not be unpickled")
489+
tid = msg['task_id']
490+
except Exception:
491+
raise BadMessage("Message received does not contain 'task_id' field")
492+
493+
if tid == -1 and 'exception' in msg:
494+
logger.warning("Executor shutting down due to exception from interchange")
495+
exception = deserialize(msg['exception'])
496+
self.set_bad_state_and_fail_all(exception)
497+
break
498+
499+
task_fut = self.tasks.pop(tid)
483500

484-
if msg['type'] == 'heartbeat':
485-
continue
486-
elif msg['type'] == 'result':
501+
if 'result' in msg:
502+
result = deserialize(msg['result'])
503+
task_fut.set_result(result)
504+
505+
elif 'exception' in msg:
487506
try:
488-
tid = msg['task_id']
489-
except Exception:
490-
raise BadMessage("Message received does not contain 'task_id' field")
491-
492-
if tid == -1 and 'exception' in msg:
493-
logger.warning("Executor shutting down due to exception from interchange")
494-
exception = deserialize(msg['exception'])
495-
self.set_bad_state_and_fail_all(exception)
496-
break
497-
498-
task_fut = self.tasks.pop(tid)
499-
500-
if 'result' in msg:
501-
result = deserialize(msg['result'])
502-
task_fut.set_result(result)
503-
504-
elif 'exception' in msg:
505-
try:
506-
s = deserialize(msg['exception'])
507-
# s should be a RemoteExceptionWrapper... so we can reraise it
508-
if isinstance(s, RemoteExceptionWrapper):
509-
try:
510-
s.reraise()
511-
except Exception as e:
512-
task_fut.set_exception(e)
513-
elif isinstance(s, Exception):
514-
task_fut.set_exception(s)
515-
else:
516-
raise ValueError("Unknown exception-like type received: {}".format(type(s)))
517-
except Exception as e:
518-
# TODO could be a proper wrapped exception?
519-
task_fut.set_exception(
520-
DeserializationError("Received exception, but handling also threw an exception: {}".format(e)))
521-
else:
522-
raise BadMessage("Message received is neither result or exception")
507+
s = deserialize(msg['exception'])
508+
# s should be a RemoteExceptionWrapper... so we can reraise it
509+
if isinstance(s, RemoteExceptionWrapper):
510+
try:
511+
s.reraise()
512+
except Exception as e:
513+
task_fut.set_exception(e)
514+
elif isinstance(s, Exception):
515+
task_fut.set_exception(s)
516+
else:
517+
raise ValueError("Unknown exception-like type received: {}".format(type(s)))
518+
except Exception as e:
519+
# TODO could be a proper wrapped exception?
520+
task_fut.set_exception(
521+
DeserializationError("Received exception, but handling also threw an exception: {}".format(e)))
523522
else:
524-
raise BadMessage("Message received with unknown type {}".format(msg['type']))
523+
raise BadMessage("Message received is neither result or exception")
524+
else:
525+
raise BadMessage("Message received with unknown type {}".format(msg['type']))
525526

526527
logger.info("Queue management worker finished")
527528

@@ -813,14 +814,22 @@ def shutdown(self, timeout: float = 10.0):
813814

814815
logger.info("Attempting HighThroughputExecutor shutdown")
815816

817+
logger.info("Terminating interchange and queue management thread")
816818
self.interchange_proc.terminate()
819+
self._queue_management_thread_exit.set()
820+
821+
logger.info("Waiting for interchange exit")
817822
self.interchange_proc.join(timeout=timeout)
818823
if self.interchange_proc.is_alive():
819824
logger.info("Unable to terminate Interchange process; sending SIGKILL")
820825
self.interchange_proc.kill()
821826

822827
self.interchange_proc.close()
823828

829+
logger.info("Waiting for queue management thread exit")
830+
if self._queue_management_thread:
831+
self._queue_management_thread.join()
832+
824833
logger.info("Finished HighThroughputExecutor shutdown attempt")
825834

826835
def get_usage_information(self):

parsl/executors/high_throughput/zmq_pipes.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,21 @@ def __init__(self, zmq_context: curvezmq.ClientContext, ip_address, port_range):
157157
self.port = self.results_receiver.bind_to_random_port("tcp://{}".format(ip_address),
158158
min_port=port_range[0],
159159
max_port=port_range[1])
160+
self.poller = zmq.Poller()
161+
self.poller.register(self.results_receiver, zmq.POLLIN)
160162

161-
def get(self):
163+
def get(self, timeout_ms=None):
164+
"""Get a message from the queue, returning None if timeout expires
165+
without a message. timeout is measured in milliseconds.
166+
"""
162167
logger.debug("Waiting for ResultsIncoming message")
163-
m = self.results_receiver.recv_multipart()
164-
logger.debug("Received ResultsIncoming message")
165-
return m
168+
socks = dict(self.poller.poll(timeout=timeout_ms))
169+
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
170+
m = self.results_receiver.recv_multipart()
171+
logger.debug("Received ResultsIncoming message")
172+
return m
173+
else:
174+
return None
166175

167176
def close(self):
168177
self.results_receiver.close()

parsl/tests/conftest.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
175175
config = pytestconfig.getoption('config')[0]
176176

177177
if config != 'local':
178+
assert threading.active_count() == 1, "precondition: only one thread can be running before this test: " + repr(threading.enumerate())
179+
178180
spec = importlib.util.spec_from_file_location('', config)
179181
module = importlib.util.module_from_spec(spec)
180182
spec.loader.exec_module(module)
@@ -202,6 +204,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
202204
raise RuntimeError("DFK changed unexpectedly during test")
203205
dfk.cleanup()
204206
parsl.clear()
207+
208+
assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
209+
205210
else:
206211
yield
207212

@@ -223,6 +228,7 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
223228
config = pytestconfig.getoption('config')[0]
224229

225230
if config == 'local':
231+
assert threading.active_count() == 1, "precondition: only one thread can be running before this test"
226232
local_setup = getattr(request.module, "local_setup", None)
227233
local_teardown = getattr(request.module, "local_teardown", None)
228234
local_config = getattr(request.module, "local_config", None)
@@ -255,6 +261,8 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
255261
dfk.cleanup()
256262
parsl.clear()
257263

264+
assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
265+
258266
else:
259267
yield
260268

parsl/tests/test_htex/test_htex.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def kill_interchange(*args, **kwargs):
100100
assert mock_ix_proc.join.called
101101
assert {"timeout": 10} == mock_ix_proc.join.call_args[1]
102102
if timeout_expires:
103-
assert "Unable to terminate Interchange" in mock_logs[1][0][0]
103+
assert "Unable to terminate Interchange" in mock_logs[3][0][0]
104104
assert mock_ix_proc.kill.called
105105
assert "Attempting" in mock_logs[0][0][0]
106106
assert "Finished" in mock_logs[-1][0][0]

0 commit comments

Comments
 (0)