diff --git a/Cargo.lock b/Cargo.lock index 0c8b4b81c..2f92c43bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1291,6 +1291,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33415e24172c1b7d6066f6d999545375ab8e1d95421d6784bdfff9496f292387" dependencies = [ "bitcoin_hashes", + "rand 0.7.3", + "rand_core 0.5.1", "serde", "unicode-normalization", ] diff --git a/common/src/subgraph_client/client.rs b/common/src/subgraph_client/client.rs index b83f5df45..8dc52c3eb 100644 --- a/common/src/subgraph_client/client.rs +++ b/common/src/subgraph_client/client.rs @@ -88,12 +88,23 @@ impl DeploymentDetails { graph_node_status_url: &str, graph_node_base_url: &str, deployment: DeploymentId, + ) -> Result { + Self::for_graph_node_url( + Url::parse(graph_node_status_url)?, + Url::parse(graph_node_base_url)?, + deployment, + ) + } + + pub fn for_graph_node_url( + graph_node_status_url: Url, + graph_node_base_url: Url, + deployment: DeploymentId, ) -> Result { Ok(Self { deployment: Some(deployment), - status_url: Some(Url::parse(graph_node_status_url)?), - query_url: Url::parse(graph_node_base_url)? - .join(&format!("subgraphs/id/{deployment}"))?, + status_url: Some(graph_node_status_url), + query_url: graph_node_base_url.join(&format!("subgraphs/id/{deployment}"))?, query_auth_token: None, }) } @@ -111,12 +122,19 @@ impl DeploymentDetails { query_url: &str, query_auth_token: Option, ) -> Result { - Ok(Self { + Ok(Self::for_query_url_with_token_url( + Url::parse(query_url)?, + query_auth_token, + )) + } + + pub fn for_query_url_with_token_url(query_url: Url, query_auth_token: Option) -> Self { + Self { deployment: None, status_url: None, - query_url: Url::parse(query_url)?, + query_url, query_auth_token, - }) + } } } diff --git a/config/Cargo.toml b/config/Cargo.toml index 8341b7fa9..3c6026482 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -9,7 +9,7 @@ serde.workspace = true thegraph-core.workspace = true tracing.workspace = true bigdecimal = { workspace = true, features = ["serde"] } -bip39 = "2.0.0" +bip39 = { version = "2.0.0", features = ["rand"] } figment = { version = "0.10.19", features = ["env", "toml"] } serde_with = { version = "3.8.1", default-features = false } serde_repr = "0.1.19" diff --git a/config/src/config.rs b/config/src/config.rs index 8235d0030..c64b58bc5 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -209,7 +209,7 @@ pub struct IndexerConfig { pub operator_mnemonic: Mnemonic, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] #[cfg_attr(test, derive(PartialEq))] #[serde(untagged)] #[serde(deny_unknown_fields)] @@ -300,7 +300,7 @@ pub struct SubgraphConfig { pub syncing_interval_secs: Duration, } -#[derive(Debug, Deserialize_repr, Clone)] +#[derive(Debug, Deserialize_repr, Clone, Copy)] #[cfg_attr(test, derive(PartialEq))] #[repr(u64)] pub enum TheGraphChainId { diff --git a/config/src/grt.rs b/config/src/grt.rs index 25994d2a8..eb189f64e 100644 --- a/config/src/grt.rs +++ b/config/src/grt.rs @@ -4,10 +4,18 @@ use bigdecimal::{BigDecimal, ToPrimitive}; use serde::{de::Error, Deserialize}; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Default)] pub struct NonZeroGRT(u128); impl NonZeroGRT { + pub fn new(value: u128) -> Result { + if value == 0 { + Err("GRT value cannot be represented as a u128 GRT wei value".into()) + } else { + Ok(NonZeroGRT(value)) + } + } + pub fn get_value(&self) -> u128 { self.0 } diff --git a/config/src/lib.rs b/config/src/lib.rs index 67e163dc2..0e645cfc2 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -4,5 +4,6 @@ mod config; mod grt; +pub use bip39::Mnemonic; pub use config::*; pub use grt::*; diff --git a/service/src/config.rs b/service/src/config.rs index b904bd40d..b92ccbf1d 100644 --- a/service/src/config.rs +++ b/service/src/config.rs @@ -69,7 +69,7 @@ impl From for Config { recently_closed_allocation_buffer_seconds: 0, }, graph_network: GraphNetworkConfig { - chain_id: value.blockchain.chain_id.clone() as u64, + chain_id: value.blockchain.chain_id as u64, }, tap: TapConfig { chain_id: value.blockchain.chain_id as u64, diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index d79d02b04..038cf5f52 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -1,20 +1,20 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::time::Duration; - use indexer_common::prelude::{ escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient, }; +use indexer_config::{ + Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig, + SubgraphConfig, SubgraphsConfig, TapConfig, +}; use ractor::concurrency::JoinHandle; use ractor::{Actor, ActorRef}; +use sender_account::SenderAccountConfig; use crate::agent::sender_accounts_manager::{ SenderAccountsManagerArgs, SenderAccountsManagerMessage, }; -use crate::config::{ - Config, EscrowSubgraph, Ethereum, IndexerInfrastructure, NetworkSubgraph, Tap, -}; use crate::{database, CONFIG, EIP_712_DOMAIN}; use sender_accounts_manager::SenderAccountsManager; @@ -25,95 +25,105 @@ pub mod unaggregated_receipts; pub async fn start_agent() -> (ActorRef, JoinHandle<()>) { let Config { - ethereum: Ethereum { indexer_address }, - indexer_infrastructure: - IndexerInfrastructure { - graph_node_query_endpoint, - graph_node_status_endpoint, - .. - }, - postgres, - network_subgraph: - NetworkSubgraph { - network_subgraph_deployment, - network_subgraph_endpoint, - network_subgraph_auth_token, - allocation_syncing_interval_ms, - recently_closed_allocation_buffer_seconds, + indexer: IndexerConfig { + indexer_address, .. + }, + graph_node: + GraphNodeConfig { + status_url: graph_node_status_endpoint, + query_url: graph_node_query_endpoint, }, - escrow_subgraph: - EscrowSubgraph { - escrow_subgraph_deployment, - escrow_subgraph_endpoint, - escrow_subgraph_auth_token, - escrow_syncing_interval_ms, + database, + subgraphs: + SubgraphsConfig { + network: + NetworkSubgraphConfig { + config: + SubgraphConfig { + query_url: network_query_url, + query_auth_token: network_query_auth_token, + deployment_id: network_deployment_id, + syncing_interval_secs: network_sync_interval, + }, + recently_closed_allocation_buffer_secs: recently_closed_allocation_buffer, + }, + escrow: + EscrowSubgraphConfig { + config: + SubgraphConfig { + query_url: escrow_query_url, + query_auth_token: escrow_query_auth_token, + deployment_id: escrow_deployment_id, + syncing_interval_secs: escrow_sync_interval, + }, + }, }, tap: - Tap { + TapConfig { // TODO: replace with a proper implementation once the gateway registry contract is ready sender_aggregator_endpoints, .. }, .. } = &*CONFIG; - let pgpool = database::connect(postgres).await; + let pgpool = database::connect(database.clone()).await; let http_client = reqwest::Client::new(); let network_subgraph = Box::leak(Box::new(SubgraphClient::new( http_client.clone(), - network_subgraph_deployment + network_deployment_id .map(|deployment| { - DeploymentDetails::for_graph_node( - graph_node_status_endpoint, - graph_node_query_endpoint, + DeploymentDetails::for_graph_node_url( + graph_node_status_endpoint.clone(), + graph_node_query_endpoint.clone(), deployment, ) }) .transpose() .expect("Failed to parse graph node query endpoint and network subgraph deployment"), - DeploymentDetails::for_query_url_with_token( - network_subgraph_endpoint, - network_subgraph_auth_token.clone(), - ) - .expect("Failed to parse network subgraph endpoint"), + DeploymentDetails::for_query_url_with_token_url( + network_query_url.clone(), + network_query_auth_token.clone(), + ), ))); let indexer_allocations = indexer_allocations( network_subgraph, *indexer_address, - Duration::from_millis(*allocation_syncing_interval_ms), - Duration::from_secs(*recently_closed_allocation_buffer_seconds), + *network_sync_interval, + *recently_closed_allocation_buffer, ); let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( http_client.clone(), - escrow_subgraph_deployment + escrow_deployment_id .map(|deployment| { - DeploymentDetails::for_graph_node( - graph_node_status_endpoint, - graph_node_query_endpoint, + DeploymentDetails::for_graph_node_url( + graph_node_status_endpoint.clone(), + graph_node_query_endpoint.clone(), deployment, ) }) .transpose() .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), - DeploymentDetails::for_query_url_with_token( - escrow_subgraph_endpoint, - escrow_subgraph_auth_token.clone(), - ) - .expect("Failed to parse escrow subgraph endpoint"), + DeploymentDetails::for_query_url_with_token_url( + escrow_query_url.clone(), + escrow_query_auth_token.clone(), + ), ))); let escrow_accounts = escrow_accounts( escrow_subgraph, *indexer_address, - Duration::from_millis(*escrow_syncing_interval_ms), + *escrow_sync_interval, false, ); + let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG))); + let args = SenderAccountsManagerArgs { - config: &CONFIG, + config, domain_separator: EIP_712_DOMAIN.clone(), pgpool, indexer_allocations, diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 596389ad4..3115eb222 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -10,6 +10,7 @@ use bigdecimal::ToPrimitive; use graphql_client::GraphQLQuery; use jsonrpsee::http_client::HttpClientBuilder; use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; +use reqwest::Url; use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::time::Duration; @@ -28,14 +29,11 @@ use tracing::{error, Level}; use super::sender_allocation::{SenderAllocation, SenderAllocationArgs}; use crate::adaptative_concurrency::AdaptiveLimiter; -use crate::agent::sender_allocation::SenderAllocationMessage; +use crate::agent::sender_allocation::{AllocationConfig, SenderAllocationMessage}; use crate::agent::unaggregated_receipts::UnaggregatedReceipts; use crate::backoff::BackoffInfo; +use crate::tap::escrow_adapter::EscrowAdapter; use crate::tracker::{SenderFeeTracker, SimpleFeeTracker}; -use crate::{ - config::{self}, - tap::escrow_adapter::EscrowAdapter, -}; use lazy_static::lazy_static; lazy_static! { @@ -120,14 +118,15 @@ pub enum SenderAccountMessage { pub struct SenderAccount; pub struct SenderAccountArgs { - pub config: &'static config::Config, + pub config: &'static SenderAccountConfig, + pub pgpool: PgPool, pub sender_id: Address, pub escrow_accounts: Eventual, pub indexer_allocations: Receiver>, pub escrow_subgraph: &'static SubgraphClient, pub domain_separator: Eip712Domain, - pub sender_aggregator_endpoint: String, + pub sender_aggregator_endpoint: Url, pub allocation_ids: HashSet
, pub prefix: Option, @@ -159,12 +158,40 @@ pub struct State { escrow_subgraph: &'static SubgraphClient, escrow_adapter: EscrowAdapter, domain_separator: Eip712Domain, - config: &'static config::Config, pgpool: PgPool, sender_aggregator: jsonrpsee::http_client::HttpClient, // Backoff info backoff_info: BackoffInfo, + + // Config + config: &'static SenderAccountConfig, +} + +pub struct SenderAccountConfig { + pub rav_request_buffer: Duration, + pub max_amount_willing_to_lose_grt: u128, + pub trigger_value: u128, + + // allocation config + pub rav_request_timeout: Duration, + pub rav_request_receipt_limit: u64, + pub indexer_address: Address, + pub escrow_polling_interval: Duration, +} + +impl SenderAccountConfig { + pub fn from_config(config: &indexer_config::Config) -> Self { + Self { + rav_request_buffer: config.tap.rav_request.timestamp_buffer_secs, + rav_request_receipt_limit: config.tap.rav_request.max_receipts_per_request, + indexer_address: config.indexer.indexer_address, + escrow_polling_interval: config.subgraphs.escrow.config.syncing_interval_secs, + max_amount_willing_to_lose_grt: config.tap.max_amount_willing_to_lose_grt.get_value(), + trigger_value: config.tap.get_trigger_value(), + rav_request_timeout: config.tap.rav_request.request_timeout_secs, + } + } } impl State { @@ -179,7 +206,6 @@ impl State { "SenderAccount is creating allocation." ); let args = SenderAllocationArgs { - config: self.config, pgpool: self.pgpool.clone(), allocation_id, sender: self.sender, @@ -189,6 +215,7 @@ impl State { domain_separator: self.domain_separator.clone(), sender_account_ref: sender_account_ref.clone(), sender_aggregator: self.sender_aggregator.clone(), + config: AllocationConfig::from_sender_config(self.config), }; SenderAllocation::spawn_linked( @@ -305,10 +332,10 @@ impl State { let unaggregated_fees = self.sender_fee_tracker.get_total_fee(); let pending_fees_over_balance = U256::from(pending_ravs + unaggregated_fees) >= self.sender_balance; - let max_unaggregated_fees = self.config.tap.max_unnaggregated_fees_per_sender; + let max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt; let invalid_receipt_fees = self.invalid_receipts_tracker.get_total_fee(); let total_fee_over_max_value = - unaggregated_fees + invalid_receipt_fees >= max_unaggregated_fees; + unaggregated_fees + invalid_receipt_fees >= max_amount_willing_to_lose; tracing::trace!( %pending_fees_over_balance, @@ -324,7 +351,7 @@ impl State { tracing::warn!( fee_tracker = self.sender_fee_tracker.get_total_fee(), rav_tracker = self.rav_tracker.get_total_fee(), - max_fee_per_sender = self.config.tap.max_unnaggregated_fees_per_sender, + max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt, sender_balance = self.sender_balance.to_u128(), "Denying sender." ); @@ -341,7 +368,7 @@ impl State { tracing::info!( fee_tracker = self.sender_fee_tracker.get_total_fee(), rav_tracker = self.rav_tracker.get_total_fee(), - max_fee_per_sender = self.config.tap.max_unnaggregated_fees_per_sender, + max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt, sender_balance = self.sender_balance.to_u128(), "Allowing sender." ); @@ -519,40 +546,38 @@ impl Actor for SenderAccount { MAX_FEE_PER_SENDER .with_label_values(&[&sender_id.to_string()]) - .set(config.tap.max_unnaggregated_fees_per_sender as f64); + .set(config.max_amount_willing_to_lose_grt as f64); RAV_REQUEST_TRIGGER_VALUE .with_label_values(&[&sender_id.to_string()]) - .set(config.tap.rav_request_trigger_value as f64); + .set(config.trigger_value as f64); let sender_aggregator = HttpClientBuilder::default() - .request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs)) + .request_timeout(config.rav_request_timeout) .build(&sender_aggregator_endpoint)?; let state = State { - sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis( - config.tap.rav_request_timestamp_buffer_ms, - )), + prefix, + sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer), rav_tracker: SimpleFeeTracker::default(), invalid_receipts_tracker: SimpleFeeTracker::default(), allocation_ids: allocation_ids.clone(), _indexer_allocations_handle, _escrow_account_monitor, - prefix, - escrow_accounts, - escrow_subgraph, - escrow_adapter, - domain_separator, - sender_aggregator, - config, - pgpool, + scheduled_rav_request: None, sender: sender_id, denied, sender_balance, retry_interval, - scheduled_rav_request: None, adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50), + escrow_accounts, + escrow_subgraph, + escrow_adapter, + domain_separator, + pgpool, + sender_aggregator, backoff_info: BackoffInfo::default(), + config, }; for allocation_id in &allocation_ids { @@ -663,21 +688,21 @@ impl Actor for SenderAccount { .get_count_outside_buffer_for_allocation(&allocation_id); let can_trigger_rav = state.sender_fee_tracker.can_trigger_rav(allocation_id); let counter_greater_receipt_limit = total_counter_for_allocation - >= state.config.tap.rav_request_receipt_limit + >= state.config.rav_request_receipt_limit && can_trigger_rav; let rav_result = if !state.backoff_info.in_backoff() - && total_fee_outside_buffer >= state.config.tap.rav_request_trigger_value + && total_fee_outside_buffer >= state.config.trigger_value { tracing::debug!( total_fee_outside_buffer, - trigger_value = state.config.tap.rav_request_trigger_value, + trigger_value = state.config.trigger_value, "Total fee greater than the trigger value. Triggering RAV request" ); state.rav_request_for_heaviest_allocation().await } else if counter_greater_receipt_limit { tracing::debug!( total_counter_for_allocation, - rav_request_receipt_limit = state.config.tap.rav_request_receipt_limit, + rav_request_receipt_limit = state.config.rav_request_receipt_limit, %allocation_id, "Total counter greater than the receipt limit per rav. Triggering RAV request" ); @@ -924,7 +949,6 @@ pub mod tests { use crate::agent::sender_accounts_manager::NewReceiptNotification; use crate::agent::sender_allocation::SenderAllocationMessage; use crate::agent::unaggregated_receipts::UnaggregatedReceipts; - use crate::config; use crate::tap::test_utils::{ create_rav, store_rav_with_options, ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, @@ -936,6 +960,7 @@ pub mod tests { use indexer_common::prelude::{DeploymentDetails, SubgraphClient}; use ractor::concurrency::JoinHandle; use ractor::{call, Actor, ActorProcessingErr, ActorRef, ActorStatus}; + use reqwest::Url; use serde_json::json; use sqlx::PgPool; use std::collections::{HashMap, HashSet}; @@ -999,7 +1024,7 @@ pub mod tests { pgpool: PgPool, initial_allocation: HashSet
, rav_request_trigger_value: u128, - max_unnaggregated_fees_per_sender: u128, + max_amount_willing_to_lose_grt: u128, escrow_subgraph_endpoint: &str, rav_request_receipt_limit: u64, ) -> ( @@ -1008,20 +1033,14 @@ pub mod tests { String, EventualWriter, ) { - let config = Box::leak(Box::new(config::Config { - config: None, - ethereum: config::Ethereum { - indexer_address: INDEXER.1, - }, - tap: config::Tap { - rav_request_trigger_value, - rav_request_timestamp_buffer_ms: BUFFER_MS, - rav_request_timeout_secs: 5, - max_unnaggregated_fees_per_sender, - rav_request_receipt_limit, - ..Default::default() - }, - ..Default::default() + let config = Box::leak(Box::new(super::SenderAccountConfig { + rav_request_buffer: Duration::from_millis(BUFFER_MS), + max_amount_willing_to_lose_grt, + trigger_value: rav_request_trigger_value, + rav_request_timeout: Duration::default(), + rav_request_receipt_limit, + indexer_address: INDEXER.1, + escrow_polling_interval: Duration::default(), })); let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( @@ -1049,7 +1068,7 @@ pub mod tests { indexer_allocations: watch::channel(initial_allocation).1, escrow_subgraph, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), - sender_aggregator_endpoint: DUMMY_URL.to_string(), + sender_aggregator_endpoint: Url::parse(DUMMY_URL).unwrap(), allocation_ids: HashSet::new(), prefix: Some(prefix.clone()), retry_interval: Duration::from_millis(10), diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index aff314ec7..7bb4bafe0 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -15,6 +15,7 @@ use eventuals::{Eventual, EventualExt, PipeHandle}; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; +use reqwest::Url; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; use tokio::select; @@ -23,8 +24,9 @@ use tracing::{error, warn}; use prometheus::{register_counter_vec, CounterVec}; -use super::sender_account::{SenderAccount, SenderAccountArgs, SenderAccountMessage}; -use crate::config; +use super::sender_account::{ + SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage, +}; lazy_static! { static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!( @@ -52,14 +54,14 @@ pub enum SenderAccountsManagerMessage { } pub struct SenderAccountsManagerArgs { - pub config: &'static config::Config, + pub config: &'static SenderAccountConfig, pub domain_separator: Eip712Domain, pub pgpool: PgPool, pub indexer_allocations: Receiver>, pub escrow_accounts: Eventual, pub escrow_subgraph: &'static SubgraphClient, - pub sender_aggregator_endpoints: HashMap, + pub sender_aggregator_endpoints: HashMap, pub prefix: Option, } @@ -69,13 +71,13 @@ pub struct State { new_receipts_watcher_handle: Option>, _eligible_allocations_senders_pipe: PipeHandle, - config: &'static config::Config, + config: &'static SenderAccountConfig, domain_separator: Eip712Domain, pgpool: PgPool, indexer_allocations: Receiver>, escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, - sender_aggregator_endpoints: HashMap, + sender_aggregator_endpoints: HashMap, prefix: Option, } @@ -591,10 +593,9 @@ mod tests { SenderAccountsManagerMessage, State, }; use crate::agent::sender_account::tests::{MockSenderAllocation, PREFIX_ID}; - use crate::agent::sender_account::SenderAccountMessage; + use crate::agent::sender_account::{SenderAccountConfig, SenderAccountMessage}; use crate::agent::sender_accounts_manager::{handle_notification, NewReceiptNotification}; use crate::agent::sender_allocation::tests::MockSenderAccount; - use crate::config; use crate::tap::test_utils::{ create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER, SENDER_2, SENDER_3, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, @@ -605,6 +606,7 @@ mod tests { use indexer_common::prelude::{DeploymentDetails, SubgraphClient}; use ractor::concurrency::JoinHandle; use ractor::{Actor, ActorProcessingErr, ActorRef}; + use reqwest::Url; use ruint::aliases::U256; use sqlx::postgres::PgListener; use sqlx::PgPool; @@ -622,18 +624,15 @@ mod tests { ))) } - fn get_config() -> &'static config::Config { - Box::leak(Box::new(config::Config { - config: None, - ethereum: config::Ethereum { - indexer_address: INDEXER.1, - }, - tap: config::Tap { - rav_request_trigger_value: 100, - rav_request_timestamp_buffer_ms: 1, - ..Default::default() - }, - ..Default::default() + fn get_config() -> &'static super::SenderAccountConfig { + Box::leak(Box::new(SenderAccountConfig { + rav_request_buffer: Duration::from_millis(1), + max_amount_willing_to_lose_grt: 0, + trigger_value: 100, + rav_request_timeout: Duration::from_millis(1), + rav_request_receipt_limit: 1000, + indexer_address: INDEXER.1, + escrow_polling_interval: Duration::default(), })) } @@ -664,8 +663,8 @@ mod tests { escrow_accounts: escrow_accounts_eventual, escrow_subgraph, sender_aggregator_endpoints: HashMap::from([ - (SENDER.1, String::from("http://localhost:8000")), - (SENDER_2.1, String::from("http://localhost:8000")), + (SENDER.1, Url::parse("http://localhost:8000").unwrap()), + (SENDER_2.1, Url::parse("http://localhost:8000").unwrap()), ]), prefix: Some(prefix.clone()), }; @@ -707,8 +706,8 @@ mod tests { escrow_accounts: Eventual::from_value(escrow_accounts), escrow_subgraph: get_subgraph_client(), sender_aggregator_endpoints: HashMap::from([ - (SENDER.1, String::from("http://localhost:8000")), - (SENDER_2.1, String::from("http://localhost:8000")), + (SENDER.1, Url::parse("http://localhost:8000").unwrap()), + (SENDER_2.1, Url::parse("http://localhost:8000").unwrap()), ]), prefix: Some(prefix), }, diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index b7f6ab323..b44a896f3 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -35,13 +35,14 @@ use crate::agent::sender_account::SenderAccountMessage; use crate::agent::sender_accounts_manager::NewReceiptNotification; use crate::agent::unaggregated_receipts::UnaggregatedReceipts; use crate::{ - config::{self}, tap::context::{checks::Signature, TapAgentContext}, tap::signers_trimmed, tap::{context::checks::AllocationId, escrow_adapter::EscrowAdapter}, }; use thiserror::Error; +use super::sender_account::SenderAccountConfig; + lazy_static! { static ref CLOSED_SENDER_ALLOCATIONS: CounterVec = register_counter_vec!( "tap_closed_sender_allocation_total", @@ -100,16 +101,37 @@ pub struct SenderAllocationState { tap_manager: TapManager, allocation_id: Address, sender: Address, - config: &'static config::Config, escrow_accounts: Eventual, domain_separator: Eip712Domain, sender_account_ref: ActorRef, sender_aggregator: jsonrpsee::http_client::HttpClient, + + //config + timestamp_buffer_ns: u64, + rav_request_receipt_limit: u64, +} + +#[derive(Clone)] +pub struct AllocationConfig { + pub timestamp_buffer_ns: u64, + pub rav_request_receipt_limit: u64, + pub indexer_address: Address, + pub escrow_polling_interval: Duration, +} + +impl AllocationConfig { + pub fn from_sender_config(config: &SenderAccountConfig) -> Self { + Self { + timestamp_buffer_ns: config.rav_request_buffer.as_nanos() as u64, + rav_request_receipt_limit: config.rav_request_receipt_limit, + indexer_address: config.indexer_address, + escrow_polling_interval: config.escrow_polling_interval, + } + } } pub struct SenderAllocationArgs { - pub config: &'static config::Config, pub pgpool: PgPool, pub allocation_id: Address, pub sender: Address, @@ -119,6 +141,9 @@ pub struct SenderAllocationArgs { pub domain_separator: Eip712Domain, pub sender_account_ref: ActorRef, pub sender_aggregator: jsonrpsee::http_client::HttpClient, + + //config + pub config: AllocationConfig, } #[derive(Debug)] @@ -298,7 +323,6 @@ impl Actor for SenderAllocation { impl SenderAllocationState { async fn new( SenderAllocationArgs { - config, pgpool, allocation_id, sender, @@ -308,14 +332,16 @@ impl SenderAllocationState { domain_separator, sender_account_ref, sender_aggregator, + config, }: SenderAllocationArgs, ) -> anyhow::Result { let required_checks: Vec> = vec![ Arc::new(AllocationId::new( + config.indexer_address, + config.escrow_polling_interval, sender, allocation_id, escrow_subgraph, - config, )), Arc::new(Signature::new( domain_separator.clone(), @@ -341,14 +367,16 @@ impl SenderAllocationState { tap_manager, allocation_id, sender, - config, escrow_accounts, domain_separator, + sender_account_ref: sender_account_ref.clone(), unaggregated_fees: UnaggregatedReceipts::default(), invalid_receipts_fees: UnaggregatedReceipts::default(), latest_rav, sender_aggregator, + rav_request_receipt_limit: config.rav_request_receipt_limit, + timestamp_buffer_ns: config.timestamp_buffer_ns, }) } @@ -493,8 +521,8 @@ impl SenderAllocationState { } = self .tap_manager .create_rav_request( - self.config.tap.rav_request_timestamp_buffer_ms * 1_000_000, - Some(self.config.tap.rav_request_receipt_limit), + self.timestamp_buffer_ns, + Some(self.rav_request_receipt_limit), ) .await?; match ( @@ -825,7 +853,6 @@ pub mod tests { sender_accounts_manager::NewReceiptNotification, unaggregated_receipts::UnaggregatedReceipts, }, - config, tap::{ escrow_adapter::EscrowAdapter, test_utils::{ @@ -851,7 +878,7 @@ pub mod tests { use std::{ collections::HashMap, sync::Arc, - time::{SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use tap_aggregator::{jsonrpsee_helpers::JsonRpcResponse, server::run_server}; use tap_core::receipt::{ @@ -921,21 +948,6 @@ pub mod tests { escrow_subgraph_endpoint: &str, sender_account: Option>, ) -> SenderAllocationArgs { - let config = Box::leak(Box::new(config::Config { - config: None, - ethereum: config::Ethereum { - indexer_address: INDEXER.1, - }, - tap: config::Tap { - rav_request_trigger_value: 100, - rav_request_timestamp_buffer_ms: 1, - rav_request_timeout_secs: 5, - rav_request_receipt_limit: 1000, - ..Default::default() - }, - ..Default::default() - })); - let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( reqwest::Client::new(), None, @@ -958,7 +970,6 @@ pub mod tests { .build(&sender_aggregator_endpoint) .unwrap(); SenderAllocationArgs { - config, pgpool: pgpool.clone(), allocation_id: *ALLOCATION_ID_0, sender: SENDER.1, @@ -968,6 +979,12 @@ pub mod tests { domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), sender_account_ref, sender_aggregator, + config: super::AllocationConfig { + timestamp_buffer_ns: 1, + rav_request_receipt_limit: 1000, + indexer_address: INDEXER.1, + escrow_polling_interval: Duration::from_millis(1000), + }, } } diff --git a/tap-agent/src/cli.rs b/tap-agent/src/cli.rs new file mode 100644 index 000000000..9a3928829 --- /dev/null +++ b/tap-agent/src/cli.rs @@ -0,0 +1,57 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use clap::Parser; +use indexer_config::{Config as IndexerConfig, ConfigPrefix}; +use std::path::PathBuf; +use tracing::subscriber::{set_global_default, SetGlobalDefaultError}; +use tracing::{error, level_filters::LevelFilter}; +use tracing_subscriber::{EnvFilter, FmtSubscriber}; + +#[derive(Parser)] +pub struct Cli { + /// Path to the configuration file. + /// See https://github.com/graphprotocol/indexer-rs/tree/main/tap-agent for examples. + #[arg(long, value_name = "FILE", verbatim_doc_comment)] + pub config: Option, +} + +/// Sets up tracing, allows log level to be set from the environment variables +fn init_tracing(format: String) -> Result<(), SetGlobalDefaultError> { + let filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + let subscriber_builder: tracing_subscriber::fmt::SubscriberBuilder< + tracing_subscriber::fmt::format::DefaultFields, + tracing_subscriber::fmt::format::Format, + EnvFilter, + > = FmtSubscriber::builder().with_env_filter(filter); + match format.as_str() { + "json" => set_global_default(subscriber_builder.json().finish()), + "full" => set_global_default(subscriber_builder.finish()), + "compact" => set_global_default(subscriber_builder.compact().finish()), + _ => set_global_default(subscriber_builder.with_ansi(true).pretty().finish()), + } +} + +pub fn get_config() -> Result { + let cli = Cli::parse(); + let config = IndexerConfig::parse(ConfigPrefix::Tap, cli.config.as_ref()).map_err(|e| { + error!( + "Invalid configuration file `{}`: {}, if a value is missing you can also use \ + --config to fill the rest of the values", + cli.config.unwrap_or_default().display(), + e + ); + anyhow::anyhow!(e) + })?; + + // add a LogFormat to config + init_tracing("pretty".to_string()).expect( + "Could not set up global default subscriber for logger, check \ + environmental variable `RUST_LOG`", + ); + + Ok(config) +} diff --git a/tap-agent/src/config.rs b/tap-agent/src/config.rs deleted file mode 100644 index 48371fe14..000000000 --- a/tap-agent/src/config.rs +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use anyhow::Result; -use clap::Parser; -use indexer_config::{Config as IndexerConfig, ConfigPrefix}; -use reqwest::Url; -use std::path::PathBuf; -use std::{collections::HashMap, str::FromStr}; -use thegraph_core::{Address, DeploymentId}; -use tracing::subscriber::{set_global_default, SetGlobalDefaultError}; -use tracing::{error, level_filters::LevelFilter}; -use tracing_subscriber::{EnvFilter, FmtSubscriber}; - -#[derive(Parser)] -pub struct Cli { - /// Path to the configuration file. - /// See https://github.com/graphprotocol/indexer-rs/tree/main/tap-agent for examples. - #[arg(long, value_name = "FILE", verbatim_doc_comment)] - pub config: Option, -} - -impl From for Config { - fn from(value: IndexerConfig) -> Self { - Self { - ethereum: Ethereum { - indexer_address: value.indexer.indexer_address, - }, - receipts: Receipts { - receipts_verifier_chain_id: value.blockchain.chain_id as u64, - receipts_verifier_address: value.blockchain.receipts_verifier_address, - }, - indexer_infrastructure: IndexerInfrastructure { - metrics_port: value.metrics.port, - graph_node_query_endpoint: value.graph_node.query_url.into(), - graph_node_status_endpoint: value.graph_node.status_url.into(), - log_level: None, - }, - postgres: Postgres { - postgres_url: value.database.get_formated_postgres_url(), - }, - network_subgraph: NetworkSubgraph { - network_subgraph_deployment: value.subgraphs.network.config.deployment_id, - network_subgraph_endpoint: value.subgraphs.network.config.query_url.into(), - network_subgraph_auth_token: value.subgraphs.network.config.query_auth_token, - allocation_syncing_interval_ms: value - .subgraphs - .network - .config - .syncing_interval_secs - .as_millis() as u64, - recently_closed_allocation_buffer_seconds: value - .subgraphs - .network - .recently_closed_allocation_buffer_secs - .as_secs(), - }, - escrow_subgraph: EscrowSubgraph { - escrow_subgraph_deployment: value.subgraphs.escrow.config.deployment_id, - escrow_subgraph_endpoint: value.subgraphs.escrow.config.query_url.into(), - escrow_subgraph_auth_token: value.subgraphs.escrow.config.query_auth_token, - escrow_syncing_interval_ms: value - .subgraphs - .escrow - .config - .syncing_interval_secs - .as_millis() as u64, - }, - tap: Tap { - rav_request_trigger_value: value.tap.get_trigger_value(), - rav_request_timestamp_buffer_ms: value - .tap - .rav_request - .timestamp_buffer_secs - .as_millis() as u64, - rav_request_timeout_secs: value.tap.rav_request.request_timeout_secs.as_secs(), - sender_aggregator_endpoints: value - .tap - .sender_aggregator_endpoints - .into_iter() - .map(|(addr, url)| (addr, url.into())) - .collect(), - rav_request_receipt_limit: value.tap.rav_request.max_receipts_per_request, - max_unnaggregated_fees_per_sender: value - .tap - .max_amount_willing_to_lose_grt - .get_value(), - }, - config: None, - } - } -} - -#[derive(Clone, Debug, Default)] -pub struct Config { - pub ethereum: Ethereum, - pub receipts: Receipts, - pub indexer_infrastructure: IndexerInfrastructure, - pub postgres: Postgres, - pub network_subgraph: NetworkSubgraph, - pub escrow_subgraph: EscrowSubgraph, - pub tap: Tap, - pub config: Option, -} - -#[derive(Clone, Debug, Default)] -pub struct Ethereum { - pub indexer_address: Address, -} - -#[derive(Clone, Debug, Default)] -pub struct Receipts { - pub receipts_verifier_chain_id: u64, - pub receipts_verifier_address: Address, -} - -#[derive(Clone, Debug, Default)] -pub struct IndexerInfrastructure { - pub metrics_port: u16, - pub graph_node_query_endpoint: String, - pub graph_node_status_endpoint: String, - pub log_level: Option, -} - -#[derive(Clone, Debug)] -pub struct Postgres { - pub postgres_url: Url, -} - -impl Default for Postgres { - fn default() -> Self { - Self { - postgres_url: Url::from_str("postgres:://postgres@postgres/postgres").unwrap(), - } - } -} - -#[derive(Clone, Debug, Default)] -pub struct NetworkSubgraph { - pub network_subgraph_deployment: Option, - pub network_subgraph_endpoint: String, - pub network_subgraph_auth_token: Option, - pub allocation_syncing_interval_ms: u64, - pub recently_closed_allocation_buffer_seconds: u64, -} - -#[derive(Clone, Debug, Default)] -pub struct EscrowSubgraph { - pub escrow_subgraph_deployment: Option, - pub escrow_subgraph_endpoint: String, - pub escrow_subgraph_auth_token: Option, - pub escrow_syncing_interval_ms: u64, -} - -#[derive(Clone, Debug, Default)] -pub struct Tap { - pub rav_request_trigger_value: u128, - pub rav_request_timestamp_buffer_ms: u64, - pub rav_request_timeout_secs: u64, - pub sender_aggregator_endpoints: HashMap, - pub rav_request_receipt_limit: u64, - pub max_unnaggregated_fees_per_sender: u128, -} - -/// Sets up tracing, allows log level to be set from the environment variables -fn init_tracing(format: String) -> Result<(), SetGlobalDefaultError> { - let filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(); - let subscriber_builder: tracing_subscriber::fmt::SubscriberBuilder< - tracing_subscriber::fmt::format::DefaultFields, - tracing_subscriber::fmt::format::Format, - EnvFilter, - > = FmtSubscriber::builder().with_env_filter(filter); - match format.as_str() { - "json" => set_global_default(subscriber_builder.json().finish()), - "full" => set_global_default(subscriber_builder.finish()), - "compact" => set_global_default(subscriber_builder.compact().finish()), - _ => set_global_default(subscriber_builder.with_ansi(true).pretty().finish()), - } -} - -impl Config { - pub fn from_cli() -> Result { - let cli = Cli::parse(); - let indexer_config = - IndexerConfig::parse(ConfigPrefix::Tap, cli.config.as_ref()).map_err(|e| { - error!( - "Invalid configuration file `{}`: {}, if a value is missing you can also use \ - --config to fill the rest of the values", - cli.config.unwrap_or_default().display(), - e - ); - anyhow::anyhow!(e) - })?; - let config: Config = indexer_config.into(); - - // Enables tracing under RUST_LOG variable - if let Some(log_setting) = &config.indexer_infrastructure.log_level { - std::env::set_var("RUST_LOG", log_setting); - }; - - // add a LogFormat to config - init_tracing("pretty".to_string()).expect( - "Could not set up global default subscriber for logger, check \ - environmental variable `RUST_LOG`", - ); - - Ok(config) - } -} diff --git a/tap-agent/src/database.rs b/tap-agent/src/database.rs index fd4c6eeda..b2b532df3 100644 --- a/tap-agent/src/database.rs +++ b/tap-agent/src/database.rs @@ -6,10 +6,8 @@ use std::time::Duration; use sqlx::{postgres::PgPoolOptions, PgPool}; use tracing::debug; -use crate::config; - -pub async fn connect(config: &config::Postgres) -> PgPool { - let url = &config.postgres_url; +pub async fn connect(config: indexer_config::DatabaseConfig) -> PgPool { + let url = &config.get_formated_postgres_url(); debug!( postgres_host = tracing::field::debug(&url.host()), postgres_port = tracing::field::debug(&url.port()), diff --git a/tap-agent/src/lib.rs b/tap-agent/src/lib.rs index 48b675caf..b5b2cd590 100644 --- a/tap-agent/src/lib.rs +++ b/tap-agent/src/lib.rs @@ -2,23 +2,22 @@ // SPDX-License-Identifier: Apache-2.0 use alloy::dyn_abi::Eip712Domain; +use indexer_config::Config; use lazy_static::lazy_static; use tap_core::tap_eip712_domain; -use crate::config::Config; - lazy_static! { - pub static ref CONFIG: Config = Config::from_cli().expect("Failed to load configuration"); + pub static ref CONFIG: Config = cli::get_config().expect("Failed to load configuration"); pub static ref EIP_712_DOMAIN: Eip712Domain = tap_eip712_domain( - CONFIG.receipts.receipts_verifier_chain_id, - CONFIG.receipts.receipts_verifier_address, + CONFIG.blockchain.chain_id as u64, + CONFIG.blockchain.receipts_verifier_address, ); } pub mod adaptative_concurrency; pub mod agent; pub mod backoff; -pub mod config; +pub mod cli; pub mod database; pub mod metrics; pub mod tap; diff --git a/tap-agent/src/main.rs b/tap-agent/src/main.rs index 426ff74d2..942e88b88 100644 --- a/tap-agent/src/main.rs +++ b/tap-agent/src/main.rs @@ -16,9 +16,7 @@ async fn main() -> Result<()> { let (manager, handler) = agent::start_agent().await; info!("TAP Agent started."); - tokio::spawn(metrics::run_server( - CONFIG.indexer_infrastructure.metrics_port, - )); + tokio::spawn(metrics::run_server(CONFIG.metrics.port)); info!("Metrics port opened"); // Have tokio wait for SIGTERM or SIGINT. diff --git a/tap-agent/src/tap/context/checks/allocation_id.rs b/tap-agent/src/tap/context/checks/allocation_id.rs index 62d608598..87f05fbe2 100644 --- a/tap-agent/src/tap/context/checks/allocation_id.rs +++ b/tap-agent/src/tap/context/checks/allocation_id.rs @@ -16,8 +16,6 @@ use tap_core::receipt::{ use tokio::time::sleep; use tracing::error; -use crate::config; - pub struct AllocationId { tap_allocation_redeemed: Eventual, allocation_id: Address, @@ -25,17 +23,18 @@ pub struct AllocationId { impl AllocationId { pub fn new( + indexer_address: Address, + escrow_polling_interval: Duration, sender_id: Address, allocation_id: Address, escrow_subgraph: &'static SubgraphClient, - config: &'static config::Config, ) -> Self { let tap_allocation_redeemed = tap_allocation_redeemed_eventual( allocation_id, sender_id, - config.ethereum.indexer_address, + indexer_address, escrow_subgraph, - config.escrow_subgraph.escrow_syncing_interval_ms, + escrow_polling_interval, ); Self { @@ -76,9 +75,9 @@ fn tap_allocation_redeemed_eventual( sender_address: Address, indexer_address: Address, escrow_subgraph: &'static SubgraphClient, - escrow_subgraph_polling_interval_ms: u64, + escrow_polling_interval: Duration, ) -> Eventual { - eventuals::timer(Duration::from_millis(escrow_subgraph_polling_interval_ms)).map_with_retry( + eventuals::timer(escrow_polling_interval).map_with_retry( move |_| async move { query_escrow_check_transactions( allocation_id, @@ -94,7 +93,7 @@ fn tap_allocation_redeemed_eventual( "Failed to check the escrow redeem status for allocation {} and sender {}: {}", allocation_id, sender_address, error ); - sleep(Duration::from_millis(escrow_subgraph_polling_interval_ms).div_f32(2.)) + sleep(escrow_polling_interval.div_f32(2.)) }, ) }