Skip to content

Commit 92c68f8

Browse files
authored
sc-consensus-beefy: reuse instead of recreate GossipEngine (paritytech#1262)
"sc-consensus-beefy: restart voter on pallet reset #14821" introduced a mechanism to reinitialize the BEEFY worker on certain errors; but re-creating the GossipEngine doesn't play well with "Rework the event system of sc-network #14197". So this PR slightly changes the re-initialization logic to reuse the original GossipEngine and not recreate it. Signed-off-by: Adrian Catangiu <[email protected]>
1 parent 226f00b commit 92c68f8

File tree

2 files changed

+87
-62
lines changed

2 files changed

+87
-62
lines changed

substrate/client/consensus/beefy/src/lib.rs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -255,36 +255,42 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
255255
let mut finality_notifications = client.finality_notification_stream().fuse();
256256
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
257257

258+
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
259+
// Default votes filter is to discard everything.
260+
// Validator is updated later with correct starting round and set id.
261+
let (gossip_validator, gossip_report_stream) =
262+
communication::gossip::GossipValidator::new(known_peers.clone());
263+
let gossip_validator = Arc::new(gossip_validator);
264+
let gossip_engine = GossipEngine::new(
265+
network.clone(),
266+
sync.clone(),
267+
gossip_protocol_name.clone(),
268+
gossip_validator.clone(),
269+
None,
270+
);
271+
272+
// The `GossipValidator` adds and removes known peers based on valid votes and network
273+
// events.
274+
let on_demand_justifications = OnDemandJustificationsEngine::new(
275+
network.clone(),
276+
justifications_protocol_name.clone(),
277+
known_peers,
278+
prometheus_registry.clone(),
279+
);
280+
let mut beefy_comms = worker::BeefyComms {
281+
gossip_engine,
282+
gossip_validator,
283+
gossip_report_stream,
284+
on_demand_justifications,
285+
};
286+
258287
// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
259288
// select recoverable errors.
260289
loop {
261-
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
262-
// Default votes filter is to discard everything.
263-
// Validator is updated later with correct starting round and set id.
264-
let (gossip_validator, gossip_report_stream) =
265-
communication::gossip::GossipValidator::new(known_peers.clone());
266-
let gossip_validator = Arc::new(gossip_validator);
267-
let mut gossip_engine = GossipEngine::new(
268-
network.clone(),
269-
sync.clone(),
270-
gossip_protocol_name.clone(),
271-
gossip_validator.clone(),
272-
None,
273-
);
274-
275-
// The `GossipValidator` adds and removes known peers based on valid votes and network
276-
// events.
277-
let on_demand_justifications = OnDemandJustificationsEngine::new(
278-
network.clone(),
279-
justifications_protocol_name.clone(),
280-
known_peers,
281-
prometheus_registry.clone(),
282-
);
283-
284290
// Wait for BEEFY pallet to be active before starting voter.
285291
let persisted_state = match wait_for_runtime_pallet(
286292
&*runtime,
287-
&mut gossip_engine,
293+
&mut beefy_comms.gossip_engine,
288294
&mut finality_notifications,
289295
)
290296
.await
@@ -306,7 +312,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
306312
// Update the gossip validator with the right starting round and set id.
307313
if let Err(e) = persisted_state
308314
.gossip_filter_config()
309-
.map(|f| gossip_validator.update_filter(f))
315+
.map(|f| beefy_comms.gossip_validator.update_filter(f))
310316
{
311317
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
312318
return
@@ -318,10 +324,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
318324
runtime: runtime.clone(),
319325
sync: sync.clone(),
320326
key_store: key_store.clone().into(),
321-
gossip_engine,
322-
gossip_validator,
323-
gossip_report_stream,
324-
on_demand_justifications,
327+
comms: beefy_comms,
325328
links: links.clone(),
326329
metrics: metrics.clone(),
327330
pending_justifications: BTreeMap::new(),
@@ -335,12 +338,13 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
335338
.await
336339
{
337340
// On `ConsensusReset` error, just reinit and restart voter.
338-
futures::future::Either::Left((error::Error::ConsensusReset, _)) => {
341+
futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => {
339342
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
343+
beefy_comms = reuse_comms;
340344
continue
341345
},
342346
// On other errors, bring down / finish the task.
343-
futures::future::Either::Left((worker_err, _)) =>
347+
futures::future::Either::Left(((worker_err, _), _)) =>
344348
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
345349
futures::future::Either::Right((odj_handler_err, _)) =>
346350
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),

substrate/client/consensus/beefy/src/worker.rs

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,16 @@ impl<B: Block> PersistedState<B> {
313313
}
314314
}
315315

316+
/// Helper object holding BEEFY worker communication/gossip components.
317+
///
318+
/// These are created once, but will be reused if worker is restarted/reinitialized.
319+
pub(crate) struct BeefyComms<B: Block> {
320+
pub gossip_engine: GossipEngine<B>,
321+
pub gossip_validator: Arc<GossipValidator<B>>,
322+
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
323+
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
324+
}
325+
316326
/// A BEEFY worker plays the BEEFY protocol
317327
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
318328
// utilities
@@ -322,11 +332,8 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
322332
pub sync: Arc<S>,
323333
pub key_store: BeefyKeystore,
324334

325-
// communication
326-
pub gossip_engine: GossipEngine<B>,
327-
pub gossip_validator: Arc<GossipValidator<B>>,
328-
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
329-
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
335+
// communication (created once, but returned and reused if worker is restarted/reinitialized)
336+
pub comms: BeefyComms<B>,
330337

331338
// channels
332339
/// Links between the block importer, the background voter and the RPC layer.
@@ -475,7 +482,7 @@ where
475482
if let Err(e) = self
476483
.persisted_state
477484
.gossip_filter_config()
478-
.map(|filter| self.gossip_validator.update_filter(filter))
485+
.map(|filter| self.comms.gossip_validator.update_filter(filter))
479486
{
480487
error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e);
481488
}
@@ -495,7 +502,11 @@ where
495502
if let Some(finality_proof) = self.handle_vote(vote)? {
496503
let gossip_proof = GossipMessage::<B>::FinalityProof(finality_proof);
497504
let encoded_proof = gossip_proof.encode();
498-
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
505+
self.comms.gossip_engine.gossip_message(
506+
proofs_topic::<B>(),
507+
encoded_proof,
508+
true,
509+
);
499510
},
500511
RoundAction::Drop => metric_inc!(self, beefy_stale_votes),
501512
RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
@@ -603,7 +614,7 @@ where
603614

604615
metric_set!(self, beefy_best_block, block_num);
605616

606-
self.on_demand_justifications.cancel_requests_older_than(block_num);
617+
self.comms.on_demand_justifications.cancel_requests_older_than(block_num);
607618

608619
if let Err(e) = self
609620
.backend
@@ -632,7 +643,7 @@ where
632643
// Update gossip validator votes filter.
633644
self.persisted_state
634645
.gossip_filter_config()
635-
.map(|filter| self.gossip_validator.update_filter(filter))?;
646+
.map(|filter| self.comms.gossip_validator.update_filter(filter))?;
636647
Ok(())
637648
}
638649

@@ -752,12 +763,14 @@ where
752763
err
753764
})? {
754765
let encoded_proof = GossipMessage::<B>::FinalityProof(finality_proof).encode();
755-
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
766+
self.comms
767+
.gossip_engine
768+
.gossip_message(proofs_topic::<B>(), encoded_proof, true);
756769
} else {
757770
metric_inc!(self, beefy_votes_sent);
758771
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
759772
let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
760-
self.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
773+
self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
761774
}
762775

763776
// Persist state after vote to avoid double voting in case of voter restarts.
@@ -783,7 +796,7 @@ where
783796
// make sure there's also an on-demand justification request out for it.
784797
if let Some((block, active)) = self.voting_oracle().mandatory_pending() {
785798
// This only starts new request if there isn't already an active one.
786-
self.on_demand_justifications.request(block, active);
799+
self.comms.on_demand_justifications.request(block, active);
787800
}
788801
}
789802
}
@@ -796,15 +809,16 @@ where
796809
mut self,
797810
block_import_justif: &mut Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
798811
finality_notifications: &mut Fuse<FinalityNotifications<B>>,
799-
) -> Error {
812+
) -> (Error, BeefyComms<B>) {
800813
info!(
801814
target: LOG_TARGET,
802815
"🥩 run BEEFY worker, best grandpa: #{:?}.",
803816
self.best_grandpa_block()
804817
);
805818

806819
let mut votes = Box::pin(
807-
self.gossip_engine
820+
self.comms
821+
.gossip_engine
808822
.messages_for(votes_topic::<B>())
809823
.filter_map(|notification| async move {
810824
let vote = GossipMessage::<B>::decode_all(&mut &notification.message[..])
@@ -816,7 +830,8 @@ where
816830
.fuse(),
817831
);
818832
let mut gossip_proofs = Box::pin(
819-
self.gossip_engine
833+
self.comms
834+
.gossip_engine
820835
.messages_for(proofs_topic::<B>())
821836
.filter_map(|notification| async move {
822837
let proof = GossipMessage::<B>::decode_all(&mut &notification.message[..])
@@ -828,12 +843,12 @@ where
828843
.fuse(),
829844
);
830845

831-
loop {
846+
let error = loop {
832847
// Act on changed 'state'.
833848
self.process_new_state();
834849

835850
// Mutable reference used to drive the gossip engine.
836-
let mut gossip_engine = &mut self.gossip_engine;
851+
let mut gossip_engine = &mut self.comms.gossip_engine;
837852
// Use temp val and report after async section,
838853
// to avoid having to Mutex-wrap `gossip_engine`.
839854
let mut gossip_report: Option<PeerReport> = None;
@@ -847,18 +862,18 @@ where
847862
notification = finality_notifications.next() => {
848863
if let Some(notif) = notification {
849864
if let Err(err) = self.handle_finality_notification(&notif) {
850-
return err;
865+
break err;
851866
}
852867
} else {
853-
return Error::FinalityStreamTerminated;
868+
break Error::FinalityStreamTerminated;
854869
}
855870
},
856871
// Make sure to pump gossip engine.
857872
_ = gossip_engine => {
858-
return Error::GossipEngineTerminated;
873+
break Error::GossipEngineTerminated;
859874
},
860875
// Process incoming justifications as these can make some in-flight votes obsolete.
861-
response_info = self.on_demand_justifications.next().fuse() => {
876+
response_info = self.comms.on_demand_justifications.next().fuse() => {
862877
match response_info {
863878
ResponseInfo::ValidProof(justif, peer_report) => {
864879
if let Err(err) = self.triage_incoming_justif(justif) {
@@ -878,7 +893,7 @@ where
878893
debug!(target: LOG_TARGET, "🥩 {}", err);
879894
}
880895
} else {
881-
return Error::BlockImportStreamTerminated;
896+
break Error::BlockImportStreamTerminated;
882897
}
883898
},
884899
justif = gossip_proofs.next() => {
@@ -888,7 +903,7 @@ where
888903
debug!(target: LOG_TARGET, "🥩 {}", err);
889904
}
890905
} else {
891-
return Error::FinalityProofGossipStreamTerminated;
906+
break Error::FinalityProofGossipStreamTerminated;
892907
}
893908
},
894909
// Finally process incoming votes.
@@ -899,18 +914,21 @@ where
899914
debug!(target: LOG_TARGET, "🥩 {}", err);
900915
}
901916
} else {
902-
return Error::VotesGossipStreamTerminated;
917+
break Error::VotesGossipStreamTerminated;
903918
}
904919
},
905920
// Process peer reports.
906-
report = self.gossip_report_stream.next() => {
921+
report = self.comms.gossip_report_stream.next() => {
907922
gossip_report = report;
908923
},
909924
}
910925
if let Some(PeerReport { who, cost_benefit }) = gossip_report {
911-
self.gossip_engine.report(who, cost_benefit);
926+
self.comms.gossip_engine.report(who, cost_benefit);
912927
}
913-
}
928+
};
929+
930+
// return error _and_ `comms` that can be reused
931+
(error, self.comms)
914932
}
915933

916934
/// Report the given equivocation to the BEEFY runtime module. This method
@@ -1146,18 +1164,21 @@ pub(crate) mod tests {
11461164
)
11471165
.unwrap();
11481166
let payload_provider = MmrRootProvider::new(api.clone());
1167+
let comms = BeefyComms {
1168+
gossip_engine,
1169+
gossip_validator,
1170+
gossip_report_stream,
1171+
on_demand_justifications,
1172+
};
11491173
BeefyWorker {
11501174
backend,
11511175
payload_provider,
11521176
runtime: api,
11531177
key_store: Some(keystore).into(),
11541178
links,
1155-
gossip_engine,
1156-
gossip_validator,
1157-
gossip_report_stream,
1179+
comms,
11581180
metrics,
11591181
sync: Arc::new(sync),
1160-
on_demand_justifications,
11611182
pending_justifications: BTreeMap::new(),
11621183
persisted_state,
11631184
}

0 commit comments

Comments
 (0)