Skip to content

Commit ca7e810

Browse files
committed
feat(profiling): File endpoint for exporter
1 parent 5027515 commit ca7e810

File tree

8 files changed

+1058
-4
lines changed

8 files changed

+1058
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

LICENSE-3rdparty.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27494,9 +27494,9 @@ third_party_libraries:
2749427494
- package_name: stringmetrics
2749527495
package_version: 2.2.2
2749627496
repository: https://github.com/pluots/stringmetrics
27497-
license: License specified in file ($CARGO_HOME/registry/src/github.com-25cdd57fae9f0462/stringmetrics-2.2.2/LICENSE)
27497+
license: License specified in file ($CARGO_HOME/registry/src/index.crates.io-1949cf8c6b5b557f/stringmetrics-2.2.2/LICENSE)
2749827498
licenses:
27499-
- license: License specified in file ($CARGO_HOME/registry/src/github.com-25cdd57fae9f0462/stringmetrics-2.2.2/LICENSE)
27499+
- license: License specified in file ($CARGO_HOME/registry/src/index.crates.io-1949cf8c6b5b557f/stringmetrics-2.2.2/LICENSE)
2750027500
text: |
2750127501
Copyright 2022 Trevor Gross
2750227502

libdd-profiling/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ serde = {version = "1.0", features = ["derive"]}
5151
serde_json = {version = "1.0"}
5252
target-triple = "0.1.4"
5353
thiserror = "2"
54-
tokio = {version = "1.23", features = ["rt", "macros"]}
54+
tokio = {version = "1.23", features = ["rt", "macros", "net", "io-util", "fs", "sync"]}
5555
tokio-util = "0.7.1"
56+
rand = "0.8"
5657
zstd = { version = "0.13", default-features = false }
5758
cxx = { version = "1.0", optional = true }
5859

libdd-profiling/src/exporter/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ pub fn agentless<AsStrRef: AsRef<str>, IntoCow: Into<Cow<'static, str>>>(
7070
})
7171
}
7272

73+
/// Creates an Endpoint for dumping HTTP requests to a file for testing/debugging.
74+
///
75+
/// # Arguments
76+
/// * `path` - File system path where the HTTP request bytes should be written
7377
pub fn file(path: impl AsRef<str>) -> anyhow::Result<Endpoint> {
7478
let url: String = format!("file://{}", path.as_ref());
7579
Ok(Endpoint::from_slice(&url))
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! File-based HTTP request dumping for testing and debugging.
5+
//!
6+
//! This module implements a local server (Unix domain socket on Unix,
7+
//! named pipe on Windows) that captures raw HTTP requests and writes them to disk.
8+
//!
9+
//! This is primarily used for testing to validate the exact bytes sent over the wire.
10+
11+
use std::path::PathBuf;
12+
13+
/// HTTP 200 OK response with no body
14+
const HTTP_200_RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
15+
16+
/// Spawns a dump server that intercepts HTTP requests and writes them to a file
17+
///
18+
/// Returns the socket/pipe path that can be used as a unix:// or windows:// URI
19+
///
20+
/// # Arguments
21+
/// * `output_path` - Where to write the captured HTTP request bytes
22+
///
23+
/// # Returns
24+
/// The path to the Unix socket (on Unix) or named pipe (on Windows) that the server is listening on
25+
#[cfg(unix)]
26+
pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf> {
27+
use tokio::net::UnixListener;
28+
29+
// Create a temporary socket path with randomness to avoid collisions
30+
let random_id: u64 = rand::random();
31+
let socket_path = std::env::temp_dir().join(format!(
32+
"libdatadog_dump_{}_{:x}.sock",
33+
std::process::id(),
34+
random_id
35+
));
36+
37+
// Remove socket file if it already exists
38+
let _ = std::fs::remove_file(&socket_path);
39+
40+
let socket_path_clone = socket_path.clone();
41+
let (tx, rx) = std::sync::mpsc::channel();
42+
43+
std::thread::spawn(move || {
44+
// Top-level error handler - all errors logged here
45+
let result = (|| -> anyhow::Result<()> {
46+
let rt = tokio::runtime::Builder::new_current_thread()
47+
.enable_all()
48+
.build()?;
49+
rt.block_on(async {
50+
let listener = UnixListener::bind(&socket_path)?;
51+
tx.send(Ok(()))?;
52+
run_dump_server_unix(output_path, listener).await
53+
})
54+
})();
55+
56+
if let Err(e) = result {
57+
eprintln!("[dump-server] Error: {}", e);
58+
let _ = tx.send(Err(e));
59+
}
60+
});
61+
62+
// Wait for server to be ready
63+
rx.recv()??;
64+
Ok(socket_path_clone)
65+
}
66+
67+
/// Spawns a dump server that intercepts HTTP requests and writes them to a file
68+
///
69+
/// Returns the pipe path that can be used as a windows:// URI
70+
///
71+
/// # Arguments
72+
/// * `output_path` - Where to write the captured HTTP request bytes
73+
///
74+
/// # Returns
75+
/// The path to the Windows named pipe that the server is listening on
76+
#[cfg(windows)]
77+
pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf> {
78+
use tokio::net::windows::named_pipe::ServerOptions;
79+
80+
// Create a unique named pipe name with randomness to avoid collisions
81+
let random_id: u64 = rand::random();
82+
let pipe_name = format!(
83+
r"\\.\pipe\libdatadog_dump_{}_{:x}",
84+
std::process::id(),
85+
random_id
86+
);
87+
let pipe_path = PathBuf::from(&pipe_name);
88+
89+
let (tx, rx) = std::sync::mpsc::channel();
90+
91+
std::thread::spawn(move || {
92+
// Top-level error handler - all errors logged here
93+
let result = (|| -> anyhow::Result<()> {
94+
let rt = tokio::runtime::Builder::new_current_thread()
95+
.enable_all()
96+
.build()?;
97+
rt.block_on(async {
98+
// Create the first pipe instance before signaling ready
99+
let first_server = ServerOptions::new()
100+
.first_pipe_instance(true)
101+
.create(&pipe_name)?;
102+
103+
tx.send(Ok(()))?;
104+
run_dump_server_windows(output_path, pipe_name, first_server).await
105+
})
106+
})();
107+
108+
if let Err(e) = result {
109+
eprintln!("[dump-server] Error: {}", e);
110+
let _ = tx.send(Err(e));
111+
}
112+
});
113+
114+
// Wait for server to be ready
115+
rx.recv()??;
116+
Ok(pipe_path)
117+
}
118+
119+
/// Async server loop for Unix sockets
120+
#[cfg(unix)]
121+
async fn run_dump_server_unix(
122+
output_path: PathBuf,
123+
listener: tokio::net::UnixListener,
124+
) -> anyhow::Result<()> {
125+
loop {
126+
let (stream, _) = listener.accept().await?;
127+
handle_connection_async(stream, output_path.clone()).await;
128+
}
129+
}
130+
131+
/// Async server loop for Windows named pipes
132+
#[cfg(windows)]
133+
async fn run_dump_server_windows(
134+
output_path: PathBuf,
135+
pipe_name: String,
136+
first_server: tokio::net::windows::named_pipe::NamedPipeServer,
137+
) -> anyhow::Result<()> {
138+
use tokio::net::windows::named_pipe::ServerOptions;
139+
140+
// Handle first connection
141+
first_server.connect().await?;
142+
handle_connection_async(first_server, output_path.clone()).await;
143+
144+
// Handle subsequent connections
145+
loop {
146+
// Create server instance (not the first one)
147+
let server = ServerOptions::new()
148+
.first_pipe_instance(false)
149+
.create(&pipe_name)?;
150+
151+
// Wait for client connection
152+
server.connect().await?;
153+
154+
// Handle connection sequentially (this is just a debugging API)
155+
handle_connection_async(server, output_path.clone()).await;
156+
}
157+
}
158+
159+
/// Helper function to find a subsequence in a byte slice
160+
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
161+
haystack
162+
.windows(needle.len())
163+
.position(|window| window == needle)
164+
}
165+
166+
/// Parse Content-Length from HTTP headers
167+
fn parse_content_length(headers_data: &[u8]) -> Option<usize> {
168+
if let Ok(headers_str) = std::str::from_utf8(headers_data) {
169+
for line in headers_str.lines() {
170+
if line.to_lowercase().starts_with("content-length:") {
171+
if let Some(len_str) = line.split(':').nth(1) {
172+
return len_str.trim().parse().ok();
173+
}
174+
}
175+
}
176+
}
177+
None
178+
}
179+
180+
/// Check if we have received a complete HTTP request
181+
fn is_request_complete(
182+
request_data: &[u8],
183+
headers_end_pos: Option<usize>,
184+
content_length: Option<usize>,
185+
) -> bool {
186+
if let Some(headers_end) = headers_end_pos {
187+
if let Some(expected_len) = content_length {
188+
let body_len = request_data.len() - headers_end;
189+
return body_len >= expected_len;
190+
}
191+
192+
// For chunked transfer encoding, look for the end chunk marker
193+
// The end of a chunked body is: 0\r\n\r\n
194+
if request_data.len() >= headers_end + 5 {
195+
let body = &request_data[headers_end..];
196+
// Check if body ends with the chunked encoding terminator
197+
if body.ends_with(b"0\r\n\r\n") {
198+
return true;
199+
}
200+
}
201+
}
202+
false
203+
}
204+
205+
/// Read complete HTTP request from an async stream
206+
async fn read_http_request_async<R: tokio::io::AsyncReadExt + Unpin>(stream: &mut R) -> Vec<u8> {
207+
let mut request_data = Vec::new();
208+
let mut buffer = [0u8; 8192];
209+
let mut content_length: Option<usize> = None;
210+
let mut headers_end_pos: Option<usize> = None;
211+
212+
loop {
213+
match stream.read(&mut buffer).await {
214+
Ok(0) => break, // Connection closed
215+
Ok(n) => {
216+
request_data.extend_from_slice(&buffer[..n]);
217+
218+
// Look for end of headers if we haven't found it yet
219+
if headers_end_pos.is_none() {
220+
if let Some(pos) = find_subsequence(&request_data, b"\r\n\r\n") {
221+
headers_end_pos = Some(pos + 4);
222+
content_length = parse_content_length(&request_data[..pos]);
223+
}
224+
}
225+
226+
// Check if we have the complete request
227+
if is_request_complete(&request_data, headers_end_pos, content_length) {
228+
break;
229+
}
230+
}
231+
Err(e) => {
232+
eprintln!("[dump-server] Failed to read from connection: {}", e);
233+
break;
234+
}
235+
}
236+
}
237+
238+
request_data
239+
}
240+
241+
/// Decode chunked transfer encoding
242+
fn decode_chunked_body(chunked_data: &[u8]) -> Vec<u8> {
243+
let mut result = Vec::new();
244+
let mut pos = 0;
245+
246+
while pos < chunked_data.len() {
247+
// Find the end of the chunk size line (\r\n)
248+
if let Some(line_end) = find_subsequence(&chunked_data[pos..], b"\r\n") {
249+
// Parse the chunk size (hex)
250+
if let Ok(size_str) = std::str::from_utf8(&chunked_data[pos..pos + line_end]) {
251+
if let Ok(chunk_size) = usize::from_str_radix(size_str.trim(), 16) {
252+
if chunk_size == 0 {
253+
// End of chunks
254+
break;
255+
}
256+
257+
// Move past the size line and \r\n
258+
pos += line_end + 2;
259+
260+
// Read the chunk data
261+
if pos + chunk_size <= chunked_data.len() {
262+
result.extend_from_slice(&chunked_data[pos..pos + chunk_size]);
263+
pos += chunk_size;
264+
265+
// Skip the trailing \r\n after the chunk
266+
if pos + 2 <= chunked_data.len() && &chunked_data[pos..pos + 2] == b"\r\n" {
267+
pos += 2;
268+
}
269+
} else {
270+
break;
271+
}
272+
} else {
273+
break;
274+
}
275+
} else {
276+
break;
277+
}
278+
} else {
279+
break;
280+
}
281+
}
282+
283+
result
284+
}
285+
286+
/// Write request data to file if non-empty (async version)
287+
/// Decodes chunked transfer encoding if present
288+
async fn write_request_to_file_async(output_path: &PathBuf, request_data: &[u8]) {
289+
if request_data.is_empty() {
290+
return;
291+
}
292+
293+
// Check if this is a chunked request and decode it
294+
let data_to_write = if let Some(headers_end) = find_subsequence(request_data, b"\r\n\r\n") {
295+
let headers = &request_data[..headers_end];
296+
let body = &request_data[headers_end + 4..];
297+
298+
// Check for transfer-encoding: chunked
299+
let is_chunked = if let Ok(headers_str) = std::str::from_utf8(headers) {
300+
headers_str
301+
.to_lowercase()
302+
.contains("transfer-encoding: chunked")
303+
} else {
304+
false
305+
};
306+
307+
if is_chunked {
308+
// Decode the chunked body and reconstruct the request with Content-Length
309+
let decoded_body = decode_chunked_body(body);
310+
let mut reconstructed = Vec::new();
311+
312+
// Add headers but replace transfer-encoding with content-length
313+
if let Ok(headers_str) = std::str::from_utf8(headers) {
314+
for line in headers_str.lines() {
315+
if !line.to_lowercase().starts_with("transfer-encoding:") {
316+
reconstructed.extend_from_slice(line.as_bytes());
317+
reconstructed.extend_from_slice(b"\r\n");
318+
}
319+
}
320+
// Add content-length header
321+
reconstructed.extend_from_slice(
322+
format!("Content-Length: {}\r\n", decoded_body.len()).as_bytes(),
323+
);
324+
}
325+
326+
// Add the decoded body
327+
reconstructed.extend_from_slice(b"\r\n");
328+
reconstructed.extend_from_slice(&decoded_body);
329+
330+
reconstructed
331+
} else {
332+
request_data.to_vec()
333+
}
334+
} else {
335+
request_data.to_vec()
336+
};
337+
338+
if let Err(e) = tokio::fs::write(output_path, data_to_write).await {
339+
eprintln!(
340+
"[dump-server] Failed to write request dump to {:?}: {}",
341+
output_path, e
342+
);
343+
}
344+
}
345+
346+
/// Handle a connection: read HTTP request, write to file, send response
347+
async fn handle_connection_async<S>(mut stream: S, output_path: PathBuf)
348+
where
349+
S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Unpin,
350+
{
351+
let request_data = read_http_request_async(&mut stream).await;
352+
write_request_to_file_async(&output_path, &request_data).await;
353+
354+
if let Err(e) = stream.write_all(HTTP_200_RESPONSE).await {
355+
eprintln!("[dump-server] Failed to send HTTP response: {}", e);
356+
}
357+
}

0 commit comments

Comments
 (0)