Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true, default-features = true }

# Substrate
frame-support = { workspace = true, default-features = true }
prometheus-endpoint = { workspace = true, default-features = true }
sc-client-api = { workspace = true, default-features = true }
sc-consensus = { workspace = true, default-features = true }
Expand Down
12 changes: 7 additions & 5 deletions cumulus/client/consensus/aura/src/collator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,10 @@ pub async fn claim_slot<B, C, P>(
client: &C,
parent_hash: B::Hash,
relay_parent_header: &PHeader,
slot_duration: SlotDuration,
_slot_duration: SlotDuration,
relay_chain_slot_duration: Duration,
keystore: &KeystorePtr,
slot_tracker: &crate::slot_tracker::IncrementalSlotTracker,
) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
where
B: BlockT,
Expand All @@ -366,15 +367,16 @@ where
relay_chain_slot_duration,
) {
Some((r_s, t)) => {
let our_slot = Slot::from_timestamp(t, slot_duration);
// Use incremental slot calculation: S_i = S_{i-1} + 1
// Client-side tracker eliminates expensive runtime API calls
let our_slot = slot_tracker.current_slot();

tracing::debug!(
target: crate::LOG_TARGET,
relay_slot = ?r_s,
para_slot = ?our_slot,
timestamp = ?t,
?slot_duration,
?relay_chain_slot_duration,
"Adjusted relay-chain slot to parachain slot"
"Using client-side incremental slot calculation"
);
(our_slot, t)
},
Expand Down
41 changes: 41 additions & 0 deletions cumulus/client/consensus/aura/src/collators/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use cumulus_client_collator::{
};
use cumulus_client_consensus_common::ParachainBlockImportMarker;
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::Slot;
use cumulus_primitives_core::{relay_chain::BlockId as RBlockId, CollectCollationInfo};
use cumulus_relay_chain_interface::RelayChainInterface;

Expand Down Expand Up @@ -113,6 +114,10 @@ where
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
async move {
// Create a global slot tracker for this collator instance
// We'll initialize it with the first slot_duration we get
let slot_tracker = std::sync::Arc::new(std::sync::Mutex::new(None::<crate::slot_tracker::IncrementalSlotTracker>));

let mut collation_requests = match params.collation_request_receiver {
Some(receiver) => receiver,
None =>
Expand Down Expand Up @@ -198,13 +203,49 @@ where
Err(e) => reject_with_error!(e),
};

// Initialize global slot tracker once if needed
{
let mut tracker_guard = slot_tracker.lock().unwrap();
if tracker_guard.is_none() {
// Initialize with storage reading to get proper slot continuation
if let Err(e) = crate::slot_tracker::init_global_slot_tracker_with_storage(
&*params.para_client,
parent_hash,
slot_duration,
) {
tracing::warn!(
target: crate::LOG_TARGET,
error = ?e,
"Failed to initialize slot tracker from storage, falling back to zero"
);
crate::slot_tracker::init_global_slot_tracker(Some(Slot::from(0)), slot_duration);
}

// Create local tracker instance for this collator
*tracker_guard = Some(crate::slot_tracker::IncrementalSlotTracker::new(Some(Slot::from(0)), slot_duration));
}
}

// Get current slot from the global tracker
let slot_tracker_ref = {
let tracker_guard = slot_tracker.lock().unwrap();
if let Some(ref tracker) = *tracker_guard {
tracker.clone()
} else {
// This should not happen as we initialize above
panic!("Slot tracker should be initialized");
}
};

// Use incremental slot tracker for proper slot calculation
let claim = match collator_util::claim_slot::<_, _, P>(
&*params.para_client,
parent_hash,
&relay_parent_header,
slot_duration,
params.relay_chain_slot_duration,
&params.keystore,
&slot_tracker_ref,
)
.await
{
Expand Down
130 changes: 90 additions & 40 deletions cumulus/client/consensus/aura/src/collators/lookahead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use codec::{Codec, Encode};
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;

Expand All @@ -52,7 +52,7 @@ use sc_consensus::BlockImport;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::{AuraApi, Slot};
use sp_consensus_aura::AuraApi;
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
Expand Down Expand Up @@ -109,11 +109,13 @@ where
+ AuxStore
+ HeaderBackend<Block>
+ BlockBackend<Block>
+ sp_api::CallApiAt<Block>
+ Send
+ Sync
+ 'static,
Client::Api:
AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
Client::Api: AuraApi<Block, P::Public>
+ CollectCollationInfo<Block>
+ AuraUnincludedSegmentApi<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RClient: RelayChainInterface + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
Expand Down Expand Up @@ -161,11 +163,13 @@ where
+ AuxStore
+ HeaderBackend<Block>
+ BlockBackend<Block>
+ sp_api::CallApiAt<Block>
+ Send
+ Sync
+ 'static,
Client::Api:
AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
Client::Api: AuraApi<Block, P::Public>
+ CollectCollationInfo<Block>
+ AuraUnincludedSegmentApi<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RClient: RelayChainInterface + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
Expand All @@ -179,6 +183,10 @@ where
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
async move {
// Create a global slot tracker for this collator instance
// We'll initialize it with the first slot_duration we get
let slot_tracker = std::sync::Arc::new(std::sync::Mutex::new(None::<crate::slot_tracker::IncrementalSlotTracker>));

cumulus_client_collator::initialize_collator_subsystems(
&mut params.overseer_handle,
params.collator_key,
Expand Down Expand Up @@ -262,43 +270,75 @@ where
None => continue,
};

let para_client = &*params.para_client;
let keystore = &params.keystore;
let can_build_upon = |block_hash| {
let slot_duration = match sc_consensus_aura::standalone::slot_duration_at(
&*params.para_client,
block_hash,
) {
Ok(sd) => sd,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
return None
},
};
tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired");
let para_client = params.para_client.clone();
let keystore = params.keystore.clone();
let relay_parent_header_clone = relay_parent_header.clone();
let relay_parent_header_for_closure = relay_parent_header.clone();
let relay_chain_slot_duration = params.relay_chain_slot_duration;
let slot_tracker_clone = slot_tracker.clone();

// Get slot duration once and initialize tracker if needed
let slot_duration = match sc_consensus_aura::standalone::slot_duration_at(
&*para_client,
initial_parent.hash,
) {
Ok(sd) => sd,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
continue;
},
};

// Initialize slot tracker once if needed
{
let mut tracker_guard = slot_tracker_clone.lock().unwrap();
if tracker_guard.is_none() {
// Initialize with storage reading to get proper slot continuation
if let Err(e) = crate::slot_tracker::init_global_slot_tracker_with_storage(
&*para_client,
initial_parent.hash,
slot_duration,
) {
tracing::warn!(
target: crate::LOG_TARGET,
error = ?e,
"Failed to initialize slot tracker from storage, falling back to zero"
);
crate::slot_tracker::init_global_slot_tracker(Some(Slot::from(0)), slot_duration);
}

// Create local tracker instance for this collator
*tracker_guard = Some(crate::slot_tracker::IncrementalSlotTracker::new(Some(Slot::from(0)), slot_duration));
}
}

let can_build_upon = move |block_hash| {
let (relay_slot, timestamp) = consensus_common::relay_slot_and_timestamp(
&relay_parent_header,
params.relay_chain_slot_duration,
&relay_parent_header_for_closure,
relay_chain_slot_duration,
)?;
let slot_now = Slot::from_timestamp(timestamp, slot_duration);

// Use the global slot tracker
let slot_now = {
let tracker_guard = slot_tracker_clone.lock().unwrap();
if let Some(ref tracker) = *tracker_guard {
tracker.current_slot()
} else {
// This should not happen as we initialize above
return None;
}
};

tracing::debug!(
target: crate::LOG_TARGET,
?relay_slot,
para_slot = ?slot_now,
?timestamp,
?slot_duration,
relay_chain_slot_duration = ?params.relay_chain_slot_duration,
"Adjusted relay-chain slot to parachain slot"
?block_hash,
"Using incremental slot tracker for slot calculation"
);
Some(super::can_build_upon::<_, _, P>(
slot_now,
relay_slot,
timestamp,
block_hash,
included_block.hash(),
para_client,
&keystore,
))

Some((slot_now, relay_slot, timestamp, block_hash))
};

// Build in a loop until not allowed. Note that the authorities can change
Expand All @@ -316,9 +356,19 @@ where
// scheduled chains this ensures that the backlog will grow steadily.
for n_built in 0..2 {
let slot_claim = match can_build_upon(parent_hash) {
Some(fut) => match fut.await {
None => break,
Some(c) => c,
Some((slot_now, relay_slot, timestamp, block_hash)) => {
match super::can_build_upon::<_, _, P>(
slot_now,
relay_slot,
timestamp,
block_hash,
included_block.hash(),
&*para_client,
&keystore,
).await {
None => break,
Some(c) => c,
}
},
None => break,
};
Expand All @@ -332,8 +382,8 @@ where

let validation_data = PersistedValidationData {
parent_head: parent_header.encode().into(),
relay_parent_number: *relay_parent_header.number(),
relay_parent_storage_root: *relay_parent_header.state_root(),
relay_parent_number: *relay_parent_header_clone.number(),
relay_parent_storage_root: *relay_parent_header_clone.state_root(),
max_pov_size,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ fn compute_next_wake_up_time(

let (duration, timestamp) =
time_until_next_attempt(time_now, block_production_interval, time_offset);
// Note: For slot_timer, we keep using timestamp-based calculation for timing
// but the actual slot index used for block authoring comes from the incremental API
let aura_slot = Slot::from_timestamp(timestamp, para_slot_duration);
(duration, aura_slot)
}
Expand Down
15 changes: 15 additions & 0 deletions cumulus/client/consensus/aura/src/equivocation_import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,22 @@ where
}

fn slot_now(slot_duration: SlotDuration) -> Slot {
// Try to use global incremental slot tracker first
if let Some(slot) = crate::slot_tracker::get_global_current_slot() {
tracing::debug!(
target: crate::LOG_TARGET,
para_slot = ?slot,
"Using incremental slot from global tracker"
);
return slot;
}

// Fallback to timestamp-based calculation if global tracker is not initialized
let timestamp = sp_timestamp::InherentDataProvider::from_system_time().timestamp();
tracing::warn!(
target: crate::LOG_TARGET,
"Global slot tracker not initialized, falling back to timestamp-based calculation"
);
Slot::from_timestamp(timestamp, slot_duration)
}

Expand Down
1 change: 1 addition & 0 deletions cumulus/client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub use sc_consensus_slots::InherentDataProviderExt;
pub mod collator;
pub mod collators;
pub mod equivocation_import_queue;
pub mod slot_tracker;

const LOG_TARGET: &str = "aura::cumulus";

Expand Down
Loading