Skip to content

Commit f1b0cf6

Browse files
committed
PR feedback: swap to dict with constants, better subtest parameterization, lint
1 parent 1bedb28 commit f1b0cf6

File tree

2 files changed

+83
-81
lines changed

2 files changed

+83
-81
lines changed

Lib/concurrent/futures/process.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545

4646
__author__ = 'Brian Quinlan ([email protected])'
4747

48-
import enum
4948
import os
5049
from concurrent.futures import _base
5150
import queue
@@ -627,17 +626,13 @@ class BrokenProcessPool(_base.BrokenExecutor):
627626
while a future was in the running state.
628627
"""
629628

630-
class _TerminateOrKillOperation(enum.Enum):
631-
"""Enum for _terminate_or_kill_workers().
629+
_TERMINATE = "terminate"
630+
_KILL = "kill"
632631

633-
Used to determine the operation used by the
634-
_terminate_or_kill_workers() method.
635-
"""
636-
# Delegate to call process.terminate()
637-
TERMINATE = 1
638-
639-
# Delegate to call process.kill()
640-
KILL = 2
632+
_TERMINATE_OR_KILL_OPERATION = {
633+
_TERMINATE,
634+
_KILL
635+
}
641636

642637

643638
class ProcessPoolExecutor(_base.Executor):
@@ -869,24 +864,25 @@ def shutdown(self, wait=True, *, cancel_futures=False):
869864

870865
shutdown.__doc__ = _base.Executor.shutdown.__doc__
871866

872-
def _terminate_or_kill_workers(self, operation: _TerminateOrKillOperation):
873-
"""Attempts to terminate or kill the executor's workers based off the given
874-
operation. Iterates through all of the current processes and performs the
875-
relevant task if the process is still alive.
867+
def _terminate_or_kill_workers(self, operation):
868+
"""Attempts to terminate or kill the executor's workers based off the
869+
given operation. Iterates through all of the current processes and
870+
performs the relevant task if the process is still alive.
876871
877872
After terminating workers, the pool will be in a broken state
878873
and no longer usable (for instance, new tasks should not be
879874
submitted).
880875
"""
881-
if operation not in _TerminateOrKillOperation:
876+
if operation not in _TERMINATE_OR_KILL_OPERATION:
882877
raise ValueError(f"Unsupported operation: {operation}")
883878

884879
processes = {}
885880
if self._processes:
886881
processes = self._processes.copy()
887882

888-
# shutdown will invalidate ._processes, so we copy it right before calling.
889-
# If we waited here, we would deadlock if a process decides not to exit.
883+
# shutdown will invalidate ._processes, so we copy it right before
884+
# calling. If we waited here, we would deadlock if a process decides not
885+
# to exit.
890886
self.shutdown(wait=False, cancel_futures=True)
891887

892888
if not processes:
@@ -901,9 +897,9 @@ def _terminate_or_kill_workers(self, operation: _TerminateOrKillOperation):
901897
continue
902898

903899
try:
904-
if operation == _TerminateOrKillOperation.TERMINATE:
900+
if operation == _TERMINATE:
905901
proc.terminate()
906-
elif operation == _TerminateOrKillOperation.KILL:
902+
elif operation == _KILL:
907903
proc.kill()
908904
except ProcessLookupError:
909905
# The process just ended before our signal
@@ -918,7 +914,7 @@ def terminate_workers(self):
918914
and no longer usable (for instance, new tasks should not be
919915
submitted).
920916
"""
921-
return self._terminate_or_kill_workers(operation=_TerminateOrKillOperation.TERMINATE)
917+
return self._terminate_or_kill_workers(operation=_TERMINATE)
922918

923919
def kill_workers(self):
924920
"""Attempts to kill the executor's workers.
@@ -929,4 +925,4 @@ def kill_workers(self):
929925
and no longer usable (for instance, new tasks should not be
930926
submitted).
931927
"""
932-
return self._terminate_or_kill_workers(operation=_TerminateOrKillOperation.KILL)
928+
return self._terminate_or_kill_workers(operation=_KILL)

Lib/test/test_concurrent_futures/test_process_pool.py

Lines changed: 65 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from test import support
1414
from test.support import hashlib_helper
15+
from test.test_importlib.metadata.fixtures import parameterize
1516

1617
from .executor import ExecutorTest, mul
1718
from .util import (
@@ -26,6 +27,11 @@ def __init__(self, mgr):
2627
def __del__(self):
2728
self.event.set()
2829

30+
TERMINATE_OR_KILL_PARAMS = [
31+
dict(function_name='terminate_workers'),
32+
dict(function_name='kill_workers'),
33+
]
34+
2935
def _put_sleep_put(queue):
3036
""" Used as part of test_process_pool_executor_terminate_workers """
3137
queue.put('started')
@@ -228,80 +234,80 @@ def mock_start_new_thread(func, *args, **kwargs):
228234
list(executor.map(mul, [(2, 3)] * 10))
229235
executor.shutdown()
230236

231-
def test_process_pool_executor_terminate_kill_workers(self):
232-
for function_name in ('terminate_workers', 'kill_workers'):
233-
with self.subTest(function_name):
234-
manager = multiprocessing.Manager()
235-
q = manager.Queue()
236-
237-
with futures.ProcessPoolExecutor(max_workers=1) as executor:
238-
executor.submit(_put_sleep_put, q)
239-
240-
# We should get started, but not finished since we'll terminate the workers just after
241-
self.assertEqual(q.get(timeout=5), 'started')
237+
def test_process_pool_executor_terminate_workers(self):
238+
with futures.ProcessPoolExecutor(max_workers=1) as executor:
239+
executor._terminate_or_kill_workers = unittest.mock.Mock()
240+
executor.terminate_workers()
242241

243-
worker_process = list(executor._processes.values())[0]
244-
getattr(executor, function_name)()
245-
worker_process.join()
242+
executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TERMINATE)
246243

247-
if function_name == 'terminate_workers' or sys.platform == 'win32':
248-
# On windows, kill and terminate both send SIGTERM
249-
self.assertEqual(worker_process.exitcode, -signal.SIGTERM)
250-
elif function_name == 'kill_workers':
251-
self.assertEqual(worker_process.exitcode, -signal.SIGKILL)
244+
def test_process_pool_executor_kill_workers(self):
245+
with futures.ProcessPoolExecutor(max_workers=1) as executor:
246+
executor._terminate_or_kill_workers = unittest.mock.Mock()
247+
executor.kill_workers()
252248

253-
self.assertRaises(queue.Empty, q.get, timeout=1)
249+
executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._KILL)
254250

255-
def test_process_pool_executor_terminate_kill_workers_dead_workers(self):
256-
for function_name in ('terminate_workers', 'kill_workers'):
257-
with self.subTest(function_name=function_name):
258-
with futures.ProcessPoolExecutor(max_workers=1) as executor:
259-
future = executor.submit(os._exit, 1)
260-
self.assertRaises(BrokenProcessPool, future.result)
251+
def test_process_pool_executor_terminate_or_kill_workers_invalid_op(self):
252+
with futures.ProcessPoolExecutor(max_workers=1) as executor:
253+
self.assertRaises(ValueError,
254+
executor._terminate_or_kill_workers,
255+
operation='invalid operation'),
261256

262-
# even though the pool is broken, this shouldn't raise
263-
getattr(executor, function_name)()
257+
@parameterize(*TERMINATE_OR_KILL_PARAMS)
258+
def test_process_pool_executor_terminate_kill_workers(self, function_name):
259+
manager = multiprocessing.Manager()
260+
q = manager.Queue()
264261

265-
def test_process_pool_executor_terminate_kill_workers_not_started_yet(self):
266-
for function_name in ('terminate_workers', 'kill_workers'):
267-
with self.subTest(function_name=function_name):
268-
context_with_mocked_process = multiprocessing.get_context()
269-
with unittest.mock.patch.object(context_with_mocked_process, 'Process') as mock_process:
270-
with futures.ProcessPoolExecutor(max_workers=1, mp_context=context_with_mocked_process) as executor:
271-
# The worker has not been started yet, terminate/kill_workers should basically no-op
272-
getattr(executor, function_name)()
262+
with futures.ProcessPoolExecutor(max_workers=1) as executor:
263+
executor.submit(_put_sleep_put, q)
273264

274-
mock_process.return_value.kill.assert_not_called()
275-
mock_process.return_value.terminate.assert_not_called()
265+
# We should get started, but not finished since we'll terminate the
266+
# workers just after
267+
self.assertEqual(q.get(timeout=5), 'started')
276268

277-
def test_process_pool_executor_terminate_kill_workers_stops_pool(self):
278-
for function_name in ('terminate_workers', 'kill_workers'):
279-
with self.subTest(function_name=function_name):
280-
with futures.ProcessPoolExecutor(max_workers=1) as executor:
281-
executor.submit(time.sleep, 0).result()
269+
worker_process = list(executor._processes.values())[0]
270+
getattr(executor, function_name)()
271+
worker_process.join()
282272

283-
getattr(executor, function_name)()
273+
if function_name == 'terminate_workers' or \
274+
sys.platform == 'win32':
275+
# On windows, kill and terminate both send SIGTERM
276+
self.assertEqual(worker_process.exitcode, -signal.SIGTERM)
277+
elif function_name == 'kill_workers':
278+
self.assertEqual(worker_process.exitcode, -signal.SIGKILL)
284279

285-
self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
280+
self.assertRaises(queue.Empty, q.get, timeout=1)
286281

287-
def test_process_pool_executor_terminate_workers(self):
282+
@parameterize(*TERMINATE_OR_KILL_PARAMS)
283+
def test_process_pool_executor_terminate_kill_workers_dead_workers(self, function_name):
288284
with futures.ProcessPoolExecutor(max_workers=1) as executor:
289-
executor._terminate_or_kill_workers = unittest.mock.Mock()
290-
executor.terminate_workers()
291-
292-
executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TerminateOrKillOperation.TERMINATE)
293-
294-
def test_process_pool_executor_kill_workers(self):
285+
future = executor.submit(os._exit, 1)
286+
self.assertRaises(BrokenProcessPool, future.result)
287+
288+
# even though the pool is broken, this shouldn't raise
289+
getattr(executor, function_name)()
290+
291+
@parameterize(*TERMINATE_OR_KILL_PARAMS)
292+
def test_process_pool_executor_terminate_kill_workers_not_started_yet(self, function_name):
293+
ctx = self.get_context()
294+
with unittest.mock.patch.object(ctx, 'Process') as mock_process:
295+
with futures.ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor:
296+
# The worker has not been started yet, terminate/kill_workers
297+
# should basically no-op
298+
getattr(executor, function_name)()
299+
300+
mock_process.return_value.kill.assert_not_called()
301+
mock_process.return_value.terminate.assert_not_called()
302+
303+
@parameterize(*TERMINATE_OR_KILL_PARAMS)
304+
def test_process_pool_executor_terminate_kill_workers_stops_pool(self, function_name):
295305
with futures.ProcessPoolExecutor(max_workers=1) as executor:
296-
executor._terminate_or_kill_workers = unittest.mock.Mock()
297-
executor.kill_workers()
306+
executor.submit(time.sleep, 0).result()
298307

299-
executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TerminateOrKillOperation.KILL)
300-
301-
def test_process_pool_executor_terminate_or_kill_workers_invalid_operation(self):
302-
with futures.ProcessPoolExecutor(max_workers=1) as executor:
303-
self.assertRaises(ValueError, executor._terminate_or_kill_workers, operation='invalid operation'),
308+
getattr(executor, function_name)()
304309

310+
self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
305311

306312

307313
create_executor_tests(globals(), ProcessPoolExecutorTest,

0 commit comments

Comments
 (0)