Skip to content

Commit 518ec45

Browse files
committed
use pipe to notify nanny of zombie parents
if parent is not properly reaped (e.g. MPI), use a pipe to ensure it wakes only use the pipe on non-Windows
1 parent 45f896c commit 518ec45

File tree

2 files changed

+51
-10
lines changed

2 files changed

+51
-10
lines changed

ipyparallel/engine/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ def urls(key):
519519
# control stream:
520520
control_url = url('control')
521521
if self.enable_nanny:
522-
nanny_url = self.start_nanny(
522+
nanny_url, self.nanny_pipe = self.start_nanny(
523523
control_url=control_url,
524524
)
525525
control_url = nanny_url

ipyparallel/engine/nanny.py

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,18 @@
1515
"""
1616
import asyncio
1717
import logging
18+
import multiprocessing
1819
import os
1920
import signal
21+
import sys
2022
from multiprocessing import Pipe
21-
from multiprocessing import Process
23+
from multiprocessing.connection import Connection
2224
from threading import Thread
2325

2426
import psutil
25-
import tornado.ioloop
2627
import zmq
2728
from jupyter_client.session import Session
29+
from tornado.ioloop import IOLoop
2830
from traitlets.config import Config
2931
from zmq.eventloop.zmqstream import ZMQStream
3032

@@ -35,7 +37,10 @@
3537
class KernelNanny:
3638
"""Object for monitoring
3739
38-
Must be handling signal messages"""
40+
Must be child of engine
41+
42+
Handles signal messages and watches Engine process for exiting
43+
"""
3944

4045
def __init__(
4146
self,
@@ -65,6 +70,7 @@ def __init__(
6570
self.control_handlers = {
6671
"signal_request": self.signal_request,
6772
}
73+
self._finish_called = False
6874

6975
def wait_for_parent_thread(self):
7076
"""Wait for my parent to exit, then I'll notify the controller and shut down"""
@@ -79,6 +85,20 @@ def wait_for_parent_thread(self):
7985
self.log.critical(f"Parent {self.pid} exited with status {exit_code}.")
8086
self.loop.add_callback(self.finish)
8187

88+
def pipe_handler(self, fd, events):
89+
self.log.debug(f"Pipe event {events}")
90+
self.loop.remove_handler(fd)
91+
try:
92+
status = self.parent_process.wait(0)
93+
except psutil.TimeoutExpired:
94+
try:
95+
status = self.parent_process.status()
96+
except psutil.NoSuchProcessError:
97+
status = "exited"
98+
99+
self.log.critical(f"Pipe closed, parent {self.pid} has status: {status}")
100+
self.finish()
101+
82102
def notify_exit(self):
83103
"""Notify the Hub that our parent has exited"""
84104
self.log.info("Notifying Hub that our parent has shut down")
@@ -91,6 +111,9 @@ def notify_exit(self):
91111

92112
def finish(self):
93113
"""Prepare to exit and stop our event loop."""
114+
if self._finish_called:
115+
return
116+
self._finish_called = True
94117
self.notify_exit()
95118
self.loop.add_callback(self.loop.stop)
96119

@@ -160,7 +183,7 @@ def start(self):
160183
# ignore SIGINT sent to parent
161184
signal.signal(signal.SIGINT, signal.SIG_IGN)
162185

163-
self.loop = tornado.ioloop.IOLoop.current()
186+
self.loop = IOLoop.current()
164187
self.context = zmq.Context()
165188

166189
# set up control socket (connection to Scheduler)
@@ -176,7 +199,15 @@ def start(self):
176199

177200
# now that we've bound, pass port to parent via AsyncResult
178201
self.start_pipe.send(f"tcp://127.0.0.1:{port}")
179-
self.loop.add_timeout(self.loop.time() + 10, self.start_pipe.close)
202+
if sys.platform.startswith("win"):
203+
# close the pipe on Windows
204+
self.loop.add_timeout(self.loop.time() + 10, self.start_pipe.close)
205+
else:
206+
# otherwise, watch for the pipe to close
207+
# as a signal that our parent is shutting down
208+
self.loop.add_handler(
209+
self.start_pipe, self.pipe_handler, IOLoop.READ | IOLoop.ERROR
210+
)
180211
self.parent_stream = ZMQStream(self.parent_socket)
181212
self.parent_stream.on_recv_stream(self.dispatch_parent)
182213
try:
@@ -197,7 +228,7 @@ def main(cls, *args, **kwargs):
197228
"""
198229
# start a new event loop for the forked process
199230
asyncio.set_event_loop(asyncio.new_event_loop())
200-
tornado.ioloop.IOLoop().make_current()
231+
IOLoop().make_current()
201232
self = cls(*args, **kwargs)
202233
self.start()
203234

@@ -214,12 +245,22 @@ def start_nanny(**kwargs):
214245
"""
215246

216247
pipe_r, pipe_w = Pipe(duplex=False)
248+
217249
kwargs['start_pipe'] = pipe_w
218250
kwargs['pid'] = os.getpid()
219-
p = Process(target=KernelNanny.main, kwargs=kwargs, name="KernelNanny", daemon=True)
251+
# make sure to not use fork, which can be an issue for MPI
252+
p = multiprocessing.get_context("spawn").Process(
253+
target=KernelNanny.main,
254+
kwargs=kwargs,
255+
name="KernelNanny",
256+
daemon=True,
257+
)
220258
p.start()
221259
# close our copy of the write pipe
222260
pipe_w.close()
223261
nanny_url = pipe_r.recv()
224-
pipe_r.close()
225-
return nanny_url
262+
if sys.platform.startswith("win"):
263+
pipe_r.close()
264+
# return the handle on the read pipe
265+
# need to keep this open for the nanny
266+
return nanny_url, pipe_r

0 commit comments

Comments
 (0)