diff --git a/crates/service/src/error.rs b/crates/service/src/error.rs index 2180a7087..aef8e1d8f 100644 --- a/crates/service/src/error.rs +++ b/crates/service/src/error.rs @@ -1,7 +1,6 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use alloy::primitives::Address; use anyhow::Error; use axum::{ response::{IntoResponse, Response}, @@ -27,12 +26,6 @@ pub enum IndexerServiceError { #[error("Issues with provided receipt: {0}")] ReceiptError(#[from] tap_core::Error), - #[error("No attestation signer found for allocation `{0}`")] - NoSignerForAllocation(Address), - #[error("Error while processing the request: {0}")] - ProcessingError(SubgraphServiceError), - #[error("Failed to sign attestation")] - FailedToSignAttestation, #[error("There was an error while accessing escrow account: {0}")] EscrowAccount(#[from] EscrowAccountsError), @@ -48,12 +41,10 @@ impl IntoResponse for IndexerServiceError { } let status = match self { - NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, - - ReceiptError(_) | EscrowAccount(_) | ProcessingError(_) => StatusCode::BAD_REQUEST, + ReceiptError(_) | EscrowAccount(_) => StatusCode::BAD_REQUEST, ReceiptNotFound => StatusCode::PAYMENT_REQUIRED, DeploymentIdNotFound => StatusCode::INTERNAL_SERVER_ERROR, - AxumError(_) => StatusCode::BAD_REQUEST, + AxumError(_) => StatusCode::INTERNAL_SERVER_ERROR, SerializationError(_) => StatusCode::BAD_REQUEST, }; tracing::error!(%self, "An IndexerServiceError occoured."); diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs index 968a97909..605eb93a4 100644 --- a/crates/service/src/middleware.rs +++ b/crates/service/src/middleware.rs @@ -1,19 +1,23 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +mod allocation; +mod attestation; +mod attestation_signer; pub mod auth; -mod inject_allocation; -mod inject_context; -mod inject_deployment; -mod inject_labels; -mod inject_receipt; -mod inject_sender; +mod deployment; +mod labels; mod prometheus_metrics; +mod sender; +mod tap_context; +mod tap_receipt; -pub use inject_allocation::{allocation_middleware, Allocation, AllocationState}; -pub use inject_context::context_middleware; -pub use inject_deployment::deployment_middleware; -pub use inject_labels::labels_middleware; -pub use inject_receipt::receipt_middleware; -pub use inject_sender::{sender_middleware, SenderState}; +pub use allocation::{allocation_middleware, Allocation, AllocationState}; +pub use attestation::{attestation_middleware, AttestationInput}; +pub use attestation_signer::{signer_middleware, AttestationState}; +pub use deployment::deployment_middleware; +pub use labels::labels_middleware; pub use prometheus_metrics::PrometheusMetricsMiddlewareLayer; +pub use sender::{sender_middleware, SenderState}; +pub use tap_context::context_middleware; +pub use tap_receipt::receipt_middleware; diff --git a/crates/service/src/middleware/inject_allocation.rs b/crates/service/src/middleware/allocation.rs similarity index 98% rename from crates/service/src/middleware/inject_allocation.rs rename to crates/service/src/middleware/allocation.rs index d9b572b82..f3fd437e0 100644 --- a/crates/service/src/middleware/inject_allocation.rs +++ b/crates/service/src/middleware/allocation.rs @@ -58,7 +58,7 @@ pub async fn allocation_middleware( #[cfg(test)] mod tests { - use crate::middleware::inject_allocation::Allocation; + use crate::middleware::allocation::Allocation; use super::{allocation_middleware, AllocationState}; diff --git a/crates/service/src/middleware/attestation.rs b/crates/service/src/middleware/attestation.rs new file mode 100644 index 000000000..2185609ce --- /dev/null +++ b/crates/service/src/middleware/attestation.rs @@ -0,0 +1,208 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::string::FromUtf8Error; + +use axum::{ + body::to_bytes, + extract::Request, + middleware::Next, + response::{IntoResponse, Response}, +}; +use reqwest::StatusCode; +use serde::Serialize; +use thegraph_core::Attestation; + +use indexer_attestation::AttestationSigner; + +#[derive(Clone)] +pub enum AttestationInput { + Attestable { req: String }, + NotAttestable, +} + +#[derive(Debug, Serialize)] +#[cfg_attr(test, derive(serde::Deserialize))] +pub struct IndexerResponsePayload { + #[serde(rename = "graphQLResponse")] + graphql_response: String, + attestation: Option, +} + +/// Check if the query is attestable and generates attestation +/// +/// Executes query -> return subgraph response: (string, attestable (bool)) +/// if attestable && allocation id: +/// - look for signer +/// - create attestation +/// - return response with attestation +/// else: +/// - return with no attestation +/// +/// Requires AttestationSigner +pub async fn attestation_middleware( + request: Request, + next: Next, +) -> Result { + let signer = request + .extensions() + .get::() + .cloned() + .ok_or(AttestationError::CouldNotFindSigner)?; + + let (parts, graphql_response) = next.run(request).await.into_parts(); + let attestation_response = parts.extensions.get::(); + let bytes = to_bytes(graphql_response, usize::MAX).await?; + let res = String::from_utf8(bytes.into())?; + + let attestation = match attestation_response { + Some(AttestationInput::Attestable { req }) => Some(signer.create_attestation(req, &res)), + _ => None, + }; + + let response = serde_json::to_string(&IndexerResponsePayload { + graphql_response: res, + attestation, + })?; + + Ok(Response::new(response.into())) +} + +#[derive(thiserror::Error, Debug)] +pub enum AttestationError { + #[error("Could not find signer for allocation")] + CouldNotFindSigner, + + #[error("There was an AxumError: {0}")] + AxumError(#[from] axum::Error), + + #[error("There was an error converting the response to UTF-8 string: {0}")] + FromUtf8Error(#[from] FromUtf8Error), + + #[error("there was an error while serializing the response: {0}")] + SerializationError(#[from] serde_json::Error), +} + +impl IntoResponse for AttestationError { + fn into_response(self) -> Response { + match self { + AttestationError::CouldNotFindSigner + | AttestationError::AxumError(_) + | AttestationError::FromUtf8Error(_) + | AttestationError::SerializationError(_) => StatusCode::INTERNAL_SERVER_ERROR, + } + .into_response() + } +} + +#[cfg(test)] +mod tests { + use alloy::primitives::Address; + use axum::{ + body::{to_bytes, Body}, + http::{Request, Response}, + middleware::from_fn, + routing::get, + Router, + }; + use indexer_allocation::Allocation; + use indexer_attestation::AttestationSigner; + use reqwest::StatusCode; + use test_assets::{INDEXER_ALLOCATIONS, INDEXER_MNEMONIC}; + use tower::ServiceExt; + + use crate::middleware::{ + attestation::IndexerResponsePayload, attestation_middleware, AttestationInput, + }; + + const REQUEST: &str = "request"; + const RESPONSE: &str = "response"; + + fn allocation_signer() -> (Allocation, AttestationSigner) { + let allocation = INDEXER_ALLOCATIONS + .values() + .collect::>() + .pop() + .unwrap() + .clone(); + let signer = + AttestationSigner::new(&INDEXER_MNEMONIC.to_string(), &allocation, 1, Address::ZERO) + .unwrap(); + (allocation, signer) + } + + async fn payload_from_response(res: Response) -> IndexerResponsePayload { + let bytes = to_bytes(res.into_body(), usize::MAX).await.unwrap(); + + serde_json::from_slice(&bytes).unwrap() + } + + async fn send_request(app: Router, signer: Option) -> Response { + let mut request = Request::builder().uri("/"); + + if let Some(signer) = signer { + request = request.extension(signer); + } + + app.oneshot(request.body(Body::empty()).unwrap()) + .await + .unwrap() + } + + #[tokio::test] + async fn test_create_attestation() { + let (allocation, signer) = allocation_signer(); + let middleware = from_fn(attestation_middleware); + + let handle = move |_: Request| async move { + let mut res = Response::new(RESPONSE.to_string()); + res.extensions_mut().insert(AttestationInput::Attestable { + req: REQUEST.to_string(), + }); + res + }; + + let app = Router::new().route("/", get(handle)).layer(middleware); + + // with signer + let res = send_request(app, Some(signer.clone())).await; + assert_eq!(res.status(), StatusCode::OK); + + let response = payload_from_response(res).await; + assert_eq!(response.graphql_response, RESPONSE.to_string()); + + let attestation = response.attestation.unwrap(); + assert!(signer + .verify(&attestation, REQUEST, RESPONSE, &allocation.id) + .is_ok()); + } + + #[tokio::test] + async fn test_non_assignable() { + let (_, signer) = allocation_signer(); + let handle = move |_: Request| async move { Response::new(RESPONSE.to_string()) }; + + let middleware = from_fn(attestation_middleware); + let app = Router::new().route("/", get(handle)).layer(middleware); + + let res = send_request(app, Some(signer.clone())).await; + assert_eq!(res.status(), StatusCode::OK); + + let response = payload_from_response(res).await; + assert_eq!(response.graphql_response, RESPONSE.to_string()); + assert!(response.attestation.is_none()); + } + + #[tokio::test] + async fn test_no_signer() { + let handle = move |_: Request| async move { + Response::new(RESPONSE.to_string()); + }; + + let middleware = from_fn(attestation_middleware); + let app = Router::new().route("/", get(handle)).layer(middleware); + + let res = send_request(app, None).await; + assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR); + } +} diff --git a/crates/service/src/middleware/attestation_signer.rs b/crates/service/src/middleware/attestation_signer.rs new file mode 100644 index 000000000..f859292eb --- /dev/null +++ b/crates/service/src/middleware/attestation_signer.rs @@ -0,0 +1,113 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use alloy::primitives::Address; +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; +use indexer_attestation::AttestationSigner; +use std::collections::HashMap; +use tokio::sync::watch; + +use super::Allocation; + +#[derive(Clone)] +pub struct AttestationState { + pub attestation_signers: watch::Receiver>, +} + +/// Injects the attestation signer to be used in the attestation +/// +/// Needs Allocation Extension +pub async fn signer_middleware( + State(state): State, + mut request: Request, + next: Next, +) -> Response { + if let Some(Allocation(allocation_id)) = request.extensions().get::() { + if let Some(signer) = state.attestation_signers.borrow().get(allocation_id) { + request.extensions_mut().insert(signer.clone()); + } + } + + next.run(request).await +} + +#[cfg(test)] +mod tests { + use crate::middleware::{allocation::Allocation, signer_middleware, AttestationState}; + + use axum::{body::Body, http::Request, middleware::from_fn_with_state, routing::get, Router}; + use indexer_attestation::AttestationSigner; + use indexer_monitor::attestation_signers; + use reqwest::StatusCode; + use test_assets::{DISPUTE_MANAGER_ADDRESS, INDEXER_ALLOCATIONS, INDEXER_MNEMONIC}; + use tokio::sync::{mpsc::channel, watch}; + use tower::Service; + + #[tokio::test] + async fn test_attestation_signer_middleware() { + let allocations = (*INDEXER_ALLOCATIONS).clone(); + + let allocation = **allocations.keys().collect::>().first().unwrap(); + + let (_, allocations_rx) = watch::channel(allocations); + let (_, dispute_manager_rx) = watch::channel(*DISPUTE_MANAGER_ADDRESS); + let attestation_signers = attestation_signers( + allocations_rx, + INDEXER_MNEMONIC.clone(), + 1, + dispute_manager_rx, + ); + + let expected_signer = attestation_signers + .borrow() + .get(&allocation) + .unwrap() + .clone(); + + let state = AttestationState { + attestation_signers, + }; + + let middleware = from_fn_with_state(state, signer_middleware); + + let (tx, mut rx) = channel(1); + + let handle = move |request: Request| async move { + tx.send(request).await.unwrap(); + Body::empty() + }; + + let mut app = Router::new().route("/", get(handle)).layer(middleware); + + // with allocation + let res = app + .call( + Request::builder() + .uri("/") + .extension(Allocation(allocation)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + + let req = rx.recv().await.unwrap(); + let signer = req.extensions().get::().unwrap(); + assert_eq!(*signer, expected_signer); + + // without allocation + let res = app + .call(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + + let req = rx.recv().await.unwrap(); + assert!(req.extensions().get::().is_none()); + } +} diff --git a/crates/service/src/middleware/inject_deployment.rs b/crates/service/src/middleware/deployment.rs similarity index 100% rename from crates/service/src/middleware/inject_deployment.rs rename to crates/service/src/middleware/deployment.rs diff --git a/crates/service/src/middleware/inject_labels.rs b/crates/service/src/middleware/labels.rs similarity index 95% rename from crates/service/src/middleware/inject_labels.rs rename to crates/service/src/middleware/labels.rs index 8b87db4a5..b5bde6059 100644 --- a/crates/service/src/middleware/inject_labels.rs +++ b/crates/service/src/middleware/labels.rs @@ -7,9 +7,9 @@ use axum::{extract::Request, middleware::Next, response::Response}; use thegraph_core::DeploymentId; use super::{ - inject_allocation::Allocation, - inject_sender::Sender, + allocation::Allocation, prometheus_metrics::{MetricLabelProvider, MetricLabels}, + sender::Sender, }; const NO_DEPLOYMENT_ID: &str = "no-deployment"; @@ -81,10 +81,10 @@ pub async fn labels_middleware(mut request: Request, next: Next) -> Response { #[cfg(test)] mod tests { use crate::middleware::{ - inject_allocation::Allocation, - inject_labels::{NO_ALLOCATION, NO_DEPLOYMENT_ID, NO_SENDER}, - inject_sender::Sender, + allocation::Allocation, + labels::{NO_ALLOCATION, NO_DEPLOYMENT_ID, NO_SENDER}, prometheus_metrics::MetricLabels, + sender::Sender, }; use super::labels_middleware; diff --git a/crates/service/src/middleware/inject_sender.rs b/crates/service/src/middleware/sender.rs similarity index 98% rename from crates/service/src/middleware/inject_sender.rs rename to crates/service/src/middleware/sender.rs index f5a014c43..080ce2c65 100644 --- a/crates/service/src/middleware/inject_sender.rs +++ b/crates/service/src/middleware/sender.rs @@ -58,7 +58,7 @@ pub async fn sender_middleware( #[cfg(test)] mod tests { - use crate::middleware::inject_sender::SenderState; + use crate::middleware::sender::SenderState; use super::{sender_middleware, Sender}; use alloy::primitives::Address; diff --git a/crates/service/src/middleware/inject_context.rs b/crates/service/src/middleware/tap_context.rs similarity index 98% rename from crates/service/src/middleware/inject_context.rs rename to crates/service/src/middleware/tap_context.rs index 26b73130d..dbdb28e13 100644 --- a/crates/service/src/middleware/inject_context.rs +++ b/crates/service/src/middleware/tap_context.rs @@ -78,7 +78,7 @@ mod tests { use tower::ServiceExt; use crate::{ - middleware::inject_context::{context_middleware, QueryBody}, + middleware::tap_context::{context_middleware, QueryBody}, tap::AgoraQuery, }; diff --git a/crates/service/src/middleware/inject_receipt.rs b/crates/service/src/middleware/tap_receipt.rs similarity index 96% rename from crates/service/src/middleware/inject_receipt.rs rename to crates/service/src/middleware/tap_receipt.rs index f8e0507c8..e10c1a97e 100644 --- a/crates/service/src/middleware/inject_receipt.rs +++ b/crates/service/src/middleware/tap_receipt.rs @@ -24,7 +24,7 @@ pub async fn receipt_middleware(mut request: Request, next: Next) -> Response { #[cfg(test)] mod tests { - use crate::{middleware::inject_receipt::receipt_middleware, service::TapReceipt}; + use crate::{middleware::tap_receipt::receipt_middleware, service::TapReceipt}; use alloy::primitives::Address; use axum::{ diff --git a/crates/service/src/routes/request_handler.rs b/crates/service/src/routes/request_handler.rs index 7ac2fad0d..ab9744e1f 100644 --- a/crates/service/src/routes/request_handler.rs +++ b/crates/service/src/routes/request_handler.rs @@ -1,53 +1,60 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::sync::Arc; - -use crate::{error::IndexerServiceError, middleware::Allocation}; +use crate::{ + error::SubgraphServiceError, middleware::AttestationInput, service::SubgraphServiceState, +}; use axum::{ extract::{Path, State}, + http::{HeaderValue, Response}, response::IntoResponse, - Extension, }; -use reqwest::StatusCode; +use reqwest::header::CONTENT_TYPE; use thegraph_core::DeploymentId; use tracing::trace; -use crate::service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState}; +const GRAPH_ATTESTABLE: &str = "graph-attestable"; pub async fn request_handler( - Path(manifest_id): Path, - Extension(Allocation(allocation_id)): Extension, - State(state): State>, + Path(deployment): Path, + State(state): State, req: String, -) -> Result { - trace!("Handling request for deployment `{manifest_id}`"); +) -> Result { + trace!("Handling request for deployment `{deployment}`"); - // Check if we have an attestation signer for the allocation the receipt was created for - let signer = state - .attestation_signers - .borrow() - .get(&allocation_id) - .cloned() - .ok_or_else(|| (IndexerServiceError::NoSignerForAllocation(allocation_id)))?; + let deployment_url = state + .graph_node_query_base_url + .join(&format!("subgraphs/id/{deployment}")) + .map_err(|_| SubgraphServiceError::InvalidDeployment(deployment))?; let response = state - .service_impl - .process_request(manifest_id, &req) + .graph_node_client + .post(deployment_url) + .body(req.clone()) + .header(CONTENT_TYPE, HeaderValue::from_static("application/json")) + .send() .await - .map_err(IndexerServiceError::ProcessingError)?; + .map_err(SubgraphServiceError::QueryForwardingError)?; - let res = response - .as_str() - .map_err(|_| IndexerServiceError::FailedToSignAttestation)?; + let attestable = response + .headers() + .get(GRAPH_ATTESTABLE) + .map_or(false, |value| { + value.to_str().map(|value| value == "true").unwrap_or(false) + }); - let attestation = AttestationOutput::Attestation( - response - .is_attestable() - .then(|| signer.create_attestation(&req, res)), - ); + let body = response + .text() + .await + .map_err(SubgraphServiceError::QueryForwardingError)?; + let attestation_input = if attestable { + AttestationInput::Attestable { req } + } else { + AttestationInput::NotAttestable + }; - let response = response.finalize(attestation); + let mut response = Response::new(body); + response.extensions_mut().insert(attestation_input); - Ok((StatusCode::OK, response)) + Ok(response) } diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index 45ed9ae28..31eb1fe3f 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -4,21 +4,18 @@ use std::sync::Arc; use std::time::Duration; -use super::{error::SubgraphServiceError, routes}; +use super::routes; use anyhow::anyhow; use async_graphql::{EmptySubscription, Schema}; use async_graphql_axum::GraphQL; use axum::{ routing::{post, post_service}, - Json, Router, + Router, }; use indexer_config::{Config, DipsConfig}; use reqwest::Url; -use serde::{de::DeserializeOwned, Serialize}; -use serde_json::{json, Value}; use sqlx::PgPool; use thegraph_core::attestation::eip712_domain; -use thegraph_core::DeploymentId; use crate::{ cli::Cli, @@ -35,45 +32,8 @@ use tracing::error; mod indexer_service; mod tap_receipt_header; -pub use indexer_service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState}; pub use tap_receipt_header::TapReceipt; -#[derive(Debug)] -pub struct SubgraphServiceResponse { - inner: String, - attestable: bool, -} - -impl SubgraphServiceResponse { - pub fn new(inner: String, attestable: bool) -> Self { - Self { inner, attestable } - } -} - -impl IndexerServiceResponse for SubgraphServiceResponse { - type Data = Json; - type Error = SubgraphServiceError; // not used - - fn is_attestable(&self) -> bool { - self.attestable - } - - fn as_str(&self) -> Result<&str, Self::Error> { - Ok(self.inner.as_str()) - } - - fn finalize(self, attestation: AttestationOutput) -> Self::Data { - let (attestation_key, attestation_value) = match attestation { - AttestationOutput::Attestation(attestation) => ("attestation", json!(attestation)), - AttestationOutput::Attestable => ("attestable", json!(self.is_attestable())), - }; - Json(json!({ - "graphQLResponse": self.inner, - attestation_key: attestation_value, - })) - } -} - #[derive(Clone)] pub struct SubgraphServiceState { pub config: &'static Config, @@ -84,53 +44,6 @@ pub struct SubgraphServiceState { pub graph_node_query_base_url: &'static Url, } -pub struct SubgraphService { - state: SubgraphServiceState, -} - -impl SubgraphService { - fn new(state: SubgraphServiceState) -> Self { - Self { state } - } -} - -impl SubgraphService { - pub async fn process_request( - &self, - deployment: DeploymentId, - request: &Request, - ) -> Result { - let deployment_url = self - .state - .graph_node_query_base_url - .join(&format!("subgraphs/id/{deployment}")) - .map_err(|_| SubgraphServiceError::InvalidDeployment(deployment))?; - - let response = self - .state - .graph_node_client - .post(deployment_url) - .json(request) - .send() - .await - .map_err(SubgraphServiceError::QueryForwardingError)?; - - let attestable = response - .headers() - .get("graph-attestable") - .map_or(false, |value| { - value.to_str().map(|value| value == "true").unwrap_or(false) - }); - - let body = response - .text() - .await - .map_err(SubgraphServiceError::QueryForwardingError)?; - - Ok(SubgraphServiceResponse::new(body, attestable)) - } -} - /// Run the subgraph indexer service pub async fn run() -> anyhow::Result<()> { // Parse command line and environment arguments @@ -211,7 +124,7 @@ pub async fn run() -> anyhow::Result<()> { release, config, url_namespace: "subgraphs", - service_impl: SubgraphService::new(state), + subgraph_state: state, extra_routes: router, }) .await diff --git a/crates/service/src/service/indexer_service.rs b/crates/service/src/service/indexer_service.rs index a8f8b3ca9..0c3baa6e2 100644 --- a/crates/service/src/service/indexer_service.rs +++ b/crates/service/src/service/indexer_service.rs @@ -6,12 +6,10 @@ use axum::{ extract::{MatchedPath, Request as ExtractRequest}, http::{Method, Request}, middleware::{from_fn, from_fn_with_state}, - response::IntoResponse, routing::{get, post}, serve, Json, Router, ServiceExt, }; use build_info::BuildInfo; -use indexer_attestation::AttestationSigner; use indexer_monitor::{ attestation_signers, deployment_to_allocation, dispute_manager, escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient, @@ -20,12 +18,9 @@ use prometheus::TextEncoder; use reqwest::StatusCode; use serde::Serialize; use sqlx::postgres::PgPoolOptions; -use std::{ - collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, -}; +use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use tap_core::{manager::Manager, receipt::checks::CheckList, tap_eip712_domain}; -use thegraph_core::{Address, Attestation}; -use tokio::{net::TcpListener, signal, sync::watch::Receiver}; +use tokio::{net::TcpListener, signal}; use tower::ServiceBuilder; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_http::{ @@ -40,10 +35,11 @@ use tracing::{error, info, info_span, warn}; use crate::{ metrics::{FAILED_RECEIPT, HANDLER_FAILURE, HANDLER_HISTOGRAM}, middleware::{ - allocation_middleware, + allocation_middleware, attestation_middleware, auth::{self, Bearer, OrExt}, context_middleware, deployment_middleware, labels_middleware, receipt_middleware, - sender_middleware, AllocationState, PrometheusMetricsMiddlewareLayer, SenderState, + sender_middleware, signer_middleware, AllocationState, AttestationState, + PrometheusMetricsMiddlewareLayer, SenderState, }, routes::{health, request_handler, static_subgraph_request_handler}, tap::IndexerTapContext, @@ -51,21 +47,7 @@ use crate::{ }; use indexer_config::Config; -use super::SubgraphService; - -pub trait IndexerServiceResponse { - type Data: IntoResponse; - type Error: Error; - - fn is_attestable(&self) -> bool; - fn as_str(&self) -> Result<&str, Self::Error>; - fn finalize(self, attestation: AttestationOutput) -> Self::Data; -} - -pub enum AttestationOutput { - Attestation(Option), - Attestable, -} +use super::SubgraphServiceState; #[derive(Clone, Serialize)] pub struct IndexerServiceRelease { @@ -89,17 +71,11 @@ impl From<&BuildInfo> for IndexerServiceRelease { } pub struct IndexerServiceOptions { - pub service_impl: SubgraphService, + pub subgraph_state: SubgraphServiceState, pub config: &'static Config, pub release: IndexerServiceRelease, pub url_namespace: &'static str, - pub extra_routes: Router>, -} - -pub struct IndexerServiceState { - pub config: Config, - pub attestation_signers: Receiver>, - pub service_impl: SubgraphService, + pub extra_routes: Router, } const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(30); @@ -274,12 +250,6 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { CheckList::new(checks), ))); - let state = Arc::new(IndexerServiceState { - config: options.config.clone(), - attestation_signers, - service_impl: options.service_impl, - }); - // Rate limits by allowing bursts of 10 requests and requiring 100ms of // time between consecutive requests after that, effectively rate // limiting to 10 req/s. @@ -360,8 +330,6 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { } } - misc_routes = misc_routes.with_state(state.clone()); - let mut request_handler_route = post(request_handler); // inject auth @@ -386,6 +354,9 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { escrow_accounts, domain_separator, }; + let attestation_state = AttestationState { + attestation_signers, + }; let service_builder = ServiceBuilder::new() // inject deployment id @@ -404,7 +375,11 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { HANDLER_FAILURE.clone(), )) // tap context - .layer(from_fn(context_middleware)); + .layer(from_fn(context_middleware)) + // inject signer + .layer(from_fn_with_state(attestation_state, signer_middleware)) + // create attestation + .layer(from_fn(attestation_middleware)); request_handler_route = request_handler_route.layer(service_builder); @@ -416,7 +391,7 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { .expect("Failed to set up `/{url_namespace}/id/:id` route"), request_handler_route, ) - .with_state(state.clone()); + .with_state(options.subgraph_state.clone()); let router = NormalizePath::trim_trailing_slash( misc_routes @@ -452,7 +427,7 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { _span: &tracing::Span| {}, ), ) - .with_state(state), + .with_state(options.subgraph_state), ); serve_metrics(options.config.metrics.get_socket_addr());