Skip to content

Commit ba0d7be

Browse files
committed
fix: shutdown tap-agent if db connection lost
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 7e5d280 commit ba0d7be

File tree

3 files changed

+53
-8
lines changed

3 files changed

+53
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/tap-agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ futures = { version = "0.3.30", default-features = false }
5050
tempfile = "3.8.0"
5151
wiremock.workspace = true
5252
test-assets = { path = "../test-assets" }
53+
test-log = { version = "0.2.12", default-features = false }

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ impl Actor for SenderAccountsManager {
160160
// Start the new_receipts_watcher task that will consume from the `pglistener`
161161
// after starting all senders
162162
state.new_receipts_watcher_handle = Some(tokio::spawn(new_receipts_watcher(
163+
myself.get_cell(),
163164
pglistener,
164165
escrow_accounts,
165166
prefix,
@@ -464,6 +465,7 @@ impl State {
464465
/// Continuously listens for new receipt notifications from Postgres and forwards them to the
465466
/// corresponding SenderAccount.
466467
async fn new_receipts_watcher(
468+
actor_cell: ActorCell,
467469
mut pglistener: PgListener,
468470
escrow_accounts_rx: Receiver<EscrowAccounts>,
469471
prefix: Option<String>,
@@ -476,16 +478,22 @@ async fn new_receipts_watcher(
476478
'scalar_tap_receipt_notification'",
477479
);
478480
loop {
479-
// TODO: recover from errors or shutdown the whole program?
480-
let pg_notification = pglistener.recv().await.expect(
481-
"should be able to receive Postgres Notify events on the channel \
482-
'scalar_tap_receipt_notification'",
483-
);
484-
let new_receipt_notification: NewReceiptNotification =
485-
serde_json::from_str(pg_notification.payload()).expect(
481+
let Ok(pg_notification) = pglistener.recv().await else {
482+
error!(
483+
"should be able to receive Postgres Notify events on the channel \
484+
'scalar_tap_receipt_notification'"
485+
);
486+
break;
487+
};
488+
let Ok(new_receipt_notification) =
489+
serde_json::from_str::<NewReceiptNotification>(pg_notification.payload())
490+
else {
491+
error!(
486492
"should be able to deserialize the Postgres Notify event payload as a \
487493
NewReceiptNotification",
488494
);
495+
break;
496+
};
489497
if let Err(e) = handle_notification(
490498
new_receipt_notification,
491499
escrow_accounts_rx.clone(),
@@ -496,6 +504,12 @@ async fn new_receipts_watcher(
496504
error!("{}", e);
497505
}
498506
}
507+
// shutdown the whole system
508+
actor_cell
509+
.kill_and_wait(None)
510+
.await
511+
.expect("Failed to kill manager.");
512+
error!("Manager killed");
499513
}
500514

501515
async fn handle_notification(
@@ -595,7 +609,7 @@ mod tests {
595609
use alloy::hex::ToHexExt;
596610
use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient};
597611
use ractor::concurrency::JoinHandle;
598-
use ractor::{Actor, ActorRef};
612+
use ractor::{Actor, ActorRef, ActorStatus};
599613
use reqwest::Url;
600614
use ruint::aliases::U256;
601615
use sqlx::postgres::PgListener;
@@ -853,9 +867,11 @@ mod tests {
853867
HashMap::from([(SENDER.1, vec![SIGNER.1])]),
854868
))
855869
.1;
870+
let dummy_actor = DummyActor::spawn().await;
856871

857872
// Start the new_receipts_watcher task that will consume from the `pglistener`
858873
let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher(
874+
dummy_actor.get_cell(),
859875
pglistener,
860876
escrow_accounts_rx,
861877
Some(prefix.clone()),
@@ -882,6 +898,33 @@ mod tests {
882898
new_receipts_watcher_handle.abort();
883899
}
884900

901+
#[test_log::test(sqlx::test(migrations = "../../migrations"))]
902+
async fn test_manager_killed_in_database_connection(pgpool: PgPool) {
903+
let mut pglistener = PgListener::connect_with(&pgpool).await.unwrap();
904+
pglistener
905+
.listen("scalar_tap_receipt_notification")
906+
.await
907+
.expect(
908+
"should be able to subscribe to Postgres Notify events on the channel \
909+
'scalar_tap_receipt_notification'",
910+
);
911+
912+
let escrow_accounts_rx = watch::channel(EscrowAccounts::default()).1;
913+
let dummy_actor = DummyActor::spawn().await;
914+
915+
// Start the new_receipts_watcher task that will consume from the `pglistener`
916+
let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher(
917+
dummy_actor.get_cell(),
918+
pglistener,
919+
escrow_accounts_rx,
920+
None,
921+
));
922+
pgpool.close().await;
923+
new_receipts_watcher_handle.await.unwrap();
924+
925+
assert_eq!(dummy_actor.get_status(), ActorStatus::Stopped)
926+
}
927+
885928
#[tokio::test]
886929
async fn test_create_allocation_id() {
887930
let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect();

0 commit comments

Comments
 (0)