From dcd9d9a83248f7a7391f78d0978221ed101a7d67 Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Wed, 12 Nov 2025 10:08:18 -0500 Subject: [PATCH 01/16] core info, get backable candidates, hack av-store --- Cargo.lock | 1 + polkadot/node/core/av-store/src/lib.rs | 29 +- .../src/requester/fetch_task/mod.rs | 44 ++- .../src/requester/mod.rs | 232 +++++++++++++++- polkadot/node/overseer/src/lib.rs | 2 + polkadot/node/subsystem-util/Cargo.toml | 1 + polkadot/node/subsystem-util/src/lib.rs | 258 +++++++++++++++++- 7 files changed, 552 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d129f4eea452c..f3d5136d3dced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16054,6 +16054,7 @@ name = "polkadot-node-subsystem-util" version = "7.0.0" dependencies = [ "assert_matches", + "bitvec", "fatality", "futures", "itertools 0.11.0", diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 9fd221ebee979..76b4115ccebc8 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -1249,7 +1249,24 @@ fn store_chunk( let mut meta = match load_meta(db, config, &candidate_hash)? { Some(m) => m, - None => return Ok(false), // we weren't informed of this candidate by import events. + None => { + // TODO: revise this, this is a really bad hack + let now = SystemTime::now().duration_since(UNIX_EPOCH)?; + let meta = CandidateMeta { + state: State::Unavailable(now.into()), + data_available: false, + chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; 12], + }; + + gum::debug!( + target: LOG_TARGET, + ?candidate_hash, + validator_index = %validator_index.0, + "Cannot store chunk for unknown candidate.", + ); + // return Ok(false) // we weren't informed of this candidate by import events. + meta + }, }; match meta.chunks_stored.get(validator_index.0 as usize).map(|b| *b) { @@ -1260,7 +1277,15 @@ fn store_chunk( write_chunk(&mut tx, config, &candidate_hash, validator_index, &chunk); write_meta(&mut tx, config, &candidate_hash, &meta); }, - None => return Ok(false), // out of bounds. + None => { + gum::debug!( + target: LOG_TARGET, + ?candidate_hash, + validator_index = %validator_index.0, + "Cannot store chunk for unknown validator index.", + ); + return Ok(false) // out of bounds. + }, } gum::debug!( diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 4a68349d398fe..167ec47b2b5bc 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -43,7 +43,10 @@ use sc_network::ProtocolName; use crate::{ error::{FatalError, Result}, metrics::{Metrics, FAILED, SUCCEEDED}, - requester::session_cache::{BadValidators, SessionInfo}, + requester::{ + session_cache::{BadValidators, SessionInfo}, + CoreInfo, + }, LOG_TARGET, }; @@ -145,7 +148,7 @@ impl FetchTaskConfig { /// The result of this function can be passed into [`FetchTask::start`]. 
pub fn new( leaf: Hash, - core: &OccupiedCore, + core: &CoreInfo, sender: mpsc::Sender, metrics: Metrics, session_info: &SessionInfo, @@ -170,8 +173,8 @@ impl FetchTaskConfig { candidate_hash: core.candidate_hash, index: session_info.our_index, }, - erasure_root: core.candidate_descriptor.erasure_root(), - relay_parent: core.candidate_descriptor.relay_parent(), + erasure_root: core.erasure_root, + relay_parent: core.relay_parent, metrics, sender, chunk_index, @@ -310,15 +313,48 @@ impl RunningTask { }, }; + gum::info!( + target: LOG_TARGET, + validator = ?validator, + relay_parent = ?self.relay_parent, + group_index = ?self.group_index, + session_index = ?self.session_index, + chunk_index = ?self.request.index, + candidate_hash = ?self.request.candidate_hash, + "Received erasure chunk from validator", + ); + // Data genuine? if !self.validate_chunk(&validator, &chunk, self.chunk_index) { bad_validators.push(validator); continue } + gum::info!( + target: LOG_TARGET, + validator = ?validator, + relay_parent = ?self.relay_parent, + group_index = ?self.group_index, + session_index = ?self.session_index, + chunk_index = ?self.request.index, + candidate_hash = ?self.request.candidate_hash, + "Erasure chunk validated successfully", + ); + // Ok, let's store it and be happy: self.store_chunk(chunk).await; succeeded = true; + + gum::info!( + target: LOG_TARGET, + validator = ?validator, + relay_parent = ?self.relay_parent, + group_index = ?self.group_index, + session_index = ?self.session_index, + chunk_index = ?self.request.index, + candidate_hash = ?self.request.candidate_hash, + "Erasure chunk stored successfully", + ); break } if succeeded { diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 2338250327246..b44f7a322b707 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -31,16 +31,20 @@ use futures::{ use polkadot_node_network_protocol::request_response::{v1, v2, IsRequest, ReqProtocolNames}; use polkadot_node_subsystem::{ - messages::{ChainApiMessage, RuntimeApiMessage}, + messages::{ChainApiMessage, RuntimeApiMessage, CandidateBackingMessage}, overseer, ActivatedLeaf, ActiveLeavesUpdate, + SubsystemSender, }; use polkadot_node_subsystem_util::{ availability_chunks::availability_chunk_index, runtime::{get_occupied_cores, RuntimeInfo}, + request_availability_cores, + request_backable_candidates, + request_validator_groups, }; -use polkadot_primitives::{CandidateHash, CoreIndex, Hash, OccupiedCore, SessionIndex}; +use polkadot_primitives::{BackedCandidate, CandidateHash, CoreIndex, GroupIndex, Hash, OccupiedCore, SessionIndex}; -use super::{FatalError, Metrics, Result, LOG_TARGET}; +use super::{FatalError, Metrics, Result, LOG_TARGET, error::Error}; #[cfg(test)] mod tests; @@ -53,6 +57,22 @@ use session_cache::SessionCache; mod fetch_task; use fetch_task::{FetchTask, FetchTaskConfig, FromFetchTask}; +/// A compact representation of a parachain candidate core's essential information, +/// used to streamline chunk-fetching tasks. This structure normalizes data from both +/// occupied and scheduled cores into a unified format containing only the fields +/// necessary for chunk fetching and validation. +#[derive(Debug, Clone)] +struct CoreInfo { + /// The candidate hash. + candidate_hash: CandidateHash, + /// The relay parent of the candidate. 
+ relay_parent: Hash, + /// The root hash of the erasure coded chunks for the candidate. + erasure_root: Hash, + /// The group index of the group responsible for the candidate. + group_responsible: GroupIndex, +} + /// Requester takes care of requesting erasure chunks from backing groups and stores them in the /// av store. /// @@ -144,14 +164,134 @@ impl Requester { ) .await?; + let mut scheduled_cores = Vec::new(); // Also spawn or bump tasks for candidates in ancestry in the same session. for hash in std::iter::once(leaf).chain(ancestors_in_session) { + if hash == leaf { + let sender = &mut ctx.sender().clone(); + + // let bitfields = select_availability + let availability_cores = request_availability_cores(leaf, sender) + .await + .await; + + if availability_cores.is_err() { + return Err(Error::NoSuchPoV) + } + + let cores = availability_cores.unwrap().unwrap(); + + // let bitfields = select_availability_bitfields(&availability_cores, bitfields, &leaf.hash); + let backable_candidates = request_backable_candidates(&cores, None, &new_head, sender).await; + + if backable_candidates.is_err() { + return Err(Error::NoSuchPoV) + } + + let backable = backable_candidates.unwrap(); + + gum::info!( + target: LOG_TARGET, + backable = ?backable, + "Backable candidates" + ); + + let sender = &mut ctx.sender().clone(); + // now get the backed candidates corresponding to these candidate receipts + let (tx, rx) = oneshot::channel(); + sender.send_message(CandidateBackingMessage::GetBackableCandidates( + backable.clone(), + tx, + )).await; + + let candidates = rx.await; + if candidates.is_err() { + return Err(Error::NoSuchPoV) + } + + let unwrapped = candidates.unwrap(); + gum::info!( + target: LOG_TARGET, + leaf_hash=?leaf, + candidates=?unwrapped, + "Got {} backed candidates", unwrapped.len() + ); + + // Process candidates and collect cores + // let mut scheduled_cores = Vec::new(); + for (para_id, candidates) in unwrapped.iter() { + for candidate in candidates { + let (_, core_index) = candidate.validator_indices_and_core_index(); + let Some(core_index) = core_index else { continue }; + + let receipt = candidate.candidate(); + + gum::info!( + target: LOG_TARGET, + para_id = ?para_id, + candidate_hash = ?receipt.hash(), + core_index = ?core_index, + "Scheduling candidate for core" + ); + + let group_responsible = get_group_index_for_backed_candidate(candidate, leaf, sender).await?; + + let core = ( + core_index, + CoreInfo { + candidate_hash: receipt.hash(), + relay_parent: receipt.descriptor.relay_parent(), + erasure_root: receipt.descriptor.erasure_root(), + group_responsible: group_responsible, + }, + ); + gum::info!( + target: LOG_TARGET, + ?core, + "Scheduled core info" + ); + scheduled_cores.push(core); + } + } + } + let cores = get_occupied_cores(sender, hash).await?; gum::trace!( target: LOG_TARGET, occupied_cores = ?cores, "Query occupied core" ); + + let cores = cores + .into_iter() + .map(|(index, occ)| { + gum::info!( + target: LOG_TARGET, + index = ?index, + candidate_hash = ?occ.candidate_hash, + next_up_on_available = ?occ.next_up_on_available, + leaf = ?leaf, + "Scheduled parachain candidate" + ); + + let core = ( + index, + CoreInfo { + candidate_hash: occ.candidate_hash, + relay_parent: occ.candidate_descriptor.relay_parent(), + erasure_root: occ.candidate_descriptor.erasure_root(), + group_responsible: occ.group_responsible, + }, + ); + gum::info!( + target: LOG_TARGET, + ?core, + "Occupied core info" + ); + core + }) + .collect::>(); + // Important: // We mark the whole ancestry as 
live in the **leaf** hash, so we don't need to track // any tasks separately. @@ -161,6 +301,14 @@ impl Requester { // leaf being deactivated. self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?; } + if scheduled_cores.len() > 0 { + gum::info!( + target: LOG_TARGET, + ?scheduled_cores, + "Adding scheduled cores" + ); + self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; + } Ok(()) } @@ -168,9 +316,17 @@ impl Requester { /// Stop requesting chunks for obsolete heads. fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator) { let obsolete_leaves: HashSet<_> = obsolete_leaves.collect(); - self.fetches.retain(|_, task| { + self.fetches.retain(|candidate_hash, task| { task.remove_leaves(&obsolete_leaves); - task.is_live() + let is_live = task.is_live(); + if !is_live { + gum::info!( + target: LOG_TARGET, + ?candidate_hash, + "Removing fetch task as no longer live" + ); + } + is_live }) } @@ -187,7 +343,7 @@ impl Requester { runtime: &mut RuntimeInfo, leaf: Hash, leaf_session_index: SessionIndex, - cores: impl IntoIterator, + cores: impl IntoIterator, ) -> Result<()> { for (core_index, core) in cores { if let Some(e) = self.fetches.get_mut(&core.candidate_hash) { @@ -266,6 +422,11 @@ impl Stream for Requester { Poll::Ready(Some(FromFetchTask::Concluded(None))) => continue, Poll::Ready(Some(FromFetchTask::Failed(candidate_hash))) => { // Make sure we retry on next block still pending availability. + gum::info!( + target: LOG_TARGET, + ?candidate_hash, + "Fetch task failed, removing from active fetches to allow retrying" + ); self.fetches.remove(&candidate_hash); }, Poll::Ready(None) => return Poll::Ready(None), @@ -347,3 +508,62 @@ where .map_err(FatalError::ChainApi)?; Ok(ancestors) } + +// Assuming you have the backed_candidate and relay_parent +async fn get_group_index_for_backed_candidate( + backed_candidate: &BackedCandidate, + relay_parent: Hash, + sender: &mut Sender, +) -> Result +where + Sender: overseer::SubsystemSender + overseer::SubsystemSender, +{ + let para_id = backed_candidate.receipt().descriptor.para_id(); + + let (validator_groups, mut rotation_info) = request_validator_groups(relay_parent, sender) + .await + .await + .map_err(|e| { + gum::error!( + target: LOG_TARGET, + ?para_id, + error = ?e, + "Error retrieving validator groups", + ); + Error::NoSuchPoV + })? 
+ .map_err(|e| { + gum::error!( + target: LOG_TARGET, + ?para_id, + error = ?e, + "Error retrieving validator groups", + ); + Error::NoSuchPoV + })?; + + // Get the block number for the relay_parent to update rotation_info.now + let (tx, rx) = oneshot::channel(); + sender.send_message(ChainApiMessage::BlockHeader(relay_parent, tx)).await; + let header = rx.await.map_err(|_| Error::NoSuchPoV)?.map_err(|_| Error::NoSuchPoV)?; + let block_number = header.map(|h| h.number).unwrap_or(0); // Default to 0 if not found, but should be found + + rotation_info.now = block_number.into(); // Update now to the relay_parent's block number + + let (_, core_index)= backed_candidate.validator_indices_and_core_index(); + let core_index = match core_index { + Some(index) => index, + None => { + gum::error!( + target: LOG_TARGET, + ?para_id, + "No core index found for backed candidate", + ); + return Err(Error::NoSuchPoV) + } + }; + + let cores = validator_groups.len(); + + Ok(rotation_info.group_for_core(core_index, cores)) +} \ No newline at end of file diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 83709e73027b9..5857ad3c8f0e4 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -520,6 +520,8 @@ pub struct Overseer { ChainApiMessage, RuntimeApiMessage, NetworkBridgeTxMessage, + ProspectiveParachainsMessage, + CandidateBackingMessage ])] availability_distribution: AvailabilityDistribution, diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index e8cf11c2a83f6..b8c33f12712c3 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true workspace = true [dependencies] +bitvec = { workspace = true } codec = { features = ["derive"], workspace = true } fatality = { workspace = true } futures = { workspace = true } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 93cdc84443aa2..74a497aef3b0b 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -40,21 +40,24 @@ pub use polkadot_node_metrics::{metrics, Metronome}; use codec::Encode; use futures::channel::{mpsc, oneshot}; +use bitvec::vec::BitVec; +use bitvec; + use polkadot_primitives::{ async_backing::{BackingState, Constraints}, slashing, AsyncBackingParams, AuthorityDiscoveryId, CandidateEvent, CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState, EncodeAs, ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionIndex, - SessionInfo, Signed, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId, - ValidatorIndex, ValidatorSignature, + SessionInfo, Signed, SignedAvailabilityBitfield, SigningContext, ValidationCode, + ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, }; pub use rand; use sp_application_crypto::AppCrypto; use sp_core::ByteArray; use sp_keystore::{Error as KeystoreError, KeystorePtr}; use std::{ - collections::{BTreeMap, VecDeque}, + collections::{BTreeMap, HashMap, VecDeque}, time::Duration, }; use thiserror::Error; @@ -98,6 +101,10 @@ mod determine_new_blocks; mod controlled_validator_indices; pub use controlled_validator_indices::ControlledValidatorIndices; +use polkadot_node_subsystem_types::{ + messages::{Ancestors, ProspectiveParachainsMessage}, + ActivatedLeaf, +}; 
#[cfg(test)] mod tests; @@ -139,6 +146,10 @@ pub enum Error { /// Data that are supposed to be there a not there #[error("Data are not available")] DataNotAvailable, + /// Attempted to get backable candidates from prospective parachains but the request was + /// canceled. + #[error("failed to get backable candidates from prospective parachains")] + CanceledBackableCandidates(#[source] oneshot::Canceled), } impl From for Error { @@ -543,3 +554,244 @@ impl Validator { Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key) } } + +/// In general, we want to pick all the bitfields. However, we have the following constraints: +/// +/// - not more than one per validator +/// - each 1 bit must correspond to an occupied core +/// +/// If we have too many, an arbitrary selection policy is fine. For purposes of maximizing +/// availability, we pick the one with the greatest number of 1 bits. +/// +/// Note: This does not enforce any sorting precondition on the output; the ordering there will be +/// unrelated to the sorting of the input. +pub fn select_availability_bitfields( + cores: &[CoreState], + bitfields: &[SignedAvailabilityBitfield], + leaf_hash: &Hash, +) -> Vec { + let mut selected: BTreeMap = BTreeMap::new(); + + gum::debug!( + target: LOG_TARGET, + bitfields_count = bitfields.len(), + ?leaf_hash, + "bitfields count before selection" + ); + + 'a: for bitfield in bitfields.iter().cloned() { + if bitfield.payload().0.len() != cores.len() { + gum::debug!(target: LOG_TARGET, ?leaf_hash, "dropping bitfield due to length mismatch"); + continue + } + + let is_better = selected + .get(&bitfield.validator_index()) + .map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones()); + + if !is_better { + gum::trace!( + target: LOG_TARGET, + val_idx = bitfield.validator_index().0, + ?leaf_hash, + "dropping bitfield due to duplication - the better one is kept" + ); + continue + } + + for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) { + // Bit is set for an unoccupied core - invalid + if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) { + gum::debug!( + target: LOG_TARGET, + val_idx = bitfield.validator_index().0, + ?leaf_hash, + "dropping invalid bitfield - bit is set for an unoccupied core" + ); + continue 'a + } + } + + let _ = selected.insert(bitfield.validator_index(), bitfield); + } + + gum::debug!( + target: LOG_TARGET, + ?leaf_hash, + "selected {} of all {} bitfields (each bitfield is from a unique validator)", + selected.len(), + bitfields.len() + ); + + selected.into_values().collect() +} + +/// Requests backable candidates from Prospective Parachains subsystem +/// based on core states. +pub async fn request_backable_candidates( + availability_cores: &[CoreState], + bitfields: Option<&[SignedAvailabilityBitfield]>, + relay_parent: &ActivatedLeaf, + sender: &mut impl overseer::SubsystemSender, +) -> Result>, Error> { + let block_number_under_construction = relay_parent.number + 1; + + // Record how many cores are scheduled for each paraid. Use a BTreeMap because + // we'll need to iterate through them. + let mut scheduled_cores_per_para: BTreeMap = BTreeMap::new(); + // The on-chain ancestors of a para present in availability-cores. 
+ let mut ancestors: HashMap = + HashMap::with_capacity(availability_cores.len()); + + for (core_idx, core) in availability_cores.iter().enumerate() { + let core_idx = CoreIndex(core_idx as u32); + match core { + CoreState::Scheduled(scheduled_core) => { + *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; + }, + CoreState::Occupied(occupied_core) => { + let is_available = match bitfields { + Some(bitfields) => { + // If bitfields are provided, check if the core is available. + bitfields_indicate_availability( + core_idx.0 as usize, + bitfields, + &occupied_core.availability, + ) + }, + None => { + gum::debug!( + target: LOG_TARGET, + leaf_hash = ?relay_parent.hash, + ?core_idx, + "No availability bitfields provided, assuming core is available" + ); + true + }, + }; + + if is_available { + ancestors + .entry(occupied_core.para_id()) + .or_default() + .insert(occupied_core.candidate_hash); + + if let Some(ref scheduled_core) = occupied_core.next_up_on_available { + // Request a new backable candidate for the newly scheduled para id. + *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; + } + } else if occupied_core.time_out_at <= block_number_under_construction { + // Timed out before being available. + + if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out { + // Candidate's availability timed out, practically same as scheduled. + *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; + } + } else { + // Not timed out and not available. + ancestors + .entry(occupied_core.para_id()) + .or_default() + .insert(occupied_core.candidate_hash); + } + }, + CoreState::Free => continue, + }; + } + + let mut selected_candidates: HashMap> = + HashMap::with_capacity(scheduled_cores_per_para.len()); + + for (para_id, core_count) in scheduled_cores_per_para { + let para_ancestors = ancestors.remove(¶_id).unwrap_or_default(); + + let response = get_backable_candidates( + relay_parent.hash, + para_id, + para_ancestors, + core_count as u32, + sender, + ) + .await?; + + if response.is_empty() { + gum::debug!( + target: LOG_TARGET, + leaf_hash = ?relay_parent.hash, + ?para_id, + "No backable candidate returned by prospective parachains", + ); + continue; + } + + selected_candidates.insert(para_id, response); + } + + Ok(selected_candidates) +} + +/// The availability bitfield for a given core. +pub type CoreAvailability = BitVec; + +/// The availability bitfield for a given core is the transpose +/// of a set of signed availability bitfields. It goes like this: +/// +/// - construct a transverse slice along `core_idx` +/// - bitwise-or it with the availability slice +/// - count the 1 bits, compare to the total length; true on 2/3+ +fn bitfields_indicate_availability( + core_idx: usize, + bitfields: &[SignedAvailabilityBitfield], + availability: &CoreAvailability, +) -> bool { + let mut availability = availability.clone(); + let availability_len = availability.len(); + + for bitfield in bitfields { + let validator_idx = bitfield.validator_index().0 as usize; + match availability.get_mut(validator_idx) { + None => { + // in principle, this function might return a `Result` so that we can + // more clearly express this error condition however, in practice, that would just + // push off an error-handling routine which would look a whole lot like this one. + // simpler to just handle the error internally here. 
+ gum::warn!( + target: LOG_TARGET, + validator_idx = %validator_idx, + availability_len = %availability_len, + "attempted to set a transverse bit at idx {} which is greater than bitfield size {}", + validator_idx, + availability_len, + ); + + return false; + }, + Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx], + } + } + + 3 * availability.count_ones() >= 2 * availability.len() +} + +/// Requests backable candidates from Prospective Parachains based on +/// the given ancestors in the fragment chain. The ancestors may not be ordered. +async fn get_backable_candidates( + relay_parent: Hash, + para_id: ParaId, + ancestors: Ancestors, + count: u32, + sender: &mut impl overseer::SubsystemSender, +) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + sender + .send_message(ProspectiveParachainsMessage::GetBackableCandidates( + relay_parent, + para_id, + count, + ancestors, + tx, + )) + .await; + + rx.await.map_err(Error::CanceledBackableCandidates) +} \ No newline at end of file From 9fedb163492d8e4a4f9c8bd49c8c1cb9b963d5a1 Mon Sep 17 00:00:00 2001 From: Tim Wu Date: Mon, 17 Nov 2025 14:27:34 +0000 Subject: [PATCH 02/16] add new av store message type, send message for early fetches --- polkadot/node/core/av-store/src/lib.rs | 44 +++++++++++++++++++ .../src/requester/mod.rs | 16 ++++++- polkadot/node/subsystem-types/src/messages.rs | 9 ++++ 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 76b4115ccebc8..7466f2a5dad05 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -1232,6 +1232,24 @@ fn process_message( }, } }, + AvailabilityStoreMessage::NoteBackableCandidate{ candidate_hash, tx } => { + let res = note_backable_candidate( + &subsystem.db, + &subsystem.config, + candidate_hash, + subsystem, + ); + + match res { + Ok(_) => { + let _ = tx.send(Ok(())); + }, + Err(e) => { + let _ = tx.send(Err(())); + return Err(e) + }, + } + }, } Ok(()) @@ -1430,3 +1448,29 @@ fn prune_all(db: &Arc, config: &Config, now: Duration) -> Result<( db.write(tx)?; Ok(()) } + +fn note_backable_candidate( + db: &Arc, + config: &Config, + candidate_hash: CandidateHash, + subsystem: &AvailabilityStoreSubsystem, +) -> Result<(), Error> { + let mut tx = DBTransaction::new(); + + if load_meta(db, config, &candidate_hash)?.is_none() { + let now = subsystem.clock.now()?; + let meta = CandidateMeta { + state: State::Unavailable(now.into()), + data_available: false, + chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; 12], + }; + + let prune_at = now + KEEP_UNAVAILABLE_FOR; + + write_pruning_key(&mut tx, config, prune_at, &candidate_hash); + write_meta(&mut tx, config, &candidate_hash, &meta); + } + + db.write(tx)?; + Ok(()) +} \ No newline at end of file diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index b44f7a322b707..6f8b343ccc8a2 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -31,7 +31,7 @@ use futures::{ use polkadot_node_network_protocol::request_response::{v1, v2, IsRequest, ReqProtocolNames}; use polkadot_node_subsystem::{ - messages::{ChainApiMessage, RuntimeApiMessage, CandidateBackingMessage}, + messages::{ChainApiMessage, RuntimeApiMessage, CandidateBackingMessage, AvailabilityStoreMessage}, overseer, ActivatedLeaf, 
ActiveLeavesUpdate, SubsystemSender, }; @@ -181,7 +181,6 @@ impl Requester { let cores = availability_cores.unwrap().unwrap(); - // let bitfields = select_availability_bitfields(&availability_cores, bitfields, &leaf.hash); let backable_candidates = request_backable_candidates(&cores, None, &new_head, sender).await; if backable_candidates.is_err() { @@ -307,6 +306,19 @@ impl Requester { ?scheduled_cores, "Adding scheduled cores" ); + let sender = &mut ctx.sender().clone(); + for (_, core_info) in &scheduled_cores { + let (tx, rx) = oneshot::channel(); + sender + .send_message( + AvailabilityStoreMessage::NoteBackableCandidate{ candidate_hash: core_info.candidate_hash, tx }, + ) + .await; + if let Err(err) = rx.await { + gum::error!(target: LOG_TARGET, "Sending NoteBackableCandidate message failed: {:?}", err); + } + + } self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; } diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 8805a330a99f6..d6ac4d5882c52 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -572,6 +572,15 @@ pub enum AvailabilityStoreMessage { tx: oneshot::Sender>, }, + /// Note that a candidate is backable. Only used by Speculative Availability to signal to Availability Store + /// that it should keep track of this candidate and it is highly likely the candidate will be backed. + NoteBackableCandidate{ + /// A hash of the backable candidate. + candidate_hash: CandidateHash, + /// Sending side of the channel to send result to. + tx: oneshot::Sender>, + }, + /// Computes and checks the erasure root of `AvailableData` before storing all of its chunks in /// the AV store. /// From e2ee38b148d9c68bf88ff156b8e20e1755c6063b Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Tue, 2 Dec 2025 15:17:08 -0500 Subject: [PATCH 03/16] remove duplicate function --- Cargo.lock | 1 - polkadot/node/core/provisioner/Cargo.toml | 1 - polkadot/node/core/provisioner/src/lib.rs | 241 +--------------------- 3 files changed, 7 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3d5136d3dced..82c4e34dcf4cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15756,7 +15756,6 @@ dependencies = [ name = "polkadot-node-core-provisioner" version = "7.0.0" dependencies = [ - "bitvec", "fatality", "futures", "futures-timer", diff --git a/polkadot/node/core/provisioner/Cargo.toml b/polkadot/node/core/provisioner/Cargo.toml index 91bac900dc0fc..fedf55081cfee 100644 --- a/polkadot/node/core/provisioner/Cargo.toml +++ b/polkadot/node/core/provisioner/Cargo.toml @@ -12,7 +12,6 @@ repository.workspace = true workspace = true [dependencies] -bitvec = { features = ["alloc"], workspace = true } fatality = { workspace = true } futures = { workspace = true } futures-timer = { workspace = true } diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 29e8eb34b3959..273c382ac799d 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -19,7 +19,6 @@ #![deny(missing_docs, unused_crate_dependencies)] -use bitvec::vec::BitVec; use futures::{ channel::oneshot::{self, Canceled}, future::BoxFuture, @@ -30,21 +29,21 @@ use futures::{ use futures_timer::Delay; use polkadot_node_subsystem::{ messages::{ - Ancestors, CandidateBackingMessage, ChainApiMessage, ProspectiveParachainsMessage, + CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, 
ProvisionerMessage, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; -use polkadot_node_subsystem_util::{request_availability_cores, TimeoutExt}; +use polkadot_node_subsystem_util::{request_availability_cores, TimeoutExt, request_backable_candidates, select_availability_bitfields}; use polkadot_primitives::{ - BackedCandidate, CandidateEvent, CandidateHash, CoreIndex, CoreState, Hash, Id as ParaId, - SignedAvailabilityBitfield, ValidatorIndex, + BackedCandidate, CandidateEvent, CoreState, Hash, + SignedAvailabilityBitfield, }; use sc_consensus_slots::time_until_next_slot; use schnellru::{ByLength, LruMap}; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, time::Duration, }; mod disputes; @@ -421,8 +420,6 @@ fn note_provisionable_data( } } -type CoreAvailability = BitVec; - /// The provisioner is the subsystem best suited to choosing which specific /// backed candidates and availability bitfields should be assembled into the /// block. To engage this functionality, a @@ -515,167 +512,6 @@ async fn send_inherent_data( Ok(()) } -/// In general, we want to pick all the bitfields. However, we have the following constraints: -/// -/// - not more than one per validator -/// - each 1 bit must correspond to an occupied core -/// -/// If we have too many, an arbitrary selection policy is fine. For purposes of maximizing -/// availability, we pick the one with the greatest number of 1 bits. -/// -/// Note: This does not enforce any sorting precondition on the output; the ordering there will be -/// unrelated to the sorting of the input. -fn select_availability_bitfields( - cores: &[CoreState], - bitfields: &[SignedAvailabilityBitfield], - leaf_hash: &Hash, -) -> Vec { - let mut selected: BTreeMap = BTreeMap::new(); - - gum::debug!( - target: LOG_TARGET, - bitfields_count = bitfields.len(), - ?leaf_hash, - "bitfields count before selection" - ); - - 'a: for bitfield in bitfields.iter().cloned() { - if bitfield.payload().0.len() != cores.len() { - gum::debug!(target: LOG_TARGET, ?leaf_hash, "dropping bitfield due to length mismatch"); - continue - } - - let is_better = selected - .get(&bitfield.validator_index()) - .map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones()); - - if !is_better { - gum::trace!( - target: LOG_TARGET, - val_idx = bitfield.validator_index().0, - ?leaf_hash, - "dropping bitfield due to duplication - the better one is kept" - ); - continue - } - - for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) { - // Bit is set for an unoccupied core - invalid - if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) { - gum::debug!( - target: LOG_TARGET, - val_idx = bitfield.validator_index().0, - ?leaf_hash, - "dropping invalid bitfield - bit is set for an unoccupied core" - ); - continue 'a - } - } - - let _ = selected.insert(bitfield.validator_index(), bitfield); - } - - gum::debug!( - target: LOG_TARGET, - ?leaf_hash, - "selected {} of all {} bitfields (each bitfield is from a unique validator)", - selected.len(), - bitfields.len() - ); - - selected.into_values().collect() -} - -/// Requests backable candidates from Prospective Parachains subsystem -/// based on core states. 
-async fn request_backable_candidates( - availability_cores: &[CoreState], - bitfields: &[SignedAvailabilityBitfield], - relay_parent: &ActivatedLeaf, - sender: &mut impl overseer::ProvisionerSenderTrait, -) -> Result>, Error> { - let block_number_under_construction = relay_parent.number + 1; - - // Record how many cores are scheduled for each paraid. Use a BTreeMap because - // we'll need to iterate through them. - let mut scheduled_cores_per_para: BTreeMap = BTreeMap::new(); - // The on-chain ancestors of a para present in availability-cores. - let mut ancestors: HashMap = - HashMap::with_capacity(availability_cores.len()); - - for (core_idx, core) in availability_cores.iter().enumerate() { - let core_idx = CoreIndex(core_idx as u32); - match core { - CoreState::Scheduled(scheduled_core) => { - *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; - }, - CoreState::Occupied(occupied_core) => { - let is_available = bitfields_indicate_availability( - core_idx.0 as usize, - bitfields, - &occupied_core.availability, - ); - - if is_available { - ancestors - .entry(occupied_core.para_id()) - .or_default() - .insert(occupied_core.candidate_hash); - - if let Some(ref scheduled_core) = occupied_core.next_up_on_available { - // Request a new backable candidate for the newly scheduled para id. - *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; - } - } else if occupied_core.time_out_at <= block_number_under_construction { - // Timed out before being available. - - if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out { - // Candidate's availability timed out, practically same as scheduled. - *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; - } - } else { - // Not timed out and not available. - ancestors - .entry(occupied_core.para_id()) - .or_default() - .insert(occupied_core.candidate_hash); - } - }, - CoreState::Free => continue, - }; - } - - let mut selected_candidates: HashMap> = - HashMap::with_capacity(scheduled_cores_per_para.len()); - - for (para_id, core_count) in scheduled_cores_per_para { - let para_ancestors = ancestors.remove(¶_id).unwrap_or_default(); - - let response = get_backable_candidates( - relay_parent.hash, - para_id, - para_ancestors, - core_count as u32, - sender, - ) - .await?; - - if response.is_empty() { - gum::debug!( - target: LOG_TARGET, - leaf_hash = ?relay_parent.hash, - ?para_id, - "No backable candidate returned by prospective parachains", - ); - continue - } - - selected_candidates.insert(para_id, response); - } - - Ok(selected_candidates) -} - /// Determine which cores are free, and then to the degree possible, pick a candidate appropriate to /// each free core. async fn select_candidates( @@ -692,7 +528,7 @@ async fn select_candidates( ); let selected_candidates = - request_backable_candidates(availability_cores, bitfields, leaf, sender).await?; + request_backable_candidates(availability_cores, Some(bitfields), leaf, sender).await?; gum::debug!(target: LOG_TARGET, ?selected_candidates, "Got backable candidates"); // now get the backed candidates corresponding to these candidate receipts @@ -736,67 +572,4 @@ async fn select_candidates( ); Ok(merged_candidates) -} - -/// Requests backable candidates from Prospective Parachains based on -/// the given ancestors in the fragment chain. The ancestors may not be ordered. 
-async fn get_backable_candidates( - relay_parent: Hash, - para_id: ParaId, - ancestors: Ancestors, - count: u32, - sender: &mut impl overseer::ProvisionerSenderTrait, -) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - sender - .send_message(ProspectiveParachainsMessage::GetBackableCandidates( - relay_parent, - para_id, - count, - ancestors, - tx, - )) - .await; - - rx.await.map_err(Error::CanceledBackableCandidates) -} - -/// The availability bitfield for a given core is the transpose -/// of a set of signed availability bitfields. It goes like this: -/// -/// - construct a transverse slice along `core_idx` -/// - bitwise-or it with the availability slice -/// - count the 1 bits, compare to the total length; true on 2/3+ -fn bitfields_indicate_availability( - core_idx: usize, - bitfields: &[SignedAvailabilityBitfield], - availability: &CoreAvailability, -) -> bool { - let mut availability = availability.clone(); - let availability_len = availability.len(); - - for bitfield in bitfields { - let validator_idx = bitfield.validator_index().0 as usize; - match availability.get_mut(validator_idx) { - None => { - // in principle, this function might return a `Result` so that we can - // more clearly express this error condition however, in practice, that would just - // push off an error-handling routine which would look a whole lot like this one. - // simpler to just handle the error internally here. - gum::warn!( - target: LOG_TARGET, - validator_idx = %validator_idx, - availability_len = %availability_len, - "attempted to set a transverse bit at idx {} which is greater than bitfield size {}", - validator_idx, - availability_len, - ); - - return false - }, - Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx], - } - } - - 3 * availability.count_ones() >= 2 * availability.len() -} +} \ No newline at end of file From bf6b4568f8e38bfb66294865f9479df32d2c552a Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Tue, 2 Dec 2025 17:59:10 -0500 Subject: [PATCH 04/16] remove hack when querying chunk --- polkadot/node/core/av-store/src/lib.rs | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 7466f2a5dad05..b42f24357d9a1 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -1267,24 +1267,7 @@ fn store_chunk( let mut meta = match load_meta(db, config, &candidate_hash)? { Some(m) => m, - None => { - // TODO: revise this, this is a really bad hack - let now = SystemTime::now().duration_since(UNIX_EPOCH)?; - let meta = CandidateMeta { - state: State::Unavailable(now.into()), - data_available: false, - chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; 12], - }; - - gum::debug!( - target: LOG_TARGET, - ?candidate_hash, - validator_index = %validator_index.0, - "Cannot store chunk for unknown candidate.", - ); - // return Ok(false) // we weren't informed of this candidate by import events. - meta - }, + None => return Ok(false), // we weren't informed of this candidate by import events. 
}; match meta.chunks_stored.get(validator_index.0 as usize).map(|b| *b) { From bff8238e66623c78f2a1457c600550fae485ff12 Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Tue, 2 Dec 2025 23:19:26 -0500 Subject: [PATCH 05/16] clean up error handling, revise fetching backable candidates --- .../availability-distribution/src/error.rs | 10 +- .../src/requester/fetch_task/mod.rs | 2 +- .../src/requester/mod.rs | 216 +++++++----------- 3 files changed, 93 insertions(+), 135 deletions(-) diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs index 852e7bbcbbbf3..78d5dec5fa80b 100644 --- a/polkadot/node/network/availability-distribution/src/error.rs +++ b/polkadot/node/network/availability-distribution/src/error.rs @@ -88,6 +88,12 @@ pub enum Error { #[error("Erasure coding error: {0}")] ErasureCoding(#[from] polkadot_erasure_coding::Error), + + #[error("Expected a block header to be returned")] + NoSuchBlockHeader, + + #[error("Error from subsystem-util: {0}")] + SubsystemUtil(#[from] polkadot_node_subsystem_util::Error), } /// General result abbreviation type alias. @@ -112,7 +118,9 @@ pub fn log_error( JfyiError::QueryAvailableDataResponseChannel(_) | JfyiError::QueryChunkResponseChannel(_) | JfyiError::FailedNodeFeatures(_) | - JfyiError::ErasureCoding(_) => gum::warn!(target: LOG_TARGET, error = %jfyi, ctx), + JfyiError::ErasureCoding(_) | + JfyiError::NoSuchBlockHeader | + JfyiError::SubsystemUtil(_) => gum::warn!(target: LOG_TARGET, error = %jfyi, ctx), JfyiError::FetchPoV(_) | JfyiError::SendResponse | JfyiError::NoSuchPoV | diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 167ec47b2b5bc..f07db51d69aeb 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -36,7 +36,7 @@ use polkadot_node_subsystem::{ }; use polkadot_primitives::{ AuthorityDiscoveryId, BlakeTwo256, CandidateHash, ChunkIndex, GroupIndex, Hash, HashT, - OccupiedCore, SessionIndex, + SessionIndex, }; use sc_network::ProtocolName; diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 6f8b343ccc8a2..611d70c91dc4f 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -41,8 +41,9 @@ use polkadot_node_subsystem_util::{ request_availability_cores, request_backable_candidates, request_validator_groups, + runtime::recv_runtime, }; -use polkadot_primitives::{BackedCandidate, CandidateHash, CoreIndex, GroupIndex, Hash, OccupiedCore, SessionIndex}; +use polkadot_primitives::{CandidateHash, CoreIndex, GroupIndex, Hash, SessionIndex}; use super::{FatalError, Metrics, Result, LOG_TARGET, error::Error}; @@ -144,116 +145,100 @@ impl Requester { Ok(()) } - /// Start requesting chunks for newly imported head. - /// - /// This will also request [`SESSION_ANCESTRY_LEN`] leaf ancestors from the same session - /// and start requesting chunks for them too. 
- async fn start_requesting_chunks( + async fn request_backable_candidates_core_info( &mut self, ctx: &mut Context, - runtime: &mut RuntimeInfo, new_head: ActivatedLeaf, - ) -> Result<()> { - let sender = &mut ctx.sender().clone(); + ) -> Result> { let ActivatedLeaf { hash: leaf, .. } = new_head; - let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session( - sender, - runtime, - leaf, - Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION, - ) - .await?; - let mut scheduled_cores = Vec::new(); - // Also spawn or bump tasks for candidates in ancestry in the same session. - for hash in std::iter::once(leaf).chain(ancestors_in_session) { - if hash == leaf { - let sender = &mut ctx.sender().clone(); + let sender = &mut ctx.sender().clone(); - // let bitfields = select_availability - let availability_cores = request_availability_cores(leaf, sender) - .await - .await; + let cores = recv_runtime(request_availability_cores(leaf, sender).await).await?; + let backable = request_backable_candidates(&cores, None, &new_head, sender).await?; - if availability_cores.is_err() { - return Err(Error::NoSuchPoV) - } + gum::info!( + target: LOG_TARGET, + backable = ?backable, + "Backable candidates" + ); - let cores = availability_cores.unwrap().unwrap(); + // now get the backed candidates corresponding to these candidate receipts + let (tx, rx) = oneshot::channel(); + sender.send_message(CandidateBackingMessage::GetBackableCandidates( + backable.clone(), + tx, + )).await; - let backable_candidates = request_backable_candidates(&cores, None, &new_head, sender).await; + let candidates = rx.await?; + gum::info!( + target: LOG_TARGET, + leaf_hash=?leaf, + candidates=?candidates, + "Got {} backed candidates", candidates.len() + ); - if backable_candidates.is_err() { - return Err(Error::NoSuchPoV) - } + // Process candidates and collect cores + for (para_id, candidates) in candidates.iter() { + for candidate in candidates { + let (_, core_index) = candidate.validator_indices_and_core_index(); + let Some(core_index) = core_index else { continue }; + + let receipt = candidate.candidate(); - let backable = backable_candidates.unwrap(); - gum::info!( target: LOG_TARGET, - backable = ?backable, - "Backable candidates" + para_id = ?para_id, + candidate_hash = ?receipt.hash(), + core_index = ?core_index, + "Scheduling candidate for core" ); - let sender = &mut ctx.sender().clone(); - // now get the backed candidates corresponding to these candidate receipts - let (tx, rx) = oneshot::channel(); - sender.send_message(CandidateBackingMessage::GetBackableCandidates( - backable.clone(), - tx, - )).await; - - let candidates = rx.await; - if candidates.is_err() { - return Err(Error::NoSuchPoV) - } - - let unwrapped = candidates.unwrap(); + let core = ( + core_index, + CoreInfo { + candidate_hash: receipt.hash(), + relay_parent: receipt.descriptor.relay_parent(), + erasure_root: receipt.descriptor.erasure_root(), + group_responsible: get_group_index_for_backed_candidate(sender, core_index, leaf).await?, + }, + ); gum::info!( target: LOG_TARGET, - leaf_hash=?leaf, - candidates=?unwrapped, - "Got {} backed candidates", unwrapped.len() + ?core, + "Scheduled core info" ); + scheduled_cores.push(core); + } + } - // Process candidates and collect cores - // let mut scheduled_cores = Vec::new(); - for (para_id, candidates) in unwrapped.iter() { - for candidate in candidates { - let (_, core_index) = candidate.validator_indices_and_core_index(); - let Some(core_index) = core_index else { continue }; - - let receipt = 
candidate.candidate(); + Ok(scheduled_cores) + } - gum::info!( - target: LOG_TARGET, - para_id = ?para_id, - candidate_hash = ?receipt.hash(), - core_index = ?core_index, - "Scheduling candidate for core" - ); + /// Start requesting chunks for newly imported head. + /// + /// This will also request [`SESSION_ANCESTRY_LEN`] leaf ancestors from the same session + /// and start requesting chunks for them too. + async fn start_requesting_chunks( + &mut self, + ctx: &mut Context, + runtime: &mut RuntimeInfo, + new_head: ActivatedLeaf, + ) -> Result<()> { + let sender = &mut ctx.sender().clone(); + let ActivatedLeaf { hash: leaf, .. } = new_head; + let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session( + sender, + runtime, + leaf, + Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION, + ) + .await?; - let group_responsible = get_group_index_for_backed_candidate(candidate, leaf, sender).await?; + let scheduled_cores = self.request_backable_candidates_core_info(ctx, new_head).await?; - let core = ( - core_index, - CoreInfo { - candidate_hash: receipt.hash(), - relay_parent: receipt.descriptor.relay_parent(), - erasure_root: receipt.descriptor.erasure_root(), - group_responsible: group_responsible, - }, - ); - gum::info!( - target: LOG_TARGET, - ?core, - "Scheduled core info" - ); - scheduled_cores.push(core); - } - } - } - + // Also spawn or bump tasks for candidates in ancestry in the same session. + for hash in std::iter::once(leaf).chain(ancestors_in_session) { let cores = get_occupied_cores(sender, hash).await?; gum::trace!( target: LOG_TARGET, @@ -300,13 +285,14 @@ impl Requester { // leaf being deactivated. self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?; } + if scheduled_cores.len() > 0 { gum::info!( target: LOG_TARGET, ?scheduled_cores, "Adding scheduled cores" ); - let sender = &mut ctx.sender().clone(); + for (_, core_info) in &scheduled_cores { let (tx, rx) = oneshot::channel(); sender @@ -523,59 +509,23 @@ where // Assuming you have the backed_candidate and relay_parent async fn get_group_index_for_backed_candidate( - backed_candidate: &BackedCandidate, + sender: &mut Sender, + core_index: CoreIndex, relay_parent: Hash, - sender: &mut Sender, + ) -> Result where Sender: overseer::SubsystemSender + overseer::SubsystemSender, { - let para_id = backed_candidate.receipt().descriptor.para_id(); - - let (validator_groups, mut rotation_info) = request_validator_groups(relay_parent, sender) - .await - .await - .map_err(|e| { - gum::error!( - target: LOG_TARGET, - ?para_id, - error = ?e, - "Error retrieving validator groups", - ); - Error::NoSuchPoV - })? 
- .map_err(|e| { - gum::error!( - target: LOG_TARGET, - ?para_id, - error = ?e, - "Error retrieving validator groups", - ); - Error::NoSuchPoV - })?; + let (validator_groups, mut rotation_info) = recv_runtime(request_validator_groups(relay_parent, sender).await).await?; // Get the block number for the relay_parent to update rotation_info.now let (tx, rx) = oneshot::channel(); sender.send_message(ChainApiMessage::BlockHeader(relay_parent, tx)).await; - let header = rx.await.map_err(|_| Error::NoSuchPoV)?.map_err(|_| Error::NoSuchPoV)?; - let block_number = header.map(|h| h.number).unwrap_or(0); // Default to 0 if not found, but should be found + let header = rx.await??; + let block_number = header.ok_or_else(|| Error::NoSuchBlockHeader)?.number; rotation_info.now = block_number.into(); // Update now to the relay_parent's block number - let (_, core_index)= backed_candidate.validator_indices_and_core_index(); - let core_index = match core_index { - Some(index) => index, - None => { - gum::error!( - target: LOG_TARGET, - ?para_id, - "No core index found for backed candidate", - ); - return Err(Error::NoSuchPoV) - } - }; - - let cores = validator_groups.len(); - - Ok(rotation_info.group_for_core(core_index, cores)) + Ok(rotation_info.group_for_core(core_index, validator_groups.len())) } \ No newline at end of file From 3b8c5e22722bc9acac91e47d8387c50cd4e4feda Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Wed, 3 Dec 2025 16:38:14 -0500 Subject: [PATCH 06/16] add CoreInfoOrigin to request types --- .../src/requester/fetch_task/mod.rs | 25 +++++++++++++------ .../src/requester/mod.rs | 12 +++++++++ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index f07db51d69aeb..09ff8b7721dac 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -45,7 +45,7 @@ use crate::{ metrics::{Metrics, FAILED, SUCCEEDED}, requester::{ session_cache::{BadValidators, SessionInfo}, - CoreInfo, + CoreInfo, CoreInfoOrigin, }, LOG_TARGET, }; @@ -60,6 +60,7 @@ mod tests; pub struct FetchTaskConfig { prepared_running: Option, live_in: HashSet, + origin: CoreInfoOrigin, } /// Information about a task fetching an erasure chunk. @@ -74,6 +75,9 @@ pub struct FetchTask { /// We keep the task around in until `live_in` becomes empty, to make /// sure we won't re-fetch an already fetched candidate. state: FetchedState, + + /// The origin of the `CoreInfo` that was used to create this fetch task. + origin: CoreInfoOrigin, } /// State of a particular candidate chunk fetching process. @@ -140,6 +144,9 @@ struct RunningTask { /// Full protocol name for ChunkFetchingV2. req_v2_protocol_name: ProtocolName, + + /// The origin of the `CoreInfo` that was used to create this running task. 
+ origin: CoreInfoOrigin, } impl FetchTaskConfig { @@ -160,7 +167,7 @@ impl FetchTaskConfig { // Don't run tasks for our backing group: if session_info.our_group == Some(core.group_responsible) { - return FetchTaskConfig { live_in, prepared_running: None } + return FetchTaskConfig { live_in, prepared_running: None, origin: core.origin.clone() } } let prepared_running = RunningTask { @@ -179,9 +186,10 @@ impl FetchTaskConfig { sender, chunk_index, req_v1_protocol_name, - req_v2_protocol_name + req_v2_protocol_name, + origin: core.origin.clone(), }; - FetchTaskConfig { live_in, prepared_running: Some(prepared_running) } + FetchTaskConfig { live_in, prepared_running: Some(prepared_running), origin: core.origin.clone() } } } @@ -191,7 +199,7 @@ impl FetchTask { /// /// A task handling the fetching of the configured chunk will be spawned. pub async fn start(config: FetchTaskConfig, ctx: &mut Context) -> Result { - let FetchTaskConfig { prepared_running, live_in } = config; + let FetchTaskConfig { prepared_running, live_in, origin } = config; if let Some(running) = prepared_running { let (handle, kill) = oneshot::channel(); @@ -199,9 +207,9 @@ impl FetchTask { ctx.spawn("chunk-fetcher", running.run(kill).boxed()) .map_err(|e| FatalError::SpawnTask(e))?; - Ok(FetchTask { live_in, state: FetchedState::Started(handle) }) + Ok(FetchTask { live_in, state: FetchedState::Started(handle), origin: origin.clone() }) } else { - Ok(FetchTask { live_in, state: FetchedState::Canceled }) + Ok(FetchTask { live_in, state: FetchedState::Canceled, origin: origin.clone() }) } } @@ -306,6 +314,7 @@ impl RunningTask { session_index = ?self.session_index, chunk_index = ?self.request.index, candidate_hash = ?self.request.candidate_hash, + origin = ?self.origin, "Validator did not have our chunk" ); bad_validators.push(validator); @@ -338,6 +347,7 @@ impl RunningTask { session_index = ?self.session_index, chunk_index = ?self.request.index, candidate_hash = ?self.request.candidate_hash, + origin = ?self.origin, "Erasure chunk validated successfully", ); @@ -353,6 +363,7 @@ impl RunningTask { session_index = ?self.session_index, chunk_index = ?self.request.index, candidate_hash = ?self.request.candidate_hash, + origin = ?self.origin, "Erasure chunk stored successfully", ); break diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 611d70c91dc4f..aacf34a491aae 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -72,8 +72,18 @@ struct CoreInfo { erasure_root: Hash, /// The group index of the group responsible for the candidate. group_responsible: GroupIndex, + /// The origin of the CoreInfo, whether from occupied or scheduled cores. + origin: CoreInfoOrigin, } +/// Origin of CoreInfo. Whether it was created by calling prospective parachains for scheduled +/// candidates, or by querying occupied cores for already backed candidates. +#[derive(Debug, Clone)] +enum CoreInfoOrigin { + Occupied, + Scheduled, +} + /// Requester takes care of requesting erasure chunks from backing groups and stores them in the /// av store. 
 ///
@@ -201,6 +211,7 @@ impl Requester {
 						relay_parent: receipt.descriptor.relay_parent(),
 						erasure_root: receipt.descriptor.erasure_root(),
 						group_responsible: get_group_index_for_backed_candidate(sender, core_index, leaf).await?,
+						origin: CoreInfoOrigin::Scheduled,
 					},
 				);
 				gum::info!(
@@ -265,6 +276,7 @@ impl Requester {
 						relay_parent: occ.candidate_descriptor.relay_parent(),
 						erasure_root: occ.candidate_descriptor.erasure_root(),
 						group_responsible: occ.group_responsible,
+						origin: CoreInfoOrigin::Occupied,
 					},
 				);
 				gum::info!(

From ac4f97b1b4d29b8d37c6d6053e3f5d90e4bdcc06 Mon Sep 17 00:00:00 2001
From: Timothy Wu
Date: Wed, 3 Dec 2025 18:42:39 -0500
Subject: [PATCH 07/16] add wip zombienet test for speculative availability

---
 .../tests/functional/mod.rs                   |  1 +
 .../speculative_availability_requests.rs      | 89 +++++++++++++++++++
 2 files changed, 90 insertions(+)
 create mode 100644 polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs

diff --git a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
index e28cfb4039303..f0b1755cc0f02 100644
--- a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
+++ b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
@@ -6,6 +6,7 @@ mod approved_peer_mixed_validators;
 mod async_backing_6_seconds_rate;
 mod dispute_old_finalized;
 mod duplicate_collations;
+mod speculative_availability_requests;
 mod shared_core_idle_parachain;
 mod spam_statement_distribution_requests;
 mod sync_backing;
diff --git a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs
new file mode 100644
index 0000000000000..2d73aff98ddab
--- /dev/null
+++ b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs
@@ -0,0 +1,89 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Test that speculative availability chunk requests are made for backable candidates before their cores become occupied.
+
+use anyhow::anyhow;
+
+use cumulus_zombienet_sdk_helpers::{assert_finality_lag, assert_para_throughput};
+use polkadot_primitives::Id as ParaId;
+use serde_json::json;
+use zombienet_sdk::{
+	subxt::{OnlineClient, PolkadotConfig},
+	NetworkConfigBuilder,
+};
+
+#[tokio::test(flavor = "multi_thread")]
+async fn speculative_availability_requests_test() -> Result<(), anyhow::Error> {
+	let _ = env_logger::try_init_from_env(
+		env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
+	);
+
+	let images = zombienet_sdk::environment::get_images_from_env();
+
+	let config = NetworkConfigBuilder::new()
+		.with_relaychain(|r| {
+			let r = r
+				.with_chain("rococo-local")
+				.with_default_command("polkadot")
+				.with_default_image(images.polkadot.as_str())
+				.with_default_args(vec![("-lparachain=debug").into()])
+				.with_genesis_overrides(json!({
+					"configuration": {
+						"config": {
+							"scheduler_params": {
+								"group_rotation_frequency": 4
+							}
+						}
+					}
+				}))
+				.with_node(|node| node.with_name("validator-0"));
+
+			(1..12)
+				.fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}"))))
+		})
+		.with_parachain(|p| {
+			p.with_id(2000)
+				.with_default_command("polkadot-parachain")
+				.with_default_image(images.cumulus.as_str())
+				.with_default_args(vec![("-lparachain=debug,aura=debug").into()])
+				.with_collator(|n| n.with_name("collator-2000"))
+		})
+		.with_parachain(|p| {
+			p.with_id(2001)
+				.with_default_command("polkadot-parachain")
+				.with_default_image(images.cumulus.as_str())
+				.with_default_args(vec![("-lparachain=debug,aura=debug").into()])
+				.with_collator(|n| n.with_name("collator-2001"))
+		})
+		.build()
+		.map_err(|e| {
+			let errs = e.into_iter().map(|e| e.to_string()).collect::<Vec<_>>().join(" ");
+			anyhow!("config errs: {errs}")
+		})?;
+
+	let spawn_fn = zombienet_sdk::environment::get_spawn_fn();
+	let network = spawn_fn(config).await?;
+
+	let relay_node = network.get_node("validator-0")?;
+	let para_node_2001 = network.get_node("collator-2001")?;
+
+	let relay_client: OnlineClient<PolkadotConfig> = relay_node.wait_client().await?;
+
+	assert_para_throughput(
+		&relay_client,
+		15,
+		[(ParaId::from(2000), 11..16), (ParaId::from(2001), 11..16)]
+			.into_iter()
+			.collect(),
+	)
+	.await?;
+
+	// Assert the parachain finalized block height is also on par with the number of backed
+	// candidates. We can only do this for the collator based on cumulus.
+	assert_finality_lag(&para_node_2001.wait_client().await?, 6).await?;
+
+	log::info!("Test finished successfully");
+
+	Ok(())
+}
From db5b91216889627660d58f43d89b6cf4a35c9ce2 Mon Sep 17 00:00:00 2001
From: Timothy Wu
Date: Wed, 3 Dec 2025 23:06:42 -0500
Subject: [PATCH 08/16] add origin metric label to fetched_chunks, assert on said label in ZN test

---
 .../availability-distribution/src/metrics.rs  | 12 +++++++++---
 .../src/requester/fetch_task/mod.rs           | 17 +++++++++++++----
 .../speculative_availability_requests.rs      |  9 +++++++++
 3 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/polkadot/node/network/availability-distribution/src/metrics.rs b/polkadot/node/network/availability-distribution/src/metrics.rs
index bc724be10d9b7..01de7cf4784b4 100644
--- a/polkadot/node/network/availability-distribution/src/metrics.rs
+++ b/polkadot/node/network/availability-distribution/src/metrics.rs
@@ -31,6 +31,12 @@ pub const FAILED: &'static str = "failed";
 /// Label for chunks/PoVs that could not be served, because they were not available.
 pub const NOT_FOUND: &'static str = "not-found";
 
+/// Label for scheduled core origin.
+pub const SCHEDULED: &'static str = "scheduled"; + +/// Label for occupied core origin. +pub const OCCUPIED: &'static str = "occupied"; + /// Availability Distribution metrics. #[derive(Clone, Default)] pub struct Metrics(Option); @@ -65,9 +71,9 @@ impl Metrics { } /// Increment counter on fetched labels. - pub fn on_fetch(&self, label: &'static str) { + pub fn on_fetch(&self, label: &'static str, origin: &'static str) { if let Some(metrics) = &self.0 { - metrics.fetched_chunks.with_label_values(&[label]).inc() + metrics.fetched_chunks.with_label_values(&[label, origin]).inc() } } @@ -109,7 +115,7 @@ impl metrics::Metrics for Metrics { "polkadot_parachain_fetched_chunks_total", "Total number of fetched chunks.", ), - &["success"] + &["success", "origin"], )?, registry, )?, diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 09ff8b7721dac..f7a0c689bbdb6 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -42,7 +42,7 @@ use sc_network::ProtocolName; use crate::{ error::{FatalError, Result}, - metrics::{Metrics, FAILED, SUCCEEDED}, + metrics::{Metrics, FAILED, SUCCEEDED, SCHEDULED, OCCUPIED}, requester::{ session_cache::{BadValidators, SessionInfo}, CoreInfo, CoreInfoOrigin, @@ -294,7 +294,10 @@ impl RunningTask { target: LOG_TARGET, "Node seems to be shutting down, canceling fetch task" ); - self.metrics.on_fetch(FAILED); + self.metrics.on_fetch(FAILED, match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }); return }, Err(TaskError::PeerError) => { @@ -369,10 +372,16 @@ impl RunningTask { break } if succeeded { - self.metrics.on_fetch(SUCCEEDED); + self.metrics.on_fetch(SUCCEEDED, match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }); self.conclude(bad_validators).await; } else { - self.metrics.on_fetch(FAILED); + self.metrics.on_fetch(FAILED, match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }); self.conclude_fail().await } } diff --git a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs index 2d73aff98ddab..73b93a21ad918 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs @@ -79,6 +79,15 @@ async fn speculative_availability_requests_test() -> Result<(), anyhow::Error> { ) .await?; + let scheduled_metric_name = "polkadot_parachain_fetched_chunks_total{origin=\"scheduled\",success=\"succeeded\"}"; + // scheduled chunk fetches should be more than the asserted para throughput given blocks are still produced + assert!(relay_node.assert_with(scheduled_metric_name, |v| {v >= 22.0 && v <= 40.0}).await?); + + let occupied_metric_name = "polkadot_parachain_fetched_chunks_total{origin=\"occupied\",success=\"succeeded\"}"; + // given when speculative availability is requested on active leaves update, there may not be any backable + // candidates from Prospective Parachains at that the time. 
+ assert!(relay_node.assert_with(occupied_metric_name, |v| {v >= 2.0 && v <= 10.0}).await?); + // Assert the parachain finalized block height is also on par with the number of backed // candidates. We can only do this for the collator based on cumulus. assert_finality_lag(¶_node_2001.wait_client().await?, 6).await?; From cd93f0324048b97e6c41e0b14fcec1a641374035 Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Thu, 4 Dec 2025 10:54:29 -0500 Subject: [PATCH 09/16] revise NoteBackableCandidate msg to handle HashSet of candidate hashes, add num_validators attribute --- polkadot/node/core/av-store/src/lib.rs | 35 +++++++++-------- .../src/requester/mod.rs | 38 ++++++++++++++++--- polkadot/node/subsystem-types/src/messages.rs | 6 ++- 3 files changed, 56 insertions(+), 23 deletions(-) diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index b42f24357d9a1..382dce6785320 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -1232,23 +1232,25 @@ fn process_message( }, } }, - AvailabilityStoreMessage::NoteBackableCandidate{ candidate_hash, tx } => { - let res = note_backable_candidate( - &subsystem.db, - &subsystem.config, - candidate_hash, - subsystem, - ); + AvailabilityStoreMessage::NoteBackableCandidates{ candidate_hashes, num_validators, tx } => { + for candidate_hash in candidate_hashes { + let res = note_backable_candidate( + &subsystem.db, + &subsystem.config, + candidate_hash, + num_validators, + subsystem, + ); - match res { - Ok(_) => { - let _ = tx.send(Ok(())); - }, - Err(e) => { - let _ = tx.send(Err(())); - return Err(e) - }, + match res { + Ok(_) => {}, + Err(e) => { + let _ = tx.send(Err(())); + return Err(e) + }, + } } + let _ = tx.send(Ok(())); }, } @@ -1436,6 +1438,7 @@ fn note_backable_candidate( db: &Arc, config: &Config, candidate_hash: CandidateHash, + num_validators: usize, subsystem: &AvailabilityStoreSubsystem, ) -> Result<(), Error> { let mut tx = DBTransaction::new(); @@ -1445,7 +1448,7 @@ fn note_backable_candidate( let meta = CandidateMeta { state: State::Unavailable(now.into()), data_available: false, - chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; 12], + chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; num_validators], }; let prune_at = now + KEEP_UNAVAILABLE_FOR; diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index aacf34a491aae..51ee5d6aa1318 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -305,21 +305,49 @@ impl Requester { "Adding scheduled cores" ); - for (_, core_info) in &scheduled_cores { + let candidate_hashes = scheduled_cores.iter().map(|(_, core_info)| core_info.candidate_hash).collect::>(); + + let session_info = self + .session_cache + .get_session_info( + ctx, + runtime, + // We use leaf here, the relay_parent must be in the same session as + // the leaf. This is guaranteed by runtime which ensures that cores are + // cleared at session boundaries. At the same time, only leaves are + // guaranteed to be fetchable by the state trie. 
+ leaf, + leaf_session_index, + ) + .await + .map_err(|err| { + gum::warn!( + target: LOG_TARGET, + error = ?err, + "Failed to spawn a fetch task" + ); + err + })?; + + if let Some(session_info) = session_info { + let num_validators = + session_info.validator_groups.iter().fold(0usize, |mut acc, group| { + acc = acc.saturating_add(group.len()); + acc + }); + let (tx, rx) = oneshot::channel(); sender .send_message( - AvailabilityStoreMessage::NoteBackableCandidate{ candidate_hash: core_info.candidate_hash, tx }, + AvailabilityStoreMessage::NoteBackableCandidates{ candidate_hashes: candidate_hashes, num_validators, tx }, ) .await; if let Err(err) = rx.await { gum::error!(target: LOG_TARGET, "Sending NoteBackableCandidate message failed: {:?}", err); } - + self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; } - self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; } - Ok(()) } diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index d6ac4d5882c52..ecffceecf5604 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -574,9 +574,11 @@ pub enum AvailabilityStoreMessage { /// Note that a candidate is backable. Only used by Speculative Availability to signal to Availability Store /// that it should keep track of this candidate and it is highly likely the candidate will be backed. - NoteBackableCandidate{ + NoteBackableCandidates{ /// A hash of the backable candidate. - candidate_hash: CandidateHash, + candidate_hashes: HashSet, + /// The number of validators in the session. + num_validators: usize, /// Sending side of the channel to send result to. tx: oneshot::Sender>, }, From 6e087a70b92ce45d298dfd37fcb0f49b2ad6fcbc Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Fri, 5 Dec 2025 00:25:18 -0500 Subject: [PATCH 10/16] add speculative-availability cli flag, integrate into availability distribution --- .../src/lib.rs | 1 + polkadot/cli/src/cli.rs | 4 + polkadot/cli/src/command.rs | 1 + .../availability-distribution/src/lib.rs | 9 +- .../src/requester/fetch_task/mod.rs | 14 +-- .../src/requester/mod.rs | 102 +++++++++--------- polkadot/node/service/src/builder/mod.rs | 4 + polkadot/node/service/src/overseer.rs | 4 + .../async_backing_6_seconds_rate.rs | 8 ++ .../speculative_availability_requests.rs | 2 +- 10 files changed, 88 insertions(+), 61 deletions(-) diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index b989f81efd5dc..37c9a4a2b545a 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -419,6 +419,7 @@ fn build_polkadot_full_node( keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + speculative_availability: false, }; let (relay_chain_full_node, paranode_req_receiver) = match config.network.network_backend { diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs index fa8595cc7c57c..f0b6b26d7a40b 100644 --- a/polkadot/cli/src/cli.rs +++ b/polkadot/cli/src/cli.rs @@ -166,6 +166,10 @@ pub struct RunCmd { /// **Dangerous!** Do not touch unless explicitly advised to. #[arg(long, hide = true)] pub collator_protocol_hold_off: Option, + + /// Enable speculative availability requests via Prospective Parachains. Defaults to disabled. 
+ #[arg(long = "speculative-availability")] + pub speculative_availability: bool, } #[allow(missing_docs)] diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index b9366d07f6c54..5e43d020d6a47 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -292,6 +292,7 @@ where keep_finalized_for: cli.run.keep_finalized_for, invulnerable_ah_collators, collator_protocol_hold_off, + speculative_availability: cli.run.speculative_availability, }, ) .map(|full| full.task_manager)?; diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 438453814978c..25e670e88c6ac 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -62,6 +62,8 @@ pub struct AvailabilityDistributionSubsystem { req_protocol_names: ReqProtocolNames, /// Prometheus metrics. metrics: Metrics, + /// Whether speculative availability requests are enabled. + speculative_availability: bool, } /// Receivers to be passed into availability distribution. @@ -94,21 +96,22 @@ impl AvailabilityDistributionSubsystem { recvs: IncomingRequestReceivers, req_protocol_names: ReqProtocolNames, metrics: Metrics, + speculative_availability: bool, ) -> Self { let runtime = RuntimeInfo::new(Some(keystore)); - Self { runtime, recvs, req_protocol_names, metrics } + Self { runtime, recvs, req_protocol_names, metrics, speculative_availability } } /// Start processing work as passed on from the Overseer. async fn run(self, mut ctx: Context) -> std::result::Result<(), FatalError> { - let Self { mut runtime, recvs, metrics, req_protocol_names } = self; + let Self { mut runtime, recvs, metrics, req_protocol_names, speculative_availability } = self; let IncomingRequestReceivers { pov_req_receiver, chunk_req_v1_receiver, chunk_req_v2_receiver, } = recvs; - let mut requester = Requester::new(req_protocol_names, metrics.clone()).fuse(); + let mut requester = Requester::new(req_protocol_names, metrics.clone(), speculative_availability).fuse(); let mut warn_freq = gum::Freq::new(); { diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index f7a0c689bbdb6..a730a82bd5ab2 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -60,7 +60,6 @@ mod tests; pub struct FetchTaskConfig { prepared_running: Option, live_in: HashSet, - origin: CoreInfoOrigin, } /// Information about a task fetching an erasure chunk. @@ -75,9 +74,6 @@ pub struct FetchTask { /// We keep the task around in until `live_in` becomes empty, to make /// sure we won't re-fetch an already fetched candidate. state: FetchedState, - - /// The origin of the `CoreInfo` that was used to create this fetch task. - origin: CoreInfoOrigin, } /// State of a particular candidate chunk fetching process. 
@@ -167,7 +163,7 @@ impl FetchTaskConfig { // Don't run tasks for our backing group: if session_info.our_group == Some(core.group_responsible) { - return FetchTaskConfig { live_in, prepared_running: None, origin: core.origin.clone() } + return FetchTaskConfig { live_in, prepared_running: None } } let prepared_running = RunningTask { @@ -189,7 +185,7 @@ impl FetchTaskConfig { req_v2_protocol_name, origin: core.origin.clone(), }; - FetchTaskConfig { live_in, prepared_running: Some(prepared_running), origin: core.origin.clone() } + FetchTaskConfig { live_in, prepared_running: Some(prepared_running) } } } @@ -199,7 +195,7 @@ impl FetchTask { /// /// A task handling the fetching of the configured chunk will be spawned. pub async fn start(config: FetchTaskConfig, ctx: &mut Context) -> Result { - let FetchTaskConfig { prepared_running, live_in, origin } = config; + let FetchTaskConfig { prepared_running, live_in, .. } = config; if let Some(running) = prepared_running { let (handle, kill) = oneshot::channel(); @@ -207,9 +203,9 @@ impl FetchTask { ctx.spawn("chunk-fetcher", running.run(kill).boxed()) .map_err(|e| FatalError::SpawnTask(e))?; - Ok(FetchTask { live_in, state: FetchedState::Started(handle), origin: origin.clone() }) + Ok(FetchTask { live_in, state: FetchedState::Started(handle) }) } else { - Ok(FetchTask { live_in, state: FetchedState::Canceled, origin: origin.clone() }) + Ok(FetchTask { live_in, state: FetchedState::Canceled }) } } diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 51ee5d6aa1318..8deae3dc3e5db 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -111,6 +111,9 @@ pub struct Requester { /// Mapping of the req-response protocols to the full protocol names. req_protocol_names: ReqProtocolNames, + + /// Whether speculative availability requests are enabled. + speculative_availability: bool, } #[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)] @@ -122,7 +125,7 @@ impl Requester { /// /// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress /// by advancing the stream. - pub fn new(req_protocol_names: ReqProtocolNames, metrics: Metrics) -> Self { + pub fn new(req_protocol_names: ReqProtocolNames, metrics: Metrics, speculative_availability: bool) -> Self { let (tx, rx) = mpsc::channel(1); Requester { fetches: HashMap::new(), @@ -131,6 +134,7 @@ impl Requester { rx, metrics, req_protocol_names, + speculative_availability, } } @@ -246,8 +250,6 @@ impl Requester { ) .await?; - let scheduled_cores = self.request_backable_candidates_core_info(ctx, new_head).await?; - // Also spawn or bump tasks for candidates in ancestry in the same session. 
for hash in std::iter::once(leaf).chain(ancestors_in_session) { let cores = get_occupied_cores(sender, hash).await?; @@ -298,54 +300,58 @@ impl Requester { self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?; } - if scheduled_cores.len() > 0 { - gum::info!( - target: LOG_TARGET, - ?scheduled_cores, - "Adding scheduled cores" - ); + if self.speculative_availability { + let scheduled_cores = self.request_backable_candidates_core_info(ctx, new_head).await?; + if scheduled_cores.len() > 0 { + gum::info!( + target: LOG_TARGET, + ?scheduled_cores, + "Adding scheduled cores" + ); - let candidate_hashes = scheduled_cores.iter().map(|(_, core_info)| core_info.candidate_hash).collect::>(); - - let session_info = self - .session_cache - .get_session_info( - ctx, - runtime, - // We use leaf here, the relay_parent must be in the same session as - // the leaf. This is guaranteed by runtime which ensures that cores are - // cleared at session boundaries. At the same time, only leaves are - // guaranteed to be fetchable by the state trie. - leaf, - leaf_session_index, - ) - .await - .map_err(|err| { - gum::warn!( - target: LOG_TARGET, - error = ?err, - "Failed to spawn a fetch task" - ); - err - })?; - - if let Some(session_info) = session_info { - let num_validators = - session_info.validator_groups.iter().fold(0usize, |mut acc, group| { - acc = acc.saturating_add(group.len()); - acc - }); - - let (tx, rx) = oneshot::channel(); - sender - .send_message( - AvailabilityStoreMessage::NoteBackableCandidates{ candidate_hashes: candidate_hashes, num_validators, tx }, + let candidate_hashes = scheduled_cores.iter().map(|(_, core_info)| core_info.candidate_hash).collect::>(); + + let session_info = self + .session_cache + .get_session_info( + ctx, + runtime, + // We use leaf here, the relay_parent must be in the same session as + // the leaf. This is guaranteed by runtime which ensures that cores are + // cleared at session boundaries. At the same time, only leaves are + // guaranteed to be fetchable by the state trie. + leaf, + leaf_session_index, ) - .await; - if let Err(err) = rx.await { - gum::error!(target: LOG_TARGET, "Sending NoteBackableCandidate message failed: {:?}", err); + .await + .map_err(|err| { + gum::warn!( + target: LOG_TARGET, + error = ?err, + "Failed to spawn a fetch task" + ); + err + })?; + + if let Some(session_info) = session_info { + let num_validators = + session_info.validator_groups.iter().fold(0usize, |mut acc, group| { + acc = acc.saturating_add(group.len()); + acc + }); + + let (tx, rx) = oneshot::channel(); + sender + .send_message( + AvailabilityStoreMessage::NoteBackableCandidates{ candidate_hashes: candidate_hashes, num_validators, tx }, + ) + .await; + if let Err(err) = rx.await { + gum::error!(target: LOG_TARGET, "Sending NoteBackableCandidate message failed: {:?}", err); + } else { + self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; + } } - self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; } } Ok(()) diff --git a/polkadot/node/service/src/builder/mod.rs b/polkadot/node/service/src/builder/mod.rs index 9d52617cc8da9..bfbb20de1e7e8 100644 --- a/polkadot/node/service/src/builder/mod.rs +++ b/polkadot/node/service/src/builder/mod.rs @@ -98,6 +98,8 @@ pub struct NewFullParams { pub invulnerable_ah_collators: HashSet, /// Override for `HOLD_OFF_DURATION` constant . pub collator_protocol_hold_off: Option, + /// Enable speculative availability requests via Prospective Parachains. 
+ pub speculative_availability: bool, } /// Completely built polkadot node service. @@ -209,6 +211,7 @@ where keep_finalized_for, invulnerable_ah_collators, collator_protocol_hold_off, + speculative_availability, }, overseer_connector, partial_components: @@ -452,6 +455,7 @@ where fetch_chunks_threshold, invulnerable_ah_collators, collator_protocol_hold_off, + speculative_availability, }) }; diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index d6ed752b4c31c..b9c474b146008 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -147,6 +147,8 @@ pub struct ExtendedOverseerGenArgs { pub invulnerable_ah_collators: HashSet, /// Override for `HOLD_OFF_DURATION` constant . pub collator_protocol_hold_off: Option, + /// Whether speculative availability requests are enabled. + pub speculative_availability: bool, } /// Obtain a prepared validator `Overseer`, that is initialized with all default values. @@ -183,6 +185,7 @@ pub fn validator_overseer_builder( fetch_chunks_threshold, invulnerable_ah_collators, collator_protocol_hold_off, + speculative_availability, }: ExtendedOverseerGenArgs, ) -> Result< InitializedOverseerBuilder< @@ -262,6 +265,7 @@ where }, req_protocol_names.clone(), Metrics::register(registry)?, + speculative_availability, )) .availability_recovery(AvailabilityRecoverySubsystem::for_validator( fetch_chunks_threshold, diff --git a/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs b/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs index 08f1d59bbddbe..6d0c213db33b4 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs @@ -82,6 +82,14 @@ async fn async_backing_6_seconds_rate_test() -> Result<(), anyhow::Error> { ) .await?; + let scheduled_metric_name = "polkadot_parachain_fetched_chunks_total{origin=\"scheduled\",success=\"succeeded\"}"; + // scheduled chunk fetches should be 0 since speculative availability is disabled + assert!(relay_node.assert_with(scheduled_metric_name, |v| {v == 0.0}).await?); + + let occupied_metric_name = "polkadot_parachain_fetched_chunks_total{origin=\"occupied\",success=\"succeeded\"}"; + // all requests should be occupied since speculative availability is disabled + assert!(relay_node.assert_with(occupied_metric_name, |v| {v >= 22.0 && v <= 40.0}).await?); + // Assert the parachain finalized block height is also on par with the number of backed // candidates. We can only do this for the collator based on cumulus. 
assert_finality_lag(¶_node_2001.wait_client().await?, 6).await?; diff --git a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs index 73b93a21ad918..b2dae2561bf4e 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs @@ -27,7 +27,7 @@ async fn speculative_availability_requests_test() -> Result<(), anyhow::Error> { .with_chain("rococo-local") .with_default_command("polkadot") .with_default_image(images.polkadot.as_str()) - .with_default_args(vec![("-lparachain=debug").into()]) + .with_default_args(vec![("-lparachain=debug", "--speculative-availability").into()]) .with_genesis_overrides(json!({ "configuration": { "config": { From 9f930cd4ae43487cf3d906db756bd254c8ef3735 Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Fri, 5 Dec 2025 14:06:41 -0500 Subject: [PATCH 11/16] code cleanup --- polkadot/node/core/av-store/src/lib.rs | 10 +-- .../availability-distribution/src/error.rs | 6 +- .../src/requester/fetch_task/mod.rs | 35 -------- .../src/requester/mod.rs | 80 +++---------------- .../speculative_availability_requests.rs | 3 +- 5 files changed, 21 insertions(+), 113 deletions(-) diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 382dce6785320..1b60c768dd8ce 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -1280,15 +1280,7 @@ fn store_chunk( write_chunk(&mut tx, config, &candidate_hash, validator_index, &chunk); write_meta(&mut tx, config, &candidate_hash, &meta); }, - None => { - gum::debug!( - target: LOG_TARGET, - ?candidate_hash, - validator_index = %validator_index.0, - "Cannot store chunk for unknown validator index.", - ); - return Ok(false) // out of bounds. - }, + None => return Ok(false) // out of bounds. } gum::debug!( diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs index 78d5dec5fa80b..d0808f6b0c229 100644 --- a/polkadot/node/network/availability-distribution/src/error.rs +++ b/polkadot/node/network/availability-distribution/src/error.rs @@ -94,6 +94,9 @@ pub enum Error { #[error("Error from subsystem-util: {0}")] SubsystemUtil(#[from] polkadot_node_subsystem_util::Error), + + #[error("Retrieving response from Availability Store unexpectedly failed")] + AvailabilityStore, } /// General result abbreviation type alias. 
@@ -120,7 +123,8 @@ pub fn log_error( JfyiError::FailedNodeFeatures(_) | JfyiError::ErasureCoding(_) | JfyiError::NoSuchBlockHeader | - JfyiError::SubsystemUtil(_) => gum::warn!(target: LOG_TARGET, error = %jfyi, ctx), + JfyiError::SubsystemUtil(_) | + JfyiError::AvailabilityStore => gum::warn!(target: LOG_TARGET, error = %jfyi, ctx), JfyiError::FetchPoV(_) | JfyiError::SendResponse | JfyiError::NoSuchPoV | diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index a730a82bd5ab2..7f726e3d8bcdd 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -321,50 +321,15 @@ impl RunningTask { }, }; - gum::info!( - target: LOG_TARGET, - validator = ?validator, - relay_parent = ?self.relay_parent, - group_index = ?self.group_index, - session_index = ?self.session_index, - chunk_index = ?self.request.index, - candidate_hash = ?self.request.candidate_hash, - "Received erasure chunk from validator", - ); - // Data genuine? if !self.validate_chunk(&validator, &chunk, self.chunk_index) { bad_validators.push(validator); continue } - gum::info!( - target: LOG_TARGET, - validator = ?validator, - relay_parent = ?self.relay_parent, - group_index = ?self.group_index, - session_index = ?self.session_index, - chunk_index = ?self.request.index, - candidate_hash = ?self.request.candidate_hash, - origin = ?self.origin, - "Erasure chunk validated successfully", - ); - // Ok, let's store it and be happy: self.store_chunk(chunk).await; succeeded = true; - - gum::info!( - target: LOG_TARGET, - validator = ?validator, - relay_parent = ?self.relay_parent, - group_index = ?self.group_index, - session_index = ?self.session_index, - chunk_index = ?self.request.index, - candidate_hash = ?self.request.candidate_hash, - origin = ?self.origin, - "Erasure chunk stored successfully", - ); break } if succeeded { diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 8deae3dc3e5db..a652f54cecaf3 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -170,11 +170,10 @@ impl Requester { let cores = recv_runtime(request_availability_cores(leaf, sender).await).await?; let backable = request_backable_candidates(&cores, None, &new_head, sender).await?; - - gum::info!( + gum::trace!( target: LOG_TARGET, - backable = ?backable, - "Backable candidates" + leaf_hash=?leaf, + "Got {} backable candidates", backable.len() ); // now get the backed candidates corresponding to these candidate receipts @@ -183,31 +182,15 @@ impl Requester { backable.clone(), tx, )).await; - let candidates = rx.await?; - gum::info!( - target: LOG_TARGET, - leaf_hash=?leaf, - candidates=?candidates, - "Got {} backed candidates", candidates.len() - ); // Process candidates and collect cores - for (para_id, candidates) in candidates.iter() { + for (_, candidates) in candidates.iter() { for candidate in candidates { let (_, core_index) = candidate.validator_indices_and_core_index(); let Some(core_index) = core_index else { continue }; let receipt = candidate.candidate(); - - gum::info!( - target: LOG_TARGET, - para_id = ?para_id, - candidate_hash = ?receipt.hash(), - core_index = ?core_index, - "Scheduling candidate for 
core" - ); - let core = ( core_index, CoreInfo { @@ -218,11 +201,6 @@ impl Requester { origin: CoreInfoOrigin::Scheduled, }, ); - gum::info!( - target: LOG_TARGET, - ?core, - "Scheduled core info" - ); scheduled_cores.push(core); } } @@ -262,16 +240,7 @@ impl Requester { let cores = cores .into_iter() .map(|(index, occ)| { - gum::info!( - target: LOG_TARGET, - index = ?index, - candidate_hash = ?occ.candidate_hash, - next_up_on_available = ?occ.next_up_on_available, - leaf = ?leaf, - "Scheduled parachain candidate" - ); - - let core = ( + ( index, CoreInfo { candidate_hash: occ.candidate_hash, @@ -280,13 +249,7 @@ impl Requester { group_responsible: occ.group_responsible, origin: CoreInfoOrigin::Occupied, }, - ); - gum::info!( - target: LOG_TARGET, - ?core, - "Occupied core info" - ); - core + ) }) .collect::>(); @@ -303,10 +266,10 @@ impl Requester { if self.speculative_availability { let scheduled_cores = self.request_backable_candidates_core_info(ctx, new_head).await?; if scheduled_cores.len() > 0 { - gum::info!( + gum::trace!( target: LOG_TARGET, ?scheduled_cores, - "Adding scheduled cores" + "Query scheduled cores" ); let candidate_hashes = scheduled_cores.iter().map(|(_, core_info)| core_info.candidate_hash).collect::>(); @@ -346,11 +309,8 @@ impl Requester { AvailabilityStoreMessage::NoteBackableCandidates{ candidate_hashes: candidate_hashes, num_validators, tx }, ) .await; - if let Err(err) = rx.await { - gum::error!(target: LOG_TARGET, "Sending NoteBackableCandidate message failed: {:?}", err); - } else { - self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; - } + rx.await?.map_err(|_| Error::AvailabilityStore)?; + self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; } } } @@ -360,17 +320,9 @@ impl Requester { /// Stop requesting chunks for obsolete heads. fn stop_requesting_chunks(&mut self, obsolete_leaves: impl Iterator) { let obsolete_leaves: HashSet<_> = obsolete_leaves.collect(); - self.fetches.retain(|candidate_hash, task| { + self.fetches.retain(|_, task| { task.remove_leaves(&obsolete_leaves); - let is_live = task.is_live(); - if !is_live { - gum::info!( - target: LOG_TARGET, - ?candidate_hash, - "Removing fetch task as no longer live" - ); - } - is_live + task.is_live() }) } @@ -466,11 +418,6 @@ impl Stream for Requester { Poll::Ready(Some(FromFetchTask::Concluded(None))) => continue, Poll::Ready(Some(FromFetchTask::Failed(candidate_hash))) => { // Make sure we retry on next block still pending availability. 
- gum::info!( - target: LOG_TARGET, - ?candidate_hash, - "Fetch task failed, removing from active fetches to allow retrying" - ); self.fetches.remove(&candidate_hash); }, Poll::Ready(None) => return Poll::Ready(None), @@ -553,12 +500,11 @@ where Ok(ancestors) } -// Assuming you have the backed_candidate and relay_parent +/// Request group index assuming you have the core index and relay parent async fn get_group_index_for_backed_candidate( sender: &mut Sender, core_index: CoreIndex, relay_parent: Hash, - ) -> Result where Sender: overseer::SubsystemSender + overseer::SubsystemSender, diff --git a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs index b2dae2561bf4e..10dfde059685e 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs @@ -1,7 +1,8 @@ // Copyright (C) Parity Technologies (UK) Ltd. // SPDX-License-Identifier: Apache-2.0 -// Test we are producing 12-second parachain blocks if using an old collator, pre async-backing. +// Test we are producing 12-second parachain blocks if using 2x async-backing parachains, +// ensure that chunk fetch requests are being made speculatively. use anyhow::anyhow; From e73deba3cd15693ac6f459b80a317f79564de7d3 Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Fri, 5 Dec 2025 15:39:06 -0500 Subject: [PATCH 12/16] fix zn test --- .../tests/functional/speculative_availability_requests.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs index 10dfde059685e..8db06200167cd 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs @@ -75,8 +75,6 @@ async fn speculative_availability_requests_test() -> Result<(), anyhow::Error> { &relay_client, 15, [(ParaId::from(2000), 11..16), (ParaId::from(2001), 11..16)] - .into_iter() - .collect(), ) .await?; From 612d5d2148f2e3cfc79ba3c601053928e600b7f1 Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Fri, 5 Dec 2025 15:57:31 -0500 Subject: [PATCH 13/16] fix benchmark --- polkadot/node/subsystem-bench/src/lib/availability/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs index b346f988a3c90..be4a7db2f7ab3 100644 --- a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs @@ -268,6 +268,7 @@ pub fn prepare_test( }, state.req_protocol_names.clone(), Metrics::try_register(&dependencies.registry).unwrap(), + false, // Disable speculative availability for this benchmark ); let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() }; From 91c62dd00ae0ea0c76d0e575ead5e00ff8776f1a Mon Sep 17 00:00:00 2001 From: Timothy Wu Date: Sat, 6 Dec 2025 00:32:48 -0500 Subject: [PATCH 14/16] cargo fmt --- polkadot/node/core/av-store/src/lib.rs | 10 ++- polkadot/node/core/provisioner/src/lib.rs | 21 +++--- .../availability-distribution/src/lib.rs | 6 +- .../src/requester/fetch_task/mod.rs | 35 ++++++---- .../src/requester/mod.rs | 65 +++++++++++-------- .../src/lib/availability/mod.rs | 2 +- 
polkadot/node/subsystem-types/src/messages.rs | 7 +- polkadot/node/subsystem-util/src/lib.rs | 5 +- .../async_backing_6_seconds_rate.rs | 14 ++-- .../tests/functional/mod.rs | 2 +- .../speculative_availability_requests.rs | 29 ++++++--- 11 files changed, 119 insertions(+), 77 deletions(-) diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 1b60c768dd8ce..311f6a84c8784 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -1232,7 +1232,11 @@ fn process_message( }, } }, - AvailabilityStoreMessage::NoteBackableCandidates{ candidate_hashes, num_validators, tx } => { + AvailabilityStoreMessage::NoteBackableCandidates { + candidate_hashes, + num_validators, + tx, + } => { for candidate_hash in candidate_hashes { let res = note_backable_candidate( &subsystem.db, @@ -1280,7 +1284,7 @@ fn store_chunk( write_chunk(&mut tx, config, &candidate_hash, validator_index, &chunk); write_meta(&mut tx, config, &candidate_hash, &meta); }, - None => return Ok(false) // out of bounds. + None => return Ok(false), // out of bounds. } gum::debug!( @@ -1451,4 +1455,4 @@ fn note_backable_candidate( db.write(tx)?; Ok(()) -} \ No newline at end of file +} diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 273c382ac799d..5e6bff67caed5 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -29,23 +29,22 @@ use futures::{ use futures_timer::Delay; use polkadot_node_subsystem::{ messages::{ - CandidateBackingMessage, ChainApiMessage, - ProvisionableData, ProvisionerInherentData, ProvisionerMessage, + CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, + ProvisionerMessage, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; -use polkadot_node_subsystem_util::{request_availability_cores, TimeoutExt, request_backable_candidates, select_availability_bitfields}; +use polkadot_node_subsystem_util::{ + request_availability_cores, request_backable_candidates, select_availability_bitfields, + TimeoutExt, +}; use polkadot_primitives::{ - BackedCandidate, CandidateEvent, CoreState, Hash, - SignedAvailabilityBitfield, + BackedCandidate, CandidateEvent, CoreState, Hash, SignedAvailabilityBitfield, }; use sc_consensus_slots::time_until_next_slot; use schnellru::{ByLength, LruMap}; -use std::{ - collections::HashMap, - time::Duration, -}; +use std::{collections::HashMap, time::Duration}; mod disputes; mod error; mod metrics; @@ -280,7 +279,7 @@ async fn handle_active_leaves_update( let Some(inherent) = inherents.get(&header.parent_hash) else { return Ok(()) }; let diff = inherent.backed_candidates.len() as isize - in_block_count; - gum::debug!(target: LOG_TARGET, + gum::debug!(target: LOG_TARGET, ?diff, ?in_block_count, local_count = ?inherent.backed_candidates.len(), @@ -572,4 +571,4 @@ async fn select_candidates( ); Ok(merged_candidates) -} \ No newline at end of file +} diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 25e670e88c6ac..e5a220b8d3447 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -104,14 +104,16 @@ impl AvailabilityDistributionSubsystem { /// Start processing work as passed on from the Overseer. 
async fn run(self, mut ctx: Context) -> std::result::Result<(), FatalError> { - let Self { mut runtime, recvs, metrics, req_protocol_names, speculative_availability } = self; + let Self { mut runtime, recvs, metrics, req_protocol_names, speculative_availability } = + self; let IncomingRequestReceivers { pov_req_receiver, chunk_req_v1_receiver, chunk_req_v2_receiver, } = recvs; - let mut requester = Requester::new(req_protocol_names, metrics.clone(), speculative_availability).fuse(); + let mut requester = + Requester::new(req_protocol_names, metrics.clone(), speculative_availability).fuse(); let mut warn_freq = gum::Freq::new(); { diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 7f726e3d8bcdd..82a37eab82ca9 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -42,7 +42,7 @@ use sc_network::ProtocolName; use crate::{ error::{FatalError, Result}, - metrics::{Metrics, FAILED, SUCCEEDED, SCHEDULED, OCCUPIED}, + metrics::{Metrics, FAILED, OCCUPIED, SCHEDULED, SUCCEEDED}, requester::{ session_cache::{BadValidators, SessionInfo}, CoreInfo, CoreInfoOrigin, @@ -290,10 +290,13 @@ impl RunningTask { target: LOG_TARGET, "Node seems to be shutting down, canceling fetch task" ); - self.metrics.on_fetch(FAILED, match self.origin { - CoreInfoOrigin::Scheduled => SCHEDULED, - CoreInfoOrigin::Occupied => OCCUPIED, - }); + self.metrics.on_fetch( + FAILED, + match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }, + ); return }, Err(TaskError::PeerError) => { @@ -333,16 +336,22 @@ impl RunningTask { break } if succeeded { - self.metrics.on_fetch(SUCCEEDED, match self.origin { - CoreInfoOrigin::Scheduled => SCHEDULED, - CoreInfoOrigin::Occupied => OCCUPIED, - }); + self.metrics.on_fetch( + SUCCEEDED, + match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }, + ); self.conclude(bad_validators).await; } else { - self.metrics.on_fetch(FAILED, match self.origin { - CoreInfoOrigin::Scheduled => SCHEDULED, - CoreInfoOrigin::Occupied => OCCUPIED, - }); + self.metrics.on_fetch( + FAILED, + match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }, + ); self.conclude_fail().await } } diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index a652f54cecaf3..02a3ec219d529 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -31,21 +31,19 @@ use futures::{ use polkadot_node_network_protocol::request_response::{v1, v2, IsRequest, ReqProtocolNames}; use polkadot_node_subsystem::{ - messages::{ChainApiMessage, RuntimeApiMessage, CandidateBackingMessage, AvailabilityStoreMessage}, - overseer, ActivatedLeaf, ActiveLeavesUpdate, - SubsystemSender, + messages::{ + AvailabilityStoreMessage, CandidateBackingMessage, ChainApiMessage, RuntimeApiMessage, + }, + overseer, ActivatedLeaf, ActiveLeavesUpdate, SubsystemSender, }; use polkadot_node_subsystem_util::{ availability_chunks::availability_chunk_index, - runtime::{get_occupied_cores, RuntimeInfo}, - request_availability_cores, - request_backable_candidates, - 
request_validator_groups, - runtime::recv_runtime, + request_availability_cores, request_backable_candidates, request_validator_groups, + runtime::{get_occupied_cores, recv_runtime, RuntimeInfo}, }; use polkadot_primitives::{CandidateHash, CoreIndex, GroupIndex, Hash, SessionIndex}; -use super::{FatalError, Metrics, Result, LOG_TARGET, error::Error}; +use super::{error::Error, FatalError, Metrics, Result, LOG_TARGET}; #[cfg(test)] mod tests; @@ -82,7 +80,7 @@ struct CoreInfo { enum CoreInfoOrigin { Occupied, Scheduled, -} +} /// Requester takes care of requesting erasure chunks from backing groups and stores them in the /// av store. @@ -125,7 +123,11 @@ impl Requester { /// /// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress /// by advancing the stream. - pub fn new(req_protocol_names: ReqProtocolNames, metrics: Metrics, speculative_availability: bool) -> Self { + pub fn new( + req_protocol_names: ReqProtocolNames, + metrics: Metrics, + speculative_availability: bool, + ) -> Self { let (tx, rx) = mpsc::channel(1); Requester { fetches: HashMap::new(), @@ -165,7 +167,7 @@ impl Requester { new_head: ActivatedLeaf, ) -> Result> { let ActivatedLeaf { hash: leaf, .. } = new_head; - let mut scheduled_cores = Vec::new(); + let mut scheduled_cores = Vec::new(); let sender = &mut ctx.sender().clone(); let cores = recv_runtime(request_availability_cores(leaf, sender).await).await?; @@ -178,10 +180,9 @@ impl Requester { // now get the backed candidates corresponding to these candidate receipts let (tx, rx) = oneshot::channel(); - sender.send_message(CandidateBackingMessage::GetBackableCandidates( - backable.clone(), - tx, - )).await; + sender + .send_message(CandidateBackingMessage::GetBackableCandidates(backable.clone(), tx)) + .await; let candidates = rx.await?; // Process candidates and collect cores @@ -197,7 +198,10 @@ impl Requester { candidate_hash: receipt.hash(), relay_parent: receipt.descriptor.relay_parent(), erasure_root: receipt.descriptor.erasure_root(), - group_responsible: get_group_index_for_backed_candidate(sender, core_index, leaf).await?, + group_responsible: get_group_index_for_backed_candidate( + sender, core_index, leaf, + ) + .await?, origin: CoreInfoOrigin::Scheduled, }, ); @@ -272,8 +276,11 @@ impl Requester { "Query scheduled cores" ); - let candidate_hashes = scheduled_cores.iter().map(|(_, core_info)| core_info.candidate_hash).collect::>(); - + let candidate_hashes = scheduled_cores + .iter() + .map(|(_, core_info)| core_info.candidate_hash) + .collect::>(); + let session_info = self .session_cache .get_session_info( @@ -302,12 +309,14 @@ impl Requester { acc = acc.saturating_add(group.len()); acc }); - + let (tx, rx) = oneshot::channel(); sender - .send_message( - AvailabilityStoreMessage::NoteBackableCandidates{ candidate_hashes: candidate_hashes, num_validators, tx }, - ) + .send_message(AvailabilityStoreMessage::NoteBackableCandidates { + candidate_hashes, + num_validators, + tx, + }) .await; rx.await?.map_err(|_| Error::AvailabilityStore)?; self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; @@ -504,12 +513,14 @@ where async fn get_group_index_for_backed_candidate( sender: &mut Sender, core_index: CoreIndex, - relay_parent: Hash, + relay_parent: Hash, ) -> Result where - Sender: overseer::SubsystemSender + overseer::SubsystemSender, + Sender: + overseer::SubsystemSender + overseer::SubsystemSender, { - let (validator_groups, mut rotation_info) = recv_runtime(request_validator_groups(relay_parent, 
sender).await).await?; + let (validator_groups, mut rotation_info) = + recv_runtime(request_validator_groups(relay_parent, sender).await).await?; // Get the block number for the relay_parent to update rotation_info.now let (tx, rx) = oneshot::channel(); @@ -520,4 +531,4 @@ where rotation_info.now = block_number.into(); // Update now to the relay_parent's block number Ok(rotation_info.group_for_core(core_index, validator_groups.len())) -} \ No newline at end of file +} diff --git a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs index be4a7db2f7ab3..ce0070ccf802f 100644 --- a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs @@ -268,7 +268,7 @@ pub fn prepare_test( }, state.req_protocol_names.clone(), Metrics::try_register(&dependencies.registry).unwrap(), - false, // Disable speculative availability for this benchmark + false, // Disable speculative availability for this benchmark ); let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() }; diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index ecffceecf5604..21e8621e8d7d1 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -572,9 +572,10 @@ pub enum AvailabilityStoreMessage { tx: oneshot::Sender>, }, - /// Note that a candidate is backable. Only used by Speculative Availability to signal to Availability Store - /// that it should keep track of this candidate and it is highly likely the candidate will be backed. - NoteBackableCandidates{ + /// Note that a candidate is backable. Only used by Speculative Availability to signal to + /// Availability Store that it should keep track of this candidate and it is highly likely the + /// candidate will be backed. + NoteBackableCandidates { /// A hash of the backable candidate. candidate_hashes: HashSet, /// The number of validators in the session. 
diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 74a497aef3b0b..fa9bdd6d91f63 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -40,8 +40,7 @@ pub use polkadot_node_metrics::{metrics, Metronome}; use codec::Encode; use futures::channel::{mpsc, oneshot}; -use bitvec::vec::BitVec; -use bitvec; +use bitvec::{self, vec::BitVec}; use polkadot_primitives::{ async_backing::{BackingState, Constraints}, @@ -794,4 +793,4 @@ async fn get_backable_candidates( .await; rx.await.map_err(Error::CanceledBackableCandidates) -} \ No newline at end of file +} diff --git a/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs b/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs index 6d0c213db33b4..ac61c29f5e273 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs @@ -82,13 +82,19 @@ async fn async_backing_6_seconds_rate_test() -> Result<(), anyhow::Error> { ) .await?; - let scheduled_metric_name = "polkadot_parachain_fetched_chunks_total{origin=\"scheduled\",success=\"succeeded\"}"; + let scheduled_metric_name = + "polkadot_parachain_fetched_chunks_total{origin=\"scheduled\",success=\"succeeded\"}"; // scheduled chunk fetches should be 0 since speculative availability is disabled - assert!(relay_node.assert_with(scheduled_metric_name, |v| {v == 0.0}).await?); + assert!(relay_node.assert_with(scheduled_metric_name, |v| { v == 0.0 }).await?); - let occupied_metric_name = "polkadot_parachain_fetched_chunks_total{origin=\"occupied\",success=\"succeeded\"}"; + let occupied_metric_name = + "polkadot_parachain_fetched_chunks_total{origin=\"occupied\",success=\"succeeded\"}"; // all requests should be occupied since speculative availability is disabled - assert!(relay_node.assert_with(occupied_metric_name, |v| {v >= 22.0 && v <= 40.0}).await?); + assert!( + relay_node + .assert_with(occupied_metric_name, |v| { v >= 22.0 && v <= 40.0 }) + .await? + ); // Assert the parachain finalized block height is also on par with the number of backed // candidates. We can only do this for the collator based on cumulus. 
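The metric assertions above read the `origin` label that the fetch task attaches through the
repeated `match self.origin` blocks reformatted earlier in this patch. A self-contained sketch of
keeping that mapping in one place follows; `CoreInfoOrigin` and the label constants mirror the ones
introduced in this series, while `origin_label` and the `main` harness are hypothetical and only
illustrate the idea:

// Sketch only, not part of the patches. Mirrors `CoreInfoOrigin` and the `SCHEDULED`/`OCCUPIED`
// label constants from this series; `origin_label` is a hypothetical helper name.
#[derive(Debug, Clone)]
enum CoreInfoOrigin {
	Occupied,
	Scheduled,
}

const SCHEDULED: &str = "scheduled";
const OCCUPIED: &str = "occupied";

fn origin_label(origin: &CoreInfoOrigin) -> &'static str {
	match origin {
		CoreInfoOrigin::Scheduled => SCHEDULED,
		CoreInfoOrigin::Occupied => OCCUPIED,
	}
}

fn main() {
	// Each call site could then read `self.metrics.on_fetch(SUCCEEDED, origin_label(&self.origin))`.
	assert_eq!(origin_label(&CoreInfoOrigin::Scheduled), "scheduled");
	assert_eq!(origin_label(&CoreInfoOrigin::Occupied), "occupied");
}
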
diff --git a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs index f0b1755cc0f02..0ba0c07d24b4b 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs @@ -6,8 +6,8 @@ mod approved_peer_mixed_validators; mod async_backing_6_seconds_rate; mod dispute_old_finalized; mod duplicate_collations; -mod speculative_availability_requests; mod shared_core_idle_parachain; mod spam_statement_distribution_requests; +mod speculative_availability_requests; mod sync_backing; mod validator_disabling; diff --git a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs index 8db06200167cd..c55f60fbb6c24 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs @@ -74,18 +74,29 @@ async fn speculative_availability_requests_test() -> Result<(), anyhow::Error> { assert_para_throughput( &relay_client, 15, - [(ParaId::from(2000), 11..16), (ParaId::from(2001), 11..16)] + [(ParaId::from(2000), 11..16), (ParaId::from(2001), 11..16)], ) .await?; - let scheduled_metric_name = "polkadot_parachain_fetched_chunks_total{origin=\"scheduled\",success=\"succeeded\"}"; - // scheduled chunk fetches should be more than the asserted para throughput given blocks are still produced - assert!(relay_node.assert_with(scheduled_metric_name, |v| {v >= 22.0 && v <= 40.0}).await?); - - let occupied_metric_name = "polkadot_parachain_fetched_chunks_total{origin=\"occupied\",success=\"succeeded\"}"; - // given when speculative availability is requested on active leaves update, there may not be any backable - // candidates from Prospective Parachains at that the time. - assert!(relay_node.assert_with(occupied_metric_name, |v| {v >= 2.0 && v <= 10.0}).await?); + let scheduled_metric_name = + "polkadot_parachain_fetched_chunks_total{origin=\"scheduled\",success=\"succeeded\"}"; + // scheduled chunk fetches should be more than the asserted para throughput given blocks are + // still produced + assert!( + relay_node + .assert_with(scheduled_metric_name, |v| { v >= 22.0 && v <= 40.0 }) + .await? + ); + + let occupied_metric_name = + "polkadot_parachain_fetched_chunks_total{origin=\"occupied\",success=\"succeeded\"}"; + // given when speculative availability is requested on active leaves update, there may not be + // any backable candidates from Prospective Parachains at that the time. + assert!( + relay_node + .assert_with(occupied_metric_name, |v| { v >= 2.0 && v <= 10.0 }) + .await? + ); // Assert the parachain finalized block height is also on par with the number of backed // candidates. We can only do this for the collator based on cumulus. 
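Operationally, the behaviour asserted by the two tests above is gated entirely by the new CLI flag:
it defaults to off and only takes effect on validators, since the requester runs inside the
validator overseer. A hypothetical invocation enabling it (chain and log options are illustrative;
only `--speculative-availability` is the flag added by this series):

	polkadot --validator --chain rococo-local -lparachain=debug --speculative-availability
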
From 59ce4bbe21dfb275cb81b425e4763a1ee4d4432a Mon Sep 17 00:00:00 2001 From: Tim Wu Date: Sat, 6 Dec 2025 06:44:42 +0000 Subject: [PATCH 15/16] fix polkadot_service::NewFullParams --- polkadot/node/test/service/src/lib.rs | 1 + polkadot/parachain/test-parachains/adder/collator/src/main.rs | 1 + polkadot/parachain/test-parachains/undying/collator/src/main.rs | 1 + 3 files changed, 3 insertions(+) diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs index fc035174fe039..477e9825efb35 100644 --- a/polkadot/node/test/service/src/lib.rs +++ b/polkadot/node/test/service/src/lib.rs @@ -102,6 +102,7 @@ pub fn new_full( keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + speculative_availability: false, }; match config.network.network_backend { diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index f6ed513c76a33..471bec24f4916 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -101,6 +101,7 @@ fn main() -> Result<()> { keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + speculative_availability: false, }, ) .map_err(|e| e.to_string())?; diff --git a/polkadot/parachain/test-parachains/undying/collator/src/main.rs b/polkadot/parachain/test-parachains/undying/collator/src/main.rs index 50392bce7d4f1..edfe1cf0a256d 100644 --- a/polkadot/parachain/test-parachains/undying/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/undying/collator/src/main.rs @@ -113,6 +113,7 @@ fn main() -> Result<()> { keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + speculative_availability: false, }, ) .map_err(|e| e.to_string())?; From da460f72e2f186a1a437d33b1d2bf553dddc07e1 Mon Sep 17 00:00:00 2001 From: Tim Wu Date: Sat, 6 Dec 2025 17:38:23 +0000 Subject: [PATCH 16/16] fix provisioner and availability-distribution tests --- Cargo.lock | 1 + polkadot/node/core/provisioner/Cargo.toml | 1 + polkadot/node/core/provisioner/src/tests.rs | 19 +++++++++++++------ .../src/requester/fetch_task/tests.rs | 1 + .../src/requester/tests.rs | 14 ++++++++++---- .../src/tests/mod.rs | 1 + 6 files changed, 27 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 82c4e34dcf4cb..f3d5136d3dced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15756,6 +15756,7 @@ dependencies = [ name = "polkadot-node-core-provisioner" version = "7.0.0" dependencies = [ + "bitvec", "fatality", "futures", "futures-timer", diff --git a/polkadot/node/core/provisioner/Cargo.toml b/polkadot/node/core/provisioner/Cargo.toml index fedf55081cfee..88d5bb7e844f3 100644 --- a/polkadot/node/core/provisioner/Cargo.toml +++ b/polkadot/node/core/provisioner/Cargo.toml @@ -25,6 +25,7 @@ schnellru = { workspace = true } thiserror = { workspace = true } [dev-dependencies] +bitvec = { features = ["alloc"], workspace = true } polkadot-node-subsystem-test-helpers = { workspace = true } polkadot-primitives = { workspace = true, features = ["test"] } polkadot-primitives-test-helpers = { workspace = true } diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs index b965d3346d87d..3cb0aa2f32856 100644 --- a/polkadot/node/core/provisioner/src/tests.rs +++ b/polkadot/node/core/provisioner/src/tests.rs @@ -16,7 +16,11 @@ use super::*; 
 use bitvec::bitvec;
-use polkadot_primitives::{MutateDescriptorV2, OccupiedCore, ScheduledCore};
+use polkadot_node_subsystem::messages::{Ancestors, ProspectiveParachainsMessage};
+use polkadot_node_subsystem_util::CoreAvailability;
+use polkadot_primitives::{
+	CandidateHash, CoreIndex, MutateDescriptorV2, OccupiedCore, ScheduledCore,
+};
 use polkadot_primitives_test_helpers::{dummy_candidate_descriptor_v2, dummy_hash};
 
 const MOCK_GROUP_SIZE: usize = 5;
@@ -61,7 +65,10 @@ pub fn scheduled_core(id: u32) -> ScheduledCore {
 
 mod select_availability_bitfields {
 	use super::{super::*, default_bitvec, occupied_core};
-	use polkadot_primitives::{ScheduledCore, SigningContext, ValidatorId, ValidatorIndex};
+	use polkadot_node_subsystem_util::CoreAvailability;
+	use polkadot_primitives::{
+		CandidateHash, CoreIndex, ScheduledCore, SigningContext, ValidatorId, ValidatorIndex,
+	};
 	use sp_application_crypto::AppCrypto;
 	use sp_keystore::{testing::MemoryKeystore, Keystore, KeystorePtr};
 	use std::sync::Arc;
@@ -246,16 +253,16 @@ mod select_candidates {
 	};
 	use futures::channel::mpsc;
 	use polkadot_node_subsystem::messages::{
-		AllMessages, RuntimeApiMessage,
+		AllMessages, Ancestors, ProspectiveParachainsMessage, RuntimeApiMessage,
 		RuntimeApiRequest::{
 			AvailabilityCores, PersistedValidationData as PersistedValidationDataReq,
 		},
 	};
 	use polkadot_node_subsystem_test_helpers::{mock::new_leaf, TestSubsystemSender};
 	use polkadot_primitives::{
-		BlockNumber, CandidateCommitments, CandidateReceiptV2 as CandidateReceipt,
-		CommittedCandidateReceiptV2 as CommittedCandidateReceipt, MutateDescriptorV2,
-		PersistedValidationData,
+		BlockNumber, CandidateCommitments, CandidateHash, CandidateReceiptV2 as CandidateReceipt,
+		CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, Id as ParaId,
+		MutateDescriptorV2, PersistedValidationData,
 	};
 	use polkadot_primitives_test_helpers::{dummy_candidate_descriptor_v2, dummy_hash};
 	use std::ops::Not;
diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs
index 9d4ac5bc4b1b9..9f6ec3afd657b 100644
--- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs
@@ -368,6 +368,7 @@ fn get_test_running_task(
 			req_v1_protocol_name: req_protocol_names.get_name(Protocol::ChunkFetchingV1),
 			req_v2_protocol_name: req_protocol_names.get_name(Protocol::ChunkFetchingV2),
 			chunk_index,
+			origin: CoreInfoOrigin::Occupied,
 		},
 		rx,
 	)
diff --git a/polkadot/node/network/availability-distribution/src/requester/tests.rs b/polkadot/node/network/availability-distribution/src/requester/tests.rs
index 021f6da7e2e99..77d68cb1b8385 100644
--- a/polkadot/node/network/availability-distribution/src/requester/tests.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/tests.rs
@@ -201,8 +201,11 @@ fn test_harness>(
 #[test]
 fn check_ancestry_lookup_in_same_session() {
 	let test_state = TestState::new();
-	let mut requester =
-		Requester::new(ReqProtocolNames::new(&Hash::repeat_byte(0xff), None), Default::default());
+	let mut requester = Requester::new(
+		ReqProtocolNames::new(&Hash::repeat_byte(0xff), None),
+		Default::default(),
+		false,
+	);
 	let keystore = make_ferdie_keystore();
 	let mut runtime = RuntimeInfo::new(Some(keystore));
 
@@ -268,8 +271,11 @@ fn check_ancestry_lookup_in_same_session() {
 #[test]
 fn check_ancestry_lookup_in_different_sessions() {
 	let mut test_state = TestState::new();
-	let mut requester =
-		Requester::new(ReqProtocolNames::new(&Hash::repeat_byte(0xff), None), Default::default());
+	let mut requester = Requester::new(
+		ReqProtocolNames::new(&Hash::repeat_byte(0xff), None),
+		Default::default(),
+		false,
+	);
 	let keystore = make_ferdie_keystore();
 	let mut runtime = RuntimeInfo::new(Some(keystore));
 
diff --git a/polkadot/node/network/availability-distribution/src/tests/mod.rs b/polkadot/node/network/availability-distribution/src/tests/mod.rs
index 078220607c37f..44c1d6b52cc85 100644
--- a/polkadot/node/network/availability-distribution/src/tests/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/tests/mod.rs
@@ -62,6 +62,7 @@ fn test_harness>(
 		IncomingRequestReceivers { pov_req_receiver, chunk_req_v1_receiver, chunk_req_v2_receiver },
 		req_protocol_names,
 		Default::default(),
+		false,
 	);
 
 	let subsystem = subsystem.run(context);
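
Taken together, the updated call sites above imply that the Requester now takes a third boolean controlling speculative availability, mirroring the speculative_availability field added to NewFullParams in PATCH 15, and that fetch tasks carry an origin (CoreInfoOrigin::Occupied in the fetch_task test). Below is a self-contained sketch of the gating this flag presumably implements; the Scheduled variant and the should_fetch helper are assumptions for illustration, not code from the patch:

// Sketch only: illustrates the presumed behaviour, not the patch's actual implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CoreInfoOrigin {
	// Core is occupied by a candidate pending availability (the pre-existing fetch path).
	Occupied,
	// Core is merely scheduled; the candidate is only known speculatively, e.g. as a
	// backable candidate from Prospective Parachains.
	Scheduled,
}

// Whether a fetch task should be spawned for a core, given the speculative-availability
// flag that the tests above now pass as the third argument to `Requester::new`.
fn should_fetch(origin: CoreInfoOrigin, speculative_availability: bool) -> bool {
	match origin {
		CoreInfoOrigin::Occupied => true,
		CoreInfoOrigin::Scheduled => speculative_availability,
	}
}

fn main() {
	// With the flag disabled, as in the updated test services and test harnesses, only
	// occupied cores trigger chunk fetching; enabling it adds the scheduled-core path.
	assert!(should_fetch(CoreInfoOrigin::Occupied, false));
	assert!(!should_fetch(CoreInfoOrigin::Scheduled, false));
	assert!(should_fetch(CoreInfoOrigin::Scheduled, true));
}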