Skip to content

Commit 409c515

Browse files
authored
fix(tap-agent): deny sender on failure to create (#300)
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent a8fa51e commit 409c515

File tree

3 files changed

+119
-40
lines changed

3 files changed

+119
-40
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -253,16 +253,7 @@ impl State {
253253
"Denying sender."
254254
);
255255

256-
sqlx::query!(
257-
r#"
258-
INSERT INTO scalar_tap_denylist (sender_address)
259-
VALUES ($1) ON CONFLICT DO NOTHING
260-
"#,
261-
self.sender.encode_hex(),
262-
)
263-
.execute(&self.pgpool)
264-
.await
265-
.expect("Should not fail to insert into denylist");
256+
SenderAccount::deny_sender(&self.pgpool, self.sender).await;
266257
self.denied = true;
267258
SENDER_DENIED
268259
.with_label_values(&[&self.sender.to_string()])
@@ -810,6 +801,21 @@ impl Actor for SenderAccount {
810801
}
811802
}
812803

804+
impl SenderAccount {
805+
pub async fn deny_sender(pool: &sqlx::PgPool, sender: Address) {
806+
sqlx::query!(
807+
r#"
808+
INSERT INTO scalar_tap_denylist (sender_address)
809+
VALUES ($1) ON CONFLICT DO NOTHING
810+
"#,
811+
sender.encode_hex(),
812+
)
813+
.execute(pool)
814+
.await
815+
.expect("Should not fail to insert into denylist");
816+
}
817+
}
818+
813819
#[cfg(test)]
814820
pub mod tests {
815821
use super::{SenderAccount, SenderAccountArgs, SenderAccountMessage};

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 102 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ impl Actor for SenderAccountsManager {
148148
for (sender_id, allocation_ids) in sender_allocation {
149149
state.sender_ids.insert(sender_id);
150150
state
151-
.create_sender_account(myself.get_cell(), sender_id, allocation_ids)
152-
.await?;
151+
.create_or_deny_sender(myself.get_cell(), sender_id, allocation_ids)
152+
.await;
153153
}
154154

155155
// Start the new_receipts_watcher task that will consume from the `pglistener`
@@ -191,16 +191,9 @@ impl Actor for SenderAccountsManager {
191191
SenderAccountsManagerMessage::UpdateSenderAccounts(target_senders) => {
192192
// Create new sender accounts
193193
for sender in target_senders.difference(&state.sender_ids) {
194-
if let Err(e) = state
195-
.create_sender_account(myself.get_cell(), *sender, HashSet::new())
196-
.await
197-
{
198-
error!(
199-
sender_address = %sender,
200-
error = %e,
201-
"There was an error while creating a sender account."
202-
);
203-
}
194+
state
195+
.create_or_deny_sender(myself.get_cell(), *sender, HashSet::new())
196+
.await;
204197
}
205198

206199
// Remove sender accounts
@@ -263,16 +256,9 @@ impl Actor for SenderAccountsManager {
263256
.remove(&sender_id)
264257
.unwrap_or(HashSet::new());
265258

266-
if let Err(e) = state
267-
.create_sender_account(myself.get_cell(), sender_id, allocations)
268-
.await
269-
{
270-
error!(
271-
error = %e,
272-
sender_address = %sender_id,
273-
"There was an error while re-creating sender account."
274-
);
275-
}
259+
state
260+
.create_or_deny_sender(myself.get_cell(), sender_id, allocations)
261+
.await;
276262
}
277263
_ => {}
278264
}
@@ -291,13 +277,46 @@ impl State {
291277
sender_allocation_id
292278
}
293279

280+
async fn create_or_deny_sender(
281+
&self,
282+
supervisor: ActorCell,
283+
sender_id: Address,
284+
allocation_ids: HashSet<Address>,
285+
) {
286+
if let Err(e) = self
287+
.create_sender_account(supervisor, sender_id, allocation_ids)
288+
.await
289+
{
290+
error!(
291+
"There was an error while starting the sender {}, denying it. Error: {:?}",
292+
sender_id, e
293+
);
294+
SenderAccount::deny_sender(&self.pgpool, sender_id).await;
295+
}
296+
}
297+
294298
async fn create_sender_account(
295299
&self,
296300
supervisor: ActorCell,
297301
sender_id: Address,
298302
allocation_ids: HashSet<Address>,
299303
) -> anyhow::Result<()> {
300-
let args = self.new_sender_account_args(&sender_id, allocation_ids)?;
304+
let Ok(args) = self.new_sender_account_args(&sender_id, allocation_ids) else {
305+
warn!(
306+
"Sender {} is not on your [tap.sender_aggregator_endpoints] list. \
307+
\
308+
This means that you don't recognize this sender and don't want to \
309+
provide queries for it.
310+
\
311+
If you do recognize and want to serve queries for it, \
312+
add a new entry to the config [tap.sender_aggregator_endpoints]",
313+
sender_id
314+
);
315+
bail!(
316+
"No sender_aggregator_endpoints found for sender {}",
317+
sender_id
318+
);
319+
};
301320
SenderAccount::spawn_linked(
302321
Some(self.format_sender_account(&sender_id)),
303322
SenderAccount,
@@ -428,12 +447,10 @@ impl State {
428447
sender_aggregator_endpoint: self
429448
.sender_aggregator_endpoints
430449
.get(sender_id)
431-
.ok_or_else(|| {
432-
anyhow!(
433-
"No sender_aggregator_endpoint found for sender {}",
434-
sender_id
435-
)
436-
})?
450+
.ok_or(anyhow!(
451+
"No sender_aggregator_endpoints found for sender {}",
452+
sender_id
453+
))?
437454
.clone(),
438455
allocation_ids,
439456
prefix: self.prefix.clone(),
@@ -567,8 +584,9 @@ mod tests {
567584
use crate::config;
568585
use crate::tap::test_utils::{
569586
create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID_0,
570-
ALLOCATION_ID_1, INDEXER, SENDER, SENDER_2, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR,
587+
ALLOCATION_ID_1, INDEXER, SENDER, SENDER_2, SENDER_3, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR,
571588
};
589+
use alloy::hex::ToHexExt;
572590
use alloy::primitives::Address;
573591
use eventuals::{Eventual, EventualExt};
574592
use indexer_common::allocations::Allocation;
@@ -783,6 +801,60 @@ mod tests {
783801
handle.await.unwrap();
784802
}
785803

804+
#[sqlx::test(migrations = "../migrations")]
805+
async fn test_deny_sender_account_on_failure(pgpool: PgPool) {
806+
struct DummyActor;
807+
#[async_trait::async_trait]
808+
impl Actor for DummyActor {
809+
type Msg = ();
810+
type State = ();
811+
type Arguments = ();
812+
813+
async fn pre_start(
814+
&self,
815+
_: ActorRef<Self::Msg>,
816+
_: Self::Arguments,
817+
) -> Result<Self::State, ActorProcessingErr> {
818+
Ok(())
819+
}
820+
}
821+
822+
let (_prefix, state) = create_state(pgpool.clone());
823+
let (supervisor, handle) = DummyActor::spawn(None, DummyActor, ()).await.unwrap();
824+
// we wait to check if the sender is created
825+
826+
let sender_id = SENDER_3.1;
827+
828+
state
829+
.create_or_deny_sender(supervisor.get_cell(), sender_id, HashSet::new())
830+
.await;
831+
832+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
833+
834+
// TODO check if sender is denied
835+
836+
let denied = sqlx::query!(
837+
r#"
838+
SELECT EXISTS (
839+
SELECT 1
840+
FROM scalar_tap_denylist
841+
WHERE sender_address = $1
842+
) as denied
843+
"#,
844+
sender_id.encode_hex(),
845+
)
846+
.fetch_one(&pgpool)
847+
.await
848+
.unwrap()
849+
.denied
850+
.expect("Deny status cannot be null");
851+
852+
assert!(denied, "Sender was not denied after failing.");
853+
854+
supervisor.stop_and_wait(None, None).await.unwrap();
855+
handle.await.unwrap();
856+
}
857+
786858
#[sqlx::test(migrations = "../migrations")]
787859
async fn test_receive_notifications_(pgpool: PgPool) {
788860
let prefix = format!(

tap-agent/src/tap/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ lazy_static! {
2929
Address::from_str("0xbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc").unwrap();
3030
pub static ref SENDER: (PrivateKeySigner, Address) = wallet(0);
3131
pub static ref SENDER_2: (PrivateKeySigner, Address) = wallet(1);
32+
pub static ref SENDER_3: (PrivateKeySigner, Address) = wallet(4);
3233
pub static ref SIGNER: (PrivateKeySigner, Address) = wallet(2);
3334
pub static ref INDEXER: (PrivateKeySigner, Address) = wallet(3);
3435
pub static ref TAP_EIP712_DOMAIN_SEPARATOR: Eip712Domain = eip712_domain! {

0 commit comments

Comments
 (0)