Skip to content

Commit 06d0c94

Browse files
committed
Fix race conditions between indexer and diagnostics
1 parent 57aa70c commit 06d0c94

File tree

3 files changed

+202
-52
lines changed

3 files changed

+202
-52
lines changed

crates/ark/src/lsp/main_loop.rs

Lines changed: 194 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@ use std::path::PathBuf;
1111
use std::pin::Pin;
1212
use std::sync::atomic::AtomicBool;
1313
use std::sync::atomic::Ordering;
14+
use std::sync::LazyLock;
1415
use std::sync::RwLock;
1516

1617
use anyhow::anyhow;
18+
use futures::stream::FuturesUnordered;
1719
use futures::StreamExt;
20+
use tokio::sync::mpsc;
1821
use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel;
22+
use tokio::task;
1923
use tokio::task::JoinHandle;
2024
use tower_lsp::lsp_types;
2125
use tower_lsp::lsp_types::Diagnostic;
@@ -30,9 +34,10 @@ use crate::lsp::backend::LspNotification;
3034
use crate::lsp::backend::LspRequest;
3135
use crate::lsp::backend::LspResponse;
3236
use crate::lsp::capabilities::Capabilities;
33-
use crate::lsp::diagnostics;
37+
use crate::lsp::diagnostics::generate_diagnostics;
3438
use crate::lsp::documents::Document;
3539
use crate::lsp::handlers;
40+
use crate::lsp::indexer;
3641
use crate::lsp::inputs::library::Library;
3742
use crate::lsp::state::WorldState;
3843
use crate::lsp::state_handlers;
@@ -641,27 +646,6 @@ where
641646
send_auxiliary(AuxiliaryEvent::SpawnedTask(handle));
642647
}
643648

644-
pub(crate) fn spawn_diagnostics_refresh(uri: Url, document: Document, state: WorldState) {
645-
lsp::spawn_blocking(move || {
646-
let _s = tracing::info_span!("diagnostics_refresh", uri = %uri).entered();
647-
648-
let version = document.version;
649-
let diagnostics = diagnostics::generate_diagnostics(document, state);
650-
651-
Ok(Some(AuxiliaryEvent::PublishDiagnostics(
652-
uri,
653-
diagnostics,
654-
version,
655-
)))
656-
})
657-
}
658-
659-
pub(crate) fn spawn_diagnostics_refresh_all(state: WorldState) {
660-
for (url, document) in state.documents.iter() {
661-
spawn_diagnostics_refresh(url.clone(), document.clone(), state.clone())
662-
}
663-
}
664-
665649
pub(crate) fn publish_diagnostics(uri: Url, diagnostics: Vec<Diagnostic>, version: Option<i32>) {
666650
send_auxiliary(AuxiliaryEvent::PublishDiagnostics(
667651
uri,
@@ -692,3 +676,191 @@ impl std::fmt::Debug for TraceKernelNotification<'_> {
692676
}
693677
}
694678
}
679+
680+
/// A task accepted by the indexer queue: either an indexer task or a
/// diagnostics refresh. The queue loop sorts queued tasks by variant and runs
/// the indexer ones before the diagnostics ones.
#[derive(Debug)]
681+
pub(crate) enum IndexerQueueTask {
682+
Indexer(IndexerTask),
683+
Diagnostics(RefreshDiagnosticsTask),
684+
}
685+
686+
/// Work to be performed by the indexer.
#[derive(Debug)]
687+
pub enum IndexerTask {
688+
/// Index `folders` by passing them to `indexer::start()`.
Start { folders: Vec<String> },
689+
/// Re-index `document`. `uri` is converted to a file path before calling
/// `indexer::update()`, and doubles as the deduplication key within a batch.
Update { document: Document, uri: Url },
690+
}
691+
692+
/// Request to regenerate the diagnostics of one document.
///
/// The document is looked up by `uri` in `state.documents` when the task runs;
/// nothing is published if it is not found there.
#[derive(Debug)]
693+
pub(crate) struct RefreshDiagnosticsTask {
694+
uri: Url,
695+
state: WorldState,
696+
}
697+
698+
/// Outcome of a diagnostics refresh, carrying the three arguments that are
/// handed to `publish_diagnostics()`.
#[derive(Debug)]
699+
struct RefreshDiagnosticsResult {
700+
uri: Url,
701+
diagnostics: Vec<Diagnostic>,
702+
// Copied from the `version` field of the document the diagnostics were generated for
version: Option<i32>,
703+
}
704+
705+
/// Sending half of the queue consumed by `process_indexer_queue()`.
///
/// The channel is created, and the queue loop spawned with `tokio::spawn()`,
/// the first time this static is dereferenced.
// NOTE(review): `tokio::spawn()` panics when called outside of a Tokio runtime,
// so the first access presumably has to happen on a runtime thread. Confirm
// against the callers.
static INDEXER_QUEUE: LazyLock<tokio::sync::mpsc::UnboundedSender<IndexerQueueTask>> =
706+
LazyLock::new(|| {
707+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
708+
tokio::spawn(process_indexer_queue(rx));
709+
tx
710+
});
711+
712+
/// Process indexer and diagnostics tasks
713+
///
714+
/// Diagnostics need an up-to-date index to be accurate, so we synchronise
715+
/// indexing and diagnostics tasks using a simple queue.
716+
///
717+
/// - We make sure to refresh diagnostics after every indexer updates.
718+
/// - Indexer tasks are batched together, same for diagnostics tasks.
719+
/// - Cancellation is simply dealt with by deduplicating tasks for the same URI,
720+
/// retaining only the most recent one.
721+
///
722+
/// Ideally we'd process indexer tasks continually without making them dependent
723+
/// on diagnostics tasks. The current setup blocks the queue loop while
724+
/// diagnostics are running, but it has the benefit that rounds of diagnostic
725+
/// refreshes don't race against each other. The frontend will receive all
726+
/// results in order, ensuring that diagnostics for an outdated version are
727+
/// eventually replaced by the most up-to-date diagnostics.
728+
async fn process_indexer_queue(mut rx: mpsc::UnboundedReceiver<IndexerQueueTask>) {
729+
while let Some(task) = rx.recv().await {
730+
// Drain all available tasks
731+
let mut tasks = vec![task];
732+
while let Ok(next_task) = rx.try_recv() {
733+
tasks.push(next_task);
734+
}
735+
736+
// Separate by type
737+
let mut diagnostics_batch = Vec::new();
738+
let mut indexer_batch = Vec::new();
739+
740+
for task in tasks {
741+
match task {
742+
IndexerQueueTask::Indexer(indexer_task) => indexer_batch.push(indexer_task),
743+
IndexerQueueTask::Diagnostics(diagnostic_task) => {
744+
diagnostics_batch.push(diagnostic_task)
745+
},
746+
}
747+
}
748+
749+
// Process indexer tasks first so diagnostics tasks work with an up-to-date index
750+
process_indexer_batch(indexer_batch).await;
751+
process_diagnostics_batch(diagnostics_batch).await;
752+
}
753+
}
754+
755+
async fn process_indexer_batch(batch: Vec<IndexerTask>) {
756+
// Deduplicate tasks by key. We use a `HashMap` so only the last insertion
757+
// is retained. `Update` tasks use URI as key, `Start` tasks use None (we
758+
// only expect one though). This is effectively a way of cancelling `Update`
759+
// tasks for outdated documents.
760+
let batch: std::collections::HashMap<_, _> = batch
761+
.into_iter()
762+
.map(|task| match &task {
763+
IndexerTask::Update { uri, .. } => (Some(uri.clone()), task),
764+
IndexerTask::Start { .. } => (None, task),
765+
})
766+
.collect();
767+
768+
let mut handles = Vec::new();
769+
770+
for (_, task) in batch {
771+
handles.push(tokio::task::spawn_blocking(move || match task {
772+
IndexerTask::Start { folders } => {
773+
indexer::start(folders);
774+
},
775+
IndexerTask::Update { document, uri } => {
776+
let result = if let Ok(path) = uri.to_file_path() {
777+
indexer::update(&document, &path)
778+
} else {
779+
Err(anyhow!("Failed to convert URI to file path: {uri}"))
780+
};
781+
if let Err(err) = result {
782+
log::error!("Indexer update failed: {err}");
783+
}
784+
},
785+
}));
786+
}
787+
788+
for handle in handles {
789+
let _ = handle.await;
790+
}
791+
}
792+
793+
async fn process_diagnostics_batch(batch: Vec<RefreshDiagnosticsTask>) {
794+
// Deduplicate tasks by keeping only the last one for each URI. We use a
795+
// `HashMap` so only the last insertion is retained. This is effectively a
796+
// way of cancelling diagnostics tasks for outdated documents.
797+
let batch: std::collections::HashMap<_, _> = batch
798+
.into_iter()
799+
.map(|task| (task.uri, task.state))
800+
.collect();
801+
802+
let mut futures = FuturesUnordered::new();
803+
804+
for (uri, state) in batch {
805+
futures.push(task::spawn_blocking(move || {
806+
let _span = tracing::info_span!("diagnostics_refresh", uri = %uri).entered();
807+
808+
if let Some(document) = state.documents.get(&uri) {
809+
let diagnostics = generate_diagnostics(document.clone(), state.clone());
810+
Some(RefreshDiagnosticsResult {
811+
uri,
812+
diagnostics,
813+
version: document.version,
814+
})
815+
} else {
816+
None
817+
}
818+
}));
819+
}
820+
821+
// Publish results as they complete
822+
while let Some(Ok(Some(result))) = futures.next().await {
823+
publish_diagnostics(result.uri, result.diagnostics, result.version);
824+
}
825+
}
826+
827+
pub(crate) fn index_start(folders: Vec<String>, state: WorldState) {
828+
INDEXER_QUEUE
829+
.send(IndexerQueueTask::Indexer(IndexerTask::Start { folders }))
830+
.unwrap_or_else(|err| lsp::log_error!("Failed to queue initial indexing: {err}"));
831+
832+
diagnostics_refresh_all(state);
833+
}
834+
835+
pub(crate) fn index_update(uri: Url, document: Document, state: WorldState) {
836+
INDEXER_QUEUE
837+
.send(IndexerQueueTask::Indexer(IndexerTask::Update {
838+
document,
839+
uri: uri.clone(),
840+
}))
841+
.unwrap_or_else(|err| lsp::log_error!("Failed to queue index update: {err}"));
842+
843+
diagnostics_refresh(uri, state.clone());
844+
}
845+
846+
pub(crate) fn diagnostics_refresh(uri: Url, state: WorldState) {
847+
INDEXER_QUEUE
848+
.send(IndexerQueueTask::Diagnostics(RefreshDiagnosticsTask {
849+
uri,
850+
state,
851+
}))
852+
.unwrap_or_else(|err| lsp::log_error!("Failed to queue diagnostics refresh: {err}"));
853+
}
854+
855+
pub(crate) fn diagnostics_refresh_all(state: WorldState) {
856+
// Expand RefreshAll into individual RefreshDiagnostics tasks
857+
// This allows the deduplication logic to work uniformly
858+
for (uri, _document) in state.documents.iter() {
859+
INDEXER_QUEUE
860+
.send(IndexerQueueTask::Diagnostics(RefreshDiagnosticsTask {
861+
uri: uri.clone(),
862+
state: state.clone(),
863+
}))
864+
.unwrap_or_else(|err| lsp::log_error!("Failed to queue diagnostics refresh: {err}"));
865+
}
866+
}

crates/ark/src/lsp/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ pub(crate) use _log;
6363
pub(crate) use log_error;
6464
pub(crate) use log_info;
6565
pub(crate) use log_warn;
66+
pub(crate) use main_loop::diagnostics_refresh_all;
6667
pub(crate) use main_loop::publish_diagnostics;
6768
pub(crate) use main_loop::spawn_blocking;
68-
pub(crate) use main_loop::spawn_diagnostics_refresh;
69-
pub(crate) use main_loop::spawn_diagnostics_refresh_all;

crates/ark/src/lsp/state_handlers.rs

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
//
66
//
77

8-
use std::path::Path;
9-
108
use anyhow::anyhow;
119
use tower_lsp::lsp_types;
1210
use tower_lsp::lsp_types::CompletionOptions;
@@ -44,7 +42,6 @@ use crate::lsp::config::DOCUMENT_SETTINGS;
4442
use crate::lsp::config::GLOBAL_SETTINGS;
4543
use crate::lsp::documents::Document;
4644
use crate::lsp::encoding::get_position_encoding_kind;
47-
use crate::lsp::indexer;
4845
use crate::lsp::inputs::package::Package;
4946
use crate::lsp::inputs::source_root::SourceRoot;
5047
use crate::lsp::main_loop::DidCloseVirtualDocumentParams;
@@ -121,10 +118,7 @@ pub(crate) fn initialize(
121118
}
122119

123120
// Start first round of indexing
124-
lsp::spawn_blocking(|| {
125-
indexer::start(folders);
126-
Ok(None)
127-
});
121+
lsp::main_loop::index_start(folders, state.clone());
128122

129123
Ok(InitializeResult {
130124
server_info: Some(ServerInfo {
@@ -206,8 +200,7 @@ pub(crate) fn did_open(
206200
// NOTE: Do we need to call `update_config()` here?
207201
// update_config(vec![uri]).await;
208202

209-
update_index(&uri, &document);
210-
lsp::spawn_diagnostics_refresh(uri, document, state.clone());
203+
lsp::main_loop::index_update(uri.clone(), document.clone(), state.clone());
211204

212205
Ok(())
213206
}
@@ -219,17 +212,16 @@ pub(crate) fn did_change(
219212
state: &mut WorldState,
220213
) -> anyhow::Result<()> {
221214
let uri = &params.text_document.uri;
222-
let doc = state.get_document_mut(uri)?;
215+
let document = state.get_document_mut(uri)?;
223216

224217
let mut parser = lsp_state
225218
.parsers
226219
.get_mut(uri)
227220
.ok_or(anyhow!("No parser for {uri}"))?;
228221

229-
doc.on_did_change(&mut parser, &params);
222+
document.on_did_change(&mut parser, &params);
230223

231-
update_index(uri, doc);
232-
lsp::spawn_diagnostics_refresh(uri.clone(), doc.clone(), state.clone());
224+
lsp::main_loop::index_update(uri.clone(), document.clone(), state.clone());
233225

234226
Ok(())
235227
}
@@ -384,7 +376,7 @@ async fn update_config(
384376
// Refresh diagnostics if the configuration changed
385377
if state.config.diagnostics != diagnostics_config {
386378
tracing::info!("Refreshing diagnostics after configuration changed");
387-
lsp::spawn_diagnostics_refresh_all(state.clone());
379+
lsp::main_loop::diagnostics_refresh_all(state.clone());
388380
}
389381

390382
Ok(())
@@ -402,7 +394,7 @@ pub(crate) fn did_change_console_inputs(
402394
// during package development in conjunction with `devtools::load_all()`.
403395
// Ideally diagnostics would not rely on these though, and we wouldn't need
404396
// to refresh from here.
405-
lsp::spawn_diagnostics_refresh_all(state.clone());
397+
lsp::diagnostics_refresh_all(state.clone());
406398

407399
Ok(())
408400
}
@@ -425,16 +417,3 @@ pub(crate) fn did_close_virtual_document(
425417
state.virtual_documents.remove(&params.uri);
426418
Ok(())
427419
}
428-
429-
// FIXME: The initial indexer is currently racing against our state notification
430-
// handlers. The indexer is synchronised through a mutex but we might end up in
431-
// a weird state. Eventually the index should be moved to WorldState and created
432-
// on demand with Salsa instrumenting and cancellation.
433-
fn update_index(uri: &url::Url, doc: &Document) {
434-
if let Ok(path) = uri.to_file_path() {
435-
let path = Path::new(&path);
436-
if let Err(err) = indexer::update(&doc, &path) {
437-
lsp::log_error!("{err:?}");
438-
}
439-
}
440-
}

0 commit comments

Comments
 (0)