diff --git a/CHANGELOG.md b/CHANGELOG.md index 0628270e7..f299e84d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - implement jump target selector and jump to references ([#739]) - implement settings UI using native JupyterLab 3.3 UI ([#778]) - add option to show hover tooltip automatically ([#864], thanks @yamaton) + - add support for language servers that can (only) communicate through TCP rather than stdio (there is no support yet for servers already running on another machine and/or port) [(#636)] - bug fixes: - use correct websocket URL if configured as different from base URL ([#820], thanks @MikeSem) - clean up all completer styles when completer feature is disabled ([#829]). @@ -61,6 +62,7 @@ [#860]: https://github.com/jupyter-lsp/jupyterlab-lsp/pull/860 [#864]: https://github.com/jupyter-lsp/jupyterlab-lsp/pull/864 [#882]: https://github.com/jupyter-lsp/jupyterlab-lsp/pull/882 +[#636]: https://github.com/jupyter-lsp/jupyterlab-lsp/pull/636 ### `@krassowski/jupyterlab-lsp 3.10.1` (2022-03-21) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e46e0d86a..c7598172b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -388,8 +388,11 @@ otherwise an empty dictionary (`{}`) should be returned. ##### Common Concerns - some language servers need to have their connection mode specified - - the `stdio` interface is the only one supported by `jupyter_lsp` - - PRs welcome to support other modes! + - `jupyter_lsp` currently supports the `stdio` and `tcp` interface + - the mode used by `jupyter_lsp` to connect to the language server can be specified by including `mode="stdio"` or `mode="tcp"` in the language server `spec`-dictionary + - currently it is not possible to connect to externally running language servers via tcp, but only to servers spawned by `jupyter_lsp` as given by the `argv` specs entry + - PRs welcome to support externally running language servers! + - use the placeholder `{port}` within the `argv` entry to allow `jupyter_lsp` to specify the port on which to launch the language server - because of its VSCode heritage, many language servers use `nodejs` - `LanguageServerManager.nodejs` will provide the location of our best guess at where a user's `nodejs` might be found diff --git a/atest/ports.py b/atest/ports.py index 439227b6d..dac9307ed 100644 --- a/atest/ports.py +++ b/atest/ports.py @@ -9,7 +9,7 @@ def get_unused_port(): Probably could introduce race conditions if inside a tight loop. """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(("localhost", 0)) + sock.bind(("127.0.0.1", 0)) sock.listen(1) port = sock.getsockname()[1] sock.close() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/connection.py b/python_packages/jupyter_lsp/jupyter_lsp/connection.py new file mode 100644 index 000000000..661e1a80a --- /dev/null +++ b/python_packages/jupyter_lsp/jupyter_lsp/connection.py @@ -0,0 +1,169 @@ +""" Language Server readers and writers + +Parts of this code are derived from: + +> https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/pyls_jsonrpc/streams.py#L83 # noqa +> https://github.com/palantir/python-jsonrpc-server/blob/45ed1931e4b2e5100cc61b3992c16d6f68af2e80/pyls_jsonrpc/streams.py # noqa +> > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE +> > Copyright 2018 Palantir Technologies, Inc. +""" +from abc import ABC, ABCMeta, abstractmethod +from typing import Text + +# pylint: disable=broad-except +import anyio +from anyio.streams.buffered import BufferedByteReceiveStream +from anyio.streams.stapled import StapledObjectStream +from anyio.streams.text import TextSendStream +from tornado.httputil import HTTPHeaders +from traitlets import Instance, Int +from traitlets.config import LoggingConfigurable +from traitlets.traitlets import MetaHasTraits + + +class LspStreamMeta(MetaHasTraits, ABCMeta): + pass + + +class LspStreamBase(LoggingConfigurable, ABC, metaclass=LspStreamMeta): + """Non-blocking, queued base for communicating with Language Servers through anyio + streams + """ + + queue = Instance(StapledObjectStream, help="queue to get/put") + + def __repr__(self): # pragma: no cover + return "<{}(parent={})>".format(self.__class__.__name__, self.parent) + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.log.debug("%s initialized", self) + + @abstractmethod + async def close(self): + pass # pragma: no cover + + +class LspStreamReader(LspStreamBase): + """Language Server Reader""" + + receive_max_bytes = Int( + 65536, + help="the maximum size a header line send by the language server may have", + ).tag(config=True) + + stream = Instance( # type:ignore[assignment] + BufferedByteReceiveStream, help="the stream to read from" + ) # type: BufferedByteReceiveStream + + def __init__(self, stream: anyio.abc.ByteReceiveStream, **kwargs): + super().__init__(**kwargs) + self.stream = BufferedByteReceiveStream(stream) + + async def close(self): + await self.stream.aclose() + self.log.debug("%s closed", self) + + async def read(self) -> None: + """Read from a Language Server until it is closed""" + while True: + message = None + try: + message = await self.read_one() + await self.queue.send(message) + except anyio.ClosedResourceError: + # stream was closed -> terminate + self.log.debug("Stream closed while a read was still in progress") + break + except Exception as e: # pragma: no cover + self.log.exception( + "%s couldn't enqueue message: %s (%s)", self, message, e + ) + + async def _read_content(self, length: int) -> bytes: + """Read the full length of the message. + + Args: + - length: the content length + """ + try: + return await self.stream.receive_exactly(length) + except anyio.IncompleteRead: # pragma: no cover + # resource has been closed before the requested bytes could be retrieved + # -> signal recource closed + raise anyio.ClosedResourceError + + async def read_one(self) -> Text: + """Read a single message""" + message = "" + headers = HTTPHeaders() + + line = await self._readline() + + if line: + while line and line.strip(): + headers.parse_line(line) + line = await self._readline() + + content_length = int(headers.get("content-length", "0")) + + if content_length: + raw = await self._read_content(length=content_length) + message = raw.decode("utf-8").strip() + + return message + + async def _readline(self) -> Text: + """Read a line""" + try: + # use same max_bytes as is default for receive for now. It seems there is no + # way of getting the bytes read until max_bytes is reached, so we cannot + # iterate the receive_until call with smaller max_bytes values + line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes) + return line.decode("utf-8").strip() + except anyio.IncompleteRead: + # resource has been closed before the requested bytes could be retrieved + # -> signal recource closed + raise anyio.ClosedResourceError + except anyio.DelimiterNotFound: # pragma: no cover + self.log.error( + "Readline hit max_bytes before newline character was encountered" + ) + return "" + + +class LspStreamWriter(LspStreamBase): + """Language Server Writer""" + + stream = Instance( # type:ignore[assignment] + TextSendStream, help="the stream to write to" + ) # type: TextSendStream + + def __init__(self, stream: anyio.abc.ByteSendStream, **kwargs): + super().__init__(**kwargs) + self.stream = TextSendStream(stream, encoding="utf-8") + + async def close(self): + await self.stream.aclose() + self.log.debug("%s closed", self) + + async def write(self) -> None: + """Write to a Language Server until it closes""" + while True: + message = await self.queue.receive() + try: + n_bytes = len(message.encode("utf-8")) + response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message) + await self._write_one(response) + except ( + anyio.ClosedResourceError, + anyio.BrokenResourceError, + ): # pragma: no cover + # stream was closed -> terminate + self.log.debug("Stream closed while a write was still in progress") + break + except Exception: # pragma: no cover + self.log.exception("%s couldn't write message: %s", self, response) + + async def _write_one(self, message) -> None: + await self.stream.send(message) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/manager.py b/python_packages/jupyter_lsp/jupyter_lsp/manager.py index d6874e43c..10c8ee878 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/manager.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/manager.py @@ -1,4 +1,4 @@ -""" A configurable frontend for stdio-based Language Servers +""" A configurable frontend for stream-based Language Servers """ import asyncio import os @@ -35,7 +35,11 @@ EP_SPEC_V1, ) from .schema import LANGUAGE_SERVER_SPEC_MAP -from .session import LanguageServerSession +from .session import ( + LanguageServerSessionBase, + LanguageServerSessionStdio, + LanguageServerSessionTCP, +) from .trait_types import LoadableCallable, Schema from .types import ( KeyedLanguageServerSpecs, @@ -68,10 +72,10 @@ class LanguageServerManager(LanguageServerManagerAPI): ) # type: bool sessions = Dict_( # type:ignore[assignment] - trait=Instance(LanguageServerSession), + trait=Instance(LanguageServerSessionBase), default_value={}, help="sessions keyed by language server name", - ) # type: Dict[Tuple[Text], LanguageServerSession] + ) # type: Dict[Tuple[Text], LanguageServerSessionBase] virtual_documents_dir = Unicode( help="""Path to virtual documents relative to the content manager root @@ -160,9 +164,21 @@ def init_sessions(self): """create, but do not initialize all sessions""" sessions = {} for language_server, spec in self.language_servers.items(): - sessions[language_server] = LanguageServerSession( - language_server=language_server, spec=spec, parent=self - ) + mode = spec.get("mode", "stdio") + if mode == "stdio": + sessions[language_server] = LanguageServerSessionStdio( + language_server=language_server, spec=spec, parent=self + ) + elif mode == "tcp": + sessions[language_server] = LanguageServerSessionTCP( + language_server=language_server, spec=spec, parent=self + ) + else: # pragma: no cover + raise ValueError( + "Unknown session mode {} for language server '{}'".format( + mode, language_server + ) + ) self.sessions = sessions def init_listeners(self): diff --git a/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py b/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py deleted file mode 100644 index ef6f9de71..000000000 --- a/python_packages/jupyter_lsp/jupyter_lsp/non_blocking.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Derived from - -> https://github.com/rudolfwalter/pygdbmi/blob/0.7.4.2/pygdbmi/gdbcontroller.py -> MIT License https://github.com/rudolfwalter/pygdbmi/blob/master/LICENSE -> Copyright (c) 2016 Chad Smith gmail.com> -""" -import os - -if os.name == "nt": # pragma: no cover - import msvcrt - from ctypes import POINTER, WinError, byref, windll, wintypes # type: ignore - from ctypes.wintypes import BOOL, DWORD, HANDLE # type: ignore -else: # pragma: no cover - import fcntl - - -def make_non_blocking(file_obj): # pragma: no cover - """ - make file object non-blocking - - Windows doesn't have the fcntl module, but someone on - stack overflow supplied this code as an answer, and it works - http://stackoverflow.com/a/34504971/2893090 - """ - - if os.name == "nt": - LPDWORD = POINTER(DWORD) - PIPE_NOWAIT = wintypes.DWORD(0x00000001) - - SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState - SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD] - SetNamedPipeHandleState.restype = BOOL - - h = msvcrt.get_osfhandle(file_obj.fileno()) - - res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None) - if res == 0: - raise ValueError(WinError()) - - else: - # Set the file status flag (F_SETFL) on the pipes to be non-blocking - # so we can attempt to read from a pipe with no new data without locking - # the program up - fcntl.fcntl(file_obj, fcntl.F_SETFL, os.O_NONBLOCK) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json b/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json index 61f31d1cb..7d3e1fa9d 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json +++ b/python_packages/jupyter_lsp/jupyter_lsp/schema/schema.json @@ -139,6 +139,13 @@ "description": "list of MIME types supported by the language server", "title": "MIME Types" }, + "mode": { + "description": "connection mode used, e.g. stdio (default), tcp", + "title": "Mode", + "type": "string", + "enum": ["stdio", "tcp"], + "default": "stdio" + }, "troubleshoot": { "type": "string", "description": "information on troubleshooting the installation or auto-detection of the language server", diff --git a/python_packages/jupyter_lsp/jupyter_lsp/session.py b/python_packages/jupyter_lsp/jupyter_lsp/session.py index 082142164..440b83a7c 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/session.py @@ -2,42 +2,78 @@ """ import asyncio import atexit +import math import os import string import subprocess +import sys +from abc import ABC, ABCMeta, abstractmethod from copy import copy from datetime import datetime, timezone +from threading import Event, Thread +from typing import List +import anyio +from anyio import CancelScope +from anyio.abc import Process, SocketStream +from anyio.streams.stapled import StapledObjectStream from tornado.ioloop import IOLoop -from tornado.queues import Queue from tornado.websocket import WebSocketHandler -from traitlets import Bunch, Instance, Set, Unicode, UseEnum, observe +from traitlets import Bunch, Float, Instance, Set, Unicode, UseEnum, observe from traitlets.config import LoggingConfigurable +from traitlets.traitlets import MetaHasTraits -from . import stdio +from .connection import LspStreamReader, LspStreamWriter from .schema import LANGUAGE_SERVER_SPEC from .specs.utils import censored_spec from .trait_types import Schema from .types import SessionStatus +from .utils import get_unused_port -class LanguageServerSession(LoggingConfigurable): +class LanguageServerSessionMeta(MetaHasTraits, ABCMeta): + pass + + +class LanguageServerSessionBase( + LoggingConfigurable, ABC, metaclass=LanguageServerSessionMeta +): """Manage a session for a connection to a language server""" language_server = Unicode(help="the language server implementation name") spec = Schema(LANGUAGE_SERVER_SPEC) # run-time specifics - process = Instance( - subprocess.Popen, help="the language server subprocess", allow_none=True + process = Instance(Process, help="the language server subprocess", allow_none=True) + cancelscope = Instance( + CancelScope, help="scope used for stopping the session", allow_none=True + ) + started = Instance( + Event, + args=(), + help="event signaling that the session has finished starting", + allow_none=False, + ) + thread = Instance( + Thread, help="worker thread for running an event loop", allow_none=True ) - writer = Instance(stdio.LspStdIoWriter, help="the JSON-RPC writer", allow_none=True) - reader = Instance(stdio.LspStdIoReader, help="the JSON-RPC reader", allow_none=True) + main_loop = Instance( + IOLoop, help="the event loop of the main thread", allow_none=True + ) + thread_loop = Instance( + IOLoop, help="the event loop of the worker thread", allow_none=True + ) + writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True) + reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True) from_lsp = Instance( - Queue, help="a queue for string messages from the server", allow_none=True + StapledObjectStream, + help="a queue for string messages from the server", + allow_none=True, ) to_lsp = Instance( - Queue, help="a queue for string message to the server", allow_none=True + StapledObjectStream, + help="a queue for string messages to the server", + allow_none=True, ) handlers = Set( trait=Instance(WebSocketHandler), @@ -48,7 +84,20 @@ class LanguageServerSession(LoggingConfigurable): last_handler_message_at = Instance(datetime, allow_none=True) last_server_message_at = Instance(datetime, allow_none=True) - _tasks = None + stop_timeout = Float( + 5, + help="timeout in seconds after which a server process will be terminated " + "forcefully", + ).tag(config=True) + start_timeout = Float( + 240, + help="timeout in seconds after which server process startup will be aborted", + ).tag(config=True) + queue_size = Float( + -1, + help="the maximum number of messages that can be buffered in the queue or -1 " + "for an unbounded queue", + ).tag(config=True) _skip_serialize = ["argv", "debug_argv"] @@ -75,40 +124,92 @@ def to_json(self): spec=censored_spec(self.spec), ) - def initialize(self): - """(re)initialize a language server session""" - self.stop() + def start(self): + """run a language server session asynchronously inside a worker thread + + will return as soon as the session is ready for communication + """ + self.main_loop = IOLoop.current() + self.started.clear() + + if sys.platform == "win32": # pragma: no cover + # harmonizes event loop across Python version on Windows. + # Python <3.8 did not use ProactorEventLoop as default. + # ProactorEventLoop supports subprocesses. + policy = asyncio.WindowsProactorEventLoopPolicy() + else: + policy = asyncio.DefaultEventLoopPolicy() + if sys.version_info < (3, 8): # pragma: no cover + from .threaded_child_watcher import ThreadedChildWatcher + + policy.set_child_watcher(ThreadedChildWatcher()) + self.thread = Thread( + target=anyio.run, + kwargs={"func": self.run, "backend_options": {"policy": policy}}, + daemon=True, + ) + self.thread.start() + self.started.wait(timeout=self.start_timeout) + + def stop(self): + """shut down the session""" + if self.cancelscope is not None: + self.thread_loop.add_callback(self.cancelscope.cancel) + + # wait for the session to get cleaned up + if self.thread and self.thread.is_alive(): + self.thread.join() + self.main_loop = None + + async def run(self): + """run this session in a task group and clean everything up on cancellation""" + self.thread_loop = IOLoop.current() + + try: + async with anyio.create_task_group() as tg: + self.cancelscope = tg.cancel_scope + await self.initialize() + self.started.set() + tg.start_soon(self._read_lsp) + tg.start_soon(self._write_lsp) + tg.start_soon(self._broadcast_from_lsp) + except Exception as e: # pragma: no cover + self.log.exception("Exception while listening %s", e) + finally: + await self.cleanup() + self.cancelscope = None + self.thread_loop = None + + async def initialize(self): + """initialize a language server session""" self.status = SessionStatus.STARTING + self.init_queues() - self.init_process() + await self.init_process() self.init_writer() self.init_reader() - loop = asyncio.get_event_loop() - self._tasks = [ - loop.create_task(coro()) - for coro in [self._read_lsp, self._write_lsp, self._broadcast_from_lsp] - ] - self.status = SessionStatus.STARTED - def stop(self): + async def cleanup(self): """clean up all of the state of the session""" - self.status = SessionStatus.STOPPING - if self.process: - self.process.terminate() - self.process = None - if self.reader: - self.reader.close() + if self.reader is not None: + await self.reader.close() self.reader = None - if self.writer: - self.writer.close() + if self.writer is not None: + await self.writer.close() self.writer = None - - if self._tasks: - [task.cancel() for task in self._tasks] + if self.process is not None: + await self.stop_process(self.stop_timeout) + self.process = None + if self.from_lsp is not None: + await self.from_lsp.aclose() + self.from_lsp = None + if self.to_lsp is not None: + await self.to_lsp.aclose() + self.to_lsp = None self.status = SessionStatus.STOPPED @@ -116,43 +217,77 @@ def stop(self): def _on_handlers(self, change: Bunch): """re-initialize if someone starts listening, or stop if nobody is""" if change["new"] and not self.process: - self.initialize() + self.start() elif not change["new"] and self.process: self.stop() def write(self, message): """wrapper around the write queue to keep it mostly internal""" self.last_handler_message_at = self.now() - IOLoop.current().add_callback(self.to_lsp.put_nowait, message) + self.thread_loop.add_callback(self.to_lsp.send, message) def now(self): return datetime.now(timezone.utc) - def init_process(self): - """start the language server subprocess""" - self.process = subprocess.Popen( - self.spec["argv"], + async def start_process(self, argv: List[str]): + """start the language server subprocess given in argv""" + self.process = await anyio.open_process( + argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, env=self.substitute_env(self.spec.get("env", {}), os.environ), - bufsize=0, ) + async def stop_process(self, timeout: int = 5): + """stop the language server subprocess + + If the process does not terminate within timeout seconds it will be killed + forcefully. + """ + if self.process is None: # pragma: no cover + return + + if timeout < 0.0: # pragma: no cover + raise ValueError("Timeout must not be negative!") + + # try to stop the process gracefully + self.process.terminate() + with anyio.move_on_after(timeout + 0.1): # avoid timeout of 0s + self.log.debug("Waiting for process to terminate") + await self.process.wait() + return + + if sys.platform.startswith("win32"): # pragma: no cover + # On Windows Process.kill() is an alias to Process.terminate() so we cannot + # force the process to stop. if you know of a better way to handle this on + # Windows please consider contributing + self.log.warning( + ( + "The language server process (PID: {}) did not terminate within {} " + "seconds. Beware, it might continue running as a zombie process." + ).format(self.process.pid, timeout) + ) + else: # pragma: no cover + self.log.debug( + ( + "Process did not terminate within {} seconds. " + "Bringing it down the hard way!" + ).format(timeout) + ) + try: # pragma: no cover + self.process.kill() + except ProcessLookupError: + # process terminated on its own in the meantime + pass + def init_queues(self): """create the queues""" - self.from_lsp = Queue() - self.to_lsp = Queue() - - def init_reader(self): - """create the stdout reader (from the language server)""" - self.reader = stdio.LspStdIoReader( - stream=self.process.stdout, queue=self.from_lsp, parent=self + queue_size = math.inf if self.queue_size < 0 else self.queue_size + self.from_lsp = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=queue_size) ) - - def init_writer(self): - """create the stdin writer (to the language server)""" - self.writer = stdio.LspStdIoWriter( - stream=self.process.stdin, queue=self.to_lsp, parent=self + self.to_lsp = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=queue_size) ) def substitute_env(self, env, base): @@ -163,6 +298,25 @@ def substitute_env(self, env, base): return final_env + @abstractmethod + async def init_process(self): + """start the language server subprocess and store it in self.process""" + pass # pragma: no cover + + @abstractmethod + def init_reader(self): + """create the stream reader (from the language server) and store it in + self.reader + """ + pass # pragma: no cover + + @abstractmethod + def init_writer(self): + """create the stream writer (to the language server) and store it in + self.writer + """ + pass # pragma: no cover + async def _read_lsp(self): await self.reader.read() @@ -175,5 +329,85 @@ async def _broadcast_from_lsp(self): """ async for message in self.from_lsp: self.last_server_message_at = self.now() - await self.parent.on_server_message(message, self) - self.from_lsp.task_done() + # handle message in the main thread's event loop + self.main_loop.add_callback(self.parent.on_server_message, message, self) + + +class LanguageServerSessionStdio(LanguageServerSessionBase): + async def init_process(self): + await self.start_process(self.spec["argv"]) + + def init_reader(self): + self.reader = LspStreamReader( + stream=self.process.stdout, queue=self.from_lsp, parent=self + ) + + def init_writer(self): + self.writer = LspStreamWriter( + stream=self.process.stdin, queue=self.to_lsp, parent=self + ) + + +class LanguageServerSessionTCP(LanguageServerSessionBase): + + tcp_con = Instance(SocketStream, help="the tcp connection", allow_none=True) + + async def init_process(self): + """start the language server subprocess""" + argv = self.spec["argv"] + + host = "127.0.0.1" + port = get_unused_port() + + # substitute arguments for host and port into the environment + argv = [arg.format(host=host, port=port) for arg in argv] + + # start the process + await self.start_process(argv) + + # finally open the tcp connection to the now running process + self.tcp_con = await self.init_tcp_connection(host, port) + + async def stop_process(self, timeout: int = 5): + await self.tcp_con.aclose() + self.tcp_con = None + + await super().stop_process(timeout) + + async def init_tcp_connection(self, host, port, retries=12, sleep=5.0): + server = "{}:{}".format(host, port) + + tries = 0 + while tries < retries: + tries = tries + 1 + try: + return await anyio.connect_tcp(host, port) + except OSError: # pragma: no cover + if tries < retries: + self.log.warning( + ( + "Connection to server {} refused! " + "Attempt {}/{}. " + "Retrying in {}s" + ).format(server, tries, retries, sleep) + ) + await anyio.sleep(sleep) + else: + self.log.warning( + "Connection to server {} refused! Attempt {}/{}.".format( + server, tries, retries + ) + ) + raise OSError( + "Unable to connect to server {}".format(server) + ) # pragma: no cover + + def init_reader(self): + self.reader = LspStreamReader( + stream=self.tcp_con, queue=self.from_lsp, parent=self + ) + + def init_writer(self): + self.writer = LspStreamWriter( + stream=self.tcp_con, queue=self.to_lsp, parent=self + ) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py b/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py index edd15f613..c96164579 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/specs/__init__.py @@ -9,7 +9,7 @@ from .julia_language_server import JuliaLanguageServer from .pyls import PalantirPythonLanguageServer from .pyright import PyrightLanguageServer -from .python_lsp_server import PythonLSPServer +from .python_lsp_server import PythonLSPServer, PythonLSPServerTCP from .r_languageserver import RLanguageServer from .sql_language_server import SQLLanguageServer from .texlab import Texlab @@ -30,6 +30,7 @@ md = UnifiedLanguageServer() py_palantir = PalantirPythonLanguageServer() py_lsp_server = PythonLSPServer() +py_lsp_server_tcp = PythonLSPServerTCP() pyright = PyrightLanguageServer() r = RLanguageServer() tex = Texlab() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py b/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py index 799222b2d..6dc266f67 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/specs/python_lsp_server.py @@ -47,3 +47,11 @@ class PythonLSPServer(PythonModuleSpec): config_schema=load_config_schema(key), env=dict(PYTHONUNBUFFERED="1"), ) + + +class PythonLSPServerTCP(PythonLSPServer): + key = "pylsp-tcp" + args = ["--tcp", "--port", "{port}"] + spec = PythonLSPServer.spec.copy() + spec["display_name"] = "python-lsp-server (pylsp) over tcp" + spec["mode"] = "tcp" diff --git a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/stdio.py deleted file mode 100644 index e9af4d5ae..000000000 --- a/python_packages/jupyter_lsp/jupyter_lsp/stdio.py +++ /dev/null @@ -1,202 +0,0 @@ -""" Language Server stdio-mode readers - -Parts of this code are derived from: - -> https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/pyls_jsonrpc/streams.py#L83 # noqa -> https://github.com/palantir/python-jsonrpc-server/blob/45ed1931e4b2e5100cc61b3992c16d6f68af2e80/pyls_jsonrpc/streams.py # noqa -> > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE -> > Copyright 2018 Palantir Technologies, Inc. -""" -# pylint: disable=broad-except -import asyncio -import io -import os -from concurrent.futures import ThreadPoolExecutor -from typing import List, Optional, Text - -from tornado.concurrent import run_on_executor -from tornado.gen import convert_yielded -from tornado.httputil import HTTPHeaders -from tornado.ioloop import IOLoop -from tornado.queues import Queue -from traitlets import Float, Instance, default -from traitlets.config import LoggingConfigurable - -from .non_blocking import make_non_blocking - - -class LspStdIoBase(LoggingConfigurable): - """Non-blocking, queued base for communicating with stdio Language Servers""" - - executor = None - - stream = Instance( # type:ignore[assignment] - io.RawIOBase, help="the stream to read/write" - ) # type: io.RawIOBase - queue = Instance(Queue, help="queue to get/put") - - def __repr__(self): # pragma: no cover - return "<{}(parent={})>".format(self.__class__.__name__, self.parent) - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.log.debug("%s initialized", self) - self.executor = ThreadPoolExecutor(max_workers=1) - - def close(self): - self.stream.close() - self.log.debug("%s closed", self) - - -class LspStdIoReader(LspStdIoBase): - """Language Server stdio Reader - - Because non-blocking (but still synchronous) IO is used, rudimentary - exponential backoff is used. - """ - - max_wait = Float(help="maximum time to wait on idle stream").tag(config=True) - min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True) - next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True) - - @default("max_wait") - def _default_max_wait(self): - return 0.1 if os.name == "nt" else self.min_wait * 2 - - async def sleep(self): - """Simple exponential backoff for sleeping""" - if self.stream.closed: # pragma: no cover - return - self.next_wait = min(self.next_wait * 2, self.max_wait) - try: - await asyncio.sleep(self.next_wait) - except Exception: # pragma: no cover - pass - - def wake(self): - """Reset the wait time""" - self.wait = self.min_wait - - async def read(self) -> None: - """Read from a Language Server until it is closed""" - make_non_blocking(self.stream) - - while not self.stream.closed: - message = None - try: - message = await self.read_one() - - if not message: - await self.sleep() - continue - else: - self.wake() - - IOLoop.current().add_callback(self.queue.put_nowait, message) - except Exception as e: # pragma: no cover - self.log.exception( - "%s couldn't enqueue message: %s (%s)", self, message, e - ) - await self.sleep() - - async def _read_content( - self, length: int, max_parts=1000, max_empties=200 - ) -> Optional[bytes]: - """Read the full length of the message unless exceeding max_parts or - max_empties empty reads occur. - - See https://github.com/jupyter-lsp/jupyterlab-lsp/issues/450 - - Crucial docs or read(): - "If the argument is positive, and the underlying raw - stream is not interactive, multiple raw reads may be issued - to satisfy the byte count (unless EOF is reached first)" - - Args: - - length: the content length - - max_parts: prevent absurdly long messages (1000 parts is several MBs): - 1 part is usually sufficient but not enough for some long - messages 2 or 3 parts are often needed. - """ - raw = None - raw_parts: List[bytes] = [] - received_size = 0 - while received_size < length and len(raw_parts) < max_parts and max_empties > 0: - part = None - try: - part = self.stream.read(length - received_size) - except OSError: # pragma: no cover - pass - if part is None: - max_empties -= 1 - await self.sleep() - continue - received_size += len(part) - raw_parts.append(part) - - if raw_parts: - raw = b"".join(raw_parts) - if len(raw) != length: # pragma: no cover - self.log.warning( - f"Readout and content-length mismatch: {len(raw)} vs {length};" - f"remaining empties: {max_empties}; remaining parts: {max_parts}" - ) - - return raw - - async def read_one(self) -> Text: - """Read a single message""" - message = "" - headers = HTTPHeaders() - - line = await convert_yielded(self._readline()) - - if line: - while line and line.strip(): - headers.parse_line(line) - line = await convert_yielded(self._readline()) - - content_length = int(headers.get("content-length", "0")) - - if content_length: - raw = await self._read_content(length=content_length) - if raw is not None: - message = raw.decode("utf-8").strip() - else: # pragma: no cover - self.log.warning( - "%s failed to read message of length %s", - self, - content_length, - ) - - return message - - @run_on_executor - def _readline(self) -> Text: - """Read a line (or immediately return None)""" - try: - return self.stream.readline().decode("utf-8").strip() - except OSError: # pragma: no cover - return "" - - -class LspStdIoWriter(LspStdIoBase): - """Language Server stdio Writer""" - - async def write(self) -> None: - """Write to a Language Server until it closes""" - while not self.stream.closed: - message = await self.queue.get() - try: - body = message.encode("utf-8") - response = "Content-Length: {}\r\n\r\n{}".format(len(body), message) - await convert_yielded(self._write_one(response.encode("utf-8"))) - except Exception: # pragma: no cover - self.log.exception("%s couldn't write message: %s", self, response) - finally: - self.queue.task_done() - - @run_on_executor - def _write_one(self, message) -> None: - self.stream.write(message) - self.stream.flush() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py index fd8e0b93a..4ece104fe 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py @@ -20,6 +20,7 @@ "dockerfile-language-server-nodejs", "typescript-language-server", "pylsp", + "pylsp-tcp", "unified-language-server", "sql-language-server", "vscode-css-languageserver-bin", @@ -112,6 +113,12 @@ def jsonrpc_init_msg(): ) +# only run tests with asyncio +@fixture +def anyio_backend(): + return "asyncio" + + @fixture def app(): return MockServerApp() diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py index 528be4034..09ab2c319 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_bad_spec.py @@ -1,7 +1,8 @@ import pytest import traitlets -from jupyter_lsp.session import LanguageServerSession +from jupyter_lsp.schema import SPEC_VERSION +from jupyter_lsp.session import LanguageServerSessionStdio @pytest.mark.parametrize( @@ -12,8 +13,14 @@ {"languages": None}, {"languages": 1}, {"languages": [1, "two"]}, + { + "argv": ["command"], + "languages": ["some language"], + "version": SPEC_VERSION, + "mode": "unknown", + }, ], ) def test_bad_spec(spec): with pytest.raises(traitlets.TraitError): - LanguageServerSession(spec=spec) + LanguageServerSessionStdio(spec=spec) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py new file mode 100644 index 000000000..203576e7d --- /dev/null +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py @@ -0,0 +1,161 @@ +import math +import subprocess + +import anyio +import pytest +from anyio.streams.stapled import StapledObjectStream + +from jupyter_lsp.connection import LspStreamReader +from jupyter_lsp.utils import get_unused_port + +WRITER_TEMPLATE = """ +from time import sleep + +# the LSP states that each header field must be terminated by \\r\\n +print('Content-Length: {length}', end='\\r\\n') +# and the header must be terminated again by \\r\\n +print(end='\\r\\n') + +for repeat in range({repeats}): + sleep({interval}) + print('{message}', end='') + +if {add_excess}: + print("extra", end='') + +print() +""" + +TCP_WRITER_TEMPLATE = """ +from anyio import create_tcp_listener, create_task_group, run, sleep, Event + +async def serve_once(listener): + async def handle(client): + async with client: + # the LSP states that each header field must be terminated by \\r\\n + await client.send(b'Content-Length: {length}\\r\\n') + # and the header must be terminated again by \\r\\n + await client.send(b'\\r\\n') + + for repeat in range({repeats}): + await sleep({interval}) + await client.send(b'{message}') + + if {add_excess}: + await client.send(b"extra") + + await client.send(b'\\n') + + stop.set() + + async def cancel_on_event(scope): + await stop.wait() + scope.cancel() + + stop = Event() + async with create_task_group() as tg: + tg.start_soon(listener.serve, handle) + tg.start_soon(cancel_on_event, tg.cancel_scope) + +async def main(): + listener = await create_tcp_listener(local_port={port}) + await serve_once(listener) + +run(main) +""" + + +class CommunicatorSpawner: + def __init__(self, tmp_path): + self.tmp_path = tmp_path + + async def spawn_writer( + self, message: str, repeats: int = 1, interval=None, add_excess=False, port=None + ): + template = WRITER_TEMPLATE if port is None else TCP_WRITER_TEMPLATE + length = len(message) * repeats + commands_file = self.tmp_path / "writer.py" + commands_file.write_text( + template.format( + length=length, + repeats=repeats, + interval=interval or 0, + message=message, + add_excess=add_excess, + port=port, + ) + ) + return await anyio.open_process( + ["python", "-u", str(commands_file)], stdout=subprocess.PIPE + ) + + +@pytest.fixture +def communicator_spawner(tmp_path): + return CommunicatorSpawner(tmp_path) + + +async def join_process(process: anyio.abc.Process, headstart=1, timeout=1): + await anyio.sleep(headstart) + # wait for timeout second for the process to terminate before raising a TimeoutError + with anyio.fail_after(timeout): + result = await process.wait() + # close any streams attached to stdout + if process.stdout: + await process.stdout.aclose() + return result + + +@pytest.mark.parametrize( + "message,repeats,interval,add_excess", + [ + ["short", 1, None, False], + ["ab" * 100_000, 1, None, False], + ["ab", 2, 0.01, False], + ["ab", 45, 0.01, False], + ["message", 2, 0.01, True], + ], + ids=[ + "short", + "long", + "intermittent", + "intensive-intermittent", + "with-excess", + ], +) +@pytest.mark.parametrize("mode", ["stdio", "tcp"], ids=["stdio", "tcp"]) +@pytest.mark.anyio +async def test_reader( + message, repeats, interval, add_excess, mode, communicator_spawner +): + queue = StapledObjectStream( + *anyio.create_memory_object_stream(max_buffer_size=math.inf) + ) + + port = get_unused_port() if mode == "tcp" else None + process = await communicator_spawner.spawn_writer( + message=message, + repeats=repeats, + interval=interval, + add_excess=add_excess, + port=port, + ) + stream = None + if port is None: + stream = process.stdout + else: + # give the server some time to start + await anyio.sleep(2) + stream = await anyio.connect_tcp("127.0.0.1", port) + + reader = LspStreamReader(stream=stream, queue=queue) + + async with anyio.create_task_group() as tg: + tg.start_soon(join_process, process, 3, 1) + tg.start_soon(reader.read) + + if port is not None: + await stream.aclose() + + result = await queue.receive() + assert result == message * repeats diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py index 5e704550b..7ac68a84d 100644 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py +++ b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py @@ -1,4 +1,6 @@ import asyncio +import os +from sys import platform import pytest @@ -20,7 +22,7 @@ async def assert_status_set(handler, expected_statuses, language_server=None): assert statuses == expected_statuses, payload -@pytest.mark.asyncio +@pytest.mark.anyio async def test_start_known(known_server, handlers, jsonrpc_init_msg): """will a process start for a known server if a handler starts?""" handler, ws_handler = handlers @@ -100,3 +102,46 @@ async def test_ping(handlers): assert ws_handler._ping_sent is True ws_handler.on_close() + + +def exists_process_with_pid(pid): + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "timeout", [0, 5], ids=["terminate immediately", "after 5 seconds"] +) +async def test_stop(handlers, timeout): + """Test whether process will stop gracefully or forcefully""" + a_server = "pylsp" + + handler, ws_handler = handlers + manager = handler.manager + + manager.initialize() + + await ws_handler.open(a_server) + + session = manager.sessions[ws_handler.language_server] + session.stop_timeout = timeout + + process_pid = session.process.pid + assert exists_process_with_pid(process_pid) is True + + ws_handler.on_close() + + if platform.startswith("win32"): # pragma: no cover + # currently we cannot forcefully terminate the process on windows, so we just + # give it a little more extra time to finish on its own + await asyncio.sleep(timeout + 10) + else: # pragma: no cover + # linux and darwin + await asyncio.sleep(timeout + 2) + + assert exists_process_with_pid(process_pid) is False diff --git a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py b/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py deleted file mode 100644 index 8df7e2c57..000000000 --- a/python_packages/jupyter_lsp/jupyter_lsp/tests/test_stdio.py +++ /dev/null @@ -1,85 +0,0 @@ -import asyncio -import subprocess - -import pytest -from tornado.queues import Queue - -from jupyter_lsp.stdio import LspStdIoReader - -WRITER_TEMPLATE = """ -from time import sleep - -print('Content-Length: {length}') -print() - -for repeat in range({repeats}): - sleep({interval}) - print('{message}', end='') - -if {add_excess}: - print("extra", end='') - -print() -""" - - -class CommunicatorSpawner: - def __init__(self, tmp_path): - self.tmp_path = tmp_path - - def spawn_writer( - self, message: str, repeats: int = 1, interval=None, add_excess=False - ): - length = len(message) * repeats - commands_file = self.tmp_path / "writer.py" - commands_file.write_text( - WRITER_TEMPLATE.format( - length=length, - repeats=repeats, - interval=interval or 0, - message=message, - add_excess=add_excess, - ) - ) - return subprocess.Popen( - ["python", "-u", str(commands_file)], stdout=subprocess.PIPE, bufsize=0 - ) - - -@pytest.fixture -def communicator_spawner(tmp_path): - return CommunicatorSpawner(tmp_path) - - -async def join_process(process: subprocess.Popen, headstart=1, timeout=1): - await asyncio.sleep(headstart) - result = process.wait(timeout=timeout) - if process.stdout: - process.stdout.close() - return result - - -@pytest.mark.parametrize( - "message,repeats,interval,add_excess", - [ - ["short", 1, None, False], - ["ab" * 10_0000, 1, None, False], - ["ab", 2, 0.01, False], - ["ab", 45, 0.01, False], - ["message", 2, 0.01, True], - ], - ids=["short", "long", "intermittent", "intensive-intermittent", "with-excess"], -) -@pytest.mark.asyncio -async def test_reader(message, repeats, interval, add_excess, communicator_spawner): - queue = Queue() - - process = communicator_spawner.spawn_writer( - message=message, repeats=repeats, interval=interval, add_excess=add_excess - ) - reader = LspStdIoReader(stream=process.stdout, queue=queue) - - await asyncio.gather(join_process(process, headstart=3, timeout=1), reader.read()) - - result = queue.get_nowait() - assert result == message * repeats diff --git a/python_packages/jupyter_lsp/jupyter_lsp/threaded_child_watcher.py b/python_packages/jupyter_lsp/jupyter_lsp/threaded_child_watcher.py new file mode 100644 index 000000000..2df9488e0 --- /dev/null +++ b/python_packages/jupyter_lsp/jupyter_lsp/threaded_child_watcher.py @@ -0,0 +1,172 @@ +""" +This is derived from +https://github.com/python/cpython/blob/4649202ea75d48e1496e99911709824ca2d3170e/Lib/asyncio/unix_events.py +to workaround https://bugs.python.org/issue35621 on Python < 3.8 as suggested by Python commited in +https://bugs.python.org/issue35621#msg370355. No modifications were made. + +PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2 +-------------------------------------------- + +1. This LICENSE AGREEMENT is between the Python Software Foundation +("PSF"), and the Individual or Organization ("Licensee") accessing and +otherwise using this software ("Python") in source or binary form and +its associated documentation. + +2. Subject to the terms and conditions of this License Agreement, PSF hereby +grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce, +analyze, test, perform and/or display publicly, prepare derivative works, +distribute, and otherwise use Python alone or in any derivative version, +provided, however, that PSF's License Agreement and PSF's notice of copyright, +i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020 Python Software Foundation; +All Rights Reserved" are retained in Python alone or in any derivative version +prepared by Licensee. + +3. In the event Licensee prepares a derivative work that is based on +or incorporates Python or any part thereof, and wants to make +the derivative work available to others as provided herein, then +Licensee hereby agrees to include in any such work a brief summary of +the changes made to Python. + +4. PSF is making Python available to Licensee on an "AS IS" +basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR +IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND +DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS +FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT +INFRINGE ANY THIRD PARTY RIGHTS. + +5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON +FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS +A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, +OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. + +6. This License Agreement will automatically terminate upon a material +breach of its terms and conditions. + +7. Nothing in this License Agreement shall be deemed to create any +relationship of agency, partnership, or joint venture between PSF and +Licensee. This License Agreement does not grant permission to use PSF +trademarks or trade name in a trademark sense to endorse or promote +products or services of Licensee, or any third party. + +8. By copying, installing or otherwise using Python, Licensee +agrees to be bound by the terms and conditions of this License +Agreement. +""" +# flake8: noqa +# pragma: no cover +import itertools +import os +import threading +import warnings +from asyncio import AbstractChildWatcher, events +from asyncio.log import logger + + +def _compute_returncode(status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + + +class ThreadedChildWatcher(AbstractChildWatcher): + """Threaded child watcher implementation. + The watcher uses a thread per process + for waiting for the process finish. + It doesn't require subscription on POSIX signal + but a thread creation is not free. + The watcher has O(1) complexity, its performance doesn't depend + on amount of spawn processes. + """ + + def __init__(self): + self._pid_counter = itertools.count(0) + self._threads = {} + + def is_active(self): + return True + + def close(self): + self._join_threads() + + def _join_threads(self): + """Internal: Join all non-daemon threads""" + threads = [ + thread + for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon + ] + for thread in threads: + thread.join() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def __del__(self, _warn=warnings.warn): + threads = [ + thread for thread in list(self._threads.values()) if thread.is_alive() + ] + if threads: + _warn( + f"{self.__class__} has registered but not finished child processes", + ResourceWarning, + source=self, + ) + + def add_child_handler(self, pid, callback, *args): + loop = events.get_running_loop() + thread = threading.Thread( + target=self._do_waitpid, + name=f"waitpid-{next(self._pid_counter)}", + args=(loop, pid, callback, args), + daemon=True, + ) + self._threads[pid] = thread + thread.start() + + def remove_child_handler(self, pid): + # asyncio never calls remove_child_handler() !!! + # The method is no-op but is implemented because + # abstract base classe requires it + return True + + def attach_loop(self, loop): + pass + + def _do_waitpid(self, loop, expected_pid, callback, args): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, 0) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", pid + ) + else: + returncode = _compute_returncode(status) + if loop.get_debug(): + logger.debug( + "process %s exited with returncode %s", expected_pid, returncode + ) + + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is closed", loop, pid) + else: + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + self._threads.pop(expected_pid) diff --git a/python_packages/jupyter_lsp/jupyter_lsp/utils.py b/python_packages/jupyter_lsp/jupyter_lsp/utils.py new file mode 100644 index 000000000..dac9307ed --- /dev/null +++ b/python_packages/jupyter_lsp/jupyter_lsp/utils.py @@ -0,0 +1,16 @@ +""" get a random port +""" +import socket + + +def get_unused_port(): + """Get an unused port by trying to listen to any random port. + + Probably could introduce race conditions if inside a tight loop. + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(("127.0.0.1", 0)) + sock.listen(1) + port = sock.getsockname()[1] + sock.close() + return port diff --git a/python_packages/jupyter_lsp/setup.cfg b/python_packages/jupyter_lsp/setup.cfg index f52cd4212..b0a20952e 100644 --- a/python_packages/jupyter_lsp/setup.cfg +++ b/python_packages/jupyter_lsp/setup.cfg @@ -41,6 +41,7 @@ jupyter_lsp_spec_v1 = julia-language-server = jupyter_lsp.specs:julia python-language-server = jupyter_lsp.specs:py_palantir python-lsp-server = jupyter_lsp.specs:py_lsp_server + python-lsp-server-tcp = jupyter_lsp.specs:py_lsp_server_tcp pyright = jupyter_lsp.specs:pyright r-languageserver = jupyter_lsp.specs:r texlab = jupyter_lsp.specs:tex @@ -60,3 +61,4 @@ addopts = --cov-report term-missing:skip-covered -p no:warnings --flake8 +timeout = 300 diff --git a/requirements/github-actions.yml b/requirements/github-actions.yml index 551567015..79aa755a2 100644 --- a/requirements/github-actions.yml +++ b/requirements/github-actions.yml @@ -34,6 +34,7 @@ dependencies: # test tools - pytest-asyncio - pytest-cov + - pytest-timeout # temporary pin added on 2022-12-28, if editing this file try to remove it: # pytoolconfig 1.2.4 depends on packaging>=22.0 which breaks `pip check` - pytoolconfig <1.2.4 diff --git a/requirements/utest.txt b/requirements/utest.txt index 726fccae4..83ab8c57b 100644 --- a/requirements/utest.txt +++ b/requirements/utest.txt @@ -5,5 +5,6 @@ pytest-asyncio pytest-cov pytest-flake8 pytest-runner +pytest-timeout python-lsp-server pluggy<1.0,>=0.12 # Python 3.5 CI Travis, may need update with new pytest releases, see issue 259