Skip to content

Commit 9972ed9

Browse files
committed
feat: update server
1 parent f5fd7be commit 9972ed9

File tree

5 files changed

+123
-165
lines changed

5 files changed

+123
-165
lines changed

src/lsp_client/server/abc.py

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
from abc import ABC, abstractmethod
44
from collections.abc import AsyncGenerator
55
from contextlib import asynccontextmanager
6+
from functools import cached_property
67
from typing import Self
78

9+
import anyio
810
import asyncer
11+
from anyio.abc import AnyByteReceiveStream, AnyByteSendStream
12+
from anyio.streams.buffered import BufferedByteReceiveStream
913
from attrs import define, field
1014
from loguru import logger
1115

1216
from lsp_client.jsonrpc.channel import ResponseTable, response_channel
17+
from lsp_client.jsonrpc.parse import read_raw_package, write_raw_package
1318
from lsp_client.jsonrpc.types import (
1419
RawNotification,
1520
RawPackage,
@@ -27,51 +32,67 @@ class Server(ABC):
2732

2833
_resp_table: ResponseTable = field(factory=ResponseTable, init=False)
2934

35+
@property
36+
@abstractmethod
37+
def send_stream(self) -> AnyByteSendStream:
38+
"""Stream for sending data to the server."""
39+
40+
@property
41+
@abstractmethod
42+
def receive_stream(self) -> AnyByteReceiveStream:
43+
"""Stream for receiving data from the server."""
44+
45+
@cached_property
46+
def _buffered_receive_stream(self) -> BufferedByteReceiveStream:
47+
return BufferedByteReceiveStream(self.receive_stream)
48+
3049
@abstractmethod
3150
async def check_availability(self) -> None:
3251
"""Check if the server runtime is available."""
3352

34-
@abstractmethod
53+
async def kill(self) -> None:
54+
await self.receive_stream.aclose()
55+
3556
async def send(self, package: RawPackage) -> None:
3657
"""Send a package to the runtime."""
58+
await write_raw_package(self.send_stream, package)
59+
logger.debug("Package sent: {}", package)
3760

38-
@abstractmethod
3961
async def receive(self) -> RawPackage | None:
4062
"""Receive a package from the runtime."""
4163

42-
@abstractmethod
43-
async def kill(self) -> None:
44-
"""Kill the runtime process."""
45-
46-
async def _dispatch(self, sender: Sender[ServerRequest] | None) -> None:
47-
if not sender:
48-
logger.warning(
49-
"No ServerRequest sender provided, all server requests and notifications will be ignored."
50-
)
51-
52-
async def handle(package: RawPackage) -> None:
53-
match package:
54-
case {"result": _, "id": id} | {"error": _, "id": id}:
55-
self._resp_table.send(id, package) # ty: ignore[invalid-argument-type]
56-
case {"id": id, "method": _}:
57-
if not sender:
58-
raise RuntimeError(
59-
"Received a server request without a sender provided."
60-
)
61-
62-
tx, rx = response_channel.create()
63-
await sender.send((package, tx)) # ty: ignore[invalid-argument-type]
64-
resp = await rx.receive()
65-
await self.send(resp)
66-
case {"method": _}:
67-
if not sender:
68-
return
69-
70-
await sender.send(package) # ty: ignore[invalid-argument-type]
71-
64+
try:
65+
package = await read_raw_package(self._buffered_receive_stream)
66+
logger.debug("Received package: {}", package)
67+
return package
68+
except (anyio.EndOfStream, anyio.IncompleteRead, anyio.ClosedResourceError):
69+
logger.debug("Stream closed")
70+
return None
71+
72+
async def iter_receive(self) -> AsyncGenerator[RawPackage]:
73+
while package := await self.receive():
74+
yield package
75+
76+
async def _handle_package(
77+
self, sender: Sender[ServerRequest], package: RawPackage
78+
) -> None:
79+
match package:
80+
case {"result": _, "id": id} | {"error": _, "id": id} as resp:
81+
self._resp_table.send(id, resp)
82+
case {"id": id, "method": _} as server_req:
83+
tx, rx = response_channel.create()
84+
await sender.send((server_req, tx))
85+
resp = await rx.receive()
86+
await self.send(resp)
87+
case {"method": _} as noti:
88+
if not sender:
89+
return
90+
await sender.send(noti)
91+
92+
async def _dispatch(self, sender: Sender[ServerRequest]) -> None:
7293
async with asyncer.create_task_group() as tg:
7394
while package := await self.receive():
74-
tg.soonify(handle)(package)
95+
tg.soonify(self._handle_package)(sender, package)
7596

7697
async def request(self, request: RawRequest) -> RawResponsePackage:
7798
await self.send(request)
@@ -80,12 +101,11 @@ async def request(self, request: RawRequest) -> RawResponsePackage:
80101
async def notify(self, notification: RawNotification) -> None:
81102
await self.send(notification)
82103

83-
@abstractmethod
84104
@asynccontextmanager
85-
def run(
86-
self,
87-
workspace: Workspace,
88-
*,
89-
sender: Sender[ServerRequest] | None = None,
105+
async def run(
106+
self, workspace: Workspace, sender: Sender[ServerRequest]
90107
) -> AsyncGenerator[Self]:
91108
"""Run the server."""
109+
async with asyncer.create_task_group() as tg:
110+
tg.soonify(self._dispatch)(sender)
111+
yield self

src/lsp_client/server/container.py

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
from typing import Literal, Self, final, override
88

99
import anyio
10+
from anyio.abc import AnyByteReceiveStream, AnyByteSendStream
1011
from attrs import Factory, define, field
1112
from loguru import logger
1213

13-
from lsp_client.jsonrpc.parse import RawPackage
1414
from lsp_client.server.types import ServerRequest
1515
from lsp_client.utils.channel import Sender
1616
from lsp_client.utils.workspace import Workspace
@@ -147,6 +147,34 @@ class ContainerServer(Server):
147147

148148
_local: LocalServer = field(init=False)
149149

150+
@property
151+
@override
152+
def send_stream(self) -> AnyByteSendStream:
153+
return self._local.send_stream
154+
155+
@property
156+
@override
157+
def receive_stream(self) -> AnyByteReceiveStream:
158+
return self._local.receive_stream
159+
160+
@override
161+
async def kill(self) -> None:
162+
await self._local.kill()
163+
164+
@override
165+
async def check_availability(self) -> None:
166+
try:
167+
await anyio.run_process(
168+
[self.backend, "pull", self.image],
169+
stdout=subprocess.DEVNULL,
170+
stderr=subprocess.DEVNULL,
171+
)
172+
except anyio.ProcessError as e:
173+
raise RuntimeError(
174+
f"Container backend '{self.backend}' is not available or image '{self.image}' "
175+
"could not be pulled."
176+
) from e
177+
150178
def format_args(self, workspace: Workspace) -> list[str]:
151179
args = ["run", "--rm", "-i"]
152180

@@ -183,38 +211,9 @@ def format_args(self, workspace: Workspace) -> list[str]:
183211

184212
return args
185213

186-
@override
187-
async def send(self, package: RawPackage) -> None:
188-
await self._local.send(package)
189-
190-
@override
191-
async def receive(self) -> RawPackage | None:
192-
return await self._local.receive()
193-
194-
@override
195-
async def kill(self) -> None:
196-
await self._local.kill()
197-
198-
@override
199-
async def check_availability(self) -> None:
200-
try:
201-
await anyio.run_process(
202-
[self.backend, "pull", self.image],
203-
stdout=subprocess.DEVNULL,
204-
stderr=subprocess.DEVNULL,
205-
)
206-
except anyio.ProcessError as e:
207-
raise RuntimeError(
208-
f"Container backend '{self.backend}' is not available or image '{self.image}' "
209-
"could not be pulled."
210-
) from e
211-
212214
@asynccontextmanager
213215
async def run(
214-
self,
215-
workspace: Workspace,
216-
*,
217-
sender: Sender[ServerRequest] | None = None,
216+
self, workspace: Workspace, sender: Sender[ServerRequest]
218217
) -> AsyncGenerator[Self]:
219218
args = self.format_args(workspace)
220219
logger.debug("Running docker runtime with command: {}", args)

src/lsp_client/server/default.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,12 @@
11
from __future__ import annotations
22

3-
from collections.abc import AsyncIterator
4-
from contextlib import suppress
5-
63
from attrs import frozen
74

8-
from .abc import Server
95
from .container import ContainerServer
10-
from .error import ServerRuntimeError
116
from .local import LocalServer
12-
from .types import ServerType
137

148

159
@frozen
1610
class DefaultServers:
1711
local: LocalServer
1812
container: ContainerServer
19-
20-
async def iter_candidate(
21-
self,
22-
*,
23-
server: Server | ServerType = "local",
24-
) -> AsyncIterator[Server]:
25-
"""
26-
Server candidates in order of priority:
27-
1. User-provided server
28-
2. Local server (if available)
29-
3. Containerized server
30-
4. Local server with auto-install (if enabled)
31-
"""
32-
33-
match server:
34-
case "container":
35-
yield self.container
36-
case "local":
37-
yield self.local
38-
case Server() as server:
39-
yield server
40-
41-
with suppress(ServerRuntimeError):
42-
await self.local.check_availability()
43-
yield self.local
44-
45-
yield self.container
46-
yield self.local

src/lsp_client/server/local.py

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,16 @@
22

33
from collections.abc import AsyncGenerator, Sequence
44
from contextlib import asynccontextmanager
5-
from functools import cached_property
65
from pathlib import Path
76
from typing import Protocol, Self, final, override
87

98
import aioshutil
109
import anyio
11-
import asyncer
12-
from anyio.abc import AnyByteSendStream, Process
13-
from anyio.streams.buffered import BufferedByteReceiveStream
10+
from anyio.abc import AnyByteReceiveStream, AnyByteSendStream, Process
1411
from attrs import Factory, define, field
1512
from loguru import logger
1613

1714
from lsp_client.env import disable_auto_installation
18-
from lsp_client.jsonrpc.parse import read_raw_package, write_raw_package
19-
from lsp_client.jsonrpc.types import RawPackage
2015
from lsp_client.server.types import ServerRequest
2116
from lsp_client.utils.channel import Sender
2217
from lsp_client.utils.workspace import Workspace
@@ -43,38 +38,25 @@ class LocalServer(Server):
4338

4439
_process: Process = field(init=False, default=None)
4540

46-
@cached_property
47-
def stdin(self) -> AnyByteSendStream:
41+
@property
42+
@override
43+
def send_stream(self) -> AnyByteSendStream:
4844
stdin = self._process.stdin
4945
assert stdin, "Process stdin is not available"
5046
return stdin
5147

52-
@cached_property
53-
def stdout(self) -> BufferedByteReceiveStream:
48+
@property
49+
@override
50+
def receive_stream(self) -> AnyByteReceiveStream:
5451
stdout = self._process.stdout
5552
assert stdout, "Process stdout is not available"
56-
return BufferedByteReceiveStream(stdout)
53+
return stdout
5754

58-
@cached_property
59-
def stderr(self) -> BufferedByteReceiveStream:
55+
@property
56+
def stderr(self) -> AnyByteReceiveStream:
6057
stderr = self._process.stderr
6158
assert stderr, "Process stderr is not available"
62-
return BufferedByteReceiveStream(stderr)
63-
64-
@override
65-
async def send(self, package: RawPackage) -> None:
66-
await write_raw_package(self.stdin, package)
67-
logger.debug("Package sent: {}", package)
68-
69-
@override
70-
async def receive(self) -> RawPackage | None:
71-
try:
72-
package = await read_raw_package(self.stdout)
73-
logger.debug("Received package: {}", package)
74-
return package
75-
except (anyio.EndOfStream, anyio.IncompleteRead, anyio.ClosedResourceError):
76-
logger.debug("Process stdout closed")
77-
return None
59+
return stderr
7860

7961
@override
8062
async def kill(self) -> None:
@@ -130,14 +112,10 @@ async def run_process(self, workspace: Workspace) -> AsyncGenerator[None]:
130112

131113
@asynccontextmanager
132114
async def run(
133-
self,
134-
workspace: Workspace,
135-
*,
136-
sender: Sender[ServerRequest] | None = None,
115+
self, workspace: Workspace, sender: Sender[ServerRequest]
137116
) -> AsyncGenerator[Self]:
138117
async with (
139118
self.run_process(workspace),
140-
asyncer.create_task_group() as tg,
119+
super().run(workspace, sender=sender),
141120
):
142-
tg.soonify(self._dispatch)(sender)
143121
yield self

0 commit comments

Comments
 (0)