diff --git a/LSP.sublime-settings b/LSP.sublime-settings index 5944477cb..3da982f93 100644 --- a/LSP.sublime-settings +++ b/LSP.sublime-settings @@ -324,12 +324,18 @@ // // file on disk. // "schemes": ["file", "buffer", "res"], // - // // When you want to connect to the language server via TCP (on - // // localhost), specify the port here. If you put a value of 0 here, - // // then LSP will select a free port number on localhost. In that case, - // // you can use the string templates $port or ${port} in the "command". - // // The syntax "{port}" is also allowed, but deprecated in favor of - // // $port and ${port}. + // // When set to a positive number bigger than 0, specifies the TCP port to + // // use to connect to the language server process listening on the given + // // port. When set to zero, a free TCP port is chosen. Chosen TCP port + // // number can be accessed through a template variable, i.e. as `${port}` + // // in the "command"`. + // // + // // Set to a negative number to make the LSP client act as TCP server + // // awaiting connection from the LSP server. Using `-1` opens a random port. + // // To use a fixed port number, use `-X` as the value for `tcp_port`, where + // // `X` is the desired (positive) port number. + // // + // // If not specified, STDIO is used as the transport. // "tcp_port": 1234, // // // Sent to server once using workspace/didChangeConfiguration diff --git a/plugin/core/sessions.py b/plugin/core/sessions.py index 8a2965ce3..cef085069 100644 --- a/plugin/core/sessions.py +++ b/plugin/core/sessions.py @@ -111,8 +111,8 @@ from .protocol import ResponseError from .settings import globalprefs from .settings import userprefs -from .transports import Transport from .transports import TransportCallbacks +from .transports import TransportWrapper from .types import Capabilities from .types import ClientConfig from .types import ClientStates @@ -958,15 +958,15 @@ def check_applicable(self, sb: SessionBufferProtocol, *, suppress_requests: bool _PARTIAL_RESULT_PROGRESS_PREFIX = "$ublime-partial-result-progress-" -class Session(APIHandler, TransportCallbacks['dict[str, Any]']): +class Session(APIHandler, TransportCallbacks): def __init__(self, manager: Manager, logger: Logger, workspace_folders: list[WorkspaceFolder], config: ClientConfig, plugin_class: type[AbstractPlugin] | None) -> None: - self.transport: Transport | None = None + self.transport: TransportWrapper | None = None self.working_directory: str | None = None self.request_id = 0 # Our request IDs are always integers. self._logger = logger - self._response_handlers: dict[int, tuple[Request[Any, Any], Callable[[Any], None], Callable[[ResponseError], None]]] = {} # noqa: E501 + self._response_handlers: dict[str | int, tuple[Request[Any, Any], Callable[[Any], None], Callable[[ResponseError], None]]] = {} # noqa: E501 self.config = config self.config_status_message = '' self.manager = weakref.ref(manager) @@ -1238,7 +1238,7 @@ def initialize_async( self, variables: dict[str, str], working_directory: str | None, - transport: Transport, + transport: TransportWrapper, init_callback: InitCallback ) -> None: self.transport = transport @@ -2046,7 +2046,8 @@ def on_progress(self, params: ProgressParams) -> None: # call window/workDoneProgress/create before hand. In that case, we check the 'kind' field of the # progress data. If the 'kind' field is 'begin', we set up a progress reporter anyway. try: - request_id = int(token[len(_WORK_DONE_PROGRESS_PREFIX):]) # type: ignore + token = str(token) + request_id = int(token[len(_WORK_DONE_PROGRESS_PREFIX):]) request = self._response_handlers[request_id][0] self._invoke_views(request, "on_request_progress", request_id, params) return @@ -2196,21 +2197,20 @@ def send_error_response(self, request_id: int | str, error: Error) -> None: def exit(self) -> None: self.send_notification(Notification.exit()) - try: - self.transport.close() # type: ignore - except AttributeError: - pass + if self.transport: + self.transport.close() + self.transport = None def send_payload(self, payload: JSONRPCMessage) -> None: try: - self.transport.send(payload) # type: ignore + self.transport.send(payload) # pyright: ignore[reportOptionalMemberAccess] except AttributeError: pass def deduce_payload( self, - payload: dict[str, Any] - ) -> tuple[Callable | None, Any, int | None, str | None, str | None]: + payload: JSONRPCMessage + ) -> tuple[Callable | None, Any, str | int | None, str | None, str | None]: if "method" in payload: method = payload["method"] handler = self._get_handler(method) @@ -2230,7 +2230,7 @@ def deduce_payload( return res elif "id" in payload: response_id = payload["id"] - if response_id is None: + if response_id is None: # pyright: ignore[reportUnnecessaryComparison] self._logger.incoming_response('', payload.get("error"), True) return (None, None, None, None, None) handler, method, result, is_error = self.response_handler(response_id, payload) @@ -2240,10 +2240,10 @@ def deduce_payload( self._plugin.on_server_response_async(method, response) # type: ignore return handler, response.result, None, None, None else: - debug("Unknown payload type: ", payload) + debug("Unknown payload type: ", payload) # pyright: ignore[reportUnreachable] return (None, None, None, None, None) - def on_payload(self, payload: dict[str, Any]) -> None: + def on_payload(self, payload: JSONRPCMessage) -> None: handler, result, req_id, typestr, _method = self.deduce_payload(payload) if handler: result_promise: Promise[Response[Any]] | None = None @@ -2268,7 +2268,7 @@ def on_payload(self, payload: dict[str, Any]) -> None: result_promise.then(self.send_response) def response_handler( - self, response_id: int, response: dict[str, Any] + self, response_id: str | int, response: JSONRPCMessage ) -> tuple[Callable[[ResponseError], None], str | None, Any, bool]: matching_handler = self._response_handlers.pop(response_id) if not matching_handler: diff --git a/plugin/core/transports.py b/plugin/core/transports.py index d13e3ed7d..cc8f44e68 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -3,19 +3,22 @@ from .constants import ST_PLATFORM from .logging import debug from .logging import exception_log -from .types import TCP_CONNECT_TIMEOUT -from .types import TransportConfig +from .promise import PackagedTask +from .promise import Promise +from .protocol import JSONRPCMessage +from abc import ABC +from abc import abstractmethod from contextlib import closing from functools import partial +from io import BufferedIOBase from queue import Queue from typing import Any from typing import Callable -from typing import Dict -from typing import Generic +from typing import final from typing import IO -from typing import Protocol -from typing import TypeVar -import http +from typing_extensions import override +import contextlib +import http.client import json import os import shutil @@ -31,144 +34,353 @@ except ImportError: orjson = None -T = TypeVar('T') -T_contra = TypeVar('T_contra', contravariant=True) +TCP_CONNECT_TIMEOUT = 5 # seconds class StopLoopError(Exception): pass -class Transport(Generic[T]): +# --- Transport Configs ------------------------------------------------------------------------------------------------ - def send(self, payload: T) -> None: - raise NotImplementedError - - def close(self) -> None: - raise NotImplementedError - - -class TransportCallbacks(Protocol[T_contra]): - - def on_transport_close(self, exit_code: int, exception: Exception | None) -> None: - ... - - def on_payload(self, payload: T_contra) -> None: - ... - def on_stderr_message(self, message: str) -> None: - ... +class TransportConfig(ABC): + """The object that does the actual RPC communication.""" + @staticmethod + def resolve_launch_config( + command: list[str], + env: dict[str, str] | None, + variables: dict[str, str], + ) -> LaunchConfig: + command = sublime.expand_variables(command, variables) + command = [os.path.expanduser(arg) for arg in command] + resolved_env = os.environ.copy() + if env: + for key, value in env.items(): + if key == "PATH": + resolved_env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + resolved_env[key] + else: + resolved_env[key] = sublime.expand_variables(value, variables) + return LaunchConfig(command, resolved_env) + + @abstractmethod + def start( + self, + command: list[str] | None, + env: dict[str, str] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks, + ) -> TransportWrapper: + raise NotImplementedError -class AbstractProcessor(Generic[T]): - def write_data(self, writer: IO[bytes], data: T) -> None: +class StdioTransportConfig(TransportConfig): + """ + The simplest of transport configs: launch the subprocess and communicate with it over standard I/O. This transport + config requires a "command". This is the default transport config when only a "command" is specified in the + ClientConfig. + """ + + @override + def start( + self, + command: list[str] | None, + env: dict[str, str] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks, + ) -> TransportWrapper: + if not command: + raise RuntimeError('missing "command" to start a child process for running the language server') + process = TransportConfig.resolve_launch_config(command, env, variables).start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if not process.stdout or not process.stdin or not process.stderr: + raise Exception('Failed to create transport config due to not being able to pipe stdio') + return TransportWrapper( + callback_object=callbacks, + transport=FileObjectTransport(encode_json, decode_json, process.stdout, process.stdin), + process=process, + error_reader=ErrorReader(callbacks, process.stderr), + ) + + +class TcpClientTransportConfig(TransportConfig): + """ + Transport for communicating to a language server that expects incoming client connections. The language server acts + as the TCP server, this text editor acts as the TCP client. One can have a "command" with this transport + configuration. In that case the subprocess is launched, and then the TCP connection is attempted. If no "command" is + given, a TCP connection is still made. This can be used for cases where the language server is already running as + part of some larger application, like Godot Editor. + """ + + def __init__(self, port: int | None) -> None: + super().__init__() + self._port = port + if isinstance(self._port, int) and self._port <= 0: + raise RuntimeError("invalid port number") + + @override + def start( + self, + command: list[str] | None, + env: dict[str, str] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks, + ) -> TransportWrapper: + port = _add_and_resolve_port_variable(variables, self._port) + if command: + process = TransportConfig.resolve_launch_config(command, env, variables).start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + if not process.stdout: + raise Exception('Failed to create transport config due to not being able to pipe stdout') + error_reader = ErrorReader(callbacks, process.stdout) + else: + process = None + error_reader = None + return TransportWrapper( + callback_object=callbacks, + transport=SocketTransport(encode_json, decode_json, self._connect(port)), + process=process, + error_reader=error_reader, + ) + + def _connect(self, port: int) -> socket.socket: + start_time = time.time() + while time.time() - start_time < TCP_CONNECT_TIMEOUT: + try: + return socket.create_connection(('localhost', port)) + except ConnectionRefusedError: + pass + raise RuntimeError("failed to connect") + + +class TcpServerTransportConfig(TransportConfig): + """ + Transport for communicating to a language server over TCP. The difference, however, is that this transport will + start a TCP listener socket accepting new TCP cliet connections. Once a client connects to this text editor acting + as the TCP server, we'll assume it's the language server we just launched. As such, this tranport requires a + "command" for starting the language server subprocess. + """ + + def __init__(self, port: int | None) -> None: + self._port = port + if isinstance(self._port, int) and self._port <= 0: + raise RuntimeError("invalid port number") + + @override + def start( + self, + command: list[str] | None, + env: dict[str, str] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks, + ) -> TransportWrapper: + if not command: + raise RuntimeError('missing "command" to start a child process for running the language server') + port = _add_and_resolve_port_variable(variables, self._port) + launch = TransportConfig.resolve_launch_config(command, env, variables) + listener_socket = socket.socket() + listener_socket.bind(('localhost', port)) + listener_socket.settimeout(TCP_CONNECT_TIMEOUT) + listener_socket.listen(1) + process_task: PackagedTask[subprocess.Popen[bytes] | None] = Promise.packaged_task() + process_promise, resolve_process = process_task + + # We need to be able to start the process while also awaiting a client connection. + def start_in_background() -> None: + # Sleep for one second, because the listener socket needs to be in the "accept" state before starting the + # subprocess. This is hacky, and will get better when we can use asyncio. + time.sleep(1) + resolve_process(launch.start( + cwd, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)) + + thread = threading.Thread(target=start_in_background) + thread.start() + with closing(listener_socket): + # Await one client connection (blocking!) + sock, _ = listener_socket.accept() + thread.join() + process = process_promise.value + if not process: + raise Exception('Failed to create transport config from separate thread.') + if not process.stderr: + raise Exception('Failed to create transport config due to not being able to pipe stderr') + error_reader = ErrorReader(callbacks, process.stderr) + return TransportWrapper( + callback_object=callbacks, + transport=SocketTransport(encode_json, decode_json, sock), + process=process, + error_reader=error_reader, + ) + + +# --- Transports ------------------------------------------------------------------------------------------------------- + + +class TransportCallbacks: + def on_transport_close(self, exit_code: int, exception: Exception | None) -> None: ... + + def on_payload(self, payload: JSONRPCMessage) -> None: ... + + def on_stderr_message(self, message: str) -> None: ... + + +class Transport(ABC): + def __init__( + self, + encoder: Callable[[JSONRPCMessage], bytes], + decoder: Callable[[bytes], JSONRPCMessage] + ) -> None: + self._encoder = encoder + self._decoder = decoder + + @abstractmethod + def read(self) -> JSONRPCMessage | None: raise NotImplementedError - def read_data(self, reader: IO[bytes]) -> T | None: + @abstractmethod + def write(self, payload: JSONRPCMessage) -> None: raise NotImplementedError + @abstractmethod + def close(self) -> None: + raise NotImplementedError -class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): - def write_data(self, writer: IO[bytes], data: dict[str, Any]) -> None: - body = self._encode(data) - writer.writelines((f"Content-Length: {len(body)}\r\n\r\n".encode('ascii'), body)) +class FileObjectTransport(Transport): + def __init__( + self, + encoder: Callable[[JSONRPCMessage], bytes], + decoder: Callable[[bytes], JSONRPCMessage], + reader: IO[bytes] | BufferedIOBase, + writer: IO[bytes] | BufferedIOBase, + ) -> None: + super().__init__(encoder, decoder) + self._reader = reader + self._writer = writer - def read_data(self, reader: IO[bytes]) -> dict[str, Any] | None: - headers = http.client.parse_headers(reader) # type: ignore + @override + def read(self) -> JSONRPCMessage: + headers: http.client.HTTPMessage | None = None try: - body = reader.read(int(headers.get("Content-Length"))) + headers = http.client.parse_headers(self._reader) + content_length = headers.get("Content-Length") + if not isinstance(content_length, str): + raise TypeError("Missing Content-Length header") + body = self._reader.read(int(content_length)) except TypeError as ex: - if str(headers) == '\n': + if str(headers) == "\n": # Expected on process stopping. Gracefully stop the transport. raise StopLoopError from None # Propagate server's output to the UI. raise Exception(f"Unexpected payload in server's stdout:\n\n{headers}") from ex try: - return self._decode(body) + return self._decoder(body) except Exception as ex: raise Exception(f"JSON decode error: {ex}") from ex - @staticmethod - def _encode(data: dict[str, Any]) -> bytes: - if orjson: - return orjson.dumps(data) - return json.dumps( - data, - ensure_ascii=False, - sort_keys=False, - check_circular=False, - separators=(',', ':') - ).encode('utf-8') + @override + def write(self, payload: JSONRPCMessage) -> None: + body = self._encoder(payload) + self._writer.writelines((f"Content-Length: {len(body)}\r\n\r\n".encode("ascii"), body)) + self._writer.flush() + + @override + def close(self) -> None: + self._writer.close() + self._reader.close() + + +class SocketTransport(FileObjectTransport): + def __init__( + self, + encoder: Callable[[JSONRPCMessage], bytes], + decoder: Callable[[bytes], JSONRPCMessage], + sock: socket.socket + ) -> None: + reader_writer_pair = sock.makefile("rwb") + super().__init__(encoder, decoder, reader_writer_pair, reader_writer_pair) + self._socket = sock + + @override + def close(self) -> None: + super().close() + self._socket.close() - @staticmethod - def _decode(message: bytes) -> dict[str, Any]: - if orjson: - return orjson.loads(message) - return json.loads(message.decode('utf-8')) +# --- TransportWrapper ------------------------------------------------------------------------------------------------- -class ProcessTransport(Transport[T]): - def __init__(self, name: str, process: subprocess.Popen | None, socket: socket.socket | None, - reader: IO[bytes], writer: IO[bytes], stderr: IO[bytes] | None, - processor: AbstractProcessor[T], callback_object: TransportCallbacks[T]) -> None: +@final +class TransportWrapper: + """ + Double dispatch-like class that takes a (subclass of) Transport, and provides to a (subclass of) TransportCallbacks + appropriately decoded messages. The TransportWrapper is also responsible for keeping the spawned child + process around (if any), and also keeps track of the ErrorReader. It can be the case that there is no ErrorReader, + for instance when talking to a remote TCP language server. So it can be None. + """ + + def __init__( + self, + callback_object: TransportCallbacks, + transport: Transport, + process: subprocess.Popen[bytes] | None, + error_reader: ErrorReader | None, + ) -> None: self._closed = False - self._process = process - self._socket = socket - self._reader = reader - self._writer = writer - self._stderr = stderr - self._processor = processor - self._reader_thread = threading.Thread(target=self._read_loop, name=f'{name}-reader') - self._writer_thread = threading.Thread(target=self._write_loop, name=f'{name}-writer') self._callback_object = weakref.ref(callback_object) - self._send_queue: Queue[T | None] = Queue(0) + self._transport = transport + self._process = process + self._error_reader = error_reader + self._reader_thread = threading.Thread(target=self._read_loop) + self._writer_thread = threading.Thread(target=self._write_loop) + self._send_queue: Queue[JSONRPCMessage | None] = Queue(0) self._reader_thread.start() self._writer_thread.start() - if stderr: - self._stderr_thread = threading.Thread(target=self._stderr_loop, name=f'{name}-stderr') - self._stderr_thread.start() - def send(self, payload: T) -> None: + @property + def process_args(self) -> Any: + return self._process.args if self._process else None + + def send(self, payload: JSONRPCMessage) -> None: self._send_queue.put_nowait(payload) def close(self) -> None: if not self._closed: - self._send_queue.put_nowait(None) - if self._socket: - self._socket.close() self._closed = True - - def _join_thread(self, t: threading.Thread) -> None: - if t.ident == threading.current_thread().ident: - return - try: - t.join(2) - except TimeoutError as ex: - exception_log(f"failed to join {t.name} thread", ex) - - def __del__(self) -> None: - self.close() - self._join_thread(self._writer_thread) - self._join_thread(self._reader_thread) - if self._stderr_thread: - self._join_thread(self._stderr_thread) + self._send_queue.put_nowait(None) + _join_thread(self._writer_thread) + _join_thread(self._reader_thread) + if self._error_reader: + self._error_reader.on_transport_close() + self._error_reader = None + if self._transport: + self._transport.close() + self._transport = None def _read_loop(self) -> None: exception = None try: - while self._reader: - payload = self._processor.read_data(self._reader) - if payload is None: + while self._transport: + if (payload := self._transport.read()) is None: continue - def invoke(p: T) -> None: + def invoke(p: JSONRPCMessage) -> None: if self._closed: return - callback_object = self._callback_object() - if callback_object: + if callback_object := self._callback_object(): callback_object.on_payload(p) sublime.set_timeout_async(partial(invoke, payload)) @@ -213,25 +425,60 @@ def invoke() -> None: def _write_loop(self) -> None: exception: Exception | None = None try: - while self._writer: - d = self._send_queue.get() - if d is None: + while self._transport: + if (d := self._send_queue.get()) is None: break - self._processor.write_data(self._writer, d) - self._writer.flush() + self._transport.write(d) except (BrokenPipeError, AttributeError): pass except Exception as ex: exception = ex self._end(exception) - def _stderr_loop(self) -> None: + +class LaunchConfig: + __slots__ = ("command", "env") + + def __init__(self, command: list[str], env: dict[str, str] | None = None) -> None: + self.command: list[str] = command + self.env: dict[str, str] = env or {} + + def start( + self, + cwd: str | None, + stdin: int, + stdout: int, + stderr: int, + ) -> subprocess.Popen[bytes]: + startupinfo = _fixup_startup_args(self.command) + return _start_subprocess(self.command, stdin, stdout, stderr, startupinfo, self.env, cwd) + + +# --- Utils ------------------------------------------------------------------------------------------------------- + +class ErrorReader: + """ + Relays log messages from a raw stream to a (subclass of) TransportCallbacks. + + Because the various transport configurations want to listen to different streams, perhaps completely separate from + the regular RPC transport, this is wrapped in a different class. For instance, a TCP client transport communicating + via a socket, while it listens for log messages on the stdout/stderr streams of a spawned child process. + """ + + def __init__(self, callback_object: TransportCallbacks, reader: IO[bytes]) -> None: + self._callback_object = weakref.ref(callback_object) + self._reader = reader + self._thread = threading.Thread(target=self._loop) + self._thread.start() + + def on_transport_close(self) -> None: + self._reader = None + _join_thread(self._thread) + + def _loop(self) -> None: try: - while self._stderr: - if self._closed: - # None message already posted, just return - return - message = self._stderr.readline().decode('utf-8', 'replace') + while self._reader: + message = self._reader.readline().decode("utf-8", "replace") if not message: continue callback_object = self._callback_object() @@ -242,61 +489,28 @@ def _stderr_loop(self) -> None: except (BrokenPipeError, AttributeError): pass except Exception as ex: - exception_log('unexpected exception type in stderr loop', ex) - self._send_queue.put_nowait(None) + exception_log("unexpected exception type in error reader", ex) -# Can be a singleton since it doesn't hold any state. -json_rpc_processor = JsonRpcProcessor() +def encode_json(data: JSONRPCMessage) -> bytes: + if orjson: + return orjson.dumps(data) + return json.dumps( + data, + ensure_ascii=False, + sort_keys=False, + check_circular=False, + separators=(",", ":"), + ).encode("utf-8") -def create_transport(config: TransportConfig, cwd: str | None, - callback_object: TransportCallbacks) -> Transport[dict[str, Any]]: - if config.tcp_port is not None: - assert config.tcp_port is not None - if config.tcp_port < 0: - stdout = subprocess.PIPE - else: - stdout = subprocess.DEVNULL - stdin = subprocess.DEVNULL - else: - stdout = subprocess.PIPE - stdin = subprocess.PIPE - sock: socket.socket | None = None - process: subprocess.Popen | None = None - - def start_subprocess() -> subprocess.Popen: - startupinfo = _fixup_startup_args(config.command) - return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd) - - if config.listener_socket: - assert isinstance(config.tcp_port, int) - assert config.tcp_port > 0 - if config.command: - process, sock, reader, writer = _start_subprocess_and_await_connection( - config.listener_socket, start_subprocess - ) - else: - sock, reader, writer = _await_client_connection(config.listener_socket) - else: - if config.command: - process = start_subprocess() - elif not config.tcp_port: - raise RuntimeError("Failed to provide command or tcp_port, at least one of them has to be configured") - if config.tcp_port: - sock = _connect_tcp(config.tcp_port) - if sock is None: - raise RuntimeError(f"Failed to connect on port {config.tcp_port}") - reader = sock.makefile('rwb') # type: ignore - writer = reader - else: - reader = process.stdout # type: ignore - writer = process.stdin # type: ignore - if not reader or not writer: - raise RuntimeError(f'Failed initializing transport: reader: {reader}, writer: {writer}') - stderr = process.stderr if process else None - return ProcessTransport( - config.name, process, sock, reader, writer, stderr, json_rpc_processor, callback_object) # type: ignore +def decode_json(message: bytes) -> JSONRPCMessage: + if orjson: + return orjson.loads(message) + return json.loads(message.decode("utf-8")) + + +# --- Internal --------------------------------------------------------------------------------------------------------- g_subprocesses: weakref.WeakSet[subprocess.Popen[bytes]] = weakref.WeakSet() @@ -343,8 +557,8 @@ def _start_subprocess( stderr: int, startupinfo: Any, env: dict[str, str], - cwd: str | None -) -> subprocess.Popen: + cwd: str | None, +) -> subprocess.Popen[bytes]: debug(f"starting {args} in {cwd or os.getcwd()}") process = subprocess.Popen( args=args, @@ -353,46 +567,30 @@ def _start_subprocess( stderr=stderr, startupinfo=startupinfo, env=env, - cwd=cwd) + cwd=cwd, + ) g_subprocesses.add(process) return process -def _await_client_connection(listener_socket: socket.socket) -> tuple[socket.socket, IO[bytes], IO[bytes]]: - with closing(listener_socket): - # Await one client connection (blocking!) - sock, _ = listener_socket.accept() - reader = sock.makefile('rwb') # type: ignore - writer = reader - return sock, reader, writer # type: ignore - +def _find_free_port() -> int: + with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] -def _start_subprocess_and_await_connection( - listener_socket: socket.socket, subprocess_starter: Callable[[], subprocess.Popen] -) -> tuple[subprocess.Popen, socket.socket, IO[bytes], IO[bytes]]: - process = None - # We need to be able to start the process while also awaiting a client connection. - def start_in_background() -> None: - nonlocal process - # Sleep for one second, because the listener socket needs to be in the "accept" state before starting the - # subprocess. This is hacky, and will get better when we can use asyncio. - time.sleep(1) - process = subprocess_starter() +def _add_and_resolve_port_variable(variables: dict[str, str], port: int | None) -> int: + if port is None: + port = _find_free_port() + variables["port"] = str(port) + return port - thread = threading.Thread(target=start_in_background) - thread.start() - sock, reader, writer = _await_client_connection(listener_socket) - thread.join() - assert process is not None - return process, sock, reader, writer # type: ignore - -def _connect_tcp(port: int) -> socket.socket | None: - start_time = time.time() - while time.time() - start_time < TCP_CONNECT_TIMEOUT: - try: - return socket.create_connection(('localhost', port)) - except ConnectionRefusedError: - pass - return None +def _join_thread(t: threading.Thread) -> None: + if t.ident == threading.current_thread().ident: + return + try: + t.join(2) + except TimeoutError as ex: + exception_log(f"failed to join {t.name} thread", ex) diff --git a/plugin/core/types.py b/plugin/core/types.py index 20e0fa7a1..d59e420d6 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -13,6 +13,10 @@ from .file_watcher import FileWatcherEventType from .logging import debug from .logging import set_debug_logging +from .transports import StdioTransportConfig +from .transports import TcpClientTransportConfig +from .transports import TcpServerTransportConfig +from .transports import TransportConfig from .url import filename_to_uri from .url import parse_uri from abc import ABC @@ -40,12 +44,10 @@ import fnmatch import os import posixpath -import socket import sublime import time import weakref -TCP_CONNECT_TIMEOUT = 5 # seconds FEATURES_TIMEOUT = 300 # milliseconds PANEL_FILE_REGEX = r"^(\S.*):$" @@ -723,26 +725,6 @@ def map_from_remote_to_local(self, uri: str) -> tuple[str, bool]: return _translate_path(uri, self._remote, self._local) -class TransportConfig: - __slots__ = ("name", "command", "tcp_port", "env", "listener_socket") - - def __init__( - self, - name: str, - command: list[str], - tcp_port: int | None, - env: dict[str, str], - listener_socket: socket.socket | None - ) -> None: - if not command and not tcp_port: - raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') - self.name = name - self.command = command - self.tcp_port = tcp_port - self.env = env - self.listener_socket = listener_socket - - class DefaultViewStatusHandler(ViewStatusHandler): @override @@ -778,7 +760,7 @@ def __init__( enabled: bool = True, initialization_options: DottedDict | None = None, settings: DottedDict | None = None, - env: dict[str, str | list[str]] | None = None, + env: dict[str, str] | None = None, experimental_capabilities: dict[str, Any] | None = None, disabled_capabilities: DottedDict | None = None, file_watcher: FileWatcherConfig | None = None, @@ -1010,7 +992,7 @@ def from_config(cls, src_config: ClientConfig, override: dict[str, Any]) -> Clie all_settings={**src_config._all_settings, **override} # shallow merge ) - def resolve_transport_config(self, variables: dict[str, str]) -> TransportConfig: + def create_transport_config(self) -> TransportConfig: """ Build a :class:`TransportConfig` ready for starting the language server. @@ -1020,35 +1002,11 @@ def resolve_transport_config(self, variables: dict[str, str]) -> TransportConfig :param variables: Sublime Text variable substitution dict (e.g. from `window.extract_variables()`). A `"port"` key is added automatically when a TCP port is in use. """ - tcp_port: int | None = None - listener_socket: socket.socket | None = None if self.tcp_port is not None: - # < 0 means we're hosting a TCP server if self.tcp_port < 0: - # -1 means pick any free port - if self.tcp_port < -1: - tcp_port = -self.tcp_port - # Create a listener socket for incoming connections - listener_socket = _start_tcp_listener(tcp_port) - tcp_port = int(listener_socket.getsockname()[1]) - else: - tcp_port = _find_free_port() if self.tcp_port == 0 else self.tcp_port - if tcp_port is not None: - variables["port"] = str(tcp_port) - command = sublime.expand_variables(self.command, variables) - command = [os.path.expanduser(arg) for arg in command] - if tcp_port is not None: - # DEPRECATED -- replace {port} with $port or ${port} in your client config - command = [a.replace('{port}', str(tcp_port)) for a in command] - env = os.environ.copy() - for key, value in self.env.items(): - if isinstance(value, list): - value = os.path.pathsep.join(value) - if key == 'PATH': - env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key] - else: - env[key] = sublime.expand_variables(value, variables) - return TransportConfig(self.name, command, tcp_port, env, listener_socket) + return TcpServerTransportConfig(None if self.tcp_port == -1 else -self.tcp_port) + return TcpClientTransportConfig(None if self.tcp_port == 0 else self.tcp_port) + return StdioTransportConfig() def set_view_status_handler(self, handler: ViewStatusHandler) -> None: self._view_status_handler = handler @@ -1185,18 +1143,3 @@ def _read_priority_selector(config: sublime.Settings | dict[str, Any]) -> str: if isinstance(selector, str): return selector return "" - - -def _find_free_port() -> int: - with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(('', 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - return s.getsockname()[1] - - -def _start_tcp_listener(tcp_port: int | None) -> socket.socket: - sock = socket.socket() - sock.bind(('localhost', tcp_port or 0)) - sock.settimeout(TCP_CONNECT_TIMEOUT) - sock.listen(1) - return sock diff --git a/plugin/core/windows.py b/plugin/core/windows.py index 6dbde9c14..720bb119f 100644 --- a/plugin/core/windows.py +++ b/plugin/core/windows.py @@ -32,7 +32,6 @@ from .settings import client_configs from .settings import LspSettingsChangeListener from .settings import userprefs -from .transports import create_transport from .types import ClientConfig from .types import matches_pattern from .types import sublime_pattern_to_glob @@ -277,8 +276,8 @@ def start_async(self, config: ClientConfig, initiating_view: sublime.View) -> No transport_cwd: str | None = cwd else: transport_cwd = workspace_folders[0].path if workspace_folders else None - transport_config = config.resolve_transport_config(variables) - transport = create_transport(transport_config, transport_cwd, session) + transport = config.create_transport_config().start( + config.command, config.env, transport_cwd, variables, session) if plugin_class: plugin_class.on_post_start(self._window, initiating_view, workspace_folders, config) config.set_view_status(initiating_view, "initialize") @@ -293,7 +292,7 @@ def start_async(self, config: ClientConfig, initiating_view: sublime.View) -> No message = (f'Failed to start {config.name} - disabling for this window for the duration of the current ' 'session.\nRe-enable by running "LSP: Enable Language Server In Project" from the Command ' f'Palette.\n\n--- Error: ---\n{e}') - exception_log(f"Unable to start subprocess for {config.name}", e) + exception_log(f"Unable to initialize language server for {config.name}", e) if isinstance(e, CalledProcessError): print("Server output:\n{}".format(e.output.decode('utf-8', 'replace'))) self._config_manager.disable_config(config.name, only_for_session=True) diff --git a/plugin/tooling.py b/plugin/tooling.py index 16dab20cc..8495d9dd9 100644 --- a/plugin/tooling.py +++ b/plugin/tooling.py @@ -4,9 +4,8 @@ from .core.css import css from .core.logging import debug from .core.registry import windows -from .core.transports import create_transport -from .core.transports import Transport from .core.transports import TransportCallbacks +from .core.transports import TransportWrapper from .core.types import Capabilities from .core.types import ClientConfig from .core.version import __version__ @@ -493,7 +492,7 @@ def __init__( on_close: Callable[[list[str], str, int], None] ) -> None: self._on_close = on_close - self._transport: Transport | None = None + self._transport: TransportWrapper | None = None self._resolved_command: list[str] = [] self._stderr_lines: list[str] = [] try: @@ -514,9 +513,9 @@ def __init__( cwd = plugin_class.on_pre_start(window, initiating_view, workspace_folders, config) if not cwd and workspace_folders: cwd = workspace_folders[0].path - transport_config = config.resolve_transport_config(variables) - self._resolved_command = transport_config.command - self._transport = create_transport(transport_config, cwd, self) + transport_config = config.create_transport_config() + self._transport = transport_config.start(config.command, config.env, cwd, variables, self) + self._resolved_command = self._transport.process_args sublime.set_timeout_async(self.force_close_transport, self.CLOSE_TIMEOUT_SEC * 1000) except Exception as ex: self.on_transport_close(-1, ex) diff --git a/sublime-package.json b/sublime-package.json index fb47336ce..a455c63af 100644 --- a/sublime-package.json +++ b/sublime-package.json @@ -281,19 +281,8 @@ "type": "object", "markdownDescription": "Specify environment variables to pass to the language server process on startup.", "additionalProperties": { - "oneOf": [ - { - "type": "string", - "markdownDescription": "The value for this environment variable." - }, - { - "type": "array", - "markdownDescription": "A list of values. The list will be joined into a string with your native file system's path separator. For example, `[\"a\", \"b\"]` will turn into `\"a;b\"` on Windows.", - "items": { - "type": "string" - } - } - ] + "type": "string", + "markdownDescription": "The value for this environment variable." } }, "ClientSelector": { @@ -349,9 +338,8 @@ }, "tcp_port": { "type": "integer", - "minimum": 0, "default": 0, - "markdownDescription": "When specified, the TCP port to use to connect to the language server process. If not specified, STDIO is used as the transport. When set to zero, a free TCP port is chosen. You can use that free TCP port number as a template variable, i.e. as `${tcp_port}` in the `\"command\"`." + "markdownDescription": "When set to a positive number bigger than 0, specifies the TCP port to use to connect to the language server process listening on the given port. When set to zero, a free TCP port is chosen. Chosen TCP port number can be accessed through a template variable, i.e. as `${port}` in the `\"command\"`.\n\nSet to a negative number to make the LSP client act as TCP server awaiting connection from the LSP server. Using `-1` opens a random port. To use a fixed port number, use `-X` as the value for `tcp_port`, where `X` is the desired (positive) port number.\n\nIf not specified, STDIO is used as the transport" }, "auto_complete_selector": { "$ref": "#/definitions/ClientAutoCompleteSelector" diff --git a/tests/test_configs.py b/tests/test_configs.py index 0f24d18e3..201bdb7ef 100644 --- a/tests/test_configs.py +++ b/tests/test_configs.py @@ -1,6 +1,7 @@ from __future__ import annotations from LSP.plugin.core.collections import DottedDict +from LSP.plugin.core.transports import TransportConfig from LSP.plugin.core.types import ClientConfig from LSP.plugin.core.views import get_uri_and_position_from_location from LSP.plugin.core.views import to_encoded_filename @@ -9,7 +10,6 @@ from os.path import pathsep from typing import Any from unittesting import DeferrableTestCase -import sublime import sys import unittest @@ -65,28 +65,11 @@ def test_transport_config_extends_env_path(self) -> None: } } config = read_client_config("pyls", settings) - transport_config = config.resolve_transport_config({}) + launch_config = TransportConfig.resolve_launch_config(config.command, config.env, {}) original_path = environ.copy()['PATH'] - resolved_path = transport_config.env['PATH'] + resolved_path = launch_config.env['PATH'] self.assertEqual(resolved_path, f'/a/b/{pathsep}{original_path}') - def test_list_in_environment(self) -> None: - settings = { - "command": ["pyls"], - "selector": "source.python", - "env": { - "FOO": ["C:/hello", "X:/there", "Y:/$foobar"], - "BAR": "baz" - } - } - config = read_client_config("pyls", settings) - resolved = config.resolve_transport_config({"foobar": "asdf"}) - if sublime.platform() == "windows": - self.assertEqual(resolved.env["FOO"], "C:/hello;X:/there;Y:/asdf") - else: - self.assertEqual(resolved.env["FOO"], "C:/hello:X:/there:Y:/asdf") - self.assertEqual(resolved.env["BAR"], "baz") - def test_disabled_capabilities(self) -> None: settings = { "command": ["pyls"], diff --git a/tests/test_protocol.py b/tests/test_protocol.py index a65b26b67..97d23cf1d 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -3,7 +3,8 @@ from LSP.plugin.core.protocol import Notification from LSP.plugin.core.protocol import Point from LSP.plugin.core.protocol import Request -from LSP.plugin.core.transports import JsonRpcProcessor +from LSP.plugin.core.transports import decode_json +from LSP.plugin.core.transports import encode_json from LSP.protocol import Position from LSP.protocol import Range import unittest @@ -26,9 +27,9 @@ def test_lsp_conversion(self) -> None: class EncodingTests(unittest.TestCase): def test_encode(self) -> None: - encoded = JsonRpcProcessor._encode({"text": "😃"}) + encoded = encode_json({"text": "😃"}) self.assertEqual(encoded, b'{"text":"\xF0\x9F\x98\x83"}') - decoded = JsonRpcProcessor._decode(encoded) + decoded = decode_json(encoded) self.assertEqual(decoded, {"text": "😃"})