From fac54f51bacaff5dae336f79a029fb3fb3a62909 Mon Sep 17 00:00:00 2001
From: rymnc <43716372+rymnc@users.noreply.github.com>
Date: Fri, 1 Aug 2025 14:13:10 +0530
Subject: [PATCH] chore(sync): remove heavy clones
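
Share batches behind `Arc` instead of deep-cloning them at every
hand-off in the sync import pipeline. `Batch<SealedBlockHeader>` and
`Batch<SealedBlock>` values are now passed around as `Arc<Batch<...>>`,
so the fetch stream, the cache, and the execute/commit stage all hold
cheap reference-counted handles; an owned deep copy is made only on the
one path that actually mutates a batch (growing a chunk in the cache).

Illustrative sketch of the pattern (not part of the diff; `Batch` is
reduced here to its `range`/`results` shape):

    use std::sync::Arc;

    #[derive(Clone)]
    struct Batch<T> {
        range: std::ops::Range<u32>,
        results: Vec<T>,
    }

    fn main() {
        let batch = Arc::new(Batch { range: 0..2, results: vec![1u32, 2] });

        // Cheap hand-off: the cache and the executor each hold a
        // handle; `results` is never copied.
        let for_cache = Arc::clone(&batch);

        // Explicit owned copy only where mutation is required,
        // mirroring `let mut new_batch = (*batch).clone();` in
        // cache.rs.
        let mut grown = (*for_cache).clone();
        grown.range = grown.range.start..grown.range.end.saturating_add(1);
        grown.results.push(3);

        assert_eq!(batch.results.len(), 2);
        assert_eq!(grown.results.len(), 3);
    }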

---
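
Reviewer note (text between the `---` marker and the diffstat is
dropped by `git am`): the explicit `(*batch).clone()` in cache.rs could
also be expressed with the standard copy-on-write helper, as a possible
follow-up:

    use std::sync::Arc;

    fn grow(batch: &mut Arc<Vec<u32>>) {
        // Clones the inner value only if other handles still exist;
        // mutates in place when this is the sole reference.
        Arc::make_mut(batch).push(42);
    }

The observable behaviour is the same; `Arc::make_mut` merely skips the
copy when the cache holds the only reference.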

 crates/services/sync/src/import.rs       |  53 +++----
 crates/services/sync/src/import/cache.rs | 178 ++++++++++++-----------
 2 files changed, 122 insertions(+), 109 deletions(-)

diff --git a/crates/services/sync/src/import.rs b/crates/services/sync/src/import.rs
index ec24a03c55c..a1a6223c1ce 100644
--- a/crates/services/sync/src/import.rs
+++ b/crates/services/sync/src/import.rs
@@ -115,7 +115,7 @@ enum BlockHeaderData {
     /// The headers (or full blocks) have been fetched and checked.
     Cached(CachedDataBatch),
     /// The headers has just been fetched from the network.
-    Fetched(Batch<SealedBlockHeader>),
+    Fetched(Arc<Batch<SealedBlockHeader>>),
 }
 
 impl<P, E, C> Import<P, E, C> {
@@ -167,8 +167,8 @@ impl<T> Batch<T> {
     }
 }
 
-type SealedHeaderBatch = Batch<SealedBlockHeader>;
-type SealedBlockBatch = Batch<SealedBlock>;
+type SealedHeaderBatch = Arc<Batch<SealedBlockHeader>>;
+type SealedBlockBatch = Arc<Batch<SealedBlock>>;
 
 impl<P, E, C> Import<P, E, C>
 where
@@ -330,7 +330,7 @@ where
             peer,
             range,
             results,
-        } = batch;
+        } = batch.as_ref();
 
         let mut done = vec![];
         let mut shutdown = shutdown.clone();
@@ -341,7 +341,7 @@ where
                 _ = shutdown.while_started() => {
                     break;
                 },
-                res = execute_and_commit(executor.as_ref(), state, sealed_block) => {
+                res = execute_and_commit(executor.as_ref(), state, sealed_block.clone()) => {
                     cache.remove_element(&height);
                     res
                 },
@@ -363,10 +363,10 @@ where
 
             let batch = Batch::new(peer.clone(), range.clone(), done);
             if !batch.is_err() {
-                report_peer(p2p, peer, PeerReportReason::SuccessfulBlockImport);
+                report_peer(p2p, peer.clone(), PeerReportReason::SuccessfulBlockImport);
             }
 
-            batch
+            Arc::new(batch)
         }
         .instrument(tracing::debug_span!("execute_and_commit"))
         .in_current_span()
@@ -432,9 +432,9 @@ fn get_block_stream<
                         peer,
                         range,
                         results,
-                    } = fetched_batch;
+                    } = fetched_batch.as_ref();
                    let checked_headers = results
-                        .into_iter()
+                        .iter()
                        .take_while(|header| {
                            check_sealed_header(
                                header,
@@ -443,8 +443,9 @@ fn get_block_stream<
                                &consensus,
                            )
                        })
+                        .cloned()
                        .collect::<Vec<_>>();
-                    let batch = Batch::new(peer, range.clone(), checked_headers);
+                    let batch = Arc::new(Batch::new(peer.clone(), range.clone(), checked_headers));
                    if !batch.is_err() {
                        cache.insert_headers(batch.clone());
                    }
@@ -470,9 +471,9 @@ fn get_block_stream<
                        peer,
                        range,
                        results,
-                    } = batch;
+                    } = batch.as_ref();
                    if results.is_empty() {
-                        SealedBlockBatch::new(peer, range, vec![])
+                        Arc::new(Batch::new(peer.clone(), range.clone(), vec![]))
                    } else {
                        await_da_height(
                            results
@@ -482,7 +483,7 @@ fn get_block_stream<
                        )
                        .await;
                        let headers =
-                            SealedHeaderBatch::new(peer, range.clone(), results);
+                            Arc::new(Batch::new(peer.clone(), range.clone(), results.to_vec()));
                        let batch = get_blocks(&p2p, headers).await;
                        if !batch.is_err() {
                            cache.insert_blocks(batch.clone());
@@ -492,7 +493,7 @@ fn get_block_stream<
                }
                BlockHeaderData::Cached(CachedDataBatch::None(_)) => {
                    tracing::error!("Cached data batch should never be created outside of the caching algorithm.");
-                    Batch::new(None, 0..1, vec![])
+                    Arc::new(Batch::new(None, 0..1, vec![]))
                }
            }
        }
@@ -624,7 +625,7 @@ where
        range.end
    );
    let Some(sourced_headers) = get_sealed_block_headers(range.clone(), p2p).await else {
-        return Batch::new(None, range, vec![])
+        return Arc::new(Batch::new(None, range, vec![]))
    };
    let SourcePeer {
        peer_id,
@@ -647,7 +648,7 @@ where
            PeerReportReason::MissingBlockHeaders,
        );
    }
-    Batch::new(Some(peer_id), range, headers)
+    Arc::new(Batch::new(Some(peer_id), range, headers))
 }
 
 fn report_peer<P>(p2p: &Arc<P>, peer_id: Option<PeerId>, reason: PeerReportReason)
@@ -674,17 +675,17 @@ where
        results: headers,
        range,
        peer,
-    } = headers;
+    } = headers.as_ref();
 
    let Some(SourcePeer {
        peer_id,
        data: transactions,
    }) = get_transactions(range.clone(), peer.clone(), p2p).await
    else {
-        return Batch::new(peer, range, vec![])
+        return Arc::new(Batch::new(peer.clone(), range.clone(), vec![]))
    };
 
-    let iter = headers.into_iter().zip(transactions.into_iter());
+    let iter = headers.iter().zip(transactions.into_iter());
    let mut blocks = vec![];
    for (block_header, transactions) in iter {
        let SealedBlockHeader {
@@ -692,9 +693,11 @@ where
            entity: header,
        } = block_header;
        let block =
-            Block::try_from_executed(header, transactions.0).map(|block| SealedBlock {
-                entity: block,
-                consensus,
+            Block::try_from_executed(header.clone(), transactions.0).map(|block| {
+                SealedBlock {
+                    entity: block,
+                    consensus: consensus.clone(),
+                }
            });
        if let Some(block) = block {
            blocks.push(block);
@@ -707,7 +710,7 @@ where
            break
        }
    }
-    Batch::new(Some(peer_id), range, blocks)
+    Arc::new(Batch::new(Some(peer_id), range.clone(), blocks))
 }
 
 #[tracing::instrument(
@@ -774,9 +777,9 @@ impl<S> ScanNone<S> {
 }
 
 impl<S> ScanErr<S> {
-    fn scan_err<'a, T: 'a>(self) -> impl Stream<Item = Batch<T>> + 'a
+    fn scan_err<'a, T: 'a>(self) -> impl Stream<Item = Arc<Batch<T>>> + 'a
    where
-        S: Stream<Item = Batch<T>> + Send + 'a,
+        S: Stream<Item = Arc<Batch<T>>> + Send + 'a,
    {
        let stream = self.0.boxed::<'a>();
        futures::stream::unfold((false, stream), |(mut err, mut stream)| async move {
diff --git a/crates/services/sync/src/import/cache.rs b/crates/services/sync/src/import/cache.rs
index 4d253f89ed1..032c8431295 100644
--- a/crates/services/sync/src/import/cache.rs
+++ b/crates/services/sync/src/import/cache.rs
@@ -5,6 +5,7 @@ use std::{
        Range,
        RangeInclusive,
    },
+    sync::Arc,
 };
 
 use fuel_core_services::SharedMutex;
@@ -28,8 +29,8 @@ pub enum CachedData {
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum CachedDataBatch {
-    Headers(Batch<SealedBlockHeader>),
-    Blocks(Batch<SealedBlock>),
+    Headers(Arc<Batch<SealedBlockHeader>>),
+    Blocks(Arc<Batch<SealedBlock>>),
    None(Range<u32>),
 }
 
@@ -48,17 +49,20 @@ impl Cache {
        Self(SharedMutex::new(BTreeMap::new()))
    }
 
-    pub fn insert_blocks(&mut self, batch: Batch<SealedBlock>) {
+    pub fn insert_blocks(&mut self, batch: Arc<Batch<SealedBlock>>) {
        let mut lock = self.0.lock();
-        for block in batch.results {
-            lock.insert(**block.entity.header().height(), CachedData::Block(block));
+        for block in batch.results.iter() {
+            lock.insert(
+                **block.entity.header().height(),
+                CachedData::Block(block.clone()),
+            );
        }
    }
 
-    pub fn insert_headers(&mut self, batch: Batch<SealedBlockHeader>) {
+    pub fn insert_headers(&mut self, batch: Arc<Batch<SealedBlockHeader>>) {
        let mut lock = self.0.lock();
-        for header in batch.results {
-            lock.insert(**header.entity.height(), CachedData::Header(header));
+        for header in batch.results.iter() {
+            lock.insert(**header.entity.height(), CachedData::Header(header.clone()));
        }
    }
 
@@ -125,72 +129,76 @@ impl Cache {
            let max_chunk_size = max_chunk_size.get() as usize;
            match (current_chunk, data) {
                (CachedDataBatch::None(_), CachedData::Header(data)) => {
-                    CachedDataBatch::Headers(Batch::new(
+                    CachedDataBatch::Headers(Arc::new(Batch::new(
                        None,
                        height..height.saturating_add(1),
                        vec![data],
-                    ))
+                    )))
                }
                (CachedDataBatch::None(_), CachedData::Block(data)) => {
-                    CachedDataBatch::Blocks(Batch::new(
+                    CachedDataBatch::Blocks(Arc::new(Batch::new(
                        None,
                        height..height.saturating_add(1),
                        vec![data],
-                    ))
+                    )))
                }
-                (CachedDataBatch::Headers(mut batch), CachedData::Header(data)) => {
+                (CachedDataBatch::Headers(batch), CachedData::Header(data)) => {
                    debug_assert_eq!(batch.range.end, height);
                    debug_assert!(batch.range.len() <= max_chunk_size);
                    if batch.range.len() == max_chunk_size {
                        chunks.push(CachedDataBatch::Headers(batch));
-                        CachedDataBatch::Headers(Batch::new(
+                        CachedDataBatch::Headers(Arc::new(Batch::new(
                            None,
                            height..height.saturating_add(1),
                            vec![data],
-                        ))
+                        )))
                    } else {
-                        batch.range = batch.range.start..batch.range.end.saturating_add(1);
-                        batch.results.push(data);
-                        CachedDataBatch::Headers(batch)
+                        let mut new_batch = (*batch).clone();
+                        new_batch.range =
+                            new_batch.range.start..new_batch.range.end.saturating_add(1);
+                        new_batch.results.push(data);
+                        CachedDataBatch::Headers(Arc::new(new_batch))
                    }
                }
-                (CachedDataBatch::Blocks(mut batch), CachedData::Block(data)) => {
+                (CachedDataBatch::Blocks(batch), CachedData::Block(data)) => {
                    debug_assert_eq!(batch.range.end, height);
                    debug_assert!(batch.range.len() <= max_chunk_size);
                    if batch.range.len() == max_chunk_size {
                        chunks.push(CachedDataBatch::Blocks(batch));
-                        CachedDataBatch::Blocks(Batch::new(
+                        CachedDataBatch::Blocks(Arc::new(Batch::new(
                            None,
                            height..height.saturating_add(1),
                            vec![data],
-                        ))
+                        )))
                    } else {
-                        batch.range = batch.range.start..batch.range.end.saturating_add(1);
-                        batch.results.push(data);
-                        CachedDataBatch::Blocks(batch)
+                        let mut new_batch = (*batch).clone();
+                        new_batch.range =
+                            new_batch.range.start..new_batch.range.end.saturating_add(1);
+                        new_batch.results.push(data);
+                        CachedDataBatch::Blocks(Arc::new(new_batch))
                    }
                }
                (CachedDataBatch::Headers(headers_batch), CachedData::Block(block)) => {
                    debug_assert_eq!(headers_batch.range.end, height);
                    chunks.push(CachedDataBatch::Headers(headers_batch));
-                    CachedDataBatch::Blocks(Batch::new(
+                    CachedDataBatch::Blocks(Arc::new(Batch::new(
                        None,
                        height..height.saturating_add(1),
                        vec![block],
-                    ))
+                    )))
                }
                (CachedDataBatch::Blocks(blocks_batch), CachedData::Header(header)) => {
                    debug_assert_eq!(blocks_batch.range.end, height);
                    chunks.push(CachedDataBatch::Blocks(blocks_batch));
-                    CachedDataBatch::Headers(Batch::new(
+                    CachedDataBatch::Headers(Arc::new(Batch::new(
                        None,
                        height..height.saturating_add(1),
                        vec![header],
-                    ))
+                    )))
                }
            }
        }
@@ -220,6 +228,8 @@ impl Cache {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
    use crate::import::{
        Batch,
        cache::{
@@ -273,7 +283,7 @@ mod tests {
    #[test_case(&[
        create_header(0)
    ], &[], 3, 0..=10 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..1, vec![create_header(0)])),
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..1, vec![create_header(0)]))),
        CachedDataBatch::None(1..4),
        CachedDataBatch::None(4..7),
        CachedDataBatch::None(7..10),
@@ -284,11 +294,11 @@ mod tests {
        create_header(1),
        create_header(2)
    ], &[], 3, 0..=10 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..3, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..3, vec![
            create_header(0),
            create_header(1),
            create_header(2)
-        ])),
+        ]))),
        CachedDataBatch::None(3..6),
        CachedDataBatch::None(6..9),
        CachedDataBatch::None(9..11),
@@ -296,7 +306,7 @@ mod tests {
    #[test_case(&[], &[
        create_block(0)
    ], 3, 0..=10 => vec![
-        CachedDataBatch::Blocks(Batch::new(None, 0..1, vec![create_block(0)])),
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 0..1, vec![create_block(0)]))),
        CachedDataBatch::None(1..4),
        CachedDataBatch::None(4..7),
        CachedDataBatch::None(7..10),
@@ -307,8 +317,8 @@ mod tests {
    ], &[
        create_block(1)
    ], 3, 0..=10 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..1, vec![create_header(0)])),
-        CachedDataBatch::Blocks(Batch::new(None, 1..2, vec![create_block(1)])),
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..1, vec![create_header(0)]))),
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 1..2, vec![create_block(1)]))),
        CachedDataBatch::None(2..5),
        CachedDataBatch::None(5..8),
        CachedDataBatch::None(8..11),
@@ -320,14 +330,14 @@ mod tests {
        create_block(2),
        create_block(3)
    ], 2, 0..=10 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..2, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..2, vec![
            create_header(0),
            create_header(1)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 2..4, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 2..4, vec![
            create_block(2),
            create_block(3)
-        ])),
+        ]))),
        CachedDataBatch::None(4..6),
        CachedDataBatch::None(6..8),
        CachedDataBatch::None(8..10),
@@ -344,22 +354,22 @@ mod tests {
    #[test_case(&[
        create_header(0),
        create_header(1),
        create_header(2),
        create_header(3)
    ], &[
        create_block(4),
        create_block(5),
        create_block(6),
        create_block(7)
    ], 2, 0..=10 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..2, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..2, vec![
            create_header(0),
            create_header(1)
-        ])),
+        ]))),
-        CachedDataBatch::Headers(Batch::new(None, 2..4, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 2..4, vec![
            create_header(2),
            create_header(3)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 4..6, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 4..6, vec![
            create_block(4),
            create_block(5)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 6..8, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 6..8, vec![
            create_block(6),
            create_block(7)
-        ])),
+        ]))),
        CachedDataBatch::None(8..10),
        CachedDataBatch::None(10..11),
    ]; "multiple headers, multiple blocks and empty ranges with smaller chunk size")]
@@ -374,22 +384,22 @@ mod tests {
    #[test_case(&[
        create_header(0),
        create_header(1),
        create_header(2),
        create_header(3)
    ], &[
        create_block(4),
        create_block(5),
        create_block(6),
        create_block(7)
    ], 2, 0..=7 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..2, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..2, vec![
            create_header(0),
            create_header(1)
-        ])),
+        ]))),
-        CachedDataBatch::Headers(Batch::new(None, 2..4, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 2..4, vec![
            create_header(2),
            create_header(3)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 4..6, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 4..6, vec![
            create_block(4),
            create_block(5)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 6..8, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 6..8, vec![
            create_block(6),
            create_block(7)
-        ])),
+        ]))),
    ]; "multiple headers, multiple blocks with no empty ranges")]
@@ -400,16 +410,16 @@ mod tests {
    #[test_case(&[
        create_header(0),
        create_header(1),
        create_header(2)
    ], &[
        create_block(3),
        create_block(4),
        create_block(5)
    ], 3, 0..=5 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..3, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..3, vec![
            create_header(0),
            create_header(1),
            create_header(2)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 3..6, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 3..6, vec![
            create_block(3),
            create_block(4),
            create_block(5)
-        ])),
+        ]))),
    ]; "multiple headers, multiple blocks with no empty ranges and larger chunk size")]
@@ -418,14 +428,14 @@ mod tests {
    #[test_case(&[
        create_header(0),
        create_header(1)
    ], &[
        create_block(2),
        create_block(3)
    ], 2, 0..=3 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..2, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..2, vec![
            create_header(0),
            create_header(1)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 2..4, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 2..4, vec![
            create_block(2),
            create_block(3)
-        ])),
+        ]))),
    ]; "multiple headers, multiple blocks with no empty ranges and exact chunk size")]
@@ -436,44 +446,44 @@ mod tests {
    #[test_case(&[
        create_header(0),
        create_header(1),
        create_header(2)
    ], &[
        create_block(3),
        create_block(4),
        create_block(5)
    ], 1, 0..=5 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..1, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..1, vec![
            create_header(0)
-        ])),
+        ]))),
-        CachedDataBatch::Headers(Batch::new(None, 1..2, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 1..2, vec![
            create_header(1)
-        ])),
+        ]))),
-        CachedDataBatch::Headers(Batch::new(None, 2..3, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 2..3, vec![
            create_header(2)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 3..4, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 3..4, vec![
            create_block(3)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 4..5, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 4..5, vec![
            create_block(4)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 5..6, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 5..6, vec![
            create_block(5)
-        ])),
+        ]))),
    ]; "multiple headers, multiple blocks with max chunk size of 1")]
    #[test_case(&[
        create_header(0)
    ], &[
        create_block(1)
    ], 1, 0..=1 => vec![
-        CachedDataBatch::Headers(Batch::new(None, 0..1, vec![
+        CachedDataBatch::Headers(Arc::new(Batch::new(None, 0..1, vec![
            create_header(0)
-        ])),
+        ]))),
-        CachedDataBatch::Blocks(Batch::new(None, 1..2, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 1..2, vec![
            create_block(1)
-        ])),
+        ]))),
    ]; "one header, one block with max chunk size of 1")]
    #[test_case(&[], &[
        create_block(5)
    ], 1, 4..=6 => vec![
        CachedDataBatch::None(4..5),
-        CachedDataBatch::Blocks(Batch::new(None, 5..6, vec![
+        CachedDataBatch::Blocks(Arc::new(Batch::new(None, 5..6, vec![
            create_block(5)
-        ])),
+        ]))),
        CachedDataBatch::None(6..7),
    ]; "one block in empty range sandwich with max chunk size of 1")]
    #[tokio::test]
@@ -484,16 +494,16 @@ mod tests {
        asked_range: RangeInclusive<u32>,
    ) -> Vec<CachedDataBatch> {
        let mut cache = Cache::new();
-        cache.insert_headers(Batch::new(
+        cache.insert_headers(Arc::new(Batch::new(
            None,
            0..headers.len().try_into().unwrap(),
            headers.to_vec(),
-        ));
+        )));
-        cache.insert_blocks(Batch::new(
+        cache.insert_blocks(Arc::new(Batch::new(
            None,
            0..blocks.len().try_into().unwrap(),
            blocks.to_vec(),
-        ));
+        )));
        cache
            .get_chunks(asked_range, NonZeroU32::try_from(max_chunk_size).unwrap())
            .collect()