Bug report
Bug description:
The problem, simplified
When using a `Manager` with a custom `authkey`, a pickled proxy object cannot be unpickled:
```python
from multiprocessing.managers import BaseManager, BaseProxy


class MyManager(BaseManager):
    pass


class MyObject:
    def __init__(self):
        self.value = 42

    def get_value(self):
        return self.value

    def set_value(self, new_value):
        self.value = new_value


class MyProxy(BaseProxy):
    _exposed_ = ("get_value", "set_value")

    def get_value(self):
        return self._callmethod("get_value")

    def set_value(self, new_value):
        return self._callmethod("set_value", (new_value,))


_my_object = MyObject()


def _get_my_object():
    return _my_object


def test_proxy_unpickle():
    MyManager.register("get_my_object", callable=_get_my_object, proxytype=MyProxy)
    manager = MyManager(
        address=("", 0),
        authkey=b"customkey",  # <--- CUSTOM AUTHKEY
    )
    manager.start()
    proxy_object = manager.get_my_object()
    assert isinstance(proxy_object, MyProxy)
    assert proxy_object.get_value() == 42

    # We simulate pickling...
    rebuild_proxy, args = proxy_object.__reduce__()
    # ... and unpickling in a child process
    # Fails with: multiprocessing.context.AuthenticationError: digest sent was rejected
    rebuild_proxy(*args)
```

(It works when no custom authkey is passed to `MyManager`.)
This is because when a proxy is pickled, the authkey is deliberately dropped. Accordingly, `BaseProxy.__init__` falls back to `process.current_process().authkey`, which does not match the custom key, so the proxy fails to authenticate with the manager.
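For reference, the key selection in `BaseProxy.__init__` looks roughly like this (condensed from CPython's `Lib/multiprocessing/managers.py`; exact code may differ slightly between versions):

```python
# Condensed from BaseProxy.__init__ in Lib/multiprocessing/managers.py:
if authkey is not None:
    self._authkey = process.AuthenticationString(authkey)
elif self._manager is not None:
    self._authkey = self._manager._authkey
else:
    # An unpickled proxy reaches this branch: authkey is None and there
    # is no manager reference, so the process-wide default key is used.
    self._authkey = process.current_process().authkey
```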
Actually, there is one condition where an authkey is passed:
```python
class BaseProxy:
    def __reduce__(self):
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey
```

(So only while spawning a new process, but not at any later time.)
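To make the "only while spawning" condition concrete, here is a sketch (my own illustration; `child` and `run_demo` are placeholder names): under the spawn or forkserver start methods, passing the proxy as a `Process` argument works, because it is pickled while `get_spawning_popen()` is set, so `__reduce__` includes the authkey.

```python
import multiprocessing

def child(proxy):
    # Works: the proxy was rebuilt with the authkey that __reduce__
    # included while the child process was being spawned.
    assert proxy.get_value() == 42

def run_demo(manager):
    proxy = manager.get_my_object()
    p = multiprocessing.Process(target=child, args=(proxy,))
    p.start()  # proxy is pickled here, with get_spawning_popen() set
    p.join()
```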
This makes me wonder why the `authkey` parameter of `BaseManager` was introduced at all, if it does not work in all cases while the default `process.current_process().authkey` does...
The documentation says:

> An important feature of proxy objects is that they are picklable so they can be passed between processes.
As shown above, this is not unconditionally true. I have the feeling that the implementation cannot be changed, so I would suggest making the documentation more precise.
The context
I'm using `dask_jobqueue.SLURMCluster` to start worker processes on a distributed system. I want to use a multiprocessing `Manager` to enable communication between the main process and its workers for progress reporting. (I already have it running for "regular" multiprocessing and I'm hesitant to switch to another means of IPC just because of this problem...)
I see two ways I could make this work (both sketched below):

- Set `multiprocessing.process.current_process().authkey = b"customkey"` during the initialization of the Dask worker.
- Override `BaseProxy.__reduce__` to unconditionally set `kwds['authkey'] = bytes(self._authkey)`.

In both cases, I potentially expose the authkey to the outside world... Any advice?
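Concretely, the two options would look roughly like this (a sketch only; `init_worker` and `AuthkeyProxy` are placeholder names, and I have not wired this into Dask):

```python
import multiprocessing

# Option 1: align the worker's default key with the manager's key
# (run this in each Dask worker before any proxy is unpickled).
def init_worker():
    multiprocessing.current_process().authkey = b"customkey"

# Option 2: always ship the authkey with the pickled proxy,
# reusing MyProxy from the repro above.
class AuthkeyProxy(MyProxy):
    def __reduce__(self):
        func, args = super().__reduce__()
        # args is (proxy_type, token, serializer, kwds); bytes() avoids
        # AuthenticationString's refusal to be pickled outside spawning.
        args[3]['authkey'] = bytes(self._authkey)
        return func, args
```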
CPython versions tested on:
3.13
Operating systems tested on:
Linux