@@ -77,7 +77,12 @@ pub fn is_ancient_block<N: From<u32> + PartialOrd + Saturating>(block: N, best:
7777}
7878
7979/// Opaque justifications subscription type.
80- pub struct Subscription < T > ( pub ( crate ) Mutex < futures:: channel:: mpsc:: Receiver < Option < T > > > ) ;
80+ pub struct Subscription < T > (
81+ pub ( crate ) Mutex < futures:: channel:: mpsc:: Receiver < Option < T > > > ,
82+ // The following field is not explicitly used by the code. But when it is dropped,
83+ // the bakground task receives a shutdown signal.
84+ #[ allow( dead_code) ] pub ( crate ) futures:: channel:: oneshot:: Sender < ( ) > ,
85+ ) ;
8186
8287/// Opaque GRANDPA authorities set.
8388pub type OpaqueGrandpaAuthoritiesSet = Vec < u8 > ;
@@ -621,6 +626,7 @@ impl<C: Chain> Client<C> {
621626 e
622627 } ) ??;
623628
629+ let ( cancel_sender, cancel_receiver) = futures:: channel:: oneshot:: channel ( ) ;
624630 let ( sender, receiver) = futures:: channel:: mpsc:: channel ( MAX_SUBSCRIPTION_CAPACITY ) ;
625631 let ( tracker, subscription) = self
626632 . jsonrpsee_execute ( move |client| async move {
@@ -639,7 +645,7 @@ impl<C: Chain> Client<C> {
639645 self_clone,
640646 stall_timeout,
641647 tx_hash,
642- Subscription ( Mutex :: new ( receiver) ) ,
648+ Subscription ( Mutex :: new ( receiver) , cancel_sender ) ,
643649 ) ;
644650 Ok ( ( tracker, subscription) )
645651 } )
@@ -649,6 +655,7 @@ impl<C: Chain> Client<C> {
649655 "extrinsic" . into ( ) ,
650656 subscription,
651657 sender,
658+ cancel_receiver,
652659 ) ) ;
653660 Ok ( tracker)
654661 }
@@ -790,14 +797,16 @@ impl<C: Chain> Client<C> {
790797 Ok ( FC :: subscribe_justifications ( & client) . await ?)
791798 } )
792799 . await ?;
800+ let ( cancel_sender, cancel_receiver) = futures:: channel:: oneshot:: channel ( ) ;
793801 let ( sender, receiver) = futures:: channel:: mpsc:: channel ( MAX_SUBSCRIPTION_CAPACITY ) ;
794802 self . data . read ( ) . await . tokio . spawn ( Subscription :: background_worker (
795803 C :: NAME . into ( ) ,
796804 "justification" . into ( ) ,
797805 subscription,
798806 sender,
807+ cancel_receiver,
799808 ) ) ;
800- Ok ( Subscription ( Mutex :: new ( receiver) ) )
809+ Ok ( Subscription ( Mutex :: new ( receiver) , cancel_sender ) )
801810 }
802811
803812 /// Generates a proof of key ownership for the given authority in the given set.
@@ -843,9 +852,17 @@ impl<C: Chain> Client<C> {
843852impl < T : DeserializeOwned > Subscription < T > {
844853 /// Consumes subscription and returns future statuses stream.
845854 pub fn into_stream ( self ) -> impl futures:: Stream < Item = T > {
846- futures:: stream:: unfold ( self , |this| async {
855+ futures:: stream:: unfold ( Some ( self ) , |mut this| async move {
856+ let Some ( this) = this. take ( ) else { return None } ;
847857 let item = this. 0 . lock ( ) . await . next ( ) . await . unwrap_or ( None ) ;
848- item. map ( |i| ( i, this) )
858+ match item {
859+ Some ( item) => Some ( ( item, Some ( this) ) ) ,
860+ None => {
861+ // let's make it explicit here
862+ let _ = this. 1 . send ( ( ) ) ;
863+ None
864+ } ,
865+ }
849866 } )
850867 }
851868
@@ -860,36 +877,61 @@ impl<T: DeserializeOwned> Subscription<T> {
860877 async fn background_worker (
861878 chain_name : String ,
862879 item_type : String ,
863- mut subscription : jsonrpsee:: core:: client:: Subscription < T > ,
880+ subscription : jsonrpsee:: core:: client:: Subscription < T > ,
864881 mut sender : futures:: channel:: mpsc:: Sender < Option < T > > ,
882+ cancel_receiver : futures:: channel:: oneshot:: Receiver < ( ) > ,
865883 ) {
884+ log:: trace!(
885+ target: "bridge" ,
886+ "Starting background worker for {} {} subscription stream." ,
887+ chain_name,
888+ item_type,
889+ ) ;
890+
891+ futures:: pin_mut!( subscription, cancel_receiver) ;
866892 loop {
867- match subscription. next ( ) . await {
868- Some ( Ok ( item) ) =>
893+ match futures :: future :: select ( subscription. next ( ) , & mut cancel_receiver ) . await {
894+ futures :: future :: Either :: Left ( ( Some ( Ok ( item) ) , _ ) ) =>
869895 if sender. send ( Some ( item) ) . await . is_err ( ) {
896+ log:: trace!(
897+ target: "bridge" ,
898+ "{} {} subscription stream: no listener. Stopping background worker." ,
899+ chain_name,
900+ item_type,
901+ ) ;
902+
870903 break
871904 } ,
872- Some ( Err ( e) ) => {
905+ futures :: future :: Either :: Left ( ( Some ( Err ( e) ) , _ ) ) => {
873906 log:: trace!(
874907 target: "bridge" ,
875- "{} {} subscription stream has returned '{:?}'. Stream needs to be restarted." ,
908+ "{} {} subscription stream has returned '{:?}'. Stream needs to be restarted. Stopping background worker. " ,
876909 chain_name,
877910 item_type,
878911 e,
879912 ) ;
880913 let _ = sender. send ( None ) . await ;
881914 break
882915 } ,
883- None => {
916+ futures :: future :: Either :: Left ( ( None , _ ) ) => {
884917 log:: trace!(
885918 target: "bridge" ,
886- "{} {} subscription stream has returned None. Stream needs to be restarted." ,
919+ "{} {} subscription stream has returned None. Stream needs to be restarted. Stopping background worker. " ,
887920 chain_name,
888921 item_type,
889922 ) ;
890923 let _ = sender. send ( None ) . await ;
891924 break
892925 } ,
926+ futures:: future:: Either :: Right ( ( _, _) ) => {
927+ log:: trace!(
928+ target: "bridge" ,
929+ "{} {} subscription stream: listener has been dropped. Stopping background worker." ,
930+ chain_name,
931+ item_type,
932+ ) ;
933+ break ;
934+ } ,
893935 }
894936 }
895937 }
0 commit comments