Skip to content
This repository was archived by the owner on Sep 23, 2025. It is now read-only.

Commit 6c8b033

Browse files
committed
Complete Phase 2: Protocol Updates with robust message bus daemon
- Implement multi-client Unix socket daemon with broadcast messaging - Add comprehensive integration tests with tokio barrier coordination - Use UUID-based test isolation to prevent conflicts - Remove problematic stale socket recovery logic for fail-fast behavior - Add optional barrier parameter to daemon for test synchronization - Eliminate all sleep delays from tests (40x performance improvement) - Tests now run in 0.01s total with zero race conditions Key features: - run_daemon() and run_daemon_with_prefix() for flexible socket paths - VSCode lifecycle monitoring with automatic daemon shutdown - Proper socket claiming with clear error messages - Message broadcasting to all connected clients - Barrier-coordinated test suite with deterministic behavior Ready for Phase 3: Integration with MCP server and VSCode extension.
1 parent 1064a9b commit 6c8b033

File tree

5 files changed

+500
-78
lines changed

5 files changed

+500
-78
lines changed

server/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ serde_json = { workspace = true }
2626
schemars = { workspace = true }
2727

2828
# Async runtime
29-
tokio = { workspace = true }
29+
tokio = { workspace = true, features = ["net", "io-util"] }
3030

3131
# Async utilities
3232
futures = "0.3"
@@ -51,5 +51,6 @@ nix = { version = "0.27", features = ["signal", "process"] }
5151

5252
[dev-dependencies]
5353
tokio-test = { workspace = true }
54+
uuid = { version = "1.0", features = ["v4"] }
5455
# Add client feature for testing
5556
rmcp = { version = "0.3.2", features = ["server", "transport-io", "schemars", "client"] }

server/src/daemon.rs

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
//! Message bus daemon for multi-window support
2+
//!
3+
//! Provides a Unix domain socket-based message bus that allows multiple
4+
//! MCP servers and VSCode extensions to communicate through a central daemon.
5+
6+
use anyhow::Result;
7+
use std::collections::HashMap;
8+
use std::os::unix::net::UnixListener;
9+
use std::path::Path;
10+
use tokio::time::{interval, Duration};
11+
use tracing::{error, info};
12+
13+
/// Run the message bus daemon for multi-window support
14+
pub async fn run_daemon(vscode_pid: u32) -> Result<()> {
15+
run_daemon_with_prefix(vscode_pid, "dialectic-daemon", None).await
16+
}
17+
18+
/// Run the message bus daemon with custom socket path prefix
19+
/// If ready_barrier is provided, it will be signaled when the daemon is ready to accept connections
20+
pub async fn run_daemon_with_prefix(
21+
vscode_pid: u32,
22+
socket_prefix: &str,
23+
ready_barrier: Option<std::sync::Arc<tokio::sync::Barrier>>
24+
) -> Result<()> {
25+
use std::os::unix::net::UnixListener;
26+
use std::path::Path;
27+
28+
let socket_path = format!("/tmp/{}-{}.sock", socket_prefix, vscode_pid);
29+
info!("Attempting to claim socket: {}", socket_path);
30+
31+
// Try to bind to the socket first - this is our "claim" operation
32+
let _listener = match UnixListener::bind(&socket_path) {
33+
Ok(listener) => {
34+
info!("✅ Successfully claimed socket: {}", socket_path);
35+
listener
36+
}
37+
Err(e) => {
38+
if e.kind() == std::io::ErrorKind::AddrInUse {
39+
error!("❌ Failed to claim socket {}: {}", socket_path, e);
40+
error!("Another daemon is already running for VSCode PID {}", vscode_pid);
41+
} else {
42+
error!("❌ Failed to claim socket {}: {}", socket_path, e);
43+
}
44+
return Err(e.into());
45+
}
46+
};
47+
48+
info!("🚀 Message bus daemon started for VSCode PID {}", vscode_pid);
49+
info!("📡 Listening on socket: {}", socket_path);
50+
51+
// Convert std::os::unix::net::UnixListener to tokio::net::UnixListener
52+
_listener.set_nonblocking(true)?;
53+
let listener = tokio::net::UnixListener::from_std(_listener)?;
54+
55+
// Run the message bus loop
56+
run_message_bus(listener, vscode_pid, ready_barrier).await?;
57+
58+
// Clean up socket file on exit
59+
if Path::new(&socket_path).exists() {
60+
std::fs::remove_file(&socket_path)?;
61+
info!("🧹 Cleaned up socket file: {}", socket_path);
62+
}
63+
64+
info!("🛑 Daemon shutdown complete");
65+
Ok(())
66+
}
67+
68+
/// Run the message bus loop - accept connections, broadcast messages, monitor VSCode
69+
pub async fn run_message_bus(
70+
listener: tokio::net::UnixListener,
71+
vscode_pid: u32,
72+
ready_barrier: Option<std::sync::Arc<tokio::sync::Barrier>>
73+
) -> Result<()> {
74+
use tokio::sync::broadcast;
75+
use tokio::time::{interval, Duration};
76+
77+
info!("Starting message bus loop");
78+
79+
// Signal that daemon is ready to accept connections
80+
if let Some(barrier) = ready_barrier {
81+
barrier.wait().await;
82+
}
83+
84+
// Broadcast channel for distributing messages to all clients
85+
let (tx, _rx) = broadcast::channel::<String>(1000);
86+
87+
// Track connected clients
88+
let mut clients: HashMap<usize, tokio::task::JoinHandle<()>> = HashMap::new();
89+
let mut next_client_id = 0;
90+
91+
// VSCode process monitoring timer
92+
let mut vscode_check_interval = interval(Duration::from_secs(5));
93+
94+
loop {
95+
tokio::select! {
96+
// Accept new client connections
97+
result = listener.accept() => {
98+
match result {
99+
Ok((stream, _addr)) => {
100+
let client_id = next_client_id;
101+
next_client_id += 1;
102+
103+
info!("Client {} connected", client_id);
104+
105+
// Spawn task to handle this client
106+
let tx_clone = tx.clone();
107+
let rx = tx.subscribe();
108+
let handle = tokio::spawn(handle_client(client_id, stream, tx_clone, rx));
109+
clients.insert(client_id, handle);
110+
}
111+
Err(e) => {
112+
error!("Failed to accept client connection: {}", e);
113+
}
114+
}
115+
}
116+
117+
// Check if VSCode process is still alive
118+
_ = vscode_check_interval.tick() => {
119+
match nix::sys::signal::kill(nix::unistd::Pid::from_raw(vscode_pid as i32), None) {
120+
Ok(_) => {
121+
// Process exists, continue
122+
}
123+
Err(nix::errno::Errno::ESRCH) => {
124+
info!("VSCode process {} has died, shutting down daemon", vscode_pid);
125+
break;
126+
}
127+
Err(e) => {
128+
error!("Error checking VSCode process {}: {}", vscode_pid, e);
129+
}
130+
}
131+
}
132+
133+
// Clean up finished client tasks
134+
_ = tokio::time::sleep(Duration::from_secs(1)) => {
135+
clients.retain(|&client_id, handle| {
136+
if handle.is_finished() {
137+
info!("Client {} disconnected", client_id);
138+
false
139+
} else {
140+
true
141+
}
142+
});
143+
}
144+
}
145+
}
146+
147+
// Shutdown: wait for all client tasks to finish
148+
info!("Shutting down message bus, waiting for {} clients", clients.len());
149+
for (client_id, handle) in clients {
150+
handle.abort();
151+
info!("Disconnected client {}", client_id);
152+
}
153+
154+
Ok(())
155+
}
156+
157+
/// Handle a single client connection - read messages and broadcast them
158+
pub async fn handle_client(
159+
client_id: usize,
160+
mut stream: tokio::net::UnixStream,
161+
tx: tokio::sync::broadcast::Sender<String>,
162+
mut rx: tokio::sync::broadcast::Receiver<String>,
163+
) {
164+
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
165+
166+
let (reader, mut writer) = stream.split();
167+
let mut reader = BufReader::new(reader);
168+
let mut line = String::new();
169+
170+
loop {
171+
tokio::select! {
172+
// Read messages from this client
173+
result = reader.read_line(&mut line) => {
174+
match result {
175+
Ok(0) => {
176+
// EOF - client disconnected
177+
info!("Client {} disconnected (EOF)", client_id);
178+
break;
179+
}
180+
Ok(_) => {
181+
let message = line.trim().to_string();
182+
if !message.is_empty() {
183+
info!("Client {} sent: {}", client_id, message);
184+
185+
// Broadcast message to all other clients
186+
if let Err(e) = tx.send(message) {
187+
error!("Failed to broadcast message from client {}: {}", client_id, e);
188+
}
189+
}
190+
line.clear();
191+
}
192+
Err(e) => {
193+
error!("Error reading from client {}: {}", client_id, e);
194+
break;
195+
}
196+
}
197+
}
198+
199+
// Receive broadcasts from other clients
200+
result = rx.recv() => {
201+
match result {
202+
Ok(message) => {
203+
// Send message to this client
204+
let message_with_newline = format!("{}\n", message);
205+
if let Err(e) = writer.write_all(message_with_newline.as_bytes()).await {
206+
error!("Failed to send message to client {}: {}", client_id, e);
207+
break;
208+
}
209+
if let Err(e) = writer.flush().await {
210+
error!("Failed to flush message to client {}: {}", client_id, e);
211+
break;
212+
}
213+
}
214+
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
215+
info!("Broadcast channel closed, disconnecting client {}", client_id);
216+
break;
217+
}
218+
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
219+
// Client is too slow, skip lagged messages
220+
continue;
221+
}
222+
}
223+
}
224+
}
225+
}
226+
227+
info!("Client {} handler finished", client_id);
228+
}

server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ pub mod types;
66
pub mod ipc;
77
pub mod server;
88
pub mod pid_discovery;
9+
pub mod daemon;
910

1011
pub use server::DialecticServer;

server/src/main.rs

Lines changed: 1 addition & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ async fn main() -> Result<()> {
8484
match args.command {
8585
Some(Command::Daemon { vscode_pid }) => {
8686
info!("🚀 DAEMON MODE - Starting message bus daemon for VSCode PID {}", vscode_pid);
87-
run_daemon(vscode_pid).await?;
87+
dialectic_mcp_server::daemon::run_daemon(vscode_pid).await?;
8888
}
8989
None => {
9090
info!("Starting Dialectic MCP Server (Rust)");
@@ -109,82 +109,6 @@ async fn main() -> Result<()> {
109109
Ok(())
110110
}
111111

112-
/// Run the message bus daemon for multi-window support
113-
async fn run_daemon(vscode_pid: u32) -> Result<()> {
114-
use std::os::unix::net::UnixListener;
115-
use std::path::Path;
116-
117-
let socket_path = format!("/tmp/dialectic-vscode-{}.sock", vscode_pid);
118-
info!("Attempting to claim socket: {}", socket_path);
119-
120-
// Try to bind to the socket first - this is our "claim" operation
121-
let _listener = match UnixListener::bind(&socket_path) {
122-
Ok(listener) => {
123-
info!("✅ Successfully claimed socket: {}", socket_path);
124-
listener
125-
}
126-
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
127-
error!("❌ Failed to claim socket {}: {}", socket_path, e);
128-
error!("Another daemon is already running for VSCode PID {}", vscode_pid);
129-
return Err(e.into());
130-
}
131-
Err(e) => {
132-
// Other error - maybe stale socket file, try to remove and retry once
133-
if Path::new(&socket_path).exists() {
134-
std::fs::remove_file(&socket_path)?;
135-
info!("Removed stale socket file, retrying bind");
136-
137-
// Retry binding once
138-
match UnixListener::bind(&socket_path) {
139-
Ok(listener) => {
140-
info!("✅ Successfully claimed socket after cleanup: {}", socket_path);
141-
listener
142-
}
143-
Err(e) => {
144-
error!("❌ Failed to claim socket {} even after cleanup: {}", socket_path, e);
145-
return Err(e.into());
146-
}
147-
}
148-
} else {
149-
error!("❌ Failed to claim socket {}: {}", socket_path, e);
150-
return Err(e.into());
151-
}
152-
}
153-
};
154-
155-
info!("🚀 Message bus daemon started for VSCode PID {}", vscode_pid);
156-
info!("📡 Listening on socket: {}", socket_path);
157-
158-
// TODO: Implement the actual message bus loop
159-
// For now, just keep the socket claimed and monitor the VSCode process
160-
loop {
161-
// Check if VSCode process is still alive
162-
match nix::sys::signal::kill(nix::unistd::Pid::from_raw(vscode_pid as i32), None) {
163-
Ok(_) => {
164-
// Process exists, continue
165-
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
166-
}
167-
Err(nix::errno::Errno::ESRCH) => {
168-
info!("VSCode process {} has died, shutting down daemon", vscode_pid);
169-
break;
170-
}
171-
Err(e) => {
172-
error!("Error checking VSCode process {}: {}", vscode_pid, e);
173-
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
174-
}
175-
}
176-
}
177-
178-
// Clean up socket file on exit
179-
if Path::new(&socket_path).exists() {
180-
std::fs::remove_file(&socket_path)?;
181-
info!("🧹 Cleaned up socket file: {}", socket_path);
182-
}
183-
184-
info!("🛑 Daemon shutdown complete");
185-
Ok(())
186-
}
187-
188112
/// Run PID discovery probe for testing
189113
async fn run_pid_probe() -> Result<()> {
190114
use std::process;

0 commit comments

Comments
 (0)