diff --git a/contrib/indexer-service/config.toml b/contrib/indexer-service/config.toml index fa2629022..ca4b45138 100644 --- a/contrib/indexer-service/config.toml +++ b/contrib/indexer-service/config.toml @@ -44,3 +44,6 @@ trigger_value_divisor = 500_000 [tap.sender_aggregator_endpoints] "ACCOUNT0_ADDRESS_PLACEHOLDER" = "http://tap-aggregator:7610" + +[horizon] +enabled = false diff --git a/contrib/indexer-service/start.sh b/contrib/indexer-service/start.sh index d32cb4ae2..5080bec22 100755 --- a/contrib/indexer-service/start.sh +++ b/contrib/indexer-service/start.sh @@ -19,14 +19,6 @@ until pg_isready -h postgres -U postgres -d indexer_components_1; do sleep 2 done -echo "Applying database migrations..." -# Get all the migration UP files and sort them by name -for migration_file in $(find /opt/migrations -name "*.up.sql" | sort); do - echo "Applying migration: $(basename $migration_file)" - psql -h postgres -U postgres postgres -f $migration_file -done -echo "Database migrations completed." - # Get network subgraph deployment ID NETWORK_DEPLOYMENT=$(curl -s "http://graph-node:8000/subgraphs/name/graph-network" \ -H 'content-type: application/json' \ diff --git a/contrib/tap-agent/start.sh b/contrib/tap-agent/start.sh index 3a5cad74c..245045a84 100755 --- a/contrib/tap-agent/start.sh +++ b/contrib/tap-agent/start.sh @@ -28,13 +28,6 @@ fi echo "Postgres is ready!" -echo "Ensuring database tables exist..." -for migration_file in $(find /opt/migrations -name "*.up.sql" | sort); do - echo "Applying migration if needed: $(basename $migration_file)" - psql -h postgres -U postgres indexer_components_1 -f $migration_file 2>/dev/null || true -done -echo "Database setup completed." - # Wait for indexer-service to be ready with timeout echo "Waiting for indexer-service to be ready..." MAX_ATTEMPTS=30 diff --git a/crates/config/default_values.toml b/crates/config/default_values.toml index 9c3387f0c..fe1a06455 100644 --- a/crates/config/default_values.toml +++ b/crates/config/default_values.toml @@ -27,3 +27,6 @@ trigger_value_divisor = 10 timestamp_buffer_secs = 60 request_timeout_secs = 5 max_receipts_per_request = 10000 + +[horizon] +enabled = false diff --git a/crates/config/maximal-config-example.toml b/crates/config/maximal-config-example.toml index 79637f0f1..b00c1e663 100644 --- a/crates/config/maximal-config-example.toml +++ b/crates/config/maximal-config-example.toml @@ -168,3 +168,6 @@ hardhat = "100" [dips.additional_networks] "eip155:1337" = "hardhat" + +[horizon] +enabled = false diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index f63b1daf1..d4409bcc8 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -42,6 +42,7 @@ pub struct Config { pub service: ServiceConfig, pub tap: TapConfig, pub dips: Option, + pub horizon: HorizonConfig, } // Newtype wrapping Config to be able use serde_ignored with Figment @@ -445,6 +446,15 @@ pub struct RavRequestConfig { pub max_receipts_per_request: u64, } +/// Configuration for the horizon +/// standard +#[derive(Debug, Default, Deserialize)] +#[cfg_attr(test, derive(PartialEq))] +pub struct HorizonConfig { + /// Whether the horizon is enabled or not + pub enabled: bool, +} + #[cfg(test)] mod tests { use std::{ diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index 4945738bd..9ebbf84b6 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -400,6 +400,9 @@ pub struct SenderAccountConfig { /// Senders that are allowed to spend up to `max_amount_willing_to_lose_grt` /// over the escrow balance pub trusted_senders: HashSet
, + + #[doc(hidden)] + pub horizon_enabled: bool, } impl SenderAccountConfig { @@ -415,6 +418,7 @@ impl SenderAccountConfig { rav_request_timeout: config.tap.rav_request.request_timeout_secs, tap_sender_timeout: config.tap.sender_timeout_secs, trusted_senders: config.tap.trusted_senders.clone(), + horizon_enabled: config.horizon.enabled, } } } @@ -623,6 +627,13 @@ impl State { sender_balance = self.sender_balance.to_u128(), "Denying sender." ); + // Check if this is horizon like sender and if it is actually enable, + // otherwise just ignore. + // FIXME: This should be removed once full horizon support + // is implemented! + if matches!(self.sender_type, SenderType::Horizon) && !self.config.horizon_enabled { + return; + } SenderAccount::deny_sender(self.sender_type, &self.pgpool, self.sender).await; self.denied = true; @@ -654,16 +665,18 @@ impl State { .expect("Should not fail to delete from denylist"); } SenderType::Horizon => { - sqlx::query!( - r#" + if self.config.horizon_enabled { + sqlx::query!( + r#" DELETE FROM tap_horizon_denylist WHERE sender_address = $1 "#, - self.sender.encode_hex(), - ) - .execute(&self.pgpool) - .await - .expect("Should not fail to delete from denylist"); + self.sender.encode_hex(), + ) + .execute(&self.pgpool) + .await + .expect("Should not fail to delete from horizon denylist"); + } } } self.denied = false; @@ -798,20 +811,26 @@ impl Actor for SenderAccount { .map(|record| (record.allocation_id, record.value_aggregate)) .collect(), // Get all ravs from v2 table - SenderType::Horizon => sqlx::query!( - r#" + SenderType::Horizon => { + if config.horizon_enabled { + sqlx::query!( + r#" SELECT allocation_id, value_aggregate FROM tap_horizon_ravs WHERE payer = $1 AND last AND NOT final; "#, - sender_id.encode_hex(), - ) - .fetch_all(&pgpool) - .await - .expect("Should not fail to fetch from scalar_tap_ravs") - .into_iter() - .map(|record| (record.allocation_id, record.value_aggregate)) - .collect(), + sender_id.encode_hex(), + ) + .fetch_all(&pgpool) + .await + .expect("Should not fail to fetch from \"horizon\" scalar_tap_ravs") + .into_iter() + .map(|record| (record.allocation_id, record.value_aggregate)) + .collect() + } else { + vec![] + } + } }; // get a list from the subgraph of which subgraphs were already redeemed and were not marked as final @@ -845,7 +864,11 @@ impl Actor for SenderAccount { // TODO Implement query for unfinalized v2 transactions // Depends on Escrow Subgraph Schema SenderType::Horizon => { - todo!() + if config.horizon_enabled { + todo!("Implement query for unfinalized v2 transactions, It depends on Escrow Subgraph Schema") + } + // if we have any problems, we don't want to filter out + vec![] } }; @@ -896,20 +919,28 @@ impl Actor for SenderAccount { .denied .expect("Deny status cannot be null"), // Get deny status from the tap horizon table - SenderType::Horizon => sqlx::query!( - r#" + SenderType::Horizon => { + if config.horizon_enabled { + sqlx::query!( + r#" SELECT EXISTS ( SELECT 1 FROM tap_horizon_denylist WHERE sender_address = $1 ) as denied "#, - sender_id.encode_hex(), - ) - .fetch_one(&pgpool) - .await? - .denied - .expect("Deny status cannot be null"), + sender_id.encode_hex(), + ) + .fetch_one(&pgpool) + .await? + .denied + .expect("Deny status cannot be null") + } else { + // If horizon is enabled, + // just ignore this sender + false + } + } }; let sender_balance = escrow_accounts @@ -1419,7 +1450,7 @@ impl SenderAccount { ) .execute(pool) .await - .expect("Should not fail to insert into denylist"); + .expect("Should not fail to insert into \"horizon\" denylist"); } } diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index 894ae9f5d..1ad28617e 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -264,12 +264,17 @@ impl Actor for SenderAccountsManager { .await; // v2 - let sender_allocation_v2 = select! { - sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation, - _ = tokio::time::sleep(state.config.tap_sender_timeout) => { - panic!("Timeout while getting pending sender allocation ids"); + let sender_allocation_v2 = if state.config.horizon_enabled { + select! { + sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation, + _ = tokio::time::sleep(state.config.tap_sender_timeout) => { + panic!("Timeout while getting pending sender allocation ids"); + } } + } else { + HashMap::new() }; + state.sender_ids_v2.extend(sender_allocation_v2.keys()); stream::iter(sender_allocation_v2) .map(|(sender_id, allocation_ids)| { @@ -298,19 +303,24 @@ impl Actor for SenderAccountsManager { // Start the new_receipts_watcher task that will consume from the `pglistener` // after starting all senders - state.new_receipts_watcher_handle_v2 = Some(tokio::spawn( - new_receipts_watcher() - .actor_cell(myself.get_cell()) - .pglistener(pglistener_v2) - .escrow_accounts_rx(escrow_accounts_v2) - .sender_type(SenderType::Horizon) - .maybe_prefix(prefix) - .call(), - )); + state.new_receipts_watcher_handle_v2 = None; + + if state.config.horizon_enabled { + state.new_receipts_watcher_handle_v2 = Some(tokio::spawn( + new_receipts_watcher() + .actor_cell(myself.get_cell()) + .pglistener(pglistener_v2) + .escrow_accounts_rx(escrow_accounts_v2) + .sender_type(SenderType::Horizon) + .maybe_prefix(prefix) + .call(), + )); + } tracing::info!("SenderAccountManager created!"); Ok(state) } + async fn post_stop( &self, _: ActorRef, diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index e313f0990..b936ada98 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -91,6 +91,7 @@ pub fn get_sender_account_config() -> &'static SenderAccountConfig { escrow_polling_interval: ESCROW_POLLING_INTERVAL, tap_sender_timeout: Duration::from_secs(63), trusted_senders: HashSet::new(), + horizon_enabled: true, })) } @@ -127,6 +128,7 @@ pub async fn create_sender_account( escrow_polling_interval: Duration::default(), tap_sender_timeout: TAP_SENDER_TIMEOUT, trusted_senders, + horizon_enabled: false, })); let network_subgraph = Box::leak(Box::new( diff --git a/crates/tap-agent/tests/tap_agent_test.rs b/crates/tap-agent/tests/tap_agent_test.rs index c0913d464..4a48f5cc1 100644 --- a/crates/tap-agent/tests/tap_agent_test.rs +++ b/crates/tap-agent/tests/tap_agent_test.rs @@ -92,6 +92,7 @@ pub async fn start_agent( escrow_polling_interval: Duration::from_secs(10), tap_sender_timeout: Duration::from_secs(30), trusted_senders: HashSet::new(), + horizon_enabled: false, })); let args = SenderAccountsManagerArgs { diff --git a/integration-tests/src/main.rs b/integration-tests/src/main.rs index 4cf6fd04b..a1c445ecc 100644 --- a/integration-tests/src/main.rs +++ b/integration-tests/src/main.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use rav_tests::tap_rav_test; +use rav_tests::test_tap_rav_v1; mod metrics; mod rav_tests; @@ -13,5 +13,5 @@ use receipt::create_tap_receipt; #[tokio::main] async fn main() -> Result<()> { // Run the TAP receipt test - tap_rav_test().await + test_tap_rav_v1().await } diff --git a/integration-tests/src/rav_tests.rs b/integration-tests/src/rav_tests.rs index d7f2ce3ef..a97b31f0a 100644 --- a/integration-tests/src/rav_tests.rs +++ b/integration-tests/src/rav_tests.rs @@ -52,7 +52,8 @@ const NUM_RECEIPTS: u32 = 3; const BATCHES: u32 = 2; const MAX_TRIGGERS: usize = 100; -pub async fn tap_rav_test() -> Result<()> { +// Function to test the tap RAV generation +pub async fn test_tap_rav_v1() -> Result<()> { // Setup wallet using your MnemonicBuilder let index: u32 = 0; let wallet: PrivateKeySigner = MnemonicBuilder::::default() @@ -62,9 +63,6 @@ pub async fn tap_rav_test() -> Result<()> { .build() .unwrap(); - let sender_address = wallet.address(); - println!("Using sender address: {}", sender_address); - // Setup HTTP client let http_client = Arc::new(Client::new()); @@ -87,7 +85,6 @@ pub async fn tap_rav_test() -> Result<()> { // Try to find a valid allocation let response_text = response.text().await?; - println!("Network subgraph response: {}", response_text); let json_value = serde_json::from_str::(&response_text)?; let allocation_id = json_value