Skip to content

Commit f1dba27

Browse files
MSealalexrudy
authored andcommitted
Backport PR #437: Avoid kernel failures with multiple processes
1 parent 077dc8d commit f1dba27

File tree

2 files changed

+120
-2
lines changed

2 files changed

+120
-2
lines changed

jupyter_client/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class KernelClient(ConnectionFileMixin):
5151
# The PyZMQ Context to use for communication with the kernel.
5252
context = Instance(zmq.Context)
5353
def _context_default(self):
54-
return zmq.Context.instance()
54+
return zmq.Context()
5555

5656
# The classes to use for the various channels
5757
shell_channel_class = Type(ChannelABC)

jupyter_client/tests/test_kernelmanager.py

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
from subprocess import PIPE
1212
import sys
1313
import time
14+
import threading
15+
import multiprocessing as mp
16+
import pytest
1417
from unittest import TestCase
1518

1619
from traitlets.config.loader import Config
@@ -28,7 +31,7 @@ def setUp(self):
2831

2932
def tearDown(self):
3033
self.env_patch.stop()
31-
34+
3235
def _install_test_kernel(self):
3336
kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
3437
os.makedirs(kernel_dir)
@@ -127,3 +130,118 @@ def test_start_new_kernel(self):
127130

128131
self.assertTrue(km.is_alive())
129132
self.assertTrue(kc.is_alive())
133+
134+
@pytest.mark.parallel
135+
class TestParallel:
136+
137+
@pytest.fixture(autouse=True)
138+
def env(self):
139+
env_patch = test_env()
140+
env_patch.start()
141+
yield
142+
env_patch.stop()
143+
144+
@pytest.fixture(params=['tcp', 'ipc'])
145+
def transport(self, request):
146+
return request.param
147+
148+
@pytest.fixture
149+
def config(self, transport):
150+
c = Config()
151+
c.transport = transport
152+
if transport == 'ipc':
153+
c.ip = 'test'
154+
return c
155+
156+
def _install_test_kernel(self):
157+
kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
158+
os.makedirs(kernel_dir)
159+
with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
160+
f.write(json.dumps({
161+
'argv': [sys.executable,
162+
'-m', 'jupyter_client.tests.signalkernel',
163+
'-f', '{connection_file}'],
164+
'display_name': "Signal Test Kernel",
165+
}))
166+
167+
def test_start_sequence_kernels(self, config):
168+
"""Ensure that a sequence of kernel startups doesn't break anything."""
169+
170+
self._install_test_kernel()
171+
self._run_signaltest_lifecycle(config)
172+
self._run_signaltest_lifecycle(config)
173+
self._run_signaltest_lifecycle(config)
174+
175+
def test_start_parallel_thread_kernels(self, config):
176+
self._install_test_kernel()
177+
self._run_signaltest_lifecycle(config)
178+
179+
thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
180+
thread2 = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
181+
try:
182+
thread.start()
183+
thread2.start()
184+
finally:
185+
thread.join()
186+
thread2.join()
187+
188+
def test_start_parallel_process_kernels(self, config):
189+
self._install_test_kernel()
190+
191+
self._run_signaltest_lifecycle(config)
192+
thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
193+
proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
194+
try:
195+
thread.start()
196+
proc.start()
197+
finally:
198+
thread.join()
199+
proc.join()
200+
201+
assert proc.exitcode == 0
202+
203+
def test_start_sequence_process_kernels(self, config):
204+
self._install_test_kernel()
205+
self._run_signaltest_lifecycle(config)
206+
proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
207+
try:
208+
proc.start()
209+
finally:
210+
proc.join()
211+
212+
assert proc.exitcode == 0
213+
214+
def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs):
215+
km.start_kernel(**kwargs)
216+
kc = km.client()
217+
kc.start_channels()
218+
try:
219+
kc.wait_for_ready(timeout=startup_timeout)
220+
except RuntimeError:
221+
kc.stop_channels()
222+
km.shutdown_kernel()
223+
raise
224+
225+
return kc
226+
227+
def _run_signaltest_lifecycle(self, config=None):
228+
km = KernelManager(config=config, kernel_name='signaltest')
229+
kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE)
230+
231+
def execute(cmd):
232+
kc.execute(cmd)
233+
reply = kc.get_shell_msg(TIMEOUT)
234+
content = reply['content']
235+
assert content['status'] == 'ok'
236+
return content
237+
238+
execute("start")
239+
assert km.is_alive()
240+
execute('check')
241+
assert km.is_alive()
242+
243+
km.restart_kernel(now=True)
244+
assert km.is_alive()
245+
execute('check')
246+
247+
km.shutdown_kernel()

0 commit comments

Comments
 (0)