Skip to content

Commit 3001c28

Browse files
committed
Debug test_named_pipe_max_errors
1 parent a09de4d commit 3001c28

File tree

3 files changed

+130
-28
lines changed

3 files changed

+130
-28
lines changed

crates/dogstatsd/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ serde_json = { version = "1.0.116", default-features = false, features = ["alloc
2121
thiserror = { version = "1.0.58", default-features = false }
2222
tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net", "io-util"] }
2323
tokio-util = { version = "0.7.11", default-features = false }
24-
tracing = { version = "0.1.40", default-features = false }
24+
tracing = { version = "0.1.40", default-features = false, features = ["attributes"] }
2525
regex = { version = "1.10.6", default-features = false }
2626
zstd = { version = "0.13.3", default-features = false }
2727
datadog-fips = { path = "../datadog-fips", default-features = false }
@@ -30,6 +30,7 @@ datadog-fips = { path = "../datadog-fips", default-features = false }
3030
mockito = { version = "1.5.0", default-features = false }
3131
proptest = "1.4.0"
3232
tracing-test = { version = "0.2.5", default-features = false }
33+
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
3334

3435
[features]
3536
default = [ "reqwest/rustls-tls" ]

crates/dogstatsd/src/dogstatsd.rs

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ impl BufferReader {
8989
/// This is the main entry point for receiving metric data.
9090
/// Note: Different transports have different blocking behaviors.
9191
async fn read(&self) -> std::io::Result<(Vec<u8>, MessageSource)> {
92+
debug!(
93+
"BufferReader::read() called, variant: {}",
94+
match self {
95+
BufferReader::UdpSocket(_) => "UdpSocket",
96+
BufferReader::MirrorTest(_, _) => "MirrorTest",
97+
#[cfg(windows)]
98+
BufferReader::NamedPipe { .. } => "NamedPipe",
99+
}
100+
);
92101
match self {
93102
BufferReader::UdpSocket(socket) => {
94103
// UDP socket: blocks until a packet arrives
@@ -137,18 +146,30 @@ async fn handle_pipe_error_with_backoff(
137146
);
138147

139148
if *consecutive_errors >= MAX_NAMED_PIPE_ERRORS {
140-
return Err(std::io::Error::other(format!(
141-
"Too many consecutive {} errors: {}",
142-
error_type, error
143-
)));
149+
let err_msg = format!("Too many consecutive {} errors: {}", error_type, error);
150+
debug!(
151+
"MAX_NAMED_PIPE_ERRORS ({}) reached: {}",
152+
MAX_NAMED_PIPE_ERRORS, err_msg
153+
);
154+
return Err(std::io::Error::other(err_msg));
144155
}
145156

146157
let backoff_ms = 10u64 * (1 << *consecutive_errors);
158+
debug!(
159+
"Backing off {}ms after {} error (attempt {})",
160+
backoff_ms, error_type, consecutive_errors
161+
);
147162

148163
// Sleep with cancellation support for clean, fast shutdown
149164
tokio::select! {
150-
_ = sleep(Duration::from_millis(backoff_ms)) => Ok(true),
151-
_ = cancel_token.cancelled() => Ok(false),
165+
_ = sleep(Duration::from_millis(backoff_ms)) => {
166+
debug!("Backoff sleep completed");
167+
Ok(true)
168+
},
169+
_ = cancel_token.cancelled() => {
170+
debug!("Backoff cancelled via token");
171+
Ok(false)
172+
},
152173
}
153174
}
154175

@@ -162,21 +183,34 @@ async fn read_from_named_pipe(
162183
pipe_name: &str,
163184
cancel_token: &tokio_util::sync::CancellationToken,
164185
) -> std::io::Result<Vec<u8>> {
186+
debug!("read_from_named_pipe() called for: {}", pipe_name);
165187
let mut consecutive_errors = 0;
166188
let mut current_pipe: Option<tokio::net::windows::named_pipe::NamedPipeServer> = None;
167189
let mut needs_connection = true; // Track whether we need to wait for a client
168190

191+
debug!(
192+
"Entering retry loop, cancel_token.is_cancelled() = {}",
193+
cancel_token.is_cancelled()
194+
);
169195
// Let named pipes cancel cleanly when the server is shut down
170196
while !cancel_token.is_cancelled() {
171197
// Create pipe if needed (initial startup or after error)
172198
if current_pipe.is_none() {
199+
debug!("Attempting to create named pipe: {}", pipe_name);
173200
match ServerOptions::new().create(pipe_name) {
174201
Ok(new_pipe) => {
202+
debug!("Successfully created named pipe: {}", pipe_name);
175203
consecutive_errors = 0; // Reset on successful pipe creation
176204
current_pipe = Some(new_pipe);
177205
needs_connection = true;
178206
}
179207
Err(e) => {
208+
debug!(
209+
"Failed to create named pipe: {} - ErrorKind: {:?}, Error: {}",
210+
pipe_name,
211+
e.kind(),
212+
e
213+
);
180214
match handle_pipe_error_with_backoff(
181215
&mut consecutive_errors,
182216
"pipe creation",
@@ -185,8 +219,17 @@ async fn read_from_named_pipe(
185219
)
186220
.await?
187221
{
188-
true => continue,
189-
false => break,
222+
true => {
223+
debug!(
224+
"Retrying after pipe creation error backoff (attempt {})",
225+
consecutive_errors
226+
);
227+
continue;
228+
}
229+
false => {
230+
debug!("Breaking loop - cancelled during pipe creation error backoff");
231+
break;
232+
}
190233
}
191234
}
192235
}
@@ -197,19 +240,23 @@ async fn read_from_named_pipe(
197240
#[allow(clippy::expect_used)]
198241
let pipe = current_pipe.as_ref().expect("pipe must exist");
199242

243+
debug!("Waiting for client to connect to named pipe: {}", pipe_name);
200244
let connect_result = tokio::select! {
201245
result = pipe.connect() => result,
202246
_ = cancel_token.cancelled() => {
247+
debug!("Named pipe connect cancelled via token");
203248
break;
204249
}
205250
};
206251

207252
match connect_result {
208253
Ok(()) => {
254+
debug!("Client connected successfully to named pipe: {}", pipe_name);
209255
consecutive_errors = 0;
210256
needs_connection = false;
211257
}
212258
Err(e) => {
259+
debug!("Connection failed for named pipe: {} - {:?}", pipe_name, e);
213260
// Connection failed - disconnect and recreate pipe on next iteration
214261
if let Some(pipe) = current_pipe.as_ref() {
215262
let _ = pipe.disconnect(); // Ignore disconnect errors, we're recreating anyway
@@ -223,8 +270,14 @@ async fn read_from_named_pipe(
223270
)
224271
.await?
225272
{
226-
true => continue,
227-
false => break,
273+
true => {
274+
debug!("Retrying after connection error backoff");
275+
continue;
276+
}
277+
false => {
278+
debug!("Breaking loop - cancelled during connection error backoff");
279+
break;
280+
}
228281
}
229282
}
230283
}
@@ -306,8 +359,13 @@ impl DogStatsD {
306359
aggregator_handle: AggregatorHandle,
307360
cancel_token: tokio_util::sync::CancellationToken,
308361
) -> DogStatsD {
362+
debug!(
363+
"DogStatsD::new() called with windows_pipe_name: {:?}, host: {}, port: {}",
364+
config.windows_pipe_name, config.host, config.port
365+
);
309366
#[allow(unused_variables)] // pipe_name unused on non-Windows
310367
let buffer_reader = if let Some(ref pipe_name) = config.windows_pipe_name {
368+
debug!("Creating NamedPipe BufferReader for: {}", pipe_name);
311369
#[cfg(windows)]
312370
{
313371
BufferReader::NamedPipe {
@@ -321,6 +379,10 @@ impl DogStatsD {
321379
panic!("Named pipes are only supported on Windows.")
322380
}
323381
} else {
382+
debug!(
383+
"Creating UdpSocket BufferReader for: {}:{}",
384+
config.host, config.port
385+
);
324386
// UDP socket for all platforms
325387
let addr = format!("{}:{}", config.host, config.port);
326388
// TODO (UDS socket)
@@ -330,6 +392,7 @@ impl DogStatsD {
330392
.expect("couldn't bind to address");
331393
BufferReader::UdpSocket(socket)
332394
};
395+
debug!("BufferReader created successfully");
333396

334397
DogStatsD {
335398
cancel_token,
@@ -341,21 +404,31 @@ impl DogStatsD {
341404

342405
/// Main event loop that continuously receives and processes metrics.
343406
pub async fn spin(self) {
407+
debug!(
408+
"spin() starting, cancel_token.is_cancelled() = {}",
409+
self.cancel_token.is_cancelled()
410+
);
344411
let mut spin_cancelled = false;
345412
while !spin_cancelled {
413+
debug!("spin() loop iteration, calling consume_statsd()...");
346414
self.consume_statsd().await;
415+
debug!("consume_statsd() returned");
347416
spin_cancelled = self.cancel_token.is_cancelled();
417+
debug!("spin_cancelled = {}", spin_cancelled);
348418
}
419+
debug!("spin() exiting");
349420
}
350421

351422
/// Receive one batch of metrics from the transport layer and process them.
352423
async fn consume_statsd(&self) {
424+
debug!("consume_statsd() called, about to read from buffer_reader");
353425
#[allow(clippy::expect_used)]
354426
let (buf, src) = self
355427
.buffer_reader
356428
.read()
357429
.await
358430
.expect("didn't receive data");
431+
debug!("buffer_reader.read() completed with {} bytes", buf.len());
359432

360433
#[allow(clippy::expect_used)]
361434
let msgs = std::str::from_utf8(&buf).expect("couldn't parse as string");

crates/dogstatsd/tests/integration_test.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use dogstatsd::metric::SortedTags;
54
use dogstatsd::{
65
aggregator::Aggregator as MetricsAggregator,
76
aggregator_service::{AggregatorHandle, AggregatorService},
87
api_key::ApiKeyFactory,
98
constants::CONTEXTS,
10-
datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride},
9+
datadog::{
10+
DdApi, DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride, RetryStrategy,
11+
},
1112
dogstatsd::{DogStatsD, DogStatsDConfig},
12-
flusher::{Flusher, FlusherConfig},
13+
flusher::{Flusher, FlusherConfig, ShippingError},
14+
metric::{parse, SortedTags},
1315
};
1416
use mockito::Server;
1517
use std::sync::Arc;
@@ -21,13 +23,14 @@ use tokio_util::sync::CancellationToken;
2123
use zstd::zstd_safe::CompressionLevel;
2224

2325
#[cfg(windows)]
24-
use tokio::{io::AsyncWriteExt, net::windows::named_pipe::ClientOptions};
26+
use tokio::{
27+
io::AsyncWriteExt, net::windows::named_pipe::ClientOptions,
28+
net::windows::named_pipe::ServerOptions,
29+
};
2530

2631
#[cfg(test)]
2732
#[tokio::test]
2833
async fn dogstatsd_server_ships_series() {
29-
use dogstatsd::datadog::RetryStrategy;
30-
3134
let mut mock_server = Server::new_async().await;
3235

3336
let mock = mock_server
@@ -122,9 +125,6 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok
122125
#[cfg(test)]
123126
#[tokio::test]
124127
async fn test_send_with_retry_immediate_failure() {
125-
use dogstatsd::datadog::{DdApi, DdDdUrl, RetryStrategy};
126-
use dogstatsd::metric::{parse, SortedTags};
127-
128128
let mut server = Server::new_async().await;
129129
let mock = server
130130
.mock("POST", "/api/v2/series")
@@ -170,9 +170,6 @@ async fn test_send_with_retry_immediate_failure() {
170170
#[cfg(test)]
171171
#[tokio::test]
172172
async fn test_send_with_retry_linear_backoff_success() {
173-
use dogstatsd::datadog::{DdApi, DdDdUrl, RetryStrategy};
174-
use dogstatsd::metric::{parse, SortedTags};
175-
176173
let mut server = Server::new_async().await;
177174
let mock = server
178175
.mock("POST", "/api/v2/series")
@@ -232,10 +229,6 @@ async fn test_send_with_retry_linear_backoff_success() {
232229
#[cfg(test)]
233230
#[tokio::test]
234231
async fn test_send_with_retry_immediate_failure_after_one_attempt() {
235-
use dogstatsd::datadog::{DdApi, DdDdUrl, RetryStrategy};
236-
use dogstatsd::flusher::ShippingError;
237-
use dogstatsd::metric::{parse, SortedTags};
238-
239232
let mut server = Server::new_async().await;
240233
let mock = server
241234
.mock("POST", "/api/v2/series")
@@ -454,26 +447,47 @@ async fn test_named_pipe_cancellation() {
454447
#[cfg(windows)]
455448
#[tokio::test]
456449
async fn test_named_pipe_max_errors() {
457-
use tokio::net::windows::named_pipe::ServerOptions;
450+
use tracing::info;
451+
use tracing_subscriber::{fmt, EnvFilter};
452+
453+
// Initialize subscriber that will capture ALL logs including from dogstatsd crate
454+
let _ = fmt()
455+
.with_env_filter(
456+
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("debug")),
457+
)
458+
.with_test_writer()
459+
.try_init();
458460

459461
let pipe_name = r"\\.\pipe\test_dogstatsd_max_errors";
462+
info!(
463+
"TEST START: test_named_pipe_max_errors with pipe: {}",
464+
pipe_name
465+
);
466+
460467
let (service, handle) = AggregatorService::new(SortedTags::parse("test:value").unwrap(), 1_024)
461468
.expect("aggregator service creation failed");
469+
info!("Aggregator service created");
462470
tokio::spawn(service.run());
471+
info!("Aggregator service spawned");
463472

464473
let cancel_token = CancellationToken::new();
474+
info!("Cancel token created");
465475

466476
// Create a pipe with first_pipe_instance(true) to block other instances
477+
info!("Creating blocking pipe with first_pipe_instance(true)...");
467478
let _blocking_pipe = ServerOptions::new()
468479
.first_pipe_instance(true)
469480
.create(pipe_name)
470481
.expect("failed to create blocking pipe");
482+
info!("Blocking pipe created successfully - should prevent other instances");
471483

472484
// Start DogStatsD server - should fail to create pipe due to existing instance
485+
info!("Spawning DogStatsD task (should fail to create pipe and panic after 5 retries)...");
473486
let dogstatsd_task = {
474487
let handle = handle.clone();
475488
let cancel_token_clone = cancel_token.clone();
476489
tokio::spawn(async move {
490+
info!("DogStatsD task starting...");
477491
let dogstatsd = DogStatsD::new(
478492
&DogStatsDConfig {
479493
host: String::new(),
@@ -485,14 +499,22 @@ async fn test_named_pipe_max_errors() {
485499
cancel_token_clone,
486500
)
487501
.await;
502+
info!("DogStatsD::new() completed, calling spin()...");
488503
dogstatsd.spin().await;
504+
info!("DogStatsD spin() completed (should not reach here if panics)");
489505
})
490506
};
507+
info!("DogStatsD task spawned, now waiting with 1500ms timeout...");
491508

492509
// Task should complete after exceeding MAX_NAMED_PIPE_ERRORS (5) attempts
493510
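// With backoff: 20ms + 40ms + 80ms + 160ms = ~300ms + overhead (the 5th consecutive error returns Err before sleeping; assumes the counter is incremented before the limit check)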
494511
let result = timeout(Duration::from_millis(1500), dogstatsd_task).await;
495512

513+
info!(
514+
"Timeout completed, result: {:?}",
515+
result.as_ref().map(|_| "Ok").map_err(|_| "Timeout")
516+
);
517+
496518
// The task should panic because read_from_named_pipe returns an error after max retries
497519
assert!(
498520
result.is_ok(),
@@ -501,13 +523,19 @@ async fn test_named_pipe_max_errors() {
501523

502524
// Verify the task actually panicked (completed with error)
503525
if let Ok(join_result) = result {
526+
info!("Task completed, checking if it panicked...");
504527
assert!(
505528
join_result.is_err(),
506529
"task should have panicked after exceeding max consecutive errors"
507530
);
508531
}
509532

510533
// Cleanup
534+
info!("Test cleanup: cancelling token and shutting down...");
511535
cancel_token.cancel();
536+
info!("Blocking pipe being dropped...");
537+
drop(_blocking_pipe);
538+
info!("Blocking pipe dropped");
512539
handle.shutdown().expect("shutdown failed");
540+
info!("TEST END: test_named_pipe_max_errors");
513541
}

0 commit comments

Comments
 (0)