diff --git a/.sqlx/query-b1d4dfcd202af310df032edc34d83dabb56d7b947b023ee3a8b32b24b07bcd18.json b/.sqlx/query-1644e9aa44b08e99180cff30a6b0cc1fe1e5367bd545ca489d116de0a709a6ee.json similarity index 53% rename from .sqlx/query-b1d4dfcd202af310df032edc34d83dabb56d7b947b023ee3a8b32b24b07bcd18.json rename to .sqlx/query-1644e9aa44b08e99180cff30a6b0cc1fe1e5367bd545ca489d116de0a709a6ee.json index 5c09842b4..4d1501214 100644 --- a/.sqlx/query-b1d4dfcd202af310df032edc34d83dabb56d7b947b023ee3a8b32b24b07bcd18.json +++ b/.sqlx/query-1644e9aa44b08e99180cff30a6b0cc1fe1e5367bd545ca489d116de0a709a6ee.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT allocation_id, value_aggregate\n FROM scalar_tap_ravs\n WHERE sender_address = $1 AND last AND NOT final;\n ", + "query": "\n SELECT allocation_id, value_aggregate\n FROM scalar_tap_ravs\n WHERE sender_address = $1 AND last AND NOT final;\n ", "describe": { "columns": [ { @@ -24,5 +24,5 @@ false ] }, - "hash": "b1d4dfcd202af310df032edc34d83dabb56d7b947b023ee3a8b32b24b07bcd18" + "hash": "1644e9aa44b08e99180cff30a6b0cc1fe1e5367bd545ca489d116de0a709a6ee" } diff --git a/.sqlx/query-386e6b18da62b478ccdba5439490817f582abaec68e1ba01d55daee1e1edcbbd.json b/.sqlx/query-386e6b18da62b478ccdba5439490817f582abaec68e1ba01d55daee1e1edcbbd.json new file mode 100644 index 000000000..0dd3c43d1 --- /dev/null +++ b/.sqlx/query-386e6b18da62b478ccdba5439490817f582abaec68e1ba01d55daee1e1edcbbd.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n payer,\n ARRAY_AGG(DISTINCT allocation_id) FILTER (WHERE NOT last) AS allocation_ids\n FROM tap_horizon_ravs\n GROUP BY payer\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "payer", + "type_info": "Bpchar" + }, + { + "ordinal": 1, + "name": "allocation_ids", + "type_info": "BpcharArray" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + null + ] + }, + "hash": "386e6b18da62b478ccdba5439490817f582abaec68e1ba01d55daee1e1edcbbd" +} diff --git a/.sqlx/query-7bf9c412120de537eecb8efb64da5b4ace9acc032be502cd1d9fc72c5b9ed50a.json b/.sqlx/query-7bf9c412120de537eecb8efb64da5b4ace9acc032be502cd1d9fc72c5b9ed50a.json new file mode 100644 index 000000000..ea67b93bb --- /dev/null +++ b/.sqlx/query-7bf9c412120de537eecb8efb64da5b4ace9acc032be502cd1d9fc72c5b9ed50a.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH grouped AS (\n SELECT signer_address, allocation_id\n FROM tap_horizon_receipts\n GROUP BY signer_address, allocation_id\n )\n SELECT \n signer_address,\n ARRAY_AGG(allocation_id) AS allocation_ids\n FROM grouped\n GROUP BY signer_address\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "signer_address", + "type_info": "Bpchar" + }, + { + "ordinal": 1, + "name": "allocation_ids", + "type_info": "BpcharArray" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + null + ] + }, + "hash": "7bf9c412120de537eecb8efb64da5b4ace9acc032be502cd1d9fc72c5b9ed50a" +} diff --git a/Cargo.lock b/Cargo.lock index 6bcb26379..c862e56cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2664,6 +2664,8 @@ version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0" dependencies = [ + "anstream", + "anstyle", "env_filter", "log", ] @@ -7461,7 +7463,9 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93" dependencies = [ + "env_logger", "test-log-macros", + "tracing-subscriber", ] [[package]] diff --git a/crates/monitor/src/escrow_accounts.rs b/crates/monitor/src/escrow_accounts.rs index d471f4eb5..6c1fd047e 100644 --- a/crates/monitor/src/escrow_accounts.rs +++ b/crates/monitor/src/escrow_accounts.rs @@ -95,12 +95,33 @@ pub async fn escrow_accounts( reject_thawing_signers: bool, ) -> Result { indexer_watcher::new_watcher(interval, move || { - get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers) + get_escrow_accounts_v1(escrow_subgraph, indexer_address, reject_thawing_signers) }) .await } -async fn get_escrow_accounts( +pub async fn escrow_accounts_v2( + escrow_subgraph: &'static SubgraphClient, + indexer_address: Address, + interval: Duration, + reject_thawing_signers: bool, +) -> Result { + indexer_watcher::new_watcher(interval, move || { + get_escrow_accounts_v2(escrow_subgraph, indexer_address, reject_thawing_signers) + }) + .await +} + +// TODO implement escrow accounts v2 query +async fn get_escrow_accounts_v2( + _escrow_subgraph: &'static SubgraphClient, + _indexer_address: Address, + _reject_thawing_signers: bool, +) -> anyhow::Result { + Ok(EscrowAccounts::new(HashMap::new(), HashMap::new())) +} + +async fn get_escrow_accounts_v1( escrow_subgraph: &'static SubgraphClient, indexer_address: Address, reject_thawing_signers: bool, diff --git a/crates/monitor/src/lib.rs b/crates/monitor/src/lib.rs index d29edff82..a9a10b6a6 100644 --- a/crates/monitor/src/lib.rs +++ b/crates/monitor/src/lib.rs @@ -15,6 +15,7 @@ pub use crate::{ deployment_to_allocation::{deployment_to_allocation, DeploymentToAllocationWatcher}, dispute_manager::{dispute_manager, DisputeManagerWatcher}, escrow_accounts::{ - escrow_accounts, EscrowAccounts, EscrowAccountsError, EscrowAccountsWatcher, + escrow_accounts, escrow_accounts_v2, EscrowAccounts, EscrowAccountsError, + EscrowAccountsWatcher, }, }; diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index 3596e2c56..5feb893ed 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -62,6 +62,6 @@ tempfile = "3.8.0" wiremock.workspace = true wiremock-grpc = "0.0.3-alpha3" test-assets = { path = "../test-assets" } -test-log = { version = "0.2.12", default-features = false } +test-log = { version = "0.2.12", features = ["trace"] } bon = "3.3" rstest = "0.24.0" diff --git a/crates/tap-agent/src/agent.rs b/crates/tap-agent/src/agent.rs index 8fafec575..d903a7ea4 100644 --- a/crates/tap-agent/src/agent.rs +++ b/crates/tap-agent/src/agent.rs @@ -39,7 +39,9 @@ use indexer_config::{ Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig, SubgraphConfig, SubgraphsConfig, TapConfig, }; -use indexer_monitor::{escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient}; +use indexer_monitor::{ + escrow_accounts, escrow_accounts_v2, indexer_allocations, DeploymentDetails, SubgraphClient, +}; use ractor::{concurrency::JoinHandle, Actor, ActorRef}; use sender_account::SenderAccountConfig; use sender_accounts_manager::SenderAccountsManager; @@ -154,7 +156,16 @@ pub async fn start_agent() -> (ActorRef, JoinHandl .await, )); - let escrow_accounts = escrow_accounts( + let escrow_accounts_v1 = escrow_accounts( + escrow_subgraph, + *indexer_address, + *escrow_sync_interval, + false, + ) + .await + .expect("Error creating escrow_accounts channel"); + + let escrow_accounts_v2 = escrow_accounts_v2( escrow_subgraph, *indexer_address, *escrow_sync_interval, @@ -170,7 +181,8 @@ pub async fn start_agent() -> (ActorRef, JoinHandl domain_separator: EIP_712_DOMAIN.clone(), pgpool, indexer_allocations, - escrow_accounts, + escrow_accounts_v1, + escrow_accounts_v2, escrow_subgraph, network_subgraph, sender_aggregator_endpoints: sender_aggregator_endpoints.clone(), diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index 229288e79..515f5dd2b 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -35,7 +35,7 @@ use tonic::transport::{Channel, Endpoint}; use tracing::Level; use super::{ - sender_accounts_manager::AllocationId, + sender_accounts_manager::{AllocationId, SenderType}, sender_allocation::{ AllocationConfig, SenderAllocation, SenderAllocationArgs, SenderAllocationMessage, }, @@ -233,6 +233,9 @@ pub struct SenderAccountArgs { /// Configuration for retry scheduler in case sender is denied pub retry_interval: Duration, + + /// Sender type, used to decide which set of tables to use + pub sender_type: SenderType, } /// State used by the actor @@ -321,6 +324,9 @@ pub struct State { /// limited to `max_amount_willing_to_lose_grt` trusted_sender: bool, + /// Sender type, used to decide which set of tables to use + sender_type: SenderType, + // Config forwarded to [SenderAllocation] config: &'static SenderAccountConfig, } @@ -574,7 +580,7 @@ impl State { "Denying sender." ); - SenderAccount::deny_sender(&self.pgpool, self.sender).await; + SenderAccount::deny_sender(self.sender_type, &self.pgpool, self.sender).await; self.denied = true; SENDER_DENIED .with_label_values(&[&self.sender.to_string()]) @@ -590,16 +596,21 @@ impl State { sender_balance = self.sender_balance.to_u128(), "Allowing sender." ); - sqlx::query!( - r#" + match self.sender_type { + SenderType::Legacy => { + sqlx::query!( + r#" DELETE FROM scalar_tap_denylist WHERE sender_address = $1 "#, - self.sender.encode_hex(), - ) - .execute(&self.pgpool) - .await - .expect("Should not fail to delete from denylist"); + self.sender.encode_hex(), + ) + .execute(&self.pgpool) + .await + .expect("Should not fail to delete from denylist"); + } + SenderType::Horizon => unimplemented!(), + } self.denied = false; SENDER_DENIED @@ -688,6 +699,7 @@ impl Actor for SenderAccount { allocation_ids, prefix, retry_interval, + sender_type, }: Self::Arguments, ) -> Result { let myself_clone = myself.clone(); @@ -714,39 +726,58 @@ impl Actor for SenderAccount { .get_balance_for_sender(&sender_id) .unwrap_or_default(); async move { - let last_non_final_ravs = sqlx::query!( - r#" - SELECT allocation_id, value_aggregate - FROM scalar_tap_ravs - WHERE sender_address = $1 AND last AND NOT final; - "#, - sender_id.encode_hex(), - ) - .fetch_all(&pgpool) - .await - .expect("Should not fail to fetch from scalar_tap_ravs"); + let last_non_final_ravs = match sender_type { + // Get all ravs from v1 table + SenderType::Legacy => sqlx::query!( + r#" + SELECT allocation_id, value_aggregate + FROM scalar_tap_ravs + WHERE sender_address = $1 AND last AND NOT final; + "#, + sender_id.encode_hex(), + ) + .fetch_all(&pgpool) + .await + .expect("Should not fail to fetch from scalar_tap_ravs"), + // Get all ravs from v2 table + SenderType::Horizon => { + unimplemented!() + } + }; // get a list from the subgraph of which subgraphs were already redeemed and were not marked as final - let redeemed_ravs_allocation_ids = match escrow_subgraph - .query::(unfinalized_transactions::Variables { - unfinalized_ravs_allocation_ids: last_non_final_ravs - .iter() - .map(|rav| rav.allocation_id.to_string()) - .collect::>(), - sender: format!("{:x?}", sender_id), - }) - .await - { - Ok(Ok(response)) => response - .transactions - .into_iter() - .map(|tx| { - tx.allocation_id - .expect("all redeem tx must have allocation_id") - }) - .collect::>(), - // if we have any problems, we don't want to filter out - _ => vec![], + let redeemed_ravs_allocation_ids = match sender_type { + SenderType::Legacy => { + // This query returns unfinalized transactions for v1 + match escrow_subgraph + .query::( + unfinalized_transactions::Variables { + unfinalized_ravs_allocation_ids: last_non_final_ravs + .iter() + .map(|rav| rav.allocation_id.to_string()) + .collect::>(), + sender: format!("{:x?}", sender_id), + }, + ) + .await + { + Ok(Ok(response)) => response + .transactions + .into_iter() + .map(|tx| { + tx.allocation_id + .expect("all redeem tx must have allocation_id") + }) + .collect::>(), + // if we have any problems, we don't want to filter out + _ => vec![], + } + } + // TODO Implement query for unfinalized v2 transactions + // Depends on Escrow Subgraph Schema + SenderType::Horizon => { + todo!() + } }; // filter the ravs marked as last that were not redeemed yet @@ -779,21 +810,25 @@ impl Actor for SenderAccount { } }); - // Get deny status from the scalar_tap_denylist table - let denied = sqlx::query!( - r#" + let denied = match sender_type { + // Get deny status from the scalar_tap_denylist table + SenderType::Legacy => sqlx::query!( + r#" SELECT EXISTS ( SELECT 1 FROM scalar_tap_denylist WHERE sender_address = $1 ) as denied "#, - sender_id.encode_hex(), - ) - .fetch_one(&pgpool) - .await? - .denied - .expect("Deny status cannot be null"); + sender_id.encode_hex(), + ) + .fetch_one(&pgpool) + .await? + .denied + .expect("Deny status cannot be null"), + // Get deny status from the tap horizon table + SenderType::Horizon => unimplemented!(), + }; let sender_balance = escrow_accounts .borrow() @@ -860,6 +895,7 @@ impl Actor for SenderAccount { backoff_info: BackoffInfo::default(), trusted_sender: config.trusted_senders.contains(&sender_id), config, + sender_type, }; stream::iter(allocation_ids) @@ -937,7 +973,12 @@ impl Actor for SenderAccount { fee ***MONEY***. " ); - SenderAccount::deny_sender(&state.pgpool, state.sender).await; + SenderAccount::deny_sender( + state.sender_type, + &state.pgpool, + state.sender, + ) + .await; } // add new value @@ -1266,7 +1307,14 @@ impl Actor for SenderAccount { impl SenderAccount { /// Deny sender by giving `sender` [Address] - pub async fn deny_sender(pool: &PgPool, sender: Address) { + pub async fn deny_sender(sender_type: SenderType, pool: &PgPool, sender: Address) { + match sender_type { + SenderType::Legacy => Self::deny_v1_sender(pool, sender).await, + SenderType::Horizon => Self::deny_v2_sender(pool, sender).await, + } + } + + async fn deny_v1_sender(pool: &PgPool, sender: Address) { sqlx::query!( r#" INSERT INTO scalar_tap_denylist (sender_address) @@ -1278,6 +1326,10 @@ impl SenderAccount { .await .expect("Should not fail to insert into denylist"); } + + async fn deny_v2_sender(_pool: &PgPool, _sender: Address) { + unimplemented!() + } } #[cfg(test)] @@ -1537,8 +1589,8 @@ pub mod tests { flush_messages(¬ify).await; // should not delete it because it was not in network subgraph - let actor_ref = ActorRef::::where_is(sender_allocation_id.clone()); - assert!(actor_ref.is_some()); + let allocation_ref = + ActorRef::::where_is(sender_allocation_id.clone()).unwrap(); // Mock result for closed allocations @@ -1568,7 +1620,7 @@ pub mod tests { .cast(SenderAccountMessage::UpdateAllocationIds(HashSet::new())) .unwrap(); - flush_messages(¬ify).await; + allocation_ref.wait(None).await.unwrap(); let actor_ref = ActorRef::::where_is(sender_allocation_id.clone()); assert!(actor_ref.is_none()); diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index 46649a987..542c56492 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -83,12 +83,30 @@ impl Display for AllocationId { } } +/// Type used in [SenderAccountsManager] and [SenderAccount] to route the correct escrow queries +/// and to use the correct set of tables +#[derive(Clone, Copy)] +pub enum SenderType { + /// SenderAccounts that are found in Escrow Subgraph v1 (Legacy) + Legacy, + /// SenderAccounts that are found in Tap Collector v2 (Horizon) + Horizon, +} + /// Enum containing all types of messages that a [SenderAccountsManager] can receive #[derive(Debug)] pub enum SenderAccountsManagerMessage { /// Spawn and Stop [SenderAccount]s that were added or removed /// in comparison with it current state and updates the state - UpdateSenderAccounts(HashSet
), + /// + /// This tracks only v1 accounts + UpdateSenderAccountsV1(HashSet
), + + /// Spawn and Stop [SenderAccount]s that were added or removed + /// in comparison with it current state and updates the state + /// + /// This tracks only v2 accounts + UpdateSenderAccountsV2(HashSet
), } /// Arguments received in startup while spawing [SenderAccount] actor @@ -102,8 +120,10 @@ pub struct SenderAccountsManagerArgs { pub pgpool: PgPool, /// Watcher that returns a map of open and recently closed allocation ids pub indexer_allocations: Receiver>, - /// Watcher containing the escrow accounts - pub escrow_accounts: Receiver, + /// Watcher containing the escrow accounts for v1 + pub escrow_accounts_v1: Receiver, + /// Watcher containing the escrow accounts for v2 + pub escrow_accounts_v2: Receiver, /// SubgraphClient of the escrow subgraph pub escrow_subgraph: &'static SubgraphClient, /// SubgraphClient of the network subgraph @@ -120,14 +140,18 @@ pub struct SenderAccountsManagerArgs { /// This is a separate instance that makes it easier to have mutable /// reference, for more information check ractor library pub struct State { - sender_ids: HashSet
, + sender_ids_v1: HashSet
, + sender_ids_v2: HashSet
, new_receipts_watcher_handle: Option>, config: &'static SenderAccountConfig, domain_separator: Eip712Domain, pgpool: PgPool, indexer_allocations: Receiver>, - escrow_accounts: Receiver, + /// Watcher containing the escrow accounts for v1 + escrow_accounts_v1: Receiver, + /// Watcher containing the escrow accounts for v2 + escrow_accounts_v2: Receiver, escrow_subgraph: &'static SubgraphClient, network_subgraph: &'static SubgraphClient, sender_aggregator_endpoints: HashMap, @@ -151,7 +175,8 @@ impl Actor for SenderAccountsManager { domain_separator, indexer_allocations, pgpool, - escrow_accounts, + escrow_accounts_v1, + escrow_accounts_v2, escrow_subgraph, network_subgraph, sender_aggregator_endpoints, @@ -168,13 +193,29 @@ impl Actor for SenderAccountsManager { }); let pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); let myself_clone = myself.clone(); - let accounts_clone = escrow_accounts.clone(); + let accounts_clone = escrow_accounts_v1.clone(); watch_pipe(accounts_clone, move |escrow_accounts| { let senders = escrow_accounts.get_senders(); myself_clone - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts(senders)) + .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV1( + senders, + )) + .unwrap_or_else(|e| { + tracing::error!("Error while updating sender_accounts v1: {:?}", e); + }); + async {} + }); + + let myself_clone = myself.clone(); + let _escrow_accounts_v2 = escrow_accounts_v2.clone(); + watch_pipe(_escrow_accounts_v2, move |escrow_accounts| { + let senders = escrow_accounts.get_senders(); + myself_clone + .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV2( + senders, + )) .unwrap_or_else(|e| { - tracing::error!("Error while updating sender_accounts: {:?}", e); + tracing::error!("Error while updating sender_accounts v2: {:?}", e); }); async {} }); @@ -182,28 +223,55 @@ impl Actor for SenderAccountsManager { let mut state = State { config, domain_separator, - sender_ids: HashSet::new(), + sender_ids_v1: HashSet::new(), + sender_ids_v2: HashSet::new(), new_receipts_watcher_handle: None, pgpool, indexer_allocations, - escrow_accounts: escrow_accounts.clone(), + escrow_accounts_v1: escrow_accounts_v1.clone(), + escrow_accounts_v2: escrow_accounts_v2.clone(), escrow_subgraph, network_subgraph, sender_aggregator_endpoints, prefix: prefix.clone(), }; - let sender_allocation = select! { - sender_allocation = state.get_pending_sender_allocation_id() => sender_allocation, + // v1 + let sender_allocation_v1 = select! { + sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation, _ = tokio::time::sleep(state.config.tap_sender_timeout) => { panic!("Timeout while getting pending sender allocation ids"); } }; + state.sender_ids_v1.extend(sender_allocation_v1.keys()); + stream::iter(sender_allocation_v1) + .map(|(sender_id, allocation_ids)| { + state.create_or_deny_sender( + myself.get_cell(), + sender_id, + allocation_ids, + SenderType::Legacy, + ) + }) + .buffer_unordered(10) // Limit concurrency to 10 senders at a time + .collect::>() + .await; - state.sender_ids.extend(sender_allocation.keys()); - - stream::iter(sender_allocation) + // v2 + let sender_allocation_v2 = select! { + sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation, + _ = tokio::time::sleep(state.config.tap_sender_timeout) => { + panic!("Timeout while getting pending sender allocation ids"); + } + }; + state.sender_ids_v2.extend(sender_allocation_v2.keys()); + stream::iter(sender_allocation_v2) .map(|(sender_id, allocation_ids)| { - state.create_or_deny_sender(myself.get_cell(), sender_id, allocation_ids) + state.create_or_deny_sender( + myself.get_cell(), + sender_id, + allocation_ids, + SenderType::Horizon, + ) }) .buffer_unordered(10) // Limit concurrency to 10 senders at a time .collect::>() @@ -214,7 +282,7 @@ impl Actor for SenderAccountsManager { state.new_receipts_watcher_handle = Some(tokio::spawn(new_receipts_watcher( myself.get_cell(), pglistener, - escrow_accounts, + escrow_accounts_v1, prefix, ))); @@ -246,24 +314,54 @@ impl Actor for SenderAccountsManager { ); match msg { - SenderAccountsManagerMessage::UpdateSenderAccounts(target_senders) => { + SenderAccountsManagerMessage::UpdateSenderAccountsV1(target_senders) => { + // Create new sender accounts + for sender in target_senders.difference(&state.sender_ids_v1) { + state + .create_or_deny_sender( + myself.get_cell(), + *sender, + HashSet::new(), + SenderType::Legacy, + ) + .await; + } + + // Remove sender accounts + for sender in state.sender_ids_v1.difference(&target_senders) { + if let Some(sender_handle) = ActorRef::::where_is( + state.format_sender_account(sender, SenderType::Legacy), + ) { + sender_handle.stop(None); + } + } + + state.sender_ids_v1 = target_senders; + } + + SenderAccountsManagerMessage::UpdateSenderAccountsV2(target_senders) => { // Create new sender accounts - for sender in target_senders.difference(&state.sender_ids) { + for sender in target_senders.difference(&state.sender_ids_v2) { state - .create_or_deny_sender(myself.get_cell(), *sender, HashSet::new()) + .create_or_deny_sender( + myself.get_cell(), + *sender, + HashSet::new(), + SenderType::Horizon, + ) .await; } // Remove sender accounts - for sender in state.sender_ids.difference(&target_senders) { + for sender in state.sender_ids_v2.difference(&target_senders) { if let Some(sender_handle) = ActorRef::::where_is( - state.format_sender_account(sender), + state.format_sender_account(sender, SenderType::Horizon), ) { sender_handle.stop(None); } } - state.sender_ids = target_senders; + state.sender_ids_v2 = target_senders; } } Ok(()) @@ -293,7 +391,8 @@ impl Actor for SenderAccountsManager { tracing::error!("SenderAllocation doesn't have a name"); return Ok(()); }; - let Some(sender_id) = sender_id.split(':').next_back() else { + let mut splitter = sender_id.split(':'); + let Some(sender_id) = splitter.next_back() else { tracing::error!(%sender_id, "Could not extract sender_id from name"); return Ok(()); }; @@ -301,9 +400,17 @@ impl Actor for SenderAccountsManager { tracing::error!(%sender_id, "Could not convert sender_id to Address"); return Ok(()); }; + let sender_type = match splitter.next_back() { + Some("legacy") => SenderType::Legacy, + Some("horizon") => SenderType::Horizon, + _ => { + tracing::error!(%sender_id, "Could not extract sender_type from name"); + return Ok(()); + } + }; let mut sender_allocation = select! { - sender_allocation = state.get_pending_sender_allocation_id() => sender_allocation, + sender_allocation = state.get_pending_sender_allocation_id_v1() => sender_allocation, _ = tokio::time::sleep(state.config.tap_sender_timeout) => { tracing::error!("Timeout while getting pending sender allocation ids"); return Ok(()); @@ -315,7 +422,7 @@ impl Actor for SenderAccountsManager { .unwrap_or(HashSet::new()); state - .create_or_deny_sender(myself.get_cell(), sender_id, allocations) + .create_or_deny_sender(myself.get_cell(), sender_id, allocations, sender_type) .await; } _ => {} @@ -325,12 +432,16 @@ impl Actor for SenderAccountsManager { } impl State { - fn format_sender_account(&self, sender: &Address) -> String { + fn format_sender_account(&self, sender: &Address, sender_type: SenderType) -> String { let mut sender_allocation_id = String::new(); if let Some(prefix) = &self.prefix { sender_allocation_id.push_str(prefix); sender_allocation_id.push(':'); } + sender_allocation_id.push_str(match sender_type { + SenderType::Legacy => "legacy:", + SenderType::Horizon => "horizon:", + }); sender_allocation_id.push_str(&format!("{}", sender)); sender_allocation_id } @@ -347,9 +458,10 @@ impl State { supervisor: ActorCell, sender_id: Address, allocation_ids: HashSet, + sender_type: SenderType, ) { if let Err(e) = self - .create_sender_account(supervisor, sender_id, allocation_ids) + .create_sender_account(supervisor, sender_id, allocation_ids, sender_type) .await { tracing::error!( @@ -357,7 +469,7 @@ impl State { sender_id, e ); - SenderAccount::deny_sender(&self.pgpool, sender_id).await; + SenderAccount::deny_sender(sender_type, &self.pgpool, sender_id).await; } } @@ -371,8 +483,9 @@ impl State { supervisor: ActorCell, sender_id: Address, allocation_ids: HashSet, + sender_type: SenderType, ) -> anyhow::Result<()> { - let Ok(args) = self.new_sender_account_args(&sender_id, allocation_ids) else { + let Ok(args) = self.new_sender_account_args(&sender_id, allocation_ids, sender_type) else { tracing::warn!( "Sender {} is not on your [tap.sender_aggregator_endpoints] list. \ \ @@ -389,7 +502,7 @@ impl State { ); }; SenderAccount::spawn_linked( - Some(self.format_sender_account(&sender_id)), + Some(self.format_sender_account(&sender_id, sender_type)), SenderAccount, args, supervisor, @@ -401,13 +514,14 @@ impl State { /// Gather all outstanding receipts and unfinalized RAVs from the database. /// Used to create [SenderAccount] instances for all senders that have unfinalized allocations /// and try to finalize them if they have become ineligible. - async fn get_pending_sender_allocation_id(&self) -> HashMap> { + /// + /// This loads legacy allocations + async fn get_pending_sender_allocation_id_v1(&self) -> HashMap> { // First we accumulate all allocations for each sender. This is because we may have more // than one signer per sender in DB. let mut unfinalized_sender_allocations_map: HashMap> = HashMap::new(); - // Legacy Allocations let receipts_signer_allocations_in_db = sqlx::query!( r#" WITH grouped AS ( @@ -441,7 +555,7 @@ impl State { let signer_id = Address::from_str(&row.signer_address) .expect("signer_address should be a valid address"); let sender_id = self - .escrow_accounts + .escrow_accounts_v1 .borrow() .get_sender_for_signer(&signer_id) .expect("should be able to get sender from signer"); @@ -487,9 +601,99 @@ impl State { .or_default() .extend(allocation_ids); } + unfinalized_sender_allocations_map + } + + /// Gather all outstanding receipts and unfinalized RAVs from the database. + /// Used to create [SenderAccount] instances for all senders that have unfinalized allocations + /// and try to finalize them if they have become ineligible. + /// + /// This loads horizon allocations + async fn get_pending_sender_allocation_id_v2(&self) -> HashMap> { + // First we accumulate all allocations for each sender. This is because we may have more + // than one signer per sender in DB. + let mut unfinalized_sender_allocations_map: HashMap> = + HashMap::new(); + + let receipts_signer_allocations_in_db = sqlx::query!( + r#" + WITH grouped AS ( + SELECT signer_address, allocation_id + FROM tap_horizon_receipts + GROUP BY signer_address, allocation_id + ) + SELECT + signer_address, + ARRAY_AGG(allocation_id) AS allocation_ids + FROM grouped + GROUP BY signer_address + "# + ) + .fetch_all(&self.pgpool) + .await + .expect("should be able to fetch pending receipts from the database"); + + for row in receipts_signer_allocations_in_db { + let allocation_ids = row + .allocation_ids + .expect("all receipts should have an allocation_id") + .iter() + .map(|allocation_id| { + AllocationId::Legacy( + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address"), + ) + }) + .collect::>(); + let signer_id = Address::from_str(&row.signer_address) + .expect("signer_address should be a valid address"); + let sender_id = self + .escrow_accounts_v1 + .borrow() + .get_sender_for_signer(&signer_id) + .expect("should be able to get sender from signer"); + + // Accumulate allocations for the sender + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + + let nonfinal_ravs_sender_allocations_in_db = sqlx::query!( + r#" + SELECT + payer, + ARRAY_AGG(DISTINCT allocation_id) FILTER (WHERE NOT last) AS allocation_ids + FROM tap_horizon_ravs + GROUP BY payer + "# + ) + .fetch_all(&self.pgpool) + .await + .expect("should be able to fetch unfinalized RAVs from the database"); - // TODO Load horizon allocations + for row in nonfinal_ravs_sender_allocations_in_db { + let allocation_ids = row + .allocation_ids + .expect("all RAVs should have an allocation_id") + .iter() + .map(|allocation_id| { + AllocationId::Legacy( + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address"), + ) + }) + .collect::>(); + let sender_id = + Address::from_str(&row.payer).expect("sender_address should be a valid address"); + // Accumulate allocations for the sender + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } unfinalized_sender_allocations_map } @@ -501,12 +705,16 @@ impl State { &self, sender_id: &Address, allocation_ids: HashSet, + sender_type: SenderType, ) -> anyhow::Result { Ok(SenderAccountArgs { config: self.config, pgpool: self.pgpool.clone(), sender_id: *sender_id, - escrow_accounts: self.escrow_accounts.clone(), + escrow_accounts: match sender_type { + SenderType::Legacy => self.escrow_accounts_v1.clone(), + SenderType::Horizon => self.escrow_accounts_v2.clone(), + }, indexer_allocations: self.indexer_allocations.clone(), escrow_subgraph: self.escrow_subgraph, network_subgraph: self.network_subgraph, @@ -522,6 +730,7 @@ impl State { allocation_ids, prefix: self.prefix.clone(), retry_interval: Duration::from_secs(30), + sender_type, }) } } @@ -678,7 +887,9 @@ mod tests { use reqwest::Url; use ruint::aliases::U256; use sqlx::{postgres::PgListener, PgPool}; - use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; + use test_assets::{ + assert_while_retry, flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, + }; use thegraph_core::alloy::hex::ToHexExt; use tokio::sync::{ mpsc::{self, error::TryRecvError}, @@ -689,7 +900,9 @@ mod tests { use crate::{ agent::{ sender_account::SenderAccountMessage, - sender_accounts_manager::{handle_notification, AllocationId, NewReceiptNotification}, + sender_accounts_manager::{ + handle_notification, AllocationId, NewReceiptNotification, SenderType, + }, }, test::{ actors::{DummyActor, MockSenderAccount, MockSenderAllocation, TestableActor}, @@ -731,11 +944,13 @@ mod tests { State { config, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), - sender_ids: HashSet::new(), + sender_ids_v1: HashSet::new(), + sender_ids_v2: HashSet::new(), new_receipts_watcher_handle: None, pgpool, indexer_allocations: watch::channel(HashSet::new()).1, - escrow_accounts: watch::channel(escrow_accounts).1, + escrow_accounts_v1: watch::channel(escrow_accounts.clone()).1, + escrow_accounts_v2: watch::channel(escrow_accounts).1, escrow_subgraph: get_subgraph_client().await, network_subgraph: get_subgraph_client().await, sender_aggregator_endpoints: HashMap::from([ @@ -763,7 +978,7 @@ mod tests { let signed_rav = create_rav(ALLOCATION_ID_1, SIGNER.0.clone(), 4, 10); store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); - let pending_allocation_id = state.get_pending_sender_allocation_id().await; + let pending_allocation_id = state.get_pending_sender_allocation_id_v1().await; // check if pending allocations are correct assert_eq!(pending_allocation_id.len(), 1); @@ -777,25 +992,38 @@ mod tests { create_sender_accounts_manager().pgpool(pgpool).call().await; actor - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( + .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV1( vec![SENDER.1].into_iter().collect(), )) .unwrap(); flush_messages(¬ify).await; + assert_while_retry! { + ActorRef::::where_is(format!( + "{}:legacy:{}", + prefix.clone(), + SENDER.1 + )).is_none() + }; + // verify if create sender account - let actor_ref = - ActorRef::::where_is(format!("{}:{}", prefix.clone(), SENDER.1)); - assert!(actor_ref.is_some()); + let sender_ref = ActorRef::::where_is(format!( + "{}:legacy:{}", + prefix.clone(), + SENDER.1 + )) + .unwrap(); actor - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( + .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV1( HashSet::new(), )) .unwrap(); flush_messages(¬ify).await; + + sender_ref.wait(None).await.unwrap(); // verify if it gets removed let actor_ref = ActorRef::::where_is(format!("{}:{}", prefix, SENDER.1)); @@ -812,12 +1040,17 @@ mod tests { let supervisor = DummyActor::spawn().await; // we wait to check if the sender is created state - .create_sender_account(supervisor.get_cell(), SENDER_2.1, HashSet::new()) + .create_sender_account( + supervisor.get_cell(), + SENDER_2.1, + HashSet::new(), + SenderType::Legacy, + ) .await .unwrap(); let actor_ref = - ActorRef::::where_is(format!("{}:{}", prefix, SENDER_2.1)); + ActorRef::::where_is(format!("{}:legacy:{}", prefix, SENDER_2.1)); assert!(actor_ref.is_some()); } @@ -827,7 +1060,12 @@ mod tests { let supervisor = DummyActor::spawn().await; state - .create_or_deny_sender(supervisor.get_cell(), INDEXER.1, HashSet::new()) + .create_or_deny_sender( + supervisor.get_cell(), + INDEXER.1, + HashSet::new(), + SenderType::Legacy, + ) .await; let denied = sqlx::query!( diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index 998a1e469..16cc3cf75 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -44,7 +44,7 @@ use crate::{ }, sender_accounts_manager::{ AllocationId, SenderAccountsManager, SenderAccountsManagerArgs, - SenderAccountsManagerMessage, + SenderAccountsManagerMessage, SenderType, }, }, tap::{ @@ -178,6 +178,7 @@ pub async fn create_sender_account( allocation_ids: HashSet::new(), prefix: Some(prefix.clone()), retry_interval: RETRY_DURATION, + sender_type: SenderType::Legacy, }; let actor = TestableActor::new(SenderAccount); @@ -198,6 +199,8 @@ pub async fn create_sender_accounts_manager( pgpool: PgPool, network_subgraph: Option<&str>, escrow_subgraph: Option<&str>, + initial_escrow_accounts_v1: Option, + initial_escrow_accounts_v2: Option, ) -> ( String, Arc, @@ -222,12 +225,18 @@ pub async fn create_sender_accounts_manager( .await, )); let (escrow_accounts_tx, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); - escrow_accounts_tx - .send(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )) - .expect("Failed to update escrow_accounts channel"); + if let Some(escrow_acccounts) = initial_escrow_accounts_v1 { + escrow_accounts_tx + .send(escrow_acccounts) + .expect("Failed to update escrow_accounts channel"); + } + + let (escrow_accounts_tx_v2, escrow_accounts_rx_v2) = watch::channel(EscrowAccounts::default()); + if let Some(escrow_acccounts) = initial_escrow_accounts_v2 { + escrow_accounts_tx_v2 + .send(escrow_acccounts) + .expect("Failed to update escrow_accounts channel"); + } let prefix = generate_random_prefix(); let args = SenderAccountsManagerArgs { @@ -235,7 +244,8 @@ pub async fn create_sender_accounts_manager( domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), pgpool, indexer_allocations: allocations_rx, - escrow_accounts: escrow_accounts_rx, + escrow_accounts_v1: escrow_accounts_rx, + escrow_accounts_v2: escrow_accounts_rx_v2, escrow_subgraph, network_subgraph, sender_aggregator_endpoints: HashMap::from([ diff --git a/crates/tap-agent/tests/sender_account_manager_test.rs b/crates/tap-agent/tests/sender_account_manager_test.rs index 65fa85bf7..cbe1c5aed 100644 --- a/crates/tap-agent/tests/sender_account_manager_test.rs +++ b/crates/tap-agent/tests/sender_account_manager_test.rs @@ -1,8 +1,9 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; +use indexer_monitor::EscrowAccounts; use indexer_tap_agent::{ agent::{ sender_account::SenderAccountMessage, @@ -11,12 +12,14 @@ use indexer_tap_agent::{ }, test::{ create_received_receipt, create_sender_accounts_manager, store_receipt, ALLOCATION_ID_0, + ESCROW_VALUE, }, }; use ractor::{ActorRef, ActorStatus}; use serde_json::json; use sqlx::PgPool; use test_assets::{assert_while_retry, flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; +use thegraph_core::alloy::primitives::U256; use wiremock::{ matchers::{body_string_contains, method}, Mock, MockServer, ResponseTemplate, @@ -26,7 +29,7 @@ const TRIGGER_VALUE: u128 = 100; // This test should ensure the full flow starting from // sender account manager layer to work, up to closing an allocation -#[sqlx::test(migrations = "../../migrations")] +#[test_log::test(sqlx::test(migrations = "../../migrations"))] async fn sender_account_manager_layer_test(pgpool: PgPool) { let mock_network_subgraph_server: MockServer = MockServer::start().await; mock_network_subgraph_server @@ -63,19 +66,34 @@ async fn sender_account_manager_layer_test(pgpool: PgPool) { .pgpool(pgpool.clone()) .network_subgraph(&mock_network_subgraph_server.uri()) .escrow_subgraph(&mock_escrow_subgraph_server.uri()) + .initial_escrow_accounts_v1(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) .call() .await; actor - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( + .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV1( vec![SENDER.1].into_iter().collect(), )) .unwrap(); flush_messages(¬ify).await; + assert_while_retry!({ + ActorRef::::where_is(format!( + "{}:legacy:{}", + prefix.clone(), + SENDER.1 + )) + .is_none() + }); // verify if create sender account - let sender_account_ref = - ActorRef::::where_is(format!("{}:{}", prefix.clone(), SENDER.1)); + let sender_account_ref = ActorRef::::where_is(format!( + "{}:legacy:{}", + prefix.clone(), + SENDER.1 + )); assert!(sender_account_ref.is_some()); let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, 1, 1, TRIGGER_VALUE - 10); @@ -124,14 +142,15 @@ async fn sender_account_manager_layer_test(pgpool: PgPool) { // this calls and closes acounts manager sender accounts actor - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( + .cast(SenderAccountsManagerMessage::UpdateSenderAccountsV1( HashSet::new(), )) .unwrap(); sender_account_ref.unwrap().wait(None).await.unwrap(); // verify if it gets removed - let actor_ref = ActorRef::::where_is(format!("{}:{}", prefix, SENDER.1)); + let actor_ref = + ActorRef::::where_is(format!("{}:legacy:{}", prefix, SENDER.1)); assert!(actor_ref.is_none()); let rav_marked_as_last = sqlx::query!( diff --git a/crates/tap-agent/tests/tap_agent_test.rs b/crates/tap-agent/tests/tap_agent_test.rs index ea78db60b..a58f19e41 100644 --- a/crates/tap-agent/tests/tap_agent_test.rs +++ b/crates/tap-agent/tests/tap_agent_test.rs @@ -100,7 +100,8 @@ pub async fn start_agent( domain_separator: TAP_EIP712_DOMAIN.clone(), pgpool, indexer_allocations: indexer_allocations1, - escrow_accounts, + escrow_accounts_v1: escrow_accounts.clone(), + escrow_accounts_v2: watch::channel(EscrowAccounts::default()).1, escrow_subgraph, network_subgraph, sender_aggregator_endpoints: sender_aggregator_endpoints.clone(), @@ -118,9 +119,11 @@ async fn test_start_tap_agent(pgpool: PgPool) { flush_messages(¬ify).await; // verify if create sender account - let actor_ref = ActorRef::::where_is(format!("{}", TAP_SENDER.1)); - - assert!(actor_ref.is_some()); + assert_while_retry!(ActorRef::::where_is(format!( + "legacy:{}", + TAP_SENDER.1 + )) + .is_none()); // Add batch receits to the database. const AMOUNT_OF_RECEIPTS: u64 = 3000; @@ -140,6 +143,15 @@ async fn test_start_tap_agent(pgpool: PgPool) { } let res = store_batch_receipts(&pgpool, receipts).await; assert!(res.is_ok()); + + assert_while_retry!({ + ActorRef::::where_is(format!( + "{}:{}", + TAP_SENDER.1, ALLOCATION_ID_0, + )) + .is_none() + }); + let sender_allocation_ref = ActorRef::::where_is(format!( "{}:{}", TAP_SENDER.1, ALLOCATION_ID_0, diff --git a/justfile b/justfile index 386417eb6..adb42400f 100644 --- a/justfile +++ b/justfile @@ -14,8 +14,16 @@ url: clippy: cargo +nightly clippy --all-targets --all-features +# run everything that is needed for ci to pass +ci: + just fmt + just clippy + just test + just sqlx-prepare + + test: - cargo nextest run + RUST_LOG=debug cargo nextest run fmt: cargo +nightly fmt