Commit 7a9e5d8

Merge pull request #523 from minrk/broadcast-view-engine-death
Improvements for handling engine death
2 parents 211c7eb + cb8a547

6 files changed: +135 / -47 lines

ipyparallel/client/view.py (15 additions, 1 deletion)

@@ -891,15 +891,29 @@ def _make_async_result(self, message_future, s_idents, **kwargs):
                 msg_and_target_id, async_result=True, track=True
             )
             self.client.outstanding.add(msg_and_target_id)
+            self.client._outstanding_dict[ident].add(msg_and_target_id)
             self.outstanding.add(msg_and_target_id)
             futures.append(future[0])
         if original_msg_id in self.outstanding:
             self.outstanding.remove(original_msg_id)
         else:
             self.client.outstanding.add(original_msg_id)
+            for ident in s_idents:
+                self.client._outstanding_dict[ident].add(original_msg_id)
             futures = message_future

-        return AsyncResult(self.client, futures, owner=True, **kwargs)
+        ar = AsyncResult(self.client, futures, owner=True, **kwargs)
+
+        if self.is_coalescing:
+            # if coalescing, discard outstanding-tracking when we are done
+            def _rm_outstanding(_):
+                for ident in s_idents:
+                    if ident in self.client._outstanding_dict:
+                        self.client._outstanding_dict[ident].discard(original_msg_id)
+
+            ar.add_done_callback(_rm_outstanding)
+
+        return ar

     @sync_results
     @save_ids
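The per-engine bookkeeping added above is what lets the client's engine-death handling see which requests were still pending on a dead engine. Below is a minimal standalone sketch of the same pattern with plain futures and a hypothetical outstanding_by_engine mapping, not ipyparallel's actual Client internals:

from collections import defaultdict
from concurrent.futures import Future

outstanding_by_engine = defaultdict(set)  # hypothetical stand-in for client._outstanding_dict

def track(msg_id, idents):
    """Record msg_id as outstanding on every target engine."""
    for ident in idents:
        outstanding_by_engine[ident].add(msg_id)

def untrack_when_done(future, msg_id, idents):
    """Discard msg_id from per-engine tracking once the result completes."""
    def _rm_outstanding(_):
        for ident in idents:
            if ident in outstanding_by_engine:
                outstanding_by_engine[ident].discard(msg_id)
    future.add_done_callback(_rm_outstanding)

f = Future()
track("msg-1", [b"engine-0", b"engine-1"])
untrack_when_done(f, "msg-1", [b"engine-0", b"engine-1"])
f.set_result(None)            # callback fires, per-engine tracking is cleaned up
print(outstanding_by_engine)  # both sets are now empty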

ipyparallel/controller/hub.py (8 additions, 5 deletions)

@@ -913,14 +913,17 @@ def unregister_engine(self, ident, msg):
             eid = msg['content']['id']
         except:
             self.log.error(
-                "registration::bad engine id for unregistration: %r",
-                ident,
-                exc_info=True,
+                f"registration::bad request for engine for unregistration: {msg['content']}",
             )
             return
-        self.log.info("registration::unregister_engine(%r)", eid)
-
+        if eid not in self.engines:
+            self.log.info(
+                f"registration::unregister_engine({eid}) already unregistered"
+            )
+            return
+        self.log.info(f"registration::unregister_engine({eid})")
         ec = self.engines[eid]
+
         content = dict(id=eid, uuid=ec.uuid)

         # stop the heartbeats
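The early return above makes unregister_engine idempotent: when the same engine's death is reported more than once (say, by the nanny and by the heartbeat monitor), the second call is logged and ignored instead of raising KeyError on self.engines[eid]. A simplified sketch with hypothetical stand-in state, not the Hub's real bookkeeping:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("hub-sketch")

engines = {0: "engine-record-0"}  # stand-in for Hub.engines

def unregister_engine(eid):
    if eid not in engines:
        log.info(f"registration::unregister_engine({eid}) already unregistered")
        return
    log.info(f"registration::unregister_engine({eid})")
    engines.pop(eid)  # the real Hub also stops heartbeats and notifies clients

unregister_engine(0)  # normal path: removes the record
unregister_engine(0)  # duplicate notification: no-op instead of KeyError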

ipyparallel/engine/app.py (1 addition, 1 deletion)

@@ -519,7 +519,7 @@ def urls(key):
         # control stream:
         control_url = url('control')
         if self.enable_nanny:
-            nanny_url = self.start_nanny(
+            nanny_url, self.nanny_pipe = self.start_nanny(
                 control_url=control_url,
             )
             control_url = nanny_url

ipyparallel/engine/nanny.py (83 additions, 19 deletions)
@@ -16,15 +16,17 @@
 import asyncio
 import logging
 import os
+import pickle
 import signal
-from multiprocessing import Pipe
-from multiprocessing import Process
+import sys
+from subprocess import PIPE
+from subprocess import Popen
 from threading import Thread

 import psutil
-import tornado.ioloop
 import zmq
 from jupyter_client.session import Session
+from tornado.ioloop import IOLoop
 from traitlets.config import Config
 from zmq.eventloop.zmqstream import ZMQStream

@@ -35,7 +37,10 @@
 class KernelNanny:
     """Object for monitoring

-    Must be handling signal messages"""
+    Must be child of engine
+
+    Handles signal messages and watches Engine process for exiting
+    """

     def __init__(
         self,
@@ -46,7 +51,7 @@ def __init__(
         registration_url: str,
         identity: bytes,
         config: Config,
-        start_pipe: Pipe,
+        pipe,
         log_level: int = logging.INFO,
     ):
         self.pid = pid
@@ -55,16 +60,18 @@ def __init__(
         self.control_url = control_url
         self.registration_url = registration_url
         self.identity = identity
-        self.start_pipe = start_pipe
         self.config = config
+        self.pipe = pipe
         self.session = Session(config=self.config)
+        log_level = 10

         self.log = local_logger(f"{self.__class__.__name__}.{engine_id}", log_level)
         self.log.propagate = False

         self.control_handlers = {
             "signal_request": self.signal_request,
         }
+        self._finish_called = False

     def wait_for_parent_thread(self):
         """Wait for my parent to exit, then I'll notify the controller and shut down"""
@@ -79,6 +86,24 @@ def wait_for_parent_thread(self):
         self.log.critical(f"Parent {self.pid} exited with status {exit_code}.")
         self.loop.add_callback(self.finish)

+    def pipe_handler(self, fd, events):
+        self.log.debug(f"Pipe event {events}")
+        self.loop.remove_handler(fd)
+        try:
+            fd.close()
+        except BrokenPipeError:
+            pass
+        try:
+            status = self.parent_process.wait(0)
+        except psutil.TimeoutExpired:
+            try:
+                status = self.parent_process.status()
+            except psutil.NoSuchProcess:
+                status = "exited"
+
+        self.log.critical(f"Pipe closed, parent {self.pid} has status: {status}")
+        self.finish()
+
     def notify_exit(self):
         """Notify the Hub that our parent has exited"""
         self.log.info("Notifying Hub that our parent has shut down")
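pipe_handler gives the nanny a second way to notice parent death besides the thread that waits on the parent pid: once the pipe it shares with the parent hits EOF, the parent must be gone. A standalone sketch of watching a pipe for EOF with tornado's add_handler (the fds and the simulated exit are assumptions for illustration, not the nanny's actual wiring):

import os

from tornado.ioloop import IOLoop

# a pipe standing in for the stdin/stdout link between nanny and parent
read_fd, write_fd = os.pipe()
loop = IOLoop.current()

def on_pipe_event(fd, events):
    """Fires when the write end closes (EOF), i.e. the 'parent' went away."""
    loop.remove_handler(fd)
    os.close(fd)
    print("pipe closed -- parent is gone, shutting down")
    loop.stop()

loop.add_handler(read_fd, on_pipe_event, IOLoop.READ | IOLoop.ERROR)

# simulate the parent exiting shortly after startup by closing the write end
loop.call_later(0.1, os.close, write_fd)
loop.start()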
@@ -91,6 +116,9 @@ def notify_exit(self):

     def finish(self):
         """Prepare to exit and stop our event loop."""
+        if self._finish_called:
+            return
+        self._finish_called = True
         self.notify_exit()
         self.loop.add_callback(self.loop.stop)

@@ -160,7 +188,7 @@ def start(self):
         # ignore SIGINT sent to parent
         signal.signal(signal.SIGINT, signal.SIG_IGN)

-        self.loop = tornado.ioloop.IOLoop.current()
+        self.loop = IOLoop.current()
         self.context = zmq.Context()

         # set up control socket (connection to Scheduler)
@@ -175,15 +203,25 @@ def start(self):
         port = self.parent_socket.bind_to_random_port("tcp://127.0.0.1")

         # now that we've bound, pass port to parent via AsyncResult
-        self.start_pipe.send(f"tcp://127.0.0.1:{port}")
-        self.loop.add_timeout(self.loop.time() + 10, self.start_pipe.close)
+        self.pipe.write(f"tcp://127.0.0.1:{port}\n")
+        if not sys.platform.startswith("win"):
+            # watch for the stdout pipe to close
+            # as a signal that our parent is shutting down
+            self.loop.add_handler(
+                self.pipe, self.pipe_handler, IOLoop.READ | IOLoop.ERROR
+            )
         self.parent_stream = ZMQStream(self.parent_socket)
         self.parent_stream.on_recv_stream(self.dispatch_parent)
         try:
             self.loop.start()
         finally:
             self.loop.close(all_fds=True)
             self.context.term()
+            try:
+                self.pipe.close()
+            except BrokenPipeError:
+                pass
+            self.log.debug("exiting")

     @classmethod
     def main(cls, *args, **kwargs):
@@ -197,7 +235,7 @@ def main(cls, *args, **kwargs):
         """
         # start a new event loop for the forked process
         asyncio.set_event_loop(asyncio.new_event_loop())
-        tornado.ioloop.IOLoop().make_current()
+        IOLoop().make_current()
         self = cls(*args, **kwargs)
         self.start()

@@ -213,13 +251,39 @@ def start_nanny(**kwargs):
     instead of connecting directly to the control Scheduler.
     """

-    pipe_r, pipe_w = Pipe(duplex=False)
-    kwargs['start_pipe'] = pipe_w
     kwargs['pid'] = os.getpid()
-    p = Process(target=KernelNanny.main, kwargs=kwargs, name="KernelNanny", daemon=True)
-    p.start()
-    # close our copy of the write pipe
-    pipe_w.close()
-    nanny_url = pipe_r.recv()
-    pipe_r.close()
-    return nanny_url
+
+    env = os.environ.copy()
+    env['PYTHONUNBUFFERED'] = '1'
+    p = Popen(
+        [sys.executable, '-m', __name__],
+        stdin=PIPE,
+        stdout=PIPE,
+        env=env,
+        start_new_session=True,  # don't inherit signals
+    )
+    p.stdin.write(pickle.dumps(kwargs))
+    p.stdin.close()
+    out = p.stdout.readline()
+    nanny_url = out.decode("utf8").strip()
+    if not nanny_url:
+        p.terminate()
+        raise RuntimeError("nanny failed")
+    # return the handle on the process
+    # need to keep the pipe open for the nanny
+    return nanny_url, p
+
+
+def main():
+    """Entrypoint from the command-line
+
+    Loads kwargs from stdin,
+    sets pipe to stdout
+    """
+    kwargs = pickle.load(os.fdopen(sys.stdin.fileno(), mode='rb'))
+    kwargs['pipe'] = sys.stdout
+    KernelNanny.main(**kwargs)
+
+
+if __name__ == "__main__":
+    main()
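start_nanny now launches the nanny as a separate Python process instead of a multiprocessing fork: kwargs go down stdin as a pickle, the child answers with its bound URL on the first line of stdout, and the parent keeps the Popen handle so the pipe stays open as a liveness channel. A self-contained sketch of that handshake with a stand-in child program (the fake URL and names are illustrative, not the nanny itself):

import pickle
import sys
from subprocess import PIPE, Popen

# stand-in child: read pickled kwargs from stdin, report a URL on stdout
CHILD = r"""
import pickle, sys
kwargs = pickle.load(sys.stdin.buffer)
# pretend we bound a socket the parent should connect to
sys.stdout.write(f"tcp://127.0.0.1:{kwargs['port']}\n")
sys.stdout.flush()
"""

def start_child(**kwargs):
    """Spawn a helper process and hand it kwargs over stdin, like start_nanny does."""
    p = Popen(
        [sys.executable, "-c", CHILD],
        stdin=PIPE,
        stdout=PIPE,
        start_new_session=True,  # don't inherit signals from the parent (POSIX)
    )
    p.stdin.write(pickle.dumps(kwargs))
    p.stdin.close()
    url = p.stdout.readline().decode("utf8").strip()
    if not url:
        p.terminate()
        raise RuntimeError("helper failed to start")
    # keep the Popen handle: its stdout pipe doubles as a liveness signal
    return url, p

url, proc = start_child(port=5555)
print(url)   # tcp://127.0.0.1:5555
proc.wait()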

ipyparallel/tests/clienttest.py (6 additions, 0 deletions)

@@ -32,6 +32,12 @@ def crash():
     os._exit(1)


+def conditional_crash(condition):
+    """Ungracefully exit the process"""
+    if condition:
+        crash()
+
+
 def wait(n):
     """sleep for a time"""
     import time
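conditional_crash lets a single broadcast kill only a subset of engines. One hedged example of how it could be used (illustrative only; the actual tests added in this commit are not shown on this page):

# illustrative sketch, not the test from this commit
import ipyparallel as ipp
from ipyparallel.tests.clienttest import conditional_crash

rc = ipp.Client()
dv = rc[:]  # DirectView over all engines

# one flag per engine; DirectView.map partitions the sequence across engines,
# so only the engine that receives True calls crash() and dies
flags = [i == 0 for i in range(len(rc.ids))]
ar = dv.map_async(conditional_crash, flags)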
