11import os
2+ import queue
3+ import signal
24import sys
35import threading
46import time
57import unittest
8+ import unittest .mock
69from concurrent import futures
710from concurrent .futures .process import BrokenProcessPool
811
912from test import support
1013from test .support import hashlib_helper
14+ from test .test_importlib .metadata .fixtures import parameterize
1115
1216from .executor import ExecutorTest , mul
1317from .util import (
@@ -22,6 +26,19 @@ def __init__(self, mgr):
2226 def __del__ (self ):
2327 self .event .set ()
2428
29+ TERMINATE_WORKERS = futures .ProcessPoolExecutor .terminate_workers .__name__
30+ KILL_WORKERS = futures .ProcessPoolExecutor .kill_workers .__name__
31+ FORCE_SHUTDOWN_PARAMS = [
32+ dict (function_name = TERMINATE_WORKERS ),
33+ dict (function_name = KILL_WORKERS ),
34+ ]
35+
36+ def _put_sleep_put (queue ):
37+ """ Used as part of test_terminate_workers """
38+ queue .put ('started' )
39+ time .sleep (2 )
40+ queue .put ('finished' )
41+
2542
2643class ProcessPoolExecutorTest (ExecutorTest ):
2744
@@ -218,6 +235,86 @@ def mock_start_new_thread(func, *args, **kwargs):
218235 list (executor .map (mul , [(2 , 3 )] * 10 ))
219236 executor .shutdown ()
220237
238+ def test_terminate_workers (self ):
239+ mock_fn = unittest .mock .Mock ()
240+ with self .executor_type (max_workers = 1 ) as executor :
241+ executor ._force_shutdown = mock_fn
242+ executor .terminate_workers ()
243+
244+ mock_fn .assert_called_once_with (operation = futures .process ._TERMINATE )
245+
246+ def test_kill_workers (self ):
247+ mock_fn = unittest .mock .Mock ()
248+ with self .executor_type (max_workers = 1 ) as executor :
249+ executor ._force_shutdown = mock_fn
250+ executor .kill_workers ()
251+
252+ mock_fn .assert_called_once_with (operation = futures .process ._KILL )
253+
254+ def test_force_shutdown_workers_invalid_op (self ):
255+ with self .executor_type (max_workers = 1 ) as executor :
256+ self .assertRaises (ValueError ,
257+ executor ._force_shutdown ,
258+ operation = 'invalid operation' ),
259+
260+ @parameterize (* FORCE_SHUTDOWN_PARAMS )
261+ def test_force_shutdown_workers (self , function_name ):
262+ manager = self .get_context ().Manager ()
263+ q = manager .Queue ()
264+
265+ with self .executor_type (max_workers = 1 ) as executor :
266+ executor .submit (_put_sleep_put , q )
267+
268+ # We should get started, but not finished since we'll terminate the
269+ # workers just after
270+ self .assertEqual (q .get (timeout = 5 ), 'started' )
271+
272+ worker_process = list (executor ._processes .values ())[0 ]
273+ getattr (executor , function_name )()
274+ worker_process .join ()
275+
276+ if function_name == TERMINATE_WORKERS or \
277+ sys .platform == 'win32' :
278+ # On windows, kill and terminate both send SIGTERM
279+ self .assertEqual (worker_process .exitcode , - signal .SIGTERM )
280+ elif function_name == KILL_WORKERS :
281+ self .assertEqual (worker_process .exitcode , - signal .SIGKILL )
282+ else :
283+ self .fail (f"Unknown operation: { function_name } " )
284+
285+ self .assertRaises (queue .Empty , q .get , timeout = 1 )
286+
287+ @parameterize (* FORCE_SHUTDOWN_PARAMS )
288+ def test_force_shutdown_workers_dead_workers (self , function_name ):
289+ with self .executor_type (max_workers = 1 ) as executor :
290+ future = executor .submit (os ._exit , 1 )
291+ self .assertRaises (BrokenProcessPool , future .result )
292+
293+ # even though the pool is broken, this shouldn't raise
294+ getattr (executor , function_name )()
295+
296+ @parameterize (* FORCE_SHUTDOWN_PARAMS )
297+ def test_force_shutdown_workers_not_started_yet (self , function_name ):
298+ ctx = self .get_context ()
299+ with unittest .mock .patch .object (ctx , 'Process' ) as mock_process :
300+ with self .executor_type (max_workers = 1 , mp_context = ctx ) as executor :
301+ # The worker has not been started yet, terminate/kill_workers
302+ # should basically no-op
303+ getattr (executor , function_name )()
304+
305+ mock_process .return_value .kill .assert_not_called ()
306+ mock_process .return_value .terminate .assert_not_called ()
307+
308+ @parameterize (* FORCE_SHUTDOWN_PARAMS )
309+ def test_force_shutdown_workers_stops_pool (self , function_name ):
310+ with self .executor_type (max_workers = 1 ) as executor :
311+ task = executor .submit (time .sleep , 0 )
312+ self .assertIsNone (task .result ())
313+
314+ getattr (executor , function_name )()
315+
316+ self .assertRaises (RuntimeError , executor .submit , time .sleep , 0 )
317+
221318
222319create_executor_tests (globals (), ProcessPoolExecutorTest ,
223320 executor_mixins = (ProcessPoolForkMixin ,
0 commit comments