Skip to content

Commit c236f6f

Browse files
committed
Cancel token behavior for named pipe loops
1 parent 19d5635 commit c236f6f

File tree

4 files changed

+100
-52
lines changed

4 files changed

+100
-52
lines changed

crates/datadog-trace-agent/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ pub struct Tags {
2323
function_tags_string: OnceLock<String>,
2424
}
2525

26+
impl Default for Tags {
27+
fn default() -> Self {
28+
Self::new()
29+
}
30+
}
31+
2632
impl Tags {
2733
pub fn from_env_string(env_tags: &str) -> Self {
2834
let mut tags = HashMap::new();

crates/datadog-trace-agent/src/trace_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
101101
// double check content length is < max request content length in case transfer encoding is used
102102
if body_size > config.max_request_content_length {
103103
return log_and_create_http_response(
104-
&format!("Error processing traces: Payload too large"),
104+
"Error processing traces: Payload too large",
105105
StatusCode::PAYLOAD_TOO_LARGE,
106106
);
107107
}

crates/dogstatsd/src/dogstatsd.rs

Lines changed: 64 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,15 @@ impl std::fmt::Display for MessageSource {
6565
// BufferReader abstracts transport methods for metric data.
6666
enum BufferReader {
6767
/// UDP socket reader (cross-platform, default transport)
68-
UdpSocketReader(tokio::net::UdpSocket),
68+
UdpSocket(tokio::net::UdpSocket),
6969

7070
/// Mirror reader for testing - replays a fixed buffer
7171
#[allow(dead_code)]
72-
MirrorReader(Vec<u8>, SocketAddr),
72+
MirrorTest(Vec<u8>, SocketAddr),
7373

7474
/// Windows named pipe reader (Windows-only transport)
7575
#[cfg(windows)]
76-
NamedPipeReader {
76+
NamedPipe {
7777
pipe_name: Arc<String>,
7878
cancel_token: tokio_util::sync::CancellationToken,
7979
},
@@ -84,7 +84,7 @@ impl BufferReader {
8484
/// Note: Different transports have different blocking behaviors.
8585
async fn read(&self) -> std::io::Result<(Vec<u8>, MessageSource)> {
8686
match self {
87-
BufferReader::UdpSocketReader(socket) => {
87+
BufferReader::UdpSocket(socket) => {
8888
// UDP socket: blocks until a packet arrives
8989
let mut buf = [0; BUFFER_SIZE];
9090

@@ -95,16 +95,17 @@ impl BufferReader {
9595
.expect("didn't receive data");
9696
Ok((buf[..amt].to_owned(), MessageSource::Network(src)))
9797
}
98-
BufferReader::MirrorReader(data, socket) => {
98+
BufferReader::MirrorTest(data, socket) => {
9999
// Mirror Reader: returns immediately with stored data
100100
Ok((data.clone(), MessageSource::Network(*socket)))
101101
}
102102
#[cfg(windows)]
103-
BufferReader::NamedPipeReader {
103+
BufferReader::NamedPipe {
104104
pipe_name,
105105
cancel_token,
106106
} => {
107107
// Named Pipe Reader: retries with backoff due to expected transient errors
108+
#[allow(clippy::expect_used)]
108109
let data = read_from_named_pipe(pipe_name, cancel_token)
109110
.await
110111
.expect("didn't receive data");
@@ -127,28 +128,35 @@ const MAX_NAMED_PIPE_ERRORS: u32 = 5;
127128
#[cfg(windows)]
128129
async fn create_and_connect_named_pipe(
129130
pipe_name: &str,
131+
cancel_token: &tokio_util::sync::CancellationToken,
130132
) -> std::io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
131133
let named_pipe = ServerOptions::new()
132134
.access_outbound(false)
133135
.first_pipe_instance(false)
134-
.write_dac()
135136
.create(pipe_name)
136137
.map_err(|e| {
137138
error!("Failed to create named pipe: {}", e);
138139
e
139140
})?;
140141

141-
// Wait for client to connect to named pipe
142-
named_pipe.connect().await?;
143-
debug!("Client connected to DogStatsD named pipe");
144-
Ok(named_pipe)
142+
// Wait for client to connect to named pipe, or for cancellation
143+
tokio::select! {
144+
result = named_pipe.connect() => {
145+
result?;
146+
debug!("Client connected to DogStatsD named pipe");
147+
Ok(named_pipe)
148+
}
149+
_ = cancel_token.cancelled() => {
150+
Err(std::io::Error::other("Server Shutdown, do not create a named pipe."))
151+
}
152+
}
145153
}
146154

147155
/// Reads data from a Windows named pipe with retry logic.
148156
///
149157
/// Windows named pipes can experience transient failures (client disconnect, pipe errors).
150158
/// This function implements a retry loop with exponential backoff to handle these failures
151-
/// gracefully while allowing responsive shutdown via the cancel_token.
159+
/// and has additional logic to allow clean shutdown via cancel_token.
152160
#[cfg(windows)]
153161
async fn read_from_named_pipe(
154162
pipe_name: &str,
@@ -157,31 +165,29 @@ async fn read_from_named_pipe(
157165
let mut consecutive_errors = 0;
158166
let mut current_pipe: Option<tokio::net::windows::named_pipe::NamedPipeServer> = None;
159167

160-
loop {
161-
// Check if cancelled between operations
162-
if cancel_token.is_cancelled() {
163-
return Err(std::io::Error::new(
164-
std::io::ErrorKind::Interrupted,
165-
"Operation cancelled",
166-
));
167-
}
168-
168+
// Let named pipes cancel cleanly when the server is shut down
169+
'reading_named_pipe: while !cancel_token.is_cancelled() {
169170
// Create pipe if needed (initial startup, after client disconnect, or after error)
170171
if current_pipe.is_none() {
171-
match create_and_connect_named_pipe(pipe_name).await {
172+
match create_and_connect_named_pipe(pipe_name, cancel_token).await {
172173
Ok(new_pipe) => {
173174
consecutive_errors = 0;
174175
current_pipe = Some(new_pipe);
175176
}
176177
Err(e) => {
178+
// Check for cancellation before retrying
179+
if cancel_token.is_cancelled() {
180+
break 'reading_named_pipe;
181+
}
182+
177183
// Handle pipe creation/connection errors with retry logic
178184
consecutive_errors += 1;
179185

180186
if consecutive_errors >= MAX_NAMED_PIPE_ERRORS {
181-
return Err(std::io::Error::new(
182-
std::io::ErrorKind::Other,
183-
format!("Too many consecutive pipe errors: {}", e),
184-
));
187+
return Err(std::io::Error::other(format!(
188+
"Too many consecutive pipe errors: {}",
189+
e
190+
)));
185191
}
186192

187193
let backoff_ms = 10u64 * (1 << consecutive_errors.min(6));
@@ -191,11 +197,24 @@ async fn read_from_named_pipe(
191197
}
192198
}
193199

194-
// Read from the connected pipe
200+
// Read from the connected pipe. Use select! to allow cancellation during the blocking read operation.
195201
let mut buf = [0; BUFFER_SIZE];
196-
let pipe = current_pipe.as_mut().unwrap();
197202

198-
match pipe.read(&mut buf).await {
203+
#[allow(clippy::expect_used)]
204+
let pipe = current_pipe
205+
.as_mut()
206+
.expect("did not create and connect to a named pipe");
207+
208+
// Allow read operation to be interrupted by cancellation token for clean shutdown
209+
let read_result = tokio::select! {
210+
result = pipe.read(&mut buf) => result,
211+
_ = cancel_token.cancelled() => {
212+
// Server shutdown requested during read operation
213+
return Err(std::io::Error::other("Server shutdown during pipe read"));
214+
}
215+
};
216+
217+
match read_result {
199218
Ok(0) => {
200219
// Client disconnected gracefully
201220
current_pipe = None;
@@ -208,14 +227,19 @@ async fn read_from_named_pipe(
208227
Err(e) => {
209228
// Read error - retry with backoff
210229
error!("Error reading from named pipe: {}", e);
211-
consecutive_errors += 1;
212230
current_pipe = None;
213231

232+
// Check for cancellation before retrying
233+
if cancel_token.is_cancelled() {
234+
break 'reading_named_pipe;
235+
}
236+
237+
consecutive_errors += 1;
214238
if consecutive_errors >= MAX_NAMED_PIPE_ERRORS {
215-
return Err(std::io::Error::new(
216-
std::io::ErrorKind::Other,
217-
"Too many consecutive read errors",
218-
));
239+
return Err(std::io::Error::other(format!(
240+
"Too many consecutive read errors: {}",
241+
e
242+
)));
219243
}
220244

221245
let backoff_ms = 10u64 * (1 << consecutive_errors.min(6));
@@ -224,6 +248,9 @@ async fn read_from_named_pipe(
224248
}
225249
}
226250
}
251+
252+
// If we exit due to cancellation, return an empty result.
253+
Ok(Vec::new())
227254
}
228255

229256
/// DogStatsD server to receive, parse, and forward metrics.
@@ -249,7 +276,7 @@ impl DogStatsD {
249276
let buffer_reader = if let Some(ref pipe_name) = config.windows_pipe_name {
250277
#[cfg(windows)]
251278
{
252-
BufferReader::NamedPipeReader {
279+
BufferReader::NamedPipe {
253280
pipe_name: Arc::new(pipe_name.clone()),
254281
cancel_token: cancel_token.clone(),
255282
}
@@ -267,7 +294,7 @@ impl DogStatsD {
267294
let socket = tokio::net::UdpSocket::bind(addr)
268295
.await
269296
.expect("couldn't bind to address");
270-
BufferReader::UdpSocketReader(socket)
297+
BufferReader::UdpSocket(socket)
271298
};
272299

273300
DogStatsD {
@@ -460,7 +487,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
460487
let dogstatsd = DogStatsD {
461488
cancel_token,
462489
aggregator_handle: handle.clone(),
463-
buffer_reader: BufferReader::MirrorReader(
490+
buffer_reader: BufferReader::MirrorTest(
464491
statsd_string.as_bytes().to_vec(),
465492
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(111, 112, 113, 114)), 0),
466493
),

crates/dogstatsd/tests/integration_test.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ async fn dogstatsd_server_ships_series() {
4646
// Start the service in a background task
4747
tokio::spawn(service.run());
4848

49-
let _ = start_dogstatsd(handle.clone()).await;
49+
let cancel_token = start_dogstatsd(handle.clone()).await;
5050

5151
let api_key_factory = ApiKeyFactory::new("mock-api-key");
5252

@@ -92,6 +92,9 @@ async fn dogstatsd_server_ships_series() {
9292
Ok(_) => mock.assert(),
9393
Err(_) => panic!("timed out before server received metric flush"),
9494
}
95+
96+
// Cleanup
97+
cancel_token.cancel();
9598
}
9699

97100
async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationToken {
@@ -315,7 +318,12 @@ async fn test_named_pipe_basic_communication() {
315318
sleep(Duration::from_millis(100)).await;
316319

317320
// Connect client and send metric
318-
let mut client = ClientOptions::new().open(pipe_name).expect("client open");
321+
// Client opens with write-only access to match server's inbound-only configuration
322+
let mut client = ClientOptions::new()
323+
.read(false)
324+
.write(true)
325+
.open(pipe_name)
326+
.expect("client open");
319327
client
320328
.write_all(b"test.metric:42|c\n")
321329
.await
@@ -327,12 +335,11 @@ async fn test_named_pipe_basic_communication() {
327335
// Verify metric was received
328336
let response = handle.flush().await.expect("flush failed");
329337
assert_eq!(response.series.len(), 1);
330-
assert_eq!(response.series[0].series[0].metric, "test.metric");
331338

332339
// Cleanup
333340
cancel_token.cancel();
334-
drop(client);
335-
let _ = timeout(Duration::from_millis(500), dogstatsd_task).await;
341+
let result = timeout(Duration::from_millis(500), dogstatsd_task).await;
342+
assert!(result.is_ok(), "task should complete after cancellation");
336343
handle.shutdown().expect("shutdown failed");
337344
}
338345

@@ -350,7 +357,7 @@ async fn test_named_pipe_disconnect_reconnect() {
350357
// Start DogStatsD server
351358
let dogstatsd_task = {
352359
let handle = handle.clone();
353-
let cancel_token = cancel_token.clone();
360+
let cancel_token_clone = cancel_token.clone();
354361
tokio::spawn(async move {
355362
let dogstatsd = DogStatsD::new(
356363
&DogStatsDConfig {
@@ -360,7 +367,7 @@ async fn test_named_pipe_disconnect_reconnect() {
360367
windows_pipe_name: Some(pipe_name.to_string()),
361368
},
362369
handle,
363-
cancel_token,
370+
cancel_token_clone.child_token(),
364371
)
365372
.await;
366373
dogstatsd.spin().await;
@@ -371,15 +378,23 @@ async fn test_named_pipe_disconnect_reconnect() {
371378

372379
// First client - connect, send, disconnect
373380
{
374-
let mut client1 = ClientOptions::new().open(pipe_name).expect("client1 open");
381+
let mut client1 = ClientOptions::new()
382+
.read(false)
383+
.write(true)
384+
.open(pipe_name)
385+
.expect("client1 open");
375386
client1.write_all(b"metric1:1|c\n").await.expect("write1");
376387
client1.flush().await.expect("flush1");
377388
} // client1 drops here (disconnect)
378389

379-
sleep(Duration::from_millis(200)).await;
390+
sleep(Duration::from_millis(100)).await;
380391

381392
// Second client - reconnect and send
382-
let mut client2 = ClientOptions::new().open(pipe_name).expect("client2 open");
393+
let mut client2 = ClientOptions::new()
394+
.read(false)
395+
.write(true)
396+
.open(pipe_name)
397+
.expect("client2 open");
383398
client2.write_all(b"metric2:2|c\n").await.expect("write2");
384399
client2.flush().await.expect("flush2");
385400

@@ -391,8 +406,8 @@ async fn test_named_pipe_disconnect_reconnect() {
391406

392407
// Cleanup
393408
cancel_token.cancel();
394-
drop(client2);
395-
let _ = timeout(Duration::from_millis(500), dogstatsd_task).await;
409+
let result = timeout(Duration::from_millis(500), dogstatsd_task).await;
410+
assert!(result.is_ok(), "tasks should complete after cancellation");
396411
handle.shutdown().expect("shutdown failed");
397412
}
398413

@@ -410,7 +425,7 @@ async fn test_named_pipe_cancellation() {
410425
// Start DogStatsD server
411426
let dogstatsd_task = {
412427
let handle = handle.clone();
413-
let cancel_token = cancel_token.clone();
428+
let cancel_token_clone = cancel_token.clone();
414429
tokio::spawn(async move {
415430
let dogstatsd = DogStatsD::new(
416431
&DogStatsDConfig {
@@ -420,7 +435,7 @@ async fn test_named_pipe_cancellation() {
420435
windows_pipe_name: Some(pipe_name.to_string()),
421436
},
422437
handle,
423-
cancel_token,
438+
cancel_token_clone.child_token(),
424439
)
425440
.await;
426441
dogstatsd.spin().await;

0 commit comments

Comments
 (0)