diff --git a/crates/service/src/error.rs b/crates/service/src/error.rs index aef8e1d8f..eba52af12 100644 --- a/crates/service/src/error.rs +++ b/crates/service/src/error.rs @@ -1,6 +1,8 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::convert::Infallible; + use anyhow::Error; use axum::{ response::{IntoResponse, Response}, @@ -9,6 +11,8 @@ use axum::{ use indexer_monitor::EscrowAccountsError; use reqwest::StatusCode; use serde::Serialize; +use tap_core::receipt::ReceiptError; +use tap_core::Error as TapError; use thegraph_core::DeploymentId; use thiserror::Error; @@ -25,31 +29,37 @@ pub enum IndexerServiceError { SerializationError(#[from] serde_json::Error), #[error("Issues with provided receipt: {0}")] - ReceiptError(#[from] tap_core::Error), - + TapCoreError(#[from] tap_core::Error), #[error("There was an error while accessing escrow account: {0}")] EscrowAccount(#[from] EscrowAccountsError), } +impl StatusCodeExt for IndexerServiceError { + fn status_code(&self) -> StatusCode { + use IndexerServiceError as E; + match &self { + E::TapCoreError(ref error) => match error { + TapError::SignatureError(_) + | TapError::ReceiptError(ReceiptError::CheckFailure(_)) => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, + E::EscrowAccount(_) | E::ReceiptNotFound => StatusCode::PAYMENT_REQUIRED, + E::DeploymentIdNotFound => StatusCode::INTERNAL_SERVER_ERROR, + E::AxumError(_) | E::SerializationError(_) => StatusCode::BAD_GATEWAY, + } + } +} + impl IntoResponse for IndexerServiceError { fn into_response(self) -> Response { - use IndexerServiceError::*; - #[derive(Serialize)] struct ErrorResponse { message: String, } - let status = match self { - ReceiptError(_) | EscrowAccount(_) => StatusCode::BAD_REQUEST, - ReceiptNotFound => StatusCode::PAYMENT_REQUIRED, - DeploymentIdNotFound => StatusCode::INTERNAL_SERVER_ERROR, - AxumError(_) => StatusCode::INTERNAL_SERVER_ERROR, - SerializationError(_) => StatusCode::BAD_REQUEST, - }; tracing::error!(%self, "An IndexerServiceError occoured."); ( - status, + self.status_code(), Json(ErrorResponse { message: self.to_string(), }), @@ -72,15 +82,14 @@ pub enum SubgraphServiceError { QueryForwardingError(reqwest::Error), } -impl From<&SubgraphServiceError> for StatusCode { - fn from(err: &SubgraphServiceError) -> Self { +impl StatusCodeExt for SubgraphServiceError { + fn status_code(&self) -> StatusCode { use SubgraphServiceError::*; - match err { - InvalidStatusQuery(_) => StatusCode::BAD_REQUEST, - UnsupportedStatusQueryFields(_) => StatusCode::BAD_REQUEST, - StatusQueryError(_) => StatusCode::INTERNAL_SERVER_ERROR, - InvalidDeployment(_) => StatusCode::BAD_REQUEST, - QueryForwardingError(_) => StatusCode::INTERNAL_SERVER_ERROR, + match self { + InvalidStatusQuery(_) | UnsupportedStatusQueryFields(_) => StatusCode::BAD_REQUEST, + InvalidDeployment(_) => StatusCode::INTERNAL_SERVER_ERROR, + StatusQueryError(_) => StatusCode::BAD_GATEWAY, + QueryForwardingError(_) => StatusCode::SERVICE_UNAVAILABLE, } } } @@ -88,6 +97,35 @@ impl From<&SubgraphServiceError> for StatusCode { // Tell axum how to convert `SubgraphServiceError` into a response. impl IntoResponse for SubgraphServiceError { fn into_response(self) -> Response { - (StatusCode::from(&self), self.to_string()).into_response() + (self.status_code(), self.to_string()).into_response() + } +} + +pub trait StatusCodeExt { + fn status_code(&self) -> StatusCode; +} + +impl StatusCodeExt for Response { + fn status_code(&self) -> StatusCode { + self.status() + } +} + +impl StatusCodeExt for Result +where + T: StatusCodeExt, + E: StatusCodeExt, +{ + fn status_code(&self) -> StatusCode { + match self { + Ok(t) => t.status_code(), + Err(e) => e.status_code(), + } + } +} + +impl StatusCodeExt for Infallible { + fn status_code(&self) -> StatusCode { + unreachable!() } } diff --git a/crates/service/src/metrics.rs b/crates/service/src/metrics.rs index 74fcce2ef..56fb04ba2 100644 --- a/crates/service/src/metrics.rs +++ b/crates/service/src/metrics.rs @@ -20,17 +20,7 @@ lazy_static! { pub static ref HANDLER_HISTOGRAM: HistogramVec = register_histogram_vec!( "indexer_query_handler_seconds", "Histogram for default indexer query handler", - &["deployment", "allocation", "sender"] - ).unwrap(); - - /// Metric registered in global registry for - /// Failed queries to handler - /// - /// Labels: "deployment" - pub static ref HANDLER_FAILURE: CounterVec = register_counter_vec!( - "indexer_query_handler_failed_total", - "Failed queries to handler", - &["deployment"] + &["deployment", "allocation", "sender", "status_code"] ).unwrap(); /// Metric registered in global registry for diff --git a/crates/service/src/middleware/attestation.rs b/crates/service/src/middleware/attestation.rs index 2185609ce..24d7ae88c 100644 --- a/crates/service/src/middleware/attestation.rs +++ b/crates/service/src/middleware/attestation.rs @@ -15,6 +15,8 @@ use thegraph_core::Attestation; use indexer_attestation::AttestationSigner; +use crate::error::StatusCodeExt; + #[derive(Clone)] pub enum AttestationInput { Attestable { req: String }, @@ -83,15 +85,20 @@ pub enum AttestationError { SerializationError(#[from] serde_json::Error), } -impl IntoResponse for AttestationError { - fn into_response(self) -> Response { +impl StatusCodeExt for AttestationError { + fn status_code(&self) -> StatusCode { match self { - AttestationError::CouldNotFindSigner - | AttestationError::AxumError(_) + AttestationError::CouldNotFindSigner => StatusCode::INTERNAL_SERVER_ERROR, + AttestationError::AxumError(_) | AttestationError::FromUtf8Error(_) - | AttestationError::SerializationError(_) => StatusCode::INTERNAL_SERVER_ERROR, + | AttestationError::SerializationError(_) => StatusCode::BAD_GATEWAY, } - .into_response() + } +} + +impl IntoResponse for AttestationError { + fn into_response(self) -> Response { + self.status_code().into_response() } } diff --git a/crates/service/src/middleware/labels.rs b/crates/service/src/middleware/labels.rs index b5bde6059..798d28bb4 100644 --- a/crates/service/src/middleware/labels.rs +++ b/crates/service/src/middleware/labels.rs @@ -71,8 +71,9 @@ pub async fn labels_middleware(mut request: Request, next: Next) -> Response { let labels: MetricLabels = Arc::new(SenderAllocationDeploymentLabels { sender, allocation, - deployment_id, + deployment_id: deployment_id.clone(), }); + request.extensions_mut().insert(labels); next.run(request).await diff --git a/crates/service/src/middleware/prometheus_metrics.rs b/crates/service/src/middleware/prometheus_metrics.rs index 409e01ba2..a16128fd2 100644 --- a/crates/service/src/middleware/prometheus_metrics.rs +++ b/crates/service/src/middleware/prometheus_metrics.rs @@ -5,15 +5,17 @@ use axum::http::Request; use pin_project::pin_project; -use prometheus::HistogramTimer; use std::{ future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, + time::Instant, }; use tower::{Layer, Service}; +use crate::error::StatusCodeExt; + pub type MetricLabels = Arc; pub trait MetricLabelProvider { @@ -25,7 +27,6 @@ pub trait MetricLabelProvider { pub struct PrometheusMetricsMiddleware { inner: S, histogram: prometheus::HistogramVec, - failure: prometheus::CounterVec, } /// MetricsMiddleware used in tower components @@ -35,13 +36,11 @@ pub struct PrometheusMetricsMiddleware { pub struct PrometheusMetricsMiddlewareLayer { /// Histogram used to register the processing timer histogram: prometheus::HistogramVec, - /// Counter metric in case of failure - failure: prometheus::CounterVec, } impl PrometheusMetricsMiddlewareLayer { - pub fn new(histogram: prometheus::HistogramVec, failure: prometheus::CounterVec) -> Self { - Self { histogram, failure } + pub fn new(histogram: prometheus::HistogramVec) -> Self { + Self { histogram } } } @@ -52,7 +51,6 @@ impl Layer for PrometheusMetricsMiddlewareLayer { PrometheusMetricsMiddleware { inner, histogram: self.histogram.clone(), - failure: self.failure.clone(), } } } @@ -61,6 +59,7 @@ impl Service> for PrometheusMetricsMiddleware where S: Service> + Clone + 'static, ReqBody: 'static, + Result: StatusCodeExt, { type Response = S::Response; type Error = S::Error; @@ -75,7 +74,6 @@ where PrometheusMetricsFuture { timer: None, histogram: self.histogram.clone(), - failure: self.failure.clone(), labels, fut: self.inner.call(request), } @@ -85,46 +83,45 @@ where #[pin_project] pub struct PrometheusMetricsFuture { /// Instant at which we started the requst. - timer: Option, + timer: Option, histogram: prometheus::HistogramVec, - failure: prometheus::CounterVec, - labels: Option, #[pin] fut: F, } -impl Future for PrometheusMetricsFuture +impl Future for PrometheusMetricsFuture where - F: Future>, + F: Future, + T: StatusCodeExt, { type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let Some(labels) = &this.labels else { + let Some(labels) = this.labels else { return this.fut.poll(cx); }; if this.timer.is_none() { // Start timer so we can track duration of request. - let duration_metric = this.histogram.with_label_values(&labels.get_labels()); - *this.timer = Some(duration_metric.start_timer()); + *this.timer = Some(Instant::now()); } match this.fut.poll(cx) { Poll::Ready(result) => { - if result.is_err() { - let _ = this - .failure - .get_metric_with_label_values(&labels.get_labels()); - } + let status_code = result.status_code(); + // add status code + let mut labels = labels.get_labels(); + labels.push(status_code.as_str()); + let duration_metric = this.histogram.with_label_values(&labels); + // Record the duration of this request. - if let Some(timer) = this.timer.take() { - timer.observe_duration(); - } + let timer = this.timer.take().expect("timer should exist"); + duration_metric.observe(timer.elapsed().as_secs_f64()); + Poll::Ready(result) } Poll::Pending => Poll::Pending, @@ -141,9 +138,13 @@ mod tests { http::{Request, Response}, }; use prometheus::core::Collector; + use reqwest::StatusCode; use tower::{Service, ServiceBuilder, ServiceExt}; - use crate::middleware::prometheus_metrics::{MetricLabels, PrometheusMetricsMiddlewareLayer}; + use crate::{ + error::StatusCodeExt, + middleware::prometheus_metrics::{MetricLabels, PrometheusMetricsMiddlewareLayer}, + }; use super::MetricLabelProvider; @@ -153,12 +154,22 @@ mod tests { vec!["label1,", "label2", "label3"] } } - async fn handle(_: Request) -> anyhow::Result> { + + #[derive(Debug)] + struct ErrorResponse; + + impl StatusCodeExt for ErrorResponse { + fn status_code(&self) -> StatusCode { + StatusCode::INTERNAL_SERVER_ERROR + } + } + + async fn handle(_: Request) -> Result, ErrorResponse> { Ok(Response::new(Body::default())) } - async fn handle_err(_: Request) -> anyhow::Result> { - Err(anyhow::anyhow!("Error")) + async fn handle_err(_: Request) -> Result, ErrorResponse> { + Err(ErrorResponse) } #[tokio::test] @@ -167,36 +178,20 @@ mod tests { let histogram_metric = prometheus::register_histogram_vec_with_registry!( "histogram_metric", "Test", - &["deployment", "sender", "allocation"], - registry, - ) - .unwrap(); - - let failure_metric = prometheus::register_counter_vec_with_registry!( - "failure_metric", - "Test", - &["deployment", "sender", "allocation"], + &["deployment", "sender", "allocation", "status_code"], registry, ) .unwrap(); // check if everything is clean - assert_eq!( - histogram_metric - .collect() - .first() - .unwrap() - .get_metric() - .len(), - 0 - ); - assert_eq!( - failure_metric.collect().first().unwrap().get_metric().len(), - 0 - ); - - let metrics_layer = - PrometheusMetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); + assert!(histogram_metric + .collect() + .first() + .unwrap() + .get_metric() + .is_empty()); + + let metrics_layer = PrometheusMetricsMiddlewareLayer::new(histogram_metric.clone()); let mut service = ServiceBuilder::new() .layer(metrics_layer) .service_fn(handle); @@ -209,23 +204,21 @@ mod tests { req.extensions_mut().insert(labels.clone()); let _ = handle.call(req).await; - assert_eq!( + let how_many_metrics = |status: u32| { histogram_metric .collect() .first() .unwrap() .get_metric() - .len(), - 1 - ); + .iter() + .filter(|a| a.get_label()[3].get_value() == status.to_string()) + .count() + }; - assert_eq!( - failure_metric.collect().first().unwrap().get_metric().len(), - 0 - ); + assert_eq!(how_many_metrics(200), 1); + assert_eq!(how_many_metrics(500), 0); - let metrics_layer = - PrometheusMetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); + let metrics_layer = PrometheusMetricsMiddlewareLayer::new(histogram_metric.clone()); let mut service = ServiceBuilder::new() .layer(metrics_layer) .service_fn(handle_err); @@ -236,20 +229,7 @@ mod tests { let _ = handle.call(req).await; // it's using the same labels, should have only one metric - assert_eq!( - histogram_metric - .collect() - .first() - .unwrap() - .get_metric() - .len(), - 1 - ); - - // new failture - assert_eq!( - failure_metric.collect().first().unwrap().get_metric().len(), - 1 - ); + assert_eq!(how_many_metrics(200), 1); + assert_eq!(how_many_metrics(500), 1); } } diff --git a/crates/service/src/routes/static_subgraph.rs b/crates/service/src/routes/static_subgraph.rs index eab55c996..57dd00300 100644 --- a/crates/service/src/routes/static_subgraph.rs +++ b/crates/service/src/routes/static_subgraph.rs @@ -33,11 +33,20 @@ pub enum StaticSubgraphError { FailedToParse(#[from] reqwest::Error), } +impl From<&StaticSubgraphError> for StatusCode { + fn from(value: &StaticSubgraphError) -> Self { + match value { + StaticSubgraphError::FailedToQuery(_) => StatusCode::SERVICE_UNAVAILABLE, + StaticSubgraphError::FailedToParse(_) => StatusCode::BAD_GATEWAY, + } + } +} + impl IntoResponse for StaticSubgraphError { fn into_response(self) -> axum::response::Response { tracing::error!(%self, "StaticSubgraphError occoured."); ( - StatusCode::INTERNAL_SERVER_ERROR, + StatusCode::from(&self), Json(json! {{ "message": self.to_string(), }}), diff --git a/crates/service/src/service/router.rs b/crates/service/src/service/router.rs index 6166359bb..3ecd957ce 100644 --- a/crates/service/src/service/router.rs +++ b/crates/service/src/service/router.rs @@ -39,7 +39,7 @@ use typed_builder::TypedBuilder; use crate::{ database::dips::{AgreementStore, InMemoryAgreementStore}, - metrics::{FAILED_RECEIPT, HANDLER_FAILURE, HANDLER_HISTOGRAM}, + metrics::{FAILED_RECEIPT, HANDLER_HISTOGRAM}, middleware::{ allocation_middleware, attestation_middleware, auth::{self, Bearer, OrExt}, @@ -295,8 +295,18 @@ impl ServiceRouter { )) }; + let attestation_state = AttestationState { + attestation_signers, + }; + let mut handler = post(request_handler); + handler = handler + // create attestation + .route_layer(from_fn(attestation_middleware)) + // inject signer + .route_layer(from_fn_with_state(attestation_state, signer_middleware)); + // inject auth let failed_receipt_metric = Box::leak(Box::new(FAILED_RECEIPT.clone())); let tap_auth = auth::tap_receipt_authorize(tap_manager, failed_receipt_metric); @@ -319,9 +329,6 @@ impl ServiceRouter { escrow_accounts, domain_separator: self.domain_separator, }; - let attestation_state = AttestationState { - attestation_signers, - }; let service_builder = ServiceBuilder::new() // inject deployment id @@ -337,14 +344,9 @@ impl ServiceRouter { // metrics for histogram and failure .layer(PrometheusMetricsMiddlewareLayer::new( HANDLER_HISTOGRAM.clone(), - HANDLER_FAILURE.clone(), )) // tap context - .layer(from_fn(context_middleware)) - // inject signer - .layer(from_fn_with_state(attestation_state, signer_middleware)) - // create attestation - .layer(from_fn(attestation_middleware)); + .layer(from_fn(context_middleware)); handler.route_layer(service_builder) }; diff --git a/crates/service/tests/router_test.rs b/crates/service/tests/router_test.rs index 1c3ffe0d5..da4f0d301 100644 --- a/crates/service/tests/router_test.rs +++ b/crates/service/tests/router_test.rs @@ -18,7 +18,7 @@ use test_assets::{ create_signed_receipt, SignedReceiptRequest, INDEXER_ALLOCATIONS, TAP_EIP712_DOMAIN, }; use tokio::sync::watch; -use tower::ServiceExt; +use tower::Service; use wiremock::{ matchers::{method, path}, Mock, MockServer, ResponseTemplate, @@ -95,7 +95,7 @@ async fn full_integration_test(database: PgPool) { .allocations(allocations) .build(); - let app = router.create_router().await.unwrap(); + let mut app = router.create_router().await.unwrap(); let receipt = create_signed_receipt( SignedReceiptRequest::builder() @@ -118,7 +118,7 @@ async fn full_integration_test(database: PgPool) { .unwrap(); // with deployment - let res = app.oneshot(request).await.unwrap(); + let res = app.call(request).await.unwrap(); assert_eq!(res.status(), StatusCode::OK); let graphql_response = res.into_body(); @@ -126,4 +126,21 @@ async fn full_integration_test(database: PgPool) { let res = String::from_utf8(bytes.into()).unwrap(); insta::assert_snapshot!(res); + + let request = Request::builder() + .method(Method::POST) + .uri(format!("/subgraphs/id/{deployment}")) + .body(serde_json::to_string(&query).unwrap()) + .unwrap(); + + // without tap receipt + let res = app.call(request).await.unwrap(); + + assert_eq!(res.status(), StatusCode::PAYMENT_REQUIRED); + + let graphql_response = res.into_body(); + let bytes = to_bytes(graphql_response, usize::MAX).await.unwrap(); + let res = String::from_utf8(bytes.into()).unwrap(); + + insta::assert_snapshot!(res); } diff --git a/crates/service/tests/snapshots/router_test__full_integration_test-2.snap b/crates/service/tests/snapshots/router_test__full_integration_test-2.snap new file mode 100644 index 000000000..2c5ec1f87 --- /dev/null +++ b/crates/service/tests/snapshots/router_test__full_integration_test-2.snap @@ -0,0 +1,6 @@ +--- +source: crates/service/tests/router_test.rs +expression: res +snapshot_kind: text +--- +{"message":"No Tap receipt was found in the request"}