@@ -162,7 +162,7 @@ pub async fn run_message_bus(
162162 ready_barrier : Option < std:: sync:: Arc < tokio:: sync:: Barrier > > ,
163163) -> Result < ( ) > {
164164 use tokio:: sync:: broadcast;
165- use tokio:: time:: { interval , Duration } ;
165+ use tokio:: time:: { Duration , interval } ;
166166
167167 info ! ( "daemon: starting message bus loop" ) ;
168168
@@ -319,3 +319,313 @@ pub async fn handle_client(
319319
320320 info ! ( "Client {} handler finished" , client_id) ;
321321}
322+
323+ #[ cfg( test) ]
324+ mod test {
325+ use super :: run_daemon_with_prefix;
326+
327+ #[ tokio:: test]
328+ async fn test_daemon_message_broadcasting ( ) {
329+ use std:: sync:: Arc ;
330+ use tokio:: io:: { AsyncBufReadExt , AsyncWriteExt , BufReader } ;
331+ use tokio:: net:: UnixStream ;
332+ use tokio:: sync:: Barrier ;
333+ use tokio:: time:: { Duration , timeout} ;
334+ use uuid:: Uuid ;
335+
336+ // Initialize tracing for test output
337+ let _ = tracing_subscriber:: fmt:: try_init ( ) ;
338+
339+ // Use current process PID so daemon won't exit due to "VSCode died"
340+ let test_pid = std:: process:: id ( ) ;
341+ // Use UUID to ensure unique socket path per test run
342+ let test_id = Uuid :: new_v4 ( ) ;
343+ let socket_prefix = format ! ( "dialectic-test-{}" , test_id) ;
344+ let socket_path = format ! ( "/tmp/{}-{}.sock" , socket_prefix, test_pid) ;
345+
346+ // Clean up any existing socket
347+ let _ = std:: fs:: remove_file ( & socket_path) ;
348+
349+ // Barrier for coordinating when daemon is ready (2 participants: daemon + test)
350+ let ready_barrier = Arc :: new ( Barrier :: new ( 2 ) ) ;
351+ // Barrier for coordinating when both clients are connected and ready (2 participants: clients)
352+ let client_barrier = Arc :: new ( Barrier :: new ( 2 ) ) ;
353+
354+ // Start the full daemon with unique prefix and ready barrier
355+ let ready_barrier_clone = ready_barrier. clone ( ) ;
356+ let daemon_handle = tokio:: spawn ( async move {
357+ run_daemon_with_prefix ( test_pid, & socket_prefix, Some ( ready_barrier_clone) ) . await
358+ } ) ;
359+
360+ // Wait for daemon to be ready
361+ ready_barrier. wait ( ) . await ;
362+
363+ // Verify socket was created
364+ assert ! (
365+ std:: path:: Path :: new( & socket_path) . exists( ) ,
366+ "Daemon should create socket file"
367+ ) ;
368+
369+ // Test: Connect two clients and verify message broadcasting
370+ let socket_path_1 = socket_path. clone ( ) ;
371+ let barrier_1 = client_barrier. clone ( ) ;
372+ let client1_handle = tokio:: spawn ( async move {
373+ let mut stream = UnixStream :: connect ( & socket_path_1)
374+ . await
375+ . expect ( "Client 1 failed to connect" ) ;
376+
377+ // Wait at barrier until both clients are connected
378+ barrier_1. wait ( ) . await ;
379+
380+ // Client 1 sends first, then waits for response
381+ stream
382+ . write_all ( b"Hello from client 1\n " )
383+ . await
384+ . expect ( "Failed to send message" ) ;
385+ stream. flush ( ) . await . expect ( "Failed to flush" ) ;
386+
387+ // Read response from client 2
388+ let mut reader = BufReader :: new ( & mut stream) ;
389+ let mut response = String :: new ( ) ;
390+
391+ match timeout ( Duration :: from_secs ( 2 ) , reader. read_line ( & mut response) ) . await {
392+ Ok ( Ok ( _) ) => response. trim ( ) . to_string ( ) ,
393+ _ => "NO_RESPONSE" . to_string ( ) ,
394+ }
395+ } ) ;
396+
397+ let socket_path_2 = socket_path. clone ( ) ;
398+ let barrier_2 = client_barrier. clone ( ) ;
399+ let client2_handle = tokio:: spawn ( async move {
400+ let mut stream = UnixStream :: connect ( & socket_path_2)
401+ . await
402+ . expect ( "Client 2 failed to connect" ) ;
403+
404+ // Wait at barrier until both clients are connected
405+ barrier_2. wait ( ) . await ;
406+
407+ // Client 2 waits to receive message from client 1, then responds
408+ let mut reader = BufReader :: new ( & mut stream) ;
409+ let mut message = String :: new ( ) ;
410+
411+ let received =
412+ match timeout ( Duration :: from_secs ( 2 ) , reader. read_line ( & mut message) ) . await {
413+ Ok ( Ok ( _) ) => message. trim ( ) . to_string ( ) ,
414+ _ => "NO_MESSAGE" . to_string ( ) ,
415+ } ;
416+
417+ // Send response back to client 1
418+ stream
419+ . write_all ( b"Hello from client 2\n " )
420+ . await
421+ . expect ( "Failed to send response" ) ;
422+ stream. flush ( ) . await . expect ( "Failed to flush" ) ;
423+
424+ received
425+ } ) ;
426+
427+ // Wait for both clients to complete
428+ let ( client1_response, client2_received) = tokio:: join!( client1_handle, client2_handle) ;
429+
430+ // Verify message broadcasting worked
431+ let client1_response = client1_response. expect ( "Client 1 task failed" ) ;
432+ let client2_received = client2_received. expect ( "Client 2 task failed" ) ;
433+
434+ // Client 2 should always receive the message from Client 1
435+ assert_eq ! (
436+ client2_received, "Hello from client 1" ,
437+ "Client 2 should receive message from Client 1"
438+ ) ;
439+
440+ // Client 1 might receive either its own message (due to broadcast) or Client 2's response
441+ // Both are valid in a broadcast system - this verifies the broadcast mechanism works
442+ assert ! (
443+ client1_response == "Hello from client 1" || client1_response == "Hello from client 2" ,
444+ "Client 1 should receive either its own message or Client 2's response, got: '{}'" ,
445+ client1_response
446+ ) ;
447+
448+ // Clean up
449+ daemon_handle. abort ( ) ;
450+ }
451+
452+ #[ tokio:: test]
453+ async fn test_daemon_multiple_clients ( ) {
454+ use std:: sync:: Arc ;
455+ use tokio:: io:: { AsyncBufReadExt , AsyncWriteExt , BufReader } ;
456+ use tokio:: net:: UnixStream ;
457+ use tokio:: sync:: Barrier ;
458+ use tokio:: time:: { Duration , timeout} ;
459+ use uuid:: Uuid ;
460+
461+ // Initialize tracing for test output
462+ let _ = tracing_subscriber:: fmt:: try_init ( ) ;
463+
464+ // Use current process PID
465+ let test_pid = std:: process:: id ( ) ;
466+ // Use UUID to ensure unique socket path per test run
467+ let test_id = Uuid :: new_v4 ( ) ;
468+ let socket_prefix = format ! ( "dialectic-test-{}" , test_id) ;
469+ let socket_path = format ! ( "/tmp/{}-{}.sock" , socket_prefix, test_pid) ;
470+
471+ // Clean up any existing socket
472+ let _ = std:: fs:: remove_file ( & socket_path) ;
473+
474+ // Barrier for coordinating when daemon is ready (2 participants: daemon + test)
475+ let ready_barrier = Arc :: new ( Barrier :: new ( 2 ) ) ;
476+ // Barrier for coordinating when all clients are connected (1 sender + 2 receivers = 3)
477+ let client_barrier = Arc :: new ( Barrier :: new ( 3 ) ) ;
478+
479+ // Start the full daemon with unique prefix and ready barrier
480+ let ready_barrier_clone = ready_barrier. clone ( ) ;
481+ let daemon_handle = tokio:: spawn ( async move {
482+ run_daemon_with_prefix ( test_pid, & socket_prefix, Some ( ready_barrier_clone) ) . await
483+ } ) ;
484+
485+ // Wait for daemon to be ready
486+ ready_barrier. wait ( ) . await ;
487+
488+ // Verify socket was created
489+ assert ! (
490+ std:: path:: Path :: new( & socket_path) . exists( ) ,
491+ "Daemon should create socket file"
492+ ) ;
493+
494+ // Test: One sender, multiple receivers
495+ let socket_path_sender = socket_path. clone ( ) ;
496+ let barrier_sender = client_barrier. clone ( ) ;
497+ let sender_handle = tokio:: spawn ( async move {
498+ let mut stream = UnixStream :: connect ( & socket_path_sender)
499+ . await
500+ . expect ( "Sender failed to connect" ) ;
501+
502+ // Wait at barrier until all clients are connected
503+ barrier_sender. wait ( ) . await ;
504+
505+ // All clients are now connected and ready, send broadcast message
506+ stream
507+ . write_all ( b"Broadcast message\n " )
508+ . await
509+ . expect ( "Failed to send message" ) ;
510+ stream. flush ( ) . await . expect ( "Failed to flush" ) ;
511+ } ) ;
512+
513+ let socket_path_r1 = socket_path. clone ( ) ;
514+ let barrier_r1 = client_barrier. clone ( ) ;
515+ let receiver1_handle = tokio:: spawn ( async move {
516+ let mut stream = UnixStream :: connect ( & socket_path_r1)
517+ . await
518+ . expect ( "Receiver 1 failed to connect" ) ;
519+
520+ // Wait at barrier until all clients are connected
521+ barrier_r1. wait ( ) . await ;
522+
523+ // Wait for broadcast message from sender
524+ let mut reader = BufReader :: new ( & mut stream) ;
525+ let mut message = String :: new ( ) ;
526+
527+ match timeout ( Duration :: from_secs ( 2 ) , reader. read_line ( & mut message) ) . await {
528+ Ok ( Ok ( _) ) => message. trim ( ) . to_string ( ) ,
529+ _ => "NO_MESSAGE" . to_string ( ) ,
530+ }
531+ } ) ;
532+
533+ let socket_path_r2 = socket_path. clone ( ) ;
534+ let barrier_r2 = client_barrier. clone ( ) ;
535+ let receiver2_handle = tokio:: spawn ( async move {
536+ let mut stream = UnixStream :: connect ( & socket_path_r2)
537+ . await
538+ . expect ( "Receiver 2 failed to connect" ) ;
539+
540+ // Wait at barrier until all clients are connected
541+ barrier_r2. wait ( ) . await ;
542+
543+ // Wait for broadcast message from sender
544+ let mut reader = BufReader :: new ( & mut stream) ;
545+ let mut message = String :: new ( ) ;
546+
547+ match timeout ( Duration :: from_secs ( 2 ) , reader. read_line ( & mut message) ) . await {
548+ Ok ( Ok ( _) ) => message. trim ( ) . to_string ( ) ,
549+ _ => "NO_MESSAGE" . to_string ( ) ,
550+ }
551+ } ) ;
552+
553+ // Wait for all tasks
554+ let ( _, receiver1_msg, receiver2_msg) =
555+ tokio:: join!( sender_handle, receiver1_handle, receiver2_handle) ;
556+
557+ // Verify both receivers got the message
558+ let receiver1_msg = receiver1_msg. expect ( "Receiver 1 task failed" ) ;
559+ let receiver2_msg = receiver2_msg. expect ( "Receiver 2 task failed" ) ;
560+
561+ assert_eq ! (
562+ receiver1_msg, "Broadcast message" ,
563+ "Receiver 1 should get broadcast"
564+ ) ;
565+ assert_eq ! (
566+ receiver2_msg, "Broadcast message" ,
567+ "Receiver 2 should get broadcast"
568+ ) ;
569+
570+ // Clean up
571+ daemon_handle. abort ( ) ;
572+ }
573+
574+ #[ tokio:: test]
575+ async fn test_daemon_socket_claiming ( ) {
576+ use std:: sync:: Arc ;
577+ use tokio:: sync:: Barrier ;
578+ use uuid:: Uuid ;
579+
580+ // Initialize tracing for test output
581+ let _ = tracing_subscriber:: fmt:: try_init ( ) ;
582+
583+ // Use actual test process PID (so daemon won't exit due to "process died")
584+ let test_pid = std:: process:: id ( ) ;
585+ // Use UUID to ensure unique socket path per test run
586+ let test_id = Uuid :: new_v4 ( ) ;
587+ let socket_prefix = format ! ( "dialectic-test-{}" , test_id) ;
588+ let socket_path = format ! ( "/tmp/{}-{}.sock" , socket_prefix, test_pid) ;
589+
590+ // Clean up any existing socket
591+ let _ = std:: fs:: remove_file ( & socket_path) ;
592+
593+ // Barrier for coordinating when first daemon is ready (2 participants: daemon + test)
594+ let ready_barrier = Arc :: new ( Barrier :: new ( 2 ) ) ;
595+
596+ // Start first daemon with ready barrier
597+ let socket_prefix_1 = socket_prefix. clone ( ) ;
598+ let ready_barrier_clone = ready_barrier. clone ( ) ;
599+ let daemon1_handle = tokio:: spawn ( async move {
600+ run_daemon_with_prefix ( test_pid, & socket_prefix_1, Some ( ready_barrier_clone) ) . await
601+ } ) ;
602+
603+ // Wait for first daemon to be ready
604+ ready_barrier. wait ( ) . await ;
605+
606+ // Verify socket was created
607+ assert ! (
608+ std:: path:: Path :: new( & socket_path) . exists( ) ,
609+ "First daemon should create socket file"
610+ ) ;
611+
612+ // Try to start second daemon with same PID and prefix (should fail)
613+ let socket_prefix_2 = socket_prefix. clone ( ) ;
614+ let daemon2_result =
615+ tokio:: spawn (
616+ async move { run_daemon_with_prefix ( test_pid, & socket_prefix_2, None ) . await } ,
617+ )
618+ . await ;
619+
620+ // Second daemon should fail due to socket conflict
621+ assert ! ( daemon2_result. is_ok( ) , "Task should complete" ) ;
622+ let daemon2_inner_result = daemon2_result. unwrap ( ) ;
623+ assert ! (
624+ daemon2_inner_result. is_err( ) ,
625+ "Second daemon should fail due to socket conflict"
626+ ) ;
627+
628+ // Clean up first daemon
629+ daemon1_handle. abort ( ) ;
630+ }
631+ }
0 commit comments