| 
17 | 17 | 
 
  | 
18 | 18 | //! Cumulus Collator implementation for Substrate.  | 
19 | 19 | 
  | 
20 |  | -use cumulus_primitives_core::{  | 
21 |  | -	relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,  | 
22 |  | -};  | 
23 |  | - | 
24 |  | -use sc_client_api::BlockBackend;  | 
25 |  | -use sp_api::ProvideRuntimeApi;  | 
26 |  | -use sp_core::traits::SpawnNamed;  | 
27 |  | -use sp_runtime::traits::{Block as BlockT, Header as HeaderT};  | 
28 |  | - | 
29 |  | -use cumulus_client_consensus_common::ParachainConsensus;  | 
30 |  | -use polkadot_node_primitives::{CollationGenerationConfig, CollationResult, MaybeCompressedPoV};  | 
 | 20 | +use polkadot_node_primitives::CollationGenerationConfig;  | 
31 | 21 | use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage};  | 
32 | 22 | use polkadot_overseer::Handle as OverseerHandle;  | 
33 | 23 | use polkadot_primitives::{CollatorPair, Id as ParaId};  | 
34 |  | - | 
35 |  | -use codec::Decode;  | 
36 |  | -use futures::prelude::*;  | 
37 |  | -use std::sync::Arc;  | 
38 |  | - | 
39 |  | -use crate::service::CollatorService;  | 
40 |  | - | 
41 | 24 | pub mod service;  | 
42 | 25 | 
 
  | 
43 |  | -/// The logging target.  | 
44 |  | -const LOG_TARGET: &str = "cumulus-collator";  | 
45 |  | - | 
46 |  | -/// The implementation of the Cumulus `Collator`.  | 
47 |  | -///  | 
48 |  | -/// Note that this implementation is soon to be deprecated and removed, and it is suggested to  | 
49 |  | -/// directly use the [`CollatorService`] instead, so consensus engine implementations  | 
50 |  | -/// live at the top level.  | 
51 |  | -pub struct Collator<Block: BlockT, BS, RA> {  | 
52 |  | -	service: CollatorService<Block, BS, RA>,  | 
53 |  | -	parachain_consensus: Box<dyn ParachainConsensus<Block>>,  | 
54 |  | -}  | 
55 |  | - | 
56 |  | -impl<Block: BlockT, BS, RA> Clone for Collator<Block, BS, RA> {  | 
57 |  | -	fn clone(&self) -> Self {  | 
58 |  | -		Collator {  | 
59 |  | -			service: self.service.clone(),  | 
60 |  | -			parachain_consensus: self.parachain_consensus.clone(),  | 
61 |  | -		}  | 
62 |  | -	}  | 
63 |  | -}  | 
64 |  | - | 
65 |  | -impl<Block, BS, RA> Collator<Block, BS, RA>  | 
66 |  | -where  | 
67 |  | -	Block: BlockT,  | 
68 |  | -	BS: BlockBackend<Block>,  | 
69 |  | -	RA: ProvideRuntimeApi<Block>,  | 
70 |  | -	RA::Api: CollectCollationInfo<Block>,  | 
71 |  | -{  | 
72 |  | -	/// Create a new instance.  | 
73 |  | -	fn new(  | 
74 |  | -		collator_service: CollatorService<Block, BS, RA>,  | 
75 |  | -		parachain_consensus: Box<dyn ParachainConsensus<Block>>,  | 
76 |  | -	) -> Self {  | 
77 |  | -		Self { service: collator_service, parachain_consensus }  | 
78 |  | -	}  | 
79 |  | - | 
80 |  | -	async fn produce_candidate(  | 
81 |  | -		mut self,  | 
82 |  | -		relay_parent: PHash,  | 
83 |  | -		validation_data: PersistedValidationData,  | 
84 |  | -	) -> Option<CollationResult> {  | 
85 |  | -		tracing::trace!(  | 
86 |  | -			target: LOG_TARGET,  | 
87 |  | -			relay_parent = ?relay_parent,  | 
88 |  | -			"Producing candidate",  | 
89 |  | -		);  | 
90 |  | - | 
91 |  | -		let last_head = match Block::Header::decode(&mut &validation_data.parent_head.0[..]) {  | 
92 |  | -			Ok(x) => x,  | 
93 |  | -			Err(e) => {  | 
94 |  | -				tracing::error!(  | 
95 |  | -					target: LOG_TARGET,  | 
96 |  | -					error = ?e,  | 
97 |  | -					"Could not decode the head data."  | 
98 |  | -				);  | 
99 |  | -				return None  | 
100 |  | -			},  | 
101 |  | -		};  | 
102 |  | - | 
103 |  | -		let last_head_hash = last_head.hash();  | 
104 |  | -		if !self.service.check_block_status(last_head_hash, &last_head) {  | 
105 |  | -			return None  | 
106 |  | -		}  | 
107 |  | - | 
108 |  | -		tracing::info!(  | 
109 |  | -			target: LOG_TARGET,  | 
110 |  | -			relay_parent = ?relay_parent,  | 
111 |  | -			at = ?last_head_hash,  | 
112 |  | -			"Starting collation.",  | 
113 |  | -		);  | 
114 |  | - | 
115 |  | -		let candidate = self  | 
116 |  | -			.parachain_consensus  | 
117 |  | -			.produce_candidate(&last_head, relay_parent, &validation_data)  | 
118 |  | -			.await?;  | 
119 |  | - | 
120 |  | -		let block_hash = candidate.block.header().hash();  | 
121 |  | - | 
122 |  | -		let (collation, b) = self.service.build_collation(&last_head, block_hash, candidate)?;  | 
123 |  | - | 
124 |  | -		b.log_size_info();  | 
125 |  | - | 
126 |  | -		if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {  | 
127 |  | -			tracing::info!(  | 
128 |  | -				target: LOG_TARGET,  | 
129 |  | -				"Compressed PoV size: {}kb",  | 
130 |  | -				pov.block_data.0.len() as f64 / 1024f64,  | 
131 |  | -			);  | 
132 |  | -		}  | 
133 |  | - | 
134 |  | -		let result_sender = self.service.announce_with_barrier(block_hash);  | 
135 |  | - | 
136 |  | -		tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",);  | 
137 |  | - | 
138 |  | -		Some(CollationResult { collation, result_sender: Some(result_sender) })  | 
139 |  | -	}  | 
140 |  | -}  | 
141 |  | - | 
142 | 26 | /// Relay-chain-driven collators are those whose block production is driven purely  | 
143 | 27 | /// by new relay chain blocks and the most recently included parachain blocks  | 
144 | 28 | /// within them.  | 
@@ -255,220 +139,3 @@ pub async fn initialize_collator_subsystems(  | 
255 | 139 | 		.send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator")  | 
256 | 140 | 		.await;  | 
257 | 141 | }  | 
258 |  | - | 
259 |  | -/// Parameters for [`start_collator`].  | 
260 |  | -pub struct StartCollatorParams<Block: BlockT, RA, BS, Spawner> {  | 
261 |  | -	pub para_id: ParaId,  | 
262 |  | -	pub runtime_api: Arc<RA>,  | 
263 |  | -	pub block_status: Arc<BS>,  | 
264 |  | -	pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,  | 
265 |  | -	pub overseer_handle: OverseerHandle,  | 
266 |  | -	pub spawner: Spawner,  | 
267 |  | -	pub key: CollatorPair,  | 
268 |  | -	pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,  | 
269 |  | -}  | 
270 |  | - | 
271 |  | -/// Start the collator.  | 
272 |  | -#[deprecated = "Collators should run consensus futures which handle this logic internally"]  | 
273 |  | -pub async fn start_collator<Block, RA, BS, Spawner>(  | 
274 |  | -	params: StartCollatorParams<Block, RA, BS, Spawner>,  | 
275 |  | -) where  | 
276 |  | -	Block: BlockT,  | 
277 |  | -	BS: BlockBackend<Block> + Send + Sync + 'static,  | 
278 |  | -	Spawner: SpawnNamed + Clone + Send + Sync + 'static,  | 
279 |  | -	RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,  | 
280 |  | -	RA::Api: CollectCollationInfo<Block>,  | 
281 |  | -{  | 
282 |  | -	// This never needed to be asynchronous, but shouldn't be changed due to backcompat.  | 
283 |  | -	#[allow(deprecated)]  | 
284 |  | -	start_collator_sync(params);  | 
285 |  | -}  | 
286 |  | - | 
287 |  | -/// Start the collator in a synchronous function.  | 
288 |  | -#[deprecated = "Collators should run consensus futures which handle this logic internally"]  | 
289 |  | -pub fn start_collator_sync<Block, RA, BS, Spawner>(  | 
290 |  | -	StartCollatorParams {  | 
291 |  | -		para_id,  | 
292 |  | -		block_status,  | 
293 |  | -		announce_block,  | 
294 |  | -		overseer_handle,  | 
295 |  | -		spawner,  | 
296 |  | -		key,  | 
297 |  | -		parachain_consensus,  | 
298 |  | -		runtime_api,  | 
299 |  | -	}: StartCollatorParams<Block, RA, BS, Spawner>,  | 
300 |  | -) where  | 
301 |  | -	Block: BlockT,  | 
302 |  | -	BS: BlockBackend<Block> + Send + Sync + 'static,  | 
303 |  | -	Spawner: SpawnNamed + Clone + Send + Sync + 'static,  | 
304 |  | -	RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,  | 
305 |  | -	RA::Api: CollectCollationInfo<Block>,  | 
306 |  | -{  | 
307 |  | -	let collator_service =  | 
308 |  | -		CollatorService::new(block_status, Arc::new(spawner.clone()), announce_block, runtime_api);  | 
309 |  | - | 
310 |  | -	let collator = Collator::new(collator_service, parachain_consensus);  | 
311 |  | - | 
312 |  | -	let collation_future = Box::pin(async move {  | 
313 |  | -		let mut request_stream = relay_chain_driven::init(key, para_id, overseer_handle).await;  | 
314 |  | -		while let Some(request) = request_stream.next().await {  | 
315 |  | -			let collation = collator  | 
316 |  | -				.clone()  | 
317 |  | -				.produce_candidate(  | 
318 |  | -					*request.relay_parent(),  | 
319 |  | -					request.persisted_validation_data().clone(),  | 
320 |  | -				)  | 
321 |  | -				.await;  | 
322 |  | - | 
323 |  | -			request.complete(collation);  | 
324 |  | -		}  | 
325 |  | -	});  | 
326 |  | - | 
327 |  | -	spawner.spawn("cumulus-relay-driven-collator", None, collation_future);  | 
328 |  | -}  | 
329 |  | - | 
330 |  | -#[cfg(test)]  | 
331 |  | -mod tests {  | 
332 |  | -	use super::*;  | 
333 |  | -	use async_trait::async_trait;  | 
334 |  | -	use codec::Encode;  | 
335 |  | -	use cumulus_client_consensus_common::ParachainCandidate;  | 
336 |  | -	use cumulus_primitives_core::ParachainBlockData;  | 
337 |  | -	use cumulus_test_client::{  | 
338 |  | -		Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder,  | 
339 |  | -		TestClientBuilder, TestClientBuilderExt,  | 
340 |  | -	};  | 
341 |  | -	use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;  | 
342 |  | -	use cumulus_test_runtime::{Block, Header};  | 
343 |  | -	use futures::{channel::mpsc, executor::block_on, StreamExt};  | 
344 |  | -	use polkadot_node_primitives::CollationGenerationConfig;  | 
345 |  | -	use polkadot_node_subsystem::messages::CollationGenerationMessage;  | 
346 |  | -	use polkadot_node_subsystem_test_helpers::ForwardSubsystem;  | 
347 |  | -	use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains};  | 
348 |  | -	use polkadot_primitives::HeadData;  | 
349 |  | -	use sp_consensus::BlockOrigin;  | 
350 |  | -	use sp_core::{testing::TaskExecutor, Pair};  | 
351 |  | -	use sp_runtime::traits::BlakeTwo256;  | 
352 |  | -	use sp_state_machine::Backend;  | 
353 |  | - | 
354 |  | -	struct AlwaysSupportsParachains;  | 
355 |  | - | 
356 |  | -	#[async_trait]  | 
357 |  | -	impl HeadSupportsParachains for AlwaysSupportsParachains {  | 
358 |  | -		async fn head_supports_parachains(&self, _head: &PHash) -> bool {  | 
359 |  | -			true  | 
360 |  | -		}  | 
361 |  | -	}  | 
362 |  | - | 
363 |  | -	#[derive(Clone)]  | 
364 |  | -	struct DummyParachainConsensus {  | 
365 |  | -		client: Arc<Client>,  | 
366 |  | -	}  | 
367 |  | - | 
368 |  | -	#[async_trait::async_trait]  | 
369 |  | -	impl ParachainConsensus<Block> for DummyParachainConsensus {  | 
370 |  | -		async fn produce_candidate(  | 
371 |  | -			&mut self,  | 
372 |  | -			parent: &Header,  | 
373 |  | -			_: PHash,  | 
374 |  | -			validation_data: &PersistedValidationData,  | 
375 |  | -		) -> Option<ParachainCandidate<Block>> {  | 
376 |  | -			let mut sproof = RelayStateSproofBuilder::default();  | 
377 |  | -			sproof.included_para_head = Some(HeadData(parent.encode()));  | 
378 |  | -			sproof.para_id = cumulus_test_runtime::PARACHAIN_ID.into();  | 
379 |  | - | 
380 |  | -			let cumulus_test_client::BlockBuilderAndSupportData { block_builder, .. } = self  | 
381 |  | -				.client  | 
382 |  | -				.init_block_builder_at(parent.hash(), Some(validation_data.clone()), sproof);  | 
383 |  | - | 
384 |  | -			let (block, _, proof) = block_builder.build().expect("Creates block").into_inner();  | 
385 |  | - | 
386 |  | -			self.client  | 
387 |  | -				.import(BlockOrigin::Own, block.clone())  | 
388 |  | -				.await  | 
389 |  | -				.expect("Imports the block");  | 
390 |  | - | 
391 |  | -			Some(ParachainCandidate { block, proof: proof.expect("Proof is returned") })  | 
392 |  | -		}  | 
393 |  | -	}  | 
394 |  | - | 
395 |  | -	#[test]  | 
396 |  | -	fn collates_produces_a_block_and_storage_proof_does_not_contains_code() {  | 
397 |  | -		sp_tracing::try_init_simple();  | 
398 |  | - | 
399 |  | -		let spawner = TaskExecutor::new();  | 
400 |  | -		let para_id = ParaId::from(100);  | 
401 |  | -		let announce_block = |_, _| ();  | 
402 |  | -		let client = Arc::new(TestClientBuilder::new().build());  | 
403 |  | -		let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();  | 
404 |  | - | 
405 |  | -		let (sub_tx, sub_rx) = mpsc::channel(64);  | 
406 |  | - | 
407 |  | -		let (overseer, handle) =  | 
408 |  | -			dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None)  | 
409 |  | -				.expect("Creates overseer builder")  | 
410 |  | -				.replace_collation_generation(|_| ForwardSubsystem(sub_tx))  | 
411 |  | -				.build()  | 
412 |  | -				.expect("Builds overseer");  | 
413 |  | - | 
414 |  | -		spawner.spawn("overseer", None, overseer.run().then(|_| async {}).boxed());  | 
415 |  | - | 
416 |  | -		#[allow(deprecated)]  | 
417 |  | -		let collator_start = start_collator(StartCollatorParams {  | 
418 |  | -			runtime_api: client.clone(),  | 
419 |  | -			block_status: client.clone(),  | 
420 |  | -			announce_block: Arc::new(announce_block),  | 
421 |  | -			overseer_handle: OverseerHandle::new(handle),  | 
422 |  | -			spawner,  | 
423 |  | -			para_id,  | 
424 |  | -			key: CollatorPair::generate().0,  | 
425 |  | -			parachain_consensus: Box::new(DummyParachainConsensus { client }),  | 
426 |  | -		});  | 
427 |  | -		block_on(collator_start);  | 
428 |  | - | 
429 |  | -		let msg = block_on(sub_rx.into_future())  | 
430 |  | -			.0  | 
431 |  | -			.expect("message should be send by `start_collator` above.");  | 
432 |  | - | 
433 |  | -		let collator_fn = match msg {  | 
434 |  | -			CollationGenerationMessage::Initialize(CollationGenerationConfig {  | 
435 |  | -				collator: Some(c),  | 
436 |  | -				..  | 
437 |  | -			}) => c,  | 
438 |  | -			_ => panic!("unexpected message or no collator fn"),  | 
439 |  | -		};  | 
440 |  | - | 
441 |  | -		let validation_data =  | 
442 |  | -			PersistedValidationData { parent_head: header.encode().into(), ..Default::default() };  | 
443 |  | -		let relay_parent = Default::default();  | 
444 |  | - | 
445 |  | -		let collation = block_on(collator_fn(relay_parent, &validation_data))  | 
446 |  | -			.expect("Collation is build")  | 
447 |  | -			.collation;  | 
448 |  | - | 
449 |  | -		let pov = collation.proof_of_validity.into_compressed();  | 
450 |  | - | 
451 |  | -		let decompressed =  | 
452 |  | -			sp_maybe_compressed_blob::decompress(&pov.block_data.0, 1024 * 1024 * 10).unwrap();  | 
453 |  | - | 
454 |  | -		let block =  | 
455 |  | -			ParachainBlockData::<Block>::decode(&mut &decompressed[..]).expect("Is a valid block");  | 
456 |  | - | 
457 |  | -		assert_eq!(1, *block.blocks()[0].header().number());  | 
458 |  | - | 
459 |  | -		// Ensure that we did not include `:code` in the proof.  | 
460 |  | -		let proof = block.proof().clone();  | 
461 |  | - | 
462 |  | -		let backend = sp_state_machine::create_proof_check_backend::<BlakeTwo256>(  | 
463 |  | -			*header.state_root(),  | 
464 |  | -			proof.to_storage_proof::<BlakeTwo256>(None).unwrap().0,  | 
465 |  | -		)  | 
466 |  | -		.unwrap();  | 
467 |  | - | 
468 |  | -		// Should return an error, as it was not included while building the proof.  | 
469 |  | -		assert!(backend  | 
470 |  | -			.storage(sp_core::storage::well_known_keys::CODE)  | 
471 |  | -			.unwrap_err()  | 
472 |  | -			.contains("Trie lookup error: Database missing expected key"));  | 
473 |  | -	}  | 
474 |  | -}  | 
0 commit comments