Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ futures = { version = "0.3.30", default-features = false }
tempfile = "3.8.0"
wiremock.workspace = true
test-assets = { path = "../test-assets" }
test-log = { version = "0.2.12", default-features = false }
59 changes: 51 additions & 8 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl Actor for SenderAccountsManager {
// Start the new_receipts_watcher task that will consume from the `pglistener`
// after starting all senders
state.new_receipts_watcher_handle = Some(tokio::spawn(new_receipts_watcher(
myself.get_cell(),
pglistener,
escrow_accounts,
prefix,
Expand Down Expand Up @@ -464,6 +465,7 @@ impl State {
/// Continuously listens for new receipt notifications from Postgres and forwards them to the
/// corresponding SenderAccount.
async fn new_receipts_watcher(
actor_cell: ActorCell,
mut pglistener: PgListener,
escrow_accounts_rx: Receiver<EscrowAccounts>,
prefix: Option<String>,
Expand All @@ -476,16 +478,22 @@ async fn new_receipts_watcher(
'scalar_tap_receipt_notification'",
);
loop {
// TODO: recover from errors or shutdown the whole program?
let pg_notification = pglistener.recv().await.expect(
"should be able to receive Postgres Notify events on the channel \
'scalar_tap_receipt_notification'",
);
let new_receipt_notification: NewReceiptNotification =
serde_json::from_str(pg_notification.payload()).expect(
let Ok(pg_notification) = pglistener.recv().await else {
error!(
"should be able to receive Postgres Notify events on the channel \
'scalar_tap_receipt_notification'"
);
break;
};
let Ok(new_receipt_notification) =
serde_json::from_str::<NewReceiptNotification>(pg_notification.payload())
else {
error!(
"should be able to deserialize the Postgres Notify event payload as a \
NewReceiptNotification",
);
break;
};
if let Err(e) = handle_notification(
new_receipt_notification,
escrow_accounts_rx.clone(),
Expand All @@ -496,6 +504,12 @@ async fn new_receipts_watcher(
error!("{}", e);
}
}
// shutdown the whole system
actor_cell
.kill_and_wait(None)
.await
.expect("Failed to kill manager.");
error!("Manager killed");
}

async fn handle_notification(
Expand Down Expand Up @@ -595,7 +609,7 @@ mod tests {
use alloy::hex::ToHexExt;
use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient};
use ractor::concurrency::JoinHandle;
use ractor::{Actor, ActorRef};
use ractor::{Actor, ActorRef, ActorStatus};
use reqwest::Url;
use ruint::aliases::U256;
use sqlx::postgres::PgListener;
Expand Down Expand Up @@ -853,9 +867,11 @@ mod tests {
HashMap::from([(SENDER.1, vec![SIGNER.1])]),
))
.1;
let dummy_actor = DummyActor::spawn().await;

// Start the new_receipts_watcher task that will consume from the `pglistener`
let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher(
dummy_actor.get_cell(),
pglistener,
escrow_accounts_rx,
Some(prefix.clone()),
Expand All @@ -882,6 +898,33 @@ mod tests {
new_receipts_watcher_handle.abort();
}

#[test_log::test(sqlx::test(migrations = "../../migrations"))]
async fn test_manager_killed_in_database_connection(pgpool: PgPool) {
let mut pglistener = PgListener::connect_with(&pgpool).await.unwrap();
pglistener
.listen("scalar_tap_receipt_notification")
.await
.expect(
"should be able to subscribe to Postgres Notify events on the channel \
'scalar_tap_receipt_notification'",
);

let escrow_accounts_rx = watch::channel(EscrowAccounts::default()).1;
let dummy_actor = DummyActor::spawn().await;

// Start the new_receipts_watcher task that will consume from the `pglistener`
let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher(
dummy_actor.get_cell(),
pglistener,
escrow_accounts_rx,
None,
));
pgpool.close().await;
new_receipts_watcher_handle.await.unwrap();

assert_eq!(dummy_actor.get_status(), ActorStatus::Stopped)
}

#[tokio::test]
async fn test_create_allocation_id() {
let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect();
Expand Down
Loading