Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions parsl/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from itertools import chain

import _pytest.runner as runner
import psutil
import pytest

import parsl
Expand Down Expand Up @@ -191,6 +192,17 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
config = pytestconfig.getoption('config')[0]

if config != 'local':
this_process = psutil.Process()
start_fds = this_process.num_fds()
logger.error(f"BENC: start open fds: {start_fds}")

# GC executor starts a thread at import(!)
# so we can't make this assertion...
# assert threading.active_count() == 1, "precondition: only one thread can be running before this test: " + repr(threading.enumerate())
pre_ac = threading.active_count()
if pre_ac > 1:
logger.warning("precondition: only one thread should be running before this test: " + repr(threading.enumerate()))

spec = importlib.util.spec_from_file_location('', config)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
Expand Down Expand Up @@ -218,6 +230,17 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
raise RuntimeError("DFK changed unexpectedly during test")
dfk.cleanup()
assert DataFlowKernelLoader._dfk is None
end_fds = this_process.num_fds()
logger.error(f"BENC: end open fds: {end_fds}")

post_ac = threading.active_count()
assert pre_ac == post_ac, "test left threads running: " + repr(threading.enumerate())
# assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())

end_fds = this_process.num_fds()
logger.error(f"BENC: end open fds: {end_fds} (vs {start_fds} at start)")
assert start_fds == end_fds, "number of open fds changed across test run"

else:
yield

Expand All @@ -239,6 +262,16 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
config = pytestconfig.getoption('config')[0]

if config == 'local':
this_process = psutil.Process()
start_fds = this_process.num_fds()
logger.error(f"BENC: start open fds: {start_fds}")
logger.error(f"BENC: start threads: {threading.active_count()}")

pre_ac = threading.active_count()
if pre_ac > 1:
logger.warning("precondition: only one thread should be running before this test: " + repr(threading.enumerate()))
# assert threading.active_count() == 1, "precondition: only one thread can be running before this test: " + repr(threading.enumerate())

local_setup = getattr(request.module, "local_setup", None)
local_teardown = getattr(request.module, "local_teardown", None)
local_config = getattr(request.module, "local_config", None)
Expand Down Expand Up @@ -270,6 +303,21 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session):
raise RuntimeError("DFK changed unexpectedly during test")
dfk.cleanup()
assert DataFlowKernelLoader._dfk is None
end_fds = this_process.num_fds()
logger.error(f"BENC: end open fds: {end_fds} (vs start {start_fds}")
logger.error(f"BENC: end threads: {threading.active_count()}")

post_ac = threading.active_count()
assert pre_ac == post_ac, "test left threads running: " + repr(threading.enumerate())

# assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate())

end_fds = this_process.num_fds()
logger.error(f"BENC: open fds END: {end_fds}")
if end_fds > start_fds:
logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}")
os.system(f"ls -l /proc/{os.getpid()}/fd")
pytest.fail("BENC: number of open fds increased across test")

else:
yield
Expand Down
Loading