@@ -17,7 +17,11 @@ use tracing::{debug, error};
1717
1818// Windows-specific imports
1919#[ cfg( windows) ]
20- use { std:: sync:: Arc , tokio:: io:: AsyncReadExt , tokio:: net:: windows:: named_pipe:: ServerOptions } ;
20+ use {
21+ std:: sync:: Arc ,
22+ tokio:: io:: AsyncReadExt ,
23+ tokio:: net:: windows:: named_pipe:: { ClientOptions , ServerOptions } ,
24+ } ;
2125
2226// DogStatsD buffer size for receiving metrics
2327// TODO(astuyve) buf should be dynamic
@@ -164,6 +168,31 @@ fn normalize_pipe_name(pipe_name: &str) -> String {
164168 }
165169}
166170
171+ /// Checks if a named pipe already exists by attempting to open it as a client.
172+ #[ cfg( windows) ]
173+ async fn pipe_exists ( pipe_name : & str ) -> std:: io:: Result < bool > {
174+ match ClientOptions :: new ( ) . open ( pipe_name) {
175+ Ok ( _client) => Ok ( true ) ,
176+ Err ( e) => match e. raw_os_error ( ) {
177+ Some ( 2 ) => {
178+ // ERROR_FILE_NOT_FOUND - pipe doesn't exist
179+ debug ! ( "Named pipe '{}' does not exist (error 2)" , pipe_name) ;
180+ Ok ( false )
181+ }
182+ _ => {
183+ // Other errors (access denied, invalid handle, etc.)
184+ debug ! (
185+ "Unable to check if pipe '{}' exists: {} (error code: {:?})" ,
186+ pipe_name,
187+ e,
188+ e. raw_os_error( )
189+ ) ;
190+ Err ( e)
191+ }
192+ } ,
193+ }
194+ }
195+
167196/// Reads data from a Windows named pipe with retry logic.
168197///
169198/// Windows named pipes can experience transient failures (client disconnect, pipe errors).
@@ -180,8 +209,15 @@ async fn read_from_named_pipe(
180209
181210 // Let named pipes cancel cleanly when the server is shut down
182211 while !cancel_token. is_cancelled ( ) {
183- // Create pipe if needed (initial startup or after error)
212+ // Create server if needed (initial startup or after error)
184213 if current_pipe. is_none ( ) {
214+ // First, check if pipe already exists (e.g. multiple dogstatsd instances)
215+ if let Ok ( true ) = pipe_exists ( pipe_name) . await {
216+ debug ! ( "DogStatsD server with named pipe '{}' exists, pipe can be shared." , pipe_name) ;
217+ return Ok ( ( ) ) ;
218+ }
219+
220+ // Attempt to create the pipe server
185221 match ServerOptions :: new ( ) . create ( pipe_name) {
186222 Ok ( new_pipe) => {
187223 consecutive_errors = 0 ; // Reset on successful pipe creation
0 commit comments