diff --git a/crates/evm/evm/src/executors/corpus.rs b/crates/evm/evm/src/executors/corpus.rs index 4577c9788d708..0b49d6904e63c 100644 --- a/crates/evm/evm/src/executors/corpus.rs +++ b/crates/evm/evm/src/executors/corpus.rs @@ -19,6 +19,7 @@ use serde::Serialize; use std::{ fmt, path::PathBuf, + sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; use uuid::Uuid; @@ -27,6 +28,9 @@ const METADATA_SUFFIX: &str = "metadata.json"; const JSON_EXTENSION: &str = ".json"; const FAVORABILITY_THRESHOLD: f64 = 0.3; const COVERAGE_MAP_SIZE: usize = 65536; +const WORKER: &str = "worker"; +const CORPUS_DIR: &str = "corpus"; +const SYNC_DIR: &str = "sync"; /// Possible mutation strategies to apply on a call sequence. #[derive(Debug, Clone)] @@ -46,7 +50,7 @@ enum MutationType { } /// Holds Corpus information. -#[derive(Serialize)] +#[derive(Clone, Serialize)] struct CorpusEntry { // Unique corpus identifier. uuid: Uuid, @@ -60,6 +64,9 @@ struct CorpusEntry { // Whether this corpus is favored, i.e. producing new finds more often than // `FAVORABILITY_THRESHOLD`. is_favored: bool, + /// Timestamp of when this entry was written to disk in seconds. + #[serde(skip_serializing)] + timestamp: u64, } impl CorpusEntry { @@ -70,7 +77,14 @@ impl CorpusEntry { } else { Uuid::new_v4() }; - Ok(Self { uuid, total_mutations: 0, new_finds_produced: 0, tx_seq, is_favored: false }) + Ok(Self { + uuid, + total_mutations: 0, + new_finds_produced: 0, + tx_seq, + is_favored: false, + timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(), + }) } /// New corpus with given call sequence and new uuid. @@ -81,6 +95,7 @@ impl CorpusEntry { new_finds_produced: 0, tx_seq: tx_seq.into(), is_favored: false, + timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), } } } @@ -128,32 +143,43 @@ impl CorpusMetrics { } } -/// Fuzz corpus manager, used in coverage guided fuzzing mode by both stateless and stateful tests. -pub(crate) struct CorpusManager { - // Fuzzed calls generator. - tx_generator: BoxedStrategy, - // Call sequence mutation strategy type generator. - mutation_generator: BoxedStrategy, - // Corpus configuration. - config: FuzzCorpusConfig, - // In-memory corpus, populated from persisted files and current runs. - // Mutation is performed on these. +/// Per-worker corpus manager. +pub struct WorkerCorpus { + /// Worker Id + id: u32, + /// In-memory corpus entries populated from the persisted files and + /// runs administered by this worker. in_memory_corpus: Vec, - // Identifier of current mutated entry. - current_mutated: Option, - // Number of failed replays from persisted corpus. - failed_replays: usize, - // History of binned hitcount of edges seen during fuzzing. + /// History of binned hitcount of edges seen during fuzzing history_map: Vec, - // Corpus metrics. + /// Number of failed replays from initial corpus + pub(crate) failed_replays: usize, + /// Worker Metrics pub(crate) metrics: CorpusMetrics, + /// Fuzzed calls generator. + tx_generator: BoxedStrategy, + /// Call sequence mutation strategy type generator used by stateful fuzzing. + mutation_generator: BoxedStrategy, + /// Identifier of current mutated entry for this worker. + current_mutated: Option, + /// Config + config: Arc, + /// Indices of new entries added to [`WorkerCorpus::in_memory_corpus`] since last sync. + new_entry_indices: Vec, + /// Last sync timestamp in seconds. + last_sync_timestamp: u64, + /// Worker Dir + /// corpus_dir/worker1/ + worker_dir: Option, } -impl CorpusManager { +impl WorkerCorpus { pub fn new( + id: u32, config: FuzzCorpusConfig, tx_generator: BoxedStrategy, - executor: &Executor, + // Only required by master worker (id = 0) to replay existing corpus + executor: Option<&Executor>, fuzzed_function: Option<&Function>, fuzzed_contracts: Option<&FuzzRunIdentifiedContracts>, ) -> eyre::Result { @@ -166,127 +192,140 @@ impl CorpusManager { Just(MutationType::Abi), ] .boxed(); - let mut history_map = vec![0u8; COVERAGE_MAP_SIZE]; - let mut metrics = CorpusMetrics::default(); - let mut in_memory_corpus = vec![]; - let mut failed_replays = 0; - - // Early return if corpus dir / coverage guided fuzzing not configured. - let Some(corpus_dir) = &config.corpus_dir else { - return Ok(Self { - tx_generator, - mutation_generator, - config, - in_memory_corpus, - current_mutated: None, - failed_replays, - history_map, - metrics, - }); - }; - // Ensure corpus dir for current test is created. - if !corpus_dir.is_dir() { - foundry_common::fs::create_dir_all(corpus_dir)?; - } + let worker_dir = if let Some(corpus_dir) = &config.corpus_dir { + // Create the necessary directories for the worker + let worker_dir = corpus_dir.join(format!("{WORKER}{id}")); + let worker_corpus = &worker_dir.join(CORPUS_DIR); + let sync_dir = &worker_dir.join(SYNC_DIR); - let can_replay_tx = |tx: &BasicTxDetails| -> bool { - fuzzed_contracts.is_some_and(|contracts| contracts.targets.lock().can_replay(tx)) - || fuzzed_function.is_some_and(|function| { - tx.call_details - .calldata - .get(..4) - .is_some_and(|selector| function.selector() == selector) - }) - }; - - 'corpus_replay: for entry in std::fs::read_dir(corpus_dir)? { - let path = entry?.path(); - if path.is_file() - && let Some(name) = path.file_name().and_then(|s| s.to_str()) - && name.contains(METADATA_SUFFIX) - { - // Ignore metadata files - continue; + if !worker_corpus.is_dir() { + foundry_common::fs::create_dir_all(worker_corpus)?; } - let read_corpus_result = match path.extension().and_then(|ext| ext.to_str()) { - Some("gz") => foundry_common::fs::read_json_gzip_file::>(&path), - _ => foundry_common::fs::read_json_file::>(&path), - }; + if !sync_dir.is_dir() { + foundry_common::fs::create_dir_all(sync_dir)?; + } - let Ok(tx_seq) = read_corpus_result else { - trace!(target: "corpus", "failed to load corpus from {}", path.display()); - continue; - }; + Some(worker_dir) + } else { + None + }; - if !tx_seq.is_empty() { - // Warm up history map from loaded sequences. - let mut executor = executor.clone(); - for tx in &tx_seq { - if can_replay_tx(tx) { - let mut call_result = executor - .call_raw( - tx.sender, - tx.call_details.target, - tx.call_details.calldata.clone(), - U256::ZERO, - ) - .map_err(|e| eyre!(format!("Could not make raw evm call: {e}")))?; + let mut in_memory_corpus = vec![]; + let mut history_map = vec![0u8; COVERAGE_MAP_SIZE]; + let mut metrics = CorpusMetrics::default(); + let mut failed_replays = 0; - let (new_coverage, is_edge) = - call_result.merge_edge_coverage(&mut history_map); - if new_coverage { - metrics.update_seen(is_edge); - } + if id == 0 && config.corpus_dir.is_some() { + // Master worker loads the initial corpus if it exists + let corpus_dir = config.corpus_dir.as_ref().unwrap(); + + let can_replay_tx = |tx: &BasicTxDetails| -> bool { + fuzzed_contracts.is_some_and(|contracts| contracts.targets.lock().can_replay(tx)) + || fuzzed_function.is_some_and(|function| { + tx.call_details + .calldata + .get(..4) + .is_some_and(|selector| function.selector() == selector) + }) + }; - // Commit only when running invariant / stateful tests. - if fuzzed_contracts.is_some() { - executor.commit(&mut call_result); - } - } else { - failed_replays += 1; + let executor = executor.expect("Executor required for master worker"); + 'corpus_replay: for entry in std::fs::read_dir(corpus_dir)? { + let path = entry?.path(); + if path.is_file() + && let Some(name) = path.file_name().and_then(|s| s.to_str()) + && name.contains(METADATA_SUFFIX) + { + // Ignore metadata files + continue; + } - // If the only input for fuzzed function cannot be replied, then move to - // next one without adding it in memory. - if fuzzed_function.is_some() { - continue 'corpus_replay; + let read_corpus_result = match path.extension().and_then(|ext| ext.to_str()) { + Some("gz") => { + foundry_common::fs::read_json_gzip_file::>(&path) + } + _ => foundry_common::fs::read_json_file::>(&path), + }; + + let Ok(tx_seq) = read_corpus_result else { + trace!(target: "corpus", "failed to load corpus from {}", path.display()); + continue; + }; + + if !tx_seq.is_empty() { + // Warm up history map from loaded sequences. + let mut executor = executor.clone(); + for tx in &tx_seq { + if can_replay_tx(tx) { + let mut call_result = executor + .call_raw( + tx.sender, + tx.call_details.target, + tx.call_details.calldata.clone(), + U256::ZERO, + ) + .map_err(|e| eyre!(format!("Could not make raw evm call: {e}")))?; + + let (new_coverage, is_edge) = + call_result.merge_edge_coverage(&mut history_map); + if new_coverage { + metrics.update_seen(is_edge); + } + + // Commit only when running invariant / stateful tests. + if fuzzed_contracts.is_some() { + executor.commit(&mut call_result); + } + } else { + failed_replays += 1; + + // If the only input for fuzzed function cannot be replied, then move to + // next one without adding it in memory. + if fuzzed_function.is_some() { + continue 'corpus_replay; + } } } - } - metrics.corpus_count += 1; + metrics.corpus_count += 1; - trace!( - target: "corpus", - "load sequence with len {} from corpus file {}", - tx_seq.len(), - path.display() - ); + trace!( + target: "corpus", + "load sequence with len {} from corpus file {}", + tx_seq.len(), + path.display() + ); - // Populate in memory corpus with the sequence from corpus file. - in_memory_corpus.push(CorpusEntry::new(tx_seq, path)?); + // Populate in memory corpus with the sequence from corpus file. + in_memory_corpus.push(CorpusEntry::new(tx_seq, path)?); + } } } Ok(Self { - tx_generator, - mutation_generator, - config, + id, in_memory_corpus, - current_mutated: None, - failed_replays, history_map, + failed_replays, metrics, + tx_generator, + mutation_generator, + current_mutated: None, + config: config.into(), + new_entry_indices: Default::default(), + last_sync_timestamp: 0, + worker_dir, }) } /// Updates stats for the given call sequence, if new coverage produced. /// Persists the call sequence (if corpus directory is configured and new coverage) and updates /// in-memory corpus. + #[tracing::instrument(skip_all, fields(worker_id = self.id))] pub fn process_inputs(&mut self, inputs: &[BasicTxDetails], new_coverage: bool) { - // Early return if corpus dir / coverage guided fuzzing is not configured. - let Some(corpus_dir) = &self.config.corpus_dir else { + let Some(worker_corpus) = &self.worker_dir else { return; }; @@ -321,16 +360,18 @@ impl CorpusManager { let corpus = CorpusEntry::from_tx_seq(inputs); let corpus_uuid = corpus.uuid; - + let timestamp = corpus.timestamp; // Persist to disk if corpus dir is configured. let write_result = if self.config.corpus_gzip { foundry_common::fs::write_json_gzip_file( - corpus_dir.join(format!("{corpus_uuid}{JSON_EXTENSION}.gz")).as_path(), + worker_corpus + .join(format!("{corpus_uuid}-{timestamp}{JSON_EXTENSION}.gz")) + .as_path(), &corpus.tx_seq, ) } else { foundry_common::fs::write_json_file( - corpus_dir.join(format!("{corpus_uuid}{JSON_EXTENSION}")).as_path(), + worker_corpus.join(format!("{corpus_uuid}-{timestamp}{JSON_EXTENSION}")).as_path(), &corpus.tx_seq, ) }; @@ -340,19 +381,37 @@ impl CorpusManager { } else { trace!( target: "corpus", - "persisted {} inputs for new coverage in {corpus_uuid} corpus", + "persisted {} inputs for new coverage for {corpus_uuid} corpus", &corpus.tx_seq.len() ); } + // Track in-memory corpus changes to update MasterWorker on sync + let new_index = self.in_memory_corpus.len(); + self.new_entry_indices.push(new_index); + // This includes reverting txs in the corpus and `can_continue` removes // them. We want this as it is new coverage and may help reach the other branch. self.metrics.corpus_count += 1; self.in_memory_corpus.push(corpus); } + /// Collects coverage from call result and updates metrics. + pub fn merge_edge_coverage(&mut self, call_result: &mut RawCallResult) -> bool { + if !self.config.collect_edge_coverage() { + return false; + } + + let (new_coverage, is_edge) = call_result.merge_edge_coverage(&mut self.history_map); + if new_coverage { + self.metrics.update_seen(is_edge); + } + new_coverage + } + /// Generates new call sequence from in memory corpus. Evicts oldest corpus mutated more than /// configured max mutations value. Used by invariant test campaigns. + #[tracing::instrument(skip_all, fields(worker_id = self.id))] pub fn new_inputs( &mut self, test_runner: &mut TestRunner, @@ -376,6 +435,7 @@ impl CorpusManager { .new_tree(test_runner) .map_err(|err| eyre!("Could not generate mutation type {err}"))? .current(); + let rng = test_runner.rng(); let corpus_len = self.in_memory_corpus.len(); let primary = &self.in_memory_corpus[rng.random_range(0..corpus_len)]; @@ -478,8 +538,9 @@ impl CorpusManager { Ok(new_seq) } - /// Generates new input from in memory corpus. Evicts oldest corpus mutated more than - /// configured max mutations value. Used by fuzz test campaigns. + /// Generates a new input from the shared in memory corpus. Evicts oldest corpus mutated more + /// than configured max mutations value. Used by fuzz (stateless) test campaigns. + #[tracing::instrument(skip_all, fields(worker_id = self.id))] pub fn new_input( &mut self, test_runner: &mut TestRunner, @@ -491,9 +552,9 @@ impl CorpusManager { return Ok(self.new_tx(test_runner)?.call_details.calldata); } - let tx = if !self.in_memory_corpus.is_empty() { - self.evict_oldest_corpus()?; + self.evict_oldest_corpus()?; + let tx = if !self.in_memory_corpus.is_empty() { let corpus = &self.in_memory_corpus [test_runner.rng().random_range(0..self.in_memory_corpus.len())]; self.current_mutated = Some(corpus.uuid); @@ -508,6 +569,15 @@ impl CorpusManager { Ok(tx.call_details.calldata) } + /// Generates single call from corpus strategy. + pub fn new_tx(&self, test_runner: &mut TestRunner) -> eyre::Result { + Ok(self + .tx_generator + .new_tree(test_runner) + .map_err(|_| eyre!("Could not generate case"))? + .current()) + } + /// Returns the next call to be used in call sequence. /// If coverage guided fuzzing is not configured or if previous input was discarded then this is /// a new tx from strategy. @@ -537,33 +607,6 @@ impl CorpusManager { Ok(sequence[depth].clone()) } - /// Generates single call from corpus strategy. - pub fn new_tx(&mut self, test_runner: &mut TestRunner) -> eyre::Result { - Ok(self - .tx_generator - .new_tree(test_runner) - .map_err(|_| eyre!("Could not generate case"))? - .current()) - } - - /// Returns campaign failed replays. - pub fn failed_replays(self) -> usize { - self.failed_replays - } - - /// Collects coverage from call result and updates metrics. - pub fn merge_edge_coverage(&mut self, call_result: &mut RawCallResult) -> bool { - if !self.config.collect_edge_coverage() { - return false; - } - - let (new_coverage, is_edge) = call_result.merge_edge_coverage(&mut self.history_map); - if new_coverage { - self.metrics.update_seen(is_edge); - } - new_coverage - } - /// Flush the oldest corpus mutated more than configured max mutations unless they are /// favored. fn evict_oldest_corpus(&mut self) -> eyre::Result<()> { @@ -580,8 +623,7 @@ impl CorpusManager { // Flush to disk the seed metadata at the time of eviction. let eviction_time = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); foundry_common::fs::write_json_file( - self.config - .corpus_dir + self.worker_dir .clone() .unwrap() .join(format!("{uuid}-{eviction_time}-{METADATA_SUFFIX}")) @@ -591,6 +633,16 @@ impl CorpusManager { // Remove corpus from memory. self.in_memory_corpus.remove(index); + + // Adjust the tracked indices + self.new_entry_indices.retain_mut(|i| { + if *i > index { + *i -= 1; // Shift indices down + true // Keep this index + } else { + *i != index // Remove if it's the deleted index, keep otherwise + } + }); } Ok(()) } @@ -638,6 +690,304 @@ impl CorpusManager { function.abi_encode_input(&prev_inputs).map_err(|e| eyre!(e.to_string()))?.into(); Ok(()) } + + // Sync Methods + + /// Exports the new corpus entries to the master workers (id = 0) sync dir. + #[tracing::instrument(skip_all, fields(worker_id = self.id))] + fn export(&self) -> eyre::Result<()> { + // Early return if no new entries or corpus dir not configured + if self.new_entry_indices.is_empty() || self.worker_dir.is_none() { + return Ok(()); + } + + let worker_dir = self.worker_dir.as_ref().unwrap(); + + // Master doesn't export (it only receives from others) + if self.id == 0 { + return Ok(()); + } + + let Some(master_sync_dir) = self + .config + .corpus_dir + .as_ref() + .map(|dir| dir.join(format!("{WORKER}0")).join(SYNC_DIR)) + else { + return Ok(()); + }; + + let mut exported = 0; + let corpus_dir = worker_dir.join(CORPUS_DIR); + + for &index in &self.new_entry_indices { + if let Some(entry) = self.in_memory_corpus.get(index) { + let ext = self + .config + .corpus_gzip + .then_some(format!("{JSON_EXTENSION}.gz")) + .unwrap_or(JSON_EXTENSION.to_string()); + let file_name = format!("{}-{}{ext}", entry.uuid, entry.timestamp); + let file_path = corpus_dir.join(&file_name); + let sync_path = master_sync_dir.join(&file_name); + + let Ok(_) = foundry_common::fs::copy(file_path, sync_path) else { + debug!(target: "corpus", "failed to export corpus {}", entry.uuid); + continue; + }; + + exported += 1; + } + } + + trace!(target: "corpus", "exported {exported} new corpus entries"); + + Ok(()) + } + + /// Imports the new corpus entries tx sequence which will be used to replay and update history + /// map. + fn import(&self) -> eyre::Result>> { + let Some(worker_dir) = &self.worker_dir else { + return Ok(vec![]); + }; + + let sync_dir = worker_dir.join(SYNC_DIR); + if !sync_dir.is_dir() { + return Ok(vec![]); + } + + let mut imports = vec![]; + for entry in std::fs::read_dir(sync_dir)? { + let Ok(entry) = entry else { + continue; + }; + + // Get the uuid and timestamp from the filename + let timestamp = if let Some(name) = entry.file_name().to_str() + && let Ok((_, timestamp)) = parse_corpus_filename(name) + { + timestamp + } else { + continue; + }; + + if timestamp <= self.last_sync_timestamp { + // TODO: Delete synced file + continue; + } + + let corpus = if self.config.corpus_gzip { + foundry_common::fs::read_json_gzip_file::>(&entry.path())? + } else { + foundry_common::fs::read_json_file::>(&entry.path())? + }; + + imports.push(corpus); + } + + Ok(imports) + } + + /// Syncs and calibrates the in memory corpus and updates the history_map if new coverage is + /// found from the corpus findings of other workers. + #[tracing::instrument(skip_all, fields(worker_id = self.id))] + fn calibrate( + &mut self, + executor: &Executor, + fuzzed_function: Option<&Function>, + fuzzed_contracts: Option<&FuzzRunIdentifiedContracts>, + ) -> eyre::Result<()> { + let Some(worker_dir) = &self.worker_dir else { + return Ok(()); + }; + + // Helper to check if tx can be replayed + let can_replay_tx = |tx: &BasicTxDetails| -> bool { + fuzzed_contracts.is_some_and(|contracts| contracts.targets.lock().can_replay(tx)) + || fuzzed_function.is_some_and(|function| { + tx.call_details + .calldata + .get(..4) + .is_some_and(|selector| function.selector() == selector) + }) + }; + + let sync_dir = worker_dir.join(SYNC_DIR); + let corpus_dir = worker_dir.join(CORPUS_DIR); + + let mut executor = executor.clone(); + for tx_seq in self.import()? { + if !tx_seq.is_empty() { + let mut new_coverage_on_sync = false; + for tx in &tx_seq { + if can_replay_tx(tx) { + let mut call_result = executor.call_raw( + tx.sender, + tx.call_details.target, + tx.call_details.calldata.clone(), + U256::ZERO, + )?; + + // Check if this provides new coverage + let (new_coverage, is_edge) = + call_result.merge_edge_coverage(&mut self.history_map); + + if new_coverage { + self.metrics.update_seen(is_edge); + new_coverage_on_sync = true; + } + + // Commit only for stateful tests + if fuzzed_contracts.is_some() { + executor.commit(&mut call_result); + } + + trace!( + target: "corpus", + %new_coverage, + "replayed tx for syncing: {:?}", + &tx + ); + } + } + + if new_coverage_on_sync { + let corpus_entry = CorpusEntry::from_tx_seq(&tx_seq); + let ext = self + .config + .corpus_gzip + .then_some(format!("{JSON_EXTENSION}.gz")) + .unwrap_or(JSON_EXTENSION.to_string()); + + let file_name = + format!("{}-{}{ext}", corpus_entry.uuid, corpus_entry.timestamp); + + // Move file from sync/ to corpus/ directory + let sync_path = sync_dir.join(&file_name); + let corpus_path = corpus_dir.join(&file_name); + + let Ok(_) = std::fs::rename(&sync_path, &corpus_path) else { + debug!(target: "corpus", "failed to move synced corpus {} from {sync_path:?} to {corpus_path:?} dir", corpus_entry.uuid); + continue; + }; + + trace!( + target: "corpus", + "moved synced corpus {} to corpus dir", + corpus_entry.uuid + ); + + self.in_memory_corpus.push(corpus_entry); + } + } + } + + let last_sync = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); + trace!(target: "corpus", "sync complete, updating last sync time to {}", last_sync); + self.last_sync_timestamp = last_sync; + + Ok(()) + } + + /// To be run by the master worker (id = 0) to distribute the global corpus to sync/ directories + /// of other workers. + #[tracing::instrument(skip_all, fields(worker_id = self.id))] + fn distribute(&mut self, num_workers: usize) -> eyre::Result<()> { + if self.id != 0 || self.worker_dir.is_none() { + return Ok(()); + } + + let worker_dir = self.worker_dir.as_ref().unwrap(); + let master_corpus_dir = worker_dir.join(CORPUS_DIR); + + for target_worker in 1..num_workers { + let target_dir = self + .config + .corpus_dir + .as_ref() + .unwrap() + .join(format!("{WORKER}{target_worker}")) + .join(SYNC_DIR); + + if !target_dir.is_dir() { + foundry_common::fs::create_dir_all(&target_dir)?; + } + + for entry in std::fs::read_dir(&master_corpus_dir)? { + let Ok(entry) = entry else { + continue; + }; + + let path = entry.path(); + if path.is_file() + && let Some(name) = path.file_name().and_then(|s| s.to_str()) + && !name.contains(METADATA_SUFFIX) + { + let sync_path = target_dir.join(name); + + let Ok((_, timestamp)) = parse_corpus_filename(name) else { + continue; + }; + + if timestamp > self.last_sync_timestamp { + let Ok(_) = foundry_common::fs::copy(&path, &sync_path) else { + debug!(target: "corpus", "failed to distribute corpus {} to {target_dir:?}", name); + continue; + }; + + trace!(target: "corpus", "distributed corpus {} to {target_dir:?}", name); + } + } + } + } + + Ok(()) + } + + /// Syncs the workers in_memory_corpus and history_map with the findings from other workers. + #[tracing::instrument(skip_all, fields(worker_id = self.id))] + pub fn sync( + &mut self, + num_workers: usize, + executor: &Executor, + fuzzed_function: Option<&Function>, + fuzzed_contracts: Option<&FuzzRunIdentifiedContracts>, + ) -> eyre::Result<()> { + if self.id == 0 { + // Master worker + self.calibrate(executor, fuzzed_function, fuzzed_contracts)?; + + self.distribute(num_workers)?; + + self.new_entry_indices.clear(); + + trace!(target: "corpus", "master synced"); + + return Ok(()); + } + + self.export()?; + + self.calibrate(executor, fuzzed_function, fuzzed_contracts)?; + + self.new_entry_indices.clear(); + + trace!(target: "corpus", "synced"); + + Ok(()) + } +} + +/// Parses the corpus filename and returns the uuid and timestamp associated with it. +fn parse_corpus_filename(name: &str) -> eyre::Result<(Uuid, u64)> { + let name = name.trim_end_matches(".gz").trim_end_matches(JSON_EXTENSION); + + let parts = name.rsplitn(2, "-").collect::>(); + let uuid = Uuid::parse_str(parts[0])?; + let timestamp = parts[1].parse()?; + + Ok((uuid, timestamp)) } #[cfg(test)] @@ -662,7 +1012,7 @@ mod tests { dir } - fn new_manager_with_single_corpus() -> (CorpusManager, Uuid) { + fn new_manager_with_single_corpus() -> (WorkerCorpus, Uuid) { let tx_gen = Just(basic_tx()).boxed(); let config = FuzzCorpusConfig { corpus_dir: Some(temp_corpus_dir()), @@ -676,15 +1026,24 @@ mod tests { let corpus = CorpusEntry::from_tx_seq(&tx_seq); let seed_uuid = corpus.uuid; - let manager = CorpusManager { + // Create corpus root dir and worker subdirectory + let corpus_root = config.corpus_dir.clone().unwrap(); + let worker_subdir = corpus_root.join("worker0"); + let _ = fs::create_dir_all(&worker_subdir); + + let manager = WorkerCorpus { + id: 0, tx_generator: tx_gen, mutation_generator: Just(MutationType::Repeat).boxed(), - config, + config: config.into(), in_memory_corpus: vec![corpus], current_mutated: Some(seed_uuid), failed_replays: 0, history_map: vec![0u8; COVERAGE_MAP_SIZE], metrics: CorpusMetrics::default(), + new_entry_indices: Default::default(), + last_sync_timestamp: 0, + worker_dir: Some(corpus_root), }; (manager, seed_uuid) @@ -774,15 +1133,23 @@ mod tests { non_favored.is_favored = false; let non_favored_uuid = non_favored.uuid; - let mut manager = CorpusManager { + let corpus_root = temp_corpus_dir(); + let worker_subdir = corpus_root.join("worker0"); + fs::create_dir_all(&worker_subdir).unwrap(); + + let mut manager = WorkerCorpus { + id: 0, tx_generator: tx_gen, mutation_generator: Just(MutationType::Repeat).boxed(), - config, + config: config.into(), in_memory_corpus: vec![favored, non_favored], current_mutated: None, failed_replays: 0, history_map: vec![0u8; COVERAGE_MAP_SIZE], metrics: CorpusMetrics::default(), + new_entry_indices: Default::default(), + last_sync_timestamp: 0, + worker_dir: Some(corpus_root), }; // First eviction should remove the non-favored one diff --git a/crates/evm/evm/src/executors/fuzz/mod.rs b/crates/evm/evm/src/executors/fuzz/mod.rs index 61bf824a44a79..09a8a8916964a 100644 --- a/crates/evm/evm/src/executors/fuzz/mod.rs +++ b/crates/evm/evm/src/executors/fuzz/mod.rs @@ -1,5 +1,6 @@ use crate::executors::{ DURATION_BETWEEN_METRICS_REPORT, Executor, FailFast, FuzzTestTimer, RawCallResult, + corpus::WorkerCorpus, }; use alloy_dyn_abi::JsonAbiExt; use alloy_json_abi::Function; @@ -28,7 +29,6 @@ use serde_json::json; use std::time::{Instant, SystemTime, UNIX_EPOCH}; mod types; -use crate::executors::corpus::CorpusManager; pub use types::{CaseOutcome, CounterExampleOutcome, FuzzOutcome}; /// Contains data collected during fuzz test runs. @@ -119,10 +119,11 @@ impl FuzzedExecutor { // We want to collect at least one trace which will be displayed to user. let max_traces_to_collect = std::cmp::max(1, self.config.gas_report_samples) as usize; - let mut corpus_manager = CorpusManager::new( + let mut corpus_manager = WorkerCorpus::new( + 0, // Id of the Master self.config.corpus.clone(), strategy.boxed(), - &self.executor, + Some(&self.executor), Some(func), None, )?; @@ -259,7 +260,7 @@ impl FuzzedExecutor { gas_report_traces: traces.into_iter().map(|a| a.arena).collect(), line_coverage: test_data.coverage, deprecated_cheatcodes: test_data.deprecated_cheatcodes, - failed_corpus_replays: corpus_manager.failed_replays(), + failed_corpus_replays: corpus_manager.failed_replays, }; match test_data.failure { @@ -300,7 +301,7 @@ impl FuzzedExecutor { &mut self, address: Address, calldata: Bytes, - coverage_metrics: &mut CorpusManager, + coverage_metrics: &mut WorkerCorpus, ) -> Result { let mut call = self .executor diff --git a/crates/evm/evm/src/executors/invariant/mod.rs b/crates/evm/evm/src/executors/invariant/mod.rs index 9361dca926549..183355421e565 100644 --- a/crates/evm/evm/src/executors/invariant/mod.rs +++ b/crates/evm/evm/src/executors/invariant/mod.rs @@ -1,5 +1,5 @@ use crate::{ - executors::{Executor, RawCallResult}, + executors::{Executor, RawCallResult, corpus::WorkerCorpus}, inspectors::Fuzzer, }; use alloy_primitives::{ @@ -51,9 +51,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; mod shrink; -use crate::executors::{ - DURATION_BETWEEN_METRICS_REPORT, EvmError, FailFast, FuzzTestTimer, corpus::CorpusManager, -}; +use crate::executors::{DURATION_BETWEEN_METRICS_REPORT, EvmError, FailFast, FuzzTestTimer}; pub use shrink::check_sequence; sol! { @@ -547,7 +545,7 @@ impl<'a> InvariantExecutor<'a> { gas_report_traces: result.gas_report_traces, line_coverage: result.line_coverage, metrics: result.metrics, - failed_corpus_replays: corpus_manager.failed_replays(), + failed_corpus_replays: corpus_manager.failed_replays, }) } @@ -559,7 +557,7 @@ impl<'a> InvariantExecutor<'a> { invariant_contract: &InvariantContract<'_>, fuzz_fixtures: &FuzzFixtures, deployed_libs: &[Address], - ) -> Result<(InvariantTest, CorpusManager)> { + ) -> Result<(InvariantTest, WorkerCorpus)> { // Finds out the chosen deployed contracts and/or senders. self.select_contract_artifacts(invariant_contract.address)?; let (targeted_senders, targeted_contracts) = @@ -633,13 +631,15 @@ impl<'a> InvariantExecutor<'a> { return Err(eyre!(error.revert_reason().unwrap_or_default())); } - let corpus_manager = CorpusManager::new( + let worker = WorkerCorpus::new( + 0, self.config.corpus.clone(), strategy.boxed(), - &self.executor, + Some(&self.executor), None, Some(&targeted_contracts), )?; + let invariant_test = InvariantTest::new( fuzz_state, targeted_contracts, @@ -648,7 +648,7 @@ impl<'a> InvariantExecutor<'a> { self.runner.clone(), ); - Ok((invariant_test, corpus_manager)) + Ok((invariant_test, worker)) } /// Fills the `InvariantExecutor` with the artifact identifier filters (in `path:name` string