diff --git a/lsp/lsp-harness/src/jsonrpc.rs b/lsp/lsp-harness/src/jsonrpc.rs index bf8cf9a029..e9d15c808e 100644 --- a/lsp/lsp-harness/src/jsonrpc.rs +++ b/lsp/lsp-harness/src/jsonrpc.rs @@ -7,14 +7,14 @@ use log::debug; use lsp_server::ResponseError; use lsp_types::{ notification::{ - DidChangeTextDocument, DidOpenTextDocument, Exit, Initialized, + Cancel, DidChangeTextDocument, DidOpenTextDocument, Exit, Initialized, Notification as LspNotification, }, - request::{GotoDefinition, Initialize, Request as LspRequest, Shutdown}, - ClientCapabilities, DidChangeTextDocumentParams, DidOpenTextDocumentParams, - GotoDefinitionParams, GotoDefinitionResponse, InitializeParams, InitializedParams, Position, - TextDocumentContentChangeEvent, TextDocumentIdentifier, TextDocumentPositionParams, Url, - VersionedTextDocumentIdentifier, WorkDoneProgressParams, + request::{ExecuteCommand, GotoDefinition, Initialize, Request as LspRequest, Shutdown}, + CancelParams, ClientCapabilities, DidChangeTextDocumentParams, DidOpenTextDocumentParams, + ExecuteCommandParams, GotoDefinitionParams, GotoDefinitionResponse, InitializeParams, + InitializedParams, Position, TextDocumentContentChangeEvent, TextDocumentIdentifier, + TextDocumentPositionParams, Url, VersionedTextDocumentIdentifier, WorkDoneProgressParams, }; use std::{ io::{BufRead, BufReader, Read, Write}, @@ -40,6 +40,7 @@ pub struct Server { /// A buffer for notifications that have been received from the lsp but not /// yet delivered to the client. pending_notifications: Vec, + pause_request_ids: Vec, } /// A dynamically typed message from the LSP server. @@ -90,10 +91,10 @@ pub struct Response { /// The result. The structure of this should be determined by whatever /// request method this is a response to. But it hasn't been checked yet. #[serde(default)] - result: serde_json::Value, + pub result: serde_json::Value, /// Populated if the request generated an error. #[serde(default)] - error: Option, + pub error: Option, } impl Drop for Server { @@ -133,6 +134,7 @@ impl Server { write: Box::new(stdin), read: Box::new(BufReader::new(stdout)), pending_notifications: Vec::new(), + pause_request_ids: Vec::new(), id: 0, }; @@ -207,6 +209,21 @@ impl Server { /// Send a request to the language server and wait for the response. pub fn send_request(&mut self, params: T::Params) -> Result { + let resp = self.send_request_with_options::(params, false)?; + if let Some(err) = resp.error { + bail!(err.message); + } + Ok(serde_json::value::from_value(resp.result)?) + } + + /// Send a request to the language server and wait for the response. + /// Optionally allows a cancellation notification to be sent before waiting for a response. + /// This does not check the response for errors first, and will return whatever was received. + pub fn send_request_with_options( + &mut self, + params: T::Params, + cancel: bool, + ) -> Result { self.id += 1; let req = SendRequest:: { @@ -216,17 +233,48 @@ impl Server { id: self.id, }; self.send(&req)?; - let resp = self.recv_response()?; - if resp.id != self.id { - // In general, LSP responses can come out of order. But because we always - // wait for a response after sending a request, there's only one outstanding - // response. - bail!("expected id {}, got {}", self.id, resp.id); + + if cancel { + self.send_notification::(CancelParams { + id: lsp_types::NumberOrString::Number(self.id.try_into().unwrap()), + })?; } - if let Some(err) = resp.error { - bail!(err.message); + + loop { + let resp = self.recv_response()?; + if self.pause_request_ids.contains(&resp.id) { + // In order to test queuing behavior, we send an EvalCommand request that pauses + // the language server. We don't wait for that request to respond like we do other + // requests, so it's possible that we'll receive a response for a pause request + // ahead of the response for the request we sent. Responses to these pause requests + // can be ignored. + continue; + } else if resp.id != self.id { + // In general, LSP responses can come out of order. Apart from the pause exception + // handled above, we always wait for a response after sending a request, so there's + // only one outstanding response. + bail!("expected id {}, got {}", self.id, resp.id); + } + return Ok(resp); } - Ok(serde_json::value::from_value(resp.result)?) + } + + /// Send a request to the language server that will pause it, and don't wait for a response. + /// This can be used to test queueing and cancellation behavior. + pub fn send_pause(&mut self) -> Result<()> { + self.id += 1; + self.pause_request_ids.push(self.id); + + let req = SendRequest:: { + jsonrpc: "2.0", + method: ExecuteCommand::METHOD, + id: self.id, + params: ExecuteCommandParams { + command: "pause".into(), + ..Default::default() + }, + }; + self.send(&req) } /// Send a notification to the language server. diff --git a/lsp/lsp-harness/src/lib.rs b/lsp/lsp-harness/src/lib.rs index 19393f956e..0f243d5845 100644 --- a/lsp/lsp-harness/src/lib.rs +++ b/lsp/lsp-harness/src/lib.rs @@ -232,6 +232,13 @@ impl TestHarness { self.out.push(b'\n'); } + // Send a request to the language server and immediately cancel it. + pub fn request_and_cancel(&mut self, params: T::Params) -> jsonrpc::Response { + self.srv + .send_request_with_options::(params, true) + .unwrap() + } + pub fn request_dyn(&mut self, req: Request) { match req { Request::GotoDefinition(d) => self.request::(d), @@ -311,4 +318,10 @@ impl TestHarness { } } } + + /// Sends a request to pause the language server and returns without waiting for a response. + /// This can be used to test queuing and cancellation behavior. + pub fn pause_language_server(&mut self) { + self.srv.send_pause().unwrap(); + } } diff --git a/lsp/nls/src/command.rs b/lsp/nls/src/command.rs index 1dd3c34b8a..df2a58aa43 100644 --- a/lsp/nls/src/command.rs +++ b/lsp/nls/src/command.rs @@ -16,6 +16,15 @@ pub fn handle_command( server.reply(Response::new_ok(req, None::<()>)); Ok(()) } + #[cfg(debug_assertions)] + "pause" => { + // This command is only used by integration tests. It is intended to simulate + // an operation taking some amount of time so that the behavior of requests + // queuing behind it can be tested. + std::thread::sleep(std::time::Duration::from_millis(100)); + server.reply(Response::new_ok(req, None::<()>)); + Ok(()) + } _ => Err(Error::CommandNotFound(params.command).into()), } } diff --git a/lsp/nls/src/main.rs b/lsp/nls/src/main.rs index b323a61eb1..cccf166fa0 100644 --- a/lsp/nls/src/main.rs +++ b/lsp/nls/src/main.rs @@ -98,7 +98,7 @@ fn run() -> Result<()> { )))?; } - let (connection, _threads) = Connection::stdio(); + let connection = buffered_lsp_connection(); let capabilities = Server::capabilities(); @@ -116,3 +116,37 @@ fn run() -> Result<()> { Ok(()) } + +/// Creates an LSP connection reading from stdio with an additional message buffer. +fn buffered_lsp_connection() -> Connection { + // We add an additional buffer for the LSP connection to avoid a race condition that made + // request cancellation ineffective. + // + // The lsp server connection this receives on is a zero size bounded channel. This + // causes an issue where once we pull something from the channel, the lsp server thread + // must fetch the next message from stdin, leaving this channel empty for a short time + // even if there are multiple messages already sent to the server. When it's queueing + // tasks this loop will usually beat the thread reading from stdin and will find the + // channel empty. This can cause the main loop to begin handling a request even if + // there's already a notification in stdin cancelling it. + // + // The buffer lets multiple messages stack up so there's not a problem with empty reads from + // the channel. + + let (buf_sender, buf_receiver) = crossbeam::channel::bounded(30); + + let (Connection { sender, receiver }, _threads) = Connection::stdio(); + std::thread::Builder::new() + .name("LspMessageBuffer".into()) + .spawn(move || { + receiver + .into_iter() + .try_for_each(|msg| buf_sender.send(msg)) + }) + .unwrap(); + + Connection { + sender, + receiver: buf_receiver, + } +} diff --git a/lsp/nls/src/server.rs b/lsp/nls/src/server.rs index ff772398ba..0ee60b7c43 100644 --- a/lsp/nls/src/server.rs +++ b/lsp/nls/src/server.rs @@ -161,6 +161,7 @@ impl Server { let Ok(msg) = msg else { break; }; + let result = self.task_queue.queue_message(msg); if let Err(err) = result { warn!("Failed to add a message to the queue: {}", err); @@ -217,6 +218,10 @@ impl Server { } Ok(Shutdown::Continue) } + Task::CancelRequest(id) => { + self.cancel_request(id); + Ok(Shutdown::Continue) + } } } @@ -333,6 +338,26 @@ impl Server { Ok(()) } + /// Sends a response to the client acknowledging that a request has been cancelled. + /// + /// It's assumed that the cancellation was requested by the client and not initiated by the + /// server, and the response is sent accordingly. + fn cancel_request(&mut self, id: RequestId) { + debug!("Cancelling request {}", id); + self.reply(Response { + id, + result: None, + error: Some(ResponseError { + // If we need to support server cancelled requests at some point this error code is + // the only thing that really needs to change. `ErrorCode::ServerCanceled` would be + // used instead. + code: ErrorCode::RequestCanceled as i32, + message: "Request cancelled".into(), + data: None, + }), + }); + } + pub fn issue_diagnostics( &mut self, file_id: FileId, diff --git a/lsp/nls/src/task_queue.rs b/lsp/nls/src/task_queue.rs index c70eafa7b5..9119b92489 100644 --- a/lsp/nls/src/task_queue.rs +++ b/lsp/nls/src/task_queue.rs @@ -5,11 +5,13 @@ use std::{ use anyhow::Result; use log::debug; -use lsp_server::{Message, Notification}; +use lsp_server::{Message, Notification, RequestId}; use lsp_types::{ - DidChangeTextDocumentParams, DidCloseTextDocumentParams, DidOpenTextDocumentParams, Url, + CancelParams, DidChangeTextDocumentParams, DidCloseTextDocumentParams, + DidOpenTextDocumentParams, NumberOrString, Url, notification::{ - DidChangeTextDocument, DidCloseTextDocument, DidOpenTextDocument, Notification as _, + Cancel, DidChangeTextDocument, DidCloseTextDocument, DidOpenTextDocument, + Notification as NotificationTrait, }, request::{ExecuteCommand, Request as RequestTrait}, }; @@ -22,6 +24,8 @@ pub enum Task { HandleDocumentSync(DocumentSync), /// Publish diagnostics for a file Diagnostics(DiagnosticsRequest), + /// Cancel a request. + CancelRequest(RequestId), } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -47,6 +51,7 @@ pub enum DocumentSync { enum ReqOrSync { Request(lsp_server::Request), DocumentSync(DocumentSync), + CanceledRequest(RequestId), } pub struct TaskQueue { @@ -120,6 +125,20 @@ impl TaskQueue { serde_json::from_value::(notification.params)?; self.add_sync_task(DocumentSync::Change(params)); } + Cancel::METHOD => { + let params = serde_json::from_value::(notification.params)?; + let id = match params.id { + NumberOrString::Number(id) => id.into(), + NumberOrString::String(id) => id.into(), + }; + self.request_or_sync.iter_mut().for_each(|it| { + if let ReqOrSync::Request(req) = it + && req.id == id + { + *it = ReqOrSync::CanceledRequest(id.clone()); + } + }); + } method => debug!("No handler for notification type {}", method), }; Ok(()) @@ -192,6 +211,7 @@ impl TaskQueue { } match self.request_or_sync.pop_front() { Some(ReqOrSync::DocumentSync(task)) => Some(Task::HandleDocumentSync(task)), + Some(ReqOrSync::CanceledRequest(id)) => Some(Task::CancelRequest(id)), Some(ReqOrSync::Request(req)) => { let task = match categorize(&req) { RequestCategory::EvalCommand(uri) => { @@ -498,4 +518,110 @@ mod test { assert_matches!(queue.next_task().unwrap(), Task::Diagnostics(_)); assert_matches!(queue.next_task().unwrap(), Task::HandleRequest(_)); } + + #[test] + fn cancel_request_int_id() { + let mut queue = TaskQueue::new(); + let req = Request::new( + 123.into(), + ExecuteCommand::METHOD.into(), + json!({ + "command":"eval", + "arguments":[{"uri": "file:///test.ncl"}] + }), + ); + + let cancel = Notification::new( + Cancel::METHOD.into(), + json!({ + "id": 123 + }), + ); + + queue.queue_message(Message::Request(req)).unwrap(); + queue.queue_message(Message::Notification(cancel)).unwrap(); + let task = queue.next_task().unwrap(); + match task { + Task::CancelRequest(id) => assert_eq!(id, 123.into()), + _ => panic!("Wrong task type"), + } + // The request should actually be cancelled now. + assert!(queue.next_task().is_none()); + } + + #[test] + fn cancel_request_str_id() { + let mut queue = TaskQueue::new(); + let req = Request::new( + "123".to_string().into(), + ExecuteCommand::METHOD.into(), + json!({ + "command":"eval", + "arguments":[{"uri": "file:///test.ncl"}] + }), + ); + + let cancel = Notification::new( + Cancel::METHOD.into(), + json!({ + "id": "123" + }), + ); + + queue.queue_message(Message::Request(req)).unwrap(); + queue.queue_message(Message::Notification(cancel)).unwrap(); + let task = queue.next_task().unwrap(); + match task { + Task::CancelRequest(id) => assert_eq!(id, "123".to_string().into()), + _ => panic!("Wrong task type"), + } + // The request should actually be cancelled now. + assert!(queue.next_task().is_none()); + } + + #[test] + fn only_request_with_matching_id_is_cancelled() { + let mut queue = TaskQueue::new(); + let cancelled_req = Request::new( + 123.into(), + ExecuteCommand::METHOD.into(), + json!({ + "command":"eval", + "arguments":[{"uri": "file:///test.ncl"}] + }), + ); + + let other_req = Request::new( + 321.into(), + ExecuteCommand::METHOD.into(), + json!({ + "command":"eval", + "arguments":[{"uri": "file:///test.ncl"}] + }), + ); + + let cancel = Notification::new( + Cancel::METHOD.into(), + json!({ + "id": 123 + }), + ); + + queue + .queue_message(Message::Request(cancelled_req)) + .unwrap(); + queue.queue_message(Message::Request(other_req)).unwrap(); + queue.queue_message(Message::Notification(cancel)).unwrap(); + match queue.next_task().unwrap() { + Task::CancelRequest(id) => assert_eq!(id, 123.into()), + _ => panic!("Wrong task type"), + } + match queue.next_task().unwrap() { + Task::HandleRequest(req) => assert_eq!(req.id, 321.into()), + _ => panic!("Wrong task type"), + } + + // The request should actually be cancelled now. + assert!(queue.next_task().is_none()); + } } diff --git a/lsp/nls/tests/main.rs b/lsp/nls/tests/main.rs index a089cdc919..7470cdda1d 100644 --- a/lsp/nls/tests/main.rs +++ b/lsp/nls/tests/main.rs @@ -1,4 +1,5 @@ -use lsp_types::Url; +use lsp_server::ErrorCode; +use lsp_types::{ExecuteCommandParams, Url, request::ExecuteCommand}; use nickel_lang_utils::project_root::project_root; use pretty_assertions::assert_eq; use serde_json::json; @@ -135,6 +136,27 @@ fn refresh_missing_imports() { assert_eq!(dep_diags.uri, dep_uri); } +// This test is potentially subject to flakiness due to a race condition with how LSP messages are +// read from stdin. It's possible for the main loop to check for new messages and not find one even +// if there's a message waiting from stdin, and to begin handling the request without knowing that +// it has already been cancelled. To fix this, the main loop will wait with a short timeout when it +// tries to read a new message. The timeout it has set is well over what was needed to make this +// test work reliably in my testing, but I can't guarantee the timeout is correct under all +// conditions or all hardware. +#[test] +fn cancel_request() { + let _ = env_logger::try_init(); + let mut harness = TestHarness::new(); + harness.pause_language_server(); + let response = harness.request_and_cancel::(ExecuteCommandParams { + command: "eval".into(), + arguments: vec![json!({ "uri": "file:///test.ncl"})], + ..Default::default() + }); + let err = response.error.unwrap(); + assert_eq!(err.code, ErrorCode::RequestCanceled as i32); +} + #[test] fn apply_client_options() { let _ = env_logger::try_init();