12 changes: 12 additions & 0 deletions Doc/library/multiprocessing.rst
@@ -2108,6 +2108,12 @@ with the :class:`Pool` class.
.. versionadded:: 3.4
*context*

.. versionchanged:: 3.8
When one of the worker processes terminates abruptly (for example, if the
Linux out-of-memory killer kicked in), a :exc:`BrokenProcessPool`
error is now raised. Previously, the behavior was undefined and
the :class:`Pool` or its workers would often freeze or deadlock.

.. note::

Worker processes within a :class:`Pool` typically live for the complete
@@ -2225,6 +2231,12 @@ with the :class:`Pool` class.
:ref:`typecontextmanager`. :meth:`~contextmanager.__enter__` returns the
pool object, and :meth:`~contextmanager.__exit__` calls :meth:`terminate`.

.. exception:: BrokenProcessPool

Derived from :exc:`RuntimeError`, this exception class is raised when
one of the workers of a :class:`Pool` has terminated in a non-clean
fashion (for example, if it was killed from the outside).
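
For illustration only (not part of this patch): a minimal sketch of how user code might observe the new exception. It assumes a POSIX platform (``os.kill`` with ``SIGTERM``) and peeks at the private ``_pool`` attribute to pick a worker, as the tests added in this PR do::

    import os
    import signal
    import time
    import multiprocessing
    from multiprocessing.pool import BrokenProcessPool

    def slow_square(x):
        time.sleep(0.5)          # keep the workers busy so results are still pending
        return x * x

    if __name__ == '__main__':
        pool = multiprocessing.Pool(2)
        result = pool.map_async(slow_square, range(10))
        # Simulate an abrupt worker death while tasks are running.
        os.kill(pool._pool[0].pid, signal.SIGTERM)
        try:
            result.get()
        except BrokenProcessPool:
            print('a worker died unexpectedly; create a new Pool to continue')
        finally:
            pool.close()
            pool.join()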


.. class:: AsyncResult

84 changes: 68 additions & 16 deletions Lib/multiprocessing/pool.py
@@ -33,19 +33,29 @@
RUN = 0
CLOSE = 1
TERMINATE = 2
BROKEN = 3

#
# Miscellaneous
#

job_counter = itertools.count()


def mapstar(args):
return list(map(*args))


def starmapstar(args):
return list(itertools.starmap(args[0], args[1]))


class BrokenProcessPool(RuntimeError):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
Contributor: Maybe avoid using ProcessPoolExecutor and future terms, which are objects of the concurrent.futures package and not the multiprocessing package.

while a future was in the running state.
"""

#
# Hack to embed stringification of remote traceback in local traceback
#
@@ -104,6 +114,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
if initializer is not None:
initializer(*initargs)

util.debug('worker started')
completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
@@ -167,7 +178,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
if processes is None:
processes = os.cpu_count() or 1
if processes < 1:
raise ValueError("Number of processes must be at least 1")
raise ValueError("Number of processes must be 2 or more")

if initializer is not None and not callable(initializer):
raise TypeError('initializer must be a callable')
@@ -220,20 +231,34 @@ def __init__(self, processes=None, initializer=None, initargs=(),
exitpriority=15
)


@staticmethod
def _join_exited_workers(pool):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
Returns None if the process pool is broken.
"""
cleaned = False
for i in reversed(range(len(pool))):
worker = pool[i]
if worker.exitcode is not None:
broken = []
for i, p in reversed(list(enumerate(pool))):
broken.append(p.exitcode not in (None, 0))
if p.exitcode is not None:
# worker exited
util.debug('cleaning up worker %d' % i)
worker.join()
p.join()
cleaned = True
del pool[i]

if any(broken):
# Stop all workers
util.info('worker handler: process pool is broken, terminating workers...')
for p in pool:
if p.exitcode is None:
p.terminate()
for p in pool:
p.join()
del pool[:]
return None
return cleaned

def _repopulate_pool(self):
@@ -271,11 +296,14 @@ def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
wrap_exception):
"""Clean up any exited workers and start replacements for them.
"""
if Pool._join_exited_workers(pool):
thread = threading.current_thread()
need_repopulate = Pool._join_exited_workers(pool)
if need_repopulate:
Pool._repopulate_pool_static(ctx, Process, processes, pool,
inqueue, outqueue, initializer,
initargs, maxtasksperchild,
wrap_exception)
return need_repopulate

def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue()
@@ -437,13 +465,24 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
inqueue, outqueue, initializer, initargs,
maxtasksperchild, wrap_exception):
thread = threading.current_thread()
util.debug('worker handler entering')

# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
while thread._state == RUN or (cache and thread._state != TERMINATE):
Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception)
new_workers = Pool._maintain_pool(
ctx, Process, processes, pool, inqueue,
outqueue, initializer, initargs,
maxtasksperchild, wrap_exception)
if new_workers is None:
thread._state = BROKEN
for i, cache_ent in list(cache.items()):
err = BrokenProcessPool(
'A worker of the pool terminated abruptly '
'while the child process was still executing.')
# Exhaust MapResult with errors
while cache_ent._number_left > 0:
cache_ent._set(i, (False, err))
time.sleep(0.1)
# send sentinel to stop workers
taskqueue.put(None)
@@ -452,6 +491,7 @@ def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool, cache):
thread = threading.current_thread()
util.debug('task handler entering')

for taskseq, set_length in iter(taskqueue.get, None):
task = None
@@ -497,6 +537,7 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):

@staticmethod
def _handle_results(outqueue, get, cache):
util.debug('result handler entering')
thread = threading.current_thread()

while 1:
@@ -573,7 +614,9 @@ def close(self):
util.debug('closing pool')
if self._state == RUN:
self._state = CLOSE
self._worker_handler._state = CLOSE
# Avert race condition in broken pools
if self._worker_handler._state != BROKEN:
self._worker_handler._state = CLOSE

def terminate(self):
util.debug('terminating pool')
@@ -606,13 +649,21 @@ def _help_stuff_finish(inqueue, task_handler, size):
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
util.debug('finalizing pool')
util.debug('terminate pool entering')
is_broken = BROKEN in (task_handler._state,
worker_handler._state,
result_handler._state)

worker_handler._state = TERMINATE
Contributor: No need to use the _worker_state_lock here? And in other places where _worker_handler._state is manipulated?

task_handler._state = TERMINATE

util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
# Skip _help_finish_stuff if the pool is broken, because
# the broken process may have been holding the inqueue lock.
if not is_broken:
util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
else:
util.debug('finalizing BROKEN process pool')

if (not result_handler.is_alive()) and (len(cache) != 0):
raise AssertionError(
@@ -623,8 +674,8 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,

# We must wait for the worker handler to exit before terminating
# workers because we don't want workers to be restarted behind our back.
util.debug('joining worker handler')
if threading.current_thread() is not worker_handler:
util.debug('joining worker handler')
worker_handler.join()

# Terminate workers which haven't already finished.
@@ -634,12 +685,12 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
if p.exitcode is None:
p.terminate()

util.debug('joining task handler')
if threading.current_thread() is not task_handler:
util.debug('joining task handler')
task_handler.join()

util.debug('joining result handler')
if threading.current_thread() is not result_handler:
util.debug('joining result handler')
result_handler.join()

if pool and hasattr(pool[0], 'terminate'):
@@ -649,6 +700,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
# worker has not yet exited
util.debug('cleaning up worker %d' % p.pid)
p.join()
util.debug('terminate pool finalized')

def __enter__(self):
return self
44 changes: 44 additions & 0 deletions Lib/test/_test_multiprocessing.py
@@ -2571,6 +2571,15 @@ def raising():
def unpickleable_result():
return lambda: 42

def waiting(args):
time.sleep(7)
Member: I suppose it's not gonna wait that long when running the test? Just checking.

Author: Revised tests. I wanted to make sure we did not break the pool before any tasks were running. Now I've reduced that wait to 0.5 sec - https://github.com/python/cpython/pull/10441/files#diff-b046ab474480855fb4a01de88cfc82bbR2672


def bad_exit(value):
if value:
from sys import exit
exit(123)


class _TestPoolWorkerErrors(BaseTestCase):
ALLOWED_TYPES = ('processes', )

@@ -2611,6 +2620,41 @@ def errback(exc):
p.close()
p.join()

def test_broken_process_pool1(self):
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
res = p.map_async(waiting, range(10))
# Kill one of the pool workers.
waiting(None)
pid = p._pool[0].pid
os.kill(pid, signal.SIGTERM)
with self.assertRaises(BrokenProcessPool):
res.get()
p.close()
p.join()


def test_broken_process_pool2(self):
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
res = p.map_async(waiting, [1])
# Kill one of the pool workers.
pid = p._pool[0].pid
os.kill(pid, signal.SIGTERM)
Member: Same here.

with self.assertRaises(BrokenProcessPool):
res.get()
Member: You've only launched a single task, so what if it was scheduled on the other worker? I don't think this test is reliable.

Author (@oesteban, Dec 18, 2018): At this point (without the patch) you would inevitably get the same behavior: the pool hangs forever. With this patch, if you launch a single task on a broken pool (or the pool will be broken before the result is collected), you'll get the BrokenPoolError, regardless of the worker that was killed. We could keep track of sane results and try to rescue the most, but the original fix didn't look into that and it might be subject for a different PR. Similarly (matter of another PR), we could identify when the pool could be recovered (e.g., the worker died when the pool was idle waiting for tasks).

p.close()
p.join()

def test_broken_process_pool3(self):
from multiprocessing.pool import BrokenProcessPool
p = multiprocessing.Pool(2)
with self.assertRaises(BrokenProcessPool):
res = p.map(bad_exit, [0, 0, 1, 0])
p.close()
p.join()


class _TestPoolWorkerLifetime(BaseTestCase):
ALLOWED_TYPES = ('processes', )

@@ -0,0 +1,2 @@
Fix ``multiprocessing.Pool`` hanging indefinitely when a worker process dies
unexpectedly. Patch by Oscar Esteban, based on code from Dan O'Reilly.
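
For context only (not part of the changeset): a minimal sketch of the failure mode the entry describes, modelled on the ``bad_exit`` helper added to the tests. Before this change a script like the one below would hang forever in ``map``; with the patch it is expected to raise ``BrokenProcessPool``. The function name is illustrative::

    import multiprocessing
    from multiprocessing.pool import BrokenProcessPool

    def exits_abruptly(value):
        if value:
            raise SystemExit(123)    # the worker process dies in the middle of the job
        return value

    if __name__ == '__main__':
        pool = multiprocessing.Pool(2)
        try:
            pool.map(exits_abruptly, [0, 0, 1, 0])
        except BrokenProcessPool:
            print('a worker exited abruptly; the pool is no longer usable')
        finally:
            pool.close()
            pool.join()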