
Commit 0e991ac

Handle LSP notifications for workspace files
1 parent 6341e01 · commit 0e991ac

File tree: 4 files changed, +333 −48 lines


crates/ark/src/lsp/backend.rs

Lines changed: 15 additions & 0 deletions
@@ -125,6 +125,9 @@ pub(crate) enum LspNotification {
     DidChangeTextDocument(DidChangeTextDocumentParams),
     DidSaveTextDocument(DidSaveTextDocumentParams),
     DidCloseTextDocument(DidCloseTextDocumentParams),
+    DidCreateFiles(CreateFilesParams),
+    DidDeleteFiles(DeleteFilesParams),
+    DidRenameFiles(RenameFilesParams),
 }
 
 #[derive(Debug)]
@@ -244,6 +247,18 @@ impl LanguageServer for Backend {
         self.notify(LspNotification::DidChangeWatchedFiles(params));
     }
 
+    async fn did_create_files(&self, params: CreateFilesParams) {
+        self.notify(LspNotification::DidCreateFiles(params));
+    }
+
+    async fn did_delete_files(&self, params: DeleteFilesParams) {
+        self.notify(LspNotification::DidDeleteFiles(params));
+    }
+
+    async fn did_rename_files(&self, params: RenameFilesParams) {
+        self.notify(LspNotification::DidRenameFiles(params));
+    }
+
     async fn symbol(
         &self,
         params: WorkspaceSymbolParams,
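Note: these notifications (`workspace/didCreateFiles`, `workspace/didDeleteFiles`, `workspace/didRenameFiles`) are only sent by clients when the server advertises the corresponding `workspace.fileOperations` capabilities; that registration is not part of this file's diff. A minimal sketch of what such a registration looks like with `tower_lsp::lsp_types` (the glob and field values are illustrative assumptions, not taken from this commit):

```rust
// Illustrative sketch, not part of this commit: servers typically advertise
// `workspace.fileOperations` capabilities so clients send did*Files notifications.
use tower_lsp::lsp_types::{
    FileOperationFilter, FileOperationPattern, FileOperationRegistrationOptions,
    ServerCapabilities, WorkspaceFileOperationsServerCapabilities, WorkspaceServerCapabilities,
};

fn file_operation_capabilities() -> ServerCapabilities {
    // Hypothetical filter: only R source files on the local file system.
    let registration = FileOperationRegistrationOptions {
        filters: vec![FileOperationFilter {
            scheme: Some(String::from("file")),
            pattern: FileOperationPattern {
                glob: String::from("**/*.[rR]"),
                matches: None,
                options: None,
            },
        }],
    };

    ServerCapabilities {
        workspace: Some(WorkspaceServerCapabilities {
            workspace_folders: None,
            file_operations: Some(WorkspaceFileOperationsServerCapabilities {
                did_create: Some(registration.clone()),
                will_create: None,
                did_rename: Some(registration.clone()),
                will_rename: None,
                did_delete: Some(registration),
                will_delete: None,
            }),
        }),
        ..Default::default()
    }
}
```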

crates/ark/src/lsp/indexer.rs

Lines changed: 19 additions & 5 deletions
@@ -77,7 +77,7 @@ pub fn start(folders: Vec<String>) {
     for entry in walker.into_iter().filter_entry(|e| filter_entry(e)) {
         if let Ok(entry) = entry {
             if entry.file_type().is_file() {
-                if let Err(err) = index_file(entry.path()) {
+                if let Err(err) = create(entry.path()) {
                     lsp::log_error!("Can't index file {:?}: {err:?}", entry.path());
                 }
             }
@@ -116,7 +116,7 @@ pub fn map(mut callback: impl FnMut(&Path, &String, &IndexEntry)) {
 
 #[tracing::instrument(level = "trace", skip_all, fields(path = ?path))]
 pub fn update(document: &Document, path: &Path) -> anyhow::Result<()> {
-    clear(path)?;
+    delete(path)?;
     index_document(document, path);
     Ok(())
 }
@@ -147,7 +147,8 @@ fn index_insert(index: &mut HashMap<String, IndexEntry>, entry: IndexEntry) {
     }
 }
 
-fn clear(path: &Path) -> anyhow::Result<()> {
+#[tracing::instrument(level = "trace")]
+pub(crate) fn delete(path: &Path) -> anyhow::Result<()> {
     let mut index = WORKSPACE_INDEX.lock().unwrap();
     let path = str_from_path(path)?;
 
@@ -159,6 +160,19 @@ fn clear(path: &Path) -> anyhow::Result<()> {
     Ok(())
 }
 
+#[tracing::instrument(level = "trace")]
+pub(crate) fn rename(old: &Path, new: &Path) -> anyhow::Result<()> {
+    let mut index = WORKSPACE_INDEX.lock().unwrap();
+    let old = str_from_path(old)?;
+    let new = str_from_path(new)?;
+
+    if let Some(entries) = index.remove(old) {
+        index.insert(new.to_string(), entries);
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 pub(crate) fn indexer_clear() {
     let mut index = WORKSPACE_INDEX.lock().unwrap();
@@ -208,8 +222,8 @@ pub fn filter_entry(entry: &DirEntry) -> bool {
     true
 }
 
-fn index_file(path: &Path) -> anyhow::Result<()> {
-    // only index R files
+pub(crate) fn create(path: &Path) -> anyhow::Result<()> {
+    // Only index R files
     let ext = path.extension().unwrap_or_default();
     if ext != "r" && ext != "R" {
         return Ok(());
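The new `rename()` moves a file's entries under the new path key rather than re-parsing the file. A self-contained sketch of the same pattern with simplified types (the real index maps a path to per-symbol `IndexEntry` values and lives behind the `WORKSPACE_INDEX` mutex):

```rust
use std::collections::HashMap;

// Simplified stand-in for the workspace index: path -> (symbol -> entry).
type Index = HashMap<String, HashMap<String, String>>;

// Same shape as `indexer::rename`: move all entries from `old` to `new`.
fn rename(index: &mut Index, old: &str, new: &str) {
    if let Some(entries) = index.remove(old) {
        index.insert(new.to_string(), entries);
    }
}

fn main() {
    let mut index = Index::new();
    index
        .entry("/proj/old.R".to_string())
        .or_default()
        .insert("foo".to_string(), "function".to_string());

    rename(&mut index, "/proj/old.R", "/proj/new.R");

    assert!(index.contains_key("/proj/new.R"));
    assert!(!index.contains_key("/proj/old.R"));
}
```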

crates/ark/src/lsp/main_loop.rs

Lines changed: 207 additions & 40 deletions
@@ -295,6 +295,15 @@ impl GlobalState {
                 LspNotification::DidCloseTextDocument(params) => {
                     state_handlers::did_close(params, &mut self.lsp_state, &mut self.world)?;
                 },
+                LspNotification::DidCreateFiles(params) => {
+                    state_handlers::did_create_files(params, &self.world)?;
+                },
+                LspNotification::DidDeleteFiles(params) => {
+                    state_handlers::did_delete_files(params, &mut self.world)?;
+                },
+                LspNotification::DidRenameFiles(params) => {
+                    state_handlers::did_rename_files(params, &mut self.world)?;
+                },
             }
         },
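The `state_handlers::did_create_files`, `did_delete_files`, and `did_rename_files` handlers referenced here live in the fourth changed file, which is not shown on this page. Purely as an illustration of the shape such a handler could take (module paths, the `WorldState` clone, and the URI parsing are assumptions, not the commit's actual code), based on the `index_create` entry point added further down:

```rust
// Hypothetical sketch only; not the handler shipped in this commit.
use anyhow::Result;
use tower_lsp::lsp_types::CreateFilesParams;
use url::Url;

use crate::lsp::main_loop::index_create;
use crate::lsp::state::WorldState; // assumed module path

pub(crate) fn did_create_files(params: CreateFilesParams, state: &WorldState) -> Result<()> {
    // `FileCreate::uri` is a plain `String` in lsp-types, so parse it into a
    // `Url` before handing it to the indexer queue.
    let uris: Vec<Url> = params
        .files
        .iter()
        .filter_map(|file| Url::parse(&file.uri).ok())
        .collect();

    // Queue `IndexerTask::Create` for each new file and refresh diagnostics;
    // assumes `WorldState` is cheap to clone, as the other handlers suggest.
    index_create(uris, state.clone());
    Ok(())
}
```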

@@ -686,8 +695,10 @@ pub(crate) enum IndexerQueueTask {
 
 #[derive(Debug)]
 pub enum IndexerTask {
-    Start { folders: Vec<String> },
-    Update { document: Document, uri: Url },
+    Create { uri: Url },
+    Delete { uri: Url },
+    Rename { uri: Url, new: Url },
+    Update { uri: Url, document: Document },
 }
 
 #[derive(Debug)]
@@ -703,6 +714,27 @@ struct RefreshDiagnosticsResult {
     version: Option<i32>,
 }
 
+fn summarize_indexer_task(batch: &[IndexerTask]) -> String {
+    let mut counts = std::collections::HashMap::new();
+    for task in batch {
+        let type_name = match task {
+            IndexerTask::Create { .. } => "Create",
+            IndexerTask::Delete { .. } => "Delete",
+            IndexerTask::Rename { .. } => "Rename",
+            IndexerTask::Update { .. } => "Update",
+        };
+        *counts.entry(type_name).or_insert(0) += 1;
+    }
+
+    let mut summary = String::new();
+    for (task_type, count) in counts.iter() {
+        use std::fmt::Write;
+        let _ = write!(summary, "{task_type}: {count} ");
+    }
+
+    summary.trim_end().to_string()
+}
+
 static INDEXER_QUEUE: LazyLock<tokio::sync::mpsc::UnboundedSender<IndexerQueueTask>> =
     LazyLock::new(|| {
         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
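For reference, a standalone sketch of the kind of summary string `summarize_indexer_task` produces; since `HashMap` iteration order is unspecified, the order of the counts can vary between runs (task types simplified to plain labels):

```rust
use std::collections::HashMap;
use std::fmt::Write;

fn summarize(kinds: &[&str]) -> String {
    let mut counts: HashMap<&str, usize> = HashMap::new();
    for &kind in kinds {
        *counts.entry(kind).or_insert(0) += 1;
    }

    let mut summary = String::new();
    for (kind, count) in counts.iter() {
        let _ = write!(summary, "{kind}: {count} ");
    }
    summary.trim_end().to_string()
}

fn main() {
    // Prints e.g. "Update: 2 Create: 1" (order not guaranteed).
    println!("{}", summarize(&["Update", "Create", "Update"]));
}
```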
@@ -770,44 +802,94 @@ async fn process_indexer_queue(mut rx: mpsc::UnboundedReceiver<IndexerQueueTask>
 }
 
 async fn process_indexer_batch(batch: Vec<IndexerTask>) {
-    // Deduplicate tasks by key. We use a `HashMap` so only the last insertion
-    // is retained. `Update` tasks use URI as key, `Start` tasks use None (we
-    // only expect one though). This is effectively a way of cancelling `Update`
-    // tasks for outdated documents.
-    let batch: std::collections::HashMap<_, _> = batch
-        .into_iter()
-        .map(|task| match &task {
-            IndexerTask::Update { uri, .. } => (Some(uri.clone()), task),
-            IndexerTask::Start { .. } => (None, task),
-        })
-        .collect();
+    tracing::trace!(
+        "Processing {n} indexer tasks ({summary})",
+        n = batch.len(),
+        summary = summarize_indexer_task(&batch)
+    );
+
+    let mut updates: HashMap<PathBuf, IndexerTask> = HashMap::new();
+
+    fn flush_updates(
+        updates: &mut HashMap<PathBuf, IndexerTask>,
+    ) -> Vec<tokio::task::JoinHandle<()>> {
+        tracing::trace!("Flushing {n} update tasks", n = updates.len());
+
+        let mut handles = Vec::new();
+        for (path, task) in updates.drain() {
+            handles.push(tokio::task::spawn_blocking(move || {
+                if let IndexerTask::Update { document, .. } = task {
+                    if let Err(err) = indexer::update(&document, &path) {
+                        log::warn!("Indexer update failed: {err}");
+                    }
+                }
+            }));
+        }
+        handles
+    }
 
-    let mut handles = Vec::new();
+    let to_path_buf = |uri: &url::Url| {
+        uri.to_file_path()
+            .map_err(|_| anyhow!("Failed to convert URI '{uri}' to file path"))
+    };
 
-    for (_, task) in batch {
-        handles.push(tokio::task::spawn_blocking(move || match task {
-            IndexerTask::Start { folders } => {
-                indexer::start(folders);
-            },
-            IndexerTask::Update { document, uri } => {
-                let result = if let Ok(path) = uri.to_file_path() {
-                    indexer::update(&document, &path)
-                } else {
-                    Err(anyhow!("Failed to convert URI to file path: {uri}"))
-                };
-                if let Err(err) = result {
-                    log::error!("Indexer update failed: {err}");
-                }
-            },
-        }));
+    for task in batch {
+        let result: anyhow::Result<()> = (|| async {
+            match &task {
+                IndexerTask::Create { uri } => {
+                    let path = to_path_buf(uri)?;
+                    indexer::create(&path)?;
+                },
+
+                IndexerTask::Update { uri, .. } => {
+                    let path = to_path_buf(uri)?;
+                    updates.insert(path, task);
+                },
+
+                IndexerTask::Delete { uri } => {
+                    let handles = flush_updates(&mut updates);
+                    for handle in handles {
+                        let _ = handle.await;
+                    }
+                    let path = to_path_buf(uri)?;
+                    indexer::delete(&path)?;
+                },
+
+                IndexerTask::Rename {
+                    uri: old_uri,
+                    new: new_uri,
+                } => {
+                    let handles = flush_updates(&mut updates);
+                    for handle in handles {
+                        let _ = handle.await;
+                    }
+
+                    let old_path = to_path_buf(old_uri)?;
+                    let new_path = to_path_buf(new_uri)?;
+
+                    indexer::rename(&old_path, &new_path)?;
+                },
+            }
+
+            Ok(())
+        })()
+        .await;
+
+        if let Err(err) = result {
+            tracing::warn!("Can't process indexer task: {err}");
+            continue;
+        }
     }
 
+    let handles = flush_updates(&mut updates);
     for handle in handles {
         let _ = handle.await;
     }
 }
 
 async fn process_diagnostics_batch(batch: Vec<RefreshDiagnosticsTask>) {
+    tracing::trace!("Processing {n} diagnostic tasks", n = batch.len());
+
     // Deduplicate tasks by keeping only the last one for each URI. We use a
     // `HashMap` so only the last insertion is retained. This is effectively a
     // way of cancelling diagnostics tasks for outdated documents.
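Reading between the lines of the restructured `process_indexer_batch`: `Update` tasks are still buffered and deduplicated per path (only the latest update for a document survives), `Create` runs immediately, and `Delete`/`Rename` flush the buffered updates first, presumably so a stale update for a path cannot land after that path has been removed or moved. A simplified, synchronous sketch of that ordering rule (stand-in types, no task spawning):

```rust
use std::collections::HashMap;

// Stand-ins for IndexerTask and the index operations.
enum Task {
    Update { path: String, contents: String },
    Delete { path: String },
}

fn process(batch: Vec<Task>, index: &mut HashMap<String, String>) {
    // Buffer updates per path; only the last update for each path survives.
    let mut updates: HashMap<String, String> = HashMap::new();

    fn flush(updates: &mut HashMap<String, String>, index: &mut HashMap<String, String>) {
        for (path, contents) in updates.drain() {
            index.insert(path, contents);
        }
    }

    for task in batch {
        match task {
            Task::Update { path, contents } => {
                updates.insert(path, contents);
            },
            Task::Delete { path } => {
                // Flush first so a buffered update cannot outlive the delete.
                flush(&mut updates, index);
                index.remove(&path);
            },
        }
    }

    // Apply whatever updates are still pending at the end of the batch.
    flush(&mut updates, index);
}

fn main() {
    let mut index = HashMap::new();
    process(
        vec![
            Task::Update { path: "a.R".into(), contents: "v1".into() },
            Task::Update { path: "a.R".into(), contents: "v2".into() }, // supersedes v1
            Task::Delete { path: "a.R".into() },                        // flush, then remove
        ],
        &mut index,
    );
    assert!(index.is_empty());
}
```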
@@ -850,27 +932,112 @@ async fn process_diagnostics_batch(batch: Vec<RefreshDiagnosticsTask>) {
 }
 
 pub(crate) fn index_start(folders: Vec<String>, state: WorldState) {
-    INDEXER_QUEUE
-        .send(IndexerQueueTask::Indexer(IndexerTask::Start { folders }))
-        .unwrap_or_else(|err| lsp::log_error!("Failed to queue initial indexing: {err}"));
+    lsp::log_info!("Initial indexing started");
 
+    let uris: Vec<Url> = folders
+        .into_iter()
+        .flat_map(|folder| {
+            walkdir::WalkDir::new(folder)
+                .into_iter()
+                .filter_entry(|e| indexer::filter_entry(e))
+                .filter_map(|entry| {
+                    let entry = match entry {
+                        Ok(e) => e,
+                        Err(_) => return None,
+                    };
+
+                    if !entry.file_type().is_file() {
+                        return None;
+                    }
+                    let path = entry.path();
+
+                    // Only index R files
+                    let ext = path.extension().unwrap_or_default();
+                    if ext != "r" && ext != "R" {
+                        return None;
+                    }
+
+                    if let Ok(uri) = url::Url::from_file_path(path) {
+                        Some(uri)
+                    } else {
+                        tracing::warn!("Can't convert path to URI: {:?}", path);
+                        None
+                    }
+                })
+        })
+        .collect();
+
+    index_create(uris, state);
+}
+
+pub(crate) fn index_create(uris: Vec<Url>, state: WorldState) {
+    for uri in uris {
+        crate::lsp::main_loop::INDEXER_QUEUE
+            .send(crate::lsp::main_loop::IndexerQueueTask::Indexer(
+                crate::lsp::main_loop::IndexerTask::Create { uri },
+            ))
+            .unwrap_or_else(|err| crate::lsp::log_error!("Failed to queue index create: {err}"));
+    }
+
+    diagnostics_refresh_all(state);
+}
+
+pub(crate) fn index_update(uris: Vec<Url>, state: WorldState) {
+    for uri in uris {
+        let document = match state.get_document(&uri) {
+            Ok(doc) => doc.clone(),
+            Err(err) => {
+                tracing::warn!("Can't get document '{uri}' for indexing: {err:?}");
+                continue;
+            },
+        };
+
+        INDEXER_QUEUE
+            .send(IndexerQueueTask::Indexer(IndexerTask::Update {
+                document,
+                uri,
+            }))
+            .unwrap_or_else(|err| lsp::log_error!("Failed to queue index update: {err}"));
+    }
+
+    // Refresh all diagnostics since the indexer results for one file may affect
+    // other files
+    diagnostics_refresh_all(state);
+}
+
+pub(crate) fn index_delete(uris: Vec<Url>, state: WorldState) {
+    for uri in uris {
+        INDEXER_QUEUE
+            .send(IndexerQueueTask::Indexer(IndexerTask::Delete { uri }))
+            .unwrap_or_else(|err| lsp::log_error!("Failed to queue index update: {err}"));
+    }
+
+    // Refresh all diagnostics since the indexer results for one file may affect
+    // other files
     diagnostics_refresh_all(state);
 }
 
-pub(crate) fn index_update(uri: Url, document: Document, state: WorldState) {
-    INDEXER_QUEUE
-        .send(IndexerQueueTask::Indexer(IndexerTask::Update {
-            document,
-            uri: uri.clone(),
-        }))
-        .unwrap_or_else(|err| lsp::log_error!("Failed to queue index update: {err}"));
+pub(crate) fn index_rename(uris: Vec<(Url, Url)>, state: WorldState) {
+    for (old, new) in uris {
+        INDEXER_QUEUE
+            .send(IndexerQueueTask::Indexer(IndexerTask::Rename {
+                uri: old,
+                new,
+            }))
+            .unwrap_or_else(|err| lsp::log_error!("Failed to queue index update: {err}"));
+    }
 
     // Refresh all diagnostics since the indexer results for one file may affect
     // other files
     diagnostics_refresh_all(state);
 }
 
 pub(crate) fn diagnostics_refresh_all(state: WorldState) {
+    tracing::trace!(
+        "Refreshing diagnostics for {n} documents",
+        n = state.documents.len()
+    );
+
     for (uri, _document) in state.documents.iter() {
         INDEXER_QUEUE
             .send(IndexerQueueTask::Diagnostics(RefreshDiagnosticsTask {
