Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 65 additions & 17 deletions lsp/lsp-harness/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use log::debug;
use lsp_server::ResponseError;
use lsp_types::{
notification::{
DidChangeTextDocument, DidOpenTextDocument, Exit, Initialized,
Cancel, DidChangeTextDocument, DidOpenTextDocument, Exit, Initialized,
Notification as LspNotification,
},
request::{GotoDefinition, Initialize, Request as LspRequest, Shutdown},
ClientCapabilities, DidChangeTextDocumentParams, DidOpenTextDocumentParams,
GotoDefinitionParams, GotoDefinitionResponse, InitializeParams, InitializedParams, Position,
TextDocumentContentChangeEvent, TextDocumentIdentifier, TextDocumentPositionParams, Url,
VersionedTextDocumentIdentifier, WorkDoneProgressParams,
request::{ExecuteCommand, GotoDefinition, Initialize, Request as LspRequest, Shutdown},
CancelParams, ClientCapabilities, DidChangeTextDocumentParams, DidOpenTextDocumentParams,
ExecuteCommandParams, GotoDefinitionParams, GotoDefinitionResponse, InitializeParams,
InitializedParams, Position, TextDocumentContentChangeEvent, TextDocumentIdentifier,
TextDocumentPositionParams, Url, VersionedTextDocumentIdentifier, WorkDoneProgressParams,
};
use std::{
io::{BufRead, BufReader, Read, Write},
Expand All @@ -40,6 +40,7 @@ pub struct Server {
/// A buffer for notifications that have been received from the lsp but not
/// yet delivered to the client.
pending_notifications: Vec<Notification>,
pause_request_ids: Vec<u32>,
}

/// A dynamically typed message from the LSP server.
Expand Down Expand Up @@ -90,10 +91,10 @@ pub struct Response {
/// The result. The structure of this should be determined by whatever
/// request method this is a response to. But it hasn't been checked yet.
#[serde(default)]
result: serde_json::Value,
pub result: serde_json::Value,
/// Populated if the request generated an error.
#[serde(default)]
error: Option<ResponseError>,
pub error: Option<ResponseError>,
}

impl Drop for Server {
Expand Down Expand Up @@ -133,6 +134,7 @@ impl Server {
write: Box::new(stdin),
read: Box::new(BufReader::new(stdout)),
pending_notifications: Vec::new(),
pause_request_ids: Vec::new(),
id: 0,
};

Expand Down Expand Up @@ -207,6 +209,21 @@ impl Server {

/// Send a request to the language server and wait for the response.
pub fn send_request<T: LspRequest>(&mut self, params: T::Params) -> Result<T::Result> {
let resp = self.send_request_with_options::<T>(params, false)?;
if let Some(err) = resp.error {
bail!(err.message);
}
Ok(serde_json::value::from_value(resp.result)?)
}

/// Send a request to the language server and wait for the response.
/// Optionally allows a cancellation notification to be sent before waiting for a response.
/// This does not check the response for errors first, and will return whatever was received.
pub fn send_request_with_options<T: LspRequest>(
&mut self,
params: T::Params,
cancel: bool,
) -> Result<Response> {
self.id += 1;

let req = SendRequest::<T> {
Expand All @@ -216,17 +233,48 @@ impl Server {
id: self.id,
};
self.send(&req)?;
let resp = self.recv_response()?;
if resp.id != self.id {
// In general, LSP responses can come out of order. But because we always
// wait for a response after sending a request, there's only one outstanding
// response.
bail!("expected id {}, got {}", self.id, resp.id);

if cancel {
self.send_notification::<Cancel>(CancelParams {
id: lsp_types::NumberOrString::Number(self.id.try_into().unwrap()),
})?;
}
if let Some(err) = resp.error {
bail!(err.message);

loop {
let resp = self.recv_response()?;
if self.pause_request_ids.contains(&resp.id) {
// In order to test queuing behavior, we send an EvalCommand request that pauses
// the language server. We don't wait for that request to respond like we do other
// requests, so it's possible that we'll receive a response for a pause request
// ahead of the response for the request we sent. Responses to these pause requests
// can be ignored.
continue;
} else if resp.id != self.id {
// In general, LSP responses can come out of order. Apart from the pause exception
// handled above, we always wait for a response after sending a request, so there's
// only one outstanding response.
bail!("expected id {}, got {}", self.id, resp.id);
}
return Ok(resp);
}
Ok(serde_json::value::from_value(resp.result)?)
}

/// Send a request to the language server that will pause it, and don't wait for a response.
/// This can be used to test queueing and cancellation behavior.
pub fn send_pause(&mut self) -> Result<()> {
self.id += 1;
self.pause_request_ids.push(self.id);

let req = SendRequest::<ExecuteCommand> {
jsonrpc: "2.0",
method: ExecuteCommand::METHOD,
id: self.id,
params: ExecuteCommandParams {
command: "pause".into(),
..Default::default()
},
};
self.send(&req)
}

/// Send a notification to the language server.
Expand Down
13 changes: 13 additions & 0 deletions lsp/lsp-harness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ impl TestHarness {
self.out.push(b'\n');
}

// Send a request to the language server and immediately cancel it.
pub fn request_and_cancel<T: LspRequest>(&mut self, params: T::Params) -> jsonrpc::Response {
self.srv
.send_request_with_options::<T>(params, true)
.unwrap()
}

pub fn request_dyn(&mut self, req: Request) {
match req {
Request::GotoDefinition(d) => self.request::<GotoDefinition>(d),
Expand Down Expand Up @@ -311,4 +318,10 @@ impl TestHarness {
}
}
}

/// Sends a request to pause the language server and returns without waiting for a response.
/// This can be used to test queuing and cancellation behavior.
pub fn pause_language_server(&mut self) {
self.srv.send_pause().unwrap();
}
}
9 changes: 9 additions & 0 deletions lsp/nls/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ pub fn handle_command(
server.reply(Response::new_ok(req, None::<()>));
Ok(())
}
#[cfg(debug_assertions)]
"pause" => {
// This command is only used by integration tests. It is intended to simulate
// an operation taking some amount of time so that the behavior of requests
// queuing behind it can be tested.
std::thread::sleep(std::time::Duration::from_millis(100));
server.reply(Response::new_ok(req, None::<()>));
Ok(())
}
_ => Err(Error::CommandNotFound(params.command).into()),
}
}
Expand Down
36 changes: 35 additions & 1 deletion lsp/nls/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ fn run() -> Result<()> {
)))?;
}

let (connection, _threads) = Connection::stdio();
let connection = buffered_lsp_connection();

let capabilities = Server::capabilities();

Expand All @@ -116,3 +116,37 @@ fn run() -> Result<()> {

Ok(())
}

/// Creates an LSP connection reading from stdio with an additional message buffer.
fn buffered_lsp_connection() -> Connection {
// We add an additional buffer for the LSP connection to avoid a race condition that made
// request cancellation ineffective.
//
// The lsp server connection this receives on is a zero size bounded channel. This
// causes an issue where once we pull something from the channel, the lsp server thread
// must fetch the next message from stdin, leaving this channel empty for a short time
// even if there are multiple messages already sent to the server. When it's queueing
// tasks this loop will usually beat the thread reading from stdin and will find the
// channel empty. This can cause the main loop to begin handling a request even if
// there's already a notification in stdin cancelling it.
//
// The buffer lets multiple messages stack up so there's not a problem with empty reads from
// the channel.

let (buf_sender, buf_receiver) = crossbeam::channel::bounded(30);

let (Connection { sender, receiver }, _threads) = Connection::stdio();
std::thread::Builder::new()
.name("LspMessageBuffer".into())
.spawn(move || {
receiver
.into_iter()
.try_for_each(|msg| buf_sender.send(msg))
})
.unwrap();

Connection {
sender,
receiver: buf_receiver,
}
}
25 changes: 25 additions & 0 deletions lsp/nls/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl Server {
let Ok(msg) = msg else {
break;
};

let result = self.task_queue.queue_message(msg);
if let Err(err) = result {
warn!("Failed to add a message to the queue: {}", err);
Expand Down Expand Up @@ -217,6 +218,10 @@ impl Server {
}
Ok(Shutdown::Continue)
}
Task::CancelRequest(id) => {
self.cancel_request(id);
Ok(Shutdown::Continue)
}
}
}

Expand Down Expand Up @@ -333,6 +338,26 @@ impl Server {
Ok(())
}

/// Sends a response to the client acknowledging that a request has been cancelled.
///
/// It's assumed that the cancellation was requested by the client and not initiated by the
/// server, and the response is sent accordingly.
fn cancel_request(&mut self, id: RequestId) {
debug!("Cancelling request {}", id);
self.reply(Response {
id,
result: None,
error: Some(ResponseError {
// If we need to support server cancelled requests at some point this error code is
// the only thing that really needs to change. `ErrorCode::ServerCanceled` would be
// used instead.
code: ErrorCode::RequestCanceled as i32,
message: "Request cancelled".into(),
data: None,
}),
});
}

pub fn issue_diagnostics(
&mut self,
file_id: FileId,
Expand Down
Loading