Skip to content

Commit c84f7c7

Browse files
committed
format and copyright headers and fix windows
1 parent fc14de1 commit c84f7c7

File tree

4 files changed

+32
-29
lines changed

4 files changed

+32
-29
lines changed

libdd-profiling/src/exporter/file_exporter.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
14
#[cfg(any(unix, windows))]
25
use std::path::PathBuf;
36

@@ -36,7 +39,7 @@ pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf>
3639

3740
let socket_path_clone = socket_path.clone();
3841
let (tx, rx) = std::sync::mpsc::channel();
39-
42+
4043
std::thread::spawn(move || {
4144
// Top-level error handler - all errors logged here
4245
let result = (|| -> anyhow::Result<()> {
@@ -47,7 +50,7 @@ pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf>
4750
run_dump_server_unix(output_path, listener).await
4851
})
4952
})();
50-
53+
5154
if let Err(e) = result {
5255
eprintln!("[dump-server] Error: {}", e);
5356
let _ = tx.send(Err(e));
@@ -66,7 +69,7 @@ async fn run_dump_server_windows(output_path: PathBuf, pipe_name: String) -> any
6669

6770
loop {
6871
// Create server instance
69-
let mut server = ServerOptions::new()
72+
let server = ServerOptions::new()
7073
.first_pipe_instance(true)
7174
.create(&pipe_name)?;
7275

@@ -92,7 +95,7 @@ pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf>
9295
let pipe_path = PathBuf::from(&pipe_name);
9396

9497
let (tx, rx) = std::sync::mpsc::channel();
95-
98+
9699
std::thread::spawn(move || {
97100
// Top-level error handler - all errors logged here
98101
let result = (|| -> anyhow::Result<()> {
@@ -102,7 +105,7 @@ pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf>
102105
run_dump_server_windows(output_path, pipe_name).await
103106
})
104107
})();
105-
108+
106109
if let Err(e) = result {
107110
eprintln!("[dump-server] Error: {}", e);
108111
let _ = tx.send(Err(e));
@@ -155,9 +158,7 @@ fn is_request_complete(
155158

156159
/// Read complete HTTP request from an async stream
157160
#[cfg(any(unix, windows))]
158-
async fn read_http_request_async<R: tokio::io::AsyncReadExt + Unpin>(
159-
stream: &mut R,
160-
) -> Vec<u8> {
161+
async fn read_http_request_async<R: tokio::io::AsyncReadExt + Unpin>(stream: &mut R) -> Vec<u8> {
161162
let mut request_data = Vec::new();
162163
let mut buffer = [0u8; 8192];
163164
let mut content_length: Option<usize> = None;
@@ -197,7 +198,10 @@ async fn read_http_request_async<R: tokio::io::AsyncReadExt + Unpin>(
197198
async fn write_request_to_file_async(output_path: &PathBuf, request_data: &[u8]) {
198199
if !request_data.is_empty() {
199200
if let Err(e) = tokio::fs::write(output_path, request_data).await {
200-
eprintln!("[dump-server] Failed to write request dump to {:?}: {}", output_path, e);
201+
eprintln!(
202+
"[dump-server] Failed to write request dump to {:?}: {}",
203+
output_path, e
204+
);
201205
}
202206
}
203207
}
@@ -210,7 +214,7 @@ where
210214
{
211215
let request_data = read_http_request_async(&mut stream).await;
212216
write_request_to_file_async(&output_path, &request_data).await;
213-
217+
214218
if let Err(e) = stream.write_all(HTTP_200_RESPONSE).await {
215219
eprintln!("[dump-server] Failed to send HTTP response: {}", e);
216220
}

libdd-profiling/src/exporter/profile_exporter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl ProfileExporter {
8686
let output_path = libdd_common::decode_uri_path_in_authority(&endpoint.url)
8787
.context("Failed to decode file path from URI")?;
8888
let pipe_path = spawn_dump_server(output_path)?;
89-
builder = builder.windows_named_pipe(pipe_path);
89+
builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
9090
"http://localhost/v1/input".to_string()
9191
}
9292

@@ -108,7 +108,7 @@ impl ProfileExporter {
108108
Some("windows") => {
109109
use libdd_common::connector::named_pipe::named_pipe_path_from_uri;
110110
let pipe_path = named_pipe_path_from_uri(&endpoint.url)?;
111-
builder = builder.windows_named_pipe(pipe_path);
111+
builder = builder.windows_named_pipe(pipe_path.to_string_lossy().to_string());
112112
format!("http://localhost{}", endpoint.url.path())
113113
}
114114
#[cfg(unix)]

libdd-profiling/src/exporter/utils.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,7 @@ pub fn parse_http_request(data: &[u8]) -> anyhow::Result<HttpRequest> {
5858
let mut lines = header_str.lines();
5959

6060
// Parse request line
61-
let request_line = lines
62-
.next()
63-
.context("No request line found")?;
61+
let request_line = lines.next().context("No request line found")?;
6462
let parts: Vec<&str> = request_line.split_whitespace().collect();
6563
if parts.len() < 2 {
6664
anyhow::bail!("Invalid request line");
@@ -174,12 +172,12 @@ pub fn parse_multipart(body: &[u8], boundary: &str) -> anyhow::Result<Vec<Multip
174172

175173
// Move past the delimiter
176174
pos = part_end + delimiter_bytes.len();
177-
175+
178176
// Check if this is the end delimiter (delimiter followed by --)
179177
if pos + 2 <= body.len() && &body[pos..pos + 2] == b"--" {
180178
break;
181179
}
182-
180+
183181
// Skip CRLF after boundary
184182
if pos + 2 <= body.len() && &body[pos..pos + 2] == b"\r\n" {
185183
pos += 2;

libdd-profiling/tests/exporter_e2e.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ async fn read_and_capture_request<S>(
147147
) where
148148
S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Unpin,
149149
{
150-
151150
let mut buffer = Vec::new();
152151
let mut temp_buf = [0u8; 8192];
153152
let mut headers_complete = false;
@@ -177,9 +176,7 @@ async fn read_and_capture_request<S>(
177176
}
178177
}
179178

180-
if let (Some(headers_end), Some(expected_len)) =
181-
(headers_end_pos, content_length)
182-
{
179+
if let (Some(headers_end), Some(expected_len)) = (headers_end_pos, content_length) {
183180
if buffer.len() - headers_end >= expected_len {
184181
break;
185182
}
@@ -337,10 +334,7 @@ fn validate_full_export(req: &ReceivedRequest, expected_path: &str) -> anyhow::R
337334
}
338335

339336
// Verify process_tags
340-
assert_eq!(
341-
event_json["process_tags"],
342-
"entrypoint.name:main,pid:12345"
343-
);
337+
assert_eq!(event_json["process_tags"], "entrypoint.name:main,pid:12345");
344338

345339
// Verify attachments
346340
let attachments = event_json["attachments"]
@@ -375,7 +369,11 @@ fn validate_full_export(req: &ReceivedRequest, expected_path: &str) -> anyhow::R
375369
.iter()
376370
.find(|p| p.name == *part_name)
377371
.ok_or_else(|| anyhow::anyhow!("Missing part: {}", part_name))?;
378-
assert!(!part.content.is_empty(), "{} should not be empty", part_name);
372+
assert!(
373+
!part.content.is_empty(),
374+
"{} should not be empty",
375+
part_name
376+
);
379377
}
380378

381379
Ok(())
@@ -405,7 +403,8 @@ async fn test_agent_with_transport(transport: Transport) -> anyhow::Result<()> {
405403
};
406404

407405
// Run the full export test
408-
let req = export_full_profile(endpoint, RequestSource::Captured(server.received_requests)).await?;
406+
let req =
407+
export_full_profile(endpoint, RequestSource::Captured(server.received_requests)).await?;
409408

410409
// Validate
411410
validate_full_export(&req, "/profiling/v1/input")?;
@@ -446,7 +445,8 @@ async fn test_agentless_with_transport(transport: Transport) -> anyhow::Result<(
446445
{
447446
let pipe_path = server.pipe_path.as_ref().unwrap();
448447
// For named pipes, we need to create endpoint manually
449-
let endpoint_url = libdd_common::connector::named_pipe::named_pipe_path_to_uri(pipe_path)?;
448+
let endpoint_url =
449+
libdd_common::connector::named_pipe::named_pipe_path_to_uri(pipe_path)?;
450450
let mut parts = endpoint_url.into_parts();
451451
parts.path_and_query = Some("/api/v2/profile".parse()?);
452452
let url = http::Uri::from_parts(parts)?;
@@ -459,7 +459,8 @@ async fn test_agentless_with_transport(transport: Transport) -> anyhow::Result<(
459459
};
460460

461461
// Run the full export test
462-
let req = export_full_profile(endpoint, RequestSource::Captured(server.received_requests)).await?;
462+
let req =
463+
export_full_profile(endpoint, RequestSource::Captured(server.received_requests)).await?;
463464

464465
// Validate - agentless uses /api/v2/profile path
465466
validate_full_export(&req, "/api/v2/profile")?;

0 commit comments

Comments
 (0)