Skip to content

Commit be8d11f

Browse files
committed
fix(benchmark): fix zero memory, throughput, and batch duration metrics
- Add snapshot_current_memory() fallback to SubprocessAdapter for both single-file and batch extract paths, matching the NativeAdapter pattern. Fixes peak_memory_bytes always being 0 for subprocess frameworks when the background sampler collects no samples during sub-millisecond extractions.
- Use extraction_duration (reported by the subprocess) for the throughput calculation instead of the wall-clock duration (I/O roundtrip). Fall back to the wall-clock duration when extraction_duration is unavailable.
- Lower MIN_VALID_DURATION_SECS from 1 ms to 1 µs to avoid filtering legitimate sub-millisecond extractions out of the throughput calculation.
- Filter extraction_duration_ms == 0 in native batch mode, falling back to avg_duration_per_file. The u64 millisecond field truncates sub-millisecond extractions to 0.
- Add pre-flight fixture validation in the runner to fail fast on missing files.
- Enable BENCHMARK_DEBUG=1 in the CI benchmark script for diagnostic logging.
1 parent cade78c commit be8d11f

File tree

4 files changed

+295
-11
lines changed

4 files changed

+295
-11
lines changed

scripts/benchmarks/run-benchmark.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ if [ "$OCR_ENABLED" = "true" ]; then
3737
EXTRA_ARGS+=("--ocr")
3838
fi
3939

40-
"${HARNESS_PATH}" \
40+
BENCHMARK_DEBUG=1 "${HARNESS_PATH}" \
4141
run \
4242
--fixtures "${FIXTURES_DIR}" \
4343
--frameworks "${FRAMEWORK}" \

tools/benchmark-harness/src/adapters/native.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,13 @@ impl FrameworkAdapter for NativeAdapter {
321321
.map(|(file_path, extraction_result)| {
322322
let file_size = std::fs::metadata(file_path).map(|m| m.len()).unwrap_or(0);
323323

324-
// Read per-file extraction timing from metadata, fallback to average
324+
// Read per-file extraction timing from metadata, fallback to average.
325+
// Note: extraction_duration_ms is u64, so sub-millisecond extractions
326+
// truncate to 0ms. Fall back to avg_duration_per_file in that case.
325327
let extraction_duration = extraction_result
326328
.metadata
327329
.extraction_duration_ms
330+
.filter(|&ms| ms > 0)
328331
.map(Duration::from_millis)
329332
.unwrap_or(avg_duration_per_file);
330333

tools/benchmark-harness/src/adapters/subprocess.rs

Lines changed: 261 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ use tokio::process::Command;
1919
/// Minimum duration in seconds for a valid throughput calculation.
2020
/// Durations below this threshold produce unreliable throughput values
2121
/// and will result in throughput being set to 0.0 (filtered in aggregation).
22-
const MIN_VALID_DURATION_SECS: f64 = 0.001; // 1 millisecond
22+
const MIN_VALID_DURATION_SECS: f64 = 0.000_001; // 1 microsecond
23+
24+
/// Check if verbose benchmark debugging is enabled via BENCHMARK_DEBUG env var.
25+
fn is_debug_enabled() -> bool {
26+
std::env::var("BENCHMARK_DEBUG").is_ok()
27+
}
2328

2429
/// State for a persistent subprocess that stays alive across multiple extractions
2530
struct PersistentProcess {
@@ -329,12 +334,13 @@ impl SubprocessAdapter {
329334
std::env::current_dir().map_err(Error::Io)?.join(file_path)
330335
};
331336

332-
let start = Instant::now();
333337
let mut guard = self.process.lock().await;
334338
let proc = guard
335339
.as_mut()
336340
.ok_or_else(|| Error::Benchmark("Persistent process not started".into()))?;
337341

342+
let start = Instant::now();
343+
338344
// Send file path
339345
proc.stdin
340346
.write_all(absolute_path.to_string_lossy().as_bytes())
@@ -349,14 +355,42 @@ impl SubprocessAdapter {
349355
.await
350356
.map_err(|e| Error::Benchmark(format!("Failed to flush stdin: {}", e)))?;
351357

358+
let write_elapsed = start.elapsed();
359+
352360
// Read one JSON line
353361
let mut line = String::new();
354-
tokio::time::timeout(timeout, proc.stdout.read_line(&mut line))
362+
let bytes_read = tokio::time::timeout(timeout, proc.stdout.read_line(&mut line))
355363
.await
356364
.map_err(|_| Error::Timeout(format!("Persistent process response exceeded {:?}", timeout)))?
357365
.map_err(|e| Error::Benchmark(format!("Failed to read from persistent process: {}", e)))?;
358366

359367
let duration = start.elapsed();
368+
369+
if bytes_read == 0 {
370+
return Err(Error::Benchmark(
371+
"Persistent process returned empty response (EOF — process may have crashed)".to_string(),
372+
));
373+
}
374+
375+
// Validate response is valid JSON before returning
376+
if line.trim().is_empty() {
377+
return Err(Error::Benchmark(
378+
"Persistent process returned blank line (expected JSON)".to_string(),
379+
));
380+
}
381+
382+
if is_debug_enabled() {
383+
eprintln!(
384+
"[persistent:{}] write={:.2}ms read={:.2}ms total={:.2}ms bytes={} path={}",
385+
self.name,
386+
write_elapsed.as_secs_f64() * 1000.0,
387+
(duration - write_elapsed).as_secs_f64() * 1000.0,
388+
duration.as_secs_f64() * 1000.0,
389+
bytes_read,
390+
absolute_path.display()
391+
);
392+
}
393+
360394
Ok((line, duration))
361395
}
362396

@@ -371,6 +405,20 @@ impl SubprocessAdapter {
371405
/// }
372406
/// ```
373407
fn parse_output(&self, stdout: &str) -> Result<serde_json::Value> {
408+
if is_debug_enabled() {
409+
let preview = if stdout.len() > 300 {
410+
format!("{}...[{} bytes total]", &stdout[..300], stdout.len())
411+
} else {
412+
stdout.to_string()
413+
};
414+
eprintln!(
415+
"[parse_output:{}] raw_len={} preview={}",
416+
self.name,
417+
stdout.len(),
418+
preview.trim()
419+
);
420+
}
421+
374422
let parsed: serde_json::Value = serde_json::from_str(stdout)
375423
.map_err(|e| Error::Benchmark(format!("Failed to parse subprocess output as JSON: {}", e)))?;
376424

@@ -522,7 +570,14 @@ impl FrameworkAdapter for SubprocessAdapter {
522570
}
523571
};
524572

525-
let samples = monitor.stop().await;
573+
// Take a post-extraction snapshot before stopping the monitor.
574+
// This provides a fallback memory measurement for sub-millisecond extractions
575+
// where the background sampler may not have collected any samples.
576+
let post_sample = monitor.snapshot_current_memory();
577+
let mut samples = monitor.stop().await;
578+
if samples.is_empty() {
579+
samples.push(post_sample);
580+
}
526581
let snapshots = monitor.get_snapshots().await;
527582
let resource_stats = ResourceMonitor::calculate_stats(&samples, &snapshots);
528583

@@ -575,8 +630,17 @@ impl FrameworkAdapter for SubprocessAdapter {
575630
}
576631
};
577632

578-
let extraction_duration = parsed
579-
.get("_extraction_time_ms")
633+
let extraction_time_raw = parsed.get("_extraction_time_ms");
634+
if is_debug_enabled() {
635+
eprintln!(
636+
"[extract:{}] _extraction_time_ms raw={:?}, keys={:?}",
637+
self.name,
638+
extraction_time_raw,
639+
parsed.as_object().map(|o| o.keys().collect::<Vec<_>>())
640+
);
641+
}
642+
643+
let extraction_duration = extraction_time_raw
580644
.and_then(|v| v.as_f64())
581645
.map(|ms| Duration::from_secs_f64(ms / 1000.0));
582646

@@ -585,8 +649,11 @@ impl FrameworkAdapter for SubprocessAdapter {
585649

586650
let subprocess_overhead = extraction_duration.map(|ext| duration.saturating_sub(ext));
587651

588-
let throughput = if duration.as_secs_f64() >= MIN_VALID_DURATION_SECS {
589-
file_size as f64 / duration.as_secs_f64()
652+
// Use extraction_duration for throughput when available (more accurate for persistent mode
653+
// where `duration` is just I/O roundtrip). Fall back to wall-clock `duration`.
654+
let effective_duration = extraction_duration.unwrap_or(duration);
655+
let throughput = if effective_duration.as_secs_f64() >= MIN_VALID_DURATION_SECS {
656+
file_size as f64 / effective_duration.as_secs_f64()
590657
} else {
591658
0.0 // Below minimum threshold - will be filtered in aggregation
592659
};
@@ -756,7 +823,12 @@ impl FrameworkAdapter for SubprocessAdapter {
756823
}
757824
};
758825

759-
let samples = monitor.stop().await;
826+
// Take a post-extraction snapshot as fallback for fast batch operations
827+
let post_sample = monitor.snapshot_current_memory();
828+
let mut samples = monitor.stop().await;
829+
if samples.is_empty() {
830+
samples.push(post_sample);
831+
}
760832
let snapshots = monitor.get_snapshots().await;
761833
let resource_stats = ResourceMonitor::calculate_stats(&samples, &snapshots);
762834

@@ -975,4 +1047,184 @@ mod tests {
9751047
assert!(adapter.supports_format("docx"));
9761048
assert!(!adapter.supports_format("unknown"));
9771049
}
1050+
1051+
#[tokio::test]
1052+
async fn test_persistent_echo_server() {
1053+
// Create inline echo server script in temp dir
1054+
let tmp_dir = tempfile::TempDir::new().unwrap();
1055+
let script_path = tmp_dir.path().join("echo_server.py");
1056+
std::fs::write(
1057+
&script_path,
1058+
r#"
1059+
import json, sys, time
1060+
for line in sys.stdin:
1061+
fp = line.strip()
1062+
if not fp:
1063+
continue
1064+
start = time.perf_counter()
1065+
try:
1066+
with open(fp, 'r', errors='replace') as f:
1067+
content = f.read()
1068+
except Exception as e:
1069+
content = f"error: {e}"
1070+
ms = (time.perf_counter() - start) * 1000.0
1071+
print(json.dumps({"content": content[:1000], "_extraction_time_ms": ms}), flush=True)
1072+
"#,
1073+
)
1074+
.unwrap();
1075+
1076+
let small_file = tmp_dir.path().join("small.txt");
1077+
std::fs::write(&small_file, "Hello, small file!").unwrap();
1078+
1079+
let medium_file = tmp_dir.path().join("medium.txt");
1080+
std::fs::write(&medium_file, "x".repeat(100_000)).unwrap(); // 100KB
1081+
1082+
let large_file = tmp_dir.path().join("large.txt");
1083+
std::fs::write(&large_file, "y".repeat(1_000_000)).unwrap(); // 1MB
1084+
1085+
// Create persistent adapter pointing to echo server
1086+
let adapter = SubprocessAdapter::with_persistent_mode(
1087+
"test-echo",
1088+
"python3",
1089+
vec![script_path.to_string_lossy().to_string()],
1090+
vec![],
1091+
vec!["txt".to_string()],
1092+
);
1093+
1094+
// Setup (spawns the persistent process)
1095+
adapter.setup().await.expect("setup should succeed");
1096+
1097+
// Warmup extraction (like CI does)
1098+
let warmup_result = adapter
1099+
.extract(&small_file, Duration::from_secs(10))
1100+
.await
1101+
.expect("warmup should succeed");
1102+
eprintln!(
1103+
"Warmup: success={}, duration={:?}, extraction_duration={:?}",
1104+
warmup_result.success, warmup_result.duration, warmup_result.extraction_duration
1105+
);
1106+
1107+
// Run 3 benchmark iterations like CI (different files to check for desync)
1108+
let files = [&small_file, &medium_file, &large_file];
1109+
for (i, file) in files.iter().enumerate() {
1110+
let result = adapter
1111+
.extract(file, Duration::from_secs(30))
1112+
.await
1113+
.expect("extract should succeed");
1114+
1115+
eprintln!(
1116+
"Iter {}: file={:?} size={} duration={:?} extraction_duration={:?} has_text={}",
1117+
i + 1,
1118+
file.file_name().unwrap(),
1119+
result.file_size,
1120+
result.duration,
1121+
result.extraction_duration,
1122+
result.extracted_text.is_some()
1123+
);
1124+
1125+
assert!(result.success, "Extraction {} should succeed", i + 1);
1126+
assert!(
1127+
result.extraction_duration.is_some(),
1128+
"Iteration {}: extraction_duration should NOT be null",
1129+
i + 1
1130+
);
1131+
assert!(
1132+
result.extracted_text.is_some(),
1133+
"Iteration {}: extracted_text should be present",
1134+
i + 1
1135+
);
1136+
1137+
// Duration should be reasonable
1138+
assert!(
1139+
result.duration.as_micros() > 10,
1140+
"Iteration {}: Duration too short: {:?}",
1141+
i + 1,
1142+
result.duration
1143+
);
1144+
}
1145+
1146+
// Verify durations scale with file size
1147+
let r_small = adapter.extract(&small_file, Duration::from_secs(10)).await.unwrap();
1148+
let r_large = adapter.extract(&large_file, Duration::from_secs(30)).await.unwrap();
1149+
eprintln!(
1150+
"Small duration: {:?}, Large duration: {:?}",
1151+
r_small.duration, r_large.duration
1152+
);
1153+
1154+
// Teardown
1155+
adapter.teardown().await.expect("teardown should succeed");
1156+
}
1157+
1158+
#[tokio::test]
1159+
async fn test_persistent_kreuzberg_python() {
1160+
// Test with actual kreuzberg Python script if available
1161+
let script_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1162+
.join("scripts")
1163+
.join("kreuzberg_extract.py");
1164+
if !script_path.exists() {
1165+
eprintln!("Skipping test: kreuzberg script not found");
1166+
return;
1167+
}
1168+
1169+
// Check if python3 has kreuzberg installed (skip if not)
1170+
let check = std::process::Command::new("python3")
1171+
.arg("-c")
1172+
.arg("import kreuzberg")
1173+
.output();
1174+
if check.is_err() || !check.unwrap().status.success() {
1175+
eprintln!("Skipping test: kreuzberg not installed in python3");
1176+
return;
1177+
}
1178+
1179+
let tmp_dir = tempfile::TempDir::new().unwrap();
1180+
let test_file = tmp_dir.path().join("test.txt");
1181+
std::fs::write(&test_file, "Hello from kreuzberg benchmark test!").unwrap();
1182+
1183+
let adapter = SubprocessAdapter::with_persistent_mode(
1184+
"kreuzberg-python-test",
1185+
"python3",
1186+
vec![
1187+
script_path.to_string_lossy().to_string(),
1188+
"--no-ocr".to_string(),
1189+
"server".to_string(),
1190+
],
1191+
vec![],
1192+
vec!["txt".to_string()],
1193+
);
1194+
1195+
adapter.setup().await.expect("setup should succeed");
1196+
1197+
// Run warmup + 3 iterations (like CI)
1198+
let warmup = adapter.extract(&test_file, Duration::from_secs(30)).await;
1199+
eprintln!(
1200+
"Kreuzberg warmup: {:?}",
1201+
warmup.as_ref().map(|r| (r.success, r.duration, r.extraction_duration))
1202+
);
1203+
1204+
for i in 0..3 {
1205+
let result = adapter.extract(&test_file, Duration::from_secs(30)).await;
1206+
match &result {
1207+
Ok(r) => {
1208+
eprintln!(
1209+
"Kreuzberg iter {}: success={} duration={:?} extraction_duration={:?}",
1210+
i + 1,
1211+
r.success,
1212+
r.duration,
1213+
r.extraction_duration
1214+
);
1215+
assert!(r.success, "Kreuzberg iter {} should succeed", i + 1);
1216+
assert!(
1217+
r.extraction_duration.is_some(),
1218+
"Kreuzberg iter {}: extraction_duration must not be null!",
1219+
i + 1
1220+
);
1221+
}
1222+
Err(e) => {
1223+
eprintln!("Kreuzberg iter {} failed: {}", i + 1, e);
1224+
}
1225+
}
1226+
}
1227+
1228+
adapter.teardown().await.expect("teardown should succeed");
1229+
}
9781230
}

0 commit comments

Comments (0)