Skip to content

Commit c0804cf

Browse files
committed
chore(agent): add escrow monitoring
1 parent b501dc7 commit c0804cf

File tree

2 files changed

+184
-8
lines changed

2 files changed

+184
-8
lines changed

crates/tap-agent/src/agent/sender_account_task.rs

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@ use std::{
1111
sync::Arc,
1212
};
1313

14+
use anyhow::Context;
15+
1416
#[cfg(not(any(test, feature = "test")))]
1517
use std::time::Duration;
1618

1719
use anyhow::Result;
1820
use bigdecimal::ToPrimitive;
1921
use indexer_monitor::{EscrowAccounts, SubgraphClient};
2022
use thegraph_core::alloy::{
23+
hex::ToHexExt,
2124
primitives::{Address, U256},
2225
sol_types::Eip712Domain,
2326
};
@@ -1192,18 +1195,32 @@ impl SenderAccountTask {
11921195
if should_deny {
11931196
tracing::warn!(
11941197
sender = %state.sender,
1195-
"Sender deny condition reached - would add to denylist in production"
1198+
"Sender deny condition reached - adding to database denylist"
11961199
);
1197-
// TODO: In full production implementation, add to database denylist
1198-
// This would call add_to_denylist() similar to the original ractor version
1200+
1201+
// Add to database denylist
1202+
if let Err(e) = Self::add_to_denylist(&state.pgpool, state.sender).await {
1203+
tracing::error!(
1204+
sender = %state.sender,
1205+
error = %e,
1206+
"Failed to add sender to database denylist, updating local state only"
1207+
);
1208+
}
11991209
state.denied = true;
12001210
} else {
12011211
tracing::info!(
12021212
sender = %state.sender,
1203-
"Sender deny condition resolved - would remove from denylist in production"
1213+
"Sender deny condition resolved - removing from database denylist"
12041214
);
1205-
// TODO: In full production implementation, remove from database denylist
1206-
// This would call remove_from_denylist() similar to the original ractor version
1215+
1216+
// Remove from database denylist
1217+
if let Err(e) = Self::remove_from_denylist(&state.pgpool, state.sender).await {
1218+
tracing::error!(
1219+
sender = %state.sender,
1220+
error = %e,
1221+
"Failed to remove sender from database denylist, updating local state only"
1222+
);
1223+
}
12071224
state.denied = false;
12081225
}
12091226
}
@@ -1232,6 +1249,60 @@ impl SenderAccountTask {
12321249
state.denied = should_deny;
12331250
}
12341251
}
1252+
1253+
/// Add sender to database denylist
1254+
#[allow(dead_code)] // Used in production code only
1255+
async fn add_to_denylist(pgpool: &sqlx::PgPool, sender: Address) -> Result<()> {
1256+
tracing::info!(
1257+
sender = %sender,
1258+
"Adding sender to database denylist"
1259+
);
1260+
1261+
sqlx::query!(
1262+
r#"
1263+
INSERT INTO scalar_tap_denylist (sender_address)
1264+
VALUES ($1) ON CONFLICT DO NOTHING
1265+
"#,
1266+
sender.encode_hex(),
1267+
)
1268+
.execute(pgpool)
1269+
.await
1270+
.context("Failed to insert sender into denylist")?;
1271+
1272+
tracing::debug!(
1273+
sender = %sender,
1274+
"Successfully added sender to database denylist"
1275+
);
1276+
1277+
Ok(())
1278+
}
1279+
1280+
/// Remove sender from database denylist
1281+
#[allow(dead_code)] // Used in production code only
1282+
async fn remove_from_denylist(pgpool: &sqlx::PgPool, sender: Address) -> Result<()> {
1283+
tracing::info!(
1284+
sender = %sender,
1285+
"Removing sender from database denylist"
1286+
);
1287+
1288+
sqlx::query!(
1289+
r#"
1290+
DELETE FROM scalar_tap_denylist
1291+
WHERE sender_address = $1
1292+
"#,
1293+
sender.encode_hex(),
1294+
)
1295+
.execute(pgpool)
1296+
.await
1297+
.context("Failed to remove sender from denylist")?;
1298+
1299+
tracing::debug!(
1300+
sender = %sender,
1301+
"Successfully removed sender from database denylist"
1302+
);
1303+
1304+
Ok(())
1305+
}
12351306
}
12361307

12371308
#[cfg(test)]

crates/tap-agent/src/agent/sender_accounts_manager_task.rs

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ use super::{
2828

2929
#[cfg(any(test, feature = "test"))]
3030
use super::sender_account_task::SenderAccountTask;
31+
32+
#[cfg(not(any(test, feature = "test")))]
33+
use super::sender_account_task::SenderAccountTask;
3134
use crate::task_lifecycle::{LifecycleManager, RestartPolicy, TaskHandle, TaskRegistry};
3235

3336
/// Tokio task-based replacement for SenderAccountsManager actor
@@ -274,7 +277,58 @@ impl SenderAccountsManagerTask {
274277

275278
#[cfg(not(any(test, feature = "test")))]
276279
{
277-
tracing::warn!("Production V1 sender account spawning not fully implemented yet");
280+
// For production, create real escrow account monitoring via subgraph queries
281+
let escrow_rx = indexer_monitor::escrow_accounts_v1(
282+
state.escrow_subgraph,
283+
state.config.indexer_address,
284+
state.config.escrow_polling_interval,
285+
true, // reject_thawing_signers
286+
)
287+
.await
288+
.map_err(|e| {
289+
anyhow::anyhow!(
290+
"Failed to create V1 escrow accounts watcher for sender {}: {}",
291+
sender,
292+
e
293+
)
294+
})?;
295+
296+
let child_handle = SenderAccountTask::spawn(
297+
&state.lifecycle,
298+
Some(task_name.clone()),
299+
sender,
300+
state.config,
301+
state.pgpool.clone(),
302+
escrow_rx,
303+
state.escrow_subgraph,
304+
state.network_subgraph,
305+
state.domain_separator.clone(),
306+
state
307+
.sender_aggregator_endpoints
308+
.get(&sender)
309+
.cloned()
310+
.unwrap_or_else(|| {
311+
tracing::warn!(
312+
sender = %sender,
313+
"No aggregator endpoint configured for sender, using default"
314+
);
315+
"http://localhost:8080".parse().unwrap()
316+
}),
317+
state.prefix.clone(),
318+
)
319+
.await?;
320+
321+
// Register the child task
322+
state
323+
.child_registry
324+
.register(task_name.clone(), child_handle)
325+
.await;
326+
327+
tracing::info!(
328+
sender = %sender,
329+
task_name = %task_name,
330+
"Production V1 sender account task with real escrow monitoring spawned successfully"
331+
);
278332
}
279333

280334
Ok(())
@@ -321,7 +375,58 @@ impl SenderAccountsManagerTask {
321375

322376
#[cfg(not(any(test, feature = "test")))]
323377
{
324-
tracing::warn!("Production V2 sender account spawning not fully implemented yet");
378+
// For production, create real escrow account monitoring via subgraph queries
379+
let escrow_rx = indexer_monitor::escrow_accounts_v2(
380+
state.escrow_subgraph,
381+
state.config.indexer_address,
382+
state.config.escrow_polling_interval,
383+
true, // reject_thawing_signers
384+
)
385+
.await
386+
.map_err(|e| {
387+
anyhow::anyhow!(
388+
"Failed to create V2 escrow accounts watcher for sender {}: {}",
389+
sender,
390+
e
391+
)
392+
})?;
393+
394+
let child_handle = SenderAccountTask::spawn(
395+
&state.lifecycle,
396+
Some(task_name.clone()),
397+
sender,
398+
state.config,
399+
state.pgpool.clone(),
400+
escrow_rx,
401+
state.escrow_subgraph,
402+
state.network_subgraph,
403+
state.domain_separator.clone(),
404+
state
405+
.sender_aggregator_endpoints
406+
.get(&sender)
407+
.cloned()
408+
.unwrap_or_else(|| {
409+
tracing::warn!(
410+
sender = %sender,
411+
"No aggregator endpoint configured for sender, using default"
412+
);
413+
"http://localhost:8080".parse().unwrap()
414+
}),
415+
state.prefix.clone(),
416+
)
417+
.await?;
418+
419+
// Register the child task
420+
state
421+
.child_registry
422+
.register(task_name.clone(), child_handle)
423+
.await;
424+
425+
tracing::info!(
426+
sender = %sender,
427+
task_name = %task_name,
428+
"Production V2 sender account task with real escrow monitoring spawned successfully"
429+
);
325430
}
326431

327432
Ok(())

0 commit comments

Comments
 (0)