diff --git a/Cargo.lock b/Cargo.lock index 91b5b773f..e44db3f04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3723,6 +3723,7 @@ dependencies = [ "tap_core", "tempfile", "test-assets", + "test-log", "thegraph-core", "thiserror", "tokio", diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index d17245f0a..979343c12 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -50,3 +50,4 @@ futures = { version = "0.3.30", default-features = false } tempfile = "3.8.0" wiremock.workspace = true test-assets = { path = "../test-assets" } +test-log = { version = "0.2.12", default-features = false } diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index 8633dbd8d..6a22e07ce 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -160,6 +160,7 @@ impl Actor for SenderAccountsManager { // Start the new_receipts_watcher task that will consume from the `pglistener` // after starting all senders state.new_receipts_watcher_handle = Some(tokio::spawn(new_receipts_watcher( + myself.get_cell(), pglistener, escrow_accounts, prefix, @@ -464,6 +465,7 @@ impl State { /// Continuously listens for new receipt notifications from Postgres and forwards them to the /// corresponding SenderAccount. async fn new_receipts_watcher( + actor_cell: ActorCell, mut pglistener: PgListener, escrow_accounts_rx: Receiver, prefix: Option, @@ -476,16 +478,22 @@ async fn new_receipts_watcher( 'scalar_tap_receipt_notification'", ); loop { - // TODO: recover from errors or shutdown the whole program? - let pg_notification = pglistener.recv().await.expect( - "should be able to receive Postgres Notify events on the channel \ - 'scalar_tap_receipt_notification'", - ); - let new_receipt_notification: NewReceiptNotification = - serde_json::from_str(pg_notification.payload()).expect( + let Ok(pg_notification) = pglistener.recv().await else { + error!( + "should be able to receive Postgres Notify events on the channel \ + 'scalar_tap_receipt_notification'" + ); + break; + }; + let Ok(new_receipt_notification) = + serde_json::from_str::(pg_notification.payload()) + else { + error!( "should be able to deserialize the Postgres Notify event payload as a \ NewReceiptNotification", ); + break; + }; if let Err(e) = handle_notification( new_receipt_notification, escrow_accounts_rx.clone(), @@ -496,6 +504,12 @@ async fn new_receipts_watcher( error!("{}", e); } } + // shutdown the whole system + actor_cell + .kill_and_wait(None) + .await + .expect("Failed to kill manager."); + error!("Manager killed"); } async fn handle_notification( @@ -595,7 +609,7 @@ mod tests { use alloy::hex::ToHexExt; use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; use ractor::concurrency::JoinHandle; - use ractor::{Actor, ActorRef}; + use ractor::{Actor, ActorRef, ActorStatus}; use reqwest::Url; use ruint::aliases::U256; use sqlx::postgres::PgListener; @@ -853,9 +867,11 @@ mod tests { HashMap::from([(SENDER.1, vec![SIGNER.1])]), )) .1; + let dummy_actor = DummyActor::spawn().await; // Start the new_receipts_watcher task that will consume from the `pglistener` let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher( + dummy_actor.get_cell(), pglistener, escrow_accounts_rx, Some(prefix.clone()), @@ -882,6 +898,33 @@ mod tests { new_receipts_watcher_handle.abort(); } + #[test_log::test(sqlx::test(migrations = "../../migrations"))] + async fn test_manager_killed_in_database_connection(pgpool: PgPool) { + let mut pglistener = PgListener::connect_with(&pgpool).await.unwrap(); + pglistener + .listen("scalar_tap_receipt_notification") + .await + .expect( + "should be able to subscribe to Postgres Notify events on the channel \ + 'scalar_tap_receipt_notification'", + ); + + let escrow_accounts_rx = watch::channel(EscrowAccounts::default()).1; + let dummy_actor = DummyActor::spawn().await; + + // Start the new_receipts_watcher task that will consume from the `pglistener` + let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher( + dummy_actor.get_cell(), + pglistener, + escrow_accounts_rx, + None, + )); + pgpool.close().await; + new_receipts_watcher_handle.await.unwrap(); + + assert_eq!(dummy_actor.get_status(), ActorStatus::Stopped) + } + #[tokio::test] async fn test_create_allocation_id() { let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect();