Commit 57f98f1

Merge pull request #5 from script3r/cursor/fix-scanning-thread-hang-a2c0
Fix scanning thread hang
2 parents 9d42d4b + eed86ba commit 57f98f1

1 file changed: +119 -137 lines changed


crates/scanner-core/src/lib.rs

Lines changed: 119 additions & 137 deletions
@@ -2,7 +2,7 @@ use aho_corasick::AhoCorasickBuilder;
 use anyhow::{anyhow, Context, Result};
 use crossbeam_channel::{bounded, Receiver, Sender};
 use ignore::WalkBuilder;
-// use rayon::prelude::*; // no longer using rayon parallel iterators in run()
+// use rayon::prelude::*; // Using rayon for parallel processing of discovered files (imported locally where needed)
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use std::collections::{BTreeSet, HashMap};
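
Note: the updated comment says rayon is now imported locally where it is needed instead of at module scope. As a rough, hypothetical illustration of that style (the function and names below are not from scanner-core):

fn sum_of_squares(values: &[i64]) -> i64 {
    // Function-local import: rayon's prelude is only in scope inside the one
    // function that actually uses a parallel iterator.
    use rayon::prelude::*;
    values.par_iter().map(|v| v * v).sum()
}
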
@@ -803,57 +803,43 @@ impl<'a> Scanner<'a> {
             builder.threads(n.get());
         }
 
-        let out: Arc<Mutex<Vec<PathBuf>>> = Arc::new(Mutex::new(Vec::with_capacity(4096)));
-        let out_ref = out.clone();
-
-        builder.build_parallel().run(|| {
-            let out = out_ref.clone();
-            Box::new(move |res| {
-                let entry = match res {
-                    Ok(e) => e,
-                    Err(_) => return ignore::WalkState::Continue,
-                };
+        // Use sequential walker to collect all paths (this guarantees completion)
+        let mut discovered_paths = Vec::new();
+
+        for result in builder.build() {
+            let entry = match result {
+                Ok(e) => e,
+                Err(_) => continue,
+            };
 
-                if let Some(ft) = entry.file_type() {
-                    if !ft.is_file() {
-                        return ignore::WalkState::Continue;
-                    }
-                } else if let Ok(md) = entry.metadata() {
-                    if !md.is_file() {
-                        return ignore::WalkState::Continue;
-                    }
-                } else {
-                    return ignore::WalkState::Continue;
+            if let Some(ft) = entry.file_type() {
+                if !ft.is_file() {
+                    continue;
                 }
-
-                let path = entry.into_path();
-                if !path_allowed(&path) {
-                    return ignore::WalkState::Continue;
+            } else if let Ok(md) = entry.metadata() {
+                if !md.is_file() {
+                    continue;
                 }
+            } else {
+                continue;
+            }
 
-                if let Ok(md) = fs::metadata(&path) {
-                    if (md.len() as usize) > self.config.max_file_size {
-                        return ignore::WalkState::Continue;
-                    }
-                } else {
-                    return ignore::WalkState::Continue;
-                }
+            let path = entry.into_path();
+            if !path_allowed(&path) {
+                continue;
+            }
 
-                if let Ok(mut guard) = out.lock() {
-                    guard.push(path);
+            if let Ok(md) = fs::metadata(&path) {
+                if (md.len() as usize) > self.config.max_file_size {
+                    continue;
                 }
+            } else {
+                continue;
+            }
 
-                ignore::WalkState::Continue
-            })
-        });
+            discovered_paths.push(path);
+        }
 
-        let drained: Vec<PathBuf> = {
-            match out.lock() {
-                Ok(mut guard) => guard.drain(..).collect(),
-                Err(_) => Vec::new(),
-            }
-        };
-        discovered_paths.extend(drained);
         discovered_paths
     }

@@ -903,29 +889,7 @@ impl<'a> Scanner<'a> {
             cb(0, 0, 0);
         }
 
-        let (tx, rx) = bounded::<Finding>(8192);
-
-        // Collector thread to drain findings as they arrive and keep count
-        let findings_vec_ref = findings_vec.clone();
-        let findings_cnt_ref = findings_cnt.clone();
-        let progress_cb = self.config.progress_callback.clone();
-        let processed_ref = processed.clone();
-        let discovered_ref = discovered.clone();
-        let collector = std::thread::spawn(move || {
-            for f in rx.iter() {
-                if let Ok(mut guard) = findings_vec_ref.lock() {
-                    guard.push(f);
-                }
-                let new_cnt = findings_cnt_ref.fetch_add(1, Ordering::Relaxed) + 1;
-                if let Some(cb) = &progress_cb {
-                    cb(
-                        processed_ref.load(Ordering::Relaxed),
-                        discovered_ref.load(Ordering::Relaxed),
-                        new_cnt,
-                    );
-                }
-            }
-        });
+        // Simplified approach: collect findings directly without a separate collector thread
 
         // Prepare include/exclude matchers for filtering
         let include_matcher: Option<globset::GlobSet> = if !self.config.include_globs.is_empty() {
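
Note: the deleted block above drained findings through a bounded crossbeam channel on a dedicated collector thread. With that pattern, `rx.iter()` only terminates once every `Sender` has been dropped, so the collector can be joined only after all senders are gone; the commit removes the pattern entirely. A small standalone sketch of that shutdown requirement (channel capacity and names are illustrative):

use crossbeam_channel::bounded;
use std::thread;

fn collector_sketch() {
    // Bounded channel plus a drain thread, as in the removed code.
    let (tx, rx) = bounded::<u64>(8192);
    let collector = thread::spawn(move || {
        // rx.iter() keeps yielding until every Sender has been dropped.
        rx.iter().sum::<u64>()
    });

    for i in 0..100u64 {
        let _ = tx.send(i); // workers would send findings here
    }
    drop(tx); // disconnect the channel so the collector's loop can end
    let total = collector.join().unwrap();
    assert_eq!(total, (0..100u64).sum::<u64>());
}
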
@@ -971,8 +935,6 @@ impl<'a> Scanner<'a> {
         };
 
         if roots.is_empty() {
-            drop(tx);
-            let _ = collector.join();
             return Ok(findings_vec.lock().unwrap().clone());
         }
 
@@ -994,71 +956,77 @@ impl<'a> Scanner<'a> {
             builder.threads(n.get());
         }
 
-        let tx_ref = tx.clone();
         let result_cb = self.config.result_callback.clone();
         let detectors = &self.detectors;
         let max_file_size = self.config.max_file_size;
         let processed_ref = processed.clone();
         let discovered_ref = discovered.clone();
-        let findings_cnt_ref = findings_cnt.clone();
+        let _findings_cnt_ref = findings_cnt.clone();
         let progress_cb_inner = self.config.progress_callback.clone();
 
-        builder.build_parallel().run(|| {
-            let tx = tx_ref.clone();
-            let result_cb = result_cb.clone();
-            let progress_cb_inner = progress_cb_inner.clone();
-            let processed_ref2 = processed_ref.clone();
-            let discovered_ref2 = discovered_ref.clone();
-            let findings_cnt_ref2 = findings_cnt_ref.clone();
-            Box::new(move |res| {
-                let entry = match res {
-                    Ok(e) => e,
-                    Err(_) => return ignore::WalkState::Continue,
-                };
+        // Use sequential walker to collect all paths (this guarantees completion)
+        let mut all_paths = Vec::new();
+
+        for result in builder.build() {
+            let entry = match result {
+                Ok(e) => e,
+                Err(_) => continue,
+            };
 
-                if let Some(ft) = entry.file_type() {
-                    if !ft.is_file() {
-                        return ignore::WalkState::Continue;
-                    }
-                } else if let Ok(md) = entry.metadata() {
-                    if !md.is_file() {
-                        return ignore::WalkState::Continue;
-                    }
-                } else {
-                    return ignore::WalkState::Continue;
+            if let Some(ft) = entry.file_type() {
+                if !ft.is_file() {
+                    continue;
                 }
-
-                let path = entry.into_path();
-                if !path_allowed(&path) {
-                    return ignore::WalkState::Continue;
+            } else if let Ok(md) = entry.metadata() {
+                if !md.is_file() {
+                    continue;
                 }
+            } else {
+                continue;
+            }
 
-                // Count as discovered candidate
-                discovered_ref2.fetch_add(1, Ordering::Relaxed);
+            let path = entry.into_path();
+            if !path_allowed(&path) {
+                continue;
+            }
 
-                // Size check
-                if let Ok(md) = fs::metadata(&path) {
-                    if (md.len() as usize) > max_file_size {
-                        return ignore::WalkState::Continue;
-                    }
-                } else {
-                    return ignore::WalkState::Continue;
+            // Size check
+            if let Ok(md) = fs::metadata(&path) {
+                if (md.len() as usize) > max_file_size {
+                    continue;
                 }
+            } else {
+                continue;
+            }
 
-                if let Some(lang) = Scanner::detect_language(&path) {
-                    if let Ok(bytes) = Scanner::load_file(&path) {
-                        let unit = ScanUnit {
-                            path: path.clone(),
-                            lang,
-                            bytes: bytes.clone(),
-                        };
-                        let stripped = strip_comments(lang, &bytes);
-                        let stripped_s = String::from_utf8_lossy(&stripped);
-                        let index = LineIndex::new(stripped_s.as_bytes());
-
-                        // Create a minimal emitter that streams results via callback and sends to collector
-                        let (_dtx, dummy_rx) = bounded(0);
-                        let mut em = Emitter { tx: tx.clone(), rx: dummy_rx, on_result: result_cb.clone() };
+            all_paths.push(path);
+        }
+
+        // Process all discovered paths using rayon for parallel processing
+        use rayon::prelude::*;
+        use std::sync::Mutex as StdMutex;
+
+        let all_findings = Arc::new(StdMutex::new(Vec::new()));
+
+        all_paths.par_iter().for_each(|path| {
+            // Count as discovered candidate
+            discovered_ref.fetch_add(1, Ordering::Relaxed);
+
+            if let Some(lang) = Scanner::detect_language(path) {
+                if let Ok(bytes) = Scanner::load_file(path) {
+                    let unit = ScanUnit {
+                        path: path.clone(),
+                        lang,
+                        bytes: bytes.clone(),
+                    };
+                    let stripped = strip_comments(lang, &bytes);
+                    let stripped_s = String::from_utf8_lossy(&stripped);
+                    let index = LineIndex::new(stripped_s.as_bytes());
+
+                    // Collect findings locally first
+                    {
+                        let (local_tx, local_rx) = bounded(100);
+                        let mut em = Emitter { tx: local_tx, rx: local_rx, on_result: result_cb.clone() };
                         for det in detectors {
                             if !det.languages().contains(&lang) {
                                 continue;
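
Note: after discovery, the new code hands the collected paths to rayon and accumulates findings behind an `Arc<Mutex<Vec<_>>>`. A stripped-down sketch of that shape, with a hypothetical `scan_one` standing in for the detector pipeline (not the crate's real API):

use rayon::prelude::*;
use std::sync::{Arc, Mutex};

fn scan_all(paths: &[String]) -> Vec<String> {
    // Shared, mutex-guarded accumulator, mirroring `all_findings` in the diff.
    let all_findings = Arc::new(Mutex::new(Vec::new()));

    paths.par_iter().for_each(|path| {
        let local = scan_one(path); // per-file work runs in parallel
        if !local.is_empty() {
            if let Ok(mut guard) = all_findings.lock() {
                guard.extend(local); // merge this file's findings under the lock
            }
        }
    });

    // The parallel pass is done, so the Arc has a single owner again.
    Arc::try_unwrap(all_findings).unwrap().into_inner().unwrap()
}

fn scan_one(path: &str) -> Vec<String> {
    // Hypothetical stand-in for running detectors over one file.
    vec![format!("finding in {path}")]
}

Locking once per file rather than once per finding keeps contention low as long as the per-file scan dominates the time spent inside the lock.
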
@@ -1068,27 +1036,41 @@ impl<'a> Scanner<'a> {
                             }
                             let _ = det.scan_optimized(&unit, &stripped_s, &index, &mut em);
                         }
+                        // Collect all findings from this file and add to global collection
+                        let local_findings = em.drain();
+                        if !local_findings.is_empty() {
+                            if let Ok(mut guard) = all_findings.lock() {
+                                guard.extend(local_findings);
+                            }
+                        }
                     }
                 }
+            }
 
-                // Mark processed and update progress
-                let new_proc = processed_ref2.fetch_add(1, Ordering::Relaxed) + 1;
-                if let Some(ref cb) = progress_cb_inner {
-                    cb(
-                        new_proc,
-                        discovered_ref2.load(Ordering::Relaxed),
-                        findings_cnt_ref2.load(Ordering::Relaxed),
-                    );
-                }
-
-                ignore::WalkState::Continue
-            })
+            // Mark processed and update progress
+            let new_proc = processed_ref.fetch_add(1, Ordering::Relaxed) + 1;
+            if let Some(ref cb) = progress_cb_inner {
+                let current_findings = {
+                    let guard = all_findings.lock().unwrap();
+                    guard.len()
+                };
+                cb(
+                    new_proc,
+                    discovered_ref.load(Ordering::Relaxed),
+                    current_findings,
+                );
+            }
         });
-
-        drop(tx);
-        let _ = collector.join();
-
-        let mut findings = findings_vec.lock().unwrap().clone();
+
+        // Extract all findings and add them to the main findings vector
+        let mut findings = {
+            let collected_findings = all_findings.lock().unwrap();
+            let mut findings_guard = findings_vec.lock().unwrap();
+            findings_guard.extend(collected_findings.clone());
+            findings_guard.clone()
+        };
+
+        // All processing completed successfully
         if self.config.deterministic {
             findings.sort_by(|a, b| {
                 (
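
Note: the final hunk drains each file's findings from a per-file Emitter, merges them into the shared vector, reports progress, and sorts when `deterministic` is set. A hedged alternative shape, not what the commit does, is to let rayon own the aggregation and avoid the shared mutex, sorting once at the end for stable output:

use rayon::prelude::*;

fn scan_all_flat(paths: &[String]) -> Vec<String> {
    let mut findings: Vec<String> = paths
        .par_iter()
        .flat_map(|path| scan_one(path)) // each file yields its own findings
        .collect();
    // Sort for a stable order, playing the role of the `deterministic` branch.
    findings.sort();
    findings
}

fn scan_one(path: &str) -> Vec<String> {
    // Hypothetical stand-in for running detectors over one file.
    vec![format!("finding in {path}")]
}

This removes the shared lock, though it no longer supports reading a live findings count from inside the progress callback the way the mutex-based version does.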
