Skip to content
4 changes: 3 additions & 1 deletion Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,9 @@ def flag_executor_shutting_down(self):
# to only have futures that are currently running.
new_pending_work_items = {}
for work_id, work_item in self.pending_work_items.items():
if not work_item.future.cancel():
if work_item.future.cancel():
work_item.future.set_running_or_notify_cancel()
else:
new_pending_work_items[work_id] = work_item
self.pending_work_items = new_pending_work_items
# Drain work_ids_queue since we no longer need to
Expand Down
1 change: 1 addition & 0 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def shutdown(self, wait=True, *, cancel_futures=False):
break
if work_item is not None:
work_item.future.cancel()
work_item.future.set_running_or_notify_cancel()

# Send a wake-up to prevent threads calling
# _work_queue.get(block=True) from permanently blocking.
Expand Down
27 changes: 27 additions & 0 deletions Lib/test/test_concurrent_futures/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,30 @@ def test_swallows_falsey_exceptions(self):
msg = 'lenlen'
with self.assertRaisesRegex(FalseyLenException, msg):
self.executor.submit(raiser, FalseyLenException, msg).result()

def test_shutdown_notifies_cancelled_futures(self):
# gh-136655: ensure cancelled futures are notified
count = self.worker_count * 2
barrier = self.create_barrier(self.worker_count + 1)
with self.executor as exec:
fs = [exec.submit(blocking_raiser,
barrier if index < self.worker_count else None)
for index in range(count)]

exec.shutdown(wait=False, cancel_futures=True)
try:
barrier.wait()
except threading.BrokenBarrierError:
pass

for future in fs:
self.assertRaises(
(FalseyBoolException, futures.CancelledError, threading.BrokenBarrierError),
future.result)

self.assertIn('CANCELLED_AND_NOTIFIED', [f._state for f in fs])

def blocking_raiser(barrier=None):
if barrier is not None:
barrier.wait(1)
raise FalseyBoolException()
55 changes: 55 additions & 0 deletions Lib/test/test_concurrent_futures/test_thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,61 @@ def log_n_wait(ident):
# ident='third' is cancelled because it remained in the collection of futures
self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])

def test_shutdown_cancels_pending_futures(self):
# gh-109934: ensure shutdown cancels and notifies pending futures
def waiter(b1, b2):
b1.wait(3)
b2.wait(3)
def noop():
pass
b1 = threading.Barrier(2)
b2 = threading.Barrier(2)
called_back_1 = threading.Event()
called_back_2 = threading.Event()
with self.executor_type(max_workers=1) as pool:

# Submit two futures, the first of which will block and prevent the
# second from running
f1 = pool.submit(waiter, b1, b2)
f2 = pool.submit(noop)
f1.add_done_callback(lambda f: called_back_1.set())
f2.add_done_callback(lambda f: called_back_2.set())
fs = {f1, f2}

completed_iter = futures.as_completed(fs, timeout=0)
self.assertRaises(TimeoutError, next, completed_iter)

# Ensure the first task has started running then shutdown the
# pool, cancelling the unstarted task
b1.wait(3)
pool.shutdown(wait=False, cancel_futures=True)
self.assertTrue(f1.running())
self.assertTrue(f2.cancelled())
self.assertFalse(called_back_1.is_set())
self.assertTrue(called_back_2.is_set())

completed_iter = futures.as_completed(fs, timeout=0)
f = next(completed_iter)
self.assertIs(f, f2)
self.assertRaises(TimeoutError, next, completed_iter)

result = futures.wait(fs, timeout=0)
self.assertEqual(result.not_done, {f1})
self.assertEqual(result.done, {f2})

# Unblock and wait for the first future to complete
b2.wait(3)
called_back_1.wait(3)
self.assertTrue(f1.done())
self.assertTrue(called_back_1.is_set())

completed = set(futures.as_completed(fs, timeout=0))
self.assertEqual(set(fs), completed)

result = futures.wait(fs, timeout=0)
self.assertEqual(result.not_done, set())
self.assertEqual(result.done, set(fs))


def setUpModule():
setup_module()
Expand Down
15 changes: 15 additions & 0 deletions Lib/test/test_concurrent_futures/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def get_context(self):
class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor

def create_barrier(self, count):
return threading.Barrier(count)

def create_event(self):
return threading.Event()

Expand All @@ -87,6 +90,9 @@ def create_event(self):
class InterpreterPoolMixin(ExecutorMixin):
executor_type = futures.InterpreterPoolExecutor

def create_barrier(self, count):
self.skipTest("InterpreterPoolExecutor doesn't support barriers")

def create_event(self):
self.skipTest("InterpreterPoolExecutor doesn't support events")

Expand All @@ -106,6 +112,9 @@ def get_context(self):
self.skipTest("TSAN doesn't support threads after fork")
return super().get_context()

def create_barrier(self, count):
return self.manager.Barrier(count)

def create_event(self):
return self.manager.Event()

Expand All @@ -121,6 +130,9 @@ def get_context(self):
self.skipTest("ProcessPoolExecutor unavailable on this system")
return super().get_context()

def create_barrier(self, count):
return self.manager.Barrier(count)

def create_event(self):
return self.manager.Event()

Expand All @@ -140,6 +152,9 @@ def get_context(self):
self.skipTest("TSAN doesn't support threads after fork")
return super().get_context()

def create_barrier(self, count):
return self.manager.Barrier(count)

def create_event(self):
return self.manager.Event()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Ensure :class:`concurrent.futures.ThreadPoolExecutor` and
:class:`concurrent.futures.ProcessPoolExecutor` notifies any futures it cancels
on shutdown.
Loading