@@ -160,6 +160,7 @@ impl Actor for SenderAccountsManager {
160160 // Start the new_receipts_watcher task that will consume from the `pglistener`
161161 // after starting all senders
162162 state. new_receipts_watcher_handle = Some ( tokio:: spawn ( new_receipts_watcher (
163+ myself. get_cell ( ) ,
163164 pglistener,
164165 escrow_accounts,
165166 prefix,
@@ -464,6 +465,7 @@ impl State {
464465/// Continuously listens for new receipt notifications from Postgres and forwards them to the
465466/// corresponding SenderAccount.
466467async fn new_receipts_watcher (
468+ actor_cell : ActorCell ,
467469 mut pglistener : PgListener ,
468470 escrow_accounts_rx : Receiver < EscrowAccounts > ,
469471 prefix : Option < String > ,
@@ -476,16 +478,22 @@ async fn new_receipts_watcher(
476478 'scalar_tap_receipt_notification'",
477479 ) ;
478480 loop {
479- // TODO: recover from errors or shutdown the whole program?
480- let pg_notification = pglistener. recv ( ) . await . expect (
481- "should be able to receive Postgres Notify events on the channel \
482- 'scalar_tap_receipt_notification'",
483- ) ;
484- let new_receipt_notification: NewReceiptNotification =
485- serde_json:: from_str ( pg_notification. payload ( ) ) . expect (
481+ let Ok ( pg_notification) = pglistener. recv ( ) . await else {
482+ error ! (
483+ "should be able to receive Postgres Notify events on the channel \
484+ 'scalar_tap_receipt_notification'"
485+ ) ;
486+ break ;
487+ } ;
488+ let Ok ( new_receipt_notification) =
489+ serde_json:: from_str :: < NewReceiptNotification > ( pg_notification. payload ( ) )
490+ else {
491+ error ! (
486492 "should be able to deserialize the Postgres Notify event payload as a \
487493 NewReceiptNotification",
488494 ) ;
495+ break ;
496+ } ;
489497 if let Err ( e) = handle_notification (
490498 new_receipt_notification,
491499 escrow_accounts_rx. clone ( ) ,
@@ -496,6 +504,12 @@ async fn new_receipts_watcher(
496504 error ! ( "{}" , e) ;
497505 }
498506 }
507+ // shutdown the whole system
508+ actor_cell
509+ . kill_and_wait ( None )
510+ . await
511+ . expect ( "Failed to kill manager." ) ;
512+ error ! ( "Manager killed" ) ;
499513}
500514
501515async fn handle_notification (
@@ -595,7 +609,7 @@ mod tests {
595609 use alloy:: hex:: ToHexExt ;
596610 use indexer_monitor:: { DeploymentDetails , EscrowAccounts , SubgraphClient } ;
597611 use ractor:: concurrency:: JoinHandle ;
598- use ractor:: { Actor , ActorRef } ;
612+ use ractor:: { Actor , ActorRef , ActorStatus } ;
599613 use reqwest:: Url ;
600614 use ruint:: aliases:: U256 ;
601615 use sqlx:: postgres:: PgListener ;
@@ -853,9 +867,11 @@ mod tests {
853867 HashMap :: from ( [ ( SENDER . 1 , vec ! [ SIGNER . 1 ] ) ] ) ,
854868 ) )
855869 . 1 ;
870+ let dummy_actor = DummyActor :: spawn ( ) . await ;
856871
857872 // Start the new_receipts_watcher task that will consume from the `pglistener`
858873 let new_receipts_watcher_handle = tokio:: spawn ( new_receipts_watcher (
874+ dummy_actor. get_cell ( ) ,
859875 pglistener,
860876 escrow_accounts_rx,
861877 Some ( prefix. clone ( ) ) ,
@@ -882,6 +898,33 @@ mod tests {
882898 new_receipts_watcher_handle. abort ( ) ;
883899 }
884900
901+ #[ test_log:: test( sqlx:: test( migrations = "../../migrations" ) ) ]
902+ async fn test_manager_killed_in_database_connection ( pgpool : PgPool ) {
903+ let mut pglistener = PgListener :: connect_with ( & pgpool) . await . unwrap ( ) ;
904+ pglistener
905+ . listen ( "scalar_tap_receipt_notification" )
906+ . await
907+ . expect (
908+ "should be able to subscribe to Postgres Notify events on the channel \
909+ 'scalar_tap_receipt_notification'",
910+ ) ;
911+
912+ let escrow_accounts_rx = watch:: channel ( EscrowAccounts :: default ( ) ) . 1 ;
913+ let dummy_actor = DummyActor :: spawn ( ) . await ;
914+
915+ // Start the new_receipts_watcher task that will consume from the `pglistener`
916+ let new_receipts_watcher_handle = tokio:: spawn ( new_receipts_watcher (
917+ dummy_actor. get_cell ( ) ,
918+ pglistener,
919+ escrow_accounts_rx,
920+ None ,
921+ ) ) ;
922+ pgpool. close ( ) . await ;
923+ new_receipts_watcher_handle. await . unwrap ( ) ;
924+
925+ assert_eq ! ( dummy_actor. get_status( ) , ActorStatus :: Stopped )
926+ }
927+
885928 #[ tokio:: test]
886929 async fn test_create_allocation_id ( ) {
887930 let senders_to_signers = vec ! [ ( SENDER . 1 , vec![ SIGNER . 1 ] ) ] . into_iter ( ) . collect ( ) ;
0 commit comments