Skip to content

Commit 030fae0

Browse files
committed
Concurrency Improvement
1 parent c67cde8 commit 030fae0

File tree

6 files changed

+106
-33
lines changed

6 files changed

+106
-33
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ cookie = "0.18.1"
4747
criterion = "0.6.0"
4848
crossterm = { version = "0.28.1", features = ["event-stream", "events"] }
4949
ctrlc = "3.4.6"
50+
dashmap = "5.5.3"
5051
dialoguer = { version = "0.11.0", features = ["fuzzy-select"] }
5152
dirs = "5.0.0"
5253
eyre = "0.6.8"

crates/chat-cli/src/cli/chat/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,14 @@ impl ChatSession {
651651
.workspace_root(std::env::current_dir().unwrap_or_default())
652652
.auto_detect_languages()
653653
.build() {
654-
Ok(client) => Some(client),
654+
Ok(mut client) => {
655+
if let Err(e) = client.initialize().await {
656+
eprintln!("⚠️ Failed to initialize code intelligence: {}", e);
657+
None
658+
} else {
659+
Some(client)
660+
}
661+
}
655662
Err(e) => {
656663
eprintln!("⚠️ Failed to create code intelligence client: {}", e);
657664
None

crates/code-agent-sdk/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ rmcp = { workspace = true, features = ["server", "macros", "transport-io"] }
2222
notify.workspace = true
2323
ignore.workspace = true
2424
globset.workspace = true
25+
dashmap.workspace = true
2526

2627
[dev-dependencies]
2728
tempfile.workspace = true

crates/code-agent-sdk/src/sdk/file_watcher.rs

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use crate::model::{FsEvent, FsEventKind};
2+
use crate::sdk::workspace_manager::FileState;
23
use anyhow::Result;
4+
use dashmap::DashMap;
35
use globset::{Glob, GlobSetBuilder};
46
use ignore::gitignore::{Gitignore, GitignoreBuilder};
57
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
68

79
use std::path::{Path, PathBuf};
10+
use std::sync::Arc;
811
use std::time::Instant;
912
use tokio::sync::mpsc;
1013
use url::Url;
@@ -247,16 +250,23 @@ pub(crate) struct EventProcessor {
247250
event_rx: mpsc::UnboundedReceiver<FsEvent>,
248251
workspace_manager: *mut crate::sdk::WorkspaceManager,
249252
workspace_root: PathBuf,
253+
opened_files: Arc<DashMap<PathBuf, FileState>>, // Thread-safe shared state
250254
}
251255

252256
unsafe impl Send for EventProcessor {}
253257

254258
impl EventProcessor {
255-
pub fn new(event_rx: mpsc::UnboundedReceiver<FsEvent>, workspace_manager: *mut crate::sdk::WorkspaceManager, workspace_root: PathBuf) -> Self {
259+
pub fn new(
260+
event_rx: mpsc::UnboundedReceiver<FsEvent>,
261+
workspace_manager: *mut crate::sdk::WorkspaceManager,
262+
workspace_root: PathBuf,
263+
opened_files: Arc<DashMap<PathBuf, FileState>>
264+
) -> Self {
256265
Self {
257266
event_rx,
258267
workspace_manager,
259268
workspace_root,
269+
opened_files,
260270
}
261271
}
262272

@@ -290,9 +300,11 @@ impl EventProcessor {
290300
match event.kind {
291301
FsEventKind::Created => {
292302
// Send workspace/didChangeWatchedFiles for new files (only if not already open)
293-
unsafe {
294-
let workspace_manager = &mut *self.workspace_manager;
295-
if !workspace_manager.is_file_opened(&absolute_path) {
303+
let is_file_open = self.opened_files.get(&absolute_path).map_or(false, |state| state.is_open);
304+
305+
if !is_file_open {
306+
unsafe {
307+
let workspace_manager = &mut *self.workspace_manager;
296308
if let Ok(Some(client)) = workspace_manager.get_client_for_file(&absolute_path).await {
297309
// 1. Send didChangeWatchedFiles notification
298310
let params = DidChangeWatchedFilesParams {
@@ -305,17 +317,19 @@ impl EventProcessor {
305317
tracing::info!("📄 File created, sending didChangeWatchedFiles: {:?}", absolute_path);
306318
let _ = client.did_change_watched_files(params).await;
307319
}
308-
} else {
309-
tracing::info!("📄 File created but already open, skipping notification: {:?}", absolute_path);
310320
}
321+
} else {
322+
tracing::info!("📄 File created but already open, skipping notification: {:?}", absolute_path);
311323
}
312324
}
313325

314326
FsEventKind::Deleted => {
327+
let is_file_open = self.opened_files.get(&absolute_path).map_or(false, |state| state.is_open);
328+
315329
unsafe {
316330
let workspace_manager = &mut *self.workspace_manager;
317331

318-
if workspace_manager.is_file_opened(&absolute_path) {
332+
if is_file_open {
319333
// File was opened - send didClose AND didChangeWatchedFiles
320334
if let Ok(Some(client)) = workspace_manager.get_client_for_file(&absolute_path).await {
321335
// 1. Close the opened file
@@ -337,7 +351,12 @@ impl EventProcessor {
337351
tracing::info!("🗑️ Sending didChangeWatchedFiles for deleted file: {:?}", absolute_path);
338352
let _ = client.did_change_watched_files(watch_params).await;
339353
}
340-
workspace_manager.mark_file_closed(&absolute_path);
354+
355+
// Mark file as closed in shared state
356+
if let Some(mut state) = self.opened_files.get_mut(&absolute_path) {
357+
state.is_open = false;
358+
state.version = 0;
359+
}
341360
} else {
342361
// File was closed - just send didChangeWatchedFiles
343362
if let Ok(Some(client)) = workspace_manager.get_client_for_file(&absolute_path).await {
@@ -355,13 +374,27 @@ impl EventProcessor {
355374
}
356375

357376
FsEventKind::Modified => {
358-
// SAFETY: We know workspace_manager is valid during EventProcessor lifetime
377+
let is_file_open = self.opened_files.get(&absolute_path).map_or(false, |state| state.is_open);
378+
359379
unsafe {
360380
let workspace_manager = &mut *self.workspace_manager;
361381

362-
if workspace_manager.is_file_opened(&absolute_path) {
363-
// Send didChange for opened files
364-
let version = workspace_manager.get_next_version(&absolute_path);
382+
if is_file_open {
383+
// Send didChange for opened files - increment version in shared state
384+
let version = match self.opened_files.get_mut(&absolute_path) {
385+
Some(mut state) => {
386+
state.version += 1;
387+
state.version
388+
}
389+
None => {
390+
// File not tracked, start at version 1
391+
self.opened_files.insert(absolute_path.clone(), FileState {
392+
version: 1,
393+
is_open: true,
394+
});
395+
1
396+
}
397+
};
365398

366399
if let Ok(Some(client)) = workspace_manager.get_client_for_file(&absolute_path).await {
367400
if let Ok(content) = std::fs::read_to_string(&absolute_path) {
@@ -403,11 +436,13 @@ impl EventProcessor {
403436
let from_absolute = self.workspace_root.join(&from_path);
404437
tracing::info!("📋 File renamed: {:?} -> {:?}", from_absolute, absolute_path);
405438

439+
let was_from_open = self.opened_files.get(&from_absolute).map_or(false, |state| state.is_open);
440+
406441
unsafe {
407442
let workspace_manager = &mut *self.workspace_manager;
408443

409444
// Handle as Delete(old) + Create(new)
410-
if workspace_manager.is_file_opened(&from_absolute) {
445+
if was_from_open {
411446
// Old file was opened - send didClose
412447
if let Ok(Some(client)) = workspace_manager.get_client_for_file(&from_absolute).await {
413448
if let Ok(from_uri) = Url::from_file_path(&from_absolute) {
@@ -420,7 +455,12 @@ impl EventProcessor {
420455
let _ = client.did_close(params).await;
421456
}
422457
}
423-
workspace_manager.mark_file_closed(&from_absolute);
458+
459+
// Mark old file as closed in shared state
460+
if let Some(mut state) = self.opened_files.get_mut(&from_absolute) {
461+
state.is_open = false;
462+
state.version = 0;
463+
}
424464
}
425465

426466
// Send didChangeWatchedFiles for new file

crates/code-agent-sdk/src/sdk/workspace_manager.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ use crate::model::types::{LspInfo, WorkspaceInfo};
44
use crate::model::FsEvent;
55
use crate::sdk::file_watcher::{FileWatcher, FileWatcherConfig};
66
use anyhow::Result;
7-
use std::collections::{HashMap, HashSet};
7+
use dashmap::DashMap;
8+
use std::collections::HashSet;
89
use std::path::{Path, PathBuf};
10+
use std::sync::Arc;
911
use tokio::sync::mpsc;
1012
use tracing::warn;
1113
use url::Url;
@@ -23,7 +25,7 @@ pub struct WorkspaceManager {
2325
workspace_root: PathBuf,
2426
registry: LspRegistry,
2527
initialized: bool,
26-
opened_files: HashMap<PathBuf, FileState>, // Track version and open state
28+
opened_files: Arc<DashMap<PathBuf, FileState>>, // Thread-safe file state tracking
2729
workspace_info: Option<WorkspaceInfo>,
2830

2931
// File watching infrastructure
@@ -48,7 +50,7 @@ impl WorkspaceManager {
4850
workspace_root: resolved_root,
4951
registry,
5052
initialized: false,
51-
opened_files: HashMap::new(),
53+
opened_files: Arc::new(DashMap::new()),
5254
workspace_info: None,
5355
_file_watcher: None,
5456
event_processor_handle: None,
@@ -117,8 +119,8 @@ impl WorkspaceManager {
117119
}
118120
};
119121

120-
// Add 3-second timeout to prevent hanging on unavailable servers
121-
match tokio::time::timeout(tokio::time::Duration::from_secs(3), init_future).await {
122+
// Add 1-second timeout to prevent hanging on unavailable servers
123+
match tokio::time::timeout(tokio::time::Duration::from_secs(1), init_future).await {
122124
Ok(_) => {
123125
}
124126
Err(_) => {
@@ -135,7 +137,7 @@ impl WorkspaceManager {
135137

136138
// Start file watching after LSP initialization
137139
if let Err(e) = self.start_file_watching() {
138-
tracing::warn!("Failed to start file watching: {}", e);
140+
warn!("Failed to start file watching: {}", e);
139141
}
140142

141143
Ok(())
@@ -320,22 +322,25 @@ impl WorkspaceManager {
320322

321323
/// Get next version for file and increment it
322324
pub fn get_next_version(&mut self, file_path: &Path) -> i32 {
323-
if let Some(state) = self.opened_files.get_mut(file_path) {
324-
state.version += 1;
325-
state.version
326-
} else {
327-
// File not tracked, start at version 1
328-
self.opened_files.insert(file_path.to_path_buf(), FileState {
329-
version: 1,
330-
is_open: true,
331-
});
332-
1
325+
match self.opened_files.get_mut(file_path) {
326+
Some(mut state) => {
327+
state.version += 1;
328+
state.version
329+
}
330+
None => {
331+
// File not tracked, start at version 1
332+
self.opened_files.insert(file_path.to_path_buf(), FileState {
333+
version: 1,
334+
is_open: true,
335+
});
336+
1
337+
}
333338
}
334339
}
335340

336341
/// Mark file as closed
337342
pub fn mark_file_closed(&mut self, file_path: &Path) {
338-
if let Some(state) = self.opened_files.get_mut(file_path) {
343+
if let Some(mut state) = self.opened_files.get_mut(file_path) {
339344
state.is_open = false;
340345
state.version = 0;
341346
}
@@ -385,7 +390,12 @@ impl WorkspaceManager {
385390
let file_watcher = FileWatcher::new(self.workspace_root.clone(), tx, config)?;
386391

387392
// Start event processor with workspace manager reference
388-
let processor = crate::sdk::file_watcher::EventProcessor::new(rx, self as *mut _, self.workspace_root.clone());
393+
let processor = crate::sdk::file_watcher::EventProcessor::new(
394+
rx,
395+
self as *mut _,
396+
self.workspace_root.clone(),
397+
self.opened_files.clone() // Share the Arc<DashMap>
398+
);
389399
let handle = tokio::spawn(async move {
390400
processor.run().await;
391401
});

0 commit comments

Comments
 (0)