From 5e8dca243b55993e984a5315afa8d646bb2e4826 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Fri, 4 Oct 2024 15:24:35 +0200 Subject: [PATCH 1/4] gh-71936: Fix race condition in multiprocessing.Pool Proxes of shared objects register a Finalizer in BaseProxy._incref(), and it will call BaseProxy._decref() when it is GCed. This may cause a race condition with Pool(maxtasksperchild=None) on Windows. A connection would be closed and raised TypeError when a GC occurs between _ConnectionBase._check_writable() and _ConnectionBase._send_bytes() in _ConnectionBase.send() in the second or later task, and a new object is allocated that shares the id() of a previously deleted one. Instead of using the id() of the token (or the proxy), use a unique, non-reusable number. Co-Authored-By: Akinori Hattori --- Lib/multiprocessing/managers.py | 15 +++++++++++---- Misc/ACKS | 1 + .../2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst | 1 + 3 files changed, 13 insertions(+), 4 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 0f5f9f64c2de9e..6431c17c85ef49 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -758,6 +758,10 @@ class BaseProxy(object): _address_to_local = {} _mutex = util.ForkAwareThreadLock() + # Each instance gets a `_serial` number. Unlike `id(...)`, this number + # is never reused. + _next_serial = 0 + def __init__(self, token, serializer, manager=None, authkey=None, exposed=None, incref=True, manager_owned=False): with BaseProxy._mutex: @@ -766,6 +770,9 @@ def __init__(self, token, serializer, manager=None, tls_idset = util.ForkAwareLocal(), ProcessLocalSet() BaseProxy._address_to_local[token.address] = tls_idset + self._serial = BaseProxy._next_serial + BaseProxy._next_serial += 1 + # self._tls is used to record the connection used by this # thread to communicate with the manager at token.address self._tls = tls_idset[0] @@ -856,20 +863,20 @@ def _incref(self): dispatch(conn, None, 'incref', (self._id,)) util.debug('INCREF %r', self._token.id) - self._idset.add(self._id) + self._idset.add(self._serial) state = self._manager and self._manager._state self._close = util.Finalize( self, BaseProxy._decref, - args=(self._token, self._authkey, state, + args=(self._token, self._serial, self._authkey, state, self._tls, self._idset, self._Client), exitpriority=10 ) @staticmethod - def _decref(token, authkey, state, tls, idset, _Client): - idset.discard(token.id) + def _decref(token, serial, authkey, state, tls, idset, _Client): + idset.discard(serial) # check whether manager is still alive if state is None or state.value == State.STARTED: diff --git a/Misc/ACKS b/Misc/ACKS index d94cbacf888468..24b42ff253e3b8 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -732,6 +732,7 @@ Larry Hastings Tim Hatch Zac Hatfield-Dodds Shane Hathaway +Akinori Hattori Michael Haubenwallner Janko Hauser Flavian Hautbois diff --git a/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst b/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst new file mode 100644 index 00000000000000..ac1081e060ba2d --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst @@ -0,0 +1 @@ +Fix a race condition in :class:`multiprocessing.Pool`. From 45d6502830e71b895c0ddfda60f23f474ac8bbd0 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Fri, 4 Oct 2024 15:58:33 +0200 Subject: [PATCH 2/4] NEWS: Fix docs reference --- .../next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst b/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst index ac1081e060ba2d..a0959cc086fa9e 100644 --- a/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst +++ b/Misc/NEWS.d/next/Library/2022-10-15-10-18-20.gh-issue-71936.MzJjc_.rst @@ -1 +1 @@ -Fix a race condition in :class:`multiprocessing.Pool`. +Fix a race condition in :class:`multiprocessing.pool.Pool`. From 4ff6a8fa68b2358f4ef340f8086a3bba2c757158 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 12 Nov 2024 13:24:36 +0100 Subject: [PATCH 3/4] Start at one --- Lib/multiprocessing/managers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 67fdfca212f79a..566333668e869d 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -761,7 +761,7 @@ class BaseProxy(object): # Each instance gets a `_serial` number. Unlike `id(...)`, this number # is never reused. - _next_serial = 0 + _next_serial = 1 def __init__(self, token, serializer, manager=None, authkey=None, exposed=None, incref=True, manager_owned=False): From 556d4fffea4e0d24c11c1e2eb870dd794350c7ab Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 12 Nov 2024 13:24:44 +0100 Subject: [PATCH 4/4] Rename "_idset" to "_all_serials" --- Lib/multiprocessing/managers.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 566333668e869d..040f4674d735c0 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -766,22 +766,22 @@ class BaseProxy(object): def __init__(self, token, serializer, manager=None, authkey=None, exposed=None, incref=True, manager_owned=False): with BaseProxy._mutex: - tls_idset = BaseProxy._address_to_local.get(token.address, None) - if tls_idset is None: - tls_idset = util.ForkAwareLocal(), ProcessLocalSet() - BaseProxy._address_to_local[token.address] = tls_idset + tls_serials = BaseProxy._address_to_local.get(token.address, None) + if tls_serials is None: + tls_serials = util.ForkAwareLocal(), ProcessLocalSet() + BaseProxy._address_to_local[token.address] = tls_serials self._serial = BaseProxy._next_serial BaseProxy._next_serial += 1 # self._tls is used to record the connection used by this # thread to communicate with the manager at token.address - self._tls = tls_idset[0] + self._tls = tls_serials[0] - # self._idset is used to record the identities of all shared - # objects for which the current process owns references and + # self._all_serials is a set used to record the identities of all + # shared objects for which the current process owns references and # which are in the manager at token.address - self._idset = tls_idset[1] + self._all_serials = tls_serials[1] self._token = token self._id = self._token.id @@ -864,14 +864,14 @@ def _incref(self): dispatch(conn, None, 'incref', (self._id,)) util.debug('INCREF %r', self._token.id) - self._idset.add(self._serial) + self._all_serials.add(self._serial) state = self._manager and self._manager._state self._close = util.Finalize( self, BaseProxy._decref, args=(self._token, self._serial, self._authkey, state, - self._tls, self._idset, self._Client), + self._tls, self._all_serials, self._Client), exitpriority=10 )