-
-
Notifications
You must be signed in to change notification settings - Fork 33.2k
Description
Bug report
Bug description:
ProcessPoolExecutor workers do not handle KeyboardInterrupt properly, which leads to messy cleanup that either takes FOREVER or requires you to interrupt/break the join procedure.
Consistently, I need to interrupt at least 2 or 3 times in order to get the script to actually exit.
Description
This happens because workers ignore KeyboardInterrupts and assume they come from the call item function.
Because of this, they will continue processing the rest of the items in the queue.
(futures look like: [done, done, Interrupt, Interrupt, done, done, done, done])
The closest way to get the pool to shutdown is to use executor.shutdown(cancel_futures=True)
, however there is a race condition where the child receives the interrupt before the main process and starts the next item before the executor has the time to cancel the futures (it happens nearly every time).
(futures look like: [done, done, Interrupt, Interrupt, done, done, cancel, cancel])
This means that you need to either wait for the extra work item to complete (could be hours!), or interrupt the join procedure.
This seems like undesirable behavior to me, especially with longer running tasks.
When I keyboard interrupt, I want all tasks to stop, not just the current work item.
Proposal
Workers should exit when receiving KeyboardInterrupt.
with ProcessPoolExecutor() as executor: # fixed functionality
...
with ProcessPoolExecutor(interruptable_worker=False) as executor: # legacy functionality
...
I'm not sure under what scenarios the current functionality is desirable, but in the off-chance there is one, it could be configurable via a flag. Alternatively, you could let the worker exit and respawn the workers if necessary.
Fix
This alternative worker function lets ProcessPoolExecutor exit immediately:
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Main loop run inside each ProcessPoolExecutor worker process.

    Repeatedly pulls call items off *call_queue*, executes them, and sends
    the outcome back on *result_queue*.  This is a copy of the stdlib worker
    with ONE behavioral change: a KeyboardInterrupt raised while running a
    task makes the worker exit (reporting its pid via ``exit_pid``) instead
    of silently moving on to the next queued item.

    Args:
        call_queue: multiprocessing queue of _CallItems to execute; a None
            sentinel tells the worker to shut down.
        result_queue: multiprocessing queue used to send back _ResultItems
            (or a bare pid on clean shutdown).
        initializer: optional callable run once before the loop starts.
        initargs: positional arguments for *initializer*.
        max_tasks: if not None, the worker exits after executing this many
            tasks (max_tasks_per_child semantics).
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent notices the dead process and marks the pool broken.
            return
    num_tasks = 0
    # When set to this worker's pid, the loop ends after reporting the
    # current result; the executor uses it to know which process left.
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # None is the shutdown sentinel.
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                # Task budget exhausted: finish this item, then exit below.
                exit_pid = os.getpid()
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except KeyboardInterrupt as e:
            # THE FIX: treat Ctrl-C as "stop this worker" rather than a
            # per-task failure, so the pool can shut down promptly.
            exit_pid = os.getpid()  # +++ Added this so KeyboardInterrupts stop the worker
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                            exit_pid=exit_pid)
        except BaseException as e:
            # Any other task failure is reported back; the worker keeps going.
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                            exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                            exit_pid=exit_pid)
            # Drop the result reference promptly to free memory.
            del r
        # Release the call item (and anything it pins) as soon as possible.
        del call_item
        if exit_pid is not None:
            return
Rest of the implementation:
import os
from concurrent.futures.process import _base, _ResultItem, _ExceptionWithTraceback, ProcessPoolExecutor
# _ResultItem, _sendback_result, _max_tasks_per_child needed for 3.10 compatibility
class _ResultItem(_ResultItem):
    """Result record extended with an ``exit_pid`` field (3.10 backport).

    Newer CPython versions already carry exit_pid on _ResultItem; this
    subclass adds it for interpreters where the stdlib class lacks it.
    """

    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        # Plain attribute assignment, mirroring the stdlib record shape.
        self.work_id, self.exception = work_id, exception
        self.result, self.exit_pid = result, exit_pid
def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Put a _ResultItem for *work_id* on *result_queue*.

    If putting the payload fails (e.g. the result is unpicklable), report
    that failure back as the item's exception instead, so the parent is
    never left waiting for a result that will not arrive.
    """
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as put_error:
        wrapped = _ExceptionWithTraceback(put_error, put_error.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=wrapped,
                                     exit_pid=exit_pid))
class ProcessPoolExecutor(ProcessPoolExecutor):
    """Drop-in ProcessPoolExecutor whose workers run the patched
    _process_worker (interrupt-aware, with exit_pid reporting)."""

    # 3.10's executor has no such attribute; define it so _spawn_process
    # can always pass it through as max_tasks.
    _max_tasks_per_child = None

    def _spawn_process(self):
        """Start one worker process and register it with the pool."""
        # Identical to the stdlib implementation except for the target.
        worker_args = (
            self._call_queue,
            self._result_queue,
            self._initializer,
            self._initargs,
            self._max_tasks_per_child,
        )
        proc = self._mp_context.Process(target=_process_worker,
                                        args=worker_args)
        proc.start()
        self._processes[proc.pid] = proc
import time
def work(x):
    """Demo task *x*: ~100 seconds of sleeping, printing progress every 10 s."""
    print("Starting", x)
    for step in range(1000):
        time.sleep(0.1)
        if step % 100 == 0:
            print(x, step)
    print("Done", x)
if __name__ == '__main__':
    # Demo: with the patched executor, a single Ctrl-C stops everything.
    with ProcessPoolExecutor(max_workers=3) as executor:
        try:
            futures = [executor.submit(work, task_id) for task_id in range(30)]
            for fut in futures:
                fut.result()
        except KeyboardInterrupt:
            # Workers exit on interrupt, so nothing extra to do here.
            pass
    print("All Done")
Outputs:
Starting 0
Starting 1
Starting 2
0 0
1 0
2 0
^CAll Done
I can't tell you how satisfying this is after 10 years of python with broken process interrupts.
Reproduce original bug
import time
from concurrent.futures import ProcessPoolExecutor
def work(x):
    """Long-running demo task: sleep ~100 seconds, reporting every 10 s."""
    print("Starting", x)
    step = 0
    while step < 1000:
        time.sleep(0.1)
        if step % 100 == 0:
            print(x, step)
        step += 1
    print("Done", x)
if __name__ == '__main__':
    # Reproducer for the original bug with the stock executor.
    with ProcessPoolExecutor(max_workers=3) as executor:
        try:
            pending = []
            for task_id in range(30):
                pending.append(executor.submit(work, task_id))
            for pending_future in pending:
                pending_future.result()
        except KeyboardInterrupt:
            # Cancel anything not yet started; running items still finish.
            executor.shutdown(cancel_futures=True)
    print("All Done")
With executor.shutdown(cancel_futures=True): new tasks start after the interrupt is issued, and multiple interrupts are needed to exit.
Starting 0
Starting 1
Starting 2
0 0
1 0
2 0
0 100
1 100
2 100
^CStarting 3
3 0
3 100
3 200
^CProcess SpawnProcess-2:
Process SpawnProcess-3:
Traceback (most recent call last):
File "/Users/bea/Desktop/process_interrupt.py", line 19, in <module>
future.result()
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/_base.py", line 453, in result
Exception in thread Thread-1:
Traceback (most recent call last):
File "/opt/miniconda/envs/flood/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
Traceback (most recent call last):
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
call_item = call_queue.get(block=True)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/queues.py", line 103, in get
res = self._recv_bytes()
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
self._condition.wait(timeout)
File "/opt/miniconda/envs/flood/lib/python3.10/threading.py", line 320, in wait
Traceback (most recent call last):
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
call_item = call_queue.get(block=True)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/queues.py", line 102, in get
with self._rlock:
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
self.run()
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/process.py", line 320, in run
waiter.acquire()
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/bea/Desktop/process_interrupt.py", line 21, in <module>
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/process.py", line 385, in wait_result_broken_or_wakeup
executor.shutdown(cancel_futures=True)
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/process.py", line 780, in shutdown
self._executor_manager_thread.join()
File "/opt/miniconda/envs/flood/lib/python3.10/threading.py", line 1096, in join
ready = mp.connection.wait(readers + worker_sentinels)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 925, in wait
self._wait_for_tstate_lock()
File "/opt/miniconda/envs/flood/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
selector.register(obj, selectors.EVENT_READ)
File "/opt/miniconda/envs/flood/lib/python3.10/selectors.py", line 353, in register
if lock.acquire(block, timeout):
KeyboardInterrupt
key = super().register(fileobj, events, data)
File "/opt/miniconda/envs/flood/lib/python3.10/selectors.py", line 239, in register
key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
File "/opt/miniconda/envs/flood/lib/python3.10/selectors.py", line 226, in _fileobj_lookup
return _fileobj_to_fd(fileobj)
File "/opt/miniconda/envs/flood/lib/python3.10/selectors.py", line 37, in _fileobj_to_fd
fd = int(fileobj.fileno())
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 170, in fileno
self._check_closed()
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 136, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
^CException ignored in atexit callback: <function _exit_function at 0x10172f130>
Traceback (most recent call last):
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/util.py", line 357, in _exit_function
Process SpawnProcess-1:
p.join()
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/popen_fork.py", line 43, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/popen_fork.py", line 27, in poll
pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt:
Traceback (most recent call last):
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
call_item = call_queue.get(block=True)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/queues.py", line 103, in get
res = self._recv_bytes()
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
Without executor.shutdown(cancel_futures=True): all items in the task queue continue running, and multiple interrupts are needed to even think about exiting.
Workers may end up orphaned once main process exits.
Starting 0
Starting 1
Starting 2
2 0
0 0
1 0
^CAll Done
β― python ~/Desktop/process_interrupt.py
Starting 0
Starting 1
Starting 2
1 0
0 0
2 0
^CStarting 3
Starting 4
Starting 5
5 0
3 0
4 0
^CTraceback (most recent call last):
File "/Users/bea/Desktop/process_interrupt.py", line 99, in <module>
Starting 6
Starting 7
with ProcessPoolExecutor(max_workers=3) as executor:
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/_base.py", line 649, in __exit__
Starting 8
self.shutdown(wait=True)
File "/opt/miniconda/envs/flood/lib/python3.10/concurrent/futures/process.py", line 780, in shutdown
self._executor_manager_thread.join()
File "/opt/miniconda/envs/flood/lib/python3.10/threading.py", line 1096, in join
self._wait_for_tstate_lock()
File "/opt/miniconda/envs/flood/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
if lock.acquire(block, timeout):
KeyboardInterrupt
8 0
6 0
7 0
^CException ignored in atexit callback: <function _exit_function at 0x10202f0a0>
Traceback (most recent call last):
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/util.py", line 357, in _exit_function
Starting 9
Starting 10
Starting 11
p.join()
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/popen_fork.py", line 43, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/opt/miniconda/envs/flood/lib/python3.10/multiprocessing/popen_fork.py", line 27, in poll
pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt:
3.10.11 (s) β― 9 0
11 100
10 100
9 100
10 200
11 200
9 200
9 300
11 300
10 300
9 400
3.10.11 (s) β―
CPython versions tested on:
3.10
Operating systems tested on:
macOS