diff --git a/Cargo.lock b/Cargo.lock index 5dc165892675a..928e0bdc6cfba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4370,6 +4370,7 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.16", + "tokio", "toml 0.9.7", "tracing", "walkdir", @@ -4692,6 +4693,7 @@ dependencies = [ "indicatif 0.18.0", "parking_lot", "proptest", + "rayon", "revm", "revm-inspectors", "serde", diff --git a/crates/cheatcodes/Cargo.toml b/crates/cheatcodes/Cargo.toml index 305107affee4d..7eee0748e597c 100644 --- a/crates/cheatcodes/Cargo.toml +++ b/crates/cheatcodes/Cargo.toml @@ -60,6 +60,7 @@ revm-inspectors.workspace = true semver.workspace = true serde_json.workspace = true thiserror.workspace = true +tokio.workspace = true toml = { workspace = true, features = ["preserve_order"] } tracing.workspace = true walkdir.workspace = true diff --git a/crates/cheatcodes/src/test/expect.rs b/crates/cheatcodes/src/test/expect.rs index c98ccca56ae98..1cec671faf98c 100644 --- a/crates/cheatcodes/src/test/expect.rs +++ b/crates/cheatcodes/src/test/expect.rs @@ -1094,8 +1094,12 @@ fn decode_event( return None; } let t0 = topics[0]; // event sig - // Try to identify the event - let event = foundry_common::block_on(identifier.identify_event(t0))?; + // Try to identify the event - detect if we're in a Tokio runtime and use appropriate method + let event = if tokio::runtime::Handle::try_current().is_ok() { + foundry_common::block_on(identifier.identify_event(t0))? + } else { + identifier.identify_event_sync(t0)? 
+ }; // Check if event already has indexed information from signatures let has_indexed_info = event.inputs.iter().any(|p| p.indexed); diff --git a/crates/config/src/fuzz.rs b/crates/config/src/fuzz.rs index ccb8cf45b632b..1571185821f8b 100644 --- a/crates/config/src/fuzz.rs +++ b/crates/config/src/fuzz.rs @@ -34,6 +34,11 @@ pub struct FuzzConfig { pub show_logs: bool, /// Optional timeout (in seconds) for each property test pub timeout: Option, + /// Number of threads to use for parallel fuzz runs + /// + /// This is set by passing `-j` or `--jobs` + #[serde(skip)] + pub threads: Option, } impl Default for FuzzConfig { @@ -49,6 +54,7 @@ impl Default for FuzzConfig { failure_persist_dir: None, show_logs: false, timeout: None, + threads: None, } } } diff --git a/crates/evm/evm/Cargo.toml b/crates/evm/evm/Cargo.toml index b3e9553f40304..ea247c5dcbba7 100644 --- a/crates/evm/evm/Cargo.toml +++ b/crates/evm/evm/Cargo.toml @@ -56,3 +56,4 @@ indicatif.workspace = true serde_json.workspace = true serde.workspace = true uuid.workspace = true +rayon.workspace = true diff --git a/crates/evm/evm/src/executors/corpus.rs b/crates/evm/evm/src/executors/corpus.rs index 03de74df2c8d9..4df467b8126cf 100644 --- a/crates/evm/evm/src/executors/corpus.rs +++ b/crates/evm/evm/src/executors/corpus.rs @@ -19,7 +19,10 @@ use serde::Serialize; use std::{ fmt, path::PathBuf, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, time::{SystemTime, UNIX_EPOCH}, }; use uuid::Uuid; @@ -101,6 +104,37 @@ impl CorpusEntry { } #[derive(Serialize, Default)] +pub(crate) struct GlobalCorpusMetrics { + // Number of edges seen during the invariant run + cumulative_edges_seen: Arc, + // Number of features (new hitcount bin of previously hit edge) seen during the invariant run + cumulative_features_seen: Arc, + // Number of corpus entries + corpus_count: Arc, + // Number of corpus entries that are favored + favored_items: Arc, +} + +impl fmt::Display for GlobalCorpusMetrics { + fn fmt(&self, 
f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f)?; + writeln!( + f, + " - cumulative edges seen: {}", + self.cumulative_edges_seen.load(Ordering::Relaxed) + )?; + writeln!( + f, + " - cumulative features seen: {}", + self.cumulative_features_seen.load(Ordering::Relaxed) + )?; + writeln!(f, " - corpus count: {}", self.corpus_count.load(Ordering::Relaxed))?; + write!(f, " - favored items: {}", self.favored_items.load(Ordering::Relaxed))?; + Ok(()) + } +} + +#[derive(Serialize, Default, Clone)] pub(crate) struct CorpusMetrics { // Number of edges seen during the invariant run. cumulative_edges_seen: usize, @@ -171,6 +205,8 @@ pub struct WorkerCorpus { /// Worker Dir /// corpus_dir/worker1/ worker_dir: Option, + /// Metrics at last sync - used to calculate deltas while syncing with global metrics + last_sync_metrics: CorpusMetrics, } impl WorkerCorpus { @@ -317,6 +353,7 @@ impl WorkerCorpus { new_entry_indices: Default::default(), last_sync_timestamp: 0, worker_dir, + last_sync_metrics: Default::default(), }) } @@ -555,8 +592,7 @@ impl WorkerCorpus { let corpus = &self.in_memory_corpus [test_runner.rng().random_range(0..self.in_memory_corpus.len())]; self.current_mutated = Some(corpus.uuid); - let new_seq = corpus.tx_seq.clone(); - let mut tx = new_seq.first().unwrap().clone(); + let mut tx = corpus.tx_seq.first().unwrap().clone(); self.abi_mutate(&mut tx, function, test_runner, fuzz_state)?; tx } else { @@ -769,7 +805,6 @@ impl WorkerCorpus { }; if timestamp <= self.last_sync_timestamp { - // TODO: Delete synced file continue; } @@ -890,7 +925,7 @@ impl WorkerCorpus { /// To be run by the master worker (id = 0) to distribute the global corpus to sync/ directories /// of other workers. 
- fn distribute(&mut self, num_workers: usize) -> eyre::Result<()> { + fn distribute(&mut self, num_workers: u32) -> eyre::Result<()> { if self.id != 0 || self.worker_dir.is_none() { return Ok(()); } @@ -942,14 +977,70 @@ impl WorkerCorpus { Ok(()) } + /// Syncs local metrics with global corpus metrics by calculating and applying deltas + pub(crate) fn sync_metrics(&mut self, global_corpus_metrics: &GlobalCorpusMetrics) { + // Calculate delta metrics since last sync + let edges_delta = self + .metrics + .cumulative_edges_seen + .saturating_sub(self.last_sync_metrics.cumulative_edges_seen); + let features_delta = self + .metrics + .cumulative_features_seen + .saturating_sub(self.last_sync_metrics.cumulative_features_seen); + // For corpus count and favored items, calculate deltas + let corpus_count_delta = + self.metrics.corpus_count as isize - self.last_sync_metrics.corpus_count as isize; + let favored_delta = + self.metrics.favored_items as isize - self.last_sync_metrics.favored_items as isize; + + // Add delta values to global metrics + + if edges_delta > 0 { + global_corpus_metrics.cumulative_edges_seen.fetch_add(edges_delta, Ordering::Relaxed); + } + if features_delta > 0 { + global_corpus_metrics + .cumulative_features_seen + .fetch_add(features_delta, Ordering::Relaxed); + } + + if corpus_count_delta > 0 { + global_corpus_metrics + .corpus_count + .fetch_add(corpus_count_delta as usize, Ordering::Relaxed); + } else if corpus_count_delta < 0 { + global_corpus_metrics + .corpus_count + .fetch_sub((-corpus_count_delta) as usize, Ordering::Relaxed); + } + + if favored_delta > 0 { + global_corpus_metrics + .favored_items + .fetch_add(favored_delta as usize, Ordering::Relaxed); + } else if favored_delta < 0 { + global_corpus_metrics + .favored_items + .fetch_sub((-favored_delta) as usize, Ordering::Relaxed); + } + + // Store current metrics as last sync metrics for next delta calculation + self.last_sync_metrics = self.metrics.clone(); + } + /// Syncs the workers 
in_memory_corpus and history_map with the findings from other workers. - pub fn sync( + pub(crate) fn sync( &mut self, - num_workers: usize, + num_workers: u32, executor: &Executor, fuzzed_function: Option<&Function>, fuzzed_contracts: Option<&FuzzRunIdentifiedContracts>, + global_corpus_metrics: &GlobalCorpusMetrics, ) -> eyre::Result<()> { + // Sync metrics with global corpus metrics + self.sync_metrics(global_corpus_metrics); + if self.id == 0 { // Master worker self.calibrate(executor, fuzzed_function, fuzzed_contracts)?; @@ -1040,6 +1131,7 @@ mod tests { new_entry_indices: Default::default(), last_sync_timestamp: 0, worker_dir: Some(corpus_root), + last_sync_metrics: CorpusMetrics::default(), }; (manager, seed_uuid) @@ -1146,6 +1238,7 @@ mod tests { new_entry_indices: Default::default(), last_sync_timestamp: 0, worker_dir: Some(corpus_root), + last_sync_metrics: CorpusMetrics::default(), }; // First eviction should remove the non-favored one diff --git a/crates/evm/evm/src/executors/fuzz/mod.rs b/crates/evm/evm/src/executors/fuzz/mod.rs index 09a8a8916964a..96b0ae574bb42 100644 --- a/crates/evm/evm/src/executors/fuzz/mod.rs +++ b/crates/evm/evm/src/executors/fuzz/mod.rs @@ -1,15 +1,15 @@ use crate::executors::{ - DURATION_BETWEEN_METRICS_REPORT, Executor, FailFast, FuzzTestTimer, RawCallResult, + DURATION_BETWEEN_METRICS_REPORT, Executor, FailFast, corpus::WorkerCorpus, + fuzz::types::{FuzzWorker, SharedFuzzState}, }; use alloy_dyn_abi::JsonAbiExt; use alloy_json_abi::Function; -use alloy_primitives::{Address, Bytes, Log, U256, map::HashMap}; +use alloy_primitives::{Address, Bytes, U256, keccak256}; use eyre::Result; use foundry_common::sh_println; use foundry_config::FuzzConfig; use foundry_evm_core::{ - Breakpoints, constants::{CHEATCODE_ADDRESS, MAGIC_ASSUME}, decode::{RevertDecoder, SkipReason}, }; @@ -19,44 +19,22 @@ use foundry_evm_fuzz::{ FuzzFixtures, FuzzTestResult, strategies::{EvmFuzzState, fuzz_calldata, fuzz_calldata_from_state}, }; -use 
foundry_evm_traces::SparsedTraceArena; use indicatif::ProgressBar; use proptest::{ strategy::Strategy, - test_runner::{TestCaseError, TestRunner}, + test_runner::{RngAlgorithm, TestCaseError, TestRng, TestRunner}, }; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use serde_json::json; -use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use std::{ + sync::Arc, + time::{Instant, SystemTime, UNIX_EPOCH}, +}; mod types; pub use types::{CaseOutcome, CounterExampleOutcome, FuzzOutcome}; - -/// Contains data collected during fuzz test runs. -#[derive(Default)] -struct FuzzTestData { - // Stores the first fuzz case. - first_case: Option, - // Stored gas usage per fuzz case. - gas_by_case: Vec<(u64, u64)>, - // Stores the result and calldata of the last failed call, if any. - counterexample: (Bytes, RawCallResult), - // Stores up to `max_traces_to_collect` traces. - traces: Vec, - // Stores breakpoints for the last fuzz case. - breakpoints: Option, - // Stores coverage information for all fuzz cases. - coverage: Option, - // Stores logs for all fuzz cases - logs: Vec, - // Deprecated cheatcodes mapped to their replacements. - deprecated_cheatcodes: HashMap<&'static str, Option<&'static str>>, - // Runs performed in fuzz test. - runs: u32, - // Current assume rejects of the fuzz run. - rejects: u32, - // Test failure. - failure: Option, -} +/// Corpus syncs across workers every `SYNC_INTERVAL` runs. +const SYNC_INTERVAL: u32 = 1000; /// Wrapper around an [`Executor`] which provides fuzzing support using [`proptest`]. /// @@ -105,130 +83,415 @@ impl FuzzedExecutor { fail_fast: &FailFast, ) -> Result { // Stores the fuzz test execution data. 
- let mut test_data = FuzzTestData::default(); + let shared_state = Arc::new(SharedFuzzState::new( + self.config.runs, + self.config.timeout, + fail_fast.clone(), + )); + + // Determine number of workers + let num_workers = self.num_workers(); + // Use single worker for deterministic behavior when replaying persisted failures + let persisted_failure = self.persisted_failure.take(); + let workers = (0..num_workers) + .into_par_iter() + .map(|worker_id| { + self.run_worker( + worker_id, + func, + fuzz_fixtures, + deployed_libs, + address, + rd, + shared_state.clone(), + progress, + if worker_id == 0 { persisted_failure.clone() } else { None }, + ) + }) + .collect::>>()?; + + Ok(self.aggregate_results(workers, func, shared_state)) + } + + /// Granular and single-step function that runs only one fuzz and returns either a `CaseOutcome` + /// or a `CounterExampleOutcome` + fn single_fuzz( + &self, + address: Address, + calldata: Bytes, + coverage_metrics: &mut WorkerCorpus, + ) -> Result { + let mut call = self + .executor + .call_raw(self.sender, address, calldata.clone(), U256::ZERO) + .map_err(|e| TestCaseError::fail(e.to_string()))?; + let new_coverage = coverage_metrics.merge_edge_coverage(&mut call); + coverage_metrics.process_inputs( + &[BasicTxDetails { + sender: self.sender, + call_details: CallDetails { target: address, calldata: calldata.clone() }, + }], + new_coverage, + ); + + // Handle `vm.assume`. + if call.result.as_ref() == MAGIC_ASSUME { + return Err(TestCaseError::reject(FuzzError::TooManyRejects( + self.config.max_test_rejects, + ))); + } + + let (breakpoints, deprecated_cheatcodes) = + call.cheatcodes.as_ref().map_or_else(Default::default, |cheats| { + (cheats.breakpoints.clone(), cheats.deprecated.clone()) + }); + + // Consider call success if test should not fail on reverts and reverter is not the + // cheatcode or test address. 
+ let success = if !self.config.fail_on_revert + && call + .reverter + .is_some_and(|reverter| reverter != address && reverter != CHEATCODE_ADDRESS) + { + true + } else { + self.executor.is_raw_call_mut_success(address, &mut call, false) + }; + + if success { + Ok(FuzzOutcome::Case(CaseOutcome { + case: FuzzCase { calldata, gas: call.gas_used, stipend: call.stipend }, + traces: call.traces, + coverage: call.line_coverage, + breakpoints, + logs: call.logs, + deprecated_cheatcodes, + })) + } else { + Ok(FuzzOutcome::CounterExample(CounterExampleOutcome { + exit_reason: call.exit_reason, + counterexample: (calldata, call), + breakpoints, + })) + } + } + + /// Aggregates the results from all workers + fn aggregate_results( + &self, + mut workers: Vec, + func: &Function, + shared_state: Arc, + ) -> FuzzTestResult { + let mut result = FuzzTestResult::default(); + + // Extract failed worker first if it exists + let failed_worker = shared_state.failed_worked_id().and_then(|id| { + workers.iter().position(|w| w.worker_id == id).map(|idx| workers.swap_remove(idx)) + }); + + // Process failure first if exists + if let Some(failed_worker) = failed_worker { + result.success = false; + let (calldata, call) = failed_worker.counterexample; + result.labels = call.labels; + result.traces = call.traces.clone(); + result.breakpoints = call.cheatcodes.map(|c| c.breakpoints); + + match failed_worker.failure { + Some(TestCaseError::Fail(reason)) => { + let reason = reason.to_string(); + result.reason = (!reason.is_empty()).then_some(reason); + let args = if let Some(data) = calldata.get(4..) 
{ + func.abi_decode_input(data).unwrap_or_default() + } else { + vec![] + }; + result.counterexample = Some(CounterExample::Single( + BaseCounterExample::from_fuzz_call(calldata, args, call.traces), + )); + } + Some(TestCaseError::Reject(reason)) => { + let reason = reason.to_string(); + result.reason = (!reason.is_empty()).then_some(reason); + } + None => {} + } + } else { + result.success = true; + } + + // Single pass aggregation for remaining workers + let mut first_case_candidate: Option<(u32, FuzzCase)> = None; + let mut last_run_worker: Option<&FuzzWorker> = None; + let mut last_run_timestamp = 0u128; + + for worker in &workers { + // Track first case (compare without cloning) + if let Some((run, case)) = &worker.first_case + && first_case_candidate.as_ref().is_none_or(|(r, _)| run < r) + { + first_case_candidate = Some((*run, case.clone())); + } + + // Track last run worker (keep reference, no clone) + if worker.last_run_timestamp > last_run_timestamp { + last_run_timestamp = worker.last_run_timestamp; + last_run_worker = Some(worker); + } + + // Only retrieve from worker 0 i.e master worker which is responsible for replaying + // persisted corpus. 
+ if worker.worker_id == 0 { + result.failed_corpus_replays = worker.failed_corpus_replays; + } + } + + // Set first case + result.first_case = first_case_candidate.map(|(_, case)| case).unwrap_or_default(); + + // If no failure, set traces and breakpoints from last run + if result.success + && let Some(last_worker) = last_run_worker + { + result.traces = last_worker.traces.last().cloned(); + result.breakpoints = last_worker.breakpoints.clone(); + } + + for mut worker in workers { + result.gas_by_case.append(&mut worker.gas_by_case); + result.logs.append(&mut worker.logs); + result.gas_report_traces.extend(worker.traces.into_iter().map(|t| t.arena)); + + // Merge coverage + HitMaps::merge_opt(&mut result.line_coverage, worker.coverage); + + result.deprecated_cheatcodes.extend(worker.deprecated_cheatcodes); + } + + // Check for skip reason + if let Some(reason) = &result.reason + && let Some(reason) = SkipReason::decode_self(reason) + { + result.skipped = true; + result.reason = reason.0; + } + + result + } + + /// Runs a single fuzz worker + #[allow(clippy::too_many_arguments)] + fn run_worker( + &self, + worker_id: u32, + func: &Function, + fuzz_fixtures: &FuzzFixtures, + deployed_libs: &[Address], + address: Address, + rd: &RevertDecoder, + shared_state: Arc, + progress: Option<&ProgressBar>, + mut persisted_failure: Option, + ) -> Result { + // Prepare let state = self.build_fuzz_state(deployed_libs); let dictionary_weight = self.config.dictionary.dictionary_weight.min(100); let strategy = proptest::prop_oneof![ 100 - dictionary_weight => fuzz_calldata(func.clone(), fuzz_fixtures), dictionary_weight => fuzz_calldata_from_state(func.clone(), &state), ] - .prop_map(move |calldata| BasicTxDetails { + .prop_map(|calldata| BasicTxDetails { sender: Default::default(), call_details: CallDetails { target: Default::default(), calldata }, }); - // We want to collect at least one trace which will be displayed to user. 
- let max_traces_to_collect = std::cmp::max(1, self.config.gas_report_samples) as usize; - let mut corpus_manager = WorkerCorpus::new( - 0, // Id of the Master + let mut corpus = WorkerCorpus::new( + worker_id, self.config.corpus.clone(), strategy.boxed(), - Some(&self.executor), + // Master worker replays the persisted corpus using the executor + if worker_id == 0 { Some(&self.executor) } else { None }, Some(func), - None, + None, // fuzzed_contracts for invariant tests )?; - // Start timer for this fuzz test. - let timer = FuzzTestTimer::new(self.config.timeout); - let mut last_metrics_report = Instant::now(); - let max_runs = self.config.runs; - let continue_campaign = |runs: u32| { - if fail_fast.should_stop() { - return false; - } + let mut worker = FuzzWorker::new(worker_id); + let num_workers = self.num_workers(); + let max_traces_to_collect = std::cmp::max(1, self.config.gas_report_samples / num_workers); + + // Calculate worker-specific run limit when not using timer + let worker_runs = if self.config.timeout.is_some() { + // When using timer, workers run as many as possible + u32::MAX + } else { + // Distribute runs evenly across workers, with worker 0 handling any remainder + let base_runs = self.config.runs / num_workers; + let remainder = self.config.runs % num_workers; + if worker_id == 0 { base_runs + remainder } else { base_runs } + }; - if timer.is_enabled() { !timer.is_timed_out() } else { runs < max_runs } + let mut runner_config = self.runner.config().clone(); + // Set the runner cases to worker_runs + runner_config.cases = worker_runs; + + let mut runner = if let Some(seed) = self.config.seed { + // For deterministic parallel fuzzing, derive a unique seed for each worker + let worker_seed = if worker_id == 0 { + // Master worker uses the provided seed as is. 
+ seed + } else { + // Derive a worker-specific seed using keccak256(seed || worker_id) + let mut seed_data = [0u8; 36]; // 32 bytes for seed + 4 bytes for worker_id + seed_data[..32].copy_from_slice(&seed.to_be_bytes::<32>()); + seed_data[32..36].copy_from_slice(&worker_id.to_be_bytes()); + U256::from_be_bytes(keccak256(seed_data).0) + }; + trace!(target: "forge::test", ?worker_seed, "deterministic seed for worker {worker_id}"); + let rng = TestRng::from_seed(RngAlgorithm::ChaCha, &worker_seed.to_be_bytes::<32>()); + TestRunner::new_with_rng(runner_config, rng) + } else { + TestRunner::new(runner_config) }; - 'stop: while continue_campaign(test_data.runs) { - // If counterexample recorded, replay it first, without incrementing runs. - let input = if let Some(failure) = self.persisted_failure.take() { + // Offset to stagger corpus syncs across workers; so that workers don't sync at the same + // time. + let sync_offset = worker_id * 100; + let mut runs_since_sync = 0; + let sync_threshold = SYNC_INTERVAL + sync_offset; + let mut last_metrics_report = Instant::now(); + // Continue while: + // 1. Global state allows (not timed out, not at global limit, no failure found) + // 2. Worker hasn't reached its specific run limit + 'stop: while shared_state.should_continue() && worker.runs < worker_runs { + // Only the master worker replays the persisted failure, if any. + let input = if worker_id == 0 + && let Some(failure) = persisted_failure.take() + { failure.calldata } else { - // If running with progress, then increment current run. - if let Some(progress) = progress { - progress.inc(1); - // Display metrics in progress bar. - if self.config.corpus.collect_edge_coverage() { - progress.set_message(format!("{}", &corpus_manager.metrics)); - } - } else if self.config.corpus.collect_edge_coverage() - && last_metrics_report.elapsed() > DURATION_BETWEEN_METRICS_REPORT - { - // Display metrics inline. 
- let metrics = json!({ - "timestamp": SystemTime::now() - .duration_since(UNIX_EPOCH)? - .as_secs(), - "test": func.name, - "metrics": &corpus_manager.metrics, - }); - let _ = sh_println!("{}", serde_json::to_string(&metrics)?); - last_metrics_report = Instant::now(); - }; - - test_data.runs += 1; - - match corpus_manager.new_input(&mut self.runner, &state, func) { + runs_since_sync += 1; + if runs_since_sync >= sync_threshold { + let instance = Instant::now(); + corpus.sync( + num_workers, + &self.executor, + Some(func), + None, + &shared_state.global_corpus_metrics, + )?; + trace!("Worker {worker_id} finished corpus sync in {:?}", instance.elapsed()); + runs_since_sync = 0; + } + + match corpus.new_input(&mut runner, &state, func) { Ok(input) => input, Err(err) => { - test_data.failure = Some(TestCaseError::fail(format!( - "failed to generate fuzzed input: {err}" + worker.failure = Some(TestCaseError::fail(format!( + "failed to generate fuzzed input in worker {}: {err}", + worker.worker_id ))); + shared_state.try_claim_failure(worker_id); break 'stop; } } }; - match self.single_fuzz(address, input, &mut corpus_manager) { + worker.last_run_timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(); + match self.single_fuzz(address, input, &mut corpus) { Ok(fuzz_outcome) => match fuzz_outcome { FuzzOutcome::Case(case) => { - test_data.gas_by_case.push((case.case.gas, case.case.stipend)); + // Only increment runs for successful non-rejected cases + // Check if we should actually count this run + if shared_state.try_increment_runs().is_none() { + // We've exceeded the run limit, stop + break 'stop; + } + worker.runs += 1; - if test_data.first_case.is_none() { - test_data.first_case.replace(case.case); + if let Some(progress) = progress { + progress.inc(1); + if self.config.corpus.collect_edge_coverage() && worker_id == 0 { + corpus.sync_metrics(&shared_state.global_corpus_metrics); + progress + .set_message(format!("{}", 
shared_state.global_corpus_metrics)); + } + } else if self.config.corpus.collect_edge_coverage() + && last_metrics_report.elapsed() > DURATION_BETWEEN_METRICS_REPORT + && worker_id == 0 + { + corpus.sync_metrics(&shared_state.global_corpus_metrics); + // Display metrics inline. + let metrics = json!({ + "timestamp": SystemTime::now() + .duration_since(UNIX_EPOCH)? + .as_secs(), + "test": func.name, + "metrics": &shared_state.global_corpus_metrics, + }); + let _ = sh_println!("{}", serde_json::to_string(&metrics)?); + last_metrics_report = Instant::now(); + } + + worker.gas_by_case.push((case.case.gas, case.case.stipend)); + + if worker.first_case.is_none() { + let total_runs = shared_state.total_runs(); + worker.first_case.replace((total_runs, case.case)); } if let Some(call_traces) = case.traces { - if test_data.traces.len() == max_traces_to_collect { - test_data.traces.pop(); + if worker.traces.len() == max_traces_to_collect as usize { + worker.traces.pop(); } - test_data.traces.push(call_traces); - test_data.breakpoints.replace(case.breakpoints); + worker.traces.push(call_traces); + worker.breakpoints.replace(case.breakpoints); } if self.config.show_logs { - test_data.logs.extend(case.logs); + worker.logs.extend(case.logs); } - HitMaps::merge_opt(&mut test_data.coverage, case.coverage); - test_data.deprecated_cheatcodes = case.deprecated_cheatcodes; + HitMaps::merge_opt(&mut worker.coverage, case.coverage); + worker.deprecated_cheatcodes = case.deprecated_cheatcodes; } FuzzOutcome::CounterExample(CounterExampleOutcome { exit_reason: status, counterexample: outcome, .. 
}) => { + // Count this as a run since we found a counterexample + // We always count counterexamples regardless of run limit + shared_state.increment_runs(); + worker.runs += 1; + + if let Some(progress) = progress { + progress.inc(1); + } let reason = rd.maybe_decode(&outcome.1.result, status); - test_data.logs.extend(outcome.1.logs.clone()); - test_data.counterexample = outcome; - test_data.failure = Some(TestCaseError::fail(reason.unwrap_or_default())); + worker.logs.extend(outcome.1.logs.clone()); + worker.counterexample = outcome; + worker.failure = Some(TestCaseError::fail(reason.unwrap_or_default())); + shared_state.try_claim_failure(worker_id); break 'stop; } }, Err(err) => { match err { TestCaseError::Fail(_) => { - test_data.failure = Some(err); + worker.failure = Some(err); + shared_state.try_claim_failure(worker_id); break 'stop; } TestCaseError::Reject(_) => { - // Discard run and apply max rejects if configured. - test_data.runs -= 1; + // Apply max rejects only if configured, otherwise silently discard run. 
if self.config.max_test_rejects > 0 { - test_data.rejects += 1; - if test_data.rejects >= self.config.max_test_rejects { - test_data.failure = Some(err); + worker.rejects += 1; + shared_state.increment_rejects(); + // Fail only total_rejects across workers exceeds the config value + if shared_state.total_rejects() >= self.config.max_test_rejects { + worker.failure = Some(err); + shared_state.try_claim_failure(worker_id); break 'stop; } } @@ -238,126 +501,16 @@ impl FuzzedExecutor { } } - let (calldata, call) = test_data.counterexample; - let mut traces = test_data.traces; - let (last_run_traces, last_run_breakpoints) = if test_data.failure.is_none() { - (traces.pop(), test_data.breakpoints) - } else { - (call.traces.clone(), call.cheatcodes.map(|c| c.breakpoints)) - }; - - let mut result = FuzzTestResult { - first_case: test_data.first_case.unwrap_or_default(), - gas_by_case: test_data.gas_by_case, - success: test_data.failure.is_none(), - skipped: false, - reason: None, - counterexample: None, - logs: test_data.logs, - labels: call.labels, - traces: last_run_traces, - breakpoints: last_run_breakpoints, - gas_report_traces: traces.into_iter().map(|a| a.arena).collect(), - line_coverage: test_data.coverage, - deprecated_cheatcodes: test_data.deprecated_cheatcodes, - failed_corpus_replays: corpus_manager.failed_replays, - }; - - match test_data.failure { - Some(TestCaseError::Fail(reason)) => { - let reason = reason.to_string(); - result.reason = (!reason.is_empty()).then_some(reason); - let args = if let Some(data) = calldata.get(4..) 
{ - func.abi_decode_input(data).unwrap_or_default() - } else { - vec![] - }; - result.counterexample = Some(CounterExample::Single( - BaseCounterExample::from_fuzz_call(calldata, args, call.traces), - )); - } - Some(TestCaseError::Reject(reason)) => { - let reason = reason.to_string(); - result.reason = (!reason.is_empty()).then_some(reason); - } - None => {} - } - - if let Some(reason) = &result.reason - && let Some(reason) = SkipReason::decode_self(reason) - { - result.skipped = true; - result.reason = reason.0; + if worker_id == 0 { + worker.failed_corpus_replays = corpus.failed_replays; } + // Logs stats + trace!("worker {worker_id} fuzz stats"); state.log_stats(); - Ok(result) - } - - /// Granular and single-step function that runs only one fuzz and returns either a `CaseOutcome` - /// or a `CounterExampleOutcome` - fn single_fuzz( - &mut self, - address: Address, - calldata: Bytes, - coverage_metrics: &mut WorkerCorpus, - ) -> Result { - let mut call = self - .executor - .call_raw(self.sender, address, calldata.clone(), U256::ZERO) - .map_err(|e| TestCaseError::fail(e.to_string()))?; - let new_coverage = coverage_metrics.merge_edge_coverage(&mut call); - coverage_metrics.process_inputs( - &[BasicTxDetails { - sender: self.sender, - call_details: CallDetails { target: address, calldata: calldata.clone() }, - }], - new_coverage, - ); - - // Handle `vm.assume`. - if call.result.as_ref() == MAGIC_ASSUME { - return Err(TestCaseError::reject(FuzzError::TooManyRejects( - self.config.max_test_rejects, - ))); - } - - let (breakpoints, deprecated_cheatcodes) = - call.cheatcodes.as_ref().map_or_else(Default::default, |cheats| { - (cheats.breakpoints.clone(), cheats.deprecated.clone()) - }); - - // Consider call success if test should not fail on reverts and reverter is not the - // cheatcode or test address. 
- let success = if !self.config.fail_on_revert - && call - .reverter - .is_some_and(|reverter| reverter != address && reverter != CHEATCODE_ADDRESS) - { - true - } else { - self.executor.is_raw_call_mut_success(address, &mut call, false) - }; - - if success { - Ok(FuzzOutcome::Case(CaseOutcome { - case: FuzzCase { calldata, gas: call.gas_used, stipend: call.stipend }, - traces: call.traces, - coverage: call.line_coverage, - breakpoints, - logs: call.logs, - deprecated_cheatcodes, - })) - } else { - Ok(FuzzOutcome::CounterExample(CounterExampleOutcome { - exit_reason: call.exit_reason, - counterexample: (calldata, call), - breakpoints, - })) - } + Ok(worker) } - /// Stores fuzz state for use with [fuzz_calldata_from_state] pub fn build_fuzz_state(&self, deployed_libs: &[Address]) -> EvmFuzzState { if let Some(fork_db) = self.executor.backend().active_fork_db() { @@ -370,4 +523,20 @@ impl FuzzedExecutor { ) } } + + /// Determines the number of workers to run + /// + /// Depends on: + /// - Whether we're replaying a persisted failure + /// - `--jobs` specified by the user + /// - Available threads + fn num_workers(&self) -> u32 { + if self.persisted_failure.is_some() { + 1 + } else if let Some(threads) = self.config.threads { + threads as u32 + } else { + rayon::current_num_threads() as u32 + } + } } diff --git a/crates/evm/evm/src/executors/fuzz/types.rs b/crates/evm/evm/src/executors/fuzz/types.rs index c09f53bbd2e7f..55ed69049b69d 100644 --- a/crates/evm/evm/src/executors/fuzz/types.rs +++ b/crates/evm/evm/src/executors/fuzz/types.rs @@ -1,9 +1,15 @@ -use crate::executors::RawCallResult; +use std::sync::{ + Arc, OnceLock, + atomic::{AtomicU32, Ordering}, +}; + +use crate::executors::{FailFast, FuzzTestTimer, RawCallResult, corpus::GlobalCorpusMetrics}; use alloy_primitives::{Bytes, Log, map::HashMap}; use foundry_evm_core::Breakpoints; use foundry_evm_coverage::HitMaps; use foundry_evm_fuzz::FuzzCase; use foundry_evm_traces::SparsedTraceArena; +use 
proptest::prelude::TestCaseError; use revm::interpreter::InstructionResult; /// Returned by a single fuzz in the case of a successful run @@ -41,3 +47,151 @@ pub enum FuzzOutcome { Case(CaseOutcome), CounterExample(CounterExampleOutcome), } + +/// Shared state for coordinating parallel fuzz workers +pub struct SharedFuzzState { + /// Total runs across workers + total_runs: Arc, + /// Found failure + /// + /// The worker that found the failure sets it's ID. + /// + /// This ID is then used to correctly extract the failure reason and counterexample. + found_failure: OnceLock, + /// Maximum number of runs + max_runs: u32, + /// Total rejects across workers + total_rejects: Arc, + /// Fuzz timer + timer: FuzzTestTimer, + /// Fail Fast coordinator + fail_fast: FailFast, + /// Global corpus metrics + pub(crate) global_corpus_metrics: GlobalCorpusMetrics, +} + +impl SharedFuzzState { + pub fn new(max_runs: u32, timeout: Option, fail_fast: FailFast) -> Self { + Self { + total_runs: Arc::new(AtomicU32::new(0)), + found_failure: OnceLock::new(), + max_runs, + total_rejects: Arc::new(AtomicU32::new(0)), + timer: FuzzTestTimer::new(timeout), + fail_fast, + global_corpus_metrics: GlobalCorpusMetrics::default(), + } + } + + pub fn try_increment_runs(&self) -> Option { + // If using timer, always increment + if self.timer.is_enabled() { + return Some(self.total_runs.fetch_add(1, Ordering::Relaxed) + 1); + } + + // Simple atomic increment with check + let current = self.total_runs.fetch_add(1, Ordering::Relaxed); + + if current < self.max_runs { + Some(current + 1) + } else { + // We went over the limit, decrement back + self.total_runs.fetch_sub(1, Ordering::Relaxed); + None + } + } + + pub fn increment_runs(&self) -> u32 { + self.total_runs.fetch_add(1, Ordering::Relaxed) + } + + pub fn increment_rejects(&self) -> u32 { + self.total_rejects.fetch_add(1, Ordering::Relaxed) + } + + pub fn should_continue(&self) -> bool { + // Check fail-fast + if self.fail_fast.should_stop() { + 
return false; + } + + if self.timer.is_enabled() { + // Check timer + !self.timer.is_timed_out() + } else { + // Check runs + let total_runs = self.total_runs.load(Ordering::Relaxed); + total_runs < self.max_runs + } + } + + /// Returns true if the worker was able to claim the failure, false if failure was set by + /// another worker + pub fn try_claim_failure(&self, worker_id: u32) -> bool { + if self.found_failure.get().is_some() { + return false; + } + + let claimed = self.found_failure.set(worker_id).is_ok(); + if claimed { + // Record failure in FailFast as well + self.fail_fast.record_fail(); + } + + claimed + } + + pub fn total_runs(&self) -> u32 { + self.total_runs.load(Ordering::Relaxed) + } + + pub fn total_rejects(&self) -> u32 { + self.total_rejects.load(Ordering::Relaxed) + } + + pub fn failed_worked_id(&self) -> Option { + self.found_failure.get().copied() + } +} + +#[derive(Default)] +pub struct FuzzWorker { + /// Worker identifier + pub worker_id: u32, + /// First fuzz case this worker encountered (with global run number) + pub first_case: Option<(u32, FuzzCase)>, + /// Gas usage for all cases this worker ran + pub gas_by_case: Vec<(u64, u64)>, + /// Counterexample if this worker found one + pub counterexample: (Bytes, RawCallResult), + /// Traces collected by this worker + /// + /// Stores up to `max_traces_to_collect` which is `config.gas_report_samples / num_workers` + pub traces: Vec, + /// Last breakpoints from this worker + pub breakpoints: Option, + /// Coverage collected by this worker + pub coverage: Option, + /// Logs from all cases this worker ran + pub logs: Vec, + /// Deprecated cheatcodes seen by this worker + pub deprecated_cheatcodes: HashMap<&'static str, Option<&'static str>>, + /// Number of runs this worker completed + pub runs: u32, + /// Number of rejects this worker encountered + pub rejects: u32, + /// Failure reason if this worker failed + pub failure: Option, + /// Last run timestamp in milliseconds + /// + /// Used to
identify which worker ran last and collect its traces and call breakpoints + pub last_run_timestamp: u128, + /// Failed corpus replays + pub failed_corpus_replays: usize, +} + +impl FuzzWorker { + pub fn new(worker_id: u32) -> Self { + Self { worker_id, ..Default::default() } + } +} diff --git a/crates/evm/fuzz/src/lib.rs b/crates/evm/fuzz/src/lib.rs index e0347dc5088a9..d35d992c53343 100644 --- a/crates/evm/fuzz/src/lib.rs +++ b/crates/evm/fuzz/src/lib.rs @@ -246,7 +246,7 @@ pub struct FuzzTestResult { // Deprecated cheatcodes mapped to their replacements. pub deprecated_cheatcodes: HashMap<&'static str, Option<&'static str>>, - /// NUmber of failed replays from persisted corpus. + /// Number of failed replays from persisted corpus. pub failed_corpus_replays: usize, } diff --git a/crates/evm/traces/src/identifier/signatures.rs b/crates/evm/traces/src/identifier/signatures.rs index 5c4765a851fc7..0e524f171053b 100644 --- a/crates/evm/traces/src/identifier/signatures.rs +++ b/crates/evm/traces/src/identifier/signatures.rs @@ -223,6 +223,13 @@ impl SignaturesIdentifier { self.identify_events([identifier]).await.pop().unwrap() } + /// Synchronously identifies an `Event` using only cached data. + pub fn identify_event_sync(&self, identifier: B256) -> Option { + let cache = self.0.cache.blocking_read(); + let selector = SelectorKind::Event(identifier); + cache.get(&selector).unwrap_or_default().and_then(|sig| get_event(&sig).ok()) + } + /// Identifies `Error`s. pub async fn identify_errors( &self, diff --git a/crates/forge/src/cmd/test/mod.rs b/crates/forge/src/cmd/test/mod.rs index 7c8a47da1bad9..7c851edd7b80c 100644 --- a/crates/forge/src/cmd/test/mod.rs +++ b/crates/forge/src/cmd/test/mod.rs @@ -257,6 +257,9 @@ impl TestArgs { // Merge all configs. let (mut config, evm_opts) = self.load_config_and_evm_opts()?; + // Set the number of threads in fuzz config. + config.fuzz.threads = config.threads; + // Install missing dependencies. 
if install::install_missing_dependencies(&mut config) && config.auto_detect_remappings { // need to re-configure here to also catch additional remappings diff --git a/crates/forge/tests/cli/test_cmd.rs b/crates/forge/tests/cli/test_cmd.rs index a263b8e0c3693..6a5e300c70b41 100644 --- a/crates/forge/tests/cli/test_cmd.rs +++ b/crates/forge/tests/cli/test_cmd.rs @@ -752,14 +752,14 @@ contract CounterTest is Test { Compiler run successful! Ran 1 test for test/CounterFuzz.t.sol:CounterTest -[FAIL: panic: arithmetic underflow or overflow (0x11); counterexample: calldata=0xa76d58f5fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe args=[115792089237316195423570985008687907853269984665640564039457584007913129639934 [1.157e77]]] testAddOne(uint256) (runs: 27, [AVG_GAS]) +[FAIL: panic: arithmetic underflow or overflow (0x11); counterexample: calldata=[..] args=[..]] testAddOne(uint256) (runs: [..], [AVG_GAS]) Suite result: FAILED. 0 passed; 1 failed; 0 skipped; [ELAPSED] Ran 1 test suite [ELAPSED]: 0 tests passed, 1 failed, 0 skipped (1 total tests) Failing tests: Encountered 1 failing test in test/CounterFuzz.t.sol:CounterTest -[FAIL: panic: arithmetic underflow or overflow (0x11); counterexample: calldata=0xa76d58f5fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe args=[115792089237316195423570985008687907853269984665640564039457584007913129639934 [1.157e77]]] testAddOne(uint256) (runs: 27, [AVG_GAS]) +[FAIL: panic: arithmetic underflow or overflow (0x11); counterexample: calldata=[..] args=[..]] testAddOne(uint256) (runs: [..], [AVG_GAS]) Encountered a total of 1 failing tests, 0 tests succeeded diff --git a/crates/forge/tests/it/fuzz.rs b/crates/forge/tests/it/fuzz.rs index 99932e1caa5d7..e4bb97619f028 100644 --- a/crates/forge/tests/it/fuzz.rs +++ b/crates/forge/tests/it/fuzz.rs @@ -208,10 +208,12 @@ contract FuzzerDictTest is Test { ); // Test that immutable address is used as fuzzed input, causing test to fail. 
- cmd.args(["test", "--fuzz-seed", "119", "--mt", "testImmutableOwner"]).assert_failure(); + // Use --jobs 1 to force single worker for deterministic behavior with seed + cmd.args(["test", "--fuzz-seed", "119", "--mt", "testImmutableOwner", "--jobs", "1"]) + .assert_failure(); // Test that storage address is used as fuzzed input, causing test to fail. cmd.forge_fuse() - .args(["test", "--fuzz-seed", "119", "--mt", "testStorageOwner"]) + .args(["test", "--fuzz-seed", "119", "--mt", "testStorageOwner", "--jobs", "1"]) .assert_failure(); }); @@ -260,7 +262,8 @@ contract FuzzTimeoutTest is Test { "#, ); - cmd.args(["test"]).assert_success().stdout_eq(str![[r#" + // Use single worker for deterministic timeout behavior + cmd.args(["test", "--jobs", "1"]).assert_success().stdout_eq(str![[r#" [COMPILING_FILES] with [SOLC_VERSION] [SOLC_VERSION] [ELAPSED] Compiler run successful! diff --git a/crates/forge/tests/it/test_helpers.rs b/crates/forge/tests/it/test_helpers.rs index 80cdd9d0df34e..56905c47005a2 100644 --- a/crates/forge/tests/it/test_helpers.rs +++ b/crates/forge/tests/it/test_helpers.rs @@ -121,6 +121,7 @@ impl ForgeTestProfile { failure_persist_dir: Some(tempfile::tempdir().unwrap().keep()), show_logs: false, timeout: None, + threads: None, }; config.invariant = InvariantConfig { runs: 256,