Skip to content

Commit a1408a4

Browse files
committed
gh-130895: fix multiprocessing.Process join/wait/poll races
This bug is caused by race conditions in the poll implementations (which are called by join/wait) where if multiple threads try to reap the dead process only one "wins" and gets the exit code, while the others get an error. In the forkserver implementation the losing thread(s) set the code to an error, possibly overwriting the correct code set by the winning thread. This is relatively easy to fix: we can just take a lock before waiting for the process, since at that point we know the call should not block. In the fork and spawn implementations the losers of the race return before the exit code is set, meaning the process may still report itself as alive after join returns. Fixing this is trickier as we have to support a mixture of blocking and non-blocking calls to poll, and we cannot have the latter waiting to take a lock held by the former. The approach taken is to split the blocking and non-blocking call variants. The non-blocking variant does its work with the lock held: since it won't block this should be safe. The blocking variant releases the lock before making the blocking operating system call. It then retakes the lock and either sets the code if it wins or waits for a potentially racing thread to do so otherwise. If a non-blocking call is racing with the unlocked part of a blocking call it may still "lose" the race, and return None instead of the exit code, even though the process is dead. However, as the process could be alive at the time the call is made but die immediately afterwards, this situation should already be handled by correctly written code. To verify the behaviour a test is added which reliably triggers failures for all three implementations. A work-around for this bug in a test added for gh-128041 is also reverted.
1 parent b754aee commit a1408a4

File tree

4 files changed

+107
-23
lines changed

4 files changed

+107
-23
lines changed

Lib/multiprocessing/popen_fork.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import atexit
22
import os
33
import signal
4+
import threading
45

56
from . import util
67

@@ -17,23 +18,71 @@ def __init__(self, process_obj):
1718
util._flush_std_streams()
1819
self.returncode = None
1920
self.finalizer = None
21+
self._exit_condition = threading.Condition()
22+
self._exit_blockers = 0
2023
self._launch(process_obj)
2124

2225
def duplicate_for_child(self, fd):
2326
return fd
2427

2528
def poll(self, flag=os.WNOHANG):
26-
if self.returncode is None:
27-
try:
28-
pid, sts = os.waitpid(self.pid, flag)
29-
except OSError:
30-
# Child process not yet created. See #1731717
31-
# e.errno == errno.ECHILD == 10
32-
return None
29+
with self._exit_condition:
30+
if self.returncode is not None:
31+
return self.returncode
32+
elif flag & os.WNOHANG == os.WNOHANG:
33+
return self._nonblocking_poll(flag)
34+
else:
35+
self._exit_blockers += 1
36+
37+
# We have released the lock, so may be racing with blocking &
38+
# non-blocking calls at this point...
39+
pid = None
40+
try:
41+
pid, sts = os.waitpid(self.pid, flag)
42+
except OSError:
43+
# Child process doesn't exist because it hasn't started yet (see
44+
# bpo-1731717) or has already been awaited on a racing thread (see
45+
# gh-130895)
46+
pass
47+
48+
with self._exit_condition:
49+
self._exit_blockers -= 1
3350
if pid == self.pid:
34-
self.returncode = os.waitstatus_to_exitcode(sts)
51+
self._set_returncode(sts)
52+
elif self._exit_blockers == 0:
53+
self._exit_condition.notify_all()
54+
55+
# Wait until we get a definitive result, or we know there are no
56+
# racing calls that might be about to set it
57+
while self.returncode is None and self._exit_blockers > 0:
58+
self._exit_condition.wait()
59+
60+
return self.returncode
61+
62+
def _nonblocking_poll(self, flag):
63+
assert self._exit_condition._is_owned()
64+
assert self.returncode is None
65+
assert flag & os.WNOHANG == os.WNOHANG
66+
try:
67+
pid, sts = os.waitpid(self.pid, flag)
68+
if pid == self.pid:
69+
self._set_returncode(sts)
70+
except OSError:
71+
# See comments in the poll(...) except clause above
72+
pass
73+
74+
# We may be racing with a blocking wait call, in which case (if we lose
75+
# the race) it is arbitrary whether this returns None or the exit code
76+
# (if there is one): calling code must always be prepared to handle a
77+
# situation where this method returns None but the process has ended.
3578
return self.returncode
3679

80+
def _set_returncode(self, sts):
81+
assert self._exit_condition._is_owned()
82+
assert self.returncode is None
83+
self.returncode = os.waitstatus_to_exitcode(sts)
84+
self._exit_condition.notify_all()
85+
3786
def wait(self, timeout=None):
3887
if self.returncode is None:
3988
if timeout is not None:

Lib/multiprocessing/popen_forkserver.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import io
22
import os
3+
import threading
34

45
from .context import reduction, set_spawning_popen
56
if not reduction.HAVE_SEND_HANDLE:
@@ -32,6 +33,7 @@ class Popen(popen_fork.Popen):
3233

3334
def __init__(self, process_obj):
3435
self._fds = []
36+
self._lock = threading.Lock()
3537
super().__init__(process_obj)
3638

3739
def duplicate_for_child(self, fd):
@@ -64,11 +66,14 @@ def poll(self, flag=os.WNOHANG):
6466
timeout = 0 if flag == os.WNOHANG else None
6567
if not wait([self.sentinel], timeout):
6668
return None
67-
try:
68-
self.returncode = forkserver.read_signed(self.sentinel)
69-
except (OSError, EOFError):
70-
# This should not happen usually, but perhaps the forkserver
71-
# process itself got killed
72-
self.returncode = 255
69+
70+
with self._lock:
71+
if self.returncode is None:
72+
try:
73+
self.returncode = forkserver.read_signed(self.sentinel)
74+
except (OSError, EOFError):
75+
# This should not happen usually, but perhaps the
76+
# forkserver process itself got killed
77+
self.returncode = 255
7378

7479
return self.returncode

Lib/test/_test_multiprocessing.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -953,6 +953,44 @@ def test_forkserver_without_auth_fails(self):
953953
proc.start()
954954
proc.join()
955955

956+
@staticmethod
957+
def _wait_for_barrier(barrier):
958+
barrier.wait()
959+
960+
def _wait_on_proc(self, barrier, proc, errs):
961+
barrier.wait()
962+
proc.join()
963+
if proc.is_alive():
964+
errs.append("process alive after join")
965+
if proc.exitcode != 0:
966+
errs.append("process reported non-zero exit code")
967+
968+
def test_racing_joins(self):
969+
if self.TYPE == "threads":
970+
self.skipTest(f"test not appropriate for {self.TYPE}")
971+
972+
N = 5
973+
ITERATIONS = 10
974+
for _ in range(ITERATIONS):
975+
barrier = self.Barrier(N+1)
976+
proc = self.Process(target=self._wait_for_barrier, args=(barrier,))
977+
978+
errs = []
979+
threads = [threading.Thread(target=self._wait_on_proc,
980+
args=(barrier, proc, errs))
981+
for _ in range(N)]
982+
for t in threads:
983+
t.start()
984+
985+
proc.start()
986+
for t in threads:
987+
t.join()
988+
989+
# On failure(s), report the first since they are likely the same
990+
# error reported from multiple threads
991+
if errs:
992+
raise AssertionError(errs[0])
993+
956994
#
957995
#
958996
#

Lib/test/test_concurrent_futures/test_process_pool.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -327,15 +327,7 @@ def test_force_shutdown_workers_stops_pool(self, function_name):
327327
# error since the process would be alive immediately after the
328328
# test run.. and die a moment later.
329329
worker_process.join(support.SHORT_TIMEOUT)
330-
331-
# Oddly enough, even though join completes, sometimes it takes a
332-
# moment for the process to actually be marked as dead.
333-
# ... that seems a bit buggy.
334-
# We need it dead before ending the test to ensure it doesn't
335-
# get marked as an ENV CHANGE due to living child process.
336-
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
337-
if not worker_process.is_alive():
338-
break
330+
self.assertFalse(worker_process.is_alive())
339331

340332

341333
create_executor_tests(globals(), ProcessPoolExecutorTest,

0 commit comments

Comments
 (0)