@@ -221,7 +221,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
221221 B : Block ,
222222 BE : Backend < B > ,
223223 C : Client < B , BE > + BlockBackend < B > ,
224- P : PayloadProvider < B > ,
224+ P : PayloadProvider < B > + Clone ,
225225 R : ProvideRuntimeApi < B > ,
226226 R :: Api : BeefyApi < B , AuthorityId > + MmrApi < B , MmrRootHash , NumberFor < B > > ,
227227 N : GossipNetwork < B > + NetworkRequest + Send + Sync + ' static ,
@@ -237,7 +237,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
237237 min_block_delta,
238238 prometheus_registry,
239239 links,
240- on_demand_justifications_handler,
240+ mut on_demand_justifications_handler,
241241 } = beefy_params;
242242
243243 let BeefyNetworkParams {
@@ -248,83 +248,105 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
248248 ..
249249 } = network_params;
250250
251- let known_peers = Arc :: new ( Mutex :: new ( KnownPeers :: new ( ) ) ) ;
252- // Default votes filter is to discard everything.
253- // Validator is updated later with correct starting round and set id.
254- let ( gossip_validator, gossip_report_stream) =
255- communication:: gossip:: GossipValidator :: new ( known_peers. clone ( ) ) ;
256- let gossip_validator = Arc :: new ( gossip_validator) ;
257- let mut gossip_engine = GossipEngine :: new (
258- network. clone ( ) ,
259- sync. clone ( ) ,
260- gossip_protocol_name,
261- gossip_validator. clone ( ) ,
262- None ,
263- ) ;
264251 let metrics = register_metrics ( prometheus_registry. clone ( ) ) ;
265252
266- // The `GossipValidator` adds and removes known peers based on valid votes and network events.
267- let on_demand_justifications = OnDemandJustificationsEngine :: new (
268- network. clone ( ) ,
269- justifications_protocol_name,
270- known_peers,
271- prometheus_registry. clone ( ) ,
272- ) ;
273-
274253 // Subscribe to finality notifications and justifications before waiting for runtime pallet and
275254 // reuse the streams, so we don't miss notifications while waiting for pallet to be available.
276255 let mut finality_notifications = client. finality_notification_stream ( ) . fuse ( ) ;
277- let block_import_justif = links. from_block_import_justif_stream . subscribe ( 100_000 ) . fuse ( ) ;
278-
279- // Wait for BEEFY pallet to be active before starting voter.
280- let persisted_state =
281- match wait_for_runtime_pallet ( & * runtime, & mut gossip_engine, & mut finality_notifications)
282- . await
283- . and_then ( |( beefy_genesis, best_grandpa) | {
284- load_or_init_voter_state (
285- & * backend,
286- & * runtime,
287- beefy_genesis,
288- best_grandpa,
289- min_block_delta,
290- )
291- } ) {
256+ let mut block_import_justif = links. from_block_import_justif_stream . subscribe ( 100_000 ) . fuse ( ) ;
257+
258+ // We re-create and re-run the worker in this loop in order to quickly reinit and resume after
259+ // select recoverable errors.
260+ loop {
261+ let known_peers = Arc :: new ( Mutex :: new ( KnownPeers :: new ( ) ) ) ;
262+ // Default votes filter is to discard everything.
263+ // Validator is updated later with correct starting round and set id.
264+ let ( gossip_validator, gossip_report_stream) =
265+ communication:: gossip:: GossipValidator :: new ( known_peers. clone ( ) ) ;
266+ let gossip_validator = Arc :: new ( gossip_validator) ;
267+ let mut gossip_engine = GossipEngine :: new (
268+ network. clone ( ) ,
269+ sync. clone ( ) ,
270+ gossip_protocol_name. clone ( ) ,
271+ gossip_validator. clone ( ) ,
272+ None ,
273+ ) ;
274+
275+ // The `GossipValidator` adds and removes known peers based on valid votes and network
276+ // events.
277+ let on_demand_justifications = OnDemandJustificationsEngine :: new (
278+ network. clone ( ) ,
279+ justifications_protocol_name. clone ( ) ,
280+ known_peers,
281+ prometheus_registry. clone ( ) ,
282+ ) ;
283+
284+ // Wait for BEEFY pallet to be active before starting voter.
285+ let persisted_state = match wait_for_runtime_pallet (
286+ & * runtime,
287+ & mut gossip_engine,
288+ & mut finality_notifications,
289+ )
290+ . await
291+ . and_then ( |( beefy_genesis, best_grandpa) | {
292+ load_or_init_voter_state (
293+ & * backend,
294+ & * runtime,
295+ beefy_genesis,
296+ best_grandpa,
297+ min_block_delta,
298+ )
299+ } ) {
292300 Ok ( state) => state,
293301 Err ( e) => {
294302 error ! ( target: LOG_TARGET , "Error: {:?}. Terminating." , e) ;
295303 return
296304 } ,
297305 } ;
298- // Update the gossip validator with the right starting round and set id.
299- if let Err ( e) = persisted_state
300- . gossip_filter_config ( )
301- . map ( |f| gossip_validator. update_filter ( f) )
302- {
303- error ! ( target: LOG_TARGET , "Error: {:?}. Terminating." , e) ;
304- return
305- }
306+ // Update the gossip validator with the right starting round and set id.
307+ if let Err ( e) = persisted_state
308+ . gossip_filter_config ( )
309+ . map ( |f| gossip_validator. update_filter ( f) )
310+ {
311+ error ! ( target: LOG_TARGET , "Error: {:?}. Terminating." , e) ;
312+ return
313+ }
306314
307- let worker = worker:: BeefyWorker {
308- backend,
309- payload_provider,
310- runtime,
311- sync,
312- key_store : key_store. into ( ) ,
313- gossip_engine,
314- gossip_validator,
315- gossip_report_stream,
316- on_demand_justifications,
317- links,
318- metrics,
319- pending_justifications : BTreeMap :: new ( ) ,
320- persisted_state,
321- } ;
315+ let worker = worker:: BeefyWorker {
316+ backend : backend . clone ( ) ,
317+ payload_provider : payload_provider . clone ( ) ,
318+ runtime : runtime . clone ( ) ,
319+ sync : sync . clone ( ) ,
320+ key_store : key_store. clone ( ) . into ( ) ,
321+ gossip_engine,
322+ gossip_validator,
323+ gossip_report_stream,
324+ on_demand_justifications,
325+ links : links . clone ( ) ,
326+ metrics : metrics . clone ( ) ,
327+ pending_justifications : BTreeMap :: new ( ) ,
328+ persisted_state,
329+ } ;
322330
323- futures:: future:: select (
324- Box :: pin ( worker. run ( block_import_justif, finality_notifications) ) ,
325- Box :: pin ( on_demand_justifications_handler. run ( ) ) ,
326- )
327- . await ;
331+ match futures:: future:: select (
332+ Box :: pin ( worker. run ( & mut block_import_justif, & mut finality_notifications) ) ,
333+ Box :: pin ( on_demand_justifications_handler. run ( ) ) ,
334+ )
335+ . await
336+ {
337+ // On `ConsensusReset` error, just reinit and restart voter.
338+ futures:: future:: Either :: Left ( ( error:: Error :: ConsensusReset , _) ) => {
339+ error ! ( target: LOG_TARGET , "🥩 Error: {:?}. Restarting voter." , error:: Error :: ConsensusReset ) ;
340+ continue
341+ } ,
342+ // On other errors, bring down / finish the task.
343+ futures:: future:: Either :: Left ( ( worker_err, _) ) =>
344+ error ! ( target: LOG_TARGET , "🥩 Error: {:?}. Terminating." , worker_err) ,
345+ futures:: future:: Either :: Right ( ( odj_handler_err, _) ) =>
346+ error ! ( target: LOG_TARGET , "🥩 Error: {:?}. Terminating." , odj_handler_err) ,
347+ } ;
348+ return
349+ }
328350}
329351
330352fn load_or_init_voter_state < B , BE , R > (
0 commit comments