Skip to content

Commit 4429b2f

Browse files
committed
PR feedback. Split terminate_workers into terminate_workers and kill_workers.
Remove the ability to give an arbitrary signal, though leave plumbing to make it easy enough to re-add if needed.
1 parent 6d77c10 commit 4429b2f

File tree

5 files changed

+118
-57
lines changed

5 files changed

+118
-57
lines changed

Doc/library/concurrent.futures.rst

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -415,13 +415,24 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
415415
require the *fork* start method for :class:`ProcessPoolExecutor` you must
416416
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
417417

418-
.. method:: terminate_workers(signal=signal.SIGTERM)
418+
.. method:: terminate_workers()
419419

420-
Attempt to terminate all living worker processes immediately by sending
421-
each of them the given signal. If the signal is not specified, the default
422-
signal :data:`signal.SIGTERM` is used. Internally, it will also call
423-
:meth:`Executor.shutdown` to ensure that all other resources associated with
424-
the executor are freed.
420+
Attempt to terminate all living worker processes immediately by calling
421+
:meth:`Process.terminate <multiprocessing.Process.terminate>` on each of them.
422+
Internally, it will also call :meth:`Executor.shutdown` to ensure that all
423+
other resources associated with the executor are freed.
424+
425+
After calling this method the caller should no longer submit tasks to the
426+
executor.
427+
428+
.. versionadded:: next
429+
430+
.. method:: kill_workers()
431+
432+
Attempt to kill all living worker processes immediately by calling
433+
:meth:`Process.terminate <multiprocessing.Process.kill>` on each of them.
434+
Internally, it will also call:meth:`Executor.shutdown` to ensure that all
435+
other resources associated with the executor are freed.
425436

426437
After calling this method the caller should no longer submit tasks to the
427438
executor.

Doc/whatsnew/3.14.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,8 +440,9 @@ contextvars
440440
(Contributed by Andrew Svetlov in :gh:`129889`.)
441441

442442

443-
* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
444-
a way to terminate all living worker processes in the given pool.
443+
* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
444+
:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
445+
ways to terminate or kill all living worker processes in the given pool.
445446
(Contributed by Charles Machalow in :gh:`128043`.)
446447

447448
ctypes

Lib/concurrent/futures/process.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
__author__ = 'Brian Quinlan ([email protected])'
4747

48+
import enum
4849
import os
4950
from concurrent.futures import _base
5051
import queue
@@ -627,6 +628,18 @@ class BrokenProcessPool(_base.BrokenExecutor):
627628
while a future was in the running state.
628629
"""
629630

631+
class _TerminateOrKillOperation(enum.Enum):
632+
"""Enum for _terminate_or_kill_workers().
633+
634+
Used to determine the operation used by the
635+
_terminate_or_kill_workers() method.
636+
"""
637+
# Delegate to call process.terminate()
638+
TERMINATE = 1
639+
640+
# Delegate to call process.kill()
641+
KILL = 2
642+
630643

631644
class ProcessPoolExecutor(_base.Executor):
632645
def __init__(self, max_workers=None, mp_context=None,
@@ -857,19 +870,18 @@ def shutdown(self, wait=True, *, cancel_futures=False):
857870

858871
shutdown.__doc__ = _base.Executor.shutdown.__doc__
859872

860-
def terminate_workers(self, signal=signal.SIGTERM):
861-
"""Attempts to terminate the executor's workers using the given signal.
862-
Iterates through all of the current processes and sends the given signal if
863-
the process is still alive.
873+
def _terminate_or_kill_workers(self, operation: _TerminateOrKillOperation):
874+
"""Attempts to terminate or kill the executor's workers based off the given
875+
operation. Iterates through all of the current processes and performs the
876+
relevant task if the process is still alive.
864877
865878
After terminating workers, the pool will be in a broken state
866879
and no longer usable (for instance, new tasks should not be
867880
submitted).
868-
869-
Args:
870-
signal: The signal to send to each worker process. Defaults to
871-
signal.SIGTERM.
872881
"""
882+
if operation not in _TerminateOrKillOperation._member_map_.values():
883+
raise ValueError(f"Unsupported operation: {operation}")
884+
873885
processes = {}
874886
if self._processes:
875887
processes = self._processes.copy()
@@ -881,7 +893,7 @@ def terminate_workers(self, signal=signal.SIGTERM):
881893
if not processes:
882894
return
883895

884-
for pid, proc in processes.items():
896+
for proc in processes.values():
885897
try:
886898
if not proc.is_alive():
887899
continue
@@ -890,7 +902,32 @@ def terminate_workers(self, signal=signal.SIGTERM):
890902
continue
891903

892904
try:
893-
os.kill(pid, signal)
905+
if operation == _TerminateOrKillOperation.TERMINATE:
906+
proc.terminate()
907+
elif operation == _TerminateOrKillOperation.KILL:
908+
proc.kill()
894909
except ProcessLookupError:
895910
# The process just ended before our signal
896911
continue
912+
913+
def terminate_workers(self):
914+
"""Attempts to terminate the executor's workers.
915+
Iterates through all of the current worker processes and terminates
916+
each one that is still alive.
917+
918+
After terminating workers, the pool will be in a broken state
919+
and no longer usable (for instance, new tasks should not be
920+
submitted).
921+
"""
922+
return self._terminate_or_kill_workers(operation=_TerminateOrKillOperation.TERMINATE)
923+
924+
def kill_workers(self):
925+
"""Attempts to kill the executor's workers.
926+
Iterates through all of the current worker processes and kills
927+
each one that is still alive.
928+
929+
After killing workers, the pool will be in a broken state
930+
and no longer usable (for instance, new tasks should not be
931+
submitted).
932+
"""
933+
return self._terminate_or_kill_workers(operation=_TerminateOrKillOperation.KILL)

Lib/test/test_concurrent_futures/test_process_pool.py

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -228,59 +228,70 @@ def mock_start_new_thread(func, *args, **kwargs):
228228
list(executor.map(mul, [(2, 3)] * 10))
229229
executor.shutdown()
230230

231-
def test_process_pool_executor_terminate_workers(self):
232-
manager = multiprocessing.Manager()
233-
q = manager.Queue()
231+
def test_process_pool_executor_terminate_kill_workers(self):
232+
for function_name in ('terminate_workers', 'kill_workers'):
233+
manager = multiprocessing.Manager()
234+
q = manager.Queue()
234235

235-
with futures.ProcessPoolExecutor(max_workers=1) as executor:
236-
executor.submit(_put_sleep_put, q)
236+
with futures.ProcessPoolExecutor(max_workers=1) as executor:
237+
executor.submit(_put_sleep_put, q)
237238

238-
# We should get started, but not finished since we'll terminate the workers just after
239-
self.assertEqual(q.get(timeout=1), 'started')
239+
# We should get started, but not finished since we'll terminate the workers just after
240+
self.assertEqual(q.get(timeout=1), 'started')
240241

241-
executor.terminate_workers()
242+
getattr(executor, function_name)()
242243

243-
self.assertRaises(queue.Empty, q.get, timeout=1)
244+
self.assertRaises(queue.Empty, q.get, timeout=1)
244245

245-
def test_process_pool_executor_terminate_workers_dead_workers(self):
246-
with futures.ProcessPoolExecutor(max_workers=1) as executor:
247-
future = executor.submit(os._exit, 1)
248-
self.assertRaises(BrokenProcessPool, future.result)
246+
def test_process_pool_executor_terminate_kill_workers_dead_workers(self):
247+
for function_name in ('terminate_workers', 'kill_workers'):
248+
with futures.ProcessPoolExecutor(max_workers=1) as executor:
249+
future = executor.submit(os._exit, 1)
250+
self.assertRaises(BrokenProcessPool, future.result)
249251

250-
@unittest.mock.patch('concurrent.futures.process.os.kill')
251-
def test_process_pool_executor_terminate_workers_not_started_yet(self, mock_kill):
252-
with futures.ProcessPoolExecutor(max_workers=1) as executor:
253-
# The worker has not been started yet, terminate_workers should basically no-op
254-
executor.terminate_workers()
252+
# even though the pool is broken, this shouldn't raise
253+
getattr(executor, function_name)()
255254

256-
mock_kill.assert_not_called()
255+
def test_process_pool_executor_terminate_kill_workers_not_started_yet(self):
256+
for function_name in ('terminate_workers', 'kill_workers'):
257257

258-
def test_process_pool_executor_terminate_workers_stops_pool(self):
259-
with futures.ProcessPoolExecutor(max_workers=1) as executor:
260-
executor.submit(time.sleep, 0).result()
258+
context_with_mocked_process = multiprocessing.get_context()
259+
with unittest.mock.patch.object(context_with_mocked_process, 'Process') as mock_process:
261260

262-
executor.terminate_workers()
261+
with futures.ProcessPoolExecutor(max_workers=1, mp_context=context_with_mocked_process) as executor:
262+
# The worker has not been started yet, terminate/kill_workers should basically no-op
263+
getattr(executor, function_name)()
264+
265+
mock_process.return_value.kill.assert_not_called()
266+
mock_process.return_value.terminate.assert_not_called()
263267

264-
self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
268+
def test_process_pool_executor_terminate_kill_workers_stops_pool(self):
269+
for function_name in ('terminate_workers', 'kill_workers'):
270+
with futures.ProcessPoolExecutor(max_workers=1) as executor:
271+
executor.submit(time.sleep, 0).result()
265272

266-
@unittest.mock.patch('concurrent.futures.process.os.kill')
267-
def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill):
273+
getattr(executor, function_name)()
274+
275+
self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
276+
277+
def test_process_pool_executor_terminate_workers(self):
268278
with futures.ProcessPoolExecutor(max_workers=1) as executor:
269-
future = executor.submit(time.sleep, 0)
270-
future.result()
279+
executor._terminate_or_kill_workers = unittest.mock.Mock()
280+
executor.terminate_workers()
281+
282+
executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TerminateOrKillOperation.TERMINATE)
271283

272-
worker_process = list(executor._processes.values())[0]
273-
executor.terminate_workers(signal.SIGABRT)
284+
def test_process_pool_executor_kill_workers(self):
285+
with futures.ProcessPoolExecutor(max_workers=1) as executor:
286+
executor._terminate_or_kill_workers = unittest.mock.Mock()
287+
executor.kill_workers()
274288

275-
mock_kill.assert_called_once_with(worker_process.pid, signal.SIGABRT)
289+
executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TerminateOrKillOperation.KILL)
276290

277-
def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self):
291+
def test_process_pool_executor_terminate_or_kill_workers_invalid_operation(self):
278292
with futures.ProcessPoolExecutor(max_workers=1) as executor:
279-
future = executor.submit(time.sleep, 0)
280-
future.result()
293+
self.assertRaises(ValueError, executor._terminate_or_kill_workers, operation='invalid operation'),
281294

282-
# 'potatoes' isn't a valid signal, so os.kill will raise a TypeError
283-
self.assertRaises(TypeError, executor.terminate_workers, 'potatoes')
284295

285296

286297
create_executor_tests(globals(), ProcessPoolExecutorTest,
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
2-
a way to terminate all living worker processes in the given pool.
3-
(Contributed by Charles Machalow in :gh:`128043`.)
1+
* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
2+
:meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
3+
ways to terminate or kill all living worker processes in the given pool.
4+
(Contributed by Charles Machalow in :gh:`128043`.)

0 commit comments

Comments
 (0)