Skip to content

Commit ff70843

Browse files
committed
gh-136655: ensure cancelled futures are notified on process pool shutdown
1 parent bb89020 commit ff70843

File tree

3 files changed

+40
-1
lines changed

3 files changed

+40
-1
lines changed

Lib/concurrent/futures/process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,9 @@ def flag_executor_shutting_down(self):
518518
# to only have futures that are currently running.
519519
new_pending_work_items = {}
520520
for work_id, work_item in self.pending_work_items.items():
521-
if not work_item.future.cancel():
521+
if work_item.future.cancel():
522+
work_item.future.set_running_or_notify_cancel()
523+
else:
522524
new_pending_work_items[work_id] = work_item
523525
self.pending_work_items = new_pending_work_items
524526
# Drain work_ids_queue since we no longer need to

Lib/test/test_concurrent_futures/executor.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,3 +233,25 @@ def test_swallows_falsey_exceptions(self):
233233
msg = 'lenlen'
234234
with self.assertRaisesRegex(FalseyLenException, msg):
235235
self.executor.submit(raiser, FalseyLenException, msg).result()
236+
237+
def test_shutdown_notifies_cancelled_futures(self):
238+
# gh-136655: ensure cancelled futures are notified
239+
count = self.worker_count * 2
240+
barrier = self.create_barrier(self.worker_count + 1)
241+
fs = [self.executor.submit(blocking_raiser,
242+
barrier if index < self.worker_count else None)
243+
for index in range(count)]
244+
245+
self.executor.shutdown(wait=False, cancel_futures=True)
246+
barrier.wait()
247+
248+
for future in fs:
249+
self.assertRaises((FalseyBoolException, futures.CancelledError),
250+
future.result)
251+
252+
self.assertIn('CANCELLED_AND_NOTIFIED', [f._state for f in fs])
253+
254+
def blocking_raiser(barrier=None):
255+
if barrier is not None:
256+
barrier.wait(1)
257+
raise FalseyBoolException()

Lib/test/test_concurrent_futures/util.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ def get_context(self):
7979
class ThreadPoolMixin(ExecutorMixin):
8080
executor_type = futures.ThreadPoolExecutor
8181

82+
def create_barrier(self, count):
83+
return threading.Barrier(count)
84+
8285
def create_event(self):
8386
return threading.Event()
8487

@@ -87,6 +90,9 @@ def create_event(self):
8790
class InterpreterPoolMixin(ExecutorMixin):
8891
executor_type = futures.InterpreterPoolExecutor
8992

93+
def create_barrier(self, count):
94+
self.skipTest("InterpreterPoolExecutor doesn't support barriers")
95+
9096
def create_event(self):
9197
self.skipTest("InterpreterPoolExecutor doesn't support events")
9298

@@ -106,6 +112,9 @@ def get_context(self):
106112
self.skipTest("TSAN doesn't support threads after fork")
107113
return super().get_context()
108114

115+
def create_barrier(self, count):
116+
return self.manager.Barrier(count)
117+
109118
def create_event(self):
110119
return self.manager.Event()
111120

@@ -121,6 +130,9 @@ def get_context(self):
121130
self.skipTest("ProcessPoolExecutor unavailable on this system")
122131
return super().get_context()
123132

133+
def create_barrier(self, count):
134+
return self.manager.Barrier(count)
135+
124136
def create_event(self):
125137
return self.manager.Event()
126138

@@ -140,6 +152,9 @@ def get_context(self):
140152
self.skipTest("TSAN doesn't support threads after fork")
141153
return super().get_context()
142154

155+
def create_barrier(self, count):
156+
return self.manager.Barrier(count)
157+
143158
def create_event(self):
144159
return self.manager.Event()
145160

0 commit comments

Comments
 (0)