diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index ff7c17efaab694..574b42928c6dd5 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -766,6 +766,10 @@ def _start_executor_manager_thread(self): self._executor_manager_thread_wakeup def _adjust_process_count(self): + # gh-132969 + if self._processes is None: + return + # if there's an idle process, we don't need to spawn a new one. if self._idle_worker_semaphore.acquire(blocking=False): return diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 7a4065afd46fc8..8d30b672d6f4b3 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -20,6 +20,16 @@ def sleep_and_print(t, msg): sys.stdout.flush() +def failing_task_132969(n: int) -> int: + raise ValueError("failing task") + + +def good_task_132969(n: int) -> int: + time.sleep(0.1 * n) + return n + + + class ExecutorShutdownTest: def test_run_after_shutdown(self): self.executor.shutdown() @@ -330,6 +340,43 @@ def test_shutdown_no_wait(self): # shutdown. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + def _run_test_issue_132969(self, max_workers: int) -> int: + # max_workers=2 will repro exception + # max_workers=4 will repro exception and then hang + + import multiprocessing as mp + + # Repro conditions + # max_tasks_per_child=1 + # a task ends abnormally + # shutdown(wait=False) is called + executor = futures.ProcessPoolExecutor( + max_workers=max_workers, + max_tasks_per_child=1, + mp_context=mp.get_context("forkserver")) + f1 = executor.submit(good_task_132969, 1) + f2 = executor.submit(failing_task_132969, 2) + f3 = executor.submit(good_task_132969, 3) + result:int = 0 + try: + result += f1.result() + result += f2.result() + result += f3.result() + except ValueError: + # stop processing results upon first exception + pass + + executor.shutdown(wait=False) + return result + + def test_shutdown_len_exception_132969(self): + result = self._run_test_issue_132969(2) + self.assertEqual(result, 1) + + def test_shutdown_process_hang_132969(self): + result = self._run_test_issue_132969(4) + self.assertEqual(result, 1) + create_executor_tests(globals(), ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Misc/NEWS.d/next/Library/2025-04-30-18-01-47.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-18-01-47.gh-issue-132969.EagQ3G.rst new file mode 100644 index 00000000000000..d0c89ad1790838 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-04-30-18-01-47.gh-issue-132969.EagQ3G.rst @@ -0,0 +1 @@ +Fixes error+hang when ProcessPoolExecutor shutdown called with wait=False and a task ended abnormally.