1+ import multiprocessing
12import os
3+ import queue
24import sys
35import threading
46import time
57import unittest
8+ import unittest .mock
69from concurrent import futures
710from concurrent .futures .process import BrokenProcessPool
811
@@ -22,6 +25,12 @@ def __init__(self, mgr):
2225 def __del__ (self ):
2326 self .event .set ()
2427
28+ def _put_sleep_put (queue ):
29+ """ Used as part of test_process_pool_executor_terminate_workers """
30+ queue .put ('started' )
31+ time .sleep (2 )
32+ queue .put ('finished' )
33+
2534
2635class ProcessPoolExecutorTest (ExecutorTest ):
2736
@@ -218,6 +227,60 @@ def mock_start_new_thread(func, *args, **kwargs):
218227 list (executor .map (mul , [(2 , 3 )] * 10 ))
219228 executor .shutdown ()
220229
230+ def test_process_pool_executor_terminate_workers (self ):
231+ manager = multiprocessing .Manager ()
232+ q = manager .Queue ()
233+
234+ with futures .ProcessPoolExecutor (max_workers = 1 ) as executor :
235+ executor .submit (_put_sleep_put , q )
236+
237+ # We should get started, but not finished since we'll terminate the workers just after
238+ self .assertEqual (q .get (timeout = 1 ), 'started' )
239+
240+ executor .terminate_workers ()
241+
242+ try :
243+ q .get (timeout = 1 )
244+ raise RuntimeError ("Queue should not have gotten a second value" )
245+ except queue .Empty :
246+ pass
247+
248+ def test_process_pool_executor_terminate_workers_dead_workers (self ):
249+ with futures .ProcessPoolExecutor (max_workers = 1 ) as executor :
250+ try :
251+ executor .submit (os ._exit , 1 ).result ()
252+ except BrokenProcessPool :
253+ # BrokenProcessPool will be raised by our call to .result() since the worker will die
254+ pass
255+
256+ # The worker has been killed already, terminate_workers should basically no-op
257+ executor .terminate_workers ()
258+
259+ def test_process_pool_executor_terminate_workers_not_started_yet (self ):
260+ with futures .ProcessPoolExecutor (max_workers = 1 ) as executor :
261+ # The worker has not been started yet, terminate_workers should basically no-op
262+ executor .terminate_workers ()
263+
264+ def test_process_pool_executor_terminate_workers_stops_pool (self ):
265+ with futures .ProcessPoolExecutor (max_workers = 1 ) as executor :
266+ executor .submit (time .sleep , 0 ).result ()
267+
268+ executor .terminate_workers ()
269+
270+ try :
271+ executor .submit (time .sleep , 0 ).result ()
272+ raise RuntimeError ("Should have raised BrokenProcessPool" )
273+ except BrokenProcessPool :
274+ pass
275+
276+ @unittest .mock .patch ('concurrent.futures.process.os.kill' )
277+ def test_process_pool_executor_terminate_workers_passes_signal (self , mock_kill ):
278+ with futures .ProcessPoolExecutor (max_workers = 1 ) as executor :
279+ executor .submit (time .sleep , 0 ).result ()
280+
281+ executor .terminate_workers (9 )
282+ mock_kill .assert_called_once_with (list (executor ._processes .values ())[0 ].pid , 9 )
283+
221284
222285create_executor_tests (globals (), ProcessPoolExecutorTest ,
223286 executor_mixins = (ProcessPoolForkMixin ,
0 commit comments