Skip to content

Commit 2632ccf

Browse files
committed
Explicitly flush the file
1 parent ca7e810 commit 2632ccf

File tree

2 files changed

+108
-73
lines changed

2 files changed

+108
-73
lines changed

libdd-profiling/src/exporter/file_exporter.rs

Lines changed: 94 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
//!
99
//! This is primarily used for testing to validate the exact bytes sent over the wire.
1010
11+
use anyhow::Context as _;
1112
use std::path::PathBuf;
1213

1314
/// HTTP 200 OK response with no body
@@ -124,7 +125,9 @@ async fn run_dump_server_unix(
124125
) -> anyhow::Result<()> {
125126
loop {
126127
let (stream, _) = listener.accept().await?;
127-
handle_connection_async(stream, output_path.clone()).await;
128+
if let Err(e) = handle_connection_async(stream, output_path.clone()).await {
129+
eprintln!("[dump-server] Error handling connection: {:#}", e);
130+
}
128131
}
129132
}
130133

@@ -139,7 +142,9 @@ async fn run_dump_server_windows(
139142

140143
// Handle first connection
141144
first_server.connect().await?;
142-
handle_connection_async(first_server, output_path.clone()).await;
145+
if let Err(e) = handle_connection_async(first_server, output_path.clone()).await {
146+
eprintln!("[dump-server] Error handling connection: {:#}", e);
147+
}
143148

144149
// Handle subsequent connections
145150
loop {
@@ -152,7 +157,9 @@ async fn run_dump_server_windows(
152157
server.connect().await?;
153158

154159
// Handle connection sequentially (this is just a debugging API)
155-
handle_connection_async(server, output_path.clone()).await;
160+
if let Err(e) = handle_connection_async(server, output_path.clone()).await {
161+
eprintln!("[dump-server] Error handling connection: {:#}", e);
162+
}
156163
}
157164
}
158165

@@ -203,39 +210,41 @@ fn is_request_complete(
203210
}
204211

205212
/// Read complete HTTP request from an async stream
206-
async fn read_http_request_async<R: tokio::io::AsyncReadExt + Unpin>(stream: &mut R) -> Vec<u8> {
213+
async fn read_http_request_async<R: tokio::io::AsyncReadExt + Unpin>(
214+
stream: &mut R,
215+
) -> anyhow::Result<Vec<u8>> {
207216
let mut request_data = Vec::new();
208217
let mut buffer = [0u8; 8192];
209218
let mut content_length: Option<usize> = None;
210219
let mut headers_end_pos: Option<usize> = None;
211220

212221
loop {
213-
match stream.read(&mut buffer).await {
214-
Ok(0) => break, // Connection closed
215-
Ok(n) => {
216-
request_data.extend_from_slice(&buffer[..n]);
217-
218-
// Look for end of headers if we haven't found it yet
219-
if headers_end_pos.is_none() {
220-
if let Some(pos) = find_subsequence(&request_data, b"\r\n\r\n") {
221-
headers_end_pos = Some(pos + 4);
222-
content_length = parse_content_length(&request_data[..pos]);
223-
}
224-
}
222+
let n = stream
223+
.read(&mut buffer)
224+
.await
225+
.context("Failed to read from connection")?;
225226

226-
// Check if we have the complete request
227-
if is_request_complete(&request_data, headers_end_pos, content_length) {
228-
break;
229-
}
230-
}
231-
Err(e) => {
232-
eprintln!("[dump-server] Failed to read from connection: {}", e);
233-
break;
227+
if n == 0 {
228+
break; // Connection closed
229+
}
230+
231+
request_data.extend_from_slice(&buffer[..n]);
232+
233+
// Look for end of headers if we haven't found it yet
234+
if headers_end_pos.is_none() {
235+
if let Some(pos) = find_subsequence(&request_data, b"\r\n\r\n") {
236+
headers_end_pos = Some(pos + 4);
237+
content_length = parse_content_length(&request_data[..pos]);
234238
}
235239
}
240+
241+
// Check if we have the complete request
242+
if is_request_complete(&request_data, headers_end_pos, content_length) {
243+
break;
244+
}
236245
}
237246

238-
request_data
247+
Ok(request_data)
239248
}
240249

241250
/// Decode chunked transfer encoding
@@ -245,49 +254,51 @@ fn decode_chunked_body(chunked_data: &[u8]) -> Vec<u8> {
245254

246255
while pos < chunked_data.len() {
247256
// Find the end of the chunk size line (\r\n)
248-
if let Some(line_end) = find_subsequence(&chunked_data[pos..], b"\r\n") {
249-
// Parse the chunk size (hex)
250-
if let Ok(size_str) = std::str::from_utf8(&chunked_data[pos..pos + line_end]) {
251-
if let Ok(chunk_size) = usize::from_str_radix(size_str.trim(), 16) {
252-
if chunk_size == 0 {
253-
// End of chunks
254-
break;
255-
}
257+
let Some(line_end) = find_subsequence(&chunked_data[pos..], b"\r\n") else {
258+
break;
259+
};
256260

257-
// Move past the size line and \r\n
258-
pos += line_end + 2;
261+
// Parse the chunk size (hex)
262+
let Ok(size_str) = std::str::from_utf8(&chunked_data[pos..pos + line_end]) else {
263+
break;
264+
};
259265

260-
// Read the chunk data
261-
if pos + chunk_size <= chunked_data.len() {
262-
result.extend_from_slice(&chunked_data[pos..pos + chunk_size]);
263-
pos += chunk_size;
266+
let Ok(chunk_size) = usize::from_str_radix(size_str.trim(), 16) else {
267+
break;
268+
};
264269

265-
// Skip the trailing \r\n after the chunk
266-
if pos + 2 <= chunked_data.len() && &chunked_data[pos..pos + 2] == b"\r\n" {
267-
pos += 2;
268-
}
269-
} else {
270-
break;
271-
}
272-
} else {
273-
break;
274-
}
275-
} else {
276-
break;
277-
}
278-
} else {
270+
if chunk_size == 0 {
271+
break; // End of chunks
272+
}
273+
274+
// Move past the size line and \r\n
275+
pos += line_end + 2;
276+
277+
// Read the chunk data
278+
if pos + chunk_size > chunked_data.len() {
279279
break;
280280
}
281+
282+
result.extend_from_slice(&chunked_data[pos..pos + chunk_size]);
283+
pos += chunk_size;
284+
285+
// Skip the trailing \r\n after the chunk
286+
if pos + 2 <= chunked_data.len() && &chunked_data[pos..pos + 2] == b"\r\n" {
287+
pos += 2;
288+
}
281289
}
282290

283291
result
284292
}
285293

286294
/// Write request data to file if non-empty (async version)
287295
/// Decodes chunked transfer encoding if present
288-
async fn write_request_to_file_async(output_path: &PathBuf, request_data: &[u8]) {
296+
async fn write_request_to_file_async(
297+
output_path: &PathBuf,
298+
request_data: &[u8],
299+
) -> anyhow::Result<()> {
289300
if request_data.is_empty() {
290-
return;
301+
return Ok(());
291302
}
292303

293304
// Check if this is a chunked request and decode it
@@ -296,13 +307,11 @@ async fn write_request_to_file_async(output_path: &PathBuf, request_data: &[u8])
296307
let body = &request_data[headers_end + 4..];
297308

298309
// Check for transfer-encoding: chunked
299-
let is_chunked = if let Ok(headers_str) = std::str::from_utf8(headers) {
310+
let is_chunked = std::str::from_utf8(headers).is_ok_and(|headers_str| {
300311
headers_str
301312
.to_lowercase()
302313
.contains("transfer-encoding: chunked")
303-
} else {
304-
false
305-
};
314+
});
306315

307316
if is_chunked {
308317
// Decode the chunked body and reconstruct the request with Content-Length
@@ -335,23 +344,37 @@ async fn write_request_to_file_async(output_path: &PathBuf, request_data: &[u8])
335344
request_data.to_vec()
336345
};
337346

338-
if let Err(e) = tokio::fs::write(output_path, data_to_write).await {
339-
eprintln!(
340-
"[dump-server] Failed to write request dump to {:?}: {}",
341-
output_path, e
342-
);
343-
}
347+
// Write to file and explicitly sync to ensure data is on disk before responding
348+
use tokio::io::AsyncWriteExt;
349+
350+
let mut file = tokio::fs::File::create(output_path)
351+
.await
352+
.context("Failed to create dump file")?;
353+
354+
file.write_all(&data_to_write)
355+
.await
356+
.context("Failed to write request dump")?;
357+
358+
// Sync to ensure data is persisted to disk before sending HTTP response
359+
file.sync_all()
360+
.await
361+
.context("Failed to sync request dump to disk")?;
362+
363+
Ok(())
344364
}
345365

346366
/// Handle a connection: read HTTP request, write to file, send response
347-
async fn handle_connection_async<S>(mut stream: S, output_path: PathBuf)
367+
async fn handle_connection_async<S>(mut stream: S, output_path: PathBuf) -> anyhow::Result<()>
348368
where
349369
S: tokio::io::AsyncReadExt + tokio::io::AsyncWriteExt + Unpin,
350370
{
351-
let request_data = read_http_request_async(&mut stream).await;
352-
write_request_to_file_async(&output_path, &request_data).await;
371+
let request_data = read_http_request_async(&mut stream).await?;
372+
write_request_to_file_async(&output_path, &request_data).await?;
353373

354-
if let Err(e) = stream.write_all(HTTP_200_RESPONSE).await {
355-
eprintln!("[dump-server] Failed to send HTTP response: {}", e);
356-
}
374+
stream
375+
.write_all(HTTP_200_RESPONSE)
376+
.await
377+
.context("Failed to send HTTP response")?;
378+
379+
Ok(())
357380
}

libdd-profiling/tests/file_exporter_test.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ fn create_file_exporter(
1818

1919
// Create a unique temp file path
2020
let temp_dir = std::env::temp_dir();
21+
let random_id: u64 = rand::random();
2122
let file_path = temp_dir.join(format!(
22-
"libdd_test_{}_{}.http",
23+
"libdd_test_{}_{}_{:x}.http",
2324
std::process::id(),
24-
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
25+
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
26+
random_id
2527
));
2628

2729
let mut endpoint = config::file(file_path.to_string_lossy().as_ref())?;
@@ -73,6 +75,8 @@ mod tests {
7375
exporter.send(request, None).expect("send to succeed");
7476

7577
// Read the dump file (wait a moment for it to be written)
78+
// The file is synced before the 200 response, but we still need a small delay
79+
// to ensure the background thread's runtime has fully completed the async operation
7680
std::thread::sleep(std::time::Duration::from_millis(200));
7781
let request_bytes = std::fs::read(&file_path).expect("read dump file");
7882

@@ -193,6 +197,8 @@ mod tests {
193197
exporter.send(request, None).expect("send to succeed");
194198

195199
// Read the dump file (wait a moment for it to be written)
200+
// The file is synced before the 200 response, but we still need a small delay
201+
// to ensure the background thread's runtime has fully completed the async operation
196202
std::thread::sleep(std::time::Duration::from_millis(200));
197203
let request_bytes = std::fs::read(&file_path).expect("read dump file");
198204

@@ -249,6 +255,8 @@ mod tests {
249255
exporter.send(request, None).expect("send to succeed");
250256

251257
// Read the dump file (wait a moment for it to be written)
258+
// The file is synced before the 200 response, but we still need a small delay
259+
// to ensure the background thread's runtime has fully completed the async operation
252260
std::thread::sleep(std::time::Duration::from_millis(200));
253261
let request_bytes = std::fs::read(&file_path).expect("read dump file");
254262

@@ -312,6 +320,8 @@ mod tests {
312320
exporter.send(request, None).expect("send to succeed");
313321

314322
// Read the dump file (wait a moment for it to be written)
323+
// The file is synced before the 200 response, but we still need a small delay
324+
// to ensure the background thread's runtime has fully completed the async operation
315325
std::thread::sleep(std::time::Duration::from_millis(200));
316326
let request_bytes = std::fs::read(&file_path).expect("read dump file");
317327

@@ -359,6 +369,8 @@ mod tests {
359369
exporter.send(request, None).expect("send to succeed");
360370

361371
// Read the dump file (wait a moment for it to be written)
372+
// The file is synced before the 200 response, but we still need a small delay
373+
// to ensure the background thread's runtime has fully completed the async operation
362374
std::thread::sleep(std::time::Duration::from_millis(200));
363375
let request_bytes = std::fs::read(&file_path).expect("read dump file");
364376

0 commit comments

Comments
 (0)