Skip to content

Commit dc3a3fe

Browse files
committed
gh-136655: ensure cancelled futures are notified on process pool shutdown
At present when a process pool executor shuts down it is cancelling pending work items, but failing to notify any waiting threads. Fix this. See also gh-109934, which is a similar bug in the thread pool executor.
1 parent bb85af3 commit dc3a3fe

File tree

4 files changed

+52
-1
lines changed

4 files changed

+52
-1
lines changed

Lib/concurrent/futures/process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,9 @@ def flag_executor_shutting_down(self):
518518
# to only have futures that are currently running.
519519
new_pending_work_items = {}
520520
for work_id, work_item in self.pending_work_items.items():
521-
if not work_item.future.cancel():
521+
if work_item.future.cancel():
522+
work_item.future.set_running_or_notify_cancel()
523+
else:
522524
new_pending_work_items[work_id] = work_item
523525
self.pending_work_items = new_pending_work_items
524526
# Drain work_ids_queue since we no longer need to

Lib/test/test_concurrent_futures/executor.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,3 +247,35 @@ def test_swallows_falsey_exceptions(self):
247247
msg = 'lenlen'
248248
with self.assertRaisesRegex(FalseyLenException, msg):
249249
self.executor.submit(raiser, FalseyLenException, msg).result()
250+
251+
def test_shutdown_notifies_cancelled_futures(self):
252+
253+
# TODO: remove when gh-109934 is fixed
254+
if self.executor_type is futures.ThreadPoolExecutor:
255+
self.skipTest("gh-109934: skipping thread pool executor")
256+
257+
# gh-136655: ensure cancelled futures are notified
258+
count = self.worker_count * 2
259+
barrier = self.create_barrier(self.worker_count + 1)
260+
with self.executor as exec:
261+
fs = [exec.submit(blocking_raiser,
262+
barrier if index < self.worker_count else None)
263+
for index in range(count)]
264+
265+
exec.shutdown(wait=False, cancel_futures=True)
266+
try:
267+
barrier.wait()
268+
except threading.BrokenBarrierError:
269+
pass
270+
271+
for future in fs:
272+
self.assertRaises(
273+
(FalseyBoolException, futures.CancelledError, threading.BrokenBarrierError),
274+
future.result)
275+
276+
self.assertIn('CANCELLED_AND_NOTIFIED', [f._state for f in fs])
277+
278+
def blocking_raiser(barrier=None):
279+
if barrier is not None:
280+
barrier.wait(1)
281+
raise FalseyBoolException()

Lib/test/test_concurrent_futures/util.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ def get_context(self):
8080
class ThreadPoolMixin(ExecutorMixin):
8181
executor_type = futures.ThreadPoolExecutor
8282

83+
def create_barrier(self, count):
84+
return threading.Barrier(count)
85+
8386
def create_event(self):
8487
return threading.Event()
8588

@@ -88,6 +91,9 @@ def create_event(self):
8891
class InterpreterPoolMixin(ExecutorMixin):
8992
executor_type = futures.InterpreterPoolExecutor
9093

94+
def create_barrier(self, count):
95+
self.skipTest("InterpreterPoolExecutor doesn't support barriers")
96+
9197
def create_event(self):
9298
self.skipTest("InterpreterPoolExecutor doesn't support events")
9399

@@ -107,6 +113,9 @@ def get_context(self):
107113
self.skipTest("TSAN doesn't support threads after fork")
108114
return super().get_context()
109115

116+
def create_barrier(self, count):
117+
return self.manager.Barrier(count)
118+
110119
def create_event(self):
111120
return self.manager.Event()
112121

@@ -122,6 +131,9 @@ def get_context(self):
122131
self.skipTest("ProcessPoolExecutor unavailable on this system")
123132
return super().get_context()
124133

134+
def create_barrier(self, count):
135+
return self.manager.Barrier(count)
136+
125137
def create_event(self):
126138
return self.manager.Event()
127139

@@ -141,6 +153,9 @@ def get_context(self):
141153
self.skipTest("TSAN doesn't support threads after fork")
142154
return super().get_context()
143155

156+
def create_barrier(self, count):
157+
return self.manager.Barrier(count)
158+
144159
def create_event(self):
145160
return self.manager.Event()
146161

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
+Ensure :class:`concurrent.futures.ProcessPoolExecutor` notifies any futures
2+
it cancels on shutdown.

0 commit comments

Comments
 (0)