11import  os 
2- import  queue 
3- import  signal 
42import  sys 
53import  threading 
64import  time 
75import  unittest 
8- import  unittest .mock 
96from  concurrent  import  futures 
107from  concurrent .futures .process  import  BrokenProcessPool 
118
129from  test  import  support 
1310from  test .support  import  hashlib_helper 
14- from  test .test_importlib .metadata .fixtures  import  parameterize 
1511
1612from  .executor  import  ExecutorTest , mul 
1713from  .util  import  (
@@ -26,19 +22,6 @@ def __init__(self, mgr):
2622    def  __del__ (self ):
2723        self .event .set ()
2824
29- TERMINATE_WORKERS  =  futures .ProcessPoolExecutor .terminate_workers .__name__ 
30- KILL_WORKERS  =  futures .ProcessPoolExecutor .kill_workers .__name__ 
31- FORCE_SHUTDOWN_PARAMS  =  [
32-     dict (function_name = TERMINATE_WORKERS ),
33-     dict (function_name = KILL_WORKERS ),
34- ]
35- 
36- def  _put_sleep_put (queue ):
37-     """ Used as part of test_terminate_workers """ 
38-     queue .put ('started' )
39-     time .sleep (2 )
40-     queue .put ('finished' )
41- 
4225
4326class  ProcessPoolExecutorTest (ExecutorTest ):
4427
@@ -235,86 +218,6 @@ def mock_start_new_thread(func, *args, **kwargs):
235218                    list (executor .map (mul , [(2 , 3 )] *  10 ))
236219            executor .shutdown ()
237220
238-     def  test_terminate_workers (self ):
239-         mock_fn  =  unittest .mock .Mock ()
240-         with  self .executor_type (max_workers = 1 ) as  executor :
241-             executor ._force_shutdown  =  mock_fn 
242-             executor .terminate_workers ()
243- 
244-         mock_fn .assert_called_once_with (operation = futures .process ._TERMINATE )
245- 
246-     def  test_kill_workers (self ):
247-         mock_fn  =  unittest .mock .Mock ()
248-         with  self .executor_type (max_workers = 1 ) as  executor :
249-             executor ._force_shutdown  =  mock_fn 
250-             executor .kill_workers ()
251- 
252-         mock_fn .assert_called_once_with (operation = futures .process ._KILL )
253- 
254-     def  test_force_shutdown_workers_invalid_op (self ):
255-         with  self .executor_type (max_workers = 1 ) as  executor :
256-             self .assertRaises (ValueError ,
257-                               executor ._force_shutdown ,
258-                               operation = 'invalid operation' ),
259- 
260-     @parameterize (* FORCE_SHUTDOWN_PARAMS ) 
261-     def  test_force_shutdown_workers (self , function_name ):
262-         manager  =  self .get_context ().Manager ()
263-         q  =  manager .Queue ()
264- 
265-         with  self .executor_type (max_workers = 1 ) as  executor :
266-             executor .submit (_put_sleep_put , q )
267- 
268-             # We should get started, but not finished since we'll terminate the 
269-             # workers just after 
270-             self .assertEqual (q .get (timeout = 5 ), 'started' )
271- 
272-             worker_process  =  list (executor ._processes .values ())[0 ]
273-             getattr (executor , function_name )()
274-             worker_process .join ()
275- 
276-             if  function_name  ==  TERMINATE_WORKERS  or  \
277-                 sys .platform  ==  'win32' :
278-                 # On windows, kill and terminate both send SIGTERM 
279-                 self .assertEqual (worker_process .exitcode , - signal .SIGTERM )
280-             elif  function_name  ==  KILL_WORKERS :
281-                 self .assertEqual (worker_process .exitcode , - signal .SIGKILL )
282-             else :
283-                 self .fail (f"Unknown operation: { function_name }  " )
284- 
285-             self .assertRaises (queue .Empty , q .get , timeout = 1 )
286- 
287-     @parameterize (* FORCE_SHUTDOWN_PARAMS ) 
288-     def  test_force_shutdown_workers_dead_workers (self , function_name ):
289-         with  self .executor_type (max_workers = 1 ) as  executor :
290-             future  =  executor .submit (os ._exit , 1 )
291-             self .assertRaises (BrokenProcessPool , future .result )
292- 
293-             # even though the pool is broken, this shouldn't raise 
294-             getattr (executor , function_name )()
295- 
296-     @parameterize (* FORCE_SHUTDOWN_PARAMS ) 
297-     def  test_force_shutdown_workers_not_started_yet (self , function_name ):
298-         ctx  =  self .get_context ()
299-         with  unittest .mock .patch .object (ctx , 'Process' ) as  mock_process :
300-             with  self .executor_type (max_workers = 1 , mp_context = ctx ) as  executor :
301-                 # The worker has not been started yet, terminate/kill_workers 
302-                 # should basically no-op 
303-                 getattr (executor , function_name )()
304- 
305-             mock_process .return_value .kill .assert_not_called ()
306-             mock_process .return_value .terminate .assert_not_called ()
307- 
308-     @parameterize (* FORCE_SHUTDOWN_PARAMS ) 
309-     def  test_force_shutdown_workers_stops_pool (self , function_name ):
310-         with  self .executor_type (max_workers = 1 ) as  executor :
311-             task  =  executor .submit (time .sleep , 0 )
312-             self .assertIsNone (task .result ())
313- 
314-             getattr (executor , function_name )()
315- 
316-             self .assertRaises (RuntimeError , executor .submit , time .sleep , 0 )
317- 
318221
319222create_executor_tests (globals (), ProcessPoolExecutorTest ,
320223                      executor_mixins = (ProcessPoolForkMixin ,
0 commit comments