Commit b7f2c81

cursoragent and script3r committed
Refactor scanner to use rayon for parallel file processing
Co-authored-by: script3r <[email protected]>
1 parent 9d42d4b commit b7f2c81
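
In run(), the change replaces the parallel ignore walker plus a collector thread with a two-phase flow: walk the tree sequentially to gather candidate file paths, then hand the paths to rayon and accumulate findings behind a mutex. A minimal, self-contained sketch of that pattern follows; the Finding struct and scan_file function are placeholders standing in for this crate's real detector pipeline, not its actual API.

use ignore::WalkBuilder;
use rayon::prelude::*;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// Placeholder finding type; the real scanner has its own Finding struct.
struct Finding {
    path: PathBuf,
}

// Placeholder per-file work; the real scanner runs its detectors here.
fn scan_file(path: &PathBuf) -> Vec<Finding> {
    vec![Finding { path: path.clone() }]
}

fn run(root: &str) -> Vec<Finding> {
    // Phase 1: sequential walk, so discovery always completes before processing.
    let mut all_paths: Vec<PathBuf> = Vec::new();
    for entry in WalkBuilder::new(root).build().flatten() {
        if entry.file_type().map_or(false, |ft| ft.is_file()) {
            all_paths.push(entry.into_path());
        }
    }

    // Phase 2: rayon fans the collected paths out across its thread pool;
    // findings accumulate in a shared, mutex-protected vector.
    let processed = AtomicUsize::new(0);
    let findings = Arc::new(Mutex::new(Vec::new()));
    all_paths.par_iter().for_each(|path| {
        let local = scan_file(path);
        if !local.is_empty() {
            findings.lock().unwrap().extend(local);
        }
        processed.fetch_add(1, Ordering::Relaxed);
    });

    Arc::try_unwrap(findings).ok().unwrap().into_inner().unwrap()
}

fn main() {
    for f in run(".") {
        println!("{}", f.path.display());
    }
}

Note that the walk itself is now single-threaded, so directory traversal no longer overlaps with scanning; all of the parallelism comes from the per-file rayon loop.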

File tree: 1 file changed (+90, -94 lines)


crates/scanner-core/src/lib.rs

Lines changed: 90 additions & 94 deletions
@@ -2,7 +2,7 @@ use aho_corasick::AhoCorasickBuilder;
 use anyhow::{anyhow, Context, Result};
 use crossbeam_channel::{bounded, Receiver, Sender};
 use ignore::WalkBuilder;
-// use rayon::prelude::*; // no longer using rayon parallel iterators in run()
+// use rayon::prelude::*; // Using rayon for parallel processing of discovered files (imported locally where needed)
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use std::collections::{BTreeSet, HashMap};
@@ -903,29 +903,7 @@ impl<'a> Scanner<'a> {
             cb(0, 0, 0);
         }
 
-        let (tx, rx) = bounded::<Finding>(8192);
-
-        // Collector thread to drain findings as they arrive and keep count
-        let findings_vec_ref = findings_vec.clone();
-        let findings_cnt_ref = findings_cnt.clone();
-        let progress_cb = self.config.progress_callback.clone();
-        let processed_ref = processed.clone();
-        let discovered_ref = discovered.clone();
-        let collector = std::thread::spawn(move || {
-            for f in rx.iter() {
-                if let Ok(mut guard) = findings_vec_ref.lock() {
-                    guard.push(f);
-                }
-                let new_cnt = findings_cnt_ref.fetch_add(1, Ordering::Relaxed) + 1;
-                if let Some(cb) = &progress_cb {
-                    cb(
-                        processed_ref.load(Ordering::Relaxed),
-                        discovered_ref.load(Ordering::Relaxed),
-                        new_cnt,
-                    );
-                }
-            }
-        });
+        // Simplified approach: collect findings directly without a separate collector thread
 
         // Prepare include/exclude matchers for filtering
         let include_matcher: Option<globset::GlobSet> = if !self.config.include_globs.is_empty() {
@@ -971,8 +949,6 @@ impl<'a> Scanner<'a> {
         };
 
         if roots.is_empty() {
-            drop(tx);
-            let _ = collector.join();
             return Ok(findings_vec.lock().unwrap().clone());
         }
 
@@ -994,71 +970,77 @@ impl<'a> Scanner<'a> {
             builder.threads(n.get());
         }
 
-        let tx_ref = tx.clone();
         let result_cb = self.config.result_callback.clone();
         let detectors = &self.detectors;
         let max_file_size = self.config.max_file_size;
         let processed_ref = processed.clone();
         let discovered_ref = discovered.clone();
-        let findings_cnt_ref = findings_cnt.clone();
+        let _findings_cnt_ref = findings_cnt.clone();
         let progress_cb_inner = self.config.progress_callback.clone();
 
-        builder.build_parallel().run(|| {
-            let tx = tx_ref.clone();
-            let result_cb = result_cb.clone();
-            let progress_cb_inner = progress_cb_inner.clone();
-            let processed_ref2 = processed_ref.clone();
-            let discovered_ref2 = discovered_ref.clone();
-            let findings_cnt_ref2 = findings_cnt_ref.clone();
-            Box::new(move |res| {
-                let entry = match res {
-                    Ok(e) => e,
-                    Err(_) => return ignore::WalkState::Continue,
-                };
+        // Use sequential walker to collect all paths (this guarantees completion)
+        let mut all_paths = Vec::new();
+
+        for result in builder.build() {
+            let entry = match result {
+                Ok(e) => e,
+                Err(_) => continue,
+            };
 
-                if let Some(ft) = entry.file_type() {
-                    if !ft.is_file() {
-                        return ignore::WalkState::Continue;
-                    }
-                } else if let Ok(md) = entry.metadata() {
-                    if !md.is_file() {
-                        return ignore::WalkState::Continue;
-                    }
-                } else {
-                    return ignore::WalkState::Continue;
+            if let Some(ft) = entry.file_type() {
+                if !ft.is_file() {
+                    continue;
                 }
-
-                let path = entry.into_path();
-                if !path_allowed(&path) {
-                    return ignore::WalkState::Continue;
+            } else if let Ok(md) = entry.metadata() {
+                if !md.is_file() {
+                    continue;
                 }
+            } else {
+                continue;
+            }
 
-                // Count as discovered candidate
-                discovered_ref2.fetch_add(1, Ordering::Relaxed);
+            let path = entry.into_path();
+            if !path_allowed(&path) {
+                continue;
+            }
 
-                // Size check
-                if let Ok(md) = fs::metadata(&path) {
-                    if (md.len() as usize) > max_file_size {
-                        return ignore::WalkState::Continue;
-                    }
-                } else {
-                    return ignore::WalkState::Continue;
+            // Size check
+            if let Ok(md) = fs::metadata(&path) {
+                if (md.len() as usize) > max_file_size {
+                    continue;
                 }
+            } else {
+                continue;
+            }
 
-                if let Some(lang) = Scanner::detect_language(&path) {
-                    if let Ok(bytes) = Scanner::load_file(&path) {
-                        let unit = ScanUnit {
-                            path: path.clone(),
-                            lang,
-                            bytes: bytes.clone(),
-                        };
-                        let stripped = strip_comments(lang, &bytes);
-                        let stripped_s = String::from_utf8_lossy(&stripped);
-                        let index = LineIndex::new(stripped_s.as_bytes());
-
-                        // Create a minimal emitter that streams results via callback and sends to collector
-                        let (_dtx, dummy_rx) = bounded(0);
-                        let mut em = Emitter { tx: tx.clone(), rx: dummy_rx, on_result: result_cb.clone() };
+            all_paths.push(path);
+        }
+
+        // Process all discovered paths using rayon for parallel processing
+        use rayon::prelude::*;
+        use std::sync::Mutex as StdMutex;
+
+        let all_findings = Arc::new(StdMutex::new(Vec::new()));
+
+        all_paths.par_iter().for_each(|path| {
+            // Count as discovered candidate
+            discovered_ref.fetch_add(1, Ordering::Relaxed);
+
+            if let Some(lang) = Scanner::detect_language(path) {
+                if let Ok(bytes) = Scanner::load_file(path) {
+                    let unit = ScanUnit {
+                        path: path.clone(),
+                        lang,
+                        bytes: bytes.clone(),
+                    };
+                    let stripped = strip_comments(lang, &bytes);
+                    let stripped_s = String::from_utf8_lossy(&stripped);
+                    let index = LineIndex::new(stripped_s.as_bytes());
+
+                    // Collect findings locally first
+                    {
+                        let (local_tx, local_rx) = bounded(100);
+                        let mut em = Emitter { tx: local_tx, rx: local_rx, on_result: result_cb.clone() };
                         for det in detectors {
                             if !det.languages().contains(&lang) {
                                 continue;
@@ -1068,27 +1050,41 @@ impl<'a> Scanner<'a> {
                             }
                             let _ = det.scan_optimized(&unit, &stripped_s, &index, &mut em);
                         }
+                        // Collect all findings from this file and add to global collection
+                        let local_findings = em.drain();
+                        if !local_findings.is_empty() {
+                            if let Ok(mut guard) = all_findings.lock() {
+                                guard.extend(local_findings);
+                            }
+                        }
                     }
                 }
+            }
 
-                // Mark processed and update progress
-                let new_proc = processed_ref2.fetch_add(1, Ordering::Relaxed) + 1;
-                if let Some(ref cb) = progress_cb_inner {
-                    cb(
-                        new_proc,
-                        discovered_ref2.load(Ordering::Relaxed),
-                        findings_cnt_ref2.load(Ordering::Relaxed),
-                    );
-                }
-
-                ignore::WalkState::Continue
-            })
+            // Mark processed and update progress
+            let new_proc = processed_ref.fetch_add(1, Ordering::Relaxed) + 1;
+            if let Some(ref cb) = progress_cb_inner {
+                let current_findings = {
+                    let guard = all_findings.lock().unwrap();
+                    guard.len()
+                };
+                cb(
+                    new_proc,
+                    discovered_ref.load(Ordering::Relaxed),
+                    current_findings,
+                );
+            }
         });
-
-        drop(tx);
-        let _ = collector.join();
-
-        let mut findings = findings_vec.lock().unwrap().clone();
+
+        // Extract all findings and add them to the main findings vector
+        let mut findings = {
+            let collected_findings = all_findings.lock().unwrap();
+            let mut findings_guard = findings_vec.lock().unwrap();
+            findings_guard.extend(collected_findings.clone());
+            findings_guard.clone()
+        };
+
+        // All processing completed successfully
         if self.config.deterministic {
            findings.sort_by(|a, b| {
                (
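
For contrast, the deleted code streamed findings from the parallel walker into a dedicated collector thread over a bounded crossbeam channel, roughly in this shape (a simplified sketch with a placeholder Finding type, not the crate's exact code):

use crossbeam_channel::bounded;
use std::sync::{Arc, Mutex};
use std::thread;

// Placeholder for the scanner's real Finding type.
#[derive(Debug)]
struct Finding {
    message: String,
}

fn main() {
    // Bounded channel, sized like the old bounded::<Finding>(8192).
    let (tx, rx) = bounded::<Finding>(8192);
    let findings = Arc::new(Mutex::new(Vec::new()));

    // Collector thread: drains findings as producers send them.
    let findings_ref = Arc::clone(&findings);
    let collector = thread::spawn(move || {
        for f in rx.iter() {
            findings_ref.lock().unwrap().push(f);
        }
    });

    // Producers (the parallel walker callbacks in the old code) send into the channel.
    let mut producers = Vec::new();
    for i in 0..4 {
        let tx = tx.clone();
        producers.push(thread::spawn(move || {
            tx.send(Finding { message: format!("finding {i}") }).ok();
        }));
    }
    for p in producers {
        let _ = p.join();
    }

    // Dropping the last sender closes the channel so the collector's loop ends.
    drop(tx);
    let _ = collector.join();

    println!("{:?}", findings.lock().unwrap());
}

Dropping the last Sender is what closes the channel and lets the collector's rx.iter() loop finish, which is why the old run() had to call drop(tx) before joining the collector.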

0 commit comments
