diff --git a/crates/service/src/routes/cost.rs b/crates/service/src/routes/cost.rs index 55ae81c43..b3b35f806 100644 --- a/crates/service/src/routes/cost.rs +++ b/crates/service/src/routes/cost.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use std::str::FromStr; -use std::sync::Arc; use crate::database::cost_model::{self, CostModel}; use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject}; @@ -127,7 +126,7 @@ impl Query { ctx: &Context<'_>, deployment_ids: Vec, ) -> Result, anyhow::Error> { - let pool = &ctx.data_unchecked::>().database; + let pool = &ctx.data_unchecked::().database; let cost_models = cost_model::cost_models(pool, &deployment_ids).await?; Ok(cost_models.into_iter().map(|m| m.into()).collect()) } @@ -137,7 +136,7 @@ impl Query { ctx: &Context<'_>, deployment_id: DeploymentId, ) -> Result, anyhow::Error> { - let pool = &ctx.data_unchecked::>().database; + let pool = &ctx.data_unchecked::().database; cost_model::cost_model(pool, &deployment_id) .await .map(|model_opt| model_opt.map(GraphQlCostModel::from)) @@ -151,7 +150,7 @@ pub async fn build_schema() -> CostSchema { } pub async fn cost( - State(state): State>, + State(state): State, req: GraphQLRequest, ) -> GraphQLResponse { state diff --git a/crates/service/src/routes/request_handler.rs b/crates/service/src/routes/request_handler.rs index db7a28511..924830521 100644 --- a/crates/service/src/routes/request_handler.rs +++ b/crates/service/src/routes/request_handler.rs @@ -19,8 +19,7 @@ use thegraph_core::DeploymentId; use tracing::trace; use crate::service::{ - AttestationOutput, IndexerServiceError, IndexerServiceImpl, IndexerServiceResponse, - IndexerServiceState, TapReceipt, + AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState, TapReceipt, }; lazy_static! { @@ -45,16 +44,13 @@ lazy_static! { } -pub async fn request_handler( +pub async fn request_handler( Path(manifest_id): Path, typed_header: TypedHeader, - state: State>>, + state: State>, headers: HeaderMap, body: String, -) -> Result> -where - I: IndexerServiceImpl + Sync + Send + 'static, -{ +) -> Result { _request_handler(manifest_id, typed_header, state, headers, body) .await .inspect_err(|_| { @@ -64,16 +60,13 @@ where }) } -async fn _request_handler( +async fn _request_handler( manifest_id: DeploymentId, TypedHeader(receipt): TypedHeader, - State(state): State>>, + State(state): State>, headers: HeaderMap, req: String, -) -> Result> -where - I: IndexerServiceImpl + Sync + Send + 'static, -{ +) -> Result { trace!("Handling request for deployment `{manifest_id}`"); let request: QueryBody = diff --git a/crates/service/src/routes/status.rs b/crates/service/src/routes/status.rs index 065e7e542..93605b622 100644 --- a/crates/service/src/routes/status.rs +++ b/crates/service/src/routes/status.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use std::collections::HashSet; -use std::sync::Arc; use async_graphql_axum::GraphQLRequest; use axum::{extract::State, response::IntoResponse, Json}; @@ -58,7 +57,7 @@ impl IntoRequestParameters for WrappedGraphQLRequest { // Custom middleware function to process the request before reaching the main handler pub async fn status( - State(state): State>, + State(state): State, request: GraphQLRequest, ) -> Result { let request = request.into_inner(); diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index 026ab2beb..0e13d64e2 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -9,7 +9,6 @@ use anyhow::anyhow; use async_graphql::{EmptySubscription, Schema}; use async_graphql_axum::GraphQL; use axum::{ - async_trait, routing::{post, post_service}, Json, Router, }; @@ -28,7 +27,7 @@ use crate::{ dips::{AgreementStore, InMemoryAgreementStore}, }, routes::dips::Price, - service::indexer_service::{IndexerService, IndexerServiceOptions, IndexerServiceRelease}, + service::indexer_service::{IndexerServiceOptions, IndexerServiceRelease}, }; use clap::Parser; use tracing::error; @@ -37,13 +36,12 @@ mod indexer_service; mod tap_receipt_header; pub use indexer_service::{ - AttestationOutput, IndexerServiceError, IndexerServiceImpl, IndexerServiceResponse, - IndexerServiceState, + AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState, }; pub use tap_receipt_header::TapReceipt; #[derive(Debug)] -struct SubgraphServiceResponse { +pub struct SubgraphServiceResponse { inner: String, attestable: bool, } @@ -78,6 +76,7 @@ impl IndexerServiceResponse for SubgraphServiceResponse { } } +#[derive(Clone)] pub struct SubgraphServiceState { pub config: &'static Config, pub database: PgPool, @@ -87,27 +86,22 @@ pub struct SubgraphServiceState { pub graph_node_query_base_url: &'static Url, } -struct SubgraphService { - state: Arc, +pub struct SubgraphService { + state: SubgraphServiceState, } impl SubgraphService { - fn new(state: Arc) -> Self { + fn new(state: SubgraphServiceState) -> Self { Self { state } } } -#[async_trait] -impl IndexerServiceImpl for SubgraphService { - type Error = SubgraphServiceError; - type Response = SubgraphServiceResponse; - type State = SubgraphServiceState; - - async fn process_request( +impl SubgraphService { + pub async fn process_request( &self, deployment: DeploymentId, request: Request, - ) -> Result { + ) -> Result { let deployment_url = self .state .graph_node_query_base_url @@ -166,7 +160,7 @@ pub async fn run() -> anyhow::Result<()> { // Some of the subgraph service configuration goes into the so-called // "state", which will be passed to any request handler, middleware etc. // that is involved in serving requests - let state = Arc::new(SubgraphServiceState { + let state = SubgraphServiceState { config, database: database::connect(config.database.clone().get_formated_postgres_url().as_ref()) .await, @@ -178,7 +172,7 @@ pub async fn run() -> anyhow::Result<()> { .expect("Failed to init HTTP client for Graph Node"), graph_node_status_url: &config.graph_node.status_url, graph_node_query_base_url: &config.graph_node.query_url, - }); + }; let agreement_store: Arc = Arc::new(InMemoryAgreementStore::default()); let prices: Vec = vec![]; @@ -215,7 +209,7 @@ pub async fn run() -> anyhow::Result<()> { router = router.route("/dips", post_service(GraphQL::new(schema))); } - IndexerService::run(IndexerServiceOptions { + indexer_service::run(IndexerServiceOptions { release, config, url_namespace: "subgraphs", diff --git a/crates/service/src/service/indexer_service.rs b/crates/service/src/service/indexer_service.rs index a36bd2dff..3325402e6 100644 --- a/crates/service/src/service/indexer_service.rs +++ b/crates/service/src/service/indexer_service.rs @@ -7,7 +7,6 @@ use axum::extract::MatchedPath; use axum::extract::Request as ExtractRequest; use axum::http::{Method, Request}; use axum::{ - async_trait, response::{IntoResponse, Response}, routing::{get, post}, Json, Router, @@ -23,14 +22,14 @@ use indexer_common::{ }; use prometheus::TextEncoder; use reqwest::StatusCode; -use serde::{de::DeserializeOwned, Serialize}; +use serde::Serialize; use sqlx::postgres::PgPoolOptions; use std::{ collections::HashMap, error::Error, fmt::Debug, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, }; use tap_core::{manager::Manager, receipt::checks::CheckList, tap_eip712_domain}; -use thegraph_core::{Address, Attestation, DeploymentId}; +use thegraph_core::{Address, Attestation}; use thiserror::Error; use tokio::net::TcpListener; use tokio::signal; @@ -41,12 +40,15 @@ use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::Tr use tracing::warn; use tracing::{error, info, info_span}; +use crate::error::SubgraphServiceError; use crate::routes::health; use crate::routes::request_handler; use crate::routes::static_subgraph_request_handler; use crate::tap::IndexerTapContext; use indexer_config::Config; +use super::SubgraphService; + pub trait IndexerServiceResponse { type Data: IntoResponse; type Error: Error; @@ -61,24 +63,8 @@ pub enum AttestationOutput { Attestable, } -#[async_trait] -pub trait IndexerServiceImpl { - type Error: std::error::Error; - type Response: IndexerServiceResponse + Sized; - type State: Send + Sync; - - async fn process_request( - &self, - manifest_id: DeploymentId, - request: Request, - ) -> Result; -} - #[derive(Debug, Error)] -pub enum IndexerServiceError -where - E: std::error::Error, -{ +pub enum IndexerServiceError { #[error("Issues with provided receipt: {0}")] ReceiptError(tap_core::Error), #[error("No attestation signer found for allocation `{0}`")] @@ -86,7 +72,7 @@ where #[error("Invalid request body: {0}")] InvalidRequest(anyhow::Error), #[error("Error while processing the request: {0}")] - ProcessingError(E), + ProcessingError(SubgraphServiceError), #[error("No valid receipt or free query auth token provided")] Unauthorized, #[error("Invalid free query auth token")] @@ -101,10 +87,7 @@ where EscrowAccount(EscrowAccountsError), } -impl IntoResponse for IndexerServiceError -where - E: std::error::Error, -{ +impl IntoResponse for IndexerServiceError { fn into_response(self) -> Response { use IndexerServiceError::*; @@ -157,389 +140,385 @@ impl From<&BuildInfo> for IndexerServiceRelease { } } -pub struct IndexerServiceOptions -where - I: IndexerServiceImpl + Sync + Send + 'static, -{ - pub service_impl: I, +pub struct IndexerServiceOptions { + pub service_impl: SubgraphService, pub config: &'static Config, pub release: IndexerServiceRelease, pub url_namespace: &'static str, - pub extra_routes: Router>>, + pub extra_routes: Router>, } -pub struct IndexerServiceState -where - I: IndexerServiceImpl + Sync + Send + 'static, -{ +pub struct IndexerServiceState { pub config: Config, pub attestation_signers: Receiver>, pub tap_manager: Manager, - pub service_impl: Arc, + pub service_impl: SubgraphService, // tap pub escrow_accounts: Receiver, pub domain_separator: Eip712Domain, } -pub struct IndexerService {} - -impl IndexerService { - pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> - where - I: IndexerServiceImpl + Sync + Send + 'static, - { - let http_client = reqwest::Client::builder() - .tcp_nodelay(true) - .timeout(Duration::from_secs(30)) - .build() - .expect("Failed to init HTTP client"); - - let network_subgraph: &'static SubgraphClient = Box::leak(Box::new( - SubgraphClient::new( - http_client.clone(), - options - .config - .subgraphs - .network - .config - .deployment_id - .map(|deployment| { - DeploymentDetails::for_graph_node_url( - options.config.graph_node.status_url.clone(), - options.config.graph_node.query_url.clone(), - deployment, - ) - }) - .transpose() - .expect( - "Failed to parse graph node query endpoint and network subgraph deployment", - ), - DeploymentDetails::for_query_url_with_token( - options.config.subgraphs.network.config.query_url.as_ref(), - options - .config - .subgraphs - .network - .config - .query_auth_token - .clone(), - )?, - ) - .await, - )); +const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(30); +const DATABASE_TIMEOUT: Duration = Duration::from_secs(30); +const DATABASE_MAX_CONNECTIONS: u32 = 50; - // Identify the dispute manager for the configured network - let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(3600)) - .await - .expect("Failed to initialize dispute manager"); +const MISC_BURST_SIZE: u32 = 10; +const MISC_BURST_PER_MILLISECOND: u64 = 100; + +const STATIC_BURST_SIZE: u32 = 50; +const STATIC_BURST_PER_MILLISECOND: u64 = 20; + +const DISPUTE_MANAGER_INTERVAL: Duration = Duration::from_secs(3600); - // Monitor the indexer's own allocations - let allocations = indexer_allocations( - network_subgraph, - options.config.indexer.indexer_address, +pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { + let http_client = reqwest::Client::builder() + .tcp_nodelay(true) + .timeout(HTTP_CLIENT_TIMEOUT) + .build() + .expect("Failed to init HTTP client"); + + let network_subgraph: &'static SubgraphClient = Box::leak(Box::new( + SubgraphClient::new( + http_client.clone(), options .config .subgraphs .network .config - .syncing_interval_secs, + .deployment_id + .map(|deployment| { + DeploymentDetails::for_graph_node_url( + options.config.graph_node.status_url.clone(), + options.config.graph_node.query_url.clone(), + deployment, + ) + }) + .transpose() + .expect( + "Failed to parse graph node query endpoint and network subgraph deployment", + ), + DeploymentDetails::for_query_url_with_token( + options.config.subgraphs.network.config.query_url.as_ref(), + options + .config + .subgraphs + .network + .config + .query_auth_token + .clone(), + )?, + ) + .await, + )); + + // Identify the dispute manager for the configured network + let dispute_manager = dispute_manager(network_subgraph, DISPUTE_MANAGER_INTERVAL) + .await + .expect("Failed to initialize dispute manager"); + + // Monitor the indexer's own allocations + let allocations = indexer_allocations( + network_subgraph, + options.config.indexer.indexer_address, + options + .config + .subgraphs + .network + .config + .syncing_interval_secs, + options + .config + .subgraphs + .network + .recently_closed_allocation_buffer_secs, + ) + .await + .expect("Failed to initialize indexer_allocations watcher"); + + // Maintain an up-to-date set of attestation signers, one for each + // allocation + let attestation_signers = attestation_signers( + allocations.clone(), + options.config.indexer.operator_mnemonic.clone(), + options.config.blockchain.chain_id as u64, + dispute_manager, + ); + + let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new( + SubgraphClient::new( + http_client, options .config .subgraphs - .network - .recently_closed_allocation_buffer_secs, - ) - .await - .expect("Failed to initialize indexer_allocations watcher"); - - // Maintain an up-to-date set of attestation signers, one for each - // allocation - let attestation_signers = attestation_signers( - allocations.clone(), - options.config.indexer.operator_mnemonic.clone(), - options.config.blockchain.chain_id as u64, - dispute_manager, - ); - - let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new( - SubgraphClient::new( - http_client, + .escrow + .config + .deployment_id + .map(|deployment| { + DeploymentDetails::for_graph_node_url( + options.config.graph_node.status_url.clone(), + options.config.graph_node.query_url.clone(), + deployment, + ) + }) + .transpose() + .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), + DeploymentDetails::for_query_url_with_token( + options.config.subgraphs.escrow.config.query_url.as_ref(), options .config .subgraphs .escrow .config - .deployment_id - .map(|deployment| { - DeploymentDetails::for_graph_node_url( - options.config.graph_node.status_url.clone(), - options.config.graph_node.query_url.clone(), - deployment, - ) - }) - .transpose() - .expect( - "Failed to parse graph node query endpoint and escrow subgraph deployment", - ), - DeploymentDetails::for_query_url_with_token( - options.config.subgraphs.escrow.config.query_url.as_ref(), - options - .config - .subgraphs - .escrow - .config - .query_auth_token - .clone(), - )?, - ) - .await, - )); - - let escrow_accounts = escrow_accounts( - escrow_subgraph, - options.config.indexer.indexer_address, - options.config.subgraphs.escrow.config.syncing_interval_secs, - true, // Reject thawing signers eagerly + .query_auth_token + .clone(), + )?, ) - .await - .expect("Error creating escrow_accounts channel"); - - // Establish Database connection necessary for serving indexer management - // requests with defined schema - // Note: Typically, you'd call `sqlx::migrate!();` here to sync the models - // which defaults to files in "./migrations" to sync the database; - // however, this can cause conflicts with the migrations run by indexer - // agent. Hence we leave syncing and migrating entirely to the agent and - // assume the models are up to date in the service. - let database = PgPoolOptions::new() - .max_connections(50) - .acquire_timeout(Duration::from_secs(30)) - .connect( - options - .config - .database - .clone() - .get_formated_postgres_url() - .as_ref(), - ) - .await?; - - let domain_separator = tap_eip712_domain( - options.config.blockchain.chain_id as u64, - options.config.blockchain.receipts_verifier_address, - ); - let indexer_context = - IndexerTapContext::new(database.clone(), domain_separator.clone()).await; - let timestamp_error_tolerance = options.config.tap.rav_request.timestamp_buffer_secs; - - let receipt_max_value = options.config.service.tap.max_receipt_value_grt.get_value(); - - let checks = IndexerTapContext::get_checks( - database, - allocations, - escrow_accounts.clone(), - domain_separator.clone(), - timestamp_error_tolerance, - receipt_max_value, + .await, + )); + + let escrow_accounts = escrow_accounts( + escrow_subgraph, + options.config.indexer.indexer_address, + options.config.subgraphs.escrow.config.syncing_interval_secs, + true, // Reject thawing signers eagerly + ) + .await + .expect("Error creating escrow_accounts channel"); + + // Establish Database connection necessary for serving indexer management + // requests with defined schema + // Note: Typically, you'd call `sqlx::migrate!();` here to sync the models + // which defaults to files in "./migrations" to sync the database; + // however, this can cause conflicts with the migrations run by indexer + // agent. Hence we leave syncing and migrating entirely to the agent and + // assume the models are up to date in the service. + let database = PgPoolOptions::new() + .max_connections(DATABASE_MAX_CONNECTIONS) + .acquire_timeout(DATABASE_TIMEOUT) + .connect( + options + .config + .database + .clone() + .get_formated_postgres_url() + .as_ref(), ) - .await; + .await?; + + let domain_separator = tap_eip712_domain( + options.config.blockchain.chain_id as u64, + options.config.blockchain.receipts_verifier_address, + ); + let indexer_context = IndexerTapContext::new(database.clone(), domain_separator.clone()).await; + let timestamp_error_tolerance = options.config.tap.rav_request.timestamp_buffer_secs; + + let receipt_max_value = options.config.service.tap.max_receipt_value_grt.get_value(); + + let checks = IndexerTapContext::get_checks( + database, + allocations, + escrow_accounts.clone(), + domain_separator.clone(), + timestamp_error_tolerance, + receipt_max_value, + ) + .await; + + let tap_manager = Manager::new( + domain_separator.clone(), + indexer_context, + CheckList::new(checks), + ); + + let state = Arc::new(IndexerServiceState { + config: options.config.clone(), + attestation_signers, + tap_manager, + service_impl: options.service_impl, + escrow_accounts, + domain_separator, + }); + + // Rate limits by allowing bursts of 10 requests and requiring 100ms of + // time between consecutive requests after that, effectively rate + // limiting to 10 req/s. + let misc_rate_limiter = GovernorLayer { + config: Arc::new( + GovernorConfigBuilder::default() + .per_millisecond(MISC_BURST_PER_MILLISECOND) + .burst_size(MISC_BURST_SIZE) + .finish() + .expect("Failed to set up rate limiting"), + ), + }; - let tap_manager = Manager::new( - domain_separator.clone(), - indexer_context, - CheckList::new(checks), - ); + let operator_address = Json( + serde_json::json!({ "publicKey": public_key(&options.config.indexer.operator_mnemonic)?}), + ); + + let mut misc_routes = Router::new() + .route("/", get("Service is up and running")) + .route("/version", get(Json(options.release))) + .route("/info", get(operator_address)) + .layer(misc_rate_limiter.clone()); + + // Rate limits by allowing bursts of 50 requests and requiring 20ms of + // time between consecutive requests after that, effectively rate + // limiting to 50 req/s. + let static_subgraph_rate_limiter = GovernorLayer { + config: Arc::new( + GovernorConfigBuilder::default() + .per_millisecond(STATIC_BURST_PER_MILLISECOND) + .burst_size(STATIC_BURST_SIZE) + .finish() + .expect("Failed to set up rate limiting"), + ), + }; - let state = Arc::new(IndexerServiceState { - config: options.config.clone(), - attestation_signers, - tap_manager, - service_impl: Arc::new(options.service_impl), - escrow_accounts, - domain_separator, - }); - - // Rate limits by allowing bursts of 10 requests and requiring 100ms of - // time between consecutive requests after that, effectively rate - // limiting to 10 req/s. - let misc_rate_limiter = GovernorLayer { - config: Arc::new( - GovernorConfigBuilder::default() - .per_millisecond(100) - .burst_size(10) - .finish() - .expect("Failed to set up rate limiting"), - ), - }; + // Check subgraph Health + misc_routes = misc_routes + .route( + "/subgraph/health/:deployment_id", + get(health).with_state(options.config.graph_node.clone()), + ) + .layer(misc_rate_limiter); - let operator_address = Json( - serde_json::json!({ "publicKey": public_key(&options.config.indexer.operator_mnemonic)?}), - ); + if options.config.service.serve_network_subgraph { + if let Some(free_auth_token) = &options.config.service.serve_auth_token { + info!("Serving network subgraph at /network"); - let mut misc_routes = Router::new() - .route("/", get("Service is up and running")) - .route("/version", get(Json(options.release))) - .route("/info", get(operator_address)) - .layer(misc_rate_limiter.clone()); - - // Rate limits by allowing bursts of 50 requests and requiring 20ms of - // time between consecutive requests after that, effectively rate - // limiting to 50 req/s. - let static_subgraph_rate_limiter = GovernorLayer { - config: Arc::new( - GovernorConfigBuilder::default() - .per_millisecond(20) - .burst_size(50) - .finish() - .expect("Failed to set up rate limiting"), - ), - }; + let auth_layer = ValidateRequestHeaderLayer::bearer(free_auth_token); - // Check subgraph Health - misc_routes = misc_routes - .route( - "/subgraph/health/:deployment_id", - get(health).with_state(options.config.graph_node.clone()), - ) - .layer(misc_rate_limiter); - - if options.config.service.serve_network_subgraph { - if let Some(free_auth_token) = &options.config.service.serve_auth_token { - info!("Serving network subgraph at /network"); - - let auth_layer = ValidateRequestHeaderLayer::bearer(free_auth_token); - - misc_routes = misc_routes.route( - "/network", - post(static_subgraph_request_handler) - .route_layer(auth_layer) - .with_state(network_subgraph) - .route_layer(static_subgraph_rate_limiter.clone()), - ); - } else { - warn!("`serve_network_subgraph` is enabled but no `serve_auth_token` provided. Disabling it."); - } + misc_routes = misc_routes.route( + "/network", + post(static_subgraph_request_handler) + .route_layer(auth_layer) + .with_state(network_subgraph) + .route_layer(static_subgraph_rate_limiter.clone()), + ); + } else { + warn!("`serve_network_subgraph` is enabled but no `serve_auth_token` provided. Disabling it."); } + } - if options.config.service.serve_escrow_subgraph { - if let Some(free_auth_token) = &options.config.service.serve_auth_token { - info!("Serving escrow subgraph at /escrow"); - - let auth_layer = ValidateRequestHeaderLayer::bearer(free_auth_token); - - misc_routes = misc_routes.route( - "/escrow", - post(static_subgraph_request_handler) - .route_layer(auth_layer) - .with_state(escrow_subgraph) - .route_layer(static_subgraph_rate_limiter), - ) - } else { - warn!("`serve_escrow_subgraph` is enabled but no `serve_auth_token` provided. Disabling it."); - } - } + if options.config.service.serve_escrow_subgraph { + if let Some(free_auth_token) = &options.config.service.serve_auth_token { + info!("Serving escrow subgraph at /escrow"); - misc_routes = misc_routes.with_state(state.clone()); + let auth_layer = ValidateRequestHeaderLayer::bearer(free_auth_token); - let data_routes = Router::new() - .route( - PathBuf::from(&options.config.service.url_prefix) - .join(format!("{}/id/:id", options.url_namespace)) - .to_str() - .expect("Failed to set up `/{url_namespace}/id/:id` route"), - post(request_handler::), + misc_routes = misc_routes.route( + "/escrow", + post(static_subgraph_request_handler) + .route_layer(auth_layer) + .with_state(escrow_subgraph) + .route_layer(static_subgraph_rate_limiter), ) - .with_state(state.clone()); - - let router = NormalizePath::trim_trailing_slash( - misc_routes - .merge(data_routes) - .merge(options.extra_routes) - .layer( - CorsLayer::new() - .allow_origin(cors::Any) - .allow_headers(cors::Any) - .allow_methods([Method::OPTIONS, Method::POST, Method::GET]), - ) - .layer( - TraceLayer::new_for_http() - .make_span_with(|req: &Request<_>| { - let method = req.method(); - let uri = req.uri(); - let matched_path = req - .extensions() - .get::() - .map(MatchedPath::as_str); - - info_span!( - "http_request", - %method, - %uri, - matched_path, - ) - }) - // we disable failures here because we doing our own error logging - .on_failure( - |_error: tower_http::classify::ServerErrorsFailureClass, - _latency: Duration, - _span: &tracing::Span| {}, - ), - ) - .with_state(state), - ); - - Self::serve_metrics(options.config.metrics.get_socket_addr()); + } else { + warn!("`serve_escrow_subgraph` is enabled but no `serve_auth_token` provided. Disabling it."); + } + } - info!( - address = %options.config.service.host_and_port, - "Serving requests", - ); - let listener = TcpListener::bind(&options.config.service.host_and_port) - .await - .expect("Failed to bind to indexer-service port"); + misc_routes = misc_routes.with_state(state.clone()); - Ok(serve( - listener, - ServiceExt::::into_make_service_with_connect_info::(router), + let data_routes = Router::new() + .route( + PathBuf::from(&options.config.service.url_prefix) + .join(format!("{}/id/:id", options.url_namespace)) + .to_str() + .expect("Failed to set up `/{url_namespace}/id/:id` route"), + post(request_handler), ) - .with_graceful_shutdown(shutdown_signal()) - .await?) - } + .with_state(state.clone()); + + let router = NormalizePath::trim_trailing_slash( + misc_routes + .merge(data_routes) + .merge(options.extra_routes) + .layer( + CorsLayer::new() + .allow_origin(cors::Any) + .allow_headers(cors::Any) + .allow_methods([Method::OPTIONS, Method::POST, Method::GET]), + ) + .layer( + TraceLayer::new_for_http() + .make_span_with(|req: &Request<_>| { + let method = req.method(); + let uri = req.uri(); + let matched_path = req + .extensions() + .get::() + .map(MatchedPath::as_str); + + info_span!( + "http_request", + %method, + %uri, + matched_path, + ) + }) + // we disable failures here because we doing our own error logging + .on_failure( + |_error: tower_http::classify::ServerErrorsFailureClass, + _latency: Duration, + _span: &tracing::Span| {}, + ), + ) + .with_state(state), + ); + + serve_metrics(options.config.metrics.get_socket_addr()); + + info!( + address = %options.config.service.host_and_port, + "Serving requests", + ); + let listener = TcpListener::bind(&options.config.service.host_and_port) + .await + .expect("Failed to bind to indexer-service port"); + + Ok(serve( + listener, + ServiceExt::::into_make_service_with_connect_info::(router), + ) + .with_graceful_shutdown(shutdown_signal()) + .await?) +} - fn serve_metrics(host_and_port: SocketAddr) { - info!(address = %host_and_port, "Serving prometheus metrics"); - - tokio::spawn(async move { - let router = Router::new().route( - "/metrics", - get(|| async { - let metric_families = prometheus::gather(); - let encoder = TextEncoder::new(); - - match encoder.encode_to_string(&metric_families) { - Ok(s) => (StatusCode::OK, s), - Err(e) => { - error!("Error encoding metrics: {}", e); - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Error encoding metrics: {}", e), - ) - } +fn serve_metrics(host_and_port: SocketAddr) { + info!(address = %host_and_port, "Serving prometheus metrics"); + + tokio::spawn(async move { + let router = Router::new().route( + "/metrics", + get(|| async { + let metric_families = prometheus::gather(); + let encoder = TextEncoder::new(); + + match encoder.encode_to_string(&metric_families) { + Ok(s) => (StatusCode::OK, s), + Err(e) => { + error!("Error encoding metrics: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error encoding metrics: {}", e), + ) } - }), - ); + } + }), + ); - serve( - TcpListener::bind(host_and_port) - .await - .expect("Failed to bind to metrics port"), - router.into_make_service(), - ) - .await - .expect("Failed to serve metrics") - }); - } + serve( + TcpListener::bind(host_and_port) + .await + .expect("Failed to bind to metrics port"), + router.into_make_service(), + ) + .await + .expect("Failed to serve metrics") + }); } pub async fn shutdown_signal() {