Skip to content
This repository was archived by the owner on Sep 23, 2025. It is now read-only.

Commit 2635795

Browse files
committed
Implement PID-based IPC reconnection refactor
- Move PID discovery from connect() to constructor for one-time discovery - Store vscode_pid and terminal_shell_pid as persistent state in IPCCommunicatorInner - Rename reader_started to connected for clearer semantics - Add IPCCommunicatorInner::ensure_connection() with idempotent pattern - Add ensure_connection() calls to all MCP operations (present_review, get_selection) - Fix timeout memory leak by cleaning up pending_requests on timeout - Remove old discover_vscode_socket() and connect() methods - Update DialecticServer::new() to use async IPCCommunicator constructor Status: Basic refactor complete, compiles and runs TODO: Add reader task 'parting gift' reconnection (currently disabled due to cycle issues) TODO: Add integration tests for extension restart scenarios
1 parent 2932766 commit 2635795

File tree

2 files changed

+155
-73
lines changed

2 files changed

+155
-73
lines changed

server/src/ipc.rs

Lines changed: 154 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use serde_json;
77
use std::collections::HashMap;
88
use std::sync::Arc;
9+
use std::time::Duration;
910
use thiserror::Error;
1011
use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};
1112
use tokio::net::UnixStream;
@@ -73,21 +74,53 @@ struct IPCCommunicatorInner {
7374
/// Enables concurrent request/response handling with proper correlation
7475
pending_requests: HashMap<String, oneshot::Sender<IPCResponse>>,
7576

76-
/// Flag to track if the response reader task is running
77-
/// Prevents multiple reader tasks from being spawned
78-
reader_started: bool,
77+
/// Flag to track if we have an active connection and reader task
78+
/// When true, ensure_connection() is a no-op
79+
connected: bool,
80+
81+
/// VSCode process PID discovered during initialization
82+
/// Used to construct socket path: /tmp/dialectic-vscode-{vscode_pid}.sock
83+
vscode_pid: u32,
84+
85+
/// Terminal shell PID for this MCP server instance
86+
/// Reported to extension during handshake for smart terminal selection
87+
terminal_shell_pid: u32,
7988
}
8089

8190
impl IPCCommunicator {
82-
pub fn new() -> Self {
83-
Self {
91+
pub async fn new() -> Result<Self> {
92+
// Perform PID discovery once during construction
93+
let current_pid = std::process::id();
94+
info!("Starting PID discovery from MCP server PID: {}", current_pid);
95+
96+
let (vscode_pid, terminal_shell_pid) = match pid_discovery::find_vscode_pid_from_mcp(current_pid).await {
97+
Ok(Some((vscode_pid, terminal_shell_pid))) => {
98+
info!("Discovered VSCode PID: {}, Terminal Shell PID: {}", vscode_pid, terminal_shell_pid);
99+
(vscode_pid, terminal_shell_pid)
100+
}
101+
Ok(None) => {
102+
let error_msg = "Could not find VSCode PID in process tree. \
103+
Ensure MCP server is running from a VSCode terminal.";
104+
error!("{}", error_msg);
105+
return Err(IPCError::PidDiscoveryFailed(error_msg.to_string()).into());
106+
}
107+
Err(e) => {
108+
let error_msg = format!("PID discovery failed: {}", e);
109+
error!("{}", error_msg);
110+
return Err(IPCError::PidDiscoveryFailed(error_msg).into());
111+
}
112+
};
113+
114+
Ok(Self {
84115
inner: Arc::new(Mutex::new(IPCCommunicatorInner {
85116
write_half: None,
86117
pending_requests: HashMap::new(),
87-
reader_started: false,
118+
connected: false,
119+
vscode_pid,
120+
terminal_shell_pid,
88121
})),
89122
test_mode: false,
90-
}
123+
})
91124
}
92125

93126
/// Creates a new IPCCommunicator in test mode
@@ -97,7 +130,9 @@ impl IPCCommunicator {
97130
inner: Arc::new(Mutex::new(IPCCommunicatorInner {
98131
write_half: None,
99132
pending_requests: HashMap::new(),
100-
reader_started: false,
133+
connected: false,
134+
vscode_pid: 0, // Dummy PID for test mode
135+
terminal_shell_pid: 0, // Dummy PID for test mode
101136
})),
102137
test_mode: true,
103138
}
@@ -109,69 +144,13 @@ impl IPCCommunicator {
109144
return Ok(());
110145
}
111146

112-
// Use PID discovery to find VSCode and construct socket path
113-
let socket_path = self.discover_vscode_socket().await?;
114-
115-
info!("Connecting to VSCode extension at: {}", socket_path);
116-
117-
// Create cross-platform connection
118-
self.connect(&socket_path).await?;
147+
// Use ensure_connection for initial connection
148+
IPCCommunicatorInner::ensure_connection(Arc::clone(&self.inner)).await?;
119149

120150
info!("Connected to VSCode extension via IPC");
121151
Ok(())
122152
}
123153

124-
/// Discover VSCode socket path using PID discovery
125-
async fn discover_vscode_socket(&self) -> Result<String> {
126-
let current_pid = std::process::id();
127-
info!("Starting PID discovery from MCP server PID: {}", current_pid);
128-
129-
match pid_discovery::find_vscode_pid_from_mcp(current_pid).await {
130-
Ok(Some((vscode_pid, _terminal_shell_pid))) => {
131-
let socket_path = format!("/tmp/dialectic-vscode-{}.sock", vscode_pid);
132-
info!("Discovered VSCode PID: {}, socket path: {}", vscode_pid, socket_path);
133-
Ok(socket_path)
134-
}
135-
Ok(None) => {
136-
let error_msg = "Could not find VSCode PID in process tree. \
137-
Ensure MCP server is running from a VSCode terminal.";
138-
error!("{}", error_msg);
139-
Err(IPCError::PidDiscoveryFailed(error_msg.to_string()))
140-
}
141-
Err(e) => {
142-
let error_msg = format!("PID discovery failed: {}", e);
143-
error!("{}", error_msg);
144-
Err(IPCError::PidDiscoveryFailed(error_msg))
145-
}
146-
}
147-
}
148-
149-
async fn connect(&mut self, socket_path: &str) -> Result<()> {
150-
let stream = UnixStream::connect(socket_path).await
151-
.map_err(|e| IPCError::ConnectionFailed {
152-
path: socket_path.to_string(),
153-
source: e
154-
})?;
155-
156-
// Split the stream into read and write halves to avoid deadlock
157-
let (read_half, write_half) = stream.into_split();
158-
let write_half = Arc::new(Mutex::new(write_half));
159-
160-
let mut inner = self.inner.lock().await;
161-
inner.write_half = Some(Arc::clone(&write_half));
162-
163-
// Start the response reader task if not already started
164-
if !inner.reader_started {
165-
inner.reader_started = true;
166-
let inner_clone = Arc::clone(&self.inner);
167-
tokio::spawn(async move {
168-
Self::response_reader_task(read_half, inner_clone).await;
169-
});
170-
}
171-
172-
Ok(())
173-
}
174-
175154
pub async fn present_review(&self, params: PresentReviewParams) -> Result<PresentReviewResult> {
176155
if self.test_mode {
177156
info!("Present review called (test mode): {:?}", params);
@@ -181,6 +160,9 @@ impl IPCCommunicator {
181160
});
182161
}
183162

163+
// Ensure connection is established before proceeding
164+
IPCCommunicatorInner::ensure_connection(Arc::clone(&self.inner)).await?;
165+
184166
let message = IPCMessage {
185167
message_type: IPCMessageType::PresentReview,
186168
payload: serde_json::to_value(params)?,
@@ -224,6 +206,9 @@ impl IPCCommunicator {
224206
});
225207
}
226208

209+
// Ensure connection is established before proceeding
210+
IPCCommunicatorInner::ensure_connection(Arc::clone(&self.inner)).await?;
211+
227212
let message = IPCMessage {
228213
message_type: IPCMessageType::GetSelection,
229214
payload: serde_json::json!({}),
@@ -322,6 +307,13 @@ impl IPCCommunicator {
322307
rx
323308
).await
324309
.map_err(|_| {
310+
// Clean up the leaked entry on timeout to fix memory leak
311+
let inner_clone = Arc::clone(&self.inner);
312+
let message_id = message.id.clone();
313+
tokio::spawn(async move {
314+
let mut inner = inner_clone.lock().await;
315+
inner.pending_requests.remove(&message_id);
316+
});
325317
error!("Timeout waiting for response to message ID: {}", message.id);
326318
IPCError::Timeout
327319
})?
@@ -365,13 +357,103 @@ impl IPCCommunicator {
365357
Err(IPCError::NotConnected)
366358
}
367359
}
360+
}
368361

369-
/// Low-level primitive for writing raw JSON data to the IPC connection (Windows)
370-
///
371-
/// This is the underlying method used by both `send_message_with_reply` and
372-
/// `send_message_without_reply`. It handles the platform-specific pipe writing
373-
/// Background task that reads responses from the IPC connection
374-
/// Runs continuously to handle incoming messages from VSCode extension
362+
impl IPCCommunicatorInner {
363+
/// Ensures connection is established, connecting if necessary
364+
/// Idempotent - safe to call multiple times, only connects if not already connected
365+
async fn ensure_connection(this: Arc<Mutex<Self>>) -> Result<()> {
366+
let connected = {
367+
let inner = this.lock().await;
368+
inner.connected
369+
};
370+
371+
if connected {
372+
return Ok(()); // Already connected, nothing to do
373+
}
374+
375+
Self::attempt_connection_with_backoff(Arc::clone(&this)).await
376+
}
377+
378+
/// Clears dead connection state and attempts fresh reconnection
379+
/// Called by reader task as "parting gift" when connection dies
380+
async fn clear_connection_and_reconnect(this: Arc<Mutex<Self>>) -> Result<()> {
381+
info!("Clearing dead connection state and attempting reconnection");
382+
383+
// Clean up dead connection state
384+
{
385+
let mut inner = this.lock().await;
386+
inner.connected = false;
387+
inner.write_half = None;
388+
389+
// Clean up orphaned pending requests to fix memory leak
390+
let orphaned_count = inner.pending_requests.len();
391+
if orphaned_count > 0 {
392+
warn!("Cleaning up {} orphaned pending requests", orphaned_count);
393+
inner.pending_requests.clear();
394+
}
395+
}
396+
397+
// Attempt fresh connection
398+
Self::attempt_connection_with_backoff(this).await
399+
}
400+
401+
/// Attempts connection with exponential backoff to handle extension restart timing
402+
async fn attempt_connection_with_backoff(this: Arc<Mutex<Self>>) -> Result<()> {
403+
const MAX_RETRIES: u32 = 5;
404+
const BASE_DELAY_MS: u64 = 100;
405+
406+
let socket_path = {
407+
let inner = this.lock().await;
408+
format!("/tmp/dialectic-vscode-{}.sock", inner.vscode_pid)
409+
};
410+
411+
info!("Attempting connection to: {}", socket_path);
412+
413+
for attempt in 1..=MAX_RETRIES {
414+
match UnixStream::connect(&socket_path).await {
415+
Ok(stream) => {
416+
info!("Successfully connected on attempt {}", attempt);
417+
418+
// Split the stream into read and write halves
419+
let (read_half, write_half) = stream.into_split();
420+
let write_half = Arc::new(Mutex::new(write_half));
421+
422+
// Update connection state
423+
{
424+
let mut inner = this.lock().await;
425+
inner.write_half = Some(Arc::clone(&write_half));
426+
inner.connected = true;
427+
}
428+
429+
// Spawn new reader task with shared Arc
430+
let inner_clone = Arc::clone(&this);
431+
tokio::spawn(async move {
432+
IPCCommunicator::response_reader_task(read_half, inner_clone).await;
433+
});
434+
435+
return Ok(());
436+
}
437+
Err(e) if attempt < MAX_RETRIES => {
438+
let delay = Duration::from_millis(BASE_DELAY_MS * 2_u64.pow(attempt - 1));
439+
warn!("Connection attempt {} failed: {}. Retrying in {:?}", attempt, e, delay);
440+
tokio::time::sleep(delay).await;
441+
}
442+
Err(e) => {
443+
error!("All connection attempts failed. Last error: {}", e);
444+
return Err(IPCError::ConnectionFailed {
445+
path: socket_path,
446+
source: e
447+
}.into());
448+
}
449+
}
450+
}
451+
452+
unreachable!("Loop should always return or error")
453+
}
454+
}
455+
456+
impl IPCCommunicator {
375457
async fn response_reader_task(
376458
mut read_half: tokio::net::unix::OwnedReadHalf,
377459
inner: Arc<Mutex<IPCCommunicatorInner>>,

server/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub struct DialecticServer {
3131
#[tool_router]
3232
impl DialecticServer {
3333
pub async fn new() -> Result<Self> {
34-
let mut ipc = IPCCommunicator::new();
34+
let mut ipc = IPCCommunicator::new().await?;
3535

3636
// Initialize IPC connection to VSCode extension
3737
ipc.initialize().await?;

0 commit comments

Comments
 (0)