Skip to content
93 changes: 70 additions & 23 deletions plugin/core/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from queue import Queue
import http
import json
import multiprocessing.connection
import os
import shutil
import socket
Expand Down Expand Up @@ -48,26 +49,33 @@ def on_stderr_message(self, message: str) -> None:

class AbstractProcessor(Generic[T]):

def write_data(self, writer: IO[bytes], data: T) -> None:
def write_data(self, writer: IO[bytes], data: T, is_node_ipc: bool) -> None:
raise NotImplementedError()

def read_data(self, reader: IO[bytes]) -> Optional[T]:
def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[T]:
raise NotImplementedError()


class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]):

def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None:
def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc: bool) -> None:
body = self._encode(data)
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
if not is_node_ipc:
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
else:
writer.write(body + b"\n")

def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[Dict[str, Any]]:
if not is_node_ipc:
headers = http.client.parse_headers(reader) # type: ignore
try:
body = reader.read(int(headers.get("Content-Length")))
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
else:
body = reader.readline()

def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]:
headers = http.client.parse_headers(reader) # type: ignore
try:
body = reader.read(int(headers.get("Content-Length")))
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
try:
return self._decode(body)
except Exception as ex:
Expand All @@ -79,7 +87,6 @@ def _encode(data: Dict[str, Any]) -> bytes:
return json.dumps(
data,
ensure_ascii=False,
sort_keys=False,
check_circular=False,
separators=(',', ':')
).encode('utf-8')
Expand All @@ -93,7 +100,7 @@ class ProcessTransport(Transport[T]):

def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes],
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T],
callback_object: TransportCallbacks[T]) -> None:
callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None:
self._closed = False
self._process = process
self._socket = socket
Expand All @@ -105,6 +112,7 @@ def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket
self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name))
self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name))
self._callback_object = weakref.ref(callback_object)
self._is_node_ipc = is_node_ipc
self._send_queue = Queue(0) # type: Queue[Union[T, None]]
self._reader_thread.start()
self._writer_thread.start()
Expand Down Expand Up @@ -137,7 +145,7 @@ def __del__(self) -> None:
def _read_loop(self) -> None:
try:
while self._reader:
payload = self._processor.read_data(self._reader)
payload = self._processor.read_data(self._reader, self._is_node_ipc)
if payload is None:
continue

Expand Down Expand Up @@ -190,8 +198,9 @@ def _write_loop(self) -> None:
d = self._send_queue.get()
if d is None:
break
self._processor.write_data(self._writer, d)
self._writer.flush()
self._processor.write_data(self._writer, d, self._is_node_ipc)
if not self._is_node_ipc:
self._writer.flush()
except (BrokenPipeError, AttributeError):
pass
except Exception as ex:
Expand Down Expand Up @@ -223,24 +232,57 @@ def _stderr_loop(self) -> None:
json_rpc_processor = JsonRpcProcessor()


class NodeIpcIO():
_buf = bytearray()
_lines = 0

def __init__(self, conn: multiprocessing.connection._ConnectionBase):
self._conn = conn

# https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392
def readline(self) -> bytearray:
while self._lines == 0:
chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore
self._buf += chunk
self._lines += chunk.count(b'\n')

self._lines -= 1
line, _, self._buf = self._buf.partition(b'\n')
return line

# https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376
def write(self, data: bytes) -> None:
while len(data):
n = self._conn._write(self._conn.fileno(), data) # type: ignore
data = data[n:]


def create_transport(config: TransportConfig, cwd: Optional[str],
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]:
stderr = subprocess.PIPE
pass_fds = () # type: Union[Tuple[()], Tuple[int]]
if config.tcp_port is not None:
assert config.tcp_port is not None
if config.tcp_port < 0:
stdout = subprocess.PIPE
else:
stdout = subprocess.DEVNULL
stdin = subprocess.DEVNULL
else:
elif not config.node_ipc:
stdout = subprocess.PIPE
stdin = subprocess.PIPE
else:
stdout = subprocess.PIPE
stdin = subprocess.DEVNULL
stderr = subprocess.STDOUT
pass_fds = (config.node_ipc.child_conn.fileno(),)

startupinfo = _fixup_startup_args(config.command)
sock = None # type: Optional[socket.socket]
process = None # type: Optional[subprocess.Popen]

def start_subprocess() -> subprocess.Popen:
return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd)
return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds)

if config.listener_socket:
assert isinstance(config.tcp_port, int) and config.tcp_port > 0
Expand All @@ -258,13 +300,16 @@ def start_subprocess() -> subprocess.Popen:
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port))
reader = sock.makefile('rwb') # type: ignore
writer = reader
else:
elif not config.node_ipc:
reader = process.stdout # type: ignore
writer = process.stdin # type: ignore
else:
reader = writer = NodeIpcIO(config.node_ipc.parent_conn) # type: ignore
if not reader or not writer:
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer))
return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor,
callback_object)
stderr_reader = process.stdout if config.node_ipc else process.stderr
return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor,
callback_object, bool(config.node_ipc))


_subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen]
Expand Down Expand Up @@ -312,7 +357,8 @@ def _start_subprocess(
stderr: int,
startupinfo: Any,
env: Dict[str, str],
cwd: Optional[str]
cwd: Optional[str],
pass_fds: Union[Tuple[()], Tuple[int]]
) -> subprocess.Popen:
debug("starting {} in {}".format(args, cwd if cwd else os.getcwd()))
process = subprocess.Popen(
Expand All @@ -322,7 +368,8 @@ def _start_subprocess(
stderr=stderr,
startupinfo=startupinfo,
env=env,
cwd=cwd)
cwd=cwd,
pass_fds=pass_fds)
_subprocesses.add(process)
return process

Expand Down
22 changes: 19 additions & 3 deletions plugin/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from wcmatch.glob import BRACE
from wcmatch.glob import globmatch
from wcmatch.glob import GLOBSTAR
import collections
import contextlib
import fnmatch
import multiprocessing
import os
import posixpath
import socket
Expand Down Expand Up @@ -605,24 +607,34 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]:
return _translate_path(uri, self._remote, self._local)


NodeIpc = collections.namedtuple('NodeIpc', 'parent_conn,child_conn')


class TransportConfig:
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket")
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc")

def __init__(
self,
name: str,
command: List[str],
tcp_port: Optional[int],
env: Dict[str, str],
listener_socket: Optional[socket.socket]
listener_socket: Optional[socket.socket],
node_ipc: Optional[NodeIpc]
) -> None:
if not command and not tcp_port:
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server')
if node_ipc and (tcp_port or listener_socket):
raise ValueError(
'"tcp_port" and "listener_socket" can\'t be provided in "--node-ipc" mode; ' +
'cannot start a language server'
)
self.name = name
self.command = command
self.tcp_port = tcp_port
self.env = env
self.listener_socket = listener_socket
self.node_ipc = node_ipc


class ClientConfig:
Expand Down Expand Up @@ -790,7 +802,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig
env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key]
else:
env[key] = sublime.expand_variables(value, variables)
return TransportConfig(self.name, command, tcp_port, env, listener_socket)
node_ipc = None
if '--node-ipc' in command:
node_ipc = NodeIpc(*multiprocessing.Pipe())
env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno())
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc)

def set_view_status(self, view: sublime.View, message: str) -> None:
if sublime.load_settings("LSP.sublime-settings").get("show_view_status"):
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Tox (http://tox.testrun.org/) is a tool for running tests
# Tox (https://github.com/tox-dev/tox) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
Expand Down