@@ -313,6 +313,16 @@ impl<B: Block> PersistedState<B> {
313313 }
314314}
315315
316+ /// Helper object holding BEEFY worker communication/gossip components.
317+ ///
318+ /// These are created once, but will be reused if worker is restarted/reinitialized.
319+ pub ( crate ) struct BeefyComms < B : Block > {
320+ pub gossip_engine : GossipEngine < B > ,
321+ pub gossip_validator : Arc < GossipValidator < B > > ,
322+ pub gossip_report_stream : TracingUnboundedReceiver < PeerReport > ,
323+ pub on_demand_justifications : OnDemandJustificationsEngine < B > ,
324+ }
325+
316326/// A BEEFY worker plays the BEEFY protocol
317327pub ( crate ) struct BeefyWorker < B : Block , BE , P , RuntimeApi , S > {
318328 // utilities
@@ -322,11 +332,8 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
322332 pub sync : Arc < S > ,
323333 pub key_store : BeefyKeystore ,
324334
325- // communication
326- pub gossip_engine : GossipEngine < B > ,
327- pub gossip_validator : Arc < GossipValidator < B > > ,
328- pub gossip_report_stream : TracingUnboundedReceiver < PeerReport > ,
329- pub on_demand_justifications : OnDemandJustificationsEngine < B > ,
335+ // communication (created once, but returned and reused if worker is restarted/reinitialized)
336+ pub comms : BeefyComms < B > ,
330337
331338 // channels
332339 /// Links between the block importer, the background voter and the RPC layer.
@@ -475,7 +482,7 @@ where
475482 if let Err ( e) = self
476483 . persisted_state
477484 . gossip_filter_config ( )
478- . map ( |filter| self . gossip_validator . update_filter ( filter) )
485+ . map ( |filter| self . comms . gossip_validator . update_filter ( filter) )
479486 {
480487 error ! ( target: LOG_TARGET , "🥩 Voter error: {:?}" , e) ;
481488 }
@@ -495,7 +502,11 @@ where
495502 if let Some ( finality_proof) = self . handle_vote ( vote) ? {
496503 let gossip_proof = GossipMessage :: < B > :: FinalityProof ( finality_proof) ;
497504 let encoded_proof = gossip_proof. encode ( ) ;
498- self . gossip_engine . gossip_message ( proofs_topic :: < B > ( ) , encoded_proof, true ) ;
505+ self . comms . gossip_engine . gossip_message (
506+ proofs_topic :: < B > ( ) ,
507+ encoded_proof,
508+ true ,
509+ ) ;
499510 } ,
500511 RoundAction :: Drop => metric_inc ! ( self , beefy_stale_votes) ,
501512 RoundAction :: Enqueue => error ! ( target: LOG_TARGET , "🥩 unexpected vote: {:?}." , vote) ,
@@ -603,7 +614,7 @@ where
603614
604615 metric_set ! ( self , beefy_best_block, block_num) ;
605616
606- self . on_demand_justifications . cancel_requests_older_than ( block_num) ;
617+ self . comms . on_demand_justifications . cancel_requests_older_than ( block_num) ;
607618
608619 if let Err ( e) = self
609620 . backend
@@ -632,7 +643,7 @@ where
632643 // Update gossip validator votes filter.
633644 self . persisted_state
634645 . gossip_filter_config ( )
635- . map ( |filter| self . gossip_validator . update_filter ( filter) ) ?;
646+ . map ( |filter| self . comms . gossip_validator . update_filter ( filter) ) ?;
636647 Ok ( ( ) )
637648 }
638649
@@ -752,12 +763,14 @@ where
752763 err
753764 } ) ? {
754765 let encoded_proof = GossipMessage :: < B > :: FinalityProof ( finality_proof) . encode ( ) ;
755- self . gossip_engine . gossip_message ( proofs_topic :: < B > ( ) , encoded_proof, true ) ;
766+ self . comms
767+ . gossip_engine
768+ . gossip_message ( proofs_topic :: < B > ( ) , encoded_proof, true ) ;
756769 } else {
757770 metric_inc ! ( self , beefy_votes_sent) ;
758771 debug ! ( target: LOG_TARGET , "🥩 Sent vote message: {:?}" , vote) ;
759772 let encoded_vote = GossipMessage :: < B > :: Vote ( vote) . encode ( ) ;
760- self . gossip_engine . gossip_message ( votes_topic :: < B > ( ) , encoded_vote, false ) ;
773+ self . comms . gossip_engine . gossip_message ( votes_topic :: < B > ( ) , encoded_vote, false ) ;
761774 }
762775
763776 // Persist state after vote to avoid double voting in case of voter restarts.
@@ -783,7 +796,7 @@ where
783796 // make sure there's also an on-demand justification request out for it.
784797 if let Some ( ( block, active) ) = self . voting_oracle ( ) . mandatory_pending ( ) {
785798 // This only starts new request if there isn't already an active one.
786- self . on_demand_justifications . request ( block, active) ;
799+ self . comms . on_demand_justifications . request ( block, active) ;
787800 }
788801 }
789802 }
@@ -796,15 +809,16 @@ where
796809 mut self ,
797810 block_import_justif : & mut Fuse < NotificationReceiver < BeefyVersionedFinalityProof < B > > > ,
798811 finality_notifications : & mut Fuse < FinalityNotifications < B > > ,
799- ) -> Error {
812+ ) -> ( Error , BeefyComms < B > ) {
800813 info ! (
801814 target: LOG_TARGET ,
802815 "🥩 run BEEFY worker, best grandpa: #{:?}." ,
803816 self . best_grandpa_block( )
804817 ) ;
805818
806819 let mut votes = Box :: pin (
807- self . gossip_engine
820+ self . comms
821+ . gossip_engine
808822 . messages_for ( votes_topic :: < B > ( ) )
809823 . filter_map ( |notification| async move {
810824 let vote = GossipMessage :: < B > :: decode_all ( & mut & notification. message [ ..] )
@@ -816,7 +830,8 @@ where
816830 . fuse ( ) ,
817831 ) ;
818832 let mut gossip_proofs = Box :: pin (
819- self . gossip_engine
833+ self . comms
834+ . gossip_engine
820835 . messages_for ( proofs_topic :: < B > ( ) )
821836 . filter_map ( |notification| async move {
822837 let proof = GossipMessage :: < B > :: decode_all ( & mut & notification. message [ ..] )
@@ -828,12 +843,12 @@ where
828843 . fuse ( ) ,
829844 ) ;
830845
831- loop {
846+ let error = loop {
832847 // Act on changed 'state'.
833848 self . process_new_state ( ) ;
834849
835850 // Mutable reference used to drive the gossip engine.
836- let mut gossip_engine = & mut self . gossip_engine ;
851+ let mut gossip_engine = & mut self . comms . gossip_engine ;
837852 // Use temp val and report after async section,
838853 // to avoid having to Mutex-wrap `gossip_engine`.
839854 let mut gossip_report: Option < PeerReport > = None ;
@@ -847,18 +862,18 @@ where
847862 notification = finality_notifications. next( ) => {
848863 if let Some ( notif) = notification {
849864 if let Err ( err) = self . handle_finality_notification( & notif) {
850- return err;
865+ break err;
851866 }
852867 } else {
853- return Error :: FinalityStreamTerminated ;
868+ break Error :: FinalityStreamTerminated ;
854869 }
855870 } ,
856871 // Make sure to pump gossip engine.
857872 _ = gossip_engine => {
858- return Error :: GossipEngineTerminated ;
873+ break Error :: GossipEngineTerminated ;
859874 } ,
860875 // Process incoming justifications as these can make some in-flight votes obsolete.
861- response_info = self . on_demand_justifications. next( ) . fuse( ) => {
876+ response_info = self . comms . on_demand_justifications. next( ) . fuse( ) => {
862877 match response_info {
863878 ResponseInfo :: ValidProof ( justif, peer_report) => {
864879 if let Err ( err) = self . triage_incoming_justif( justif) {
@@ -878,7 +893,7 @@ where
878893 debug!( target: LOG_TARGET , "🥩 {}" , err) ;
879894 }
880895 } else {
881- return Error :: BlockImportStreamTerminated ;
896+ break Error :: BlockImportStreamTerminated ;
882897 }
883898 } ,
884899 justif = gossip_proofs. next( ) => {
@@ -888,7 +903,7 @@ where
888903 debug!( target: LOG_TARGET , "🥩 {}" , err) ;
889904 }
890905 } else {
891- return Error :: FinalityProofGossipStreamTerminated ;
906+ break Error :: FinalityProofGossipStreamTerminated ;
892907 }
893908 } ,
894909 // Finally process incoming votes.
@@ -899,18 +914,21 @@ where
899914 debug!( target: LOG_TARGET , "🥩 {}" , err) ;
900915 }
901916 } else {
902- return Error :: VotesGossipStreamTerminated ;
917+ break Error :: VotesGossipStreamTerminated ;
903918 }
904919 } ,
905920 // Process peer reports.
906- report = self . gossip_report_stream. next( ) => {
921+ report = self . comms . gossip_report_stream. next( ) => {
907922 gossip_report = report;
908923 } ,
909924 }
910925 if let Some ( PeerReport { who, cost_benefit } ) = gossip_report {
911- self . gossip_engine . report ( who, cost_benefit) ;
926+ self . comms . gossip_engine . report ( who, cost_benefit) ;
912927 }
913- }
928+ } ;
929+
930+ // return error _and_ `comms` that can be reused
931+ ( error, self . comms )
914932 }
915933
916934 /// Report the given equivocation to the BEEFY runtime module. This method
@@ -1146,18 +1164,21 @@ pub(crate) mod tests {
11461164 )
11471165 . unwrap ( ) ;
11481166 let payload_provider = MmrRootProvider :: new ( api. clone ( ) ) ;
1167+ let comms = BeefyComms {
1168+ gossip_engine,
1169+ gossip_validator,
1170+ gossip_report_stream,
1171+ on_demand_justifications,
1172+ } ;
11491173 BeefyWorker {
11501174 backend,
11511175 payload_provider,
11521176 runtime : api,
11531177 key_store : Some ( keystore) . into ( ) ,
11541178 links,
1155- gossip_engine,
1156- gossip_validator,
1157- gossip_report_stream,
1179+ comms,
11581180 metrics,
11591181 sync : Arc :: new ( sync) ,
1160- on_demand_justifications,
11611182 pending_justifications : BTreeMap :: new ( ) ,
11621183 persisted_state,
11631184 }
0 commit comments