Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions crates/services/sync/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ enum BlockHeaderData {
/// The headers (or full blocks) have been fetched and checked.
Cached(CachedDataBatch),
/// The headers has just been fetched from the network.
Fetched(Batch<SealedBlockHeader>),
Fetched(Arc<Batch<SealedBlockHeader>>),
}

impl<P, E, C> Import<P, E, C> {
Expand Down Expand Up @@ -167,8 +167,8 @@ impl<T> Batch<T> {
}
}

type SealedHeaderBatch = Batch<SealedBlockHeader>;
type SealedBlockBatch = Batch<SealedBlock>;
type SealedHeaderBatch = Arc<Batch<SealedBlockHeader>>;
type SealedBlockBatch = Arc<Batch<SealedBlock>>;

impl<P, E, C> Import<P, E, C>
where
Expand Down Expand Up @@ -330,7 +330,7 @@ where
peer,
range,
results,
} = batch;
} = batch.as_ref();

let mut done = vec![];
let mut shutdown = shutdown.clone();
Expand All @@ -341,7 +341,7 @@ where
_ = shutdown.while_started() => {
break;
},
res = execute_and_commit(executor.as_ref(), state, sealed_block) => {
res = execute_and_commit(executor.as_ref(), state, sealed_block.clone()) => {
cache.remove_element(&height);
res
},
Expand All @@ -363,10 +363,10 @@ where
let batch = Batch::new(peer.clone(), range.clone(), done);

if !batch.is_err() {
report_peer(p2p, peer, PeerReportReason::SuccessfulBlockImport);
report_peer(p2p, peer.clone(), PeerReportReason::SuccessfulBlockImport);
}

batch
Arc::new(batch)
}
.instrument(tracing::debug_span!("execute_and_commit"))
.in_current_span()
Expand Down Expand Up @@ -432,9 +432,9 @@ fn get_block_stream<
peer,
range,
results,
} = fetched_batch;
} = fetched_batch.as_ref();
let checked_headers = results
.into_iter()
.iter()
.take_while(|header| {
check_sealed_header(
header,
Expand All @@ -443,8 +443,9 @@ fn get_block_stream<
&consensus,
)
})
.cloned()
.collect::<Vec<_>>();
let batch = Batch::new(peer, range.clone(), checked_headers);
let batch = Arc::new(Batch::new(peer.clone(), range.clone(), checked_headers));
if !batch.is_err() {
cache.insert_headers(batch.clone());
}
Expand All @@ -470,9 +471,9 @@ fn get_block_stream<
peer,
range,
results,
} = batch;
} = batch.as_ref();
if results.is_empty() {
SealedBlockBatch::new(peer, range, vec![])
Arc::new(Batch::new(peer.clone(), range.clone(), vec![]))
} else {
await_da_height(
results
Expand All @@ -482,7 +483,7 @@ fn get_block_stream<
)
.await;
let headers =
SealedHeaderBatch::new(peer, range.clone(), results);
Arc::new(Batch::new(peer.clone(), range.clone(), results.to_vec()));
let batch = get_blocks(&p2p, headers).await;
if !batch.is_err() {
cache.insert_blocks(batch.clone());
Expand All @@ -492,7 +493,7 @@ fn get_block_stream<
}
BlockHeaderData::Cached(CachedDataBatch::None(_)) => {
tracing::error!("Cached data batch should never be created outside of the caching algorithm.");
Batch::new(None, 0..1, vec![])
Arc::new(Batch::new(None, 0..1, vec![]))
}
}
}
Expand Down Expand Up @@ -624,7 +625,7 @@ where
range.end
);
let Some(sourced_headers) = get_sealed_block_headers(range.clone(), p2p).await else {
return Batch::new(None, range, vec![])
return Arc::new(Batch::new(None, range, vec![]))
};
let SourcePeer {
peer_id,
Expand All @@ -647,7 +648,7 @@ where
PeerReportReason::MissingBlockHeaders,
);
}
Batch::new(Some(peer_id), range, headers)
Arc::new(Batch::new(Some(peer_id), range, headers))
}

fn report_peer<P>(p2p: &Arc<P>, peer_id: Option<PeerId>, reason: PeerReportReason)
Expand All @@ -674,27 +675,29 @@ where
results: headers,
range,
peer,
} = headers;
} = headers.as_ref();

let Some(SourcePeer {
peer_id,
data: transactions,
}) = get_transactions(range.clone(), peer.clone(), p2p).await
else {
return Batch::new(peer, range, vec![])
return Arc::new(Batch::new(peer.clone(), range.clone(), vec![]))
};

let iter = headers.into_iter().zip(transactions.into_iter());
let iter = headers.iter().zip(transactions.into_iter());
let mut blocks = vec![];
for (block_header, transactions) in iter {
let SealedBlockHeader {
consensus,
entity: header,
} = block_header;
let block =
Block::try_from_executed(header, transactions.0).map(|block| SealedBlock {
entity: block,
consensus,
Block::try_from_executed(header.clone(), transactions.0).map(|block| {
SealedBlock {
entity: block,
consensus: consensus.clone(),
}
});
if let Some(block) = block {
blocks.push(block);
Expand All @@ -707,7 +710,7 @@ where
break
}
}
Batch::new(Some(peer_id), range, blocks)
Arc::new(Batch::new(Some(peer_id), range.clone(), blocks))
}

#[tracing::instrument(
Expand Down Expand Up @@ -774,9 +777,9 @@ impl<S> ScanNone<S> {
}

impl<S> ScanErr<S> {
fn scan_err<'a, T: 'a>(self) -> impl Stream<Item = Batch<T>> + 'a
fn scan_err<'a, T: 'a>(self) -> impl Stream<Item = Arc<Batch<T>>> + 'a
where
S: Stream<Item = Batch<T>> + Send + 'a,
S: Stream<Item = Arc<Batch<T>>> + Send + 'a,
{
let stream = self.0.boxed::<'a>();
futures::stream::unfold((false, stream), |(mut err, mut stream)| async move {
Expand Down
Loading
Loading