Skip to content

Commit 69c55f0

Browse files
authored
fix: create sender allocation on receipt notification (#208)
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 990c929 commit 69c55f0

File tree

3 files changed

+197
-47
lines changed

3 files changed

+197
-47
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Balance = U256;
3737
pub enum SenderAccountMessage {
3838
UpdateBalanceAndLastRavs(Balance, RavMap),
3939
UpdateAllocationIds(HashSet<Address>),
40+
NewAllocationId(Address),
4041
UpdateReceiptFees(Address, UnaggregatedReceipts),
4142
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
4243
UpdateRav(SignedRAV),
@@ -568,6 +569,19 @@ impl Actor for SenderAccount {
568569
);
569570
state.allocation_ids = allocation_ids;
570571
}
572+
SenderAccountMessage::NewAllocationId(allocation_id) => {
573+
if let Err(error) = state
574+
.create_sender_allocation(myself.clone(), allocation_id)
575+
.await
576+
{
577+
error!(
578+
%error,
579+
%allocation_id,
580+
"There was an error while creating Sender Allocation."
581+
);
582+
}
583+
state.allocation_ids.insert(allocation_id);
584+
}
571585
SenderAccountMessage::UpdateBalanceAndLastRavs(new_balance, non_final_last_ravs) => {
572586
state.sender_balance = new_balance;
573587

@@ -733,6 +747,7 @@ pub mod tests {
733747
Self::UpdateInvalidReceiptFees(l0, l1),
734748
Self::UpdateInvalidReceiptFees(r0, r1),
735749
) => l0 == r0 && l1 == r1,
750+
(Self::NewAllocationId(l0), Self::NewAllocationId(r0)) => l0 == r0,
736751
(a, b) => unimplemented!("PartialEq not implementated for {a:?} and {b:?}"),
737752
}
738753
}
@@ -848,6 +863,53 @@ pub mod tests {
848863
handle.await.unwrap();
849864
}
850865

866+
#[sqlx::test(migrations = "../migrations")]
867+
async fn test_new_allocation_id(pgpool: PgPool) {
868+
let (sender_account, handle, prefix, _) = create_sender_account(
869+
pgpool,
870+
HashSet::new(),
871+
TRIGGER_VALUE,
872+
TRIGGER_VALUE,
873+
DUMMY_URL,
874+
)
875+
.await;
876+
877+
// we expect it to create a sender allocation
878+
sender_account
879+
.cast(SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0))
880+
.unwrap();
881+
882+
tokio::time::sleep(Duration::from_millis(10)).await;
883+
884+
// verify if create sender account
885+
let sender_allocation_id = format!("{}:{}:{}", prefix.clone(), SENDER.1, *ALLOCATION_ID_0);
886+
let actor_ref = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id.clone());
887+
assert!(actor_ref.is_some());
888+
889+
// nothing should change because we already created
890+
sender_account
891+
.cast(SenderAccountMessage::UpdateAllocationIds(
892+
vec![*ALLOCATION_ID_0].into_iter().collect(),
893+
))
894+
.unwrap();
895+
tokio::time::sleep(Duration::from_millis(10)).await;
896+
897+
// try to delete sender allocation_id
898+
sender_account
899+
.cast(SenderAccountMessage::UpdateAllocationIds(HashSet::new()))
900+
.unwrap();
901+
902+
tokio::time::sleep(Duration::from_millis(100)).await;
903+
904+
let actor_ref = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id.clone());
905+
assert!(actor_ref.is_none());
906+
907+
// safely stop the manager
908+
sender_account.stop_and_wait(None, None).await.unwrap();
909+
910+
handle.await.unwrap();
911+
}
912+
851913
pub struct MockSenderAllocation {
852914
triggered_rav_request: Arc<AtomicU32>,
853915
next_rav_value: Arc<Mutex<u128>>,

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 132 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use std::{collections::HashMap, str::FromStr};
88
use crate::agent::sender_allocation::SenderAllocationMessage;
99
use crate::lazy_static;
1010
use alloy_sol_types::Eip712Domain;
11-
use anyhow::anyhow;
1211
use anyhow::Result;
12+
use anyhow::{anyhow, bail};
1313
use eventuals::{Eventual, EventualExt, PipeHandle};
1414
use indexer_common::escrow_accounts::EscrowAccounts;
1515
use indexer_common::prelude::{Allocation, SubgraphClient};
@@ -460,58 +460,98 @@ async fn new_receipts_watcher(
460460
"should be able to deserialize the Postgres Notify event payload as a \
461461
NewReceiptNotification",
462462
);
463+
if let Err(e) = handle_notification(
464+
new_receipt_notification,
465+
&escrow_accounts,
466+
prefix.as_deref(),
467+
)
468+
.await
469+
{
470+
error!("{}", e);
471+
}
472+
}
473+
}
463474

464-
tracing::debug!(
465-
notification = ?new_receipt_notification,
466-
"New receipt notification detected!"
475+
async fn handle_notification(
476+
new_receipt_notification: NewReceiptNotification,
477+
escrow_accounts: &Eventual<EscrowAccounts>,
478+
prefix: Option<&str>,
479+
) -> Result<()> {
480+
tracing::debug!(
481+
notification = ?new_receipt_notification,
482+
"New receipt notification detected!"
483+
);
484+
485+
let Ok(sender_address) = escrow_accounts
486+
.value()
487+
.await
488+
.expect("should be able to get escrow accounts")
489+
.get_sender_for_signer(&new_receipt_notification.signer_address)
490+
else {
491+
// TODO: save the receipt in the failed receipts table?
492+
bail!(
493+
"No sender address found for receipt signer address {}. \
494+
This should not happen.",
495+
new_receipt_notification.signer_address
467496
);
497+
};
468498

469-
let Ok(sender_address) = escrow_accounts
470-
.value()
471-
.await
472-
.expect("should be able to get escrow accounts")
473-
.get_sender_for_signer(&new_receipt_notification.signer_address)
474-
else {
475-
error!(
476-
"No sender address found for receipt signer address {}. \
477-
This should not happen.",
478-
new_receipt_notification.signer_address
479-
);
480-
// TODO: save the receipt in the failed receipts table?
481-
continue;
482-
};
483-
484-
let allocation_id = &new_receipt_notification.allocation_id;
485-
let allocation_str = &allocation_id.to_string();
486-
487-
let actor_name = format!(
488-
"{}{sender_address}:{allocation_id}",
499+
let allocation_id = &new_receipt_notification.allocation_id;
500+
let allocation_str = &allocation_id.to_string();
501+
502+
let actor_name = format!(
503+
"{}{sender_address}:{allocation_id}",
504+
prefix
505+
.as_ref()
506+
.map_or(String::default(), |prefix| format!("{prefix}:"))
507+
);
508+
509+
let Some(sender_allocation) = ActorRef::<SenderAllocationMessage>::where_is(actor_name) else {
510+
warn!(
511+
"No sender_allocation found for sender_address {}, allocation_id {} to process new \
512+
receipt notification. Starting a new sender_allocation.",
513+
sender_address, allocation_id
514+
);
515+
let sender_account_name = format!(
516+
"{}{sender_address}",
489517
prefix
490518
.as_ref()
491519
.map_or(String::default(), |prefix| format!("{prefix}:"))
492520
);
493521

494-
if let Some(sender_allocation) = ActorRef::<SenderAllocationMessage>::where_is(actor_name) {
495-
if let Err(e) = sender_allocation.cast(SenderAllocationMessage::NewReceipt(
496-
new_receipt_notification,
497-
)) {
498-
error!(
499-
"Error while forwarding new receipt notification to sender_allocation: {:?}",
500-
e
501-
);
502-
}
503-
} else {
504-
warn!(
505-
"No sender_allocation found for sender_address {}, allocation_id {} to process new \
506-
receipt notification. This should not happen.",
507-
sender_address,
508-
allocation_id
522+
let Some(sender_account) = ActorRef::<SenderAccountMessage>::where_is(sender_account_name)
523+
else {
524+
bail!(
525+
"No sender_account was found for address: {}.",
526+
sender_address
509527
);
510-
}
511-
RECEIPTS_CREATED
512-
.with_label_values(&[&sender_address.to_string(), allocation_str])
513-
.inc();
514-
}
528+
};
529+
sender_account
530+
.cast(SenderAccountMessage::NewAllocationId(*allocation_id))
531+
.map_err(|e| {
532+
anyhow!(
533+
"Error while sendeing new allocation id message to sender_account: {:?}",
534+
e
535+
)
536+
})?;
537+
return Ok(());
538+
};
539+
540+
sender_allocation
541+
.cast(SenderAllocationMessage::NewReceipt(
542+
new_receipt_notification,
543+
))
544+
.map_err(|e| {
545+
anyhow::anyhow!(
546+
"Error while forwarding new receipt notification to sender_allocation: {:?}",
547+
e
548+
)
549+
})?;
550+
551+
RECEIPTS_CREATED
552+
.with_label_values(&[&sender_address.to_string(), allocation_str])
553+
.inc();
554+
Ok(())
515555
}
516556

517557
#[cfg(test)]
@@ -522,6 +562,8 @@ mod tests {
522562
};
523563
use crate::agent::sender_account::tests::{MockSenderAllocation, PREFIX_ID};
524564
use crate::agent::sender_account::SenderAccountMessage;
565+
use crate::agent::sender_accounts_manager::{handle_notification, NewReceiptNotification};
566+
use crate::agent::sender_allocation::tests::MockSenderAccount;
525567
use crate::config;
526568
use crate::tap::test_utils::{
527569
create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID_0,
@@ -537,6 +579,7 @@ mod tests {
537579
use sqlx::postgres::PgListener;
538580
use sqlx::PgPool;
539581
use std::collections::{HashMap, HashSet};
582+
use std::sync::{Arc, Mutex};
540583
use std::time::Duration;
541584

542585
const DUMMY_URL: &str = "http://localhost:1234";
@@ -803,4 +846,49 @@ mod tests {
803846

804847
new_receipts_watcher_handle.abort();
805848
}
849+
850+
#[tokio::test]
851+
async fn test_create_allocation_id() {
852+
let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect();
853+
let escrow_accounts = EscrowAccounts::new(HashMap::new(), senders_to_signers);
854+
let escrow_accounts = Eventual::from_value(escrow_accounts);
855+
856+
let prefix = format!(
857+
"test-{}",
858+
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
859+
);
860+
861+
let last_message_emitted = Arc::new(Mutex::new(vec![]));
862+
863+
let (sender_account, join_handle) = MockSenderAccount::spawn(
864+
Some(format!("{}:{}", prefix.clone(), SENDER.1,)),
865+
MockSenderAccount {
866+
last_message_emitted: last_message_emitted.clone(),
867+
},
868+
(),
869+
)
870+
.await
871+
.unwrap();
872+
873+
let new_receipt_notification = NewReceiptNotification {
874+
id: 1,
875+
allocation_id: *ALLOCATION_ID_0,
876+
signer_address: SIGNER.1,
877+
timestamp_ns: 1,
878+
value: 1,
879+
};
880+
881+
handle_notification(new_receipt_notification, &escrow_accounts, Some(&prefix))
882+
.await
883+
.unwrap();
884+
885+
tokio::time::sleep(Duration::from_millis(10)).await;
886+
887+
assert_eq!(
888+
last_message_emitted.lock().unwrap().last().unwrap(),
889+
&SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0)
890+
);
891+
sender_account.stop_and_wait(None, None).await.unwrap();
892+
join_handle.await.unwrap();
893+
}
806894
}

tap-agent/src/agent/sender_allocation.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,7 @@ impl SenderAllocationState {
714714
}
715715

716716
#[cfg(test)]
717-
mod tests {
717+
pub mod tests {
718718
use super::{
719719
SenderAllocation, SenderAllocationArgs, SenderAllocationMessage, SenderAllocationState,
720720
};
@@ -760,8 +760,8 @@ mod tests {
760760

761761
const DUMMY_URL: &str = "http://localhost:1234";
762762

763-
struct MockSenderAccount {
764-
last_message_emitted: Arc<Mutex<Vec<SenderAccountMessage>>>,
763+
pub struct MockSenderAccount {
764+
pub last_message_emitted: Arc<Mutex<Vec<SenderAccountMessage>>>,
765765
}
766766

767767
#[async_trait::async_trait]

0 commit comments

Comments
 (0)