
Commit 689b9d9

cumulus-pov-recovery: check pov_hash instead of reencoding data (#2287)
Collators were previously reencoding the available data and checking the erasure root. Replace that with just checking the PoV hash, which consumes much less CPU and takes less time. We also don't need to check the `PersistedValidationData` hash, as collators don't use it.

Reason: #575 (comment)

After systematic chunks recovery is merged, collators will no longer do any reed-solomon encoding/decoding, which has proven to be a great CPU consumer.

Signed-off-by: alindima <[email protected]>
1 parent 8d26379 commit 689b9d9
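
The gist of the change, in code: a validator must reencode the recovered `AvailableData` into erasure chunks and compare the recomputed trie root against the receipt's `erasure_root`, while a collator can simply hash the recovered PoV and compare it with the receipt's `pov_hash`. A minimal sketch of the two checks, assuming the `polkadot-erasure-coding` and `polkadot-primitives` crates and the `PostRecoveryCheck` enum introduced by this commit; the helper name and signature are illustrative, not the literal code:

use polkadot_availability_recovery::PostRecoveryCheck;
use polkadot_erasure_coding::{branches, obtain_chunks_v1};
use polkadot_node_primitives::AvailableData;
use polkadot_primitives::CandidateDescriptor;

// Illustrative helper (not part of this commit): run the configured
// post-recovery check against the fields of the candidate descriptor.
fn post_recovery_check(
    data: &AvailableData,
    descriptor: &CandidateDescriptor,
    n_validators: usize,
    check: PostRecoveryCheck,
) -> bool {
    match check {
        // Collator path: one blake2-256 hash of the PoV, no erasure coding.
        PostRecoveryCheck::PovHash => data.pov.hash() == descriptor.pov_hash,
        // Validator path: reed-solomon-encode all chunks again and rebuild
        // the erasure trie root. This is the CPU-heavy step collators skip.
        PostRecoveryCheck::Reencode => match obtain_chunks_v1(n_validators, data) {
            Ok(chunks) => branches(&chunks).root() == descriptor.erasure_root,
            Err(_) => false,
        },
    }
}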

File tree

5 files changed, +141 -91 lines changed


cumulus/client/pov-recovery/src/active_candidate_recovery.rs

Lines changed: 7 additions & 8 deletions
@@ -16,12 +16,12 @@

 use sp_runtime::traits::Block as BlockT;

-use polkadot_node_primitives::AvailableData;
+use polkadot_node_primitives::PoV;
 use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;

 use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};

-use std::{collections::HashSet, pin::Pin};
+use std::{collections::HashSet, pin::Pin, sync::Arc};

 use crate::RecoveryHandle;

@@ -30,9 +30,8 @@ use crate::RecoveryHandle;
 /// This handles the candidate recovery and tracks the activate recoveries.
 pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
     /// The recoveries that are currently being executed.
-    recoveries: FuturesUnordered<
-        Pin<Box<dyn Future<Output = (Block::Hash, Option<AvailableData>)> + Send>>,
-    >,
+    recoveries:
+        FuturesUnordered<Pin<Box<dyn Future<Output = (Block::Hash, Option<Arc<PoV>>)> + Send>>>,
     /// The block hashes of the candidates currently being recovered.
     candidates: HashSet<Block::Hash>,
     recovery_handle: Box<dyn RecoveryHandle>,
@@ -68,7 +67,7 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {
         self.recoveries.push(
             async move {
                 match rx.await {
-                    Ok(Ok(res)) => (block_hash, Some(res)),
+                    Ok(Ok(res)) => (block_hash, Some(res.pov)),
                     Ok(Err(error)) => {
                         tracing::debug!(
                             target: crate::LOG_TARGET,
@@ -93,8 +92,8 @@ impl<Block: BlockT> ActiveCandidateRecovery<Block> {

     /// Waits for the next recovery.
     ///
-    /// If the returned [`AvailableData`] is `None`, it means that the recovery failed.
-    pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<AvailableData>) {
+    /// If the returned [`PoV`] is `None`, it means that the recovery failed.
+    pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<Arc<PoV>>) {
         loop {
             if let Some(res) = self.recoveries.next().await {
                 self.candidates.remove(&res.0);
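
A note on the pattern above: `recoveries` is a `FuturesUnordered` bag of pinned, boxed recovery futures, each resolving to a `(block_hash, Option<Arc<PoV>>)` pair, polled in completion order rather than insertion order. A self-contained toy with the same shape (stand-in payload types, assuming a tokio runtime):

use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};

#[tokio::main]
async fn main() {
    // BoxFuture<'static, T> is Pin<Box<dyn Future<Output = T> + Send>>,
    // the same future type `recoveries` stores.
    let mut recoveries: FuturesUnordered<BoxFuture<'static, (u32, Option<&'static str>)>> =
        FuturesUnordered::new();

    recoveries.push(async { (1, Some("pov-1")) }.boxed());
    recoveries.push(async { (2, None) }.boxed()); // a failed recovery

    // Results arrive in whatever order the recoveries finish.
    while let Some((block, pov)) = recoveries.next().await {
        println!("recovery for block {block} finished: {pov:?}");
    }
}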

cumulus/client/pov-recovery/src/lib.rs

Lines changed: 16 additions & 22 deletions
@@ -51,7 +51,7 @@ use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
 use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};

-use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
+use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
 use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
 use polkadot_overseer::Handle as OverseerHandle;
 use polkadot_primitives::{
@@ -346,15 +346,11 @@
     }

     /// Handle a recovered candidate.
-    async fn handle_candidate_recovered(
-        &mut self,
-        block_hash: Block::Hash,
-        available_data: Option<AvailableData>,
-    ) {
-        let available_data = match available_data {
-            Some(data) => {
+    async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
+        let pov = match pov {
+            Some(pov) => {
                 self.candidates_in_retry.remove(&block_hash);
-                data
+                pov
             },
             None =>
                 if self.candidates_in_retry.insert(block_hash) {
@@ -373,18 +369,16 @@
             },
         };

-        let raw_block_data = match sp_maybe_compressed_blob::decompress(
-            &available_data.pov.block_data.0,
-            POV_BOMB_LIMIT,
-        ) {
-            Ok(r) => r,
-            Err(error) => {
-                tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
+        let raw_block_data =
+            match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
+                Ok(r) => r,
+                Err(error) => {
+                    tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");

-                self.reset_candidate(block_hash);
-                return
-            },
-        };
+                    self.reset_candidate(block_hash);
+                    return
+                },
+            };

         let block_data = match ParachainBlockData::<Block>::decode(&mut &raw_block_data[..]) {
             Ok(d) => d,
@@ -595,10 +589,10 @@
                 next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
                     self.recover_candidate(next_to_recover).await;
                 },
-                (block_hash, available_data) =
+                (block_hash, pov) =
                     self.active_candidate_recovery.wait_for_recovery().fuse() =>
                 {
-                    self.handle_candidate_recovered(block_hash, available_data).await;
+                    self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
                 },
             }
         }
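
One subtlety in the last hunk: `wait_for_recovery` now yields `Option<Arc<PoV>>`, while `handle_candidate_recovered` takes `Option<&PoV>`. The bridge is `Option::as_deref`, which dereferences through the `Arc` so the handler borrows the PoV instead of cloning it. A self-contained illustration with a stand-in payload type:

use std::sync::Arc;

// Stand-in for handle_candidate_recovered: borrows, never clones.
fn handle(pov: Option<&str>) {
    println!("recovered: {pov:?}");
}

fn main() {
    let recovered: Option<Arc<str>> = Some(Arc::from("proof-of-validity"));
    // Option<Arc<str>> -> Option<&str>, derefing through the Arc.
    handle(recovered.as_deref());
}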

cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs

Lines changed: 1 addition & 1 deletion
@@ -102,7 +102,7 @@ fn build_overseer(
     let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
     let builder = Overseer::builder()
         .availability_distribution(DummySubsystem)
-        .availability_recovery(AvailabilityRecoverySubsystem::with_availability_store_skip(
+        .availability_recovery(AvailabilityRecoverySubsystem::for_collator(
             available_data_req_receiver,
             Metrics::register(registry)?,
         ))

polkadot/node/network/availability-recovery/src/lib.rs

Lines changed: 33 additions & 6 deletions
@@ -105,6 +105,17 @@ pub struct AvailabilityRecoverySubsystem {
     req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
     /// Metrics for this subsystem.
     metrics: Metrics,
+    /// The type of check to perform after available data was recovered.
+    post_recovery_check: PostRecoveryCheck,
+}
+
+#[derive(Clone, PartialEq, Debug)]
+/// The type of check to perform after available data was recovered.
+pub enum PostRecoveryCheck {
+    /// Reencode the data and check erasure root. For validators.
+    Reencode,
+    /// Only check the pov hash. For collators only.
+    PovHash,
 }

 /// Expensive erasure coding computations that we want to run on a blocking thread.
@@ -344,6 +355,7 @@ async fn launch_recovery_task<Context>(
     metrics: &Metrics,
     recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
     bypass_availability_store: bool,
+    post_recovery_check: PostRecoveryCheck,
 ) -> error::Result<()> {
     let candidate_hash = receipt.hash();
     let params = RecoveryParams {
@@ -354,6 +366,8 @@
         erasure_root: receipt.descriptor.erasure_root,
         metrics: metrics.clone(),
         bypass_availability_store,
+        post_recovery_check,
+        pov_hash: receipt.descriptor.pov_hash,
     };

     let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
@@ -390,6 +404,7 @@ async fn handle_recover<Context>(
     erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
     recovery_strategy_kind: RecoveryStrategyKind,
     bypass_availability_store: bool,
+    post_recovery_check: PostRecoveryCheck,
 ) -> error::Result<()> {
     let candidate_hash = receipt.hash();

@@ -486,6 +501,7 @@
                 metrics,
                 recovery_strategies,
                 bypass_availability_store,
+                post_recovery_check,
             )
             .await
         },
@@ -527,15 +543,17 @@ async fn query_chunk_size<Context>(

 #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
 impl AvailabilityRecoverySubsystem {
-    /// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
-    /// `AvailabilityStoreSubsystem` subsystem.
-    pub fn with_availability_store_skip(
+    /// Create a new instance of `AvailabilityRecoverySubsystem` suitable for collator nodes,
+    /// which never requests the `AvailabilityStoreSubsystem` subsystem and only checks the POV hash
+    /// instead of reencoding the available data.
+    pub fn for_collator(
         req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
         metrics: Metrics,
     ) -> Self {
         Self {
             recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
             bypass_availability_store: true,
+            post_recovery_check: PostRecoveryCheck::PovHash,
             req_receiver,
             metrics,
         }
@@ -550,6 +568,7 @@
         Self {
             recovery_strategy_kind: RecoveryStrategyKind::BackersFirstAlways,
             bypass_availability_store: false,
+            post_recovery_check: PostRecoveryCheck::Reencode,
             req_receiver,
             metrics,
         }
@@ -563,6 +582,7 @@
         Self {
             recovery_strategy_kind: RecoveryStrategyKind::ChunksAlways,
             bypass_availability_store: false,
+            post_recovery_check: PostRecoveryCheck::Reencode,
             req_receiver,
             metrics,
         }
@@ -577,15 +597,21 @@
         Self {
             recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(SMALL_POV_LIMIT),
             bypass_availability_store: false,
+            post_recovery_check: PostRecoveryCheck::Reencode,
             req_receiver,
             metrics,
         }
     }

     async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
         let mut state = State::default();
-        let Self { mut req_receiver, metrics, recovery_strategy_kind, bypass_availability_store } =
-            self;
+        let Self {
+            mut req_receiver,
+            metrics,
+            recovery_strategy_kind,
+            bypass_availability_store,
+            post_recovery_check,
+        } = self;

         let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
         let mut erasure_task_rx = erasure_task_rx.fuse();
@@ -675,7 +701,8 @@
                             &metrics,
                             erasure_task_tx.clone(),
                             recovery_strategy_kind.clone(),
-                            bypass_availability_store
+                            bypass_availability_store,
+                            post_recovery_check.clone()
                         ).await {
                             gum::warn!(
                                 target: LOG_TARGET,
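
Read together with the collator_overseer.rs hunk, the constructor now selects both behaviors at once: `for_collator` sets `bypass_availability_store: true` and `PostRecoveryCheck::PovHash`, while the validator-side constructors (their names fall outside the rendered hunks; `with_chunks_if_pov_large` is assumed below) keep `PostRecoveryCheck::Reencode`. A hedged wiring sketch, with `is_collator`, `available_data_req_receiver`, and `registry` assumed to be in scope:

// Sketch only: choose the recovery subsystem flavor while building an overseer.
let availability_recovery = if is_collator {
    // Collator: skip the availability store and verify only the PoV hash.
    AvailabilityRecoverySubsystem::for_collator(
        available_data_req_receiver,
        Metrics::register(registry)?,
    )
} else {
    // Validator: full reencode check against the erasure root
    // (constructor name assumed, not shown in this diff view).
    AvailabilityRecoverySubsystem::with_chunks_if_pov_large(
        available_data_req_receiver,
        Metrics::register(registry)?,
    )
};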
