Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 73 additions & 24 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ pub struct SenderAccountsManagerArgs {
pub struct State {
sender_ids_v1: HashSet<Address>,
sender_ids_v2: HashSet<Address>,
new_receipts_watcher_handle: Option<tokio::task::JoinHandle<()>>,
new_receipts_watcher_handle_v1: Option<tokio::task::JoinHandle<()>>,
new_receipts_watcher_handle_v2: Option<tokio::task::JoinHandle<()>>,

config: &'static SenderAccountConfig,
domain_separator: Eip712Domain,
Expand Down Expand Up @@ -191,7 +192,9 @@ impl Actor for SenderAccountsManager {
.map(AllocationId::Legacy)
.collect::<HashSet<_>>()
});
let pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap();
// we need two connections because each one will listen to different notify events
let pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
let pglistener_v2 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
let myself_clone = myself.clone();
let accounts_clone = escrow_accounts_v1.clone();
watch_pipe(accounts_clone, move |escrow_accounts| {
Expand Down Expand Up @@ -225,8 +228,9 @@ impl Actor for SenderAccountsManager {
domain_separator,
sender_ids_v1: HashSet::new(),
sender_ids_v2: HashSet::new(),
new_receipts_watcher_handle: None,
pgpool,
new_receipts_watcher_handle_v1: None,
new_receipts_watcher_handle_v2: None,
pgpool: pgpool.clone(),
indexer_allocations,
escrow_accounts_v1: escrow_accounts_v1.clone(),
escrow_accounts_v2: escrow_accounts_v2.clone(),
Expand Down Expand Up @@ -279,10 +283,21 @@ impl Actor for SenderAccountsManager {

// Start the new_receipts_watcher task that will consume from the `pglistener`
// after starting all senders
state.new_receipts_watcher_handle = Some(tokio::spawn(new_receipts_watcher(
state.new_receipts_watcher_handle_v1 = Some(tokio::spawn(new_receipts_watcher(
myself.get_cell(),
pglistener,
pglistener_v1,
escrow_accounts_v1,
SenderType::Legacy,
prefix.clone(),
)));

// Start the new_receipts_watcher task that will consume from the `pglistener`
// after starting all senders
state.new_receipts_watcher_handle_v2 = Some(tokio::spawn(new_receipts_watcher(
myself.get_cell(),
pglistener_v2,
escrow_accounts_v2,
SenderType::Horizon,
prefix,
)));

Expand All @@ -296,9 +311,14 @@ impl Actor for SenderAccountsManager {
) -> Result<(), ActorProcessingErr> {
// Abort the notification watcher on drop. Otherwise it may panic because the PgPool could
// get dropped before. (Observed in tests)
if let Some(handle) = &state.new_receipts_watcher_handle {
if let Some(handle) = &state.new_receipts_watcher_handle_v1 {
handle.abort();
}

if let Some(handle) = &state.new_receipts_watcher_handle_v2 {
handle.abort();
}

Ok(())
}

Expand Down Expand Up @@ -741,20 +761,34 @@ async fn new_receipts_watcher(
actor_cell: ActorCell,
mut pglistener: PgListener,
escrow_accounts_rx: Receiver<EscrowAccounts>,
sender_type: SenderType,
prefix: Option<String>,
) {
pglistener
.listen("scalar_tap_receipt_notification")
.await
.expect(
"should be able to subscribe to Postgres Notify events on the channel \
match sender_type {
SenderType::Legacy => {
pglistener
.listen("scalar_tap_receipt_notification")
.await
.expect(
"should be able to subscribe to Postgres Notify events on the channel \
'scalar_tap_receipt_notification'",
);
);
}
SenderType::Horizon => {
pglistener
.listen("tap_horizon_receipt_notification")
.await
.expect(
"should be able to subscribe to Postgres Notify events on the channel \
'tap_horizon_receipt_notification'",
);
}
}
loop {
let Ok(pg_notification) = pglistener.recv().await else {
tracing::error!(
"should be able to receive Postgres Notify events on the channel \
'scalar_tap_receipt_notification'"
'scalar_tap_receipt_notification'/'tap_horizon_receipt_notification'"
);
break;
};
Expand All @@ -770,6 +804,7 @@ async fn new_receipts_watcher(
if let Err(e) = handle_notification(
new_receipt_notification,
escrow_accounts_rx.clone(),
sender_type,
prefix.as_deref(),
)
.await
Expand Down Expand Up @@ -798,6 +833,7 @@ async fn new_receipts_watcher(
async fn handle_notification(
new_receipt_notification: NewReceiptNotification,
escrow_accounts_rx: Receiver<EscrowAccounts>,
sender_type: SenderType,
prefix: Option<&str>,
) -> anyhow::Result<()> {
tracing::trace!(
Expand Down Expand Up @@ -835,10 +871,14 @@ async fn handle_notification(
allocation_id
);
let sender_account_name = format!(
"{}{sender_address}",
"{}{}{sender_address}",
prefix
.as_ref()
.map_or(String::default(), |prefix| format!("{prefix}:"))
.map_or(String::default(), |prefix| format!("{prefix}:")),
match sender_type {
SenderType::Legacy => "legacy:",
SenderType::Horizon => "horizon:",
}
);

let Some(sender_account) = ActorRef::<SenderAccountMessage>::where_is(sender_account_name)
Expand All @@ -849,9 +889,10 @@ async fn handle_notification(
);
};
sender_account
.cast(SenderAccountMessage::NewAllocationId(AllocationId::Legacy(
*allocation_id,
)))
.cast(SenderAccountMessage::NewAllocationId(match sender_type {
SenderType::Legacy => AllocationId::Legacy(*allocation_id),
SenderType::Horizon => AllocationId::Horizon(*allocation_id),
}))
.map_err(|e| {
anyhow!(
"Error while sendeing new allocation id message to sender_account: {:?}",
Expand Down Expand Up @@ -946,7 +987,8 @@ mod tests {
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
sender_ids_v1: HashSet::new(),
sender_ids_v2: HashSet::new(),
new_receipts_watcher_handle: None,
new_receipts_watcher_handle_v1: None,
new_receipts_watcher_handle_v2: None,
pgpool,
indexer_allocations: watch::channel(HashSet::new()).1,
escrow_accounts_v1: watch::channel(escrow_accounts.clone()).1,
Expand Down Expand Up @@ -1131,6 +1173,7 @@ mod tests {
dummy_actor.get_cell(),
pglistener,
escrow_accounts_rx,
SenderType::Legacy,
Some(prefix.clone()),
));

Expand Down Expand Up @@ -1174,6 +1217,7 @@ mod tests {
dummy_actor.get_cell(),
pglistener,
escrow_accounts_rx,
SenderType::Legacy,
None,
));
pgpool.close().await;
Expand All @@ -1193,7 +1237,7 @@ mod tests {
let (last_message_emitted, mut rx) = mpsc::channel(64);

let (sender_account, join_handle) = MockSenderAccount::spawn(
Some(format!("{}:{}", prefix.clone(), SENDER.1,)),
Some(format!("{}:legacy:{}", prefix.clone(), SENDER.1,)),
MockSenderAccount {
last_message_emitted,
},
Expand All @@ -1210,9 +1254,14 @@ mod tests {
value: 1,
};

handle_notification(new_receipt_notification, escrow_accounts, Some(&prefix))
.await
.unwrap();
handle_notification(
new_receipt_notification,
escrow_accounts,
SenderType::Legacy,
Some(&prefix),
)
.await
.unwrap();

assert_eq!(
rx.recv().await.unwrap(),
Expand Down
17 changes: 15 additions & 2 deletions migrations/20250131122241_tap_horizon_receipts.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,21 @@ CREATE TABLE IF NOT EXISTS tap_horizon_receipts (
value NUMERIC(39) NOT NULL
);

CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id);
CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns);
CREATE INDEX IF NOT EXISTS tap_horizon_receipts_allocation_id_idx ON tap_horizon_receipts (allocation_id);
CREATE INDEX IF NOT EXISTS tap_horizon_receipts_timestamp_ns_idx ON tap_horizon_receipts (timestamp_ns);

CREATE FUNCTION tap_horizon_receipt_notify()
RETURNS trigger AS
$$
BEGIN
PERFORM pg_notify('tap_horizon_receipt_notification', format('{"id": %s, "allocation_id": "%s", "signer_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.signer_address, NEW.timestamp_ns, NEW.value));
RETURN NEW;
END;
$$ LANGUAGE 'plpgsql';

CREATE TRIGGER receipt_update AFTER INSERT OR UPDATE
ON tap_horizon_receipts
FOR EACH ROW EXECUTE PROCEDURE tap_horizon_receipt_notify();


-- This table is used to store invalid receipts (receipts that fail at least one of the checks in the tap-agent).
Expand Down
Loading