-
Notifications
You must be signed in to change notification settings - Fork 185
Add --node-ipc support #2015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add --node-ipc support #2015
Changes from 7 commits
f14e088
6ab3fca
ca34372
bdcd755
47fde0d
93157e7
e6ae057
fd31756
45861a0
3dc8daa
7ad90c4
06365a3
d2a7249
9082544
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,15 @@ | |
from contextlib import closing | ||
from functools import partial | ||
from queue import Queue | ||
import http | ||
import http.client | ||
import json | ||
import multiprocessing.connection | ||
import os | ||
import shutil | ||
import socket | ||
import sublime | ||
import subprocess | ||
import sys | ||
import threading | ||
import time | ||
import weakref | ||
|
@@ -48,18 +50,35 @@ def on_stderr_message(self, message: str) -> None: | |
|
||
class AbstractProcessor(Generic[T]): | ||
|
||
def write_data(self, writer: IO[bytes], data: T) -> None: | ||
def write_data(self, writer: Any, data: T) -> None: | ||
raise NotImplementedError() | ||
|
||
def read_data(self, reader: IO[bytes]) -> Optional[T]: | ||
def read_data(self, reader: Any) -> Optional[T]: | ||
raise NotImplementedError() | ||
|
||
|
||
class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): | ||
def encode_payload(data: Dict[str, Any]) -> bytes: | ||
return json.dumps( | ||
data, | ||
ensure_ascii=False, | ||
check_circular=False, | ||
separators=(',', ':') | ||
).encode('utf-8') | ||
|
||
|
||
def decode_payload(message: bytes) -> Optional[Dict[str, Any]]: | ||
try: | ||
return json.loads(message.decode('utf-8')) | ||
except Exception as ex: | ||
exception_log("JSON decode error", ex) | ||
return None | ||
|
||
|
||
class StandardProcessor(AbstractProcessor[Dict[str, Any]]): | ||
def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None: | ||
body = self._encode(data) | ||
body = encode_payload(data) | ||
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) | ||
writer.flush() | ||
|
||
|
||
def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]: | ||
headers = http.client.parse_headers(reader) # type: ignore | ||
|
@@ -68,31 +87,38 @@ def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]: | |
except TypeError: | ||
# Expected error on process stopping. Stop the read loop. | ||
raise StopLoopError() | ||
try: | ||
return self._decode(body) | ||
except Exception as ex: | ||
exception_log("JSON decode error", ex) | ||
return None | ||
|
||
@staticmethod | ||
def _encode(data: Dict[str, Any]) -> bytes: | ||
return json.dumps( | ||
data, | ||
ensure_ascii=False, | ||
sort_keys=False, | ||
check_circular=False, | ||
separators=(',', ':') | ||
).encode('utf-8') | ||
|
||
@staticmethod | ||
def _decode(message: bytes) -> Dict[str, Any]: | ||
return json.loads(message.decode('utf-8')) | ||
return decode_payload(body) | ||
|
||
|
||
class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): | ||
_buf = bytearray() | ||
_lines = 0 | ||
|
||
|
||
def write_data(self, connection: multiprocessing.connection._ConnectionBase, data: Dict[str, Any]) -> None: | ||
|
||
body = encode_payload(data) + b"\n" | ||
while len(body): | ||
n = connection._write(connection.fileno(), body) # type: ignore | ||
body = body[n:] | ||
|
||
def read_data(self, connection: multiprocessing.connection._ConnectionBase) -> Optional[Dict[str, Any]]: | ||
while self._lines == 0: | ||
chunk = connection._read(connection.fileno(), 65536) # type: ignore | ||
|
||
if len(chunk) == 0: | ||
# EOF reached: https://docs.python.org/3/library/os.html#os.read | ||
raise StopLoopError() | ||
|
||
self._buf += chunk | ||
self._lines += chunk.count(b'\n') | ||
|
||
self._lines -= 1 | ||
message, _, self._buf = self._buf.partition(b'\n') | ||
return decode_payload(message) | ||
|
||
|
||
class ProcessTransport(Transport[T]): | ||
|
||
def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], | ||
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], | ||
def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: Any, | ||
writer: Any, stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], | ||
|
||
callback_object: TransportCallbacks[T]) -> None: | ||
self._closed = False | ||
self._process = process | ||
|
@@ -191,7 +217,6 @@ def _write_loop(self) -> None: | |
if d is None: | ||
break | ||
self._processor.write_data(self._writer, d) | ||
self._writer.flush() | ||
except (BrokenPipeError, AttributeError): | ||
pass | ||
except Exception as ex: | ||
|
@@ -220,27 +245,37 @@ def _stderr_loop(self) -> None: | |
|
||
|
||
# Can be a singleton since it doesn't hold any state. | ||
json_rpc_processor = JsonRpcProcessor() | ||
standard_processor = StandardProcessor() | ||
node_ipc_processor = NodeIpcProcessor() | ||
|
||
|
||
|
||
def create_transport(config: TransportConfig, cwd: Optional[str], | ||
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: | ||
stderr = subprocess.PIPE | ||
# https://github.com/python/cpython/blob/17bf6b4671ec02d80ad29b278639d5307baddeb5/Lib/subprocess.py#L706 | ||
close_fds = True if sys.version_info >= (3, 8, 0) else subprocess._PLATFORM_DEFAULT_CLOSE_FDS # type: ignore | ||
|
||
if config.tcp_port is not None: | ||
assert config.tcp_port is not None | ||
if config.tcp_port < 0: | ||
stdout = subprocess.PIPE | ||
else: | ||
stdout = subprocess.DEVNULL | ||
stdin = subprocess.DEVNULL | ||
elif config.node_ipc: | ||
stdout = subprocess.PIPE | ||
stdin = subprocess.DEVNULL | ||
stderr = subprocess.STDOUT | ||
close_fds = False | ||
else: | ||
stdout = subprocess.PIPE | ||
stdin = subprocess.PIPE | ||
|
||
startupinfo = _fixup_startup_args(config.command) | ||
sock = None # type: Optional[socket.socket] | ||
process = None # type: Optional[subprocess.Popen] | ||
|
||
def start_subprocess() -> subprocess.Popen: | ||
return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd) | ||
return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, close_fds) | ||
|
||
if config.listener_socket: | ||
assert isinstance(config.tcp_port, int) and config.tcp_port > 0 | ||
|
@@ -256,14 +291,24 @@ def start_subprocess() -> subprocess.Popen: | |
sock = _connect_tcp(config.tcp_port) | ||
if sock is None: | ||
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) | ||
reader = sock.makefile('rwb') # type: ignore | ||
writer = reader | ||
reader = writer = sock.makefile('rwb') | ||
elif config.node_ipc: | ||
reader = writer = config.node_ipc.parent_connection # type: ignore | ||
else: | ||
reader = process.stdout # type: ignore | ||
writer = process.stdin # type: ignore | ||
if not process.stdout or not process.stdin: | ||
predragnikolic marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
raise RuntimeError( | ||
'Failed initializing transport: reader: {}, writer: {}' | ||
.format(process.stdout, process.stdin) | ||
) | ||
reader = process.stdout | ||
writer = process.stdin | ||
stderr_reader = process.stdout if config.node_ipc else process.stderr | ||
predragnikolic marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
processor = node_ipc_processor if config.node_ipc else standard_processor | ||
|
||
if not reader or not writer: | ||
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) | ||
return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor, | ||
|
||
return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, processor, | ||
callback_object) | ||
|
||
|
||
|
@@ -312,7 +357,8 @@ def _start_subprocess( | |
stderr: int, | ||
startupinfo: Any, | ||
env: Dict[str, str], | ||
cwd: Optional[str] | ||
cwd: Optional[str], | ||
close_fds: bool | ||
) -> subprocess.Popen: | ||
debug("starting {} in {}".format(args, cwd if cwd else os.getcwd())) | ||
process = subprocess.Popen( | ||
|
@@ -322,7 +368,8 @@ def _start_subprocess( | |
stderr=stderr, | ||
startupinfo=startupinfo, | ||
env=env, | ||
cwd=cwd) | ||
cwd=cwd, | ||
close_fds=close_fds) | ||
_subprocesses.add(process) | ||
return process | ||
|
||
|
@@ -356,8 +403,7 @@ def start_in_background(d: _SubprocessData) -> None: | |
# Await one client connection (blocking!) | ||
sock, _ = listener_socket.accept() | ||
thread.join() | ||
reader = sock.makefile('rwb') # type: IO[bytes] | ||
writer = reader | ||
reader = writer = sock.makefile('rwb') | ||
assert data.process | ||
return data.process, sock, reader, writer | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
from wcmatch.glob import GLOBSTAR | ||
import contextlib | ||
import fnmatch | ||
import multiprocessing | ||
import os | ||
import posixpath | ||
import socket | ||
|
@@ -605,16 +606,24 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]: | |
return _translate_path(uri, self._remote, self._local) | ||
|
||
|
||
class NodeIpcPipe(): | ||
def __init__(self) -> None: | ||
parent_connection, child_connection = multiprocessing.Pipe() | ||
self.parent_connection = parent_connection | ||
self.child_connection = child_connection | ||
|
||
|
||
class TransportConfig: | ||
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket") | ||
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc") | ||
|
||
def __init__( | ||
self, | ||
name: str, | ||
command: List[str], | ||
tcp_port: Optional[int], | ||
env: Dict[str, str], | ||
listener_socket: Optional[socket.socket] | ||
listener_socket: Optional[socket.socket], | ||
node_ipc: Optional[NodeIpcPipe] | ||
) -> None: | ||
if not command and not tcp_port: | ||
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') | ||
|
@@ -623,6 +632,7 @@ def __init__( | |
self.tcp_port = tcp_port | ||
self.env = env | ||
self.listener_socket = listener_socket | ||
self.node_ipc = node_ipc | ||
|
||
|
||
class ClientConfig: | ||
|
@@ -632,6 +642,7 @@ def __init__(self, | |
priority_selector: Optional[str] = None, | ||
schemes: Optional[List[str]] = None, | ||
command: Optional[List[str]] = None, | ||
use_node_ipc: bool = False, | ||
binary_args: Optional[List[str]] = None, # DEPRECATED | ||
tcp_port: Optional[int] = None, | ||
auto_complete_selector: Optional[str] = None, | ||
|
@@ -656,6 +667,7 @@ def __init__(self, | |
else: | ||
assert isinstance(binary_args, list) | ||
self.command = binary_args | ||
self.use_node_ipc = use_node_ipc | ||
self.tcp_port = tcp_port | ||
self.auto_complete_selector = auto_complete_selector | ||
self.enabled = enabled | ||
|
@@ -689,9 +701,10 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> "Cl | |
priority_selector=_read_priority_selector(s), | ||
schemes=s.get("schemes"), | ||
command=read_list_setting(s, "command", []), | ||
use_node_ipc=bool(s.get("use_node_ipc", False)), | ||
tcp_port=s.get("tcp_port"), | ||
auto_complete_selector=s.get("auto_complete_selector"), | ||
# Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "iff" is short for "if and only if" :) but keep the change, no one really cares. |
||
# Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package. | ||
enabled=bool(s.get("enabled", True)), | ||
init_options=init_options, | ||
settings=settings, | ||
|
@@ -719,6 +732,7 @@ def from_dict(cls, name: str, d: Dict[str, Any]) -> "ClientConfig": | |
priority_selector=_read_priority_selector(d), | ||
schemes=schemes, | ||
command=d.get("command", []), | ||
use_node_ipc=d.get("use_node_ipc", False), | ||
tcp_port=d.get("tcp_port"), | ||
auto_complete_selector=d.get("auto_complete_selector"), | ||
enabled=d.get("enabled", False), | ||
|
@@ -746,6 +760,7 @@ def from_config(cls, src_config: "ClientConfig", override: Dict[str, Any]) -> "C | |
priority_selector=_read_priority_selector(override) or src_config.priority_selector, | ||
schemes=override.get("schemes", src_config.schemes), | ||
command=override.get("command", src_config.command), | ||
use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc), | ||
tcp_port=override.get("tcp_port", src_config.tcp_port), | ||
auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector), | ||
enabled=override.get("enabled", src_config.enabled), | ||
|
@@ -790,7 +805,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig | |
env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key] | ||
else: | ||
env[key] = sublime.expand_variables(value, variables) | ||
return TransportConfig(self.name, command, tcp_port, env, listener_socket) | ||
node_ipc = None | ||
if self.use_node_ipc: | ||
node_ipc = NodeIpcPipe() | ||
env["NODE_CHANNEL_FD"] = str(node_ipc.child_connection.fileno()) | ||
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) | ||
|
||
def set_view_status(self, view: sublime.View, message: str) -> None: | ||
if sublime.load_settings("LSP.sublime-settings").get("show_view_status"): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fits one line now :)