55
66use anyhow:: Result ;
77use std:: collections:: HashMap ;
8- use std:: os:: unix:: net:: UnixListener ;
9- use std:: path:: Path ;
10- use tokio:: time:: { interval, Duration } ;
118use tracing:: { error, info} ;
129
13- /// Run the message bus daemon for multi-window support
14- pub async fn run_daemon ( vscode_pid : u32 ) -> Result < ( ) > {
15- run_daemon_with_prefix ( vscode_pid, "dialectic-daemon" , None ) . await
10+ /// Spawn the daemon as a separate detached process
11+ pub async fn spawn_daemon_process ( vscode_pid : u32 ) -> Result < ( ) > {
12+ use std:: process:: Command ;
13+
14+ let socket_path = format ! ( "/tmp/dialectic-daemon-{}.sock" , vscode_pid) ;
15+
16+ // Check if daemon is already running by trying to connect
17+ if tokio:: net:: UnixStream :: connect ( & socket_path) . await . is_ok ( ) {
18+ info ! (
19+ "Message bus daemon already running for VSCode PID {}" ,
20+ vscode_pid
21+ ) ;
22+ return Ok ( ( ) ) ;
23+ }
24+
25+ info ! (
26+ "Starting message bus daemon as separate process for VSCode PID {}" ,
27+ vscode_pid
28+ ) ;
29+
30+ // Get the current executable path to spawn daemon
31+ let current_exe = std:: env:: current_exe ( )
32+ . map_err ( |e| anyhow:: anyhow!( "Failed to get current executable path: {}" , e) ) ?;
33+
34+ // Spawn daemon as separate detached process
35+ let mut cmd = Command :: new ( & current_exe) ;
36+ cmd. args ( & [ "daemon" , & vscode_pid. to_string ( ) ] ) ;
37+ cmd. stdout ( std:: process:: Stdio :: piped ( ) ) ; // Capture stdout to read readiness message
38+ cmd. stderr ( std:: process:: Stdio :: null ( ) ) ; // Suppress stderr to avoid noise
39+
40+ // Detach from parent process (Unix-specific)
41+ #[ cfg( unix) ]
42+ {
43+ use std:: os:: unix:: process:: CommandExt ;
44+ cmd. process_group ( 0 ) ; // Create new process group
45+ }
46+
47+ let mut child = cmd
48+ . spawn ( )
49+ . map_err ( |e| anyhow:: anyhow!( "Failed to spawn daemon process: {}" , e) ) ?;
50+
51+ info ! ( "Spawned daemon process with PID: {}" , child. id( ) ) ;
52+
53+ // Read stdout until we get the "OK" message indicating daemon is ready
54+ if let Some ( stdout) = child. stdout . take ( ) {
55+ use std:: io:: { BufRead , BufReader } ;
56+ use std:: time:: Duration ;
57+
58+ let reader = BufReader :: new ( stdout) ;
59+
60+ // Use a timeout as a safety net, but rely primarily on the OK message
61+ let timeout_result = tokio:: time:: timeout ( Duration :: from_secs ( 10 ) , async {
62+ // We need to use blocking I/O here since we're reading from a process
63+ tokio:: task:: spawn_blocking ( move || {
64+ for line in reader. lines ( ) {
65+ match line {
66+ Ok ( line) => {
67+ if line. trim ( ) == "DAEMON_READY" {
68+ return Ok ( ( ) ) ;
69+ }
70+ }
71+ Err ( e) => {
72+ return Err ( anyhow:: anyhow!( "Error reading daemon stdout: {}" , e) ) ;
73+ }
74+ }
75+ }
76+ Err ( anyhow:: anyhow!(
77+ "Daemon process ended without sending DAEMON_READY message"
78+ ) )
79+ } )
80+ . await ?
81+ } )
82+ . await ;
83+
84+ match timeout_result {
85+ Ok ( Ok ( ( ) ) ) => {
86+ info ! ( "Message bus daemon confirmed ready" ) ;
87+ Ok ( ( ) )
88+ }
89+ Ok ( Err ( e) ) => Err ( e) ,
90+ Err ( _) => {
91+ anyhow:: bail!( "Timeout waiting for daemon readiness confirmation (10 seconds)" ) ;
92+ }
93+ }
94+ } else {
95+ anyhow:: bail!( "Failed to capture daemon stdout for readiness confirmation" ) ;
96+ }
1697}
1798
1899/// Run the message bus daemon with custom socket path prefix
19100/// If ready_barrier is provided, it will be signaled when the daemon is ready to accept connections
20101pub async fn run_daemon_with_prefix (
21- vscode_pid : u32 ,
102+ vscode_pid : u32 ,
22103 socket_prefix : & str ,
23- ready_barrier : Option < std:: sync:: Arc < tokio:: sync:: Barrier > >
104+ ready_barrier : Option < std:: sync:: Arc < tokio:: sync:: Barrier > > ,
24105) -> Result < ( ) > {
25106 use std:: os:: unix:: net:: UnixListener ;
26107 use std:: path:: Path ;
27-
108+
28109 let socket_path = format ! ( "/tmp/{}-{}.sock" , socket_prefix, vscode_pid) ;
29110 info ! ( "Attempting to claim socket: {}" , socket_path) ;
30111
@@ -37,21 +118,30 @@ pub async fn run_daemon_with_prefix(
37118 Err ( e) => {
38119 if e. kind ( ) == std:: io:: ErrorKind :: AddrInUse {
39120 error ! ( "❌ Failed to claim socket {}: {}" , socket_path, e) ;
40- error ! ( "Another daemon is already running for VSCode PID {}" , vscode_pid) ;
121+ error ! (
122+ "Another daemon is already running for VSCode PID {}" ,
123+ vscode_pid
124+ ) ;
41125 } else {
42126 error ! ( "❌ Failed to claim socket {}: {}" , socket_path, e) ;
43127 }
44128 return Err ( e. into ( ) ) ;
45129 }
46130 } ;
47131
48- info ! ( "🚀 Message bus daemon started for VSCode PID {}" , vscode_pid) ;
132+ info ! (
133+ "🚀 Message bus daemon started for VSCode PID {}" ,
134+ vscode_pid
135+ ) ;
49136 info ! ( "📡 Listening on socket: {}" , socket_path) ;
50137
51138 // Convert std::os::unix::net::UnixListener to tokio::net::UnixListener
52139 _listener. set_nonblocking ( true ) ?;
53140 let listener = tokio:: net:: UnixListener :: from_std ( _listener) ?;
54141
142+ // Signal that daemon is ready to accept connections
143+ println ! ( "DAEMON_READY" ) ;
144+
55145 // Run the message bus loop
56146 run_message_bus ( listener, vscode_pid, ready_barrier) . await ?;
57147
@@ -67,23 +157,23 @@ pub async fn run_daemon_with_prefix(
67157
68158/// Run the message bus loop - accept connections, broadcast messages, monitor VSCode
69159pub async fn run_message_bus (
70- listener : tokio:: net:: UnixListener ,
160+ listener : tokio:: net:: UnixListener ,
71161 vscode_pid : u32 ,
72- ready_barrier : Option < std:: sync:: Arc < tokio:: sync:: Barrier > >
162+ ready_barrier : Option < std:: sync:: Arc < tokio:: sync:: Barrier > > ,
73163) -> Result < ( ) > {
74164 use tokio:: sync:: broadcast;
75165 use tokio:: time:: { interval, Duration } ;
76166
77167 info ! ( "Starting message bus loop" ) ;
78-
168+
79169 // Signal that daemon is ready to accept connections
80170 if let Some ( barrier) = ready_barrier {
81171 barrier. wait ( ) . await ;
82172 }
83-
173+
84174 // Broadcast channel for distributing messages to all clients
85175 let ( tx, _rx) = broadcast:: channel :: < String > ( 1000 ) ;
86-
176+
87177 // Track connected clients
88178 let mut clients: HashMap < usize , tokio:: task:: JoinHandle < ( ) > > = HashMap :: new ( ) ;
89179 let mut next_client_id = 0 ;
@@ -99,9 +189,9 @@ pub async fn run_message_bus(
99189 Ok ( ( stream, _addr) ) => {
100190 let client_id = next_client_id;
101191 next_client_id += 1 ;
102-
192+
103193 info!( "Client {} connected" , client_id) ;
104-
194+
105195 // Spawn task to handle this client
106196 let tx_clone = tx. clone( ) ;
107197 let rx = tx. subscribe( ) ;
@@ -113,7 +203,7 @@ pub async fn run_message_bus(
113203 }
114204 }
115205 }
116-
206+
117207 // Check if VSCode process is still alive
118208 _ = vscode_check_interval. tick( ) => {
119209 match nix:: sys:: signal:: kill( nix:: unistd:: Pid :: from_raw( vscode_pid as i32 ) , None ) {
@@ -129,7 +219,7 @@ pub async fn run_message_bus(
129219 }
130220 }
131221 }
132-
222+
133223 // Clean up finished client tasks
134224 _ = tokio:: time:: sleep( Duration :: from_secs( 1 ) ) => {
135225 clients. retain( |& client_id, handle| {
@@ -145,7 +235,10 @@ pub async fn run_message_bus(
145235 }
146236
147237 // Shutdown: wait for all client tasks to finish
148- info ! ( "Shutting down message bus, waiting for {} clients" , clients. len( ) ) ;
238+ info ! (
239+ "Shutting down message bus, waiting for {} clients" ,
240+ clients. len( )
241+ ) ;
149242 for ( client_id, handle) in clients {
150243 handle. abort ( ) ;
151244 info ! ( "Disconnected client {}" , client_id) ;
@@ -162,7 +255,7 @@ pub async fn handle_client(
162255 mut rx : tokio:: sync:: broadcast:: Receiver < String > ,
163256) {
164257 use tokio:: io:: { AsyncBufReadExt , AsyncWriteExt , BufReader } ;
165-
258+
166259 let ( reader, mut writer) = stream. split ( ) ;
167260 let mut reader = BufReader :: new ( reader) ;
168261 let mut line = String :: new ( ) ;
@@ -181,7 +274,7 @@ pub async fn handle_client(
181274 let message = line. trim( ) . to_string( ) ;
182275 if !message. is_empty( ) {
183276 info!( "Client {} sent: {}" , client_id, message) ;
184-
277+
185278 // Broadcast message to all other clients
186279 if let Err ( e) = tx. send( message) {
187280 error!( "Failed to broadcast message from client {}: {}" , client_id, e) ;
@@ -195,7 +288,7 @@ pub async fn handle_client(
195288 }
196289 }
197290 }
198-
291+
199292 // Receive broadcasts from other clients
200293 result = rx. recv( ) => {
201294 match result {
@@ -223,6 +316,6 @@ pub async fn handle_client(
223316 }
224317 }
225318 }
226-
319+
227320 info ! ( "Client {} handler finished" , client_id) ;
228321}
0 commit comments