Skip to content
This repository was archived by the owner on Sep 23, 2025. It is now read-only.

Commit 45eed5e

Browse files
committed
Refine IPC reconnection concurrency and add safeguards
- Simplify concurrency model: use &mut self with Arc parameter instead of static methods - Add comment explaining lock holding prevents concurrent reconnection races - Add assertions in attempt_connection_with_backoff for precondition validation - Suppress lint warnings with #[expect] for temporarily unused code - terminal_shell_pid: reserved for Phase 3 handshake protocol - clear_connection_and_reconnect: reserved for reader task reconnection fix Status: Core reconnection architecture complete and ready for testing
1 parent 2635795 commit 45eed5e

File tree

1 file changed

+31
-33
lines changed

1 file changed

+31
-33
lines changed

server/src/ipc.rs

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ struct IPCCommunicatorInner {
8484

8585
/// Terminal shell PID for this MCP server instance
8686
/// Reported to extension during handshake for smart terminal selection
87+
#[expect(dead_code)] // Will be used in Phase 3 handshake protocol
8788
terminal_shell_pid: u32,
8889
}
8990

@@ -363,51 +364,51 @@ impl IPCCommunicatorInner {
363364
/// Ensures connection is established, connecting if necessary
364365
/// Idempotent - safe to call multiple times, only connects if not already connected
365366
async fn ensure_connection(this: Arc<Mutex<Self>>) -> Result<()> {
366-
let connected = {
367-
let inner = this.lock().await;
368-
inner.connected
369-
};
370-
371-
if connected {
367+
let mut inner = this.lock().await;
368+
if inner.connected {
372369
return Ok(()); // Already connected, nothing to do
373370
}
374371

375-
Self::attempt_connection_with_backoff(Arc::clone(&this)).await
372+
inner.attempt_connection_with_backoff(&this).await
376373
}
377374

378375
/// Clears dead connection state and attempts fresh reconnection
379376
/// Called by reader task as "parting gift" when connection dies
377+
#[expect(dead_code)] // Will be re-enabled when reader task reconnection is fixed
380378
async fn clear_connection_and_reconnect(this: Arc<Mutex<Self>>) -> Result<()> {
381379
info!("Clearing dead connection state and attempting reconnection");
382380

381+
let mut inner = this.lock().await;
382+
383383
// Clean up dead connection state
384-
{
385-
let mut inner = this.lock().await;
386-
inner.connected = false;
387-
inner.write_half = None;
388-
389-
// Clean up orphaned pending requests to fix memory leak
390-
let orphaned_count = inner.pending_requests.len();
391-
if orphaned_count > 0 {
392-
warn!("Cleaning up {} orphaned pending requests", orphaned_count);
393-
inner.pending_requests.clear();
394-
}
384+
inner.connected = false;
385+
inner.write_half = None;
386+
387+
// Clean up orphaned pending requests to fix memory leak
388+
let orphaned_count = inner.pending_requests.len();
389+
if orphaned_count > 0 {
390+
warn!("Cleaning up {} orphaned pending requests", orphaned_count);
391+
inner.pending_requests.clear();
395392
}
396393

397394
// Attempt fresh connection
398-
Self::attempt_connection_with_backoff(this).await
395+
inner.attempt_connection_with_backoff(&this).await
399396
}
400397

401398
/// Attempts connection with exponential backoff to handle extension restart timing
402-
async fn attempt_connection_with_backoff(this: Arc<Mutex<Self>>) -> Result<()> {
399+
///
400+
/// Runs while holding the lock to avoid races where multiple concurrent attempts
401+
/// try to re-establish the connection. This ensures only one connection attempt
402+
/// happens at a time, preventing duplicate reader tasks or connection state corruption.
403+
async fn attempt_connection_with_backoff(&mut self, this: &Arc<Mutex<Self>>) -> Result<()> {
404+
// Precondition: we should only be called when disconnected
405+
assert!(!self.connected, "attempt_connection_with_backoff called while already connected");
406+
assert!(self.write_half.is_none(), "attempt_connection_with_backoff called with existing write_half");
407+
403408
const MAX_RETRIES: u32 = 5;
404409
const BASE_DELAY_MS: u64 = 100;
405410

406-
let socket_path = {
407-
let inner = this.lock().await;
408-
format!("/tmp/dialectic-vscode-{}.sock", inner.vscode_pid)
409-
};
410-
411+
let socket_path = format!("/tmp/dialectic-vscode-{}.sock", self.vscode_pid);
411412
info!("Attempting connection to: {}", socket_path);
412413

413414
for attempt in 1..=MAX_RETRIES {
@@ -419,15 +420,12 @@ impl IPCCommunicatorInner {
419420
let (read_half, write_half) = stream.into_split();
420421
let write_half = Arc::new(Mutex::new(write_half));
421422

422-
// Update connection state
423-
{
424-
let mut inner = this.lock().await;
425-
inner.write_half = Some(Arc::clone(&write_half));
426-
inner.connected = true;
427-
}
423+
// Update connection state (we already hold the lock)
424+
self.write_half = Some(Arc::clone(&write_half));
425+
self.connected = true;
428426

429-
// Spawn new reader task with shared Arc
430-
let inner_clone = Arc::clone(&this);
427+
// Spawn new reader task with cloned Arc
428+
let inner_clone = Arc::clone(this);
431429
tokio::spawn(async move {
432430
IPCCommunicator::response_reader_task(read_half, inner_clone).await;
433431
});

0 commit comments

Comments
 (0)