Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
70a35a2
Updated Resource Tracker to return exit code based on leaking found o…
bityob Jul 16, 2023
da7e245
Add NEWS file
bityob Jul 16, 2023
1de022d
Use subTest for the parametrize the same tests with different values
bityob Jul 17, 2023
48bea84
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 17, 2023
31bc4af
Updated tests based on PR comments, and fix the resource_tracker code…
bityob Jul 18, 2023
47551a3
Clean test
bityob Jul 18, 2023
a02fdbe
Added comment for test and simplified the logic
bityob Jul 18, 2023
9d0369e
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 18, 2023
4dbb777
Fix tests and News based on PR comments
bityob Jul 18, 2023
bfd1e8e
Added reseting the exit code when the process should run again
bityob Jul 18, 2023
043ae90
Added more clearing queue methods beside del (closing and calling gc.…
bityob Jul 18, 2023
850b5bb
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 18, 2023
6e82940
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 19, 2023
a5cb934
Merge branch 'gh-104090-fix-leaked-semaphors-on-test_concurrent_futur…
bityob Jul 19, 2023
ccd736c
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 20, 2023
3b21d5c
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 23, 2023
fbd18ec
Merge main
bityob Aug 31, 2023
e258cb3
Moved FailingInitializerResourcesTest class from old path (Lib/test/t…
bityob Aug 31, 2023
5b886bf
Merge branch 'gh-104090-fix-leaked-semaphors-on-test_concurrent_futur…
bityob Aug 31, 2023
915c8a3
Added missing import
bityob Aug 31, 2023
cd377e6
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
pitrou Sep 19, 2023
27245ec
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
encukou Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(self):
self._lock = threading.Lock()
self._fd = None
self._pid = None
self._exitcode = None

def _stop(self):
with self._lock:
Expand All @@ -68,9 +69,16 @@ def _stop(self):
os.close(self._fd)
self._fd = None

os.waitpid(self._pid, 0)
_, status = os.waitpid(self._pid, 0)

self._pid = None

try:
self._exitcode = os.waitstatus_to_exitcode(status)
except ValueError:
# os.waitstatus_to_exitcode may raise an exception for invalid values
self._exitcode = None

def getfd(self):
self.ensure_running()
return self._fd
Expand Down Expand Up @@ -100,6 +108,7 @@ def ensure_running(self):
pass
self._fd = None
self._pid = None
self._exitcode = None

warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')
Expand Down Expand Up @@ -191,6 +200,8 @@ def main(fd):
pass

cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
exit_code = 0

try:
# keep track of registered/unregistered resources
with open(fd, 'rb') as f:
Expand Down Expand Up @@ -221,6 +232,7 @@ def main(fd):
for rtype, rtype_cache in cache.items():
if rtype_cache:
try:
exit_code = 1
warnings.warn('resource_tracker: There appear to be %d '
'leaked %s objects to clean up at shutdown' %
(len(rtype_cache), rtype))
Expand All @@ -237,3 +249,5 @@ def main(fd):
warnings.warn('resource_tracker: %r: %s' % (name, e))
finally:
pass

sys.exit(exit_code)
41 changes: 41 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5536,6 +5536,47 @@ def test_too_long_name_resource(self):
with self.assertRaises(ValueError):
resource_tracker.register(too_long_name_resource, rtype)

def _test_resource_tracker_leak_resources(self, context, delete_queue):
from multiprocessing.resource_tracker import _resource_tracker
_resource_tracker.ensure_running()
self.assertTrue(_resource_tracker._check_alive())

# Reset exit code value
_resource_tracker._exitcode = None

mp_context = multiprocessing.get_context(context)

# Keep it on variable, so it won't be cleared yet
q = mp_context.Queue()
if delete_queue:
# Clearing the queue resource to be sure explicitly with deleting
# and gc.collect
q.close()
del q
gc.collect()
expected_exit_code = 0
else:
expected_exit_code = 1

self.assertIsNone(_resource_tracker._exitcode)
_resource_tracker._stop()

self.assertEqual(_resource_tracker._exitcode, expected_exit_code)

def test_resource_tracker_exit_code(self):
"""
Test the exit code of the resource tracker based on if there were left
leaked resources when we stop the process. If not leaked resources were
found, exit code should be 0, otherwise 1
"""
for context in ["spawn", "forkserver"]:
for delete_queue in [True, False]:
with self.subTest(context=context, delete_queue=delete_queue):
self._test_resource_tracker_leak_resources(
context=context,
delete_queue=delete_queue,
)


class TestSimpleQueue(unittest.TestCase):

Expand Down
25 changes: 25 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,31 @@ def _assert_logged(self, msg):
create_executor_tests(FailingInitializerMixin)


@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
class FailingInitializerResourcesTest(unittest.TestCase):
"""
Source: https://github.com/python/cpython/issues/104090
"""

def _test(self, test_class):
runner = unittest.TextTestRunner()
runner.run(test_class('test_initializer'))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, can you please instead put these checks directly in FailingInitializerMixin?
For example, add a tearDown method:

    def tearDown(self):
        super().tearDown()  # shutdown executor
        if self.mp_context and self.mp_context.get_start_method() in ("spawn", "forkserver"):
            # Stop resource tracker manually now, so we can verify there are
            # not leaked resources by checking the process exit code
            from multiprocessing.resource_tracker import _resource_tracker
            _resource_tracker._stop()
            self.assertEqual(_resource_tracker._exitcode, 0)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pitrou Checking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't work

$ ./python -m unittest test.test_concurrent_futures.ProcessPoolForkserverFailingInitializerTest -v
test_initializer (test.test_concurrent_futures.ProcessPoolForkserverFailingInitializerTest.test_initializer) ... /home/user/cpython/Lib/multiprocessing/resource_tracker.py:236: UserWarning: resource_tracker: There appear to be 3 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
FAIL
Traceback (most recent call last):
  File "/home/user/cpython/Lib/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/home/user/cpython/Lib/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/cpython/Lib/multiprocessing/synchronize.py", line 87, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/home/user/cpython/Lib/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/home/user/cpython/Lib/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/cpython/Lib/multiprocessing/synchronize.py", line 87, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/home/user/cpython/Lib/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/home/user/cpython/Lib/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/cpython/Lib/multiprocessing/synchronize.py", line 87, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory

======================================================================
FAIL: test_initializer (test.test_concurrent_futures.ProcessPoolForkserverFailingInitializerTest.test_initializer)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/user/cpython/Lib/test/test_concurrent_futures.py", line 298, in tearDown
    self.assertEqual(_resource_tracker._exitcode, 0)
AssertionError: 1 != 0

----------------------------------------------------------------------
Ran 1 test in 0.269s

FAILED (failures=1)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, too bad :-(


# GH-104090:
# Stop resource tracker manually now, so we can verify there are not leaked resources by checking
# the process exit code
from multiprocessing.resource_tracker import _resource_tracker
_resource_tracker._stop()

self.assertEqual(_resource_tracker._exitcode, 0)

def test_spawn(self):
self._test(ProcessPoolSpawnFailingInitializerTest)

def test_forkserver(self):
self._test(ProcessPoolForkserverFailingInitializerTest)


class ExecutorShutdownTest:
def test_run_after_shutdown(self):
self.executor.shutdown()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The multiprocessing resource tracker now exits with status code 1 if a resource
leak was detected. It still exits with status code 0 otherwise.