Skip to content

Commit 13137a7

Browse files
WIP feat(fork): Allow forking of a kernel
1 parent a5cff52 commit 13137a7

File tree

2 files changed

+98
-5
lines changed

2 files changed

+98
-5
lines changed

ipykernel/kernelapp.py

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,10 +501,83 @@ def start(self):
501501
self.poller.start()
502502
self.kernel.start()
503503
self.io_loop = ioloop.IOLoop.current()
504-
try:
505-
self.io_loop.start()
506-
except KeyboardInterrupt:
507-
pass
504+
keep_running = True
505+
while keep_running:
506+
try:
507+
self.io_loop.start()
508+
except KeyboardInterrupt:
509+
pass
510+
if not getattr(self.io_loop, '_fork_requested', False):
511+
keep_running = False
512+
else:
513+
pid = os.fork()
514+
self.io_loop._fork_requested = False # reset for parent AND child
515+
if pid == 0:
516+
import asyncio
517+
self.log.debug('Child kernel with pid ', os.getpid())
518+
519+
# try to aggresively close all sockets/ioloops etc
520+
for socket in [self.control_socket, self.iopub_socket, self.shell_socket, self.stdin_socket]:
521+
socket.close()
522+
self.io_loop.close(all_fds=True)
523+
self.io_loop.clear_current()
524+
ioloop.IOLoop.clear_current()
525+
526+
# check if the event loop is really being replaced
527+
loop = asyncio.get_event_loop()
528+
self.log.debug('asyncioloop: %r %r', loop, id(loop))
529+
530+
# install a new policy, sources:
531+
# https://bugs.python.org/issue21998
532+
# it will give each pid a new io loop
533+
_default_policy = asyncio.get_event_loop_policy()
534+
_pid_loop = {}
535+
class MultiprocessingPolicy(asyncio.AbstractEventLoopPolicy):
536+
def get_event_loop(self):
537+
pid = os.getpid()
538+
loop = _pid_loop.get(pid)
539+
if loop is None:
540+
loop = self.new_event_loop()
541+
_pid_loop[pid] = loop
542+
return loop
543+
544+
def set_event_loop(self, loop):
545+
pid = os.getpid()
546+
_pid_loop[pid] = loop
547+
548+
def new_event_loop(self):
549+
return _default_policy.new_event_loop()
550+
551+
asyncio.set_event_loop_policy(MultiprocessingPolicy())
552+
# del _pid_loop[os.getpid()]
553+
asyncio.new_event_loop()
554+
loop = asyncio.get_event_loop()
555+
self.log.debug('asyncioloop: %r %r', loop, id(loop))
556+
557+
import tornado.platform.asyncio as tasio
558+
# explicitly create a new io look that will also be the current
559+
self.io_loop = tasio.AsyncIOMainLoop(make_current=True)
560+
assert self.io_loop == ioloop.IOLoop.current()
561+
562+
# reset ports, so they will actually get updated
563+
self.hb_port = 0
564+
self.shell_port = 0
565+
self.iopub_port = 0
566+
self.stdin_port = 0
567+
self.control_port = 0
568+
# TODO: we might want to pass the same arguments, except the file
569+
# NOTE: we actually start a new kernel, but once this works
570+
# we can actually think about reusing the kernel object
571+
self.initialize(argv=['-f', 'conn_fork.json', '--debug'])
572+
self.start()
573+
pass
574+
else:
575+
self.log.debug('Parent kernel will resume')
576+
# keep a reference, since the will set this to None
577+
post_fork_callback = self.io_loop._post_fork_callback
578+
self.io_loop.add_callback(lambda: post_fork_callback(pid))
579+
self.io_loop._post_fork_callback = None
580+
508581

509582
launch_new_instance = IPKernelApp.launch_instance
510583

ipykernel/kernelbase.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,31 @@ def _default_ident(self):
154154
'connect_request', 'shutdown_request',
155155
'is_complete_request',
156156
# deprecated:
157-
'apply_request',
157+
'apply_request', 'fork'
158158
]
159159
# add deprecated ipyparallel control messages
160160
control_msg_types = msg_types + ['clear_request', 'abort_request']
161161

162+
def fork(self, stream, ident, parent):
163+
# Forking in the (async)io loop is not supported.
164+
# instead, we stop it, and use the io loop to pass
165+
# information up the callstack
166+
loop = ioloop.IOLoop.current()
167+
loop._fork_requested = True
168+
def post_fork_callback(pid):
169+
# we might be able to pass back the port information/connection
170+
# info file here. This is just a proof of concept
171+
reply_content = json_clean({'status': 'ok', 'fork_id': pid})
172+
metadata = {}
173+
metadata = self.finish_metadata(parent, metadata, reply_content)
174+
175+
reply_msg = self.session.send(stream, u'execute_reply',
176+
reply_content, parent, metadata=metadata,
177+
ident=ident)
178+
179+
loop._post_fork_callback = post_fork_callback
180+
loop.stop()
181+
162182
def __init__(self, **kwargs):
163183
super(Kernel, self).__init__(**kwargs)
164184
# Build dict of handlers for message types

0 commit comments

Comments
 (0)