diff --git a/Cargo.lock b/Cargo.lock index d129f4eea452c..f3d5136d3dced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16054,6 +16054,7 @@ name = "polkadot-node-subsystem-util" version = "7.0.0" dependencies = [ "assert_matches", + "bitvec", "fatality", "futures", "itertools 0.11.0", diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index b989f81efd5dc..37c9a4a2b545a 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -419,6 +419,7 @@ fn build_polkadot_full_node( keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + speculative_availability: false, }; let (relay_chain_full_node, paranode_req_receiver) = match config.network.network_backend { diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs index fa8595cc7c57c..f0b6b26d7a40b 100644 --- a/polkadot/cli/src/cli.rs +++ b/polkadot/cli/src/cli.rs @@ -166,6 +166,10 @@ pub struct RunCmd { /// **Dangerous!** Do not touch unless explicitly advised to. #[arg(long, hide = true)] pub collator_protocol_hold_off: Option, + + /// Enable speculative availability requests via Prospective Parachains. Defaults to disabled. + #[arg(long = "speculative-availability")] + pub speculative_availability: bool, } #[allow(missing_docs)] diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index b9366d07f6c54..5e43d020d6a47 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -292,6 +292,7 @@ where keep_finalized_for: cli.run.keep_finalized_for, invulnerable_ah_collators, collator_protocol_hold_off, + speculative_availability: cli.run.speculative_availability, }, ) .map(|full| full.task_manager)?; diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 9fd221ebee979..311f6a84c8784 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -1232,6 +1232,30 @@ fn process_message( }, } }, + AvailabilityStoreMessage::NoteBackableCandidates { + candidate_hashes, + num_validators, + tx, + } => { + for candidate_hash in candidate_hashes { + let res = note_backable_candidate( + &subsystem.db, + &subsystem.config, + candidate_hash, + num_validators, + subsystem, + ); + + match res { + Ok(_) => {}, + Err(e) => { + let _ = tx.send(Err(())); + return Err(e) + }, + } + } + let _ = tx.send(Ok(())); + }, } Ok(()) @@ -1405,3 +1429,30 @@ fn prune_all(db: &Arc, config: &Config, now: Duration) -> Result<( db.write(tx)?; Ok(()) } + +fn note_backable_candidate( + db: &Arc, + config: &Config, + candidate_hash: CandidateHash, + num_validators: usize, + subsystem: &AvailabilityStoreSubsystem, +) -> Result<(), Error> { + let mut tx = DBTransaction::new(); + + if load_meta(db, config, &candidate_hash)?.is_none() { + let now = subsystem.clock.now()?; + let meta = CandidateMeta { + state: State::Unavailable(now.into()), + data_available: false, + chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; num_validators], + }; + + let prune_at = now + KEEP_UNAVAILABLE_FOR; + + write_pruning_key(&mut tx, config, prune_at, &candidate_hash); + write_meta(&mut tx, config, &candidate_hash, &meta); + } + + db.write(tx)?; + Ok(()) +} diff --git a/polkadot/node/core/provisioner/Cargo.toml b/polkadot/node/core/provisioner/Cargo.toml index 91bac900dc0fc..88d5bb7e844f3 100644 --- a/polkadot/node/core/provisioner/Cargo.toml +++ 
b/polkadot/node/core/provisioner/Cargo.toml @@ -12,7 +12,6 @@ repository.workspace = true workspace = true [dependencies] -bitvec = { features = ["alloc"], workspace = true } fatality = { workspace = true } futures = { workspace = true } futures-timer = { workspace = true } @@ -26,6 +25,7 @@ schnellru = { workspace = true } thiserror = { workspace = true } [dev-dependencies] +bitvec = { features = ["alloc"], workspace = true } polkadot-node-subsystem-test-helpers = { workspace = true } polkadot-primitives = { workspace = true, features = ["test"] } polkadot-primitives-test-helpers = { workspace = true } diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 29e8eb34b3959..5e6bff67caed5 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -19,7 +19,6 @@ #![deny(missing_docs, unused_crate_dependencies)] -use bitvec::vec::BitVec; use futures::{ channel::oneshot::{self, Canceled}, future::BoxFuture, @@ -30,23 +29,22 @@ use futures::{ use futures_timer::Delay; use polkadot_node_subsystem::{ messages::{ - Ancestors, CandidateBackingMessage, ChainApiMessage, ProspectiveParachainsMessage, - ProvisionableData, ProvisionerInherentData, ProvisionerMessage, + CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, + ProvisionerMessage, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; -use polkadot_node_subsystem_util::{request_availability_cores, TimeoutExt}; +use polkadot_node_subsystem_util::{ + request_availability_cores, request_backable_candidates, select_availability_bitfields, + TimeoutExt, +}; use polkadot_primitives::{ - BackedCandidate, CandidateEvent, CandidateHash, CoreIndex, CoreState, Hash, Id as ParaId, - SignedAvailabilityBitfield, ValidatorIndex, + BackedCandidate, CandidateEvent, CoreState, Hash, SignedAvailabilityBitfield, }; use sc_consensus_slots::time_until_next_slot; use schnellru::{ByLength, LruMap}; -use std::{ - collections::{BTreeMap, HashMap}, - time::Duration, -}; +use std::{collections::HashMap, time::Duration}; mod disputes; mod error; mod metrics; @@ -281,7 +279,7 @@ async fn handle_active_leaves_update( let Some(inherent) = inherents.get(&header.parent_hash) else { return Ok(()) }; let diff = inherent.backed_candidates.len() as isize - in_block_count; - gum::debug!(target: LOG_TARGET, + gum::debug!(target: LOG_TARGET, ?diff, ?in_block_count, local_count = ?inherent.backed_candidates.len(), @@ -421,8 +419,6 @@ fn note_provisionable_data( } } -type CoreAvailability = BitVec; - /// The provisioner is the subsystem best suited to choosing which specific /// backed candidates and availability bitfields should be assembled into the /// block. To engage this functionality, a @@ -515,167 +511,6 @@ async fn send_inherent_data( Ok(()) } -/// In general, we want to pick all the bitfields. However, we have the following constraints: -/// -/// - not more than one per validator -/// - each 1 bit must correspond to an occupied core -/// -/// If we have too many, an arbitrary selection policy is fine. For purposes of maximizing -/// availability, we pick the one with the greatest number of 1 bits. -/// -/// Note: This does not enforce any sorting precondition on the output; the ordering there will be -/// unrelated to the sorting of the input. 
-fn select_availability_bitfields( - cores: &[CoreState], - bitfields: &[SignedAvailabilityBitfield], - leaf_hash: &Hash, -) -> Vec { - let mut selected: BTreeMap = BTreeMap::new(); - - gum::debug!( - target: LOG_TARGET, - bitfields_count = bitfields.len(), - ?leaf_hash, - "bitfields count before selection" - ); - - 'a: for bitfield in bitfields.iter().cloned() { - if bitfield.payload().0.len() != cores.len() { - gum::debug!(target: LOG_TARGET, ?leaf_hash, "dropping bitfield due to length mismatch"); - continue - } - - let is_better = selected - .get(&bitfield.validator_index()) - .map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones()); - - if !is_better { - gum::trace!( - target: LOG_TARGET, - val_idx = bitfield.validator_index().0, - ?leaf_hash, - "dropping bitfield due to duplication - the better one is kept" - ); - continue - } - - for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) { - // Bit is set for an unoccupied core - invalid - if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) { - gum::debug!( - target: LOG_TARGET, - val_idx = bitfield.validator_index().0, - ?leaf_hash, - "dropping invalid bitfield - bit is set for an unoccupied core" - ); - continue 'a - } - } - - let _ = selected.insert(bitfield.validator_index(), bitfield); - } - - gum::debug!( - target: LOG_TARGET, - ?leaf_hash, - "selected {} of all {} bitfields (each bitfield is from a unique validator)", - selected.len(), - bitfields.len() - ); - - selected.into_values().collect() -} - -/// Requests backable candidates from Prospective Parachains subsystem -/// based on core states. -async fn request_backable_candidates( - availability_cores: &[CoreState], - bitfields: &[SignedAvailabilityBitfield], - relay_parent: &ActivatedLeaf, - sender: &mut impl overseer::ProvisionerSenderTrait, -) -> Result>, Error> { - let block_number_under_construction = relay_parent.number + 1; - - // Record how many cores are scheduled for each paraid. Use a BTreeMap because - // we'll need to iterate through them. - let mut scheduled_cores_per_para: BTreeMap = BTreeMap::new(); - // The on-chain ancestors of a para present in availability-cores. - let mut ancestors: HashMap = - HashMap::with_capacity(availability_cores.len()); - - for (core_idx, core) in availability_cores.iter().enumerate() { - let core_idx = CoreIndex(core_idx as u32); - match core { - CoreState::Scheduled(scheduled_core) => { - *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; - }, - CoreState::Occupied(occupied_core) => { - let is_available = bitfields_indicate_availability( - core_idx.0 as usize, - bitfields, - &occupied_core.availability, - ); - - if is_available { - ancestors - .entry(occupied_core.para_id()) - .or_default() - .insert(occupied_core.candidate_hash); - - if let Some(ref scheduled_core) = occupied_core.next_up_on_available { - // Request a new backable candidate for the newly scheduled para id. - *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; - } - } else if occupied_core.time_out_at <= block_number_under_construction { - // Timed out before being available. - - if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out { - // Candidate's availability timed out, practically same as scheduled. - *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; - } - } else { - // Not timed out and not available. 
- ancestors - .entry(occupied_core.para_id()) - .or_default() - .insert(occupied_core.candidate_hash); - } - }, - CoreState::Free => continue, - }; - } - - let mut selected_candidates: HashMap> = - HashMap::with_capacity(scheduled_cores_per_para.len()); - - for (para_id, core_count) in scheduled_cores_per_para { - let para_ancestors = ancestors.remove(¶_id).unwrap_or_default(); - - let response = get_backable_candidates( - relay_parent.hash, - para_id, - para_ancestors, - core_count as u32, - sender, - ) - .await?; - - if response.is_empty() { - gum::debug!( - target: LOG_TARGET, - leaf_hash = ?relay_parent.hash, - ?para_id, - "No backable candidate returned by prospective parachains", - ); - continue - } - - selected_candidates.insert(para_id, response); - } - - Ok(selected_candidates) -} - /// Determine which cores are free, and then to the degree possible, pick a candidate appropriate to /// each free core. async fn select_candidates( @@ -692,7 +527,7 @@ async fn select_candidates( ); let selected_candidates = - request_backable_candidates(availability_cores, bitfields, leaf, sender).await?; + request_backable_candidates(availability_cores, Some(bitfields), leaf, sender).await?; gum::debug!(target: LOG_TARGET, ?selected_candidates, "Got backable candidates"); // now get the backed candidates corresponding to these candidate receipts @@ -737,66 +572,3 @@ async fn select_candidates( Ok(merged_candidates) } - -/// Requests backable candidates from Prospective Parachains based on -/// the given ancestors in the fragment chain. The ancestors may not be ordered. -async fn get_backable_candidates( - relay_parent: Hash, - para_id: ParaId, - ancestors: Ancestors, - count: u32, - sender: &mut impl overseer::ProvisionerSenderTrait, -) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - sender - .send_message(ProspectiveParachainsMessage::GetBackableCandidates( - relay_parent, - para_id, - count, - ancestors, - tx, - )) - .await; - - rx.await.map_err(Error::CanceledBackableCandidates) -} - -/// The availability bitfield for a given core is the transpose -/// of a set of signed availability bitfields. It goes like this: -/// -/// - construct a transverse slice along `core_idx` -/// - bitwise-or it with the availability slice -/// - count the 1 bits, compare to the total length; true on 2/3+ -fn bitfields_indicate_availability( - core_idx: usize, - bitfields: &[SignedAvailabilityBitfield], - availability: &CoreAvailability, -) -> bool { - let mut availability = availability.clone(); - let availability_len = availability.len(); - - for bitfield in bitfields { - let validator_idx = bitfield.validator_index().0 as usize; - match availability.get_mut(validator_idx) { - None => { - // in principle, this function might return a `Result` so that we can - // more clearly express this error condition however, in practice, that would just - // push off an error-handling routine which would look a whole lot like this one. - // simpler to just handle the error internally here. 
- gum::warn!( - target: LOG_TARGET, - validator_idx = %validator_idx, - availability_len = %availability_len, - "attempted to set a transverse bit at idx {} which is greater than bitfield size {}", - validator_idx, - availability_len, - ); - - return false - }, - Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx], - } - } - - 3 * availability.count_ones() >= 2 * availability.len() -} diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs index b965d3346d87d..3cb0aa2f32856 100644 --- a/polkadot/node/core/provisioner/src/tests.rs +++ b/polkadot/node/core/provisioner/src/tests.rs @@ -16,7 +16,11 @@ use super::*; use bitvec::bitvec; -use polkadot_primitives::{MutateDescriptorV2, OccupiedCore, ScheduledCore}; +use polkadot_node_subsystem::messages::{Ancestors, ProspectiveParachainsMessage}; +use polkadot_node_subsystem_util::CoreAvailability; +use polkadot_primitives::{ + CandidateHash, CoreIndex, MutateDescriptorV2, OccupiedCore, ScheduledCore, +}; use polkadot_primitives_test_helpers::{dummy_candidate_descriptor_v2, dummy_hash}; const MOCK_GROUP_SIZE: usize = 5; @@ -61,7 +65,10 @@ pub fn scheduled_core(id: u32) -> ScheduledCore { mod select_availability_bitfields { use super::{super::*, default_bitvec, occupied_core}; - use polkadot_primitives::{ScheduledCore, SigningContext, ValidatorId, ValidatorIndex}; + use polkadot_node_subsystem_util::CoreAvailability; + use polkadot_primitives::{ + CandidateHash, CoreIndex, ScheduledCore, SigningContext, ValidatorId, ValidatorIndex, + }; use sp_application_crypto::AppCrypto; use sp_keystore::{testing::MemoryKeystore, Keystore, KeystorePtr}; use std::sync::Arc; @@ -246,16 +253,16 @@ mod select_candidates { }; use futures::channel::mpsc; use polkadot_node_subsystem::messages::{ - AllMessages, RuntimeApiMessage, + AllMessages, Ancestors, ProspectiveParachainsMessage, RuntimeApiMessage, RuntimeApiRequest::{ AvailabilityCores, PersistedValidationData as PersistedValidationDataReq, }, }; use polkadot_node_subsystem_test_helpers::{mock::new_leaf, TestSubsystemSender}; use polkadot_primitives::{ - BlockNumber, CandidateCommitments, CandidateReceiptV2 as CandidateReceipt, - CommittedCandidateReceiptV2 as CommittedCandidateReceipt, MutateDescriptorV2, - PersistedValidationData, + BlockNumber, CandidateCommitments, CandidateHash, CandidateReceiptV2 as CandidateReceipt, + CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, Id as ParaId, + MutateDescriptorV2, PersistedValidationData, }; use polkadot_primitives_test_helpers::{dummy_candidate_descriptor_v2, dummy_hash}; use std::ops::Not; diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs index 852e7bbcbbbf3..d0808f6b0c229 100644 --- a/polkadot/node/network/availability-distribution/src/error.rs +++ b/polkadot/node/network/availability-distribution/src/error.rs @@ -88,6 +88,15 @@ pub enum Error { #[error("Erasure coding error: {0}")] ErasureCoding(#[from] polkadot_erasure_coding::Error), + + #[error("Expected a block header to be returned")] + NoSuchBlockHeader, + + #[error("Error from subsystem-util: {0}")] + SubsystemUtil(#[from] polkadot_node_subsystem_util::Error), + + #[error("Retrieving response from Availability Store unexpectedly failed")] + AvailabilityStore, } /// General result abbreviation type alias. 
@@ -112,7 +121,10 @@ pub fn log_error( JfyiError::QueryAvailableDataResponseChannel(_) | JfyiError::QueryChunkResponseChannel(_) | JfyiError::FailedNodeFeatures(_) | - JfyiError::ErasureCoding(_) => gum::warn!(target: LOG_TARGET, error = %jfyi, ctx), + JfyiError::ErasureCoding(_) | + JfyiError::NoSuchBlockHeader | + JfyiError::SubsystemUtil(_) | + JfyiError::AvailabilityStore => gum::warn!(target: LOG_TARGET, error = %jfyi, ctx), JfyiError::FetchPoV(_) | JfyiError::SendResponse | JfyiError::NoSuchPoV | diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 438453814978c..e5a220b8d3447 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -62,6 +62,8 @@ pub struct AvailabilityDistributionSubsystem { req_protocol_names: ReqProtocolNames, /// Prometheus metrics. metrics: Metrics, + /// Whether speculative availability requests are enabled. + speculative_availability: bool, } /// Receivers to be passed into availability distribution. @@ -94,21 +96,24 @@ impl AvailabilityDistributionSubsystem { recvs: IncomingRequestReceivers, req_protocol_names: ReqProtocolNames, metrics: Metrics, + speculative_availability: bool, ) -> Self { let runtime = RuntimeInfo::new(Some(keystore)); - Self { runtime, recvs, req_protocol_names, metrics } + Self { runtime, recvs, req_protocol_names, metrics, speculative_availability } } /// Start processing work as passed on from the Overseer. async fn run(self, mut ctx: Context) -> std::result::Result<(), FatalError> { - let Self { mut runtime, recvs, metrics, req_protocol_names } = self; + let Self { mut runtime, recvs, metrics, req_protocol_names, speculative_availability } = + self; let IncomingRequestReceivers { pov_req_receiver, chunk_req_v1_receiver, chunk_req_v2_receiver, } = recvs; - let mut requester = Requester::new(req_protocol_names, metrics.clone()).fuse(); + let mut requester = + Requester::new(req_protocol_names, metrics.clone(), speculative_availability).fuse(); let mut warn_freq = gum::Freq::new(); { diff --git a/polkadot/node/network/availability-distribution/src/metrics.rs b/polkadot/node/network/availability-distribution/src/metrics.rs index bc724be10d9b7..01de7cf4784b4 100644 --- a/polkadot/node/network/availability-distribution/src/metrics.rs +++ b/polkadot/node/network/availability-distribution/src/metrics.rs @@ -31,6 +31,12 @@ pub const FAILED: &'static str = "failed"; /// Label for chunks/PoVs that could not be served, because they were not available. pub const NOT_FOUND: &'static str = "not-found"; +/// Label for scheduled core origin. +pub const SCHEDULED: &'static str = "scheduled"; + +/// Label for occupied core origin. +pub const OCCUPIED: &'static str = "occupied"; + /// Availability Distribution metrics. #[derive(Clone, Default)] pub struct Metrics(Option); @@ -65,9 +71,9 @@ impl Metrics { } /// Increment counter on fetched labels. 
- pub fn on_fetch(&self, label: &'static str) { + pub fn on_fetch(&self, label: &'static str, origin: &'static str) { if let Some(metrics) = &self.0 { - metrics.fetched_chunks.with_label_values(&[label]).inc() + metrics.fetched_chunks.with_label_values(&[label, origin]).inc() } } @@ -109,7 +115,7 @@ impl metrics::Metrics for Metrics { "polkadot_parachain_fetched_chunks_total", "Total number of fetched chunks.", ), - &["success"] + &["success", "origin"], )?, registry, )?, diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 4a68349d398fe..82a37eab82ca9 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -36,14 +36,17 @@ use polkadot_node_subsystem::{ }; use polkadot_primitives::{ AuthorityDiscoveryId, BlakeTwo256, CandidateHash, ChunkIndex, GroupIndex, Hash, HashT, - OccupiedCore, SessionIndex, + SessionIndex, }; use sc_network::ProtocolName; use crate::{ error::{FatalError, Result}, - metrics::{Metrics, FAILED, SUCCEEDED}, - requester::session_cache::{BadValidators, SessionInfo}, + metrics::{Metrics, FAILED, OCCUPIED, SCHEDULED, SUCCEEDED}, + requester::{ + session_cache::{BadValidators, SessionInfo}, + CoreInfo, CoreInfoOrigin, + }, LOG_TARGET, }; @@ -137,6 +140,9 @@ struct RunningTask { /// Full protocol name for ChunkFetchingV2. req_v2_protocol_name: ProtocolName, + + /// The origin of the `CoreInfo` that was used to create this running task. + origin: CoreInfoOrigin, } impl FetchTaskConfig { @@ -145,7 +151,7 @@ impl FetchTaskConfig { /// The result of this function can be passed into [`FetchTask::start`]. pub fn new( leaf: Hash, - core: &OccupiedCore, + core: &CoreInfo, sender: mpsc::Sender, metrics: Metrics, session_info: &SessionInfo, @@ -170,13 +176,14 @@ impl FetchTaskConfig { candidate_hash: core.candidate_hash, index: session_info.our_index, }, - erasure_root: core.candidate_descriptor.erasure_root(), - relay_parent: core.candidate_descriptor.relay_parent(), + erasure_root: core.erasure_root, + relay_parent: core.relay_parent, metrics, sender, chunk_index, req_v1_protocol_name, - req_v2_protocol_name + req_v2_protocol_name, + origin: core.origin.clone(), }; FetchTaskConfig { live_in, prepared_running: Some(prepared_running) } } @@ -188,7 +195,7 @@ impl FetchTask { /// /// A task handling the fetching of the configured chunk will be spawned. pub async fn start(config: FetchTaskConfig, ctx: &mut Context) -> Result { - let FetchTaskConfig { prepared_running, live_in } = config; + let FetchTaskConfig { prepared_running, live_in, .. 
} = config; if let Some(running) = prepared_running { let (handle, kill) = oneshot::channel(); @@ -283,7 +290,13 @@ impl RunningTask { target: LOG_TARGET, "Node seems to be shutting down, canceling fetch task" ); - self.metrics.on_fetch(FAILED); + self.metrics.on_fetch( + FAILED, + match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }, + ); return }, Err(TaskError::PeerError) => { @@ -303,6 +316,7 @@ impl RunningTask { session_index = ?self.session_index, chunk_index = ?self.request.index, candidate_hash = ?self.request.candidate_hash, + origin = ?self.origin, "Validator did not have our chunk" ); bad_validators.push(validator); @@ -322,10 +336,22 @@ impl RunningTask { break } if succeeded { - self.metrics.on_fetch(SUCCEEDED); + self.metrics.on_fetch( + SUCCEEDED, + match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }, + ); self.conclude(bad_validators).await; } else { - self.metrics.on_fetch(FAILED); + self.metrics.on_fetch( + FAILED, + match self.origin { + CoreInfoOrigin::Scheduled => SCHEDULED, + CoreInfoOrigin::Occupied => OCCUPIED, + }, + ); self.conclude_fail().await } } diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs index 9d4ac5bc4b1b9..9f6ec3afd657b 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs @@ -368,6 +368,7 @@ fn get_test_running_task( req_v1_protocol_name: req_protocol_names.get_name(Protocol::ChunkFetchingV1), req_v2_protocol_name: req_protocol_names.get_name(Protocol::ChunkFetchingV2), chunk_index, + origin: CoreInfoOrigin::Occupied, }, rx, ) diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 2338250327246..02a3ec219d529 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -31,16 +31,19 @@ use futures::{ use polkadot_node_network_protocol::request_response::{v1, v2, IsRequest, ReqProtocolNames}; use polkadot_node_subsystem::{ - messages::{ChainApiMessage, RuntimeApiMessage}, - overseer, ActivatedLeaf, ActiveLeavesUpdate, + messages::{ + AvailabilityStoreMessage, CandidateBackingMessage, ChainApiMessage, RuntimeApiMessage, + }, + overseer, ActivatedLeaf, ActiveLeavesUpdate, SubsystemSender, }; use polkadot_node_subsystem_util::{ availability_chunks::availability_chunk_index, - runtime::{get_occupied_cores, RuntimeInfo}, + request_availability_cores, request_backable_candidates, request_validator_groups, + runtime::{get_occupied_cores, recv_runtime, RuntimeInfo}, }; -use polkadot_primitives::{CandidateHash, CoreIndex, Hash, OccupiedCore, SessionIndex}; +use polkadot_primitives::{CandidateHash, CoreIndex, GroupIndex, Hash, SessionIndex}; -use super::{FatalError, Metrics, Result, LOG_TARGET}; +use super::{error::Error, FatalError, Metrics, Result, LOG_TARGET}; #[cfg(test)] mod tests; @@ -53,6 +56,32 @@ use session_cache::SessionCache; mod fetch_task; use fetch_task::{FetchTask, FetchTaskConfig, FromFetchTask}; +/// A compact representation of a parachain candidate core's essential information, +/// used to streamline chunk-fetching tasks. 
This structure normalizes data from both +/// occupied and scheduled cores into a unified format containing only the fields +/// necessary for chunk fetching and validation. +#[derive(Debug, Clone)] +struct CoreInfo { + /// The candidate hash. + candidate_hash: CandidateHash, + /// The relay parent of the candidate. + relay_parent: Hash, + /// The root hash of the erasure coded chunks for the candidate. + erasure_root: Hash, + /// The group index of the group responsible for the candidate. + group_responsible: GroupIndex, + /// The origin of the CoreInfo, whether from occupied or scheduled cores. + origin: CoreInfoOrigin, +} + +/// Origin of CoreInfo. Whether it was created by calling prospective parachains for scheduled +/// candidates, or by querying occupied cores for already backed candidates. +#[derive(Debug, Clone)] +enum CoreInfoOrigin { + Occupied, + Scheduled, +} + /// Requester takes care of requesting erasure chunks from backing groups and stores them in the /// av store. /// @@ -80,6 +109,9 @@ pub struct Requester { /// Mapping of the req-response protocols to the full protocol names. req_protocol_names: ReqProtocolNames, + + /// Whether speculative availability requests are enabled. + speculative_availability: bool, } #[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)] @@ -91,7 +123,11 @@ impl Requester { /// /// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress /// by advancing the stream. - pub fn new(req_protocol_names: ReqProtocolNames, metrics: Metrics) -> Self { + pub fn new( + req_protocol_names: ReqProtocolNames, + metrics: Metrics, + speculative_availability: bool, + ) -> Self { let (tx, rx) = mpsc::channel(1); Requester { fetches: HashMap::new(), @@ -100,6 +136,7 @@ impl Requester { rx, metrics, req_protocol_names, + speculative_availability, } } @@ -124,6 +161,57 @@ impl Requester { Ok(()) } + async fn request_backable_candidates_core_info( + &mut self, + ctx: &mut Context, + new_head: ActivatedLeaf, + ) -> Result> { + let ActivatedLeaf { hash: leaf, .. } = new_head; + let mut scheduled_cores = Vec::new(); + let sender = &mut ctx.sender().clone(); + + let cores = recv_runtime(request_availability_cores(leaf, sender).await).await?; + let backable = request_backable_candidates(&cores, None, &new_head, sender).await?; + gum::trace!( + target: LOG_TARGET, + leaf_hash=?leaf, + "Got {} backable candidates", backable.len() + ); + + // now get the backed candidates corresponding to these candidate receipts + let (tx, rx) = oneshot::channel(); + sender + .send_message(CandidateBackingMessage::GetBackableCandidates(backable.clone(), tx)) + .await; + let candidates = rx.await?; + + // Process candidates and collect cores + for (_, candidates) in candidates.iter() { + for candidate in candidates { + let (_, core_index) = candidate.validator_indices_and_core_index(); + let Some(core_index) = core_index else { continue }; + + let receipt = candidate.candidate(); + let core = ( + core_index, + CoreInfo { + candidate_hash: receipt.hash(), + relay_parent: receipt.descriptor.relay_parent(), + erasure_root: receipt.descriptor.erasure_root(), + group_responsible: get_group_index_for_backed_candidate( + sender, core_index, leaf, + ) + .await?, + origin: CoreInfoOrigin::Scheduled, + }, + ); + scheduled_cores.push(core); + } + } + + Ok(scheduled_cores) + } + /// Start requesting chunks for newly imported head. 
/// /// This will also request [`SESSION_ANCESTRY_LEN`] leaf ancestors from the same session @@ -152,6 +240,23 @@ impl Requester { occupied_cores = ?cores, "Query occupied core" ); + + let cores = cores + .into_iter() + .map(|(index, occ)| { + ( + index, + CoreInfo { + candidate_hash: occ.candidate_hash, + relay_parent: occ.candidate_descriptor.relay_parent(), + erasure_root: occ.candidate_descriptor.erasure_root(), + group_responsible: occ.group_responsible, + origin: CoreInfoOrigin::Occupied, + }, + ) + }) + .collect::>(); + // Important: // We mark the whole ancestry as live in the **leaf** hash, so we don't need to track // any tasks separately. @@ -162,6 +267,62 @@ impl Requester { self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?; } + if self.speculative_availability { + let scheduled_cores = self.request_backable_candidates_core_info(ctx, new_head).await?; + if scheduled_cores.len() > 0 { + gum::trace!( + target: LOG_TARGET, + ?scheduled_cores, + "Query scheduled cores" + ); + + let candidate_hashes = scheduled_cores + .iter() + .map(|(_, core_info)| core_info.candidate_hash) + .collect::>(); + + let session_info = self + .session_cache + .get_session_info( + ctx, + runtime, + // We use leaf here, the relay_parent must be in the same session as + // the leaf. This is guaranteed by runtime which ensures that cores are + // cleared at session boundaries. At the same time, only leaves are + // guaranteed to be fetchable by the state trie. + leaf, + leaf_session_index, + ) + .await + .map_err(|err| { + gum::warn!( + target: LOG_TARGET, + error = ?err, + "Failed to spawn a fetch task" + ); + err + })?; + + if let Some(session_info) = session_info { + let num_validators = + session_info.validator_groups.iter().fold(0usize, |mut acc, group| { + acc = acc.saturating_add(group.len()); + acc + }); + + let (tx, rx) = oneshot::channel(); + sender + .send_message(AvailabilityStoreMessage::NoteBackableCandidates { + candidate_hashes, + num_validators, + tx, + }) + .await; + rx.await?.map_err(|_| Error::AvailabilityStore)?; + self.add_cores(ctx, runtime, leaf, leaf_session_index, scheduled_cores).await?; + } + } + } Ok(()) } @@ -187,7 +348,7 @@ impl Requester { runtime: &mut RuntimeInfo, leaf: Hash, leaf_session_index: SessionIndex, - cores: impl IntoIterator, + cores: impl IntoIterator, ) -> Result<()> { for (core_index, core) in cores { if let Some(e) = self.fetches.get_mut(&core.candidate_hash) { @@ -347,3 +508,27 @@ where .map_err(FatalError::ChainApi)?; Ok(ancestors) } + +/// Request group index assuming you have the core index and relay parent +async fn get_group_index_for_backed_candidate( + sender: &mut Sender, + core_index: CoreIndex, + relay_parent: Hash, +) -> Result +where + Sender: + overseer::SubsystemSender + overseer::SubsystemSender, +{ + let (validator_groups, mut rotation_info) = + recv_runtime(request_validator_groups(relay_parent, sender).await).await?; + + // Get the block number for the relay_parent to update rotation_info.now + let (tx, rx) = oneshot::channel(); + sender.send_message(ChainApiMessage::BlockHeader(relay_parent, tx)).await; + let header = rx.await??; + let block_number = header.ok_or_else(|| Error::NoSuchBlockHeader)?.number; + + rotation_info.now = block_number.into(); // Update now to the relay_parent's block number + + Ok(rotation_info.group_for_core(core_index, validator_groups.len())) +} diff --git a/polkadot/node/network/availability-distribution/src/requester/tests.rs 
b/polkadot/node/network/availability-distribution/src/requester/tests.rs index 021f6da7e2e99..77d68cb1b8385 100644 --- a/polkadot/node/network/availability-distribution/src/requester/tests.rs +++ b/polkadot/node/network/availability-distribution/src/requester/tests.rs @@ -201,8 +201,11 @@ fn test_harness>( #[test] fn check_ancestry_lookup_in_same_session() { let test_state = TestState::new(); - let mut requester = - Requester::new(ReqProtocolNames::new(&Hash::repeat_byte(0xff), None), Default::default()); + let mut requester = Requester::new( + ReqProtocolNames::new(&Hash::repeat_byte(0xff), None), + Default::default(), + false, + ); let keystore = make_ferdie_keystore(); let mut runtime = RuntimeInfo::new(Some(keystore)); @@ -268,8 +271,11 @@ fn check_ancestry_lookup_in_same_session() { #[test] fn check_ancestry_lookup_in_different_sessions() { let mut test_state = TestState::new(); - let mut requester = - Requester::new(ReqProtocolNames::new(&Hash::repeat_byte(0xff), None), Default::default()); + let mut requester = Requester::new( + ReqProtocolNames::new(&Hash::repeat_byte(0xff), None), + Default::default(), + false, + ); let keystore = make_ferdie_keystore(); let mut runtime = RuntimeInfo::new(Some(keystore)); diff --git a/polkadot/node/network/availability-distribution/src/tests/mod.rs b/polkadot/node/network/availability-distribution/src/tests/mod.rs index 078220607c37f..44c1d6b52cc85 100644 --- a/polkadot/node/network/availability-distribution/src/tests/mod.rs +++ b/polkadot/node/network/availability-distribution/src/tests/mod.rs @@ -62,6 +62,7 @@ fn test_harness>( IncomingRequestReceivers { pov_req_receiver, chunk_req_v1_receiver, chunk_req_v2_receiver }, req_protocol_names, Default::default(), + false, ); let subsystem = subsystem.run(context); diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 83709e73027b9..5857ad3c8f0e4 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -520,6 +520,8 @@ pub struct Overseer { ChainApiMessage, RuntimeApiMessage, NetworkBridgeTxMessage, + ProspectiveParachainsMessage, + CandidateBackingMessage ])] availability_distribution: AvailabilityDistribution, diff --git a/polkadot/node/service/src/builder/mod.rs b/polkadot/node/service/src/builder/mod.rs index 9d52617cc8da9..bfbb20de1e7e8 100644 --- a/polkadot/node/service/src/builder/mod.rs +++ b/polkadot/node/service/src/builder/mod.rs @@ -98,6 +98,8 @@ pub struct NewFullParams { pub invulnerable_ah_collators: HashSet, /// Override for `HOLD_OFF_DURATION` constant . pub collator_protocol_hold_off: Option, + /// Enable speculative availability requests via Prospective Parachains. + pub speculative_availability: bool, } /// Completely built polkadot node service. @@ -209,6 +211,7 @@ where keep_finalized_for, invulnerable_ah_collators, collator_protocol_hold_off, + speculative_availability, }, overseer_connector, partial_components: @@ -452,6 +455,7 @@ where fetch_chunks_threshold, invulnerable_ah_collators, collator_protocol_hold_off, + speculative_availability, }) }; diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index d6ed752b4c31c..b9c474b146008 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -147,6 +147,8 @@ pub struct ExtendedOverseerGenArgs { pub invulnerable_ah_collators: HashSet, /// Override for `HOLD_OFF_DURATION` constant . pub collator_protocol_hold_off: Option, + /// Whether speculative availability requests are enabled. 
+ pub speculative_availability: bool, } /// Obtain a prepared validator `Overseer`, that is initialized with all default values. @@ -183,6 +185,7 @@ pub fn validator_overseer_builder( fetch_chunks_threshold, invulnerable_ah_collators, collator_protocol_hold_off, + speculative_availability, }: ExtendedOverseerGenArgs, ) -> Result< InitializedOverseerBuilder< @@ -262,6 +265,7 @@ where }, req_protocol_names.clone(), Metrics::register(registry)?, + speculative_availability, )) .availability_recovery(AvailabilityRecoverySubsystem::for_validator( fetch_chunks_threshold, diff --git a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs index b346f988a3c90..ce0070ccf802f 100644 --- a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs @@ -268,6 +268,7 @@ pub fn prepare_test( }, state.req_protocol_names.clone(), Metrics::try_register(&dependencies.registry).unwrap(), + false, // Disable speculative availability for this benchmark ); let chain_api_state = ChainApiState { block_headers: state.block_headers.clone() }; diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 8805a330a99f6..21e8621e8d7d1 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -572,6 +572,18 @@ pub enum AvailabilityStoreMessage { tx: oneshot::Sender>, }, + /// Note that the given candidates are backable. Only used by speculative availability to signal to the + /// Availability Store that it should keep track of these candidates, since it is highly likely they will + /// be backed. + NoteBackableCandidates { + /// Hashes of the backable candidates. + candidate_hashes: HashSet<CandidateHash>, + /// The number of validators in the session. + num_validators: usize, + /// Sending side of the channel to send the result to. + tx: oneshot::Sender<Result<(), ()>>, + }, + /// Computes and checks the erasure root of `AvailableData` before storing all of its chunks in
/// diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index e8cf11c2a83f6..b8c33f12712c3 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true workspace = true [dependencies] +bitvec = { workspace = true } codec = { features = ["derive"], workspace = true } fatality = { workspace = true } futures = { workspace = true } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 93cdc84443aa2..fa9bdd6d91f63 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -40,21 +40,23 @@ pub use polkadot_node_metrics::{metrics, Metronome}; use codec::Encode; use futures::channel::{mpsc, oneshot}; +use bitvec::{self, vec::BitVec}; + use polkadot_primitives::{ async_backing::{BackingState, Constraints}, slashing, AsyncBackingParams, AuthorityDiscoveryId, CandidateEvent, CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState, EncodeAs, ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionIndex, - SessionInfo, Signed, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId, - ValidatorIndex, ValidatorSignature, + SessionInfo, Signed, SignedAvailabilityBitfield, SigningContext, ValidationCode, + ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, }; pub use rand; use sp_application_crypto::AppCrypto; use sp_core::ByteArray; use sp_keystore::{Error as KeystoreError, KeystorePtr}; use std::{ - collections::{BTreeMap, VecDeque}, + collections::{BTreeMap, HashMap, VecDeque}, time::Duration, }; use thiserror::Error; @@ -98,6 +100,10 @@ mod determine_new_blocks; mod controlled_validator_indices; pub use controlled_validator_indices::ControlledValidatorIndices; +use polkadot_node_subsystem_types::{ + messages::{Ancestors, ProspectiveParachainsMessage}, + ActivatedLeaf, +}; #[cfg(test)] mod tests; @@ -139,6 +145,10 @@ pub enum Error { /// Data that are supposed to be there a not there #[error("Data are not available")] DataNotAvailable, + /// Attempted to get backable candidates from prospective parachains but the request was + /// canceled. + #[error("failed to get backable candidates from prospective parachains")] + CanceledBackableCandidates(#[source] oneshot::Canceled), } impl From for Error { @@ -543,3 +553,244 @@ impl Validator { Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key) } } + +/// In general, we want to pick all the bitfields. However, we have the following constraints: +/// +/// - not more than one per validator +/// - each 1 bit must correspond to an occupied core +/// +/// If we have too many, an arbitrary selection policy is fine. For purposes of maximizing +/// availability, we pick the one with the greatest number of 1 bits. +/// +/// Note: This does not enforce any sorting precondition on the output; the ordering there will be +/// unrelated to the sorting of the input. 
+pub fn select_availability_bitfields( + cores: &[CoreState], + bitfields: &[SignedAvailabilityBitfield], + leaf_hash: &Hash, +) -> Vec { + let mut selected: BTreeMap = BTreeMap::new(); + + gum::debug!( + target: LOG_TARGET, + bitfields_count = bitfields.len(), + ?leaf_hash, + "bitfields count before selection" + ); + + 'a: for bitfield in bitfields.iter().cloned() { + if bitfield.payload().0.len() != cores.len() { + gum::debug!(target: LOG_TARGET, ?leaf_hash, "dropping bitfield due to length mismatch"); + continue + } + + let is_better = selected + .get(&bitfield.validator_index()) + .map_or(true, |b| b.payload().0.count_ones() < bitfield.payload().0.count_ones()); + + if !is_better { + gum::trace!( + target: LOG_TARGET, + val_idx = bitfield.validator_index().0, + ?leaf_hash, + "dropping bitfield due to duplication - the better one is kept" + ); + continue + } + + for (idx, _) in cores.iter().enumerate().filter(|v| !v.1.is_occupied()) { + // Bit is set for an unoccupied core - invalid + if *bitfield.payload().0.get(idx).as_deref().unwrap_or(&false) { + gum::debug!( + target: LOG_TARGET, + val_idx = bitfield.validator_index().0, + ?leaf_hash, + "dropping invalid bitfield - bit is set for an unoccupied core" + ); + continue 'a + } + } + + let _ = selected.insert(bitfield.validator_index(), bitfield); + } + + gum::debug!( + target: LOG_TARGET, + ?leaf_hash, + "selected {} of all {} bitfields (each bitfield is from a unique validator)", + selected.len(), + bitfields.len() + ); + + selected.into_values().collect() +} + +/// Requests backable candidates from Prospective Parachains subsystem +/// based on core states. +pub async fn request_backable_candidates( + availability_cores: &[CoreState], + bitfields: Option<&[SignedAvailabilityBitfield]>, + relay_parent: &ActivatedLeaf, + sender: &mut impl overseer::SubsystemSender, +) -> Result>, Error> { + let block_number_under_construction = relay_parent.number + 1; + + // Record how many cores are scheduled for each paraid. Use a BTreeMap because + // we'll need to iterate through them. + let mut scheduled_cores_per_para: BTreeMap = BTreeMap::new(); + // The on-chain ancestors of a para present in availability-cores. + let mut ancestors: HashMap = + HashMap::with_capacity(availability_cores.len()); + + for (core_idx, core) in availability_cores.iter().enumerate() { + let core_idx = CoreIndex(core_idx as u32); + match core { + CoreState::Scheduled(scheduled_core) => { + *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; + }, + CoreState::Occupied(occupied_core) => { + let is_available = match bitfields { + Some(bitfields) => { + // If bitfields are provided, check if the core is available. + bitfields_indicate_availability( + core_idx.0 as usize, + bitfields, + &occupied_core.availability, + ) + }, + None => { + gum::debug!( + target: LOG_TARGET, + leaf_hash = ?relay_parent.hash, + ?core_idx, + "No availability bitfields provided, assuming core is available" + ); + true + }, + }; + + if is_available { + ancestors + .entry(occupied_core.para_id()) + .or_default() + .insert(occupied_core.candidate_hash); + + if let Some(ref scheduled_core) = occupied_core.next_up_on_available { + // Request a new backable candidate for the newly scheduled para id. + *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; + } + } else if occupied_core.time_out_at <= block_number_under_construction { + // Timed out before being available. 
+ + if let Some(ref scheduled_core) = occupied_core.next_up_on_time_out { + // Candidate's availability timed out, practically same as scheduled. + *scheduled_cores_per_para.entry(scheduled_core.para_id).or_insert(0) += 1; + } + } else { + // Not timed out and not available. + ancestors + .entry(occupied_core.para_id()) + .or_default() + .insert(occupied_core.candidate_hash); + } + }, + CoreState::Free => continue, + }; + } + + let mut selected_candidates: HashMap> = + HashMap::with_capacity(scheduled_cores_per_para.len()); + + for (para_id, core_count) in scheduled_cores_per_para { + let para_ancestors = ancestors.remove(¶_id).unwrap_or_default(); + + let response = get_backable_candidates( + relay_parent.hash, + para_id, + para_ancestors, + core_count as u32, + sender, + ) + .await?; + + if response.is_empty() { + gum::debug!( + target: LOG_TARGET, + leaf_hash = ?relay_parent.hash, + ?para_id, + "No backable candidate returned by prospective parachains", + ); + continue; + } + + selected_candidates.insert(para_id, response); + } + + Ok(selected_candidates) +} + +/// The availability bitfield for a given core. +pub type CoreAvailability = BitVec; + +/// The availability bitfield for a given core is the transpose +/// of a set of signed availability bitfields. It goes like this: +/// +/// - construct a transverse slice along `core_idx` +/// - bitwise-or it with the availability slice +/// - count the 1 bits, compare to the total length; true on 2/3+ +fn bitfields_indicate_availability( + core_idx: usize, + bitfields: &[SignedAvailabilityBitfield], + availability: &CoreAvailability, +) -> bool { + let mut availability = availability.clone(); + let availability_len = availability.len(); + + for bitfield in bitfields { + let validator_idx = bitfield.validator_index().0 as usize; + match availability.get_mut(validator_idx) { + None => { + // in principle, this function might return a `Result` so that we can + // more clearly express this error condition however, in practice, that would just + // push off an error-handling routine which would look a whole lot like this one. + // simpler to just handle the error internally here. + gum::warn!( + target: LOG_TARGET, + validator_idx = %validator_idx, + availability_len = %availability_len, + "attempted to set a transverse bit at idx {} which is greater than bitfield size {}", + validator_idx, + availability_len, + ); + + return false; + }, + Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx], + } + } + + 3 * availability.count_ones() >= 2 * availability.len() +} + +/// Requests backable candidates from Prospective Parachains based on +/// the given ancestors in the fragment chain. The ancestors may not be ordered. 
+async fn get_backable_candidates( + relay_parent: Hash, + para_id: ParaId, + ancestors: Ancestors, + count: u32, + sender: &mut impl overseer::SubsystemSender, +) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + sender + .send_message(ProspectiveParachainsMessage::GetBackableCandidates( + relay_parent, + para_id, + count, + ancestors, + tx, + )) + .await; + + rx.await.map_err(Error::CanceledBackableCandidates) +} diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs index fc035174fe039..477e9825efb35 100644 --- a/polkadot/node/test/service/src/lib.rs +++ b/polkadot/node/test/service/src/lib.rs @@ -102,6 +102,7 @@ pub fn new_full( keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + speculative_availability: false, }; match config.network.network_backend { diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index f6ed513c76a33..471bec24f4916 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -101,6 +101,7 @@ fn main() -> Result<()> { keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + speculative_availability: false, }, ) .map_err(|e| e.to_string())?; diff --git a/polkadot/parachain/test-parachains/undying/collator/src/main.rs b/polkadot/parachain/test-parachains/undying/collator/src/main.rs index 50392bce7d4f1..edfe1cf0a256d 100644 --- a/polkadot/parachain/test-parachains/undying/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/undying/collator/src/main.rs @@ -113,6 +113,7 @@ fn main() -> Result<()> { keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + speculative_availability: false, }, ) .map_err(|e| e.to_string())?; diff --git a/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs b/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs index 08f1d59bbddbe..ac61c29f5e273 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/async_backing_6_seconds_rate.rs @@ -82,6 +82,20 @@ async fn async_backing_6_seconds_rate_test() -> Result<(), anyhow::Error> { ) .await?; + let scheduled_metric_name = + "polkadot_parachain_fetched_chunks_total{origin=\"scheduled\",success=\"succeeded\"}"; + // scheduled chunk fetches should be 0 since speculative availability is disabled + assert!(relay_node.assert_with(scheduled_metric_name, |v| { v == 0.0 }).await?); + + let occupied_metric_name = + "polkadot_parachain_fetched_chunks_total{origin=\"occupied\",success=\"succeeded\"}"; + // all requests should be occupied since speculative availability is disabled + assert!( + relay_node + .assert_with(occupied_metric_name, |v| { v >= 22.0 && v <= 40.0 }) + .await? + ); + // Assert the parachain finalized block height is also on par with the number of backed // candidates. We can only do this for the collator based on cumulus. 
assert_finality_lag(&para_node_2001.wait_client().await?, 6).await?; diff --git a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs index e28cfb4039303..0ba0c07d24b4b 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs @@ -8,5 +8,6 @@ mod dispute_old_finalized; mod duplicate_collations; mod shared_core_idle_parachain; mod spam_statement_distribution_requests; +mod speculative_availability_requests; mod sync_backing; mod validator_disabling; diff --git a/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs new file mode 100644 index 0000000000000..c55f60fbb6c24 --- /dev/null +++ b/polkadot/zombienet-sdk-tests/tests/functional/speculative_availability_requests.rs @@ -0,0 +1,108 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Test that two async-backing parachains keep producing blocks at the expected rate while +// chunk fetch requests are being made speculatively. + +use anyhow::anyhow; + +use cumulus_zombienet_sdk_helpers::{assert_finality_lag, assert_para_throughput}; +use polkadot_primitives::Id as ParaId; +use serde_json::json; +use zombienet_sdk::{ + subxt::{OnlineClient, PolkadotConfig}, + NetworkConfigBuilder, +}; + +#[tokio::test(flavor = "multi_thread")] +async fn speculative_availability_requests_test() -> Result<(), anyhow::Error> { + let _ = env_logger::try_init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let images = zombienet_sdk::environment::get_images_from_env(); + + let config = NetworkConfigBuilder::new() + .with_relaychain(|r| { + let r = r + .with_chain("rococo-local") + .with_default_command("polkadot") + .with_default_image(images.polkadot.as_str()) + .with_default_args(vec![("-lparachain=debug", "--speculative-availability").into()]) + .with_genesis_overrides(json!({ + "configuration": { + "config": { + "scheduler_params": { + "group_rotation_frequency": 4 + } + } + } + })) + .with_node(|node| node.with_name("validator-0")); + + (1..12) + .fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}")))) + }) + .with_parachain(|p| { + p.with_id(2000) + .with_default_command("polkadot-parachain") + .with_default_image(images.cumulus.as_str()) + .with_default_args(vec![("-lparachain=debug,aura=debug").into()]) + .with_collator(|n| n.with_name("collator-2000")) + }) + .with_parachain(|p| { + p.with_id(2001) + .with_default_command("polkadot-parachain") + .with_default_image(images.cumulus.as_str()) + .with_default_args(vec![("-lparachain=debug,aura=debug").into()]) + .with_collator(|n| n.with_name("collator-2001")) + }) + .build() + .map_err(|e| { + let errs = e.into_iter().map(|e| e.to_string()).collect::<Vec<_>>().join(" "); + anyhow!("config errs: {errs}") + })?; + + let spawn_fn = zombienet_sdk::environment::get_spawn_fn(); + let network = spawn_fn(config).await?; + + let relay_node = network.get_node("validator-0")?; + let para_node_2001 = network.get_node("collator-2001")?; + + let relay_client: OnlineClient<PolkadotConfig> = relay_node.wait_client().await?; + + assert_para_throughput( + &relay_client, + 15, + [(ParaId::from(2000), 11..16), (ParaId::from(2001), 11..16)], + ) + .await?; + + let scheduled_metric_name = + "polkadot_parachain_fetched_chunks_total{origin=\"scheduled\",success=\"succeeded\"}"; + // scheduled 
chunk fetches should exceed the asserted para throughput, given that blocks are + // still being produced + assert!( + relay_node + .assert_with(scheduled_metric_name, |v| { v >= 22.0 && v <= 40.0 }) + .await? + ); + + let occupied_metric_name = + "polkadot_parachain_fetched_chunks_total{origin=\"occupied\",success=\"succeeded\"}"; + // Since speculative availability is requested on active leaves updates, there may not be + // any backable candidates from Prospective Parachains at that time. + assert!( + relay_node + .assert_with(occupied_metric_name, |v| { v >= 2.0 && v <= 10.0 }) + .await? + ); + + // Assert the parachain finalized block height is also on par with the number of backed + // candidates. We can only do this for the collator based on cumulus. + assert_finality_lag(&para_node_2001.wait_client().await?, 6).await?; + + log::info!("Test finished successfully"); + + Ok(()) +}
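
A minimal usage sketch (not part of the diff) of the new `AvailabilityStoreMessage::NoteBackableCandidates` round trip, assuming an overseer sender is in scope. The requester changes above do the equivalent inline; the free-standing helper name `note_backable` is illustrative only, while the message, field names, and reply type are taken from the diff.

use std::collections::HashSet;

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::AvailabilityStoreMessage, overseer};
use polkadot_primitives::CandidateHash;

// Ask the av-store to start tracking chunks for candidates that are expected
// to be backed, before they actually occupy a core.
async fn note_backable<Sender>(
	sender: &mut Sender,
	candidate_hashes: HashSet<CandidateHash>,
	num_validators: usize,
) -> Result<(), ()>
where
	Sender: overseer::SubsystemSender<AvailabilityStoreMessage>,
{
	let (tx, rx) = oneshot::channel();
	sender
		.send_message(AvailabilityStoreMessage::NoteBackableCandidates {
			candidate_hashes,
			num_validators,
			tx,
		})
		.await;
	// The av-store answers `Err(())` if it failed to write the candidate metadata
	// (the requester surfaces this as `Error::AvailabilityStore`); treat a canceled
	// channel the same as an explicit failure.
	rx.await.unwrap_or(Err(()))
}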