Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 9628e44

Browse files
committed
extract different implementations into own modules
1 parent 4109e0c commit 9628e44

File tree

5 files changed

+590
-436
lines changed

5 files changed

+590
-436
lines changed
Lines changed: 377 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,377 @@
1+
// Copyright 2023 Parity Technologies (UK) Ltd.
2+
// This file is part of Cumulus.
3+
4+
// Cumulus is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
9+
// Cumulus is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU General Public License for more details.
13+
14+
// You should have received a copy of the GNU General Public License
15+
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.
16+
17+
//! This provides the option to run a basic relay-chain driven Aura implementation
18+
//!
19+
//! For more information about AuRa, the Substrate crate should be checked.
20+
21+
use codec::{Decode, Encode};
22+
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
23+
use cumulus_client_consensus_common::{
24+
ParachainBlockImportMarker, ParachainCandidate, ParentSearchParams,
25+
};
26+
use cumulus_client_consensus_proposer::ProposerInterface;
27+
use cumulus_primitives_core::{
28+
relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,
29+
};
30+
use cumulus_primitives_parachain_inherent::ParachainInherentData;
31+
use cumulus_relay_chain_interface::RelayChainInterface;
32+
33+
use polkadot_node_primitives::{CollationResult, MaybeCompressedPoV};
34+
use polkadot_overseer::Handle as OverseerHandle;
35+
use polkadot_primitives::{Block as PBlock, CollatorPair, Header as PHeader, Id as ParaId};
36+
37+
use futures::prelude::*;
38+
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
39+
use sc_consensus::{
40+
import_queue::{BasicQueue, Verifier as VerifierT},
41+
BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction,
42+
};
43+
use sc_consensus_aura::standalone as aura_internal;
44+
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
45+
use sp_api::ProvideRuntimeApi;
46+
use sp_application_crypto::AppPublic;
47+
use sp_block_builder::BlockBuilder as BlockBuilderApi;
48+
use sp_blockchain::HeaderBackend;
49+
use sp_consensus::{error::Error as ConsensusError, BlockOrigin, SyncOracle};
50+
use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
51+
use sp_core::crypto::Pair;
52+
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
53+
use sp_keystore::KeystorePtr;
54+
use sp_runtime::{
55+
generic::Digest,
56+
traits::{Block as BlockT, HashFor, Header as HeaderT, Member},
57+
};
58+
use sp_state_machine::StorageChanges;
59+
use sp_timestamp::Timestamp;
60+
use std::{convert::TryFrom, error::Error, fmt::Debug, hash::Hash, sync::Arc, time::Duration};
61+
62+
/// Parameters for [`run`].
63+
pub struct Params<BI, CIDP, Client, RClient, SO, Proposer, CS> {
64+
pub create_inherent_data_providers: CIDP,
65+
pub block_import: BI,
66+
pub para_client: Arc<Client>,
67+
pub relay_client: Arc<RClient>,
68+
pub sync_oracle: SO,
69+
pub keystore: KeystorePtr,
70+
pub key: CollatorPair,
71+
pub para_id: ParaId,
72+
pub overseer_handle: OverseerHandle,
73+
pub slot_duration: SlotDuration,
74+
pub relay_chain_slot_duration: SlotDuration,
75+
pub proposer: Proposer,
76+
pub collator_service: CS,
77+
}
78+
79+
/// Run bare Aura consensus as a relay-chain-driven collator.
80+
pub async fn run<Block, P, BI, CIDP, Client, RClient, SO, Proposer, CS>(
81+
params: Params<BI, CIDP, Client, RClient, SO, Proposer, CS>,
82+
) where
83+
Block: BlockT,
84+
Client: ProvideRuntimeApi<Block>
85+
+ BlockOf
86+
+ AuxStore
87+
+ HeaderBackend<Block>
88+
+ BlockBackend<Block>
89+
+ Send
90+
+ Sync
91+
+ 'static,
92+
Client::Api: AuraApi<Block, P::Public> + CollectCollationInfo<Block>,
93+
RClient: RelayChainInterface,
94+
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
95+
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
96+
SO: SyncOracle + Send + Sync + Clone + 'static,
97+
Proposer: ProposerInterface<Block, Transaction = BI::Transaction>,
98+
Proposer::Transaction: Sync,
99+
CS: CollatorServiceInterface<Block>,
100+
P: Pair + Send + Sync,
101+
P::Public: AppPublic + Hash + Member + Encode + Decode,
102+
P::Signature: TryFrom<Vec<u8>> + Hash + Member + Encode + Decode,
103+
{
104+
let mut params = params;
105+
106+
let mut collation_requests = cumulus_client_collator::relay_chain_driven::init(
107+
params.key,
108+
params.para_id,
109+
params.overseer_handle,
110+
)
111+
.await;
112+
113+
while let Some(request) = collation_requests.next().await {
114+
macro_rules! reject_with_error {
115+
($err:expr) => {{
116+
request.complete(None);
117+
tracing::error!(target: crate::LOG_TARGET, err = ?{ $err });
118+
continue;
119+
}};
120+
}
121+
122+
macro_rules! try_request {
123+
($x:expr) => {{
124+
match $x {
125+
Ok(x) => x,
126+
Err(e) => reject_with_error!(e),
127+
}
128+
}};
129+
}
130+
131+
let validation_data = request.persisted_validation_data();
132+
133+
let parent_header =
134+
try_request!(Block::Header::decode(&mut &validation_data.parent_head.0[..]));
135+
136+
let parent_hash = parent_header.hash();
137+
138+
if !params.collator_service.check_block_status(parent_hash, &parent_header) {
139+
continue
140+
}
141+
142+
let relay_parent_header = match params.relay_client.header(*request.relay_parent()).await {
143+
Err(e) => reject_with_error!(e),
144+
Ok(None) => continue, // sanity: would be inconsistent to get `None` here
145+
Ok(Some(h)) => h,
146+
};
147+
148+
let claim = match claim_slot::<_, _, P>(
149+
&*params.para_client,
150+
parent_hash,
151+
&relay_parent_header,
152+
params.slot_duration,
153+
params.relay_chain_slot_duration,
154+
&params.keystore,
155+
)
156+
.await
157+
{
158+
Ok(None) => continue,
159+
Ok(Some(c)) => c,
160+
Err(e) => reject_with_error!(e),
161+
};
162+
163+
let (parachain_inherent_data, other_inherent_data) = try_request!(
164+
create_inherent_data(
165+
*request.relay_parent(),
166+
&validation_data,
167+
parent_hash,
168+
params.para_id,
169+
claim.timestamp,
170+
&params.relay_client,
171+
&params.create_inherent_data_providers,
172+
)
173+
.await
174+
);
175+
176+
let proposal = try_request!(
177+
params
178+
.proposer
179+
.propose(
180+
&parent_header,
181+
&parachain_inherent_data,
182+
other_inherent_data,
183+
Digest { logs: vec![claim.pre_digest] },
184+
// TODO [https://github.com/paritytech/cumulus/issues/2439]
185+
// We should call out to a pluggable interface that provides
186+
// the proposal duration.
187+
Duration::from_millis(500),
188+
// Set the block limit to 50% of the maximum PoV size.
189+
//
190+
// TODO: If we got benchmarking that includes the proof size,
191+
// we should be able to use the maximum pov size.
192+
Some((validation_data.max_pov_size / 2) as usize),
193+
)
194+
.await
195+
);
196+
197+
let sealed_importable = try_request!(seal::<_, _, P>(
198+
proposal.block,
199+
proposal.storage_changes,
200+
&claim.author_pub,
201+
&params.keystore,
202+
));
203+
204+
let post_hash = sealed_importable.post_hash();
205+
let block = Block::new(
206+
sealed_importable.post_header(),
207+
sealed_importable
208+
.body
209+
.as_ref()
210+
.expect("body always created with this `propose` fn; qed")
211+
.clone(),
212+
);
213+
214+
try_request!(params.block_import.import_block(sealed_importable).await);
215+
216+
let response = if let Some((collation, b)) = params.collator_service.build_collation(
217+
&parent_header,
218+
post_hash,
219+
ParachainCandidate { block, proof: proposal.proof },
220+
) {
221+
tracing::info!(
222+
target: crate::LOG_TARGET,
223+
"PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}",
224+
b.header().encode().len() as f64 / 1024f64,
225+
b.extrinsics().encode().len() as f64 / 1024f64,
226+
b.storage_proof().encode().len() as f64 / 1024f64,
227+
);
228+
229+
if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
230+
tracing::info!(
231+
target: crate::LOG_TARGET,
232+
"Compressed PoV size: {}kb",
233+
pov.block_data.0.len() as f64 / 1024f64,
234+
);
235+
}
236+
237+
let result_sender = params.collator_service.announce_with_barrier(post_hash);
238+
Some(CollationResult { collation, result_sender: Some(result_sender) })
239+
} else {
240+
None
241+
};
242+
243+
request.complete(response);
244+
}
245+
}
246+
247+
/// A claim on an Aura slot.
248+
struct SlotClaim<Pub> {
249+
author_pub: Pub,
250+
pre_digest: sp_runtime::DigestItem,
251+
timestamp: Timestamp,
252+
}
253+
254+
async fn claim_slot<B, C, P>(
255+
client: &C,
256+
parent_hash: B::Hash,
257+
relay_parent_header: &PHeader,
258+
slot_duration: SlotDuration,
259+
relay_chain_slot_duration: SlotDuration,
260+
keystore: &KeystorePtr,
261+
) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
262+
where
263+
B: BlockT,
264+
C: ProvideRuntimeApi<B> + Send + Sync + 'static,
265+
C::Api: AuraApi<B, P::Public>,
266+
P: Pair,
267+
P::Public: Encode + Decode,
268+
P::Signature: Encode + Decode,
269+
{
270+
// load authorities
271+
let authorities = client.runtime_api().authorities(parent_hash).map_err(Box::new)?;
272+
273+
// Determine the current slot and timestamp based on the relay-parent's.
274+
let (slot_now, timestamp) =
275+
match sc_consensus_babe::find_pre_digest::<PBlock>(relay_parent_header) {
276+
Ok(babe_pre_digest) => {
277+
let t =
278+
Timestamp::new(relay_chain_slot_duration.as_millis() * *babe_pre_digest.slot());
279+
let slot = Slot::from_timestamp(t, slot_duration);
280+
281+
(slot, t)
282+
},
283+
Err(_) => return Ok(None),
284+
};
285+
286+
// Try to claim the slot locally.
287+
let author_pub = {
288+
let res = aura_internal::claim_slot::<P>(slot_now, &authorities, keystore).await;
289+
match res {
290+
Some(p) => p,
291+
None => return Ok(None),
292+
}
293+
};
294+
295+
// Produce the pre-digest.
296+
let pre_digest = aura_internal::pre_digest::<P>(slot_now);
297+
298+
Ok(Some(SlotClaim { author_pub, pre_digest, timestamp }))
299+
}
300+
301+
// This explicitly creates the inherent data for parachains, as well as overriding the
302+
// timestamp based on the slot number.
303+
async fn create_inherent_data<B: BlockT>(
304+
relay_parent: PHash,
305+
validation_data: &PersistedValidationData,
306+
parent_hash: B::Hash,
307+
para_id: ParaId,
308+
timestamp: Timestamp,
309+
relay_chain_interface: &impl RelayChainInterface,
310+
create_inherent_data_providers: &impl CreateInherentDataProviders<B, ()>,
311+
) -> Result<(ParachainInherentData, InherentData), Box<dyn Error>> {
312+
let paras_inherent_data = ParachainInherentData::create_at(
313+
relay_parent,
314+
relay_chain_interface,
315+
validation_data,
316+
para_id,
317+
)
318+
.await;
319+
320+
let paras_inherent_data = match paras_inherent_data {
321+
Some(p) => p,
322+
None =>
323+
return Err(format!("Could not create paras inherent data at {:?}", relay_parent).into()),
324+
};
325+
326+
let mut other_inherent_data = create_inherent_data_providers
327+
.create_inherent_data_providers(parent_hash, ())
328+
.map_err(|e| e as Box<dyn Error>)
329+
.await?
330+
.create_inherent_data()
331+
.await
332+
.map_err(Box::new)?;
333+
334+
other_inherent_data.replace_data(sp_timestamp::INHERENT_IDENTIFIER, &timestamp);
335+
336+
Ok((paras_inherent_data, other_inherent_data))
337+
}
338+
339+
fn seal<B: BlockT, T, P>(
340+
pre_sealed: B,
341+
storage_changes: StorageChanges<T, HashFor<B>>,
342+
author_pub: &P::Public,
343+
keystore: &KeystorePtr,
344+
) -> Result<BlockImportParams<B, T>, Box<dyn Error>>
345+
where
346+
P: Pair,
347+
P::Signature: Encode + Decode + TryFrom<Vec<u8>>,
348+
P::Public: AppPublic,
349+
{
350+
let (pre_header, body) = pre_sealed.deconstruct();
351+
let pre_hash = pre_header.hash();
352+
let block_number = *pre_header.number();
353+
354+
// seal the block.
355+
let block_import_params = {
356+
let seal_digest =
357+
aura_internal::seal::<_, P>(&pre_hash, &author_pub, keystore).map_err(Box::new)?;
358+
let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header);
359+
block_import_params.post_digests.push(seal_digest);
360+
block_import_params.body = Some(body.clone());
361+
block_import_params.state_action =
362+
StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
363+
block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
364+
block_import_params
365+
};
366+
let post_hash = block_import_params.post_hash();
367+
368+
tracing::info!(
369+
target: crate::LOG_TARGET,
370+
"🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
371+
block_number,
372+
post_hash,
373+
pre_hash,
374+
);
375+
376+
Ok(block_import_params)
377+
}

0 commit comments

Comments
 (0)