
Commit a4ac357

count fds

1 parent b776437 commit a4ac357

9 files changed: 52 additions, 11 deletions


parsl/dataflow/dflow.py

Lines changed: 8 additions & 2 deletions
@@ -86,7 +86,7 @@ def __init__(self, config: Config) -> None:
         self.run_dir = make_rundir(config.run_dir)

         if config.initialize_logging:
-            parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)
+            _, self.logging_handler = parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG)

         logger.info("Starting DataFlowKernel with config\n{}".format(config))

@@ -1272,7 +1272,13 @@ def cleanup(self) -> None:
             self.monitoring.close()
             logger.info("Terminated monitoring")

-        logger.info("DFK cleanup complete")
+        # TODO: do this in parsl/logutils.py
+        logger.info("DFK cleanup complete - removing parsl.log handler")
+        logger_to_remove = logging.getLogger("parsl")
+        logger_to_remove.removeHandler(self.logging_handler)
+        self.logging_handler.close()
+
+        logger.info("handler closed - is this going to break things?")

     def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str:
         """Checkpoint the dfk incrementally to a checkpoint file.

parsl/executors/high_throughput/executor.py

Lines changed: 17 additions & 0 deletions
@@ -555,6 +555,8 @@ def _start_local_interchange_process(self):
         self.interchange_proc.start()
         try:
             (self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120)
+            comm_q.close()
+            comm_q.join_thread()
         except queue.Empty:
             logger.error("Interchange has not completed initialization in 120s. Aborting")
             raise Exception("Interchange failed to start")

@@ -830,6 +832,21 @@ def shutdown(self, timeout: float = 10.0):
         if self._queue_management_thread:
             self._queue_management_thread.join()

+        logger.info("closing context sockets")
+        # this might block if there are outstanding messages (eg if the interchange
+        # has gone away)... probably something to do with zmq.LINGER sockopt to remove
+        # this hang risk?
+
+        # these should be initialized to None rather than being absent?
+        if hasattr(self, "incoming_q"):
+            self.incoming_q.close()
+
+        if hasattr(self, "outgoing_q"):
+            self.outgoing_q.close()
+
+        if hasattr(self, "command_client"):
+            self.command_client.close()
+
         logger.info("Finished HighThroughputExecutor shutdown attempt")

     def get_usage_information(self):
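The in-line comment raises the zmq.LINGER question: closing a ZMQ socket can block while unsent messages wait for a peer that may already be gone. A small, self-contained sketch of the sockopt the comment refers to (the endpoint and socket type here are illustrative, not the executor's actual pipes):

    import zmq

    ctx = zmq.Context()
    sock = ctx.socket(zmq.DEALER)
    sock.connect("tcp://127.0.0.1:9000")  # placeholder endpoint, not the real interchange

    # With the default LINGER (-1), close() can block indefinitely while queued
    # messages wait to be delivered. LINGER=0 discards them and returns at once.
    sock.setsockopt(zmq.LINGER, 0)
    sock.close()
    ctx.term()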

parsl/executors/taskvine/executor.py

Lines changed: 2 additions & 0 deletions
@@ -638,11 +638,13 @@ def shutdown(self, *args, **kwargs):
         # Join all processes before exiting
         logger.debug("Joining on submit process")
         self._submit_process.join()
+        self._submit_process.close()
         logger.debug("Joining on collector thread")
         self._collector_thread.join()
         if self.worker_launch_method == 'factory':
             logger.debug("Joining on factory process")
             self._factory_process.join()
+            self._factory_process.close()

         # Shutdown multiprocessing queues
         self._ready_task_queue.close()
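The join()/close() pairing matters for the fd count this commit is chasing: a multiprocessing.Process object keeps a sentinel file descriptor open until close() is called (or the object is garbage collected). A standalone illustration, assuming Python 3.7+ where Process.close() exists:

    import multiprocessing

    def child():
        pass

    if __name__ == "__main__":
        proc = multiprocessing.Process(target=child)
        proc.start()
        proc.join()    # wait for the child to exit
        proc.close()   # release the sentinel fd and other resources now, rather
                       # than whenever the object is collected; raises ValueError
                       # if the process is still running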

parsl/executors/workqueue/executor.py

Lines changed: 2 additions & 0 deletions
@@ -732,6 +732,8 @@ def shutdown(self, *args, **kwargs):

         logger.debug("Joining on submit process")
         self.submit_process.join()
+        self.submit_process.close()
+
         logger.debug("Joining on collector thread")
         self.collector_thread.join()

parsl/log_utils.py

Lines changed: 3 additions & 3 deletions
@@ -14,7 +14,7 @@
 import logging
 import typeguard

-from typing import Optional
+from typing import Optional, Tuple


 DEFAULT_FORMAT = (

@@ -66,7 +66,7 @@ def set_stream_logger(name: str = 'parsl',
 def set_file_logger(filename: str,
                     name: str = 'parsl',
                     level: int = logging.DEBUG,
-                    format_string: Optional[str] = None) -> logging.Logger:
+                    format_string: Optional[str] = None) -> Tuple[logging.Logger, logging.FileHandler]:
     """Add a file log handler.

     Args:

@@ -94,4 +94,4 @@ def set_file_logger(filename: str,
     futures_logger = logging.getLogger("concurrent.futures")
     futures_logger.addHandler(handler)

-    return logger
+    return (logger, handler)
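With the changed return type, every caller of set_file_logger now receives a (logger, handler) pair. Callers that do not care about cleanup can discard the handler; callers that do (like the DFK above) keep it so the handler can be removed and closed later. An illustrative usage sketch, with placeholder file names:

    import logging

    from parsl.log_utils import set_file_logger

    # Caller that only wants the logger:
    logger, _ = set_file_logger("monitoring_router.log", name="monitoring_router")

    # Caller that wants to release the log file descriptor later:
    logger, handler = set_file_logger("parsl.log")
    logger.info("doing work")
    logging.getLogger("parsl").removeHandler(handler)
    handler.close()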

parsl/monitoring/monitoring.py

Lines changed: 6 additions & 3 deletions
@@ -252,6 +252,7 @@ def close(self) -> None:
         self.router_exit_event.set()
         logger.info("Waiting for router to terminate")
         self.router_proc.join()
+        self.router_proc.close()
         logger.debug("Finished waiting for router termination")
         if len(exception_msgs) == 0:
             logger.debug("Sending STOP to DBM")

@@ -260,13 +261,15 @@ def close(self) -> None:
             logger.debug("Not sending STOP to DBM, because there were DBM exceptions")
         logger.debug("Waiting for DB termination")
         self.dbm_proc.join()
+        self.dbm_proc.close()
         logger.debug("Finished waiting for DBM termination")

         # should this be message based? it probably doesn't need to be if
         # we believe we've received all messages
         logger.info("Terminating filesystem radio receiver process")
         self.filesystem_proc.terminate()
         self.filesystem_proc.join()
+        self.filesystem_proc.close()

         logger.info("Closing monitoring multiprocessing queues")
         self.exception_q.close()

@@ -284,9 +287,9 @@ def close(self) -> None:

 @wrap_with_logs
 def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
-    logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
-                             name="monitoring_filesystem_radio",
-                             level=logging.INFO)
+    logger, _ = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
+                                name="monitoring_filesystem_radio",
+                                level=logging.INFO)

     logger.info("Starting filesystem radio receiver")
     setproctitle("parsl: monitoring filesystem receiver")

parsl/monitoring/router.py

Lines changed: 3 additions & 3 deletions
@@ -58,9 +58,9 @@ def __init__(self,

         """
         os.makedirs(logdir, exist_ok=True)
-        self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
-                                      name="monitoring_router",
-                                      level=logging_level)
+        self.logger, _ = set_file_logger("{}/monitoring_router.log".format(logdir),
+                                         name="monitoring_router",
+                                         level=logging_level)
         self.logger.debug("Monitoring router starting")

         self.hub_address = hub_address

parsl/tests/conftest.py

Lines changed: 10 additions & 0 deletions
@@ -3,6 +3,7 @@
 import logging
 import os
 import pathlib
+import psutil
 import random
 import re
 import shutil

@@ -228,6 +229,9 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
     config = pytestconfig.getoption('config')[0]

     if config == 'local':
+        this_process = psutil.Process()
+        start_fds = this_process.num_fds()
+        logger.error(f"BENC: open fds: {start_fds}")
         assert threading.active_count() == 1, "precondition: only one thread can be running before this test"
         local_setup = getattr(request.module, "local_setup", None)
         local_teardown = getattr(request.module, "local_teardown", None)

@@ -262,6 +266,12 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
         parsl.clear()

         assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())
+        end_fds = this_process.num_fds()
+        logger.error(f"BENC: open fds END: {end_fds}")
+        if end_fds > start_fds:
+            logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}")
+            os.system(f"ls -l /proc/{os.getpid()}/fd")
+            pytest.fail("BENC: number of open fds increased across test")

     else:
         yield
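The same before/after fd comparison can be packaged as a reusable fixture. A sketch under the assumption of a Linux host (num_fds() and /proc are Linux-specific), with a hypothetical fixture name:

    import os

    import psutil
    import pytest

    @pytest.fixture
    def no_fd_leak():
        proc = psutil.Process()
        start_fds = proc.num_fds()   # counts all open fds for this process
        yield
        end_fds = proc.num_fds()
        if end_fds > start_fds:
            # open_files() reports regular files only, so leaked sockets and
            # pipes will not show up there; the /proc listing covers everything.
            print(proc.open_files())
            os.system(f"ls -l /proc/{os.getpid()}/fd")
            pytest.fail(f"open fd count grew from {start_fds} to {end_fds}")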

parsl/tests/test_providers/test_local_provider.py

Lines changed: 1 addition & 0 deletions
@@ -104,6 +104,7 @@ def test_ssh_channel():

 def _stop_sshd(sshd_thread):
     sshd_thread.stop()
+    sshd_thread.join()


 class SSHDThread(threading.Thread):
