Skip to content

Commit 7adc28c

Browse files
committed
windows named pipes
1 parent 84b67f2 commit 7adc28c

File tree

4 files changed

+318
-139
lines changed

4 files changed

+318
-139
lines changed

libdd-profiling/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ serde = {version = "1.0", features = ["derive"]}
5454
serde_json = {version = "1.0"}
5555
target-triple = "0.1.4"
5656
thiserror = "2"
57-
tokio = {version = "1.23", features = ["rt", "macros"]}
57+
tokio = {version = "1.23", features = ["rt", "rt-multi-thread", "macros", "fs", "io-util", "net"]}
5858
tokio-util = "0.7.1"
5959
zstd = { version = "0.13", default-features = false }
6060

Lines changed: 185 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,27 @@
1-
#[cfg(unix)]
1+
#[cfg(any(unix, windows))]
22
use std::path::PathBuf;
33

4+
/// HTTP 200 OK response with no body
5+
#[cfg(any(unix, windows))]
6+
const HTTP_200_RESPONSE: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
7+
8+
/// Async server loop for Unix sockets
9+
#[cfg(unix)]
10+
async fn run_dump_server_unix(
11+
output_path: PathBuf,
12+
listener: tokio::net::UnixListener,
13+
) -> anyhow::Result<()> {
14+
loop {
15+
let (stream, _) = listener.accept().await?;
16+
handle_connection_async(stream, output_path.clone()).await;
17+
}
18+
}
19+
420
/// Spawns a HTTP dump server that saves incoming requests to a file
521
/// Returns the Unix socket path that the server is listening on
622
#[cfg(unix)]
723
pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf> {
8-
use std::io::{Read, Write};
9-
use std::os::unix::net::UnixListener;
10-
11-
use anyhow::Context;
24+
use tokio::net::UnixListener;
1225

1326
// Create a temporary socket path with randomness to avoid collisions
1427
let random_id: u64 = rand::random();
@@ -21,108 +34,184 @@ pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf>
2134
// Remove socket file if it already exists
2235
let _ = std::fs::remove_file(&socket_path);
2336

24-
let listener =
25-
UnixListener::bind(&socket_path).context("Failed to bind Unix socket for dump server")?;
26-
2737
let socket_path_clone = socket_path.clone();
28-
29-
// Spawn the server thread
38+
let (tx, rx) = std::sync::mpsc::channel();
39+
3040
std::thread::spawn(move || {
31-
loop {
32-
match listener.accept() {
33-
Ok((mut stream, _)) => {
34-
let output_path = output_path.clone();
35-
36-
std::thread::spawn(move || {
37-
// Read the HTTP request in chunks
38-
let mut request_data = Vec::new();
39-
let mut buffer = [0u8; 8192];
40-
let mut content_length: Option<usize> = None;
41-
let mut headers_end_pos: Option<usize> = None;
42-
43-
// Read headers first
44-
loop {
45-
match stream.read(&mut buffer) {
46-
Ok(0) => break, // Connection closed
47-
Ok(n) => {
48-
request_data.extend_from_slice(&buffer[..n]);
49-
50-
// Look for end of headers if we haven't found it yet
51-
if headers_end_pos.is_none() {
52-
if let Some(pos) =
53-
find_subsequence(&request_data, b"\r\n\r\n")
54-
{
55-
headers_end_pos = Some(pos + 4);
56-
57-
// Parse Content-Length from headers
58-
if let Ok(headers_str) =
59-
std::str::from_utf8(&request_data[..pos])
60-
{
61-
for line in headers_str.lines() {
62-
if line
63-
.to_lowercase()
64-
.starts_with("content-length:")
65-
{
66-
if let Some(len_str) =
67-
line.split(':').nth(1)
68-
{
69-
content_length =
70-
len_str.trim().parse().ok();
71-
}
72-
break;
73-
}
74-
}
75-
}
76-
}
77-
}
78-
79-
// Check if we have the complete request
80-
if let Some(headers_end) = headers_end_pos {
81-
if let Some(expected_len) = content_length {
82-
let body_len = request_data.len() - headers_end;
83-
if body_len >= expected_len {
84-
break; // Complete request received
85-
}
86-
}
87-
}
88-
}
89-
Err(e) => {
90-
eprintln!("Failed to read from dump server socket: {}", e);
91-
break;
92-
}
93-
}
94-
}
95-
96-
if !request_data.is_empty() {
97-
// Write the request directly to the specified file
98-
if let Err(e) = std::fs::write(&output_path, &request_data) {
99-
eprintln!(
100-
"Failed to write request dump to {:?}: {}",
101-
output_path, e
102-
);
103-
}
104-
}
105-
106-
// Send a simple HTTP 200 response
107-
let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
108-
let _ = stream.write_all(response);
109-
});
110-
}
111-
Err(e) => {
112-
eprintln!("Failed to accept connection on dump server: {}", e);
113-
break;
114-
}
115-
}
41+
// Top-level error handler - all errors logged here
42+
let result = (|| -> anyhow::Result<()> {
43+
let rt = tokio::runtime::Runtime::new()?;
44+
rt.block_on(async {
45+
let listener = UnixListener::bind(&socket_path)?;
46+
tx.send(Ok(()))?;
47+
run_dump_server_unix(output_path, listener).await
48+
})
49+
})();
50+
51+
if let Err(e) = result {
52+
eprintln!("[dump-server] Error: {}", e);
53+
let _ = tx.send(Err(e));
11654
}
11755
});
11856

57+
// Wait for server to be ready
58+
rx.recv()??;
11959
Ok(socket_path_clone)
12060
}
12161

62+
/// Async server loop for Windows named pipes
63+
#[cfg(windows)]
64+
async fn run_dump_server_windows(output_path: PathBuf, pipe_name: String) -> anyhow::Result<()> {
65+
use tokio::net::windows::named_pipe::ServerOptions;
66+
67+
loop {
68+
// Create server instance
69+
let mut server = ServerOptions::new()
70+
.first_pipe_instance(true)
71+
.create(&pipe_name)?;
72+
73+
// Wait for client connection
74+
server.connect().await?;
75+
76+
// Handle connection sequentially (this is just a debugging API)
77+
handle_connection_async(server, output_path.clone()).await;
78+
}
79+
}
80+
81+
/// Spawns a HTTP dump server that saves incoming requests to a file
82+
/// Returns the named pipe path that the server is listening on
83+
#[cfg(windows)]
84+
pub(crate) fn spawn_dump_server(output_path: PathBuf) -> anyhow::Result<PathBuf> {
85+
// Create a unique named pipe name with randomness to avoid collisions
86+
let random_id: u64 = rand::random();
87+
let pipe_name = format!(
88+
r"\\.\pipe\libdatadog_dump_{}_{:x}",
89+
std::process::id(),
90+
random_id
91+
);
92+
let pipe_path = PathBuf::from(&pipe_name);
93+
94+
let (tx, rx) = std::sync::mpsc::channel();
95+
96+
std::thread::spawn(move || {
97+
// Top-level error handler - all errors logged here
98+
let result = (|| -> anyhow::Result<()> {
99+
let rt = tokio::runtime::Runtime::new()?;
100+
rt.block_on(async {
101+
tx.send(Ok(()))?;
102+
run_dump_server_windows(output_path, pipe_name).await
103+
})
104+
})();
105+
106+
if let Err(e) = result {
107+
eprintln!("[dump-server] Error: {}", e);
108+
let _ = tx.send(Err(e));
109+
}
110+
});
111+
112+
// Wait for server to be ready
113+
rx.recv()??;
114+
Ok(pipe_path)
115+
}
116+
122117
/// Helper function to find a subsequence in a byte slice
123-
#[cfg(unix)]
118+
#[cfg(any(unix, windows))]
124119
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
125120
haystack
126121
.windows(needle.len())
127122
.position(|window| window == needle)
128123
}
124+
125+
/// Parse Content-Length from HTTP headers
126+
#[cfg(any(unix, windows))]
127+
fn parse_content_length(headers_data: &[u8]) -> Option<usize> {
128+
if let Ok(headers_str) = std::str::from_utf8(headers_data) {
129+
for line in headers_str.lines() {
130+
if line.to_lowercase().starts_with("content-length:") {
131+
if let Some(len_str) = line.split(':').nth(1) {
132+
return len_str.trim().parse().ok();
133+
}
134+
}
135+
}
136+
}
137+
None
138+
}
139+
140+
/// Check if we have received a complete HTTP request
141+
#[cfg(any(unix, windows))]
142+
fn is_request_complete(
143+
request_data: &[u8],
144+
headers_end_pos: Option<usize>,
145+
content_length: Option<usize>,
146+
) -> bool {
147+
if let Some(headers_end) = headers_end_pos {
148+
if let Some(expected_len) = content_length {
149+
let body_len = request_data.len() - headers_end;
150+
return body_len >= expected_len;
151+
}
152+
}
153+
false
154+
}
155+
156+
/// Read complete HTTP request from an async stream
157+
#[cfg(any(unix, windows))]
158+
async fn read_http_request_async<R: tokio::io::AsyncReadExt + Unpin>(
159+
stream: &mut R,
160+
) -> Vec<u8> {
161+
let mut request_data = Vec::new();
162+
let mut buffer = [0u8; 8192];
163+
let mut content_length: Option<usize> = None;
164+
let mut headers_end_pos: Option<usize> = None;
165+
166+
loop {
167+
match stream.read(&mut buffer).await {
168+
Ok(0) => break, // Connection closed
169+
Ok(n) => {
170+
request_data.extend_from_slice(&buffer[..n]);
171+
172+
// Look for end of headers if we haven't found it yet
173+
if headers_end_pos.is_none() {
174+
if let Some(pos) = find_subsequence(&request_data, b"\r\n\r\n") {
175+
headers_end_pos = Some(pos + 4);
176+
content_length = parse_content_length(&request_data[..pos]);
177+
}
178+
}
179+
180+
// Check if we have the complete request
181+
if is_request_complete(&request_data, headers_end_pos, content_length) {
182+
break;
183+
}
184+
}
185+
Err(e) => {
186+
eprintln!("[dump-server] Failed to read from connection: {}", e);
187+
break;
188+
}
189+
}
190+
}
191+
192+
request_data
193+
}
194+
195+
/// Write request data to file if non-empty (async version)
196+
#[cfg(any(unix, windows))]
197+
async fn write_request_to_file_async(output_path: &PathBuf, request_data: &[u8]) {
198+
if !request_data.is_empty() {
199+
if let Err(e) = tokio::fs::write(output_path, request_data).await {
200+
eprintln!("[dump-server] Failed to write request dump to {:?}: {}", output_path, e);
201+
}
202+
}
203+
}
204+
205+
/// Handle a connection: read HTTP request, write to file, send response
206+
#[cfg(any(unix, windows))]
207+
async fn handle_connection_async<S>(mut stream: S, output_path: PathBuf)
208+
where
209+
S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Unpin,
210+
{
211+
let request_data = read_http_request_async(&mut stream).await;
212+
write_request_to_file_async(&output_path, &request_data).await;
213+
214+
if let Err(e) = stream.write_all(HTTP_200_RESPONSE).await {
215+
eprintln!("[dump-server] Failed to send HTTP response: {}", e);
216+
}
217+
}

0 commit comments

Comments
 (0)