Skip to content

Commit c8f4896

Browse files
committed
Adds tracking of which process in the pool takes which job from the queue; adds test for issue22393/issue38084.
1 parent 6a517c6 commit c8f4896

File tree

2 files changed

+63
-20
lines changed

2 files changed

+63
-20
lines changed

Lib/multiprocessing/pool.py

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
121121
break
122122

123123
job, i, func, args, kwds = task
124+
put((job, i, (None, os.getpid()))) # Provide info on who took job
124125
try:
125126
result = (True, func(*args, **kwds))
126127
except Exception as e:
@@ -220,12 +221,14 @@ def __init__(self, processes=None, initializer=None, initargs=(),
220221

221222
sentinels = self._get_sentinels()
222223

224+
self._job_assignments = {}
223225
self._worker_handler = threading.Thread(
224226
target=Pool._handle_workers,
225227
args=(self._cache, self._taskqueue, self._ctx, self.Process,
226228
self._processes, self._pool, self._inqueue, self._outqueue,
227229
self._initializer, self._initargs, self._maxtasksperchild,
228-
self._wrap_exception, sentinels, self._change_notifier)
230+
self._wrap_exception, sentinels, self._change_notifier,
231+
self._job_assignments)
229232
)
230233
self._worker_handler.daemon = True
231234
self._worker_handler._state = RUN
@@ -243,7 +246,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
243246

244247
self._result_handler = threading.Thread(
245248
target=Pool._handle_results,
246-
args=(self._outqueue, self._quick_get, self._cache)
249+
args=(self._outqueue, self._quick_get, self._cache,
250+
self._job_assignments)
247251
)
248252
self._result_handler.daemon = True
249253
self._result_handler._state = RUN
@@ -284,7 +288,7 @@ def _get_worker_sentinels(workers):
284288
workers if hasattr(worker, "sentinel")]
285289

286290
@staticmethod
287-
def _join_exited_workers(pool):
291+
def _join_exited_workers(pool, outqueue, job_assignments):
288292
"""Cleanup after any worker processes which have exited due to reaching
289293
their specified lifetime. Returns True if any workers were cleaned up.
290294
"""
@@ -294,8 +298,15 @@ def _join_exited_workers(pool):
294298
if worker.exitcode is not None:
295299
# worker exited
296300
util.debug('cleaning up worker %d' % i)
301+
pid = worker.ident
297302
worker.join()
298303
cleaned = True
304+
if pid in job_assignments:
305+
# If the worker process died without communicating back
306+
# while running a job, add a default result for it.
307+
job = job_assignments[pid]
308+
outqueue.put((job, i, (False, RuntimeError("Worker died"))))
309+
del job_assignments[pid]
299310
del pool[i]
300311
return cleaned
301312

@@ -330,10 +341,10 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
330341
@staticmethod
331342
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
332343
initializer, initargs, maxtasksperchild,
333-
wrap_exception):
344+
wrap_exception, job_assignments):
334345
"""Clean up any exited workers and start replacements for them.
335346
"""
336-
if Pool._join_exited_workers(pool):
347+
if Pool._join_exited_workers(pool, outqueue, job_assignments):
337348
Pool._repopulate_pool_static(ctx, Process, processes, pool,
338349
inqueue, outqueue, initializer,
339350
initargs, maxtasksperchild,
@@ -504,15 +515,16 @@ def _wait_for_updates(sentinels, change_notifier, timeout=None):
504515
def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
505516
pool, inqueue, outqueue, initializer, initargs,
506517
maxtasksperchild, wrap_exception, sentinels,
507-
change_notifier):
518+
change_notifier, job_assignments):
508519
thread = threading.current_thread()
509520

510521
# Keep maintaining workers until the cache gets drained, unless the pool
511522
# is terminated.
512523
while thread._state == RUN or (cache and thread._state != TERMINATE):
513524
cls._maintain_pool(ctx, Process, processes, pool, inqueue,
514525
outqueue, initializer, initargs,
515-
maxtasksperchild, wrap_exception)
526+
maxtasksperchild, wrap_exception,
527+
job_assignments)
516528

517529
current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
518530

@@ -568,7 +580,7 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
568580
util.debug('task handler exiting')
569581

570582
@staticmethod
571-
def _handle_results(outqueue, get, cache):
583+
def _handle_results(outqueue, get, cache, job_assignments):
572584
thread = threading.current_thread()
573585

574586
while 1:
@@ -587,12 +599,18 @@ def _handle_results(outqueue, get, cache):
587599
util.debug('result handler got sentinel')
588600
break
589601

590-
job, i, obj = task
591-
try:
592-
cache[job]._set(i, obj)
593-
except KeyError:
594-
pass
595-
task = job = obj = None
602+
job, i, (task_info, value) = task
603+
if task_info is None:
604+
# task_info is True or False when a task has completed but
605+
# None indicates information about which process has
606+
# accepted a job from the queue.
607+
job_assignments[value] = job
608+
else:
609+
try:
610+
cache[job]._set(i, (task_info, value))
611+
except KeyError:
612+
pass
613+
task = job = task_info = value = None
596614

597615
while cache and thread._state != TERMINATE:
598616
try:
@@ -604,12 +622,16 @@ def _handle_results(outqueue, get, cache):
604622
if task is None:
605623
util.debug('result handler ignoring extra sentinel')
606624
continue
607-
job, i, obj = task
608-
try:
609-
cache[job]._set(i, obj)
610-
except KeyError:
611-
pass
612-
task = job = obj = None
625+
626+
job, i, (task_info, value) = task
627+
if task_info is None:
628+
job_assignments[value] = job
629+
else:
630+
try:
631+
cache[job]._set(i, (task_info, value))
632+
except KeyError:
633+
pass
634+
task = job = task_info = value = None
613635

614636
if hasattr(outqueue, '_reader'):
615637
util.debug('ensuring that outqueue is not full')

Lib/test/_test_multiprocessing.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2712,6 +2712,27 @@ def errback(exc):
27122712
p.close()
27132713
p.join()
27142714

2715+
def test_pool_worker_died_without_communicating(self):
2716+
# Issue22393: test fix of indefinite hang caused by worker processes
2717+
# exiting abruptly (such as via os._exit()) without communicating
2718+
# back to the pool at all.
2719+
prog = (
2720+
"import os, multiprocessing as mp; "
2721+
"is_main = (__name__ == '__main__'); "
2722+
"p = mp.Pool(1) if is_main else print('worker'); "
2723+
"p.map(os._exit, [1]) if is_main else None; "
2724+
"(p.close() or p.join()) if is_main else None"
2725+
)
2726+
# Only if there is a regression will this ever trigger a
2727+
# subprocess.TimeoutExpired.
2728+
completed_process = subprocess.run(
2729+
[sys.executable, '-E', '-S', '-O', '-c', prog],
2730+
check=False,
2731+
timeout=100,
2732+
capture_output=True
2733+
)
2734+
self.assertNotEqual(0, completed_process.returncode)
2735+
27152736
class _TestPoolWorkerLifetime(BaseTestCase):
27162737
ALLOWED_TYPES = ('processes', )
27172738

0 commit comments

Comments
 (0)