
Commit fa5a9ae

cursoragent and script3r committed
feat: Stream findings as JSONL and improve scanner performance
Co-authored-by: script3r <[email protected]>
1 parent e55ca28 commit fa5a9ae

2 files changed (+207, -88 lines)


crates/cli/src/main.rs

Lines changed: 18 additions & 4 deletions
@@ -155,7 +155,7 @@ fn main() -> Result<()> {
         ProgressStyle::default_bar()
             .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} files ({percent}%) | {msg}")
             .unwrap()
-            .progress_chars("#>-"),
+            .progress_chars("#>-")
     );
     pb.set_message("Scanning files...");

@@ -166,6 +166,22 @@
         }));
     }

+    // Stream JSONL findings as they arrive
+    if args.json {
+        let stdout = std::io::stdout();
+        let lock = stdout.lock();
+        let write = std::sync::Mutex::new(lock);
+        cfg.result_callback = Some(Arc::new(move |f: &Finding| {
+            if let Ok(s) = serde_json::to_string(f) {
+                if let Ok(mut guard) = write.lock() {
+                    use std::io::Write;
+                    let _ = guard.write_all(s.as_bytes());
+                    let _ = guard.write_all(b"\n");
+                }
+            }
+        }));
+    }
+
     let scanner = Scanner::new(&reg, dets, cfg);
     if args.dry_run {
         let files = scanner.discover_files(&args.paths);
@@ -183,9 +199,7 @@
     }

     if args.json {
-        for f in &findings {
-            println!("{}", serde_json::to_string(f)?);
-        }
+        // Already streamed above
     } else {
         print_table(&findings);
     }
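Note: the block added above registers a result_callback on the scanner config so that every Finding is serialized with serde_json and written as one line to a mutex-guarded stdout lock while the scan is still running; the old post-scan JSON loop becomes a no-op. The same pattern in isolation, as a minimal sketch (the Finding struct here is a hypothetical stand-in, and serde with the derive feature plus serde_json are assumed as dependencies):

// JSONL streaming sketch: serialize each item, append one line under a lock.
use std::io::Write;
use std::sync::Mutex;

use serde::Serialize;

#[derive(Serialize)]
struct Finding {
    // hypothetical stand-in fields for illustration
    rule: String,
    path: String,
    line: usize,
}

fn main() {
    // One locked writer shared by every invocation of the callback.
    let writer = Mutex::new(std::io::stdout().lock());

    let emit = move |f: &Finding| {
        if let Ok(s) = serde_json::to_string(f) {
            if let Ok(mut guard) = writer.lock() {
                let _ = guard.write_all(s.as_bytes());
                let _ = guard.write_all(b"\n");
            }
        }
    };

    emit(&Finding { rule: "example-rule".into(), path: "src/lib.rs".into(), line: 42 });
}

Writing whole lines under a single lock keeps concurrent findings from interleaving mid-record, which is what makes the output valid JSONL.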

crates/scanner-core/src/lib.rs

Lines changed: 189 additions & 84 deletions
@@ -11,7 +11,7 @@ use std::io::Read;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::sync::Mutex;
-use std::process::Command;
+// use std::process::Command; // removed: no git fast path

 // ---------------- Types ----------------

@@ -113,15 +113,19 @@ pub trait Detector: Send + Sync {
 pub struct Emitter {
     tx: Sender<Finding>,
     rx: Receiver<Finding>,
+    on_result: Option<Arc<dyn Fn(&Finding) + Send + Sync>>,
 }

 impl Emitter {
     pub fn new(bound: usize) -> Self {
         let (tx, rx) = bounded(bound);
-        Self { tx, rx }
+        Self { tx, rx, on_result: None }
     }

     pub fn send(&mut self, finding: Finding) -> Result<()> {
+        if let Some(ref cb) = self.on_result {
+            cb(&finding);
+        }
         self.tx
             .send(finding)
             .map_err(|e| anyhow!("emitter send failed: {e}"))
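The Emitter change above taps every finding through an optional on_result callback before it enters the channel, which is what lets the CLI stream output without waiting for the collector. A stripped-down sketch of the same tap-then-forward idea (assumes the crossbeam-channel crate; Finding is again a placeholder type):

// Tap-then-forward sketch: observe each item, then send it on as before.
use std::sync::Arc;

use crossbeam_channel::{bounded, Sender};

#[derive(Clone, Debug)]
struct Finding {
    rule: String,
}

struct Emitter {
    tx: Sender<Finding>,
    on_result: Option<Arc<dyn Fn(&Finding) + Send + Sync>>,
}

impl Emitter {
    fn send(&self, finding: Finding) {
        // Invoke the optional observer first (e.g. print a JSON line)...
        if let Some(ref cb) = self.on_result {
            cb(&finding);
        }
        // ...then forward the finding to the collector channel.
        let _ = self.tx.send(finding);
    }
}

fn main() {
    let (tx, rx) = bounded(16);
    let em = Emitter {
        tx,
        on_result: Some(Arc::new(|f: &Finding| println!("streamed: {}", f.rule))),
    };
    em.send(Finding { rule: "demo".into() });
    println!("collected: {:?}", rx.recv().unwrap());
}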
@@ -183,6 +187,8 @@ pub struct Config {
     pub deterministic: bool,
     #[serde(skip)]
     pub progress_callback: Option<ProgressCallback>,
+    #[serde(skip)]
+    pub result_callback: Option<Arc<dyn Fn(&Finding) + Send + Sync>>,
 }

 fn default_max_file_size() -> usize {
@@ -209,6 +215,7 @@ impl Clone for Config {
             exclude_globs: self.exclude_globs.clone(),
             deterministic: self.deterministic,
             progress_callback: self.progress_callback.clone(),
+            result_callback: self.result_callback.clone(),
         }
     }
 }
@@ -221,6 +228,7 @@ impl Default for Config {
             exclude_globs: Vec::new(),
             deterministic: false,
             progress_callback: None,
+            result_callback: None,
         }
     }
 }
@@ -771,25 +779,6 @@ impl<'a> Scanner<'a> {
         };

         for root in roots {
-            // Fast path: leverage git index if available
-            if root.join(".git").exists() {
-                if let Some(list) = git_list_files_fast(root) {
-                    for path in list {
-                        if !path_allowed(&path) {
-                            continue;
-                        }
-                        // Only then stat for size
-                        if let Ok(md) = fs::metadata(&path) {
-                            if md.is_file() && (md.len() as usize) <= self.config.max_file_size {
-                                discovered_paths.push(path);
-                            }
-                        }
-                    }
-                    // Move on to next root after using the git fast path
-                    continue;
-                }
-            }
-
             // Fallback: parallel directory walk with ignore rules
             let mut builder = WalkBuilder::new(root);
             builder
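Dropping the git-index fast path leaves a single discovery mechanism: the ignore crate's walker, which already honors .gitignore and .git/info/exclude rules. A small sketch of that walker doing discovery with a size cap (the 2 MiB limit is illustrative, not the crate's default):

use ignore::WalkBuilder;

fn main() {
    let max_file_size: u64 = 2 * 1024 * 1024; // illustrative cap

    // The walker applies .gitignore / .git/info/exclude itself,
    // so no separate `git ls-files` pass is needed.
    for entry in WalkBuilder::new(".")
        .git_ignore(true)
        .git_exclude(true)
        .build()
        .flatten()
    {
        let keep = entry
            .metadata()
            .map(|md| md.is_file() && md.len() <= max_file_size)
            .unwrap_or(false);
        if keep {
            println!("{}", entry.path().display());
        }
    }
}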
@@ -897,86 +886,193 @@ impl<'a> Scanner<'a> {
     }

     pub fn run(&self, roots: &[PathBuf]) -> Result<Vec<Finding>> {
-        let files = self.discover_files(roots);
-        let total_files = files.len();
-        let mut findings: Vec<Finding> = Vec::new();
+        use std::sync::atomic::{AtomicUsize, Ordering};
+
+        let findings_vec: Arc<Mutex<Vec<Finding>>> = Arc::new(Mutex::new(Vec::new()));
+        let processed = Arc::new(AtomicUsize::new(0));
+        let discovered = Arc::new(AtomicUsize::new(0));
+        let findings_cnt = Arc::new(AtomicUsize::new(0));

-        // Call progress callback with initial state
-        if let Some(ref callback) = self.config.progress_callback {
-            callback(0, total_files, 0);
+        // Initial progress callback (0 of 0)
+        if let Some(ref cb) = self.config.progress_callback {
+            cb(0, 0, 0);
         }

         let (tx, rx) = bounded::<Finding>(8192);
-        let (progress_tx, progress_rx) = bounded::<usize>(1000);
-
-        // Spawn a thread to collect progress updates
-        let progress_handle = if let Some(ref callback) = self.config.progress_callback {
-            let callback = callback.clone();
-            Some(std::thread::spawn(move || {
-                let mut processed = 0;
-                let findings_count = 0;
-
-                while progress_rx.recv().is_ok() {
-                    processed += 1;
-                    callback(processed, total_files, findings_count);
+
+        // Collector thread to drain findings as they arrive and keep count
+        let findings_vec_ref = findings_vec.clone();
+        let findings_cnt_ref = findings_cnt.clone();
+        let progress_cb = self.config.progress_callback.clone();
+        let processed_ref = processed.clone();
+        let discovered_ref = discovered.clone();
+        let collector = std::thread::spawn(move || {
+            for f in rx.iter() {
+                if let Ok(mut guard) = findings_vec_ref.lock() {
+                    guard.push(f);
+                }
+                let new_cnt = findings_cnt_ref.fetch_add(1, Ordering::Relaxed) + 1;
+                if let Some(cb) = &progress_cb {
+                    cb(
+                        processed_ref.load(Ordering::Relaxed),
+                        discovered_ref.load(Ordering::Relaxed),
+                        new_cnt,
+                    );
+                }
+            }
+        });
+
+        // Prepare include/exclude matchers for filtering
+        let include_matcher: Option<globset::GlobSet> = if !self.config.include_globs.is_empty() {
+            let mut builder = globset::GlobSetBuilder::new();
+            for pattern in &self.config.include_globs {
+                if let Ok(glob) = globset::Glob::new(pattern) {
+                    builder.add(glob);
+                } else {
+                    return Ok(Vec::new());
                 }
-            }))
+            }
+            builder.build().ok()
         } else {
             None
         };

-        files.par_iter().for_each_with(
-            (tx.clone(), progress_tx.clone()),
-            |(tx, progress_tx), path| {
-                if let Some(lang) = Self::detect_language(path) {
-                    if let Ok(bytes) = Self::load_file(path) {
-                        let unit = ScanUnit {
-                            path: path.clone(),
-                            lang,
-                            bytes: bytes.clone(),
-                        };
-                        // Strip comments once and reuse
-                        let stripped = strip_comments(lang, &bytes);
-                        let stripped_s = String::from_utf8_lossy(&stripped);
-                        let index = LineIndex::new(stripped_s.as_bytes());
-
-                        let mut em = Emitter {
-                            tx: tx.clone(),
-                            rx: rx.clone(),
-                        };
-                        for det in &self.detectors {
-                            if !det.languages().contains(&lang) {
-                                continue;
-                            }
-                            if !prefilter_hit(det.as_ref(), &stripped) {
-                                continue;
-                            }
-                            let _ = det.scan_optimized(&unit, &stripped_s, &index, &mut em);
+        let exclude_matcher: Option<globset::GlobSet> = if !self.config.exclude_globs.is_empty() {
+            let mut builder = globset::GlobSetBuilder::new();
+            for pattern in &self.config.exclude_globs {
+                if let Ok(glob) = globset::Glob::new(pattern) {
+                    builder.add(glob);
+                } else {
+                    return Ok(Vec::new());
+                }
+            }
+            builder.build().ok()
+        } else {
+            None
+        };
+
+        let path_allowed = |p: &Path| -> bool {
+            if let Some(ref ex) = exclude_matcher {
+                if ex.is_match(p) {
+                    return false;
+                }
+            }
+            if let Some(ref inc) = include_matcher {
+                if !inc.is_match(p) {
+                    return false;
+                }
+            }
+            true
+        };
+
+        for root in roots {
+            let mut builder = WalkBuilder::new(root);
+            builder
+                .hidden(false)
+                .git_ignore(true)
+                .git_exclude(true)
+                .ignore(true)
+                .parents(true)
+                .follow_links(false)
+                .same_file_system(false);
+
+            if let Ok(n) = std::thread::available_parallelism() {
+                builder.threads(n.get());
+            }
+
+            let tx_ref = tx.clone();
+            let result_cb = self.config.result_callback.clone();
+            let detectors = &self.detectors;
+            let max_file_size = self.config.max_file_size;
+            let processed_ref = processed.clone();
+            let discovered_ref = discovered.clone();
+            let findings_cnt_ref = findings_cnt.clone();
+            let progress_cb_inner = self.config.progress_callback.clone();
+
+            builder.build_parallel().run(|| {
+                let tx = tx_ref.clone();
+                let result_cb = result_cb.clone();
+                let progress_cb_inner = progress_cb_inner.clone();
+                Box::new(move |res| {
+                    let entry = match res {
+                        Ok(e) => e,
+                        Err(_) => return ignore::WalkState::Continue,
+                    };
+
+                    if let Some(ft) = entry.file_type() {
+                        if !ft.is_file() {
+                            return ignore::WalkState::Continue;
+                        }
+                    } else if let Ok(md) = entry.metadata() {
+                        if !md.is_file() {
+                            return ignore::WalkState::Continue;
                         }
+                    } else {
+                        return ignore::WalkState::Continue;
                     }
-                }
-                // Signal that this file has been processed
-                let _ = progress_tx.send(1);
-            },
-        );

-        drop(tx);
-        drop(progress_tx);
+                    let path = entry.into_path();
+                    if !path_allowed(&path) {
+                        return ignore::WalkState::Continue;
+                    }

-        for f in rx.iter() {
-            findings.push(f);
-        }
+                    // Count as discovered candidate
+                    discovered_ref.fetch_add(1, Ordering::Relaxed);

-        // Wait for progress thread to finish
-        if let Some(handle) = progress_handle {
-            let _ = handle.join();
-        }
+                    // Size check
+                    if let Ok(md) = fs::metadata(&path) {
+                        if (md.len() as usize) > max_file_size {
+                            return ignore::WalkState::Continue;
+                        }
+                    } else {
+                        return ignore::WalkState::Continue;
+                    }

-        // Final progress update
-        if let Some(ref callback) = self.config.progress_callback {
-            callback(total_files, total_files, findings.len());
+                    if let Some(lang) = Scanner::detect_language(&path) {
+                        if let Ok(bytes) = Scanner::load_file(&path) {
+                            let unit = ScanUnit {
+                                path: path.clone(),
+                                lang,
+                                bytes: bytes.clone(),
+                            };
+                            let stripped = strip_comments(lang, &bytes);
+                            let stripped_s = String::from_utf8_lossy(&stripped);
+                            let index = LineIndex::new(stripped_s.as_bytes());
+
+                            // Create a minimal emitter that streams results via callback and sends to collector
+                            let (_dtx, dummy_rx) = bounded(0);
+                            let mut em = Emitter { tx: tx.clone(), rx: dummy_rx, on_result: result_cb.clone() };
+                            for det in detectors {
+                                if !det.languages().contains(&lang) {
+                                    continue;
+                                }
+                                if !prefilter_hit(det.as_ref(), &stripped) {
+                                    continue;
+                                }
+                                let _ = det.scan_optimized(&unit, &stripped_s, &index, &mut em);
+                            }
+                        }
+                    }
+
+                    // Mark processed and update progress
+                    let new_proc = processed_ref.fetch_add(1, Ordering::Relaxed) + 1;
+                    if let Some(ref cb) = progress_cb_inner {
+                        cb(
+                            new_proc,
+                            discovered_ref.load(Ordering::Relaxed),
+                            findings_cnt_ref.load(Ordering::Relaxed),
+                        );
+                    }
+
+                    ignore::WalkState::Continue
+                })
+            });
         }

+        drop(tx);
+        let _ = collector.join();
+
+        let mut findings = findings_vec.lock().unwrap().clone();
         if self.config.deterministic {
             findings.sort_by(|a, b| {
                 (
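The rewritten run above replaces the discover-then-scan two-pass design with a single parallel walk: workers scan files as the walker yields them, findings flow over a bounded channel to a collector thread, and atomics track the processed, discovered, and finding counts for progress reporting. A self-contained sketch of that pipeline shape (assumes the ignore and crossbeam-channel crates; the per-file "scan" is reduced to reporting the path):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crossbeam_channel::bounded;
use ignore::{WalkBuilder, WalkState};

fn main() {
    let (tx, rx) = bounded::<String>(8192);
    let processed = Arc::new(AtomicUsize::new(0));

    // Collector drains results while the walk is still running.
    let collector = std::thread::spawn(move || {
        let mut results = Vec::new();
        for item in rx.iter() {
            results.push(item);
        }
        results
    });

    WalkBuilder::new(".").build_parallel().run(|| {
        let tx = tx.clone();
        let processed = processed.clone();
        Box::new(move |res| {
            if let Ok(entry) = res {
                if entry.file_type().map_or(false, |ft| ft.is_file()) {
                    // Stand-in for the real per-file scan.
                    let _ = tx.send(entry.path().display().to_string());
                }
                processed.fetch_add(1, Ordering::Relaxed);
            }
            WalkState::Continue
        })
    });

    drop(tx); // close the channel so the collector's iterator ends
    let results = collector.join().unwrap();
    println!(
        "walked {} entries, collected {} files",
        processed.load(Ordering::Relaxed),
        results.len()
    );
}

Dropping the last sender after the walk is what terminates rx.iter(); the same ordering appears above as drop(tx) followed by collector.join().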
@@ -996,6 +1092,15 @@ impl<'a> Scanner<'a> {
             });
         }

+        // Final progress update
+        if let Some(ref cb) = self.config.progress_callback {
+            cb(
+                processed.load(Ordering::Relaxed),
+                discovered.load(Ordering::Relaxed),
+                findings_cnt.load(Ordering::Relaxed),
+            );
+        }
+
         Ok(findings)
     }
 }
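With discovery and scanning fused, the progress callback no longer receives a fixed total; it is now invoked with the processed count, the discovered-so-far count, and the findings-so-far count, with a final call after the walk completes. A hypothetical wiring of such a callback to the CLI's indicatif bar, where the bar length grows as files are discovered (the three-usize signature is an assumption; the real ProgressCallback alias is not shown in this diff):

use std::sync::Arc;

use indicatif::{ProgressBar, ProgressStyle};

// Assumed shape of the callback; the crate's actual alias may differ.
type ProgressCallback = Arc<dyn Fn(usize, usize, usize) + Send + Sync>;

fn make_progress_callback() -> ProgressCallback {
    let pb = ProgressBar::new(0);
    pb.set_style(
        ProgressStyle::default_bar()
            .template("{bar:40.cyan/blue} {pos}/{len} files | {msg}")
            .unwrap()
            .progress_chars("#>-"),
    );
    Arc::new(move |processed: usize, discovered: usize, findings: usize| {
        pb.set_length(discovered as u64); // the total keeps growing during the walk
        pb.set_position(processed as u64);
        pb.set_message(format!("{findings} findings"));
    })
}

fn main() {
    let cb = make_progress_callback();
    cb(3, 10, 1); // e.g. 3 of 10 discovered files processed, 1 finding so far
}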

0 commit comments
