diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index a14650bf5fa47c..0004edf760701f 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -518,7 +518,9 @@ def flag_executor_shutting_down(self): # to only have futures that are currently running. new_pending_work_items = {} for work_id, work_item in self.pending_work_items.items(): - if not work_item.future.cancel(): + if work_item.future.cancel(): + work_item.future.set_running_or_notify_cancel() + else: new_pending_work_items[work_id] = work_item self.pending_work_items = new_pending_work_items # Drain work_ids_queue since we no longer need to diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 909359b648709f..bd8eb33d753598 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -264,6 +264,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): break if work_item is not None: work_item.future.cancel() + work_item.future.set_running_or_notify_cancel() # Send a wake-up to prevent threads calling # _work_queue.get(block=True) from permanently blocking. diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 95bf8fcd25bf54..9a993fe32aa69a 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -233,3 +233,30 @@ def test_swallows_falsey_exceptions(self): msg = 'lenlen' with self.assertRaisesRegex(FalseyLenException, msg): self.executor.submit(raiser, FalseyLenException, msg).result() + + def test_shutdown_notifies_cancelled_futures(self): + # gh-136655: ensure cancelled futures are notified + count = self.worker_count * 2 + barrier = self.create_barrier(self.worker_count + 1) + with self.executor as exec: + fs = [exec.submit(blocking_raiser, + barrier if index < self.worker_count else None) + for index in range(count)] + + exec.shutdown(wait=False, cancel_futures=True) + try: + barrier.wait() + except threading.BrokenBarrierError: + pass + + for future in fs: + self.assertRaises( + (FalseyBoolException, futures.CancelledError, threading.BrokenBarrierError), + future.result) + + self.assertIn('CANCELLED_AND_NOTIFIED', [f._state for f in fs]) + +def blocking_raiser(barrier=None): + if barrier is not None: + barrier.wait(1) + raise FalseyBoolException() diff --git a/Lib/test/test_concurrent_futures/test_thread_pool.py b/Lib/test/test_concurrent_futures/test_thread_pool.py index 4324241b374967..ab270a91670f10 100644 --- a/Lib/test/test_concurrent_futures/test_thread_pool.py +++ b/Lib/test/test_concurrent_futures/test_thread_pool.py @@ -112,6 +112,61 @@ def log_n_wait(ident): # ident='third' is cancelled because it remained in the collection of futures self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"]) + def test_shutdown_cancels_pending_futures(self): + # gh-109934: ensure shutdown cancels and notifies pending futures + def waiter(b1, b2): + b1.wait(3) + b2.wait(3) + def noop(): + pass + b1 = threading.Barrier(2) + b2 = threading.Barrier(2) + called_back_1 = threading.Event() + called_back_2 = threading.Event() + with self.executor_type(max_workers=1) as pool: + + # Submit two futures, the first of which will block and prevent the + # second from running + f1 = pool.submit(waiter, b1, b2) + f2 = pool.submit(noop) + f1.add_done_callback(lambda f: called_back_1.set()) + f2.add_done_callback(lambda f: called_back_2.set()) + fs = {f1, f2} + + completed_iter = futures.as_completed(fs, timeout=0) + self.assertRaises(TimeoutError, next, completed_iter) + + # Ensure the first task has started running then shutdown the + # pool, cancelling the unstarted task + b1.wait(3) + pool.shutdown(wait=False, cancel_futures=True) + self.assertTrue(f1.running()) + self.assertTrue(f2.cancelled()) + self.assertFalse(called_back_1.is_set()) + self.assertTrue(called_back_2.is_set()) + + completed_iter = futures.as_completed(fs, timeout=0) + f = next(completed_iter) + self.assertIs(f, f2) + self.assertRaises(TimeoutError, next, completed_iter) + + result = futures.wait(fs, timeout=0) + self.assertEqual(result.not_done, {f1}) + self.assertEqual(result.done, {f2}) + + # Unblock and wait for the first future to complete + b2.wait(3) + called_back_1.wait(3) + self.assertTrue(f1.done()) + self.assertTrue(called_back_1.is_set()) + + completed = set(futures.as_completed(fs, timeout=0)) + self.assertEqual(set(fs), completed) + + result = futures.wait(fs, timeout=0) + self.assertEqual(result.not_done, set()) + self.assertEqual(result.done, set(fs)) + def setUpModule(): setup_module() diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index b12940414d9142..aceb51cacfae26 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -79,6 +79,9 @@ def get_context(self): class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor + def create_barrier(self, count): + return threading.Barrier(count) + def create_event(self): return threading.Event() @@ -87,6 +90,9 @@ def create_event(self): class InterpreterPoolMixin(ExecutorMixin): executor_type = futures.InterpreterPoolExecutor + def create_barrier(self, count): + self.skipTest("InterpreterPoolExecutor doesn't support barriers") + def create_event(self): self.skipTest("InterpreterPoolExecutor doesn't support events") @@ -106,6 +112,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_barrier(self, count): + return self.manager.Barrier(count) + def create_event(self): return self.manager.Event() @@ -121,6 +130,9 @@ def get_context(self): self.skipTest("ProcessPoolExecutor unavailable on this system") return super().get_context() + def create_barrier(self, count): + return self.manager.Barrier(count) + def create_event(self): return self.manager.Event() @@ -140,6 +152,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_barrier(self, count): + return self.manager.Barrier(count) + def create_event(self): return self.manager.Event() diff --git a/Misc/NEWS.d/next/Library/2025-05-24-15-15-43.gh-issue-109934.WXOdC8.rst b/Misc/NEWS.d/next/Library/2025-05-24-15-15-43.gh-issue-109934.WXOdC8.rst new file mode 100644 index 00000000000000..21b097f9b72e13 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-05-24-15-15-43.gh-issue-109934.WXOdC8.rst @@ -0,0 +1,3 @@ +Ensure :class:`concurrent.futures.ThreadPoolExecutor` and +:class:`concurrent.futures.ProcessPoolExecutor` notifies any futures it cancels +on shutdown.