11import  os 
2+ import  queue 
3+ import  signal 
24import  sys 
35import  threading 
46import  time 
57import  unittest 
8+ import  unittest .mock 
69from  concurrent  import  futures 
710from  concurrent .futures .process  import  BrokenProcessPool 
811
912from  test  import  support 
1013from  test .support  import  hashlib_helper 
14+ from  test .test_importlib .metadata .fixtures  import  parameterize 
1115
1216from  .executor  import  ExecutorTest , mul 
1317from  .util  import  (
@@ -22,6 +26,21 @@ def __init__(self, mgr):
2226    def  __del__ (self ):
2327        self .event .set ()
2428
29+ TERMINATE_WORKERS  =  futures .ProcessPoolExecutor .terminate_workers .__name__ 
30+ KILL_WORKERS  =  futures .ProcessPoolExecutor .kill_workers .__name__ 
31+ FORCE_SHUTDOWN_PARAMS  =  [
32+     dict (function_name = TERMINATE_WORKERS ),
33+     dict (function_name = KILL_WORKERS ),
34+ ]
35+ 
36+ def  _put_wait_put (queue , event ):
37+     """ Used as part of test_terminate_workers """ 
38+     queue .put ('started' )
39+     event .wait ()
40+ 
41+     # We should never get here since the event will not get set 
42+     queue .put ('finished' )
43+ 
2544
2645class  ProcessPoolExecutorTest (ExecutorTest ):
2746
@@ -218,6 +237,107 @@ def mock_start_new_thread(func, *args, **kwargs):
218237                    list (executor .map (mul , [(2 , 3 )] *  10 ))
219238            executor .shutdown ()
220239
240+     def  test_terminate_workers (self ):
241+         mock_fn  =  unittest .mock .Mock ()
242+         with  self .executor_type (max_workers = 1 ) as  executor :
243+             executor ._force_shutdown  =  mock_fn 
244+             executor .terminate_workers ()
245+ 
246+         mock_fn .assert_called_once_with (operation = futures .process ._TERMINATE )
247+ 
248+     def  test_kill_workers (self ):
249+         mock_fn  =  unittest .mock .Mock ()
250+         with  self .executor_type (max_workers = 1 ) as  executor :
251+             executor ._force_shutdown  =  mock_fn 
252+             executor .kill_workers ()
253+ 
254+         mock_fn .assert_called_once_with (operation = futures .process ._KILL )
255+ 
256+     def  test_force_shutdown_workers_invalid_op (self ):
257+         with  self .executor_type (max_workers = 1 ) as  executor :
258+             self .assertRaises (ValueError ,
259+                               executor ._force_shutdown ,
260+                               operation = 'invalid operation' ),
261+ 
262+     @parameterize (* FORCE_SHUTDOWN_PARAMS ) 
263+     def  test_force_shutdown_workers (self , function_name ):
264+         manager  =  self .get_context ().Manager ()
265+         q  =  manager .Queue ()
266+         e  =  manager .Event ()
267+ 
268+         with  self .executor_type (max_workers = 1 ) as  executor :
269+             executor .submit (_put_wait_put , q , e )
270+ 
271+             # We should get started, but not finished since we'll terminate the 
272+             # workers just after and never set the event. 
273+             self .assertEqual (q .get (timeout = support .SHORT_TIMEOUT ), 'started' )
274+ 
275+             worker_process  =  list (executor ._processes .values ())[0 ]
276+ 
277+             Mock  =  unittest .mock .Mock 
278+             worker_process .terminate  =  Mock (wraps = worker_process .terminate )
279+             worker_process .kill  =  Mock (wraps = worker_process .kill )
280+ 
281+             getattr (executor , function_name )()
282+             worker_process .join ()
283+ 
284+             if  function_name  ==  TERMINATE_WORKERS :
285+                 worker_process .terminate .assert_called ()
286+             elif  function_name  ==  KILL_WORKERS :
287+                 worker_process .kill .assert_called ()
288+             else :
289+                 self .fail (f"Unknown operation: { function_name }  )
290+ 
291+             self .assertRaises (queue .Empty , q .get , timeout = 0.01 )
292+ 
293+     @parameterize (* FORCE_SHUTDOWN_PARAMS ) 
294+     def  test_force_shutdown_workers_dead_workers (self , function_name ):
295+         with  self .executor_type (max_workers = 1 ) as  executor :
296+             future  =  executor .submit (os ._exit , 1 )
297+             self .assertRaises (BrokenProcessPool , future .result )
298+ 
299+             # even though the pool is broken, this shouldn't raise 
300+             getattr (executor , function_name )()
301+ 
302+     @parameterize (* FORCE_SHUTDOWN_PARAMS ) 
303+     def  test_force_shutdown_workers_not_started_yet (self , function_name ):
304+         ctx  =  self .get_context ()
305+         with  unittest .mock .patch .object (ctx , 'Process' ) as  mock_process :
306+             with  self .executor_type (max_workers = 1 , mp_context = ctx ) as  executor :
307+                 # The worker has not been started yet, terminate/kill_workers 
308+                 # should basically no-op 
309+                 getattr (executor , function_name )()
310+ 
311+             mock_process .return_value .kill .assert_not_called ()
312+             mock_process .return_value .terminate .assert_not_called ()
313+ 
314+     @parameterize (* FORCE_SHUTDOWN_PARAMS ) 
315+     def  test_force_shutdown_workers_stops_pool (self , function_name ):
316+         with  self .executor_type (max_workers = 1 ) as  executor :
317+             task  =  executor .submit (time .sleep , 0 )
318+             self .assertIsNone (task .result ())
319+ 
320+             worker_process  =  list (executor ._processes .values ())[0 ]
321+             getattr (executor , function_name )()
322+ 
323+             self .assertRaises (RuntimeError , executor .submit , time .sleep , 0 )
324+ 
325+             # A signal sent, is not a signal reacted to. 
326+             # So wait a moment here for the process to die. 
327+             # If we don't, every once in a while we may get an ENV CHANGE 
328+             # error since the process would be alive immediately after the 
329+             # test run.. and die a moment later. 
330+             worker_process .join (support .SHORT_TIMEOUT )
331+ 
332+             # Oddly enough, even though join completes, sometimes it takes a 
333+             # moment for the process to actually be marked as dead. 
334+             # ...  that seems a bit buggy. 
335+             # We need it dead before ending the test to ensure it doesn't 
336+             # get marked as an ENV CHANGE due to living child process. 
337+             for  _  in  support .sleeping_retry (support .SHORT_TIMEOUT ):
338+                 if  not  worker_process .is_alive ():
339+                     break 
340+ 
221341
222342create_executor_tests (globals (), ProcessPoolExecutorTest ,
223343                      executor_mixins = (ProcessPoolForkMixin ,
0 commit comments