Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
70a35a2
Updated Resource Tracker to return exit code based on leaking found o…
bityob Jul 16, 2023
da7e245
Add NEWS file
bityob Jul 16, 2023
1de022d
Use subTest for the parametrize the same tests with different values
bityob Jul 17, 2023
48bea84
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 17, 2023
31bc4af
Updated tests based on PR comments, and fix the resource_tracker code…
bityob Jul 18, 2023
47551a3
Clean test
bityob Jul 18, 2023
a02fdbe
Added comment for test and simplified the logic
bityob Jul 18, 2023
9d0369e
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 18, 2023
4dbb777
Fix tests and News based on PR comments
bityob Jul 18, 2023
bfd1e8e
Added reseting the exit code when the process should run again
bityob Jul 18, 2023
043ae90
Added more clearing queue methods beside del (closing and calling gc.…
bityob Jul 18, 2023
850b5bb
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 18, 2023
6e82940
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 19, 2023
a5cb934
Merge branch 'gh-104090-fix-leaked-semaphors-on-test_concurrent_futur…
bityob Jul 19, 2023
ccd736c
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 20, 2023
3b21d5c
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 23, 2023
fbd18ec
Merge main
bityob Aug 31, 2023
e258cb3
Moved FailingInitializerResourcesTest class from old path (Lib/test/t…
bityob Aug 31, 2023
5b886bf
Merge branch 'gh-104090-fix-leaked-semaphors-on-test_concurrent_futur…
bityob Aug 31, 2023
915c8a3
Added missing import
bityob Aug 31, 2023
cd377e6
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
pitrou Sep 19, 2023
27245ec
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
encukou Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(self):
self._lock = threading.Lock()
self._fd = None
self._pid = None
self._exitcode = None

def _stop(self):
with self._lock:
Expand All @@ -68,9 +69,16 @@ def _stop(self):
os.close(self._fd)
self._fd = None

os.waitpid(self._pid, 0)
_, status = os.waitpid(self._pid, 0)

self._pid = None

try:
self._exitcode = os.waitstatus_to_exitcode(status)
except ValueError:
# os.waitstatus_to_exitcode may raise an exception for invalid values
self._exitcode = None

def getfd(self):
self.ensure_running()
return self._fd
Expand Down Expand Up @@ -191,6 +199,8 @@ def main(fd):
pass

cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
exit_code = 0

try:
# keep track of registered/unregistered resources
with open(fd, 'rb') as f:
Expand Down Expand Up @@ -221,6 +231,7 @@ def main(fd):
for rtype, rtype_cache in cache.items():
if rtype_cache:
try:
exit_code = 1
warnings.warn('resource_tracker: There appear to be %d '
'leaked %s objects to clean up at shutdown' %
(len(rtype_cache), rtype))
Expand All @@ -237,3 +248,5 @@ def main(fd):
warnings.warn('resource_tracker: %r: %s' % (name, e))
finally:
pass

sys.exit(exit_code)
36 changes: 36 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5536,6 +5536,42 @@ def test_too_long_name_resource(self):
with self.assertRaises(ValueError):
resource_tracker.register(too_long_name_resource, rtype)

def _test_resource_tracker_leak_resources(self, context, delete_queue):
from multiprocessing.resource_tracker import _resource_tracker
_resource_tracker.ensure_running()
self.assertTrue(_resource_tracker._check_alive())

# Reset exit code value
_resource_tracker._exitcode = None

mp_context = multiprocessing.get_context(context)

# Keep it on variable, so it won't be cleared yet
q = mp_context.Queue()
if delete_queue:
del q
expected_exit_code = 0
else:
expected_exit_code = 1

self.assertIsNone(_resource_tracker._exitcode)
_resource_tracker._stop()

self.assertEqual(_resource_tracker._exitcode, expected_exit_code)

def test_resource_tracker_exit_code(self):
"""
Test the exit code of the resource tracker based on if there were left leaked resources when we stop the process.
If not leaked resources were found, exit code should be 0, otherwise 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment. Nit, but can you please ensure the line length remains reasonable (say < 80 characters) ? This can otherwise be annoying when reading/reviewing code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll fix it. I'm used to 120 chars line length...

"""
for context in ["spawn", "forkserver"]:
for delete_queue in [True, False]:
with self.subTest(context=context, delete_queue=delete_queue):
self._test_resource_tracker_leak_resources(
context=context,
delete_queue=delete_queue,
)


class TestSimpleQueue(unittest.TestCase):

Expand Down
29 changes: 29 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,35 @@ def _assert_logged(self, msg):
create_executor_tests(FailingInitializerMixin)


@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
class FailingInitializerResourcesTest(unittest.TestCase):
"""
Source: https://github.com/python/cpython/issues/104090
"""

def _test(self, test_class):
runner = unittest.TextTestRunner()
result = runner.run(test_class('test_initializer'))

self.assertEqual(result.testsRun, 1)
self.assertEqual(result.failures, [])
self.assertEqual(result.errors, [])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhm, why not do the simple thing and add these checks somewhere in FailingInitializerMixin?
It should also avoid running those tests twice (which I assume this does).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result object exists when we run the test using Test Runner, which is not the case on FailingInitializerMixin.
This is used for safety, ensuring the inner test passed before we check the resource tracker

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this is needlessly complex (and might interfere with other test options/customizations). Let's do the check at the end of said tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pitrou I don't understand how you want those checks to be since we can check it only after a test run with the returned result, which is not the case in regular tests. Anyway, I'm removing those checks since it's not mandatory and just for safety

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly don't understand what the difficulty would be. You can make the resource tracker stop at the end of the test, or at the end of the testcase (in a teardown method for example).


# GH-104090:
# Stop resource tracker manually now, so we can verify there are not leaked resources by checking
# the process exit code
from multiprocessing.resource_tracker import _resource_tracker
_resource_tracker._stop()

self.assertEqual(_resource_tracker._exitcode, 0)

def test_spawn(self):
self._test(ProcessPoolSpawnFailingInitializerTest)

def test_forkserver(self):
self._test(ProcessPoolForkserverFailingInitializerTest)


class ExecutorShutdownTest:
def test_run_after_shutdown(self):
self.executor.shutdown()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Updated Resource Tracker to return exit code based on resource leaked found
or not