Skip to content

Commit 38509ef

Browse files
committed
update lsp-server
1 parent b73c1fc commit 38509ef

File tree

9 files changed

+408
-366
lines changed

9 files changed

+408
-366
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ emmylua_parser_desc = { path = "crates/emmylua_parser_desc", version = "0.17.0"
1313
emmylua_diagnostic_macro = { path = "crates/emmylua_diagnostic_macro", version = "0.5.0" }
1414

1515
# external
16-
lsp-server = "0.7.7"
16+
lsp-server = "0.7.9"
1717
tokio = { version = "1.46", features = ["full"] }
1818
serde = { version = "1.0.219", features = ["derive"] }
1919
serde_json = "1.0.141"

crates/emmylua_ls/src/lib.rs

Lines changed: 2 additions & 363 deletions
Original file line numberDiff line numberDiff line change
@@ -3,374 +3,13 @@ mod context;
33
mod handlers;
44
mod logger;
55
mod meta_text;
6+
mod server;
67
mod util;
78

8-
use crate::handlers::{
9-
initialized_handler, on_notification_handler, on_request_handler, on_response_handler,
10-
};
119
pub use clap::Parser;
1210
pub use cmd_args::*;
13-
use handlers::server_capabilities;
14-
use lsp_server::{Connection, Message, Response};
15-
use lsp_types::InitializeParams;
16-
use std::sync::Arc;
17-
use std::{env, error::Error};
18-
use tokio::sync::{mpsc, oneshot};
11+
pub use server::{AsyncConnection, ExitError, run_ls};
1912

2013
#[macro_use]
2114
extern crate rust_i18n;
2215
rust_i18n::i18n!("./locales", fallback = "en");
23-
24-
const CRATE_NAME: &str = env!("CARGO_PKG_NAME");
25-
const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION");
26-
27-
/// Async wrapper for LSP Connection with tokio support
28-
pub struct AsyncConnection {
29-
connection: Arc<Connection>,
30-
receiver: mpsc::UnboundedReceiver<Message>,
31-
_receiver_task: tokio::task::JoinHandle<()>,
32-
}
33-
34-
impl AsyncConnection {
35-
/// Create async version from sync Connection
36-
pub fn from_sync(connection: Connection) -> Self {
37-
let (tx, rx) = mpsc::unbounded_channel();
38-
let connection = Arc::new(connection);
39-
40-
// Spawn blocking task to convert sync receiver to async
41-
let connection_clone = connection.clone();
42-
let receiver_task = tokio::task::spawn_blocking(move || {
43-
for msg in &connection_clone.receiver {
44-
if tx.send(msg).is_err() {
45-
break; // Receiver closed
46-
}
47-
}
48-
});
49-
50-
Self {
51-
connection,
52-
receiver: rx,
53-
_receiver_task: receiver_task,
54-
}
55-
}
56-
57-
/// Receive message asynchronously
58-
pub async fn recv(&mut self) -> Option<Message> {
59-
self.receiver.recv().await
60-
}
61-
62-
/// Send message to client
63-
pub fn send(&self, msg: Message) -> Result<(), Box<dyn Error + Send + Sync>> {
64-
self.connection
65-
.sender
66-
.send(msg)
67-
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
68-
}
69-
70-
/// Handle shutdown request
71-
pub async fn handle_shutdown(
72-
&mut self,
73-
req: &lsp_server::Request,
74-
) -> Result<bool, Box<dyn Error + Send + Sync>> {
75-
if req.method != "shutdown" {
76-
return Ok(false);
77-
}
78-
let resp = Response::new_ok(req.id.clone(), ());
79-
let _ = self.connection.sender.send(resp.into());
80-
match tokio::time::timeout(std::time::Duration::from_secs(30), self.receiver.recv()).await {
81-
Ok(Some(Message::Notification(n))) if n.method == "exit" => (),
82-
Ok(Some(msg)) => {
83-
return Err(Box::new(ExitError(format!(
84-
"unexpected message during shutdown: {msg:?}"
85-
))));
86-
}
87-
Ok(None) => {
88-
return Err(Box::new(ExitError(
89-
"channel closed while waiting for exit notification".to_owned(),
90-
)));
91-
}
92-
Err(_) => {
93-
return Err(Box::new(ExitError(
94-
"timed out waiting for exit notification".to_owned(),
95-
)));
96-
}
97-
}
98-
Ok(true)
99-
}
100-
}
101-
102-
pub struct ExitError(pub String);
103-
104-
impl std::fmt::Debug for ExitError {
105-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106-
write!(f, "ExitError: {}", self.0)
107-
}
108-
}
109-
110-
impl std::fmt::Display for ExitError {
111-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112-
write!(f, "{}", self.0)
113-
}
114-
}
115-
116-
impl Error for ExitError {}
117-
118-
/// Server initialization and message processing state
119-
struct ServerMessageProcessor {
120-
initialization_complete: bool,
121-
pending_messages: Vec<Message>,
122-
init_rx: oneshot::Receiver<()>,
123-
}
124-
125-
impl ServerMessageProcessor {
126-
fn new(init_rx: oneshot::Receiver<()>) -> Self {
127-
Self {
128-
initialization_complete: false,
129-
pending_messages: Vec::new(),
130-
init_rx,
131-
}
132-
}
133-
134-
/// Check if message can be processed during initialization
135-
fn can_process_during_init(&self, msg: &Message) -> bool {
136-
match msg {
137-
// Allow all responses (including configuration responses)
138-
Message::Response(_) => true,
139-
// Allow specific notifications
140-
Message::Notification(notify) => {
141-
matches!(
142-
notify.method.as_str(),
143-
"workspace/didChangeConfiguration" | "$/cancelRequest" | "initialized"
144-
)
145-
}
146-
// Don't process other requests during initialization
147-
Message::Request(_) => false,
148-
}
149-
}
150-
151-
/// Process message during normal operation (after initialization)
152-
async fn process_message(
153-
&mut self,
154-
msg: Message,
155-
connection: &mut AsyncConnection,
156-
server_context: &mut context::ServerContext,
157-
) -> Result<bool, Box<dyn Error + Sync + Send>> {
158-
// During normal operation, process all messages
159-
self.handle_message(msg, connection, server_context).await
160-
}
161-
162-
/// Check if initialization is complete and process pending messages
163-
fn check_initialization_complete(&mut self) -> Result<bool, Box<dyn Error + Sync + Send>> {
164-
if !self.initialization_complete {
165-
match self.init_rx.try_recv() {
166-
Ok(_) => {
167-
self.initialization_complete = true;
168-
return Ok(true); // Signal to process pending messages
169-
}
170-
Err(oneshot::error::TryRecvError::Empty) => {
171-
// Still initializing
172-
}
173-
Err(oneshot::error::TryRecvError::Closed) => {
174-
// Initialization task closed unexpectedly
175-
self.initialization_complete = true;
176-
return Ok(true); // Signal to process pending messages
177-
}
178-
}
179-
}
180-
Ok(false)
181-
}
182-
183-
/// Process all pending messages after initialization
184-
async fn process_pending_messages(
185-
&mut self,
186-
connection: &mut AsyncConnection,
187-
server_context: &mut context::ServerContext,
188-
) -> Result<bool, Box<dyn Error + Sync + Send>> {
189-
let messages = std::mem::take(&mut self.pending_messages);
190-
for msg in messages {
191-
if self.handle_message(msg, connection, server_context).await? {
192-
return Ok(true); // Shutdown requested
193-
}
194-
}
195-
Ok(false)
196-
}
197-
198-
/// Handle individual message
199-
async fn handle_message(
200-
&self,
201-
msg: Message,
202-
connection: &mut AsyncConnection,
203-
server_context: &mut context::ServerContext,
204-
) -> Result<bool, Box<dyn Error + Sync + Send>> {
205-
match msg {
206-
Message::Request(req) => {
207-
if connection.handle_shutdown(&req).await? {
208-
server_context.close().await;
209-
return Ok(true); // Shutdown requested
210-
}
211-
on_request_handler(req, server_context).await?;
212-
}
213-
Message::Notification(notify) => {
214-
on_notification_handler(notify, server_context).await?;
215-
}
216-
Message::Response(response) => {
217-
on_response_handler(response, server_context).await?;
218-
}
219-
}
220-
Ok(false)
221-
}
222-
}
223-
224-
#[allow(unused)]
225-
pub async fn run_ls(cmd_args: CmdArgs) -> Result<(), Box<dyn Error + Sync + Send>> {
226-
let (connection, threads) = match cmd_args.communication {
227-
cmd_args::Communication::Stdio => Connection::stdio(),
228-
cmd_args::Communication::Tcp => {
229-
let port = cmd_args.port;
230-
let ip = cmd_args.ip.clone();
231-
let addr = (ip.as_str(), port);
232-
Connection::listen(addr).unwrap()
233-
}
234-
};
235-
236-
let (id, params) = connection.initialize_start()?;
237-
let initialization_params: InitializeParams = serde_json::from_value(params).unwrap();
238-
let server_capbilities = server_capabilities(&initialization_params.capabilities);
239-
let initialize_data = serde_json::json!({
240-
"capabilities": server_capbilities,
241-
"serverInfo": {
242-
"name": CRATE_NAME,
243-
"version": CRATE_VERSION
244-
}
245-
});
246-
247-
connection.initialize_finish(id, initialize_data)?;
248-
249-
// Create async connection wrapper
250-
let mut async_connection = AsyncConnection::from_sync(connection);
251-
main_loop(async_connection, initialization_params, cmd_args).await?;
252-
threads.join()?;
253-
254-
eprintln!("Server shutting down.");
255-
Ok(())
256-
}
257-
258-
/// LSP Server manages the entire server lifecycle
259-
struct LspServer {
260-
connection: AsyncConnection,
261-
server_context: context::ServerContext,
262-
processor: ServerMessageProcessor,
263-
}
264-
265-
impl LspServer {
266-
/// Create a new LSP server instance
267-
fn new(
268-
connection: AsyncConnection,
269-
params: &InitializeParams,
270-
init_rx: oneshot::Receiver<()>,
271-
) -> Self {
272-
let server_context = context::ServerContext::new(
273-
Connection {
274-
sender: connection.connection.sender.clone(),
275-
receiver: connection.connection.receiver.clone(),
276-
},
277-
params.capabilities.clone(),
278-
);
279-
280-
Self {
281-
connection,
282-
server_context,
283-
processor: ServerMessageProcessor::new(init_rx),
284-
}
285-
}
286-
287-
/// Run the main server loop
288-
async fn run(mut self) -> Result<(), Box<dyn Error + Sync + Send>> {
289-
// First, wait for initialization to complete while handling allowed messages
290-
self.wait_for_initialization().await?;
291-
292-
// Process all pending messages after initialization
293-
if self
294-
.processor
295-
.process_pending_messages(&mut self.connection, &mut self.server_context)
296-
.await?
297-
{
298-
self.server_context.close().await;
299-
return Ok(()); // Shutdown requested during pending message processing
300-
}
301-
302-
// Now focus on normal message processing
303-
while let Some(msg) = self.connection.recv().await {
304-
if self
305-
.processor
306-
.process_message(msg, &mut self.connection, &mut self.server_context)
307-
.await?
308-
{
309-
break; // Shutdown requested
310-
}
311-
}
312-
313-
self.server_context.close().await;
314-
Ok(())
315-
}
316-
317-
/// Wait for initialization to complete while handling initialization-allowed messages
318-
async fn wait_for_initialization(&mut self) -> Result<(), Box<dyn Error + Sync + Send>> {
319-
loop {
320-
// Check if initialization is complete
321-
if self.processor.check_initialization_complete()? {
322-
break; // Initialization completed
323-
}
324-
325-
// Use a short timeout to check for messages during initialization
326-
match tokio::time::timeout(
327-
tokio::time::Duration::from_millis(50),
328-
self.connection.recv(),
329-
)
330-
.await
331-
{
332-
Ok(Some(msg)) => {
333-
// Process message if allowed during initialization, otherwise queue it
334-
if self.processor.can_process_during_init(&msg) {
335-
self.processor
336-
.handle_message(msg, &mut self.connection, &mut self.server_context)
337-
.await?;
338-
} else {
339-
self.processor.pending_messages.push(msg);
340-
}
341-
}
342-
Ok(None) => {
343-
// Connection closed during initialization
344-
return Ok(());
345-
}
346-
Err(_) => {
347-
// Timeout - continue checking for initialization completion
348-
continue;
349-
}
350-
}
351-
}
352-
Ok(())
353-
}
354-
}
355-
356-
async fn main_loop(
357-
connection: AsyncConnection,
358-
params: InitializeParams,
359-
cmd_args: CmdArgs,
360-
) -> Result<(), Box<dyn Error + Sync + Send>> {
361-
// Setup initialization completion signal
362-
let (init_tx, init_rx) = oneshot::channel::<()>();
363-
364-
// Create and configure server instance
365-
let server = LspServer::new(connection, &params, init_rx);
366-
367-
// Start initialization process
368-
let server_context_snapshot = server.server_context.snapshot();
369-
tokio::spawn(async move {
370-
initialized_handler(server_context_snapshot, params, cmd_args).await;
371-
let _ = init_tx.send(());
372-
});
373-
374-
// Run the server
375-
server.run().await
376-
}

0 commit comments

Comments
 (0)