Skip to content

Commit 0422bfa

Browse files
maartenbreddelsedisongustavo
authored andcommitted
WIP feat(fork): Allow forking of a kernel
1 parent 3e20f81 commit 0422bfa

File tree

2 files changed

+96
-15
lines changed

2 files changed

+96
-15
lines changed

ipykernel/kernelapp.py

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from ipython_genutils.importstring import import_item
3333
from jupyter_core.paths import jupyter_runtime_dir
3434
from jupyter_client import write_connection_file
35-
from jupyter_client.connect import ConnectionFileMixin
35+
from jupyter_client.connect import ConnectionFileMixin, port_names
3636

3737
# local imports
3838
from .iostream import IOPubThread
@@ -436,16 +436,22 @@ def init_kernel(self):
436436

437437
kernel_factory = self.kernel_class.instance
438438

439-
kernel = kernel_factory(parent=self, session=self.session,
440-
control_stream=control_stream,
441-
shell_streams=[shell_stream, control_stream],
442-
iopub_thread=self.iopub_thread,
443-
iopub_socket=self.iopub_socket,
444-
stdin_socket=self.stdin_socket,
445-
log=self.log,
446-
profile_dir=self.profile_dir,
447-
user_ns=self.user_ns,
439+
params = dict(
440+
parent=self,
441+
session=self.session,
442+
control_stream=control_stream,
443+
shell_streams=[shell_stream, control_stream],
444+
iopub_thread=self.iopub_thread,
445+
iopub_socket=self.iopub_socket,
446+
stdin_socket=self.stdin_socket,
447+
log=self.log,
448+
profile_dir=self.profile_dir,
449+
user_ns=self.user_ns,
448450
)
451+
kernel = kernel_factory(**params)
452+
for k, v in params.items():
453+
setattr(kernel, k, v)
454+
449455
kernel.record_ports({
450456
name + '_port': port for name, port in self.ports.items()
451457
})
@@ -559,10 +565,64 @@ def start(self):
559565
self.poller.start()
560566
self.kernel.start()
561567
self.io_loop = ioloop.IOLoop.current()
562-
try:
563-
self.io_loop.start()
564-
except KeyboardInterrupt:
565-
pass
568+
keep_running = True
569+
while keep_running:
570+
try:
571+
self.io_loop.start()
572+
except KeyboardInterrupt:
573+
pass
574+
if not getattr(self.io_loop, '_fork_requested', False):
575+
keep_running = False
576+
else:
577+
self.fork()
578+
579+
def fork(self):
580+
# Create a temporary connection file that will be inherited by the child process.
581+
connection_file, conn = write_connection_file()
582+
583+
parent_pid = os.getpid()
584+
pid = os.fork()
585+
self.io_loop._fork_requested = False # reset for parent AND child
586+
if pid == 0:
587+
import asyncio
588+
self.log.debug('Child kernel with pid ', os.getpid())
589+
590+
# close all sockets and ioloops
591+
self.close()
592+
593+
self.io_loop.close(all_fds=True)
594+
self.io_loop.clear_current()
595+
ioloop.IOLoop.clear_current()
596+
asyncio.new_event_loop()
597+
598+
import tornado.platform.asyncio as tasio
599+
# explicitly create a new io loop that will also be the current
600+
self.io_loop = tasio.AsyncIOMainLoop(make_current=True)
601+
assert self.io_loop == ioloop.IOLoop.current()
602+
603+
# Reset all ports so they will be reinitialized with the ports from the connection file
604+
for name in port_names:
605+
setattr(self, name, 0)
606+
self.connection_file = connection_file
607+
608+
# Reset the ZMQ context for it to be recreated in initialize()
609+
self.context = None
610+
611+
# Make ParentPoller work correctly (the new process is a child of the previous kernel)
612+
self.parent_handle = parent_pid
613+
614+
# Session have a protection to send messages from forked processes through the `check_pid` flag.
615+
self.session.pid = os.getpid()
616+
self.session.key = conn['key'].encode()
617+
618+
self.initialize(argv=['-f', self.abs_connection_file, '--debug'])
619+
self.start()
620+
else:
621+
self.log.debug('Parent kernel will resume')
622+
# keep a reference, since the will set this to None
623+
post_fork_callback = self.io_loop._post_fork_callback
624+
self.io_loop.add_callback(lambda: post_fork_callback(pid, conn))
625+
self.io_loop._post_fork_callback = None
566626

567627

568628
launch_new_instance = IPKernelApp.launch_instance
@@ -577,3 +637,4 @@ def main():
577637

578638
if __name__ == '__main__':
579639
main()
640+

ipykernel/kernelbase.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,30 @@ def _default_ident(self):
154154
'connect_request', 'shutdown_request',
155155
'is_complete_request',
156156
# deprecated:
157-
'apply_request',
157+
'apply_request', 'fork'
158158
]
159159
# add deprecated ipyparallel control messages
160160
control_msg_types = msg_types + ['clear_request', 'abort_request']
161161

162+
def fork(self, stream, ident, parent):
163+
# Forking in the (async)io loop is not supported.
164+
# instead, we stop it, and use the io loop to pass
165+
# information up the callstack
166+
loop = ioloop.IOLoop.current()
167+
loop._fork_requested = True
168+
169+
def post_fork_callback(pid, conn):
170+
reply_content = json_clean({'status': 'ok', 'pid': pid, 'conn': conn})
171+
metadata = {}
172+
metadata = self.finish_metadata(parent, metadata, reply_content)
173+
174+
self.session.send(stream, u'execute_reply',
175+
reply_content, parent, metadata=metadata,
176+
ident=ident)
177+
178+
loop._post_fork_callback = post_fork_callback
179+
loop.stop()
180+
162181
def __init__(self, **kwargs):
163182
super(Kernel, self).__init__(**kwargs)
164183
# Build dict of handlers for message types
@@ -514,6 +533,7 @@ def finish_metadata(self, parent, metadata, reply_content):
514533
def execute_request(self, stream, ident, parent):
515534
"""handle an execute_request"""
516535

536+
517537
try:
518538
content = parent[u'content']
519539
code = py3compat.cast_unicode_py2(content[u'code'])

0 commit comments

Comments
 (0)