Skip to content

Commit 387d660

Browse files
authored
refactor: listen to v2 receipts (#628)
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 1e255e8 commit 387d660

File tree

2 files changed

+88
-26
lines changed

2 files changed

+88
-26
lines changed

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ pub struct SenderAccountsManagerArgs {
142142
pub struct State {
143143
sender_ids_v1: HashSet<Address>,
144144
sender_ids_v2: HashSet<Address>,
145-
new_receipts_watcher_handle: Option<tokio::task::JoinHandle<()>>,
145+
new_receipts_watcher_handle_v1: Option<tokio::task::JoinHandle<()>>,
146+
new_receipts_watcher_handle_v2: Option<tokio::task::JoinHandle<()>>,
146147

147148
config: &'static SenderAccountConfig,
148149
domain_separator: Eip712Domain,
@@ -191,7 +192,9 @@ impl Actor for SenderAccountsManager {
191192
.map(AllocationId::Legacy)
192193
.collect::<HashSet<_>>()
193194
});
194-
let pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap();
195+
// we need two connections because each one will listen to different notify events
196+
let pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
197+
let pglistener_v2 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
195198
let myself_clone = myself.clone();
196199
let accounts_clone = escrow_accounts_v1.clone();
197200
watch_pipe(accounts_clone, move |escrow_accounts| {
@@ -225,8 +228,9 @@ impl Actor for SenderAccountsManager {
225228
domain_separator,
226229
sender_ids_v1: HashSet::new(),
227230
sender_ids_v2: HashSet::new(),
228-
new_receipts_watcher_handle: None,
229-
pgpool,
231+
new_receipts_watcher_handle_v1: None,
232+
new_receipts_watcher_handle_v2: None,
233+
pgpool: pgpool.clone(),
230234
indexer_allocations,
231235
escrow_accounts_v1: escrow_accounts_v1.clone(),
232236
escrow_accounts_v2: escrow_accounts_v2.clone(),
@@ -279,10 +283,21 @@ impl Actor for SenderAccountsManager {
279283

280284
// Start the new_receipts_watcher task that will consume from the `pglistener`
281285
// after starting all senders
282-
state.new_receipts_watcher_handle = Some(tokio::spawn(new_receipts_watcher(
286+
state.new_receipts_watcher_handle_v1 = Some(tokio::spawn(new_receipts_watcher(
283287
myself.get_cell(),
284-
pglistener,
288+
pglistener_v1,
285289
escrow_accounts_v1,
290+
SenderType::Legacy,
291+
prefix.clone(),
292+
)));
293+
294+
// Start the new_receipts_watcher task that will consume from the `pglistener`
295+
// after starting all senders
296+
state.new_receipts_watcher_handle_v2 = Some(tokio::spawn(new_receipts_watcher(
297+
myself.get_cell(),
298+
pglistener_v2,
299+
escrow_accounts_v2,
300+
SenderType::Horizon,
286301
prefix,
287302
)));
288303

@@ -296,9 +311,14 @@ impl Actor for SenderAccountsManager {
296311
) -> Result<(), ActorProcessingErr> {
297312
// Abort the notification watcher on drop. Otherwise it may panic because the PgPool could
298313
// get dropped before. (Observed in tests)
299-
if let Some(handle) = &state.new_receipts_watcher_handle {
314+
if let Some(handle) = &state.new_receipts_watcher_handle_v1 {
315+
handle.abort();
316+
}
317+
318+
if let Some(handle) = &state.new_receipts_watcher_handle_v2 {
300319
handle.abort();
301320
}
321+
302322
Ok(())
303323
}
304324

@@ -741,20 +761,34 @@ async fn new_receipts_watcher(
741761
actor_cell: ActorCell,
742762
mut pglistener: PgListener,
743763
escrow_accounts_rx: Receiver<EscrowAccounts>,
764+
sender_type: SenderType,
744765
prefix: Option<String>,
745766
) {
746-
pglistener
747-
.listen("scalar_tap_receipt_notification")
748-
.await
749-
.expect(
750-
"should be able to subscribe to Postgres Notify events on the channel \
767+
match sender_type {
768+
SenderType::Legacy => {
769+
pglistener
770+
.listen("scalar_tap_receipt_notification")
771+
.await
772+
.expect(
773+
"should be able to subscribe to Postgres Notify events on the channel \
751774
'scalar_tap_receipt_notification'",
752-
);
775+
);
776+
}
777+
SenderType::Horizon => {
778+
pglistener
779+
.listen("tap_horizon_receipt_notification")
780+
.await
781+
.expect(
782+
"should be able to subscribe to Postgres Notify events on the channel \
783+
'tap_horizon_receipt_notification'",
784+
);
785+
}
786+
}
753787
loop {
754788
let Ok(pg_notification) = pglistener.recv().await else {
755789
tracing::error!(
756790
"should be able to receive Postgres Notify events on the channel \
757-
'scalar_tap_receipt_notification'"
791+
'scalar_tap_receipt_notification'/'tap_horizon_receipt_notification'"
758792
);
759793
break;
760794
};
@@ -770,6 +804,7 @@ async fn new_receipts_watcher(
770804
if let Err(e) = handle_notification(
771805
new_receipt_notification,
772806
escrow_accounts_rx.clone(),
807+
sender_type,
773808
prefix.as_deref(),
774809
)
775810
.await
@@ -798,6 +833,7 @@ async fn new_receipts_watcher(
798833
async fn handle_notification(
799834
new_receipt_notification: NewReceiptNotification,
800835
escrow_accounts_rx: Receiver<EscrowAccounts>,
836+
sender_type: SenderType,
801837
prefix: Option<&str>,
802838
) -> anyhow::Result<()> {
803839
tracing::trace!(
@@ -835,10 +871,14 @@ async fn handle_notification(
835871
allocation_id
836872
);
837873
let sender_account_name = format!(
838-
"{}{sender_address}",
874+
"{}{}{sender_address}",
839875
prefix
840876
.as_ref()
841-
.map_or(String::default(), |prefix| format!("{prefix}:"))
877+
.map_or(String::default(), |prefix| format!("{prefix}:")),
878+
match sender_type {
879+
SenderType::Legacy => "legacy:",
880+
SenderType::Horizon => "horizon:",
881+
}
842882
);
843883

844884
let Some(sender_account) = ActorRef::<SenderAccountMessage>::where_is(sender_account_name)
@@ -849,9 +889,10 @@ async fn handle_notification(
849889
);
850890
};
851891
sender_account
852-
.cast(SenderAccountMessage::NewAllocationId(AllocationId::Legacy(
853-
*allocation_id,
854-
)))
892+
.cast(SenderAccountMessage::NewAllocationId(match sender_type {
893+
SenderType::Legacy => AllocationId::Legacy(*allocation_id),
894+
SenderType::Horizon => AllocationId::Horizon(*allocation_id),
895+
}))
855896
.map_err(|e| {
856897
anyhow!(
857898
"Error while sendeing new allocation id message to sender_account: {:?}",
@@ -946,7 +987,8 @@ mod tests {
946987
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
947988
sender_ids_v1: HashSet::new(),
948989
sender_ids_v2: HashSet::new(),
949-
new_receipts_watcher_handle: None,
990+
new_receipts_watcher_handle_v1: None,
991+
new_receipts_watcher_handle_v2: None,
950992
pgpool,
951993
indexer_allocations: watch::channel(HashSet::new()).1,
952994
escrow_accounts_v1: watch::channel(escrow_accounts.clone()).1,
@@ -1131,6 +1173,7 @@ mod tests {
11311173
dummy_actor.get_cell(),
11321174
pglistener,
11331175
escrow_accounts_rx,
1176+
SenderType::Legacy,
11341177
Some(prefix.clone()),
11351178
));
11361179

@@ -1174,6 +1217,7 @@ mod tests {
11741217
dummy_actor.get_cell(),
11751218
pglistener,
11761219
escrow_accounts_rx,
1220+
SenderType::Legacy,
11771221
None,
11781222
));
11791223
pgpool.close().await;
@@ -1193,7 +1237,7 @@ mod tests {
11931237
let (last_message_emitted, mut rx) = mpsc::channel(64);
11941238

11951239
let (sender_account, join_handle) = MockSenderAccount::spawn(
1196-
Some(format!("{}:{}", prefix.clone(), SENDER.1,)),
1240+
Some(format!("{}:legacy:{}", prefix.clone(), SENDER.1,)),
11971241
MockSenderAccount {
11981242
last_message_emitted,
11991243
},
@@ -1210,9 +1254,14 @@ mod tests {
12101254
value: 1,
12111255
};
12121256

1213-
handle_notification(new_receipt_notification, escrow_accounts, Some(&prefix))
1214-
.await
1215-
.unwrap();
1257+
handle_notification(
1258+
new_receipt_notification,
1259+
escrow_accounts,
1260+
SenderType::Legacy,
1261+
Some(&prefix),
1262+
)
1263+
.await
1264+
.unwrap();
12161265

12171266
assert_eq!(
12181267
rx.recv().await.unwrap(),

migrations/20250131122241_tap_horizon_receipts.up.sql

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,21 @@ CREATE TABLE IF NOT EXISTS tap_horizon_receipts (
1414
value NUMERIC(39) NOT NULL
1515
);
1616

17-
CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id);
18-
CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns);
17+
CREATE INDEX IF NOT EXISTS tap_horizon_receipts_allocation_id_idx ON tap_horizon_receipts (allocation_id);
18+
CREATE INDEX IF NOT EXISTS tap_horizon_receipts_timestamp_ns_idx ON tap_horizon_receipts (timestamp_ns);
19+
20+
CREATE FUNCTION tap_horizon_receipt_notify()
21+
RETURNS trigger AS
22+
$$
23+
BEGIN
24+
PERFORM pg_notify('tap_horizon_receipt_notification', format('{"id": %s, "allocation_id": "%s", "signer_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.signer_address, NEW.timestamp_ns, NEW.value));
25+
RETURN NEW;
26+
END;
27+
$$ LANGUAGE 'plpgsql';
28+
29+
CREATE TRIGGER receipt_update AFTER INSERT OR UPDATE
30+
ON tap_horizon_receipts
31+
FOR EACH ROW EXECUTE PROCEDURE tap_horizon_receipt_notify();
1932

2033

2134
-- This table is used to store invalid receipts (receipts that fail at least one of the checks in the tap-agent).

0 commit comments

Comments
 (0)