@@ -17,12 +17,7 @@ use tracing::{debug, error};
1717
1818// Windows-specific imports
1919#[ cfg( windows) ]
20- use {
21- std:: sync:: Arc ,
22- tokio:: io:: AsyncReadExt ,
23- tokio:: net:: windows:: named_pipe:: ServerOptions ,
24- tokio:: time:: { sleep, Duration } ,
25- } ;
20+ use { std:: sync:: Arc , tokio:: io:: AsyncReadExt , tokio:: net:: windows:: named_pipe:: ServerOptions } ;
2621
2722// DogStatsD buffer size for receiving metrics
2823// TODO(astuyve) buf should be dynamic
@@ -36,6 +31,10 @@ const BUFFER_SIZE: usize = 8192;
3631#[ cfg( windows) ]
3732const MAX_NAMED_PIPE_ERRORS : u32 = 5 ;
3833
34+ // Windows named pipe prefix. All named pipes on Windows must start with this prefix.
35+ #[ cfg( windows) ]
36+ const NAMED_PIPE_PREFIX : & str = "\\ \\ .\\ pipe\\ " ;
37+
3938/// Configuration for the DogStatsD server
4039pub struct DogStatsDConfig {
4140 /// Host to bind UDP socket to (e.g., "127.0.0.1")
@@ -44,7 +43,8 @@ pub struct DogStatsDConfig {
4443 pub port : u16 ,
4544 /// Optional namespace to prepend to all metric names (e.g., "myapp")
4645 pub metric_namespace : Option < String > ,
47- /// Optional Windows named pipe path (e.g., "\\\\.\\pipe\\my_pipe")
46+ /// Optional Windows named pipe name. Can be either a simple name (e.g., "my_pipe")
47+ /// or a full path (e.g., "\\\\.\\pipe\\my_pipe"). The prefix will be added automatically if missing.
4848 pub windows_pipe_name : Option < String > ,
4949}
5050
@@ -152,6 +152,18 @@ async fn handle_pipe_error_with_backoff(
152152 }
153153}
154154
155+ /// Normalizes a Windows named pipe name by adding the required prefix if missing.
156+ ///
157+ /// Windows named pipes must have the format `\\.\pipe\{name}`.
158+ #[ cfg( windows) ]
159+ fn normalize_pipe_name ( pipe_name : & str ) -> String {
160+ if pipe_name. starts_with ( NAMED_PIPE_PREFIX ) {
161+ pipe_name. to_string ( )
162+ } else {
163+ format ! ( "{}{}" , NAMED_PIPE_PREFIX , pipe_name)
164+ }
165+ }
166+
155167/// Reads data from a Windows named pipe with retry logic.
156168///
157169/// Windows named pipes can experience transient failures (client disconnect, pipe errors).
@@ -310,8 +322,10 @@ impl DogStatsD {
310322 let buffer_reader = if let Some ( ref pipe_name) = config. windows_pipe_name {
311323 #[ cfg( windows) ]
312324 {
325+ // Normalize pipe name to ensure it has the \\.\pipe\ prefix
326+ let normalized_pipe_name = normalize_pipe_name ( pipe_name) ;
313327 BufferReader :: NamedPipe {
314- pipe_name : Arc :: new ( pipe_name . clone ( ) ) ,
328+ pipe_name : Arc :: new ( normalized_pipe_name ) ,
315329 cancel_token : cancel_token. clone ( ) ,
316330 }
317331 }
@@ -546,6 +560,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
546560 #[ cfg( windows) ]
547561 #[ tokio:: test]
548562 async fn test_handle_pipe_error_with_backoff_max_errors ( ) {
563+ use super :: handle_pipe_error_with_backoff;
549564 let cancel_token = CancellationToken :: new ( ) ;
550565 let mut consecutive_errors = 0 ;
551566 let error = std:: io:: Error :: new ( std:: io:: ErrorKind :: PermissionDenied , "test error" ) ;
@@ -585,14 +600,15 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
585600 #[ cfg( windows) ]
586601 #[ tokio:: test]
587602 async fn test_handle_pipe_error_with_backoff_cancellation ( ) {
603+ use super :: handle_pipe_error_with_backoff;
588604 let cancel_token = CancellationToken :: new ( ) ;
589605 let mut consecutive_errors = 0 ;
590606 let error = std:: io:: Error :: new ( std:: io:: ErrorKind :: PermissionDenied , "test error" ) ;
591607
592608 // Spawn task to cancel after 10ms
593609 let cancel_clone = cancel_token. clone ( ) ;
594610 tokio:: spawn ( async move {
595- tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
611+ tokio:: time:: sleep ( tokio :: time :: Duration :: from_millis ( 10 ) ) . await ;
596612 cancel_clone. cancel ( ) ;
597613 } ) ;
598614
@@ -606,4 +622,61 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
606622 assert_eq ! ( result. unwrap( ) , false , "Should return false when cancelled" ) ;
607623 assert_eq ! ( consecutive_errors, 1 , "Error count should be 1" ) ;
608624 }
625+
626+ #[ cfg( windows) ]
627+ #[ test]
628+ fn test_normalize_pipe_name ( ) {
629+ use super :: normalize_pipe_name;
630+
631+ // Test cases: (input, expected_output, description)
632+ let test_cases = vec ! [
633+ // Names without prefix should get it added
634+ (
635+ "my_pipe" ,
636+ "\\ \\ .\\ pipe\\ my_pipe" ,
637+ "simple name without prefix" ,
638+ ) ,
639+ (
640+ "dd_dogstatsd" ,
641+ "\\ \\ .\\ pipe\\ dd_dogstatsd" ,
642+ "dd_dogstatsd without prefix" ,
643+ ) ,
644+ (
645+ "datadog_statsd" ,
646+ "\\ \\ .\\ pipe\\ datadog_statsd" ,
647+ "datadog_statsd without prefix" ,
648+ ) ,
649+ (
650+ "test-pipe_123" ,
651+ "\\ \\ .\\ pipe\\ test-pipe_123" ,
652+ "name with hyphens and underscores" ,
653+ ) ,
654+ ( "" , "\\ \\ .\\ pipe\\ " , "empty string" ) ,
655+ // Names with prefix should remain unchanged
656+ (
657+ "\\ \\ .\\ pipe\\ my_pipe" ,
658+ "\\ \\ .\\ pipe\\ my_pipe" ,
659+ "already has prefix" ,
660+ ) ,
661+ (
662+ "\\ \\ .\\ pipe\\ dd_dogstatsd" ,
663+ "\\ \\ .\\ pipe\\ dd_dogstatsd" ,
664+ "dd_dogstatsd with prefix" ,
665+ ) ,
666+ (
667+ "\\ \\ .\\ pipe\\ test" ,
668+ "\\ \\ .\\ pipe\\ test" ,
669+ "short name with prefix" ,
670+ ) ,
671+ ] ;
672+
673+ for ( input, expected, description) in test_cases {
674+ assert_eq ! (
675+ normalize_pipe_name( input) ,
676+ expected,
677+ "Failed for test case: {}" ,
678+ description
679+ ) ;
680+ }
681+ }
609682}
0 commit comments