Commit 769a8b2

issue #547: core/service: race/deadlock-free service pool init
The previous method of spinning up a transient thread to import the service pool in a child context could deadlock against uses of the importer on the main thread. Therefore wake the main thread to handle the import for us, and use a regular Receiver to buffer messages for the stub; that Receiver is inherited, rather than replaced, by the real service pool.
1 parent 50b2d59 commit 769a8b2
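
In outline, the new flow is: service messages land in an ordinary Receiver whose notify callback runs on the broker thread and does nothing but enqueue a sentinel on the CALL_FUNCTION receiver; the main thread's dispatch loop sees the sentinel, performs the import itself, and the real pool later inherits the buffered receiver. Below is a minimal, self-contained model of that control flow -- toy classes and queues, not mitogen's API; `STUB_CALL_SERVICE` here is just a marker object.

import queue
import threading

STUB_CALL_SERVICE = object()   # toy stand-in for the new STUB_CALL_SERVICE handle


class ToyDispatcher:
    """Models the main-thread loop of mitogen.core.Dispatcher; not real mitogen."""

    def __init__(self):
        self.calls = queue.Queue()          # models the CALL_FUNCTION receiver
        self.service_msgs = queue.Queue()   # models the stub CALL_SERVICE receiver
        self.pool = None

    def on_call_service(self, msg):
        # Broker thread: never import here, just buffer and wake the main thread.
        self.service_msgs.put(msg)
        self.calls.put(STUB_CALL_SERVICE)

    def init_service_pool(self):
        # Main thread: the real code runs `import mitogen.service` and
        # mitogen.service.get_or_create_pool() here, where the importer is safe.
        self.pool = object()

    def dispatch_one(self):
        msg = self.calls.get()
        if msg is STUB_CALL_SERVICE:
            if self.pool is None:
                self.init_service_pool()
            return 'service pool initialized'
        return 'dispatched %r' % (msg,)


d = ToyDispatcher()
threading.Thread(target=d.on_call_service, args=('ping a service',)).start()
print(d.dispatch_one())   # -> 'service pool initialized'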

2 files changed: +72 -41 lines
mitogen/core.py

Lines changed: 53 additions & 36 deletions
@@ -122,6 +122,7 @@
 FORWARD_MODULE = 108
 DETACHING = 109
 CALL_SERVICE = 110
+STUB_CALL_SERVICE = 111

 #: Special value used to signal disconnection or the inability to route a
 #: message, when it appears in the `reply_to` field. Usually causes

@@ -3432,9 +3433,22 @@ def __init__(self, econtext):
         self.econtext = econtext
         #: Chain ID -> CallError if prior call failed.
         self._error_by_chain_id = {}
-        self.recv = Receiver(router=econtext.router,
-                             handle=CALL_FUNCTION,
-                             policy=has_parent_authority)
+        self.recv = Receiver(
+            router=econtext.router,
+            handle=CALL_FUNCTION,
+            policy=has_parent_authority,
+        )
+        #: The :data:`CALL_SERVICE` :class:`Receiver` that will eventually be
+        #: reused by :class:`mitogen.service.Pool`, should it ever be loaded.
+        #: This is necessary for race-free reception of all service requests
+        #: delivered regardless of whether the stub or real service pool are
+        #: loaded. See #547 for related sorrows.
+        Dispatcher._service_recv = Receiver(
+            router=econtext.router,
+            handle=CALL_SERVICE,
+            policy=has_parent_authority,
+        )
+        self._service_recv.notify = self._on_call_service
         listen(econtext.broker, 'shutdown', self.recv.close)

     @classmethod

@@ -3475,8 +3489,44 @@ def _dispatch_one(self, msg):
                 self._error_by_chain_id[chain_id] = e
             return chain_id, e

+    def _on_call_service(self, recv):
+        """
+        Notifier for the :data:`CALL_SERVICE` receiver. This is called on the
+        :class:`Broker` thread for any service messages arriving at this
+        context, for as long as no real service pool implementation is loaded.
+
+        In order to safely bootstrap the service pool implementation a sentinel
+        message is enqueued on the :data:`CALL_FUNCTION` receiver in order to
+        wake the main thread, where the importer can run without any
+        possibility of suffering deadlock due to concurrent uses of the
+        importer.
+
+        Should the main thread be blocked indefinitely, preventing the import
+        from ever running, if it is blocked waiting on a service call, then it
+        means :mod:`mitogen.service` has already been imported and
+        :func:`mitogen.service.get_or_create_pool` has already run, meaning the
+        service pool is already active and the duplicate initialization was not
+        needed anyway.
+
+        #547: This trickery is needed to avoid the alternate option of spinning
+        a temporary thread to import the service pool, which could deadlock if
+        a custom import hook executing on the main thread (under the importer
+        lock) would block waiting for some data that was in turn received by a
+        service. Main thread import lock can't be released until service is
+        running, service cannot satisfy request until import lock is released.
+        """
+        self.recv._on_receive(Message(handle=STUB_CALL_SERVICE))
+
+    def _init_service_pool(self):
+        import mitogen.service
+        mitogen.service.get_or_create_pool(router=self.econtext.router)
+
     def _dispatch_calls(self):
         for msg in self.recv:
+            if msg.handle == STUB_CALL_SERVICE:
+                self._init_service_pool()
+                continue
+
             chain_id, ret = self._dispatch_one(msg)
             _v and LOG.debug('%r: %r -> %r', self, msg, ret)
             if msg.reply_to:
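
The docstring above compresses the deadlock argument into a few sentences. Here is a standalone model of the circular wait that the old thread-based stub (removed in the next hunk) could run into; the lock and queues are stand-ins, none of this is mitogen code.

import queue
import threading

import_lock = threading.Lock()   # stands in for the interpreter's import lock
service_reply = queue.Queue()    # data only a running service pool could supply


def service_stub_thread():
    # Old approach: a transient thread performs `import mitogen.service`.
    # The import needs the lock the main thread already holds, so this
    # blocks and the pending service request is never answered.
    with import_lock:
        service_reply.put('data the import hook wanted')


# Main thread: a custom import hook runs under the import lock and blocks
# on a service call for the data it needs to finish the import.
with import_lock:
    threading.Thread(target=service_stub_thread).start()
    try:
        service_reply.get(timeout=2)   # in the real scenario this waits forever
    except queue.Empty:
        print('circular wait: importer <-> service pool (the #547 deadlock)')
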
@@ -3535,34 +3585,6 @@ def _on_broker_exit(self):
         if not self.config['profiling']:
             os.kill(os.getpid(), signal.SIGTERM)

-    #: On Python >3.4, the global importer lock has split into per-module
-    #: locks, so there is no guarantee the import statement in
-    #: service_stub_main will complete before a second thread attempting the
-    #: same import will see a partially initialized module. Therefore serialize
-    #: the stub explicitly.
-    service_stub_lock = threading.Lock()
-
-    def _service_stub_main(self, msg):
-        self.service_stub_lock.acquire()
-        try:
-            import mitogen.service
-            pool = mitogen.service.get_or_create_pool(router=self.router)
-            pool._receiver._on_receive(msg)
-        finally:
-            self.service_stub_lock.release()
-
-    def _on_call_service_msg(self, msg):
-        """
-        Stub service handler. Start a thread to import the mitogen.service
-        implementation from, and deliver the message to the newly constructed
-        pool. This must be done as CALL_SERVICE for e.g. PushFileService may
-        race with a CALL_FUNCTION blocking the main thread waiting for a result
-        from that service.
-        """
-        if not msg.is_dead:
-            th = threading.Thread(target=self._service_stub_main, args=(msg,))
-            th.start()
-
     def _on_shutdown_msg(self, msg):
         if not msg.is_dead:
             _v and LOG.debug('shutdown request from context %d', msg.src_id)

@@ -3606,11 +3628,6 @@ def _setup_master(self):
             handle=SHUTDOWN,
             policy=has_parent_authority,
         )
-        self.router.add_handler(
-            fn=self._on_call_service_msg,
-            handle=CALL_SERVICE,
-            policy=has_parent_authority,
-        )
         self.master = Context(self.router, 0, 'master')
         parent_id = self.config['parent_ids'][0]
         if parent_id == 0:

mitogen/service.py

Lines changed: 19 additions & 5 deletions
@@ -86,8 +86,13 @@ def get_or_create_pool(size=None, router=None):
     _pool_lock.acquire()
     try:
         if _pool_pid != my_pid:
-            _pool = Pool(router, [], size=size or DEFAULT_POOL_SIZE,
-                         overwrite=True)
+            _pool = Pool(
+                router,
+                services=[],
+                size=size or DEFAULT_POOL_SIZE,
+                overwrite=True,
+                recv=mitogen.core.Dispatcher._service_recv,
+            )
             # In case of Broker shutdown crash, Pool can cause 'zombie'
             # processes.
             mitogen.core.listen(router.broker, 'shutdown',
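
The surrounding function is a per-process lazy singleton: build the Pool once, and rebuild only when the PID changes after a fork. A toy reduction of that guard, with the Pool replaced by a placeholder and assuming (the real code outside this hunk does the equivalent) that the PID is recorded after construction:

import os
import threading

_pool = None
_pool_pid = None
_pool_lock = threading.Lock()


def get_or_create_toy_pool():
    # Toy version of the guard above: build once per process, and rebuild
    # after a fork (the child inherits _pool but runs under a new PID).
    global _pool, _pool_pid
    my_pid = os.getpid()
    with _pool_lock:
        if _pool_pid != my_pid:
            _pool = object()      # the real code constructs service.Pool here
            _pool_pid = my_pid
    return _pool


assert get_or_create_toy_pool() is get_or_create_toy_pool()
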
@@ -475,22 +480,31 @@ class Pool(object):
     program's configuration or its input data.

     :param mitogen.core.Router router:
-        Router to listen for ``CALL_SERVICE`` messages on.
+        :class:`mitogen.core.Router` to listen for
+        :data:`mitogen.core.CALL_SERVICE` messages.
     :param list services:
         Initial list of services to register.
+    :param mitogen.core.Receiver recv:
+        :data:`mitogen.core.CALL_SERVICE` receiver to reuse. This is used by
+        :func:`get_or_create_pool` to hand off a queue of messages from the
+        Dispatcher stub handler while avoiding a race.
     """
     activator_class = Activator

-    def __init__(self, router, services=(), size=1, overwrite=False):
+    def __init__(self, router, services=(), size=1, overwrite=False,
+                 recv=None):
         self.router = router
         self._activator = self.activator_class()
         self._ipc_latch = mitogen.core.Latch()
-        self._receiver = mitogen.core.Receiver(
+        self._receiver = recv or mitogen.core.Receiver(
             router=router,
             handle=mitogen.core.CALL_SERVICE,
             overwrite=overwrite,
         )

+        # If self._receiver was inherited from mitogen.core.Dispatcher, we must
+        # remove its stub notification function before adding it to our Select.
+        self._receiver.notify = None
         self._select = mitogen.select.Select(oneshot=False)
         self._select.add(self._receiver)
         self._select.add(self._ipc_latch)
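
The comment above is the other half of the handoff: the inherited Receiver keeps everything the stub buffered, and clearing `notify` stops the broker from waking the main thread for messages the pool now owns. A toy demonstration of both properties (a stand-in receiver, not mitogen's):

import queue


class ToyReceiver:
    """Stand-in for mitogen.core.Receiver: a queue plus an optional notify callback."""

    def __init__(self):
        self._latch = queue.Queue()
        self.notify = None

    def on_receive(self, msg):
        self._latch.put(msg)
        if self.notify:
            self.notify(self)

    def get(self):
        return self._latch.get()


wakeups = []
recv = ToyReceiver()
recv.notify = lambda r: wakeups.append('wake main thread')   # stub behaviour

recv.on_receive('service request #1')   # arrives while only the stub exists
recv.on_receive('service request #2')

# Pool construction: inherit the receiver and silence the stub notifier so
# the broker stops poking the main thread for messages the pool will handle.
recv.notify = None
backlog = [recv.get(), recv.get()]       # nothing was lost in the handoff

print(wakeups)   # ['wake main thread', 'wake main thread']
print(backlog)   # ['service request #1', 'service request #2']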
