1+ use crate :: consensus:: hybrid_import_queue:: HybridBlockImport ;
12use crate :: consensus:: { ConsensusMechanism , StartAuthoringParams } ;
23use crate :: {
34 client:: { FullBackend , FullClient } ,
4- conditional_evm_block_import:: ConditionalEVMBlockImport ,
55 ethereum:: EthConfiguration ,
66 service:: { BIQ , FullSelectChain , GrandpaBlockImport } ,
77} ;
8- use fc_consensus:: FrontierBlockImport ;
98use jsonrpsee:: tokio;
109use node_subtensor_runtime:: opaque:: Block ;
11- use sc_client_api:: { AuxStore , BlockOf } ;
10+ use sc_client_api:: { AuxStore , BlockOf , UsageProvider } ;
1211use sc_consensus:: { BlockImport , BoxBlockImport } ;
1312use sc_consensus_grandpa:: BlockNumberOps ;
1413use sc_consensus_slots:: { BackoffAuthoringBlocksStrategy , InherentDataProviderExt } ;
14+ use sc_network_sync:: SyncingService ;
1515use sc_service:: { Configuration , TaskManager } ;
1616use sc_telemetry:: TelemetryHandle ;
1717use sc_transaction_pool:: TransactionPoolHandle ;
18+ use sc_transaction_pool_api:: OffchainTransactionPoolFactory ;
1819use sp_api:: ProvideRuntimeApi ;
1920use sp_blockchain:: { HeaderBackend , HeaderMetadata } ;
2021use sp_consensus:: { Environment , Proposer , SelectChain , SyncOracle } ;
21- use sp_consensus_aura:: sr25519:: AuthorityId ;
22+ use sp_consensus_aura:: sr25519:: AuthorityId as AuraAuthorityId ;
2223use sp_consensus_aura:: { AuraApi , sr25519:: AuthorityPair as AuraPair } ;
24+ use sp_consensus_babe:: AuthorityId as BabeAuthorityId ;
25+ use sp_consensus_babe:: BabeConfiguration ;
2326use sp_consensus_slots:: SlotDuration ;
2427use sp_inherents:: CreateInherentDataProviders ;
2528use sp_keystore:: KeystorePtr ;
29+ use sp_runtime:: traits:: Block as BlockT ;
2630use sp_runtime:: traits:: NumberFor ;
2731use std:: { error:: Error , sync:: Arc } ;
2832
@@ -48,7 +52,7 @@ impl ConsensusMechanism for AuraConsensus {
4852 + Send
4953 + Sync
5054 + ' static ,
51- C :: Api : AuraApi < Block , AuthorityId > ,
55+ C :: Api : AuraApi < Block , AuraAuthorityId > ,
5256 SC : SelectChain < Block > + ' static ,
5357 I : BlockImport < Block , Error = sp_consensus:: Error > + Send + Sync + ' static ,
5458 PF : Environment < Block , Error = Error > + Send + Sync + ' static ,
@@ -122,16 +126,18 @@ impl ConsensusMechanism for AuraConsensus {
122126 {
123127 let build_import_queue = Box :: new (
124128 move |client : Arc < FullClient > ,
125- _backend : Arc < FullBackend > ,
126- config : & Configuration ,
129+ backend : Arc < FullBackend > ,
130+ service_config : & Configuration ,
127131 _eth_config : & EthConfiguration ,
128132 task_manager : & TaskManager ,
129133 telemetry : Option < TelemetryHandle > ,
130134 grandpa_block_import : GrandpaBlockImport ,
131- _transaction_pool : Arc < TransactionPoolHandle < Block , FullClient > > | {
132- let conditional_block_import = ConditionalEVMBlockImport :: new (
135+ transaction_pool : Arc < TransactionPoolHandle < Block , FullClient > > | {
136+ let expected_babe_config = get_expected_babe_configuration ( & * client) ?;
137+ let conditional_block_import = HybridBlockImport :: new (
138+ client. clone ( ) ,
133139 grandpa_block_import. clone ( ) ,
134- FrontierBlockImport :: new ( grandpa_block_import . clone ( ) , client . clone ( ) ) ,
140+ expected_babe_config . clone ( ) ,
135141 ) ;
136142
137143 let slot_duration = sc_consensus_aura:: slot_duration ( & * client) ?;
@@ -145,17 +151,28 @@ impl ConsensusMechanism for AuraConsensus {
145151 Ok ( ( slot, timestamp) )
146152 } ;
147153
148- let import_queue = super :: aura_wrapped_import_queue:: import_queue (
149- sc_consensus_aura:: ImportQueueParams {
154+ // Aura needs the hybrid import queue, because it needs to
155+ // 1. Validate the first Babe block it encounters before switching into Babe
156+ // consensus mode
157+ // 2. Import the entire blockchain without restarting during warp sync, because
158+ // warp sync does not allow restarting sync midway.
159+ let import_queue = super :: hybrid_import_queue:: import_queue (
160+ crate :: consensus:: hybrid_import_queue:: HybridImportQueueParams {
150161 block_import : conditional_block_import. clone ( ) ,
151162 justification_import : Some ( Box :: new ( grandpa_block_import. clone ( ) ) ) ,
152163 client,
153164 create_inherent_data_providers,
154165 spawner : & task_manager. spawn_essential_handle ( ) ,
155- registry : config . prometheus_registry ( ) ,
166+ registry : service_config . prometheus_registry ( ) ,
156167 check_for_equivocation : Default :: default ( ) ,
157168 telemetry,
158169 compatibility_mode : sc_consensus_aura:: CompatibilityMode :: None ,
170+ select_chain : sc_consensus:: LongestChain :: new ( backend. clone ( ) ) ,
171+ babe_config : expected_babe_config,
172+ epoch_changes : conditional_block_import. babe_link ( ) . epoch_changes ( ) . clone ( ) ,
173+ offchain_tx_pool_factory : OffchainTransactionPoolFactory :: new (
174+ transaction_pool,
175+ ) ,
159176 } ,
160177 )
161178 . map_err :: < sc_service:: Error , _ > ( Into :: into) ?;
@@ -178,31 +195,44 @@ impl ConsensusMechanism for AuraConsensus {
178195 & self ,
179196 task_manager : & mut TaskManager ,
180197 client : Arc < FullClient > ,
181- triggered : Option < Arc < std:: sync:: atomic:: AtomicBool > > ,
198+ custom_service_signal : Option < Arc < std:: sync:: atomic:: AtomicBool > > ,
199+ sync_service : Arc < SyncingService < Block > > ,
182200 ) -> Result < ( ) , sc_service:: Error > {
183201 let client_clone = client. clone ( ) ;
184- let triggered_clone = triggered . clone ( ) ;
202+ let custom_service_signal_clone = custom_service_signal . clone ( ) ;
185203 let slot_duration = self . slot_duration ( & client) ?;
186204 task_manager. spawn_essential_handle ( ) . spawn (
187- "babe-switch" ,
188- None ,
189- Box :: pin ( async move {
190- let client = client_clone;
191- let triggered = triggered_clone;
192- loop {
193- // Check if the runtime is Babe once per block.
194- if let Ok ( c) = sc_consensus_babe:: configuration ( & * client) {
195- if !c. authorities . is_empty ( ) {
196- log:: info!( "Babe runtime detected! Intentionally failing the essential handle `babe-switch` to trigger switch to Babe service." ) ;
197- if let Some ( triggered) = triggered {
198- triggered. store ( true , std:: sync:: atomic:: Ordering :: SeqCst ) ;
199- } ;
200- break ;
201- }
202- } ;
203- tokio:: time:: sleep ( slot_duration. as_duration ( ) ) . await ;
204- }
205- } ) ) ;
205+ "babe-switch" ,
206+ None ,
207+ Box :: pin ( async move {
208+ let client = client_clone;
209+ let custom_service_signal = custom_service_signal_clone;
210+ loop {
211+ // Check if the runtime is Babe once per block.
212+ if let Ok ( c) = sc_consensus_babe:: configuration ( & * client) {
213+ // Aura Consensus uses the hybrid import queue which is able to import both
214+ // Aura and Babe blocks. Wait until sync finishes before switching to the
215+ // Babe service to not break warp sync.
216+ //
217+ // Note that although unintuitive, it is required that we wait until BOTH
218+ // warp sync and state sync are finished before we can safely switch to the
219+ // Babe service. If we only wait for the "warp sync" to finish while state
220+ // sync is still in progress prior to switching, the warp sync will not
221+ // complete successfully.
222+ let syncing = sync_service. status ( ) . await . is_ok_and ( |status| status. warp_sync . is_some ( ) || status. state_sync . is_some ( ) ) ;
223+ if !c. authorities . is_empty ( ) && !syncing {
224+ log:: info!( "Babe runtime detected! Intentionally failing the essential handle `babe-switch` to trigger switch to Babe service." ) ;
225+ // Signal that the node stopped due to the custom service exiting.
226+ if let Some ( custom_service_signal) = custom_service_signal {
227+ custom_service_signal. store ( true , std:: sync:: atomic:: Ordering :: SeqCst ) ;
228+ } ;
229+ break ;
230+ }
231+ } ;
232+ tokio:: time:: sleep ( slot_duration. as_duration ( ) ) . await ;
233+ }
234+ } ) ,
235+ ) ;
206236 Ok ( ( ) )
207237 }
208238
@@ -216,3 +246,41 @@ impl ConsensusMechanism for AuraConsensus {
216246 Ok ( Default :: default ( ) )
217247 }
218248}
249+
250+ /// Returns what the Babe configuration is expected to be at the first Babe block.
251+ ///
252+ /// This is required for the hybrid import queue, so it is ready to validate the first encountered
253+ /// babe block(s) before switching to Babe consensus.
254+ fn get_expected_babe_configuration < B : BlockT , C > (
255+ client : & C ,
256+ ) -> sp_blockchain:: Result < BabeConfiguration >
257+ where
258+ C : AuxStore + ProvideRuntimeApi < B > + UsageProvider < B > ,
259+ C :: Api : AuraApi < B , AuraAuthorityId > ,
260+ {
261+ let at_hash = if client. usage_info ( ) . chain . finalized_state . is_some ( ) {
262+ client. usage_info ( ) . chain . best_hash
263+ } else {
264+ client. usage_info ( ) . chain . genesis_hash
265+ } ;
266+
267+ let runtime_api = client. runtime_api ( ) ;
268+ let authorities = runtime_api
269+ . authorities ( at_hash) ?
270+ . into_iter ( )
271+ . map ( |a| ( BabeAuthorityId :: from ( a. into_inner ( ) ) , 1 ) )
272+ . collect ( ) ;
273+
274+ let slot_duration = runtime_api. slot_duration ( at_hash) ?. as_millis ( ) ;
275+ let epoch_config = node_subtensor_runtime:: BABE_GENESIS_EPOCH_CONFIG ;
276+ let config = sp_consensus_babe:: BabeConfiguration {
277+ slot_duration,
278+ epoch_length : node_subtensor_runtime:: EPOCH_DURATION_IN_SLOTS ,
279+ c : epoch_config. c ,
280+ authorities,
281+ randomness : Default :: default ( ) ,
282+ allowed_slots : epoch_config. allowed_slots ,
283+ } ;
284+
285+ Ok ( config)
286+ }
0 commit comments