Skip to content
8 changes: 6 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@ jobs:
if: ${{ needs.check_duplicate_runs.outputs.should_skip != 'true' }}

runs-on: ${{ matrix.os }}
timeout-minutes: 20

strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: [3.6, 3.7, 3.8, 3.9]

python-version: [3.6, 3.7, 3.8, 3.9, pypy-3.7]
exclude:
- os: windows-latest
python-version: pypy-3.7
env:
OS: ${{ matrix.os }}

Expand Down Expand Up @@ -112,6 +115,7 @@ jobs:
pip freeze

- name: Check types
if: matrix.python-version != 'pypy-3.7'
run: mypy jupyter_client --exclude '\/tests|kernelspecapp|ioloop|runapp' --install-types --non-interactive

- name: Run the tests
Expand Down
4 changes: 3 additions & 1 deletion jupyter_client/provisioning/local_provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ async def wait(self) -> Optional[int]:
await asyncio.sleep(0.1)

# Process is no longer alive, wait and clear
ret = self.process.wait()
# Popen.__exit__ cleans up resources such as pipes
with self.process:
ret = self.process.wait()
self.process = None # allow has_process to now return False
return ret

Expand Down
47 changes: 47 additions & 0 deletions jupyter_client/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
import gc
import os
import sys

import pytest
import zmq
from jupyter_core import paths
from zmq.tests import BaseZMQTestCase

from .utils import test_env

Expand Down Expand Up @@ -39,3 +42,47 @@ def env():
@pytest.fixture()
def kernel_dir():
return pjoin(paths.jupyter_data_dir(), 'kernels')


def assert_no_zmq():
"""Verify that there are no zmq resources

avoids reference leaks across tests,
which can lead to FD exhaustion
"""
# zmq garbage collection uses a zmq socket in a thread
# we don't want to delete these from the main thread!
from zmq.utils import garbage

garbage.gc.stop()
sockets = [
obj
for obj in gc.get_referrers(zmq.Socket)
if isinstance(obj, zmq.Socket) and not obj.closed
]
if sockets:
message = f"{len(sockets)} unclosed sockets: {sockets}"
for s in sockets:
s.close(linger=0)
raise AssertionError(message)
contexts = [
obj
for obj in gc.get_referrers(zmq.Context)
if isinstance(obj, zmq.Context) and not obj.closed
]
# allow for single zmq.Context.instance()
if contexts and len(contexts) > 1:
message = f"{len(contexts)} unclosed contexts: {contexts}"
for ctx in contexts:
ctx.destroy(linger=0)
raise AssertionError(message)


@pytest.fixture(autouse=True)
def check_zmq(request):
yield
if request.instance and isinstance(request.instance, BaseZMQTestCase):
# can't run this check on old-style TestCases with tearDown methods
# because this check runs before tearDown
return
assert_no_zmq()
16 changes: 15 additions & 1 deletion jupyter_client/tests/signalkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Distributed under the terms of the Modified BSD License.
import os
import signal
import sys
import time
from subprocess import PIPE
from subprocess import Popen
Expand Down Expand Up @@ -40,7 +41,20 @@ def do_execute(
"user_expressions": {},
}
if code == "start":
child = Popen(["bash", "-i", "-c", "sleep 30"], stderr=PIPE)
child = Popen(
[
sys.executable,
"-c",
'; '.join(
[
"import signal, time",
"signal.signal(signal.SIGINT, signal.SIG_DFL)",
"time.sleep(30)",
]
),
],
stderr=PIPE,
)
self.children.append(child)
reply["user_expressions"]["pid"] = self.children[-1].pid
elif code == "check":
Expand Down
115 changes: 76 additions & 39 deletions jupyter_client/tests/test_kernelapp.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import shutil
import sys
import time
from queue import Empty
from queue import Queue
from subprocess import PIPE
from subprocess import Popen
from tempfile import mkdtemp
from threading import Thread


def _launch(extra_env):
Expand All @@ -21,44 +22,80 @@ def _launch(extra_env):
POLL_FREQ = 10


def test_kernelapp_lifecycle():
def test_kernelapp_lifecycle(request, tmpdir):
# Check that 'jupyter kernel' starts and terminates OK.
runtime_dir = mkdtemp()
startup_dir = mkdtemp()
runtime_dir = str(tmpdir.join("runtime").mkdir())
startup_dir = str(tmpdir.join("startup").mkdir())
started = os.path.join(startup_dir, "started")
try:
p = _launch(
{
"JUPYTER_RUNTIME_DIR": runtime_dir,
"JUPYTER_CLIENT_TEST_RECORD_STARTUP_PRIVATE": started,
}
)
# Wait for start
for _ in range(WAIT_TIME * POLL_FREQ):
if os.path.isfile(started):
break
time.sleep(1 / POLL_FREQ)
else:
raise AssertionError("No started file created in {} seconds".format(WAIT_TIME))

# Connection file should be there by now
for _ in range(WAIT_TIME * POLL_FREQ):
files = os.listdir(runtime_dir)
if files:
p = _launch(
{
"JUPYTER_RUNTIME_DIR": runtime_dir,
"JUPYTER_CLIENT_TEST_RECORD_STARTUP_PRIVATE": started,
}
)
request.addfinalizer(p.terminate)

# Wait for start
for _ in range(WAIT_TIME * POLL_FREQ):
if os.path.isfile(started):
break
time.sleep(1 / POLL_FREQ)
else:
raise AssertionError("No started file created in {} seconds".format(WAIT_TIME))

# Connection file should be there by now
for _ in range(WAIT_TIME * POLL_FREQ):
files = os.listdir(runtime_dir)
if files:
break
time.sleep(1 / POLL_FREQ)
else:
raise AssertionError("No connection file created in {} seconds".format(WAIT_TIME))

assert len(files) == 1
cf = files[0]
assert cf.startswith("kernel")
assert cf.endswith(".json")

# pexpect-style wait-for-text with timeout
# use blocking background thread to read output
# so this works on Windows
t = time.perf_counter()
deadline = t + WAIT_TIME
remaining = WAIT_TIME

stderr = ""
q = Queue()

def _readlines():
while True:
line = p.stderr.readline()
q.put(line.decode("utf8", "replace"))
if not line:
break
time.sleep(1 / POLL_FREQ)
else:
raise AssertionError("No connection file created in {} seconds".format(WAIT_TIME))
assert len(files) == 1
cf = files[0]
assert cf.startswith("kernel")
assert cf.endswith(".json")

# Send SIGTERM to shut down
time.sleep(1)

stderr_thread = Thread(target=_readlines, daemon=True)
stderr_thread.start()

while remaining >= 0 and p.poll() is None:
try:
line = q.get(timeout=remaining)
except Empty:
break
stderr += line
if cf in stderr:
break
remaining = deadline - time.perf_counter()

if p.poll() is None:
p.terminate()
_, stderr = p.communicate(timeout=WAIT_TIME)
assert cf in stderr.decode("utf-8", "replace")
finally:
shutil.rmtree(runtime_dir)
shutil.rmtree(startup_dir)

# finish reading stderr
stderr_thread.join()
while True:
try:
line = q.get_nowait()
except Empty:
break
stderr += line
assert cf in stderr
5 changes: 3 additions & 2 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ def execute(cmd):
km.restart_kernel(now=True)
assert km.is_alive()
execute("check")
kc.stop_channels()

km.shutdown_kernel()
assert km.context.closed
Expand Down Expand Up @@ -459,7 +460,7 @@ async def test_get_connect_info(self, async_km):
)
assert keys == expected

@pytest.mark.timeout(10)
@pytest.mark.timeout(20)
@pytest.mark.skipif(sys.platform == "win32", reason="Windows doesn't support signals")
async def test_signal_kernel_subprocesses(self, install_kernel, start_async_kernel):

Expand Down Expand Up @@ -499,7 +500,7 @@ async def execute(cmd):
# wait up to 5s for subprocesses to handle signal
for i in range(50):
reply = await execute("check")
if reply["user_expressions"]["poll"] != [-signal.SIGINT] * N:
if any(status is None for status in reply["user_expressions"]["poll"]):
await asyncio.sleep(0.1)
else:
break
Expand Down
9 changes: 9 additions & 0 deletions jupyter_client/tests/test_multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def _run_lifecycle(km, test_kid=None):
assert isinstance(k, KernelManager)
km.shutdown_kernel(kid, now=True)
assert kid not in km, f"{kid} not in {km}"
km.context.term()

def _run_cinfo(self, km, transport, ip):
kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
Expand All @@ -87,6 +88,7 @@ def _run_cinfo(self, km, transport, ip):
stream = km.connect_hb(kid)
stream.close()
km.shutdown_kernel(kid, now=True)
km.context.term()

# static so picklable for multiprocessing on Windows
@classmethod
Expand All @@ -106,6 +108,7 @@ def test_shutdown_all(self):
self.assertNotIn(kid, km)
# shutdown again is okay, because we have no kernels
km.shutdown_all()
km.context.term()

def test_tcp_cinfo(self):
km = self._get_tcp_km()
Expand Down Expand Up @@ -217,6 +220,7 @@ def test_subclass_callables(self):
assert km.call_count("cleanup_resources") == 0

assert kid not in km, f"{kid} not in {km}"
km.context.term()


class TestAsyncKernelManager(AsyncTestCase):
Expand Down Expand Up @@ -263,6 +267,7 @@ async def _run_lifecycle(km, test_kid=None):
assert isinstance(k, AsyncKernelManager)
await km.shutdown_kernel(kid, now=True)
assert kid not in km, f"{kid} not in {km}"
km.context.term()

async def _run_cinfo(self, km, transport, ip):
kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)
Expand All @@ -282,6 +287,7 @@ async def _run_cinfo(self, km, transport, ip):
stream.close()
await km.shutdown_kernel(kid, now=True)
self.assertNotIn(kid, km)
km.context.term()

@gen_test
async def test_tcp_lifecycle(self):
Expand Down Expand Up @@ -316,6 +322,7 @@ async def test_use_after_shutdown_all(self):
self.assertNotIn(kid, km)
# shutdown again is okay, because we have no kernels
await km.shutdown_all()
km.context.term()

@gen_test(timeout=20)
async def test_shutdown_all_while_starting(self):
Expand All @@ -333,6 +340,7 @@ async def test_shutdown_all_while_starting(self):
self.assertNotIn(kid, km)
# shutdown again is okay, because we have no kernels
await km.shutdown_all()
km.context.term()

@gen_test
async def test_tcp_cinfo(self):
Expand Down Expand Up @@ -466,3 +474,4 @@ async def test_subclass_callables(self):
assert mkm.call_count("cleanup_resources") == 0

assert kid not in mkm, f"{kid} not in {mkm}"
mkm.context.term()
9 changes: 3 additions & 6 deletions jupyter_client/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,13 @@ def test_tracking(self):
s.copy_threshold = 1
ZMQStream(a)
msg = s.send(a, "hello", track=False)
self.assertTrue(msg["tracker"] is ss.DONE)
assert msg["tracker"] is ss.DONE
msg = s.send(a, "hello", track=True)
self.assertTrue(isinstance(msg["tracker"], zmq.MessageTracker))
assert isinstance(msg["tracker"], zmq.MessageTracker)
M = zmq.Message(b"hi there", track=True)
msg = s.send(a, "hello", buffers=[M], track=True)
t = msg["tracker"]
self.assertTrue(isinstance(t, zmq.MessageTracker))
self.assertRaises(zmq.NotDone, t.wait, 0.1)
del M
t.wait(1) # this will raise
assert t is not ss.DONE

def test_unique_msg_ids(self):
"""test that messages receive unique ids"""
Expand Down
2 changes: 1 addition & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ipykernel
ipython
jedi<0.18; python_version<="3.6"
mock
mypy
mypy; implementation_name == "cpython"
pre-commit
pytest
pytest-asyncio
Expand Down