From 347ee761cca9952d7a54db72c2c81e33848ed33b Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 15:31:22 -0600 Subject: [PATCH 01/17] refactor: create inject_deployment middleware Signed-off-by: Gustavo Inacio --- Cargo.lock | 39 ++++++++++-- crates/service/Cargo.toml | 5 +- crates/service/src/lib.rs | 1 + crates/service/src/middleware.rs | 4 ++ .../src/middleware/inject_deployment.rs | 61 +++++++++++++++++++ 5 files changed, 104 insertions(+), 6 deletions(-) create mode 100644 crates/service/src/middleware.rs create mode 100644 crates/service/src/middleware/inject_deployment.rs diff --git a/Cargo.lock b/Cargo.lock index 63a473e12..71973302f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3532,7 +3532,7 @@ dependencies = [ [[package]] name = "indexer-monitor" -version = "1.4.0" +version = "0.1.0" dependencies = [ "alloy", "anyhow", @@ -3607,8 +3607,11 @@ dependencies = [ "thegraph-graphql-http", "thiserror", "tokio", + "tokio-test", "tokio-util", + "tower 0.5.1", "tower-http", + "tower-test", "tower_governor", "tracing", "tracing-subscriber", @@ -6940,6 +6943,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.24.0" @@ -7075,16 +7091,15 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.5.2" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bitflags 2.6.0", "bytes", "http 1.1.0", "http-body 1.0.1", - "http-body-util", "mime", "pin-project-lite", "tower-layer", @@ -7104,6 +7119,20 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" +[[package]] +name = "tower-test" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4546773ffeab9e4ea02b8872faa49bb616a80a7da66afc2f32688943f97efa7" +dependencies = [ + "futures-util", + "pin-project 1.1.6", + "tokio", + "tokio-test", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-util" version = "0.3.1" diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index 4a9c6f2f3..e977cd35c 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -40,7 +40,7 @@ tap_core.workspace = true uuid.workspace = true alloy.workspace = true tower_governor = "0.4.0" -tower-http = { version = "0.5.2", features = [ +tower-http = { version = "0.6.2", features = [ "auth", "cors", "normalize-path", @@ -57,6 +57,9 @@ bip39.workspace = true [dev-dependencies] hex-literal = "0.4.1" test-assets = { path = "../test-assets" } +tower-test = "0.4.0" +tower = "0.5.1" +tokio-test = "0.4.4" [build-dependencies] build-info-build = { version = "0.0.39", default-features = false } diff --git a/crates/service/src/lib.rs b/crates/service/src/lib.rs index ac967df4e..f51ac12bb 100644 --- a/crates/service/src/lib.rs +++ b/crates/service/src/lib.rs @@ -4,6 +4,7 @@ mod cli; mod database; mod error; +mod middleware; mod routes; pub mod service; mod tap; diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs new file mode 100644 index 000000000..2ecd40877 --- /dev/null +++ b/crates/service/src/middleware.rs @@ -0,0 +1,4 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +mod inject_deployment; diff --git a/crates/service/src/middleware/inject_deployment.rs b/crates/service/src/middleware/inject_deployment.rs new file mode 100644 index 000000000..c9632ea0f --- /dev/null +++ b/crates/service/src/middleware/inject_deployment.rs @@ -0,0 +1,61 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use axum::{ + extract::{Path, Request}, + middleware::Next, + response::Response, + RequestExt, +}; +use thegraph_core::DeploymentId; + +pub async fn deployment_middleware(mut request: Request, next: Next) -> Response { + let deployment_id = request.extract_parts::>().await.ok(); + if let Some(Path(deployment_id)) = deployment_id { + request.extensions_mut().insert(deployment_id); + } + next.run(request).await +} + +#[cfg(test)] +mod tests { + use super::deployment_middleware; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn, + routing::get, + Router, + }; + use reqwest::StatusCode; + use test_assets::ESCROW_SUBGRAPH_DEPLOYMENT; + use thegraph_core::DeploymentId; + use tower::ServiceExt; + + #[tokio::test] + async fn test_deployment_middleware() { + let middleware = from_fn(deployment_middleware); + + async fn handle(extensions: Extensions) -> Body { + extensions + .get::() + .expect("Should contain a deployment_id"); + Body::empty() + } + + let app = Router::new() + .route("/:deployment_id", get(handle)) + .layer(middleware); + + let res = app + .oneshot( + Request::builder() + .uri(format!("/{}", *ESCROW_SUBGRAPH_DEPLOYMENT)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} From 708092bd51f7170d52850337215210a39824cdd2 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 15:37:09 -0600 Subject: [PATCH 02/17] refactor: move indexer error to error.rs Signed-off-by: Gustavo Inacio --- crates/service/src/error.rs | 61 ++++++++++++++++- crates/service/src/routes/request_handler.rs | 4 +- crates/service/src/service.rs | 4 +- crates/service/src/service/indexer_service.rs | 65 +------------------ 4 files changed, 66 insertions(+), 68 deletions(-) diff --git a/crates/service/src/error.rs b/crates/service/src/error.rs index 586a7449b..f5596eb6c 100644 --- a/crates/service/src/error.rs +++ b/crates/service/src/error.rs @@ -1,12 +1,71 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use alloy::primitives::Address; use anyhow::Error; -use axum::response::{IntoResponse, Response}; +use axum::{response::{IntoResponse, Response}, Json}; +use indexer_monitor::EscrowAccountsError; use reqwest::StatusCode; +use serde::Serialize; use thegraph_core::DeploymentId; use thiserror::Error; +#[derive(Debug, Error)] +pub enum IndexerServiceError { + #[error("Issues with provided receipt: {0}")] + ReceiptError(tap_core::Error), + #[error("No attestation signer found for allocation `{0}`")] + NoSignerForAllocation(Address), + #[error("Invalid request body: {0}")] + InvalidRequest(anyhow::Error), + #[error("Error while processing the request: {0}")] + ProcessingError(SubgraphServiceError), + #[error("No valid receipt or free query auth token provided")] + Unauthorized, + #[error("Invalid free query auth token")] + InvalidFreeQueryAuthToken, + #[error("Failed to sign attestation")] + FailedToSignAttestation, + + #[error("Could not decode signer: {0}")] + CouldNotDecodeSigner(tap_core::Error), + + #[error("There was an error while accessing escrow account: {0}")] + EscrowAccount(EscrowAccountsError), +} + +impl IntoResponse for IndexerServiceError { + fn into_response(self) -> Response { + use IndexerServiceError::*; + + #[derive(Serialize)] + struct ErrorResponse { + message: String, + } + + let status = match self { + Unauthorized => StatusCode::UNAUTHORIZED, + + NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, + + ReceiptError(_) + | InvalidRequest(_) + | InvalidFreeQueryAuthToken + | CouldNotDecodeSigner(_) + | EscrowAccount(_) + | ProcessingError(_) => StatusCode::BAD_REQUEST, + }; + tracing::error!(%self, "An IndexerServiceError occoured."); + ( + status, + Json(ErrorResponse { + message: self.to_string(), + }), + ) + .into_response() + } +} + #[derive(Debug, Error)] pub enum SubgraphServiceError { #[error("Invalid status query: {0}")] diff --git a/crates/service/src/routes/request_handler.rs b/crates/service/src/routes/request_handler.rs index 924830521..311188d08 100644 --- a/crates/service/src/routes/request_handler.rs +++ b/crates/service/src/routes/request_handler.rs @@ -3,7 +3,7 @@ use std::sync::Arc; -use crate::tap::AgoraQuery; +use crate::{error::IndexerServiceError, tap::AgoraQuery}; use axum::{ extract::{Path, State}, http::HeaderMap, @@ -19,7 +19,7 @@ use thegraph_core::DeploymentId; use tracing::trace; use crate::service::{ - AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState, TapReceipt, + AttestationOutput, IndexerServiceResponse, IndexerServiceState, TapReceipt, }; lazy_static! { diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index 0e13d64e2..0a59cc5aa 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -35,9 +35,7 @@ use tracing::error; mod indexer_service; mod tap_receipt_header; -pub use indexer_service::{ - AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState, -}; +pub use indexer_service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState}; pub use tap_receipt_header::TapReceipt; #[derive(Debug)] diff --git a/crates/service/src/service/indexer_service.rs b/crates/service/src/service/indexer_service.rs index ec05ebeb7..408905850 100644 --- a/crates/service/src/service/indexer_service.rs +++ b/crates/service/src/service/indexer_service.rs @@ -7,7 +7,7 @@ use axum::extract::MatchedPath; use axum::extract::Request as ExtractRequest; use axum::http::{Method, Request}; use axum::{ - response::{IntoResponse, Response}, + response::IntoResponse, routing::{get, post}, Json, Router, }; @@ -16,19 +16,17 @@ use build_info::BuildInfo; use indexer_attestation::AttestationSigner; use indexer_monitor::{ attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, DeploymentDetails, - EscrowAccounts, EscrowAccountsError, SubgraphClient, + EscrowAccounts, SubgraphClient, }; use prometheus::TextEncoder; use reqwest::StatusCode; use serde::Serialize; use sqlx::postgres::PgPoolOptions; use std::{ - collections::HashMap, error::Error, fmt::Debug, net::SocketAddr, path::PathBuf, sync::Arc, - time::Duration, + collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, }; use tap_core::{manager::Manager, receipt::checks::CheckList, tap_eip712_domain}; use thegraph_core::{Address, Attestation}; -use thiserror::Error; use tokio::net::TcpListener; use tokio::signal; use tokio::sync::watch::Receiver; @@ -38,7 +36,6 @@ use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::Tr use tracing::warn; use tracing::{error, info, info_span}; -use crate::error::SubgraphServiceError; use crate::routes::health; use crate::routes::request_handler; use crate::routes::static_subgraph_request_handler; @@ -62,62 +59,6 @@ pub enum AttestationOutput { Attestable, } -#[derive(Debug, Error)] -pub enum IndexerServiceError { - #[error("Issues with provided receipt: {0}")] - ReceiptError(tap_core::Error), - #[error("No attestation signer found for allocation `{0}`")] - NoSignerForAllocation(Address), - #[error("Invalid request body: {0}")] - InvalidRequest(anyhow::Error), - #[error("Error while processing the request: {0}")] - ProcessingError(SubgraphServiceError), - #[error("No valid receipt or free query auth token provided")] - Unauthorized, - #[error("Invalid free query auth token")] - InvalidFreeQueryAuthToken, - #[error("Failed to sign attestation")] - FailedToSignAttestation, - - #[error("Could not decode signer: {0}")] - CouldNotDecodeSigner(tap_core::Error), - - #[error("There was an error while accessing escrow account: {0}")] - EscrowAccount(EscrowAccountsError), -} - -impl IntoResponse for IndexerServiceError { - fn into_response(self) -> Response { - use IndexerServiceError::*; - - #[derive(Serialize)] - struct ErrorResponse { - message: String, - } - - let status = match self { - Unauthorized => StatusCode::UNAUTHORIZED, - - NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, - - ReceiptError(_) - | InvalidRequest(_) - | InvalidFreeQueryAuthToken - | CouldNotDecodeSigner(_) - | EscrowAccount(_) - | ProcessingError(_) => StatusCode::BAD_REQUEST, - }; - tracing::error!(%self, "An IndexerServiceError occoured."); - ( - status, - Json(ErrorResponse { - message: self.to_string(), - }), - ) - .into_response() - } -} - #[derive(Clone, Serialize)] pub struct IndexerServiceRelease { version: String, From f4e461ddabb0966194cfe4c52268a5a834c9c22a Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 15:52:00 -0600 Subject: [PATCH 03/17] refactor: add inject receipt middleware Signed-off-by: Gustavo Inacio --- crates/service/src/middleware.rs | 1 + .../service/src/middleware/inject_receipt.rs | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 crates/service/src/middleware/inject_receipt.rs diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs index 2ecd40877..d725f1b2a 100644 --- a/crates/service/src/middleware.rs +++ b/crates/service/src/middleware.rs @@ -2,3 +2,4 @@ // SPDX-License-Identifier: Apache-2.0 mod inject_deployment; +mod inject_receipt; diff --git a/crates/service/src/middleware/inject_receipt.rs b/crates/service/src/middleware/inject_receipt.rs new file mode 100644 index 000000000..7396f5d03 --- /dev/null +++ b/crates/service/src/middleware/inject_receipt.rs @@ -0,0 +1,63 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use axum::{extract::Request, middleware::Next, response::Response, RequestExt}; +use axum_extra::TypedHeader; + +use crate::service::TapReceipt; + +pub async fn receipt_middleware(mut request: Request, next: Next) -> Response { + if let Ok(TypedHeader(receipt)) = request.extract_parts::>().await { + if let Some(receipt) = receipt.into_signed_receipt() { + request.extensions_mut().insert(receipt); + } + } + next.run(request).await +} + +#[cfg(test)] +mod tests { + use crate::{middleware::inject_receipt::receipt_middleware, service::TapReceipt}; + + use alloy::primitives::Address; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn, + routing::get, + Router, + }; + use axum_extra::headers::Header; + use reqwest::StatusCode; + use tap_core::receipt::SignedReceipt; + use test_assets::create_signed_receipt; + use tower::ServiceExt; + + #[tokio::test] + async fn test_receipt_middleware() { + let middleware = from_fn(receipt_middleware); + + async fn handle(extensions: Extensions) -> Body { + extensions + .get::() + .expect("Should decode tap receipt"); + Body::empty() + } + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let receipt = create_signed_receipt(Address::ZERO, 1, 1, 1).await; + + let res = app + .oneshot( + Request::builder() + .uri("/") + .header(TapReceipt::name(), serde_json::to_string(&receipt).unwrap()) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} From 0475bc2f22355fea8394a255e7301c89aed433df Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 15:52:33 -0600 Subject: [PATCH 04/17] refactor: add inject sender middleware Signed-off-by: Gustavo Inacio --- crates/service/src/error.rs | 9 +- crates/service/src/middleware.rs | 1 + .../service/src/middleware/inject_sender.rs | 103 ++++++++++++++++++ 3 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 crates/service/src/middleware/inject_sender.rs diff --git a/crates/service/src/error.rs b/crates/service/src/error.rs index f5596eb6c..4faea4ef2 100644 --- a/crates/service/src/error.rs +++ b/crates/service/src/error.rs @@ -3,7 +3,10 @@ use alloy::primitives::Address; use anyhow::Error; -use axum::{response::{IntoResponse, Response}, Json}; +use axum::{ + response::{IntoResponse, Response}, + Json, +}; use indexer_monitor::EscrowAccountsError; use reqwest::StatusCode; use serde::Serialize; @@ -13,7 +16,7 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum IndexerServiceError { #[error("Issues with provided receipt: {0}")] - ReceiptError(tap_core::Error), + ReceiptError(#[from] tap_core::Error), #[error("No attestation signer found for allocation `{0}`")] NoSignerForAllocation(Address), #[error("Invalid request body: {0}")] @@ -31,7 +34,7 @@ pub enum IndexerServiceError { CouldNotDecodeSigner(tap_core::Error), #[error("There was an error while accessing escrow account: {0}")] - EscrowAccount(EscrowAccountsError), + EscrowAccount(#[from] EscrowAccountsError), } impl IntoResponse for IndexerServiceError { diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs index d725f1b2a..267fe62b3 100644 --- a/crates/service/src/middleware.rs +++ b/crates/service/src/middleware.rs @@ -3,3 +3,4 @@ mod inject_deployment; mod inject_receipt; +mod inject_sender; diff --git a/crates/service/src/middleware/inject_sender.rs b/crates/service/src/middleware/inject_sender.rs new file mode 100644 index 000000000..1ab119476 --- /dev/null +++ b/crates/service/src/middleware/inject_sender.rs @@ -0,0 +1,103 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use alloy::dyn_abi::Eip712Domain; +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; +use indexer_monitor::EscrowAccounts; +use tap_core::receipt::SignedReceipt; +use tokio::sync::watch; + +use crate::error::IndexerServiceError; + +#[derive(Clone)] +pub struct SenderState { + pub domain_separator: Eip712Domain, + pub escrow_accounts: watch::Receiver, +} + +#[derive(Clone)] +pub struct Sender(String); + +impl From for String { + fn from(value: Sender) -> Self { + value.0 + } +} + +pub async fn sender_middleware( + State(state): State, + mut request: Request, + next: Next, +) -> Result { + if let Some(receipt) = request.extensions().get::() { + let signer = receipt.recover_signer(&state.domain_separator)?; + let sender = state + .escrow_accounts + .borrow() + .get_sender_for_signer(&signer)?; + request.extensions_mut().insert(Sender(sender.to_string())); + } + + Ok(next.run(request).await) +} + +#[cfg(test)] +mod tests { + use crate::middleware::inject_sender::SenderState; + + use super::{sender_middleware, Sender}; + use alloy::primitives::Address; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn_with_state, + routing::get, + Router, + }; + use indexer_monitor::EscrowAccounts; + use reqwest::StatusCode; + use test_assets::{create_signed_receipt, ESCROW_ACCOUNTS_BALANCES, ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS}; + use tokio::sync::watch; + use tower::ServiceExt; + + #[tokio::test] + async fn test_sender_middleware() { + let escrow_accounts = watch::channel(EscrowAccounts::new( + ESCROW_ACCOUNTS_BALANCES.to_owned(), + ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + )) + .1; + let state = SenderState { + domain_separator: test_assets::TAP_EIP712_DOMAIN.clone(), + escrow_accounts, + }; + + let middleware = from_fn_with_state(state, sender_middleware); + + async fn handle(extensions: Extensions) -> Body { + let sender = extensions.get::().expect("Should contain sender"); + assert_eq!(sender.0, test_assets::TAP_SENDER.1.to_string()); + Body::empty() + } + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let receipt = create_signed_receipt(Address::ZERO, 1, 1, 1).await; + + let res = app + .oneshot( + Request::builder() + .uri("/") + .extension(receipt) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} From 9e5eb0f723516369987dc2df3bb3fa2a7c3c2e6e Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 16:03:20 -0600 Subject: [PATCH 05/17] refactor: add inject allocation middleware Signed-off-by: Gustavo Inacio --- crates/service/src/middleware.rs | 1 + .../src/middleware/inject_allocation.rs | 127 ++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 crates/service/src/middleware/inject_allocation.rs diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs index 267fe62b3..529ef8331 100644 --- a/crates/service/src/middleware.rs +++ b/crates/service/src/middleware.rs @@ -1,6 +1,7 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +mod inject_allocation; mod inject_deployment; mod inject_receipt; mod inject_sender; diff --git a/crates/service/src/middleware/inject_allocation.rs b/crates/service/src/middleware/inject_allocation.rs new file mode 100644 index 000000000..802e56a8a --- /dev/null +++ b/crates/service/src/middleware/inject_allocation.rs @@ -0,0 +1,127 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +//! injects allocation id in extensions +//! - check if allocation id already exists +//! - else, try to fetch allocation id from deployment_id and allocations watcher +//! - execute query +//! +//! Needs signed receipt Extension to be added OR deployment id + +use std::collections::HashMap; + +use alloy::primitives::Address; +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; +use tap_core::receipt::SignedReceipt; +use thegraph_core::DeploymentId; +use tokio::sync::watch; + +#[derive(Clone)] +pub struct Allocation(pub Address); + +impl From for String { + fn from(value: Allocation) -> Self { + value.0.to_string() + } +} + +#[derive(Clone)] +pub struct AllocationState { + pub deployment_to_allocation: watch::Receiver>, +} + +pub async fn allocation_middleware( + State(my_state): State, + mut request: Request, + next: Next, +) -> Response { + if let Some(receipt) = request.extensions().get::() { + let allocation = receipt.message.allocation_id; + request.extensions_mut().insert(Allocation(allocation)); + } else if let Some(deployment_id) = request.extensions().get::() { + if let Some(allocation) = my_state + .deployment_to_allocation + .borrow() + .get(deployment_id) + { + request.extensions_mut().insert(Allocation(*allocation)); + } + } + + next.run(request).await +} + +#[cfg(test)] +mod tests { + use crate::middleware::inject_allocation::Allocation; + + use super::{allocation_middleware, AllocationState}; + + use alloy::primitives::Address; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn_with_state, + routing::get, + Router, + }; + use reqwest::StatusCode; + use test_assets::{create_signed_receipt, ESCROW_SUBGRAPH_DEPLOYMENT}; + use tokio::sync::watch; + use tower::ServiceExt; + + #[tokio::test] + async fn test_allocation_middleware() { + let deployment = *ESCROW_SUBGRAPH_DEPLOYMENT; + let deployment_to_allocation = + watch::channel(vec![(deployment, Address::ZERO)].into_iter().collect()).1; + let state = AllocationState { + deployment_to_allocation, + }; + + let middleware = from_fn_with_state(state, allocation_middleware); + + async fn handle(extensions: Extensions) -> Body { + let allocation = extensions + .get::() + .expect("Should contain allocation"); + assert_eq!(allocation.0, Address::ZERO); + Body::empty() + } + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let receipt = create_signed_receipt(Address::ZERO, 1, 1, 1).await; + + // with receipt + let res = app + .clone() + .oneshot( + Request::builder() + .uri("/") + .extension(receipt) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + + // with deployment + let res = app + .oneshot( + Request::builder() + .uri("/") + .extension(deployment) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} From 05a4ecaeea0cab74f75655ce1a00a7ff8e8bf95d Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 16:15:55 -0600 Subject: [PATCH 06/17] refactor: add inject labels middleware Signed-off-by: Gustavo Inacio --- crates/service/src/middleware.rs | 2 + .../service/src/middleware/inject_labels.rs | 137 ++++++++++++++++++ .../service/src/middleware/inject_sender.rs | 14 +- crates/service/src/middleware/metrics.rs | 8 + 4 files changed, 155 insertions(+), 6 deletions(-) create mode 100644 crates/service/src/middleware/inject_labels.rs create mode 100644 crates/service/src/middleware/metrics.rs diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs index 529ef8331..4065d7d04 100644 --- a/crates/service/src/middleware.rs +++ b/crates/service/src/middleware.rs @@ -3,5 +3,7 @@ mod inject_allocation; mod inject_deployment; +mod inject_labels; mod inject_receipt; mod inject_sender; +mod metrics; diff --git a/crates/service/src/middleware/inject_labels.rs b/crates/service/src/middleware/inject_labels.rs new file mode 100644 index 000000000..24439b0d8 --- /dev/null +++ b/crates/service/src/middleware/inject_labels.rs @@ -0,0 +1,137 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +//! Injects Metric Labels +//! +//! Require Sender, Allocation and Deployment extensions + +use std::sync::Arc; + +use axum::{extract::Request, middleware::Next, response::Response}; +use thegraph_core::DeploymentId; + +use super::{ + inject_allocation::Allocation, + inject_sender::Sender, + metrics::{MetricLabelProvider, MetricLabels}, +}; + +const NO_DEPLOYMENT_ID: &str = "no-deployment"; +const NO_ALLOCATION: &str = "no-allocation"; +const NO_SENDER: &str = "no-sender"; + +#[derive(Clone, Default)] +pub struct SenderAllocationDeploymentLabels { + sender: Option, + allocation: Option, + deployment_id: Option, +} + +impl MetricLabelProvider for SenderAllocationDeploymentLabels { + fn get_labels(&self) -> Vec<&str> { + let mut list = vec![]; + if let Some(deployment_id) = &self.deployment_id { + list.push(deployment_id.as_str()); + } else { + list.push(NO_DEPLOYMENT_ID); + } + if let Some(allocation) = &self.allocation { + list.push(allocation.as_str()); + } else { + list.push(NO_ALLOCATION); + } + if let Some(sender) = &self.sender { + list.push(sender.as_str()); + } else { + list.push(NO_SENDER); + } + list + } +} + +pub async fn labels_middleware(mut request: Request, next: Next) -> Response { + let sender: Option = request + .extensions() + .get::() + .map(|s| s.clone().into()); + + let allocation: Option = request + .extensions() + .get::() + .map(|s| s.clone().into()); + + let deployment_id: Option = request + .extensions() + .get::() + .map(|s| s.clone().to_string()); + + let labels: MetricLabels = Arc::new(SenderAllocationDeploymentLabels { + sender, + allocation, + deployment_id, + }); + request.extensions_mut().insert(labels); + + next.run(request).await +} + +#[cfg(test)] +mod tests { + use crate::middleware::{ + inject_allocation::Allocation, inject_sender::Sender, metrics::MetricLabels, + }; + + use super::labels_middleware; + + use alloy::primitives::Address; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn, + routing::get, + Router, + }; + use reqwest::StatusCode; + use test_assets::ESCROW_SUBGRAPH_DEPLOYMENT; + use tower::ServiceExt; + + #[tokio::test] + async fn test_receipt_middleware() { + let middleware = from_fn(labels_middleware); + + let deployment = *ESCROW_SUBGRAPH_DEPLOYMENT; + let sender = Address::ZERO; + let allocation = Address::ZERO; + + let handle = move |extensions: Extensions| async move { + let metrics = extensions + .get::() + .expect("Should decode tap receipt"); + assert_eq!( + metrics.get_labels(), + vec![ + &deployment.to_string(), + &allocation.to_string(), + &sender.to_string(), + ] + ); + Body::empty() + }; + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let res = app + .oneshot( + Request::builder() + .uri("/") + .extension(Sender(sender)) + .extension(deployment) + .extension(Allocation(allocation)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} diff --git a/crates/service/src/middleware/inject_sender.rs b/crates/service/src/middleware/inject_sender.rs index 1ab119476..195bdc049 100644 --- a/crates/service/src/middleware/inject_sender.rs +++ b/crates/service/src/middleware/inject_sender.rs @@ -1,7 +1,7 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use alloy::dyn_abi::Eip712Domain; +use alloy::{dyn_abi::Eip712Domain, primitives::Address}; use axum::{ extract::{Request, State}, middleware::Next, @@ -20,11 +20,11 @@ pub struct SenderState { } #[derive(Clone)] -pub struct Sender(String); +pub struct Sender(pub Address); impl From for String { fn from(value: Sender) -> Self { - value.0 + value.0.to_string() } } @@ -39,7 +39,7 @@ pub async fn sender_middleware( .escrow_accounts .borrow() .get_sender_for_signer(&signer)?; - request.extensions_mut().insert(Sender(sender.to_string())); + request.extensions_mut().insert(Sender(sender)); } Ok(next.run(request).await) @@ -60,7 +60,9 @@ mod tests { }; use indexer_monitor::EscrowAccounts; use reqwest::StatusCode; - use test_assets::{create_signed_receipt, ESCROW_ACCOUNTS_BALANCES, ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS}; + use test_assets::{ + create_signed_receipt, ESCROW_ACCOUNTS_BALANCES, ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS, + }; use tokio::sync::watch; use tower::ServiceExt; @@ -80,7 +82,7 @@ mod tests { async fn handle(extensions: Extensions) -> Body { let sender = extensions.get::().expect("Should contain sender"); - assert_eq!(sender.0, test_assets::TAP_SENDER.1.to_string()); + assert_eq!(sender.0, test_assets::TAP_SENDER.1); Body::empty() } diff --git a/crates/service/src/middleware/metrics.rs b/crates/service/src/middleware/metrics.rs new file mode 100644 index 000000000..de0c47845 --- /dev/null +++ b/crates/service/src/middleware/metrics.rs @@ -0,0 +1,8 @@ +use std::sync::Arc; + + +pub type MetricLabels = Arc; + +pub trait MetricLabelProvider { + fn get_labels(&self) -> Vec<&str>; +} From 2a1995da9bf0882eacd2f51e48a96973daf5928d Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 16:20:31 -0600 Subject: [PATCH 07/17] refactor: add metrics middleware Signed-off-by: Gustavo Inacio --- Cargo.lock | 27 +-- crates/service/Cargo.toml | 3 +- crates/service/src/middleware/metrics.rs | 243 ++++++++++++++++++++++- 3 files changed, 258 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71973302f..66b2c2d02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -342,7 +342,7 @@ dependencies = [ "futures-utils-wasm", "lru", "parking_lot", - "pin-project 1.1.6", + "pin-project 1.1.7", "reqwest 0.12.9", "schnellru", "serde", @@ -409,7 +409,7 @@ dependencies = [ "alloy-transport-ipc", "alloy-transport-ws", "futures", - "pin-project 1.1.6", + "pin-project 1.1.7", "reqwest 0.12.9", "serde", "serde_json", @@ -690,7 +690,7 @@ dependencies = [ "bytes", "futures", "interprocess", - "pin-project 1.1.6", + "pin-project 1.1.7", "serde_json", "tokio", "tokio-util", @@ -3596,6 +3596,7 @@ dependencies = [ "indexer-monitor", "indexer-query", "lazy_static", + "pin-project 1.1.7", "prometheus", "reqwest 0.12.9", "serde", @@ -3903,7 +3904,7 @@ dependencies = [ "hyper-util", "jsonrpsee-core", "jsonrpsee-types", - "pin-project 1.1.6", + "pin-project 1.1.7", "route-recognizer", "serde", "serde_json", @@ -4746,11 +4747,11 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf123a161dde1e524adf36f90bc5d8d3462824a9c43553ad07a8183161189ec" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" dependencies = [ - "pin-project-internal 1.1.6", + "pin-project-internal 1.1.7", ] [[package]] @@ -4766,9 +4767,9 @@ dependencies = [ [[package]] name = "pin-project-internal" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", @@ -7039,7 +7040,7 @@ dependencies = [ "hyper-timeout", "hyper-util", "percent-encoding", - "pin-project 1.1.6", + "pin-project 1.1.7", "prost", "rustls-native-certs 0.8.0", "rustls-pemfile 2.2.0", @@ -7062,7 +7063,7 @@ dependencies = [ "futures-core", "futures-util", "indexmap 1.9.3", - "pin-project 1.1.6", + "pin-project 1.1.7", "pin-project-lite", "rand 0.8.5", "slab", @@ -7126,7 +7127,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4546773ffeab9e4ea02b8872faa49bb616a80a7da66afc2f32688943f97efa7" dependencies = [ "futures-util", - "pin-project 1.1.6", + "pin-project 1.1.7", "tokio", "tokio-test", "tower-layer", @@ -7155,7 +7156,7 @@ dependencies = [ "forwarded-header-value", "governor", "http 1.1.0", - "pin-project 1.1.6", + "pin-project 1.1.7", "thiserror", "tower 0.5.1", "tracing", diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index e977cd35c..1bfa00a99 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -53,12 +53,13 @@ axum-extra = { version = "0.9.3", features = [ tokio-util = "0.7.10" cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3ed34ca" } bip39.workspace = true +tower = "0.5.1" +pin-project = "1.1.7" [dev-dependencies] hex-literal = "0.4.1" test-assets = { path = "../test-assets" } tower-test = "0.4.0" -tower = "0.5.1" tokio-test = "0.4.4" [build-dependencies] diff --git a/crates/service/src/middleware/metrics.rs b/crates/service/src/middleware/metrics.rs index de0c47845..6941cfbef 100644 --- a/crates/service/src/middleware/metrics.rs +++ b/crates/service/src/middleware/metrics.rs @@ -1,8 +1,249 @@ -use std::sync::Arc; +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 +//! update metrics in case it succeeds or fails + +use axum::http::Request; +use pin_project::pin_project; +use prometheus::HistogramTimer; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tower::{Layer, Service}; pub type MetricLabels = Arc; pub trait MetricLabelProvider { fn get_labels(&self) -> Vec<&str>; } + +#[derive(Clone)] +pub struct MetricsMiddleware { + inner: S, + histogram: prometheus::HistogramVec, + failure: prometheus::CounterVec, +} + +#[derive(Clone)] +pub struct MetricsMiddlewareLayer { + histogram: prometheus::HistogramVec, + failure: prometheus::CounterVec, +} + +impl MetricsMiddlewareLayer { + pub fn new(histogram: prometheus::HistogramVec, failure: prometheus::CounterVec) -> Self { + Self { histogram, failure } + } +} + +impl Layer for MetricsMiddlewareLayer { + type Service = MetricsMiddleware; + + fn layer(&self, inner: S) -> Self::Service { + MetricsMiddleware { + inner, + histogram: self.histogram.clone(), + failure: self.failure.clone(), + } + } +} + +impl Service> for MetricsMiddleware +where + S: Service> + Clone + 'static, + ReqBody: 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = MetricsFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> MetricsFuture { + let labels = request.extensions().get::().cloned(); + MetricsFuture { + timer: None, + histogram: self.histogram.clone(), + failure: self.failure.clone(), + labels, + fut: self.inner.call(request), + } + } +} + +#[pin_project] +pub struct MetricsFuture { + /// Instant at which we started the requst. + timer: Option, + + histogram: prometheus::HistogramVec, + failure: prometheus::CounterVec, + + labels: Option, + + #[pin] + fut: F, +} + +impl Future for MetricsFuture +where + F: Future>, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let Some(labels) = &this.labels else { + return this.fut.poll(cx); + }; + + if this.timer.is_none() { + // Start timer so we can track duration of request. + let duration_metric = this.histogram.with_label_values(&labels.get_labels()); + *this.timer = Some(duration_metric.start_timer()); + } + + match this.fut.poll(cx) { + Poll::Ready(result) => { + if result.is_err() { + let _ = this + .failure + .get_metric_with_label_values(&labels.get_labels()); + } + // Record the duration of this request. + if let Some(timer) = this.timer.take() { + timer.observe_duration(); + } + Poll::Ready(result) + } + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use axum::{ + body::Body, + http::{Request, Response}, + }; + use prometheus::core::Collector; + use tower::{Service, ServiceBuilder, ServiceExt}; + + use crate::middleware::metrics::{MetricLabels, MetricsMiddlewareLayer}; + + use super::MetricLabelProvider; + + struct TestLabel; + impl MetricLabelProvider for TestLabel { + fn get_labels(&self) -> Vec<&str> { + vec!["label1,", "label2", "label3"] + } + } + async fn handle(_: Request) -> anyhow::Result> { + Ok(Response::new(Body::default())) + } + + async fn handle_err(_: Request) -> anyhow::Result> { + Err(anyhow::anyhow!("Error")) + } + + #[tokio::test] + async fn test_metrics_middleware() { + let registry = prometheus::Registry::new(); + let histogram_metric = prometheus::register_histogram_vec_with_registry!( + "histogram_metric", + "Test", + &["deployment", "sender", "allocation"], + registry, + ) + .unwrap(); + + let failure_metric = prometheus::register_counter_vec_with_registry!( + "failure_metric", + "Test", + &["deployment", "sender", "allocation"], + registry, + ) + .unwrap(); + + // check if everything is clean + assert_eq!( + histogram_metric + .collect() + .first() + .unwrap() + .get_metric() + .len(), + 0 + ); + assert_eq!( + failure_metric.collect().first().unwrap().get_metric().len(), + 0 + ); + + let metrics_layer = + MetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); + let mut service = ServiceBuilder::new() + .layer(metrics_layer) + .service_fn(handle); + let handle = service.ready().await.unwrap(); + + // default labels, all empty + let labels: MetricLabels = Arc::new(TestLabel); + + let mut req = Request::new(Default::default()); + req.extensions_mut().insert(labels.clone()); + let _ = handle.call(req).await; + + assert_eq!( + histogram_metric + .collect() + .first() + .unwrap() + .get_metric() + .len(), + 1 + ); + + assert_eq!( + failure_metric.collect().first().unwrap().get_metric().len(), + 0 + ); + + let metrics_layer = + MetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); + let mut service = ServiceBuilder::new() + .layer(metrics_layer) + .service_fn(handle_err); + let handle = service.ready().await.unwrap(); + + let mut req = Request::new(Default::default()); + req.extensions_mut().insert(labels); + let _ = handle.call(req).await; + + // it's using the same labels, should have only one metric + assert_eq!( + histogram_metric + .collect() + .first() + .unwrap() + .get_metric() + .len(), + 1 + ); + + // new failture + assert_eq!( + failure_metric.collect().first().unwrap().get_metric().len(), + 1 + ); + } +} From 5c6f19ef2a2c05b5acabe48a1d61144e9d8cedf6 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 16:27:41 -0600 Subject: [PATCH 08/17] refactor: add map_watcher Signed-off-by: Gustavo Inacio --- crates/watcher/src/lib.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index b011b2796..931c62fb2 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -113,3 +113,34 @@ where } }) } + +// Maps all outputs of Receiver into a new watcher +pub fn map_watcher( + mut receiver: watch::Receiver, + map_function: F, +) -> watch::Receiver +where + T1: Clone + Send + Sync + 'static, + T2: Send + Sync + 'static, + F: Fn(T1) -> T2 + Send + 'static, +{ + let initial_value = map_function(receiver.borrow().clone()); + let (tx, rx) = watch::channel(initial_value); + + tokio::spawn(async move { + loop { + select! { + Ok(())= receiver.changed() =>{}, + else=>{ + // Something is wrong. + panic!("receiver was dropped"); + } + } + + let current_val = receiver.borrow().clone(); + let mapped_value = map_function(current_val); + tx.send(mapped_value).expect("Failed to update channel"); + } + }); + rx +} From d1ac5c444025cc741beb46ecb02f4078be4d0ac5 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 16:28:02 -0600 Subject: [PATCH 09/17] refactor: add deployment_to_allocation monitor Signed-off-by: Gustavo Inacio --- .../monitor/src/deployment_to_allocation.rs | 21 +++++++++++++++++++ crates/monitor/src/lib.rs | 2 ++ 2 files changed, 23 insertions(+) create mode 100644 crates/monitor/src/deployment_to_allocation.rs diff --git a/crates/monitor/src/deployment_to_allocation.rs b/crates/monitor/src/deployment_to_allocation.rs new file mode 100644 index 000000000..f30ea601d --- /dev/null +++ b/crates/monitor/src/deployment_to_allocation.rs @@ -0,0 +1,21 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashMap; +use thegraph_core::{Address, DeploymentId}; +use tokio::sync::watch::Receiver; + +use indexer_allocation::Allocation; +use indexer_watcher::map_watcher; + +/// An always up-to-date list of attestation signers, one for each of the indexer's allocations. +pub fn deployment_to_allocation( + indexer_allocations_rx: Receiver>, +) -> Receiver> { + map_watcher(indexer_allocations_rx, move |allocation| { + allocation + .iter() + .map(|(address, allocation)| (allocation.subgraph_deployment.id, *address)) + .collect() + }) +} diff --git a/crates/monitor/src/lib.rs b/crates/monitor/src/lib.rs index 765ee6299..f90ffd6d9 100644 --- a/crates/monitor/src/lib.rs +++ b/crates/monitor/src/lib.rs @@ -4,6 +4,7 @@ mod allocations; mod attestation; mod client; +mod deployment_to_allocation; mod dispute_manager; mod escrow_accounts; @@ -11,6 +12,7 @@ pub use crate::{ allocations::indexer_allocations, attestation::attestation_signers, client::{DeploymentDetails, SubgraphClient}, + deployment_to_allocation::deployment_to_allocation, dispute_manager::dispute_manager, escrow_accounts::{escrow_accounts, EscrowAccounts, EscrowAccountsError}, }; From 5851510932b4af12bc8fd5561c09150545908feb Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 16:32:19 -0600 Subject: [PATCH 10/17] refactor: move handler metrics to metrics mod Signed-off-by: Gustavo Inacio --- crates/service/src/lib.rs | 1 + crates/service/src/metrics.rs | 28 ++++++++++++++++++ crates/service/src/routes/request_handler.rs | 30 ++------------------ 3 files changed, 31 insertions(+), 28 deletions(-) create mode 100644 crates/service/src/metrics.rs diff --git a/crates/service/src/lib.rs b/crates/service/src/lib.rs index f51ac12bb..8ed5ec937 100644 --- a/crates/service/src/lib.rs +++ b/crates/service/src/lib.rs @@ -4,6 +4,7 @@ mod cli; mod database; mod error; +mod metrics; mod middleware; mod routes; pub mod service; diff --git a/crates/service/src/metrics.rs b/crates/service/src/metrics.rs new file mode 100644 index 000000000..24a4f508a --- /dev/null +++ b/crates/service/src/metrics.rs @@ -0,0 +1,28 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use lazy_static::lazy_static; +use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; + +lazy_static! { + /// Register indexer error metrics in Prometheus registry + pub static ref HANDLER_HISTOGRAM: HistogramVec = register_histogram_vec!( + "indexer_query_handler_seconds", + "Histogram for default indexer query handler", + &["deployment", "allocation", "sender"] + ).unwrap(); + + pub static ref HANDLER_FAILURE: CounterVec = register_counter_vec!( + "indexer_query_handler_failed_total", + "Failed queries to handler", + &["deployment"] + ).unwrap(); + + pub static ref FAILED_RECEIPT: CounterVec = register_counter_vec!( + "indexer_receipt_failed_total", + "Failed receipt checks", + &["deployment", "allocation", "sender"] + ) + .unwrap(); + +} diff --git a/crates/service/src/routes/request_handler.rs b/crates/service/src/routes/request_handler.rs index 311188d08..7d5cbcbb4 100644 --- a/crates/service/src/routes/request_handler.rs +++ b/crates/service/src/routes/request_handler.rs @@ -3,46 +3,20 @@ use std::sync::Arc; -use crate::{error::IndexerServiceError, tap::AgoraQuery}; +use crate::{error::IndexerServiceError, metrics::{FAILED_RECEIPT, HANDLER_FAILURE, HANDLER_HISTOGRAM}, tap::AgoraQuery}; use axum::{ extract::{Path, State}, http::HeaderMap, response::IntoResponse, }; use axum_extra::TypedHeader; -use lazy_static::lazy_static; -use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; use reqwest::StatusCode; use serde_json::value::RawValue; use tap_core::receipt::Context; use thegraph_core::DeploymentId; use tracing::trace; -use crate::service::{ - AttestationOutput, IndexerServiceResponse, IndexerServiceState, TapReceipt, -}; - -lazy_static! { - /// Register indexer error metrics in Prometheus registry - pub static ref HANDLER_HISTOGRAM: HistogramVec = register_histogram_vec!( - "indexer_query_handler_seconds", - "Histogram for default indexer query handler", - &["deployment", "allocation", "sender"] - ).unwrap(); - - pub static ref HANDLER_FAILURE: CounterVec = register_counter_vec!( - "indexer_query_handler_failed_total", - "Failed queries to handler", - &["deployment"] - ).unwrap(); - - pub static ref FAILED_RECEIPT: CounterVec = register_counter_vec!( - "indexer_receipt_failed_total", - "Failed receipt checks", - &["deployment", "allocation", "sender"] - ).unwrap(); - -} +use crate::service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState, TapReceipt}; pub async fn request_handler( Path(manifest_id): Path, From cd4d5cca620613721347ac3b32048378df2b7af1 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 16:38:40 -0600 Subject: [PATCH 11/17] refactor: use metrics middleware in request handler Signed-off-by: Gustavo Inacio --- crates/service/src/error.rs | 4 -- crates/service/src/middleware.rs | 7 +++ crates/service/src/routes/request_handler.rs | 48 ++------------ crates/service/src/service/indexer_service.rs | 63 +++++++++++++------ 4 files changed, 57 insertions(+), 65 deletions(-) diff --git a/crates/service/src/error.rs b/crates/service/src/error.rs index 4faea4ef2..98fcef42f 100644 --- a/crates/service/src/error.rs +++ b/crates/service/src/error.rs @@ -30,9 +30,6 @@ pub enum IndexerServiceError { #[error("Failed to sign attestation")] FailedToSignAttestation, - #[error("Could not decode signer: {0}")] - CouldNotDecodeSigner(tap_core::Error), - #[error("There was an error while accessing escrow account: {0}")] EscrowAccount(#[from] EscrowAccountsError), } @@ -54,7 +51,6 @@ impl IntoResponse for IndexerServiceError { ReceiptError(_) | InvalidRequest(_) | InvalidFreeQueryAuthToken - | CouldNotDecodeSigner(_) | EscrowAccount(_) | ProcessingError(_) => StatusCode::BAD_REQUEST, }; diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs index 4065d7d04..b6f0ffb06 100644 --- a/crates/service/src/middleware.rs +++ b/crates/service/src/middleware.rs @@ -7,3 +7,10 @@ mod inject_labels; mod inject_receipt; mod inject_sender; mod metrics; + +pub use inject_allocation::{allocation_middleware, AllocationState}; +pub use inject_deployment::deployment_middleware; +pub use inject_labels::labels_middleware; +pub use inject_receipt::receipt_middleware; +pub use inject_sender::{sender_middleware, SenderState, Sender}; +pub use metrics::MetricsMiddlewareLayer; diff --git a/crates/service/src/routes/request_handler.rs b/crates/service/src/routes/request_handler.rs index 7d5cbcbb4..eff974a7b 100644 --- a/crates/service/src/routes/request_handler.rs +++ b/crates/service/src/routes/request_handler.rs @@ -3,11 +3,14 @@ use std::sync::Arc; -use crate::{error::IndexerServiceError, metrics::{FAILED_RECEIPT, HANDLER_FAILURE, HANDLER_HISTOGRAM}, tap::AgoraQuery}; +use crate::{ + error::IndexerServiceError, metrics::FAILED_RECEIPT, middleware::Sender, tap::AgoraQuery, +}; use axum::{ extract::{Path, State}, http::HeaderMap, response::IntoResponse, + Extension, }; use axum_extra::TypedHeader; use reqwest::StatusCode; @@ -20,23 +23,8 @@ use crate::service::{AttestationOutput, IndexerServiceResponse, IndexerServiceSt pub async fn request_handler( Path(manifest_id): Path, - typed_header: TypedHeader, - state: State>, - headers: HeaderMap, - body: String, -) -> Result { - _request_handler(manifest_id, typed_header, state, headers, body) - .await - .inspect_err(|_| { - HANDLER_FAILURE - .with_label_values(&[&manifest_id.to_string()]) - .inc() - }) -} - -async fn _request_handler( - manifest_id: DeploymentId, TypedHeader(receipt): TypedHeader, + Extension(sender): Extension, State(state): State>, headers: HeaderMap, req: String, @@ -87,30 +75,6 @@ async fn _request_handler( variables, }); - // recover the signer address - // get escrow accounts from channel - // return sender from signer - // - // TODO: We are currently doing this process twice. - // One here and other on common/src/tap/checks/sender_balance_check.rs - // We'll get back to normal once we have attachable context to `verify_and_store_receipt` - let signer = receipt - .recover_signer(&state.domain_separator) - .map_err(IndexerServiceError::CouldNotDecodeSigner)?; - let sender = state - .escrow_accounts - .borrow() - .get_sender_for_signer(&signer) - .map_err(IndexerServiceError::EscrowAccount)?; - - let _metric = HANDLER_HISTOGRAM - .with_label_values(&[ - &manifest_id.to_string(), - &allocation_id.to_string(), - &sender.to_string(), - ]) - .start_timer(); - // Verify the receipt and store it in the database state .tap_manager @@ -121,7 +85,7 @@ async fn _request_handler( .with_label_values(&[ &manifest_id.to_string(), &allocation_id.to_string(), - &sender.to_string(), + &sender.0.to_string(), ]) .inc() }) diff --git a/crates/service/src/service/indexer_service.rs b/crates/service/src/service/indexer_service.rs index 408905850..285347bcf 100644 --- a/crates/service/src/service/indexer_service.rs +++ b/crates/service/src/service/indexer_service.rs @@ -1,22 +1,21 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use alloy::dyn_abi::Eip712Domain; use anyhow; use axum::extract::MatchedPath; use axum::extract::Request as ExtractRequest; -use axum::http::{Method, Request}; use axum::{ + http::{Method, Request}, + middleware::{from_fn, from_fn_with_state}, response::IntoResponse, routing::{get, post}, - Json, Router, + serve, Json, Router, ServiceExt, }; -use axum::{serve, ServiceExt}; use build_info::BuildInfo; use indexer_attestation::AttestationSigner; use indexer_monitor::{ - attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, DeploymentDetails, - EscrowAccounts, SubgraphClient, + attestation_signers, deployment_to_allocation, dispute_manager, escrow_accounts, + indexer_allocations, DeploymentDetails, SubgraphClient, }; use prometheus::TextEncoder; use reqwest::StatusCode; @@ -30,17 +29,23 @@ use thegraph_core::{Address, Attestation}; use tokio::net::TcpListener; use tokio::signal; use tokio::sync::watch::Receiver; +use tower::ServiceBuilder; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_http::validate_request::ValidateRequestHeaderLayer; use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer}; use tracing::warn; use tracing::{error, info, info_span}; -use crate::routes::health; -use crate::routes::request_handler; -use crate::routes::static_subgraph_request_handler; -use crate::tap::IndexerTapContext; -use crate::wallet::public_key; +use crate::{ + metrics::{HANDLER_FAILURE, HANDLER_HISTOGRAM}, + middleware::{ + allocation_middleware, deployment_middleware, labels_middleware, receipt_middleware, + sender_middleware, AllocationState, MetricsMiddlewareLayer, SenderState, + }, + routes::{health, request_handler, static_subgraph_request_handler}, + tap::IndexerTapContext, + wallet::public_key, +}; use indexer_config::Config; use super::SubgraphService; @@ -93,10 +98,6 @@ pub struct IndexerServiceState { pub attestation_signers: Receiver>, pub tap_manager: Manager, pub service_impl: SubgraphService, - - // tap - pub escrow_accounts: Receiver, - pub domain_separator: Eip712Domain, } const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(30); @@ -257,7 +258,7 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { let checks = IndexerTapContext::get_checks( database, - allocations, + allocations.clone(), escrow_accounts.clone(), domain_separator.clone(), timestamp_error_tolerance, @@ -276,8 +277,6 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { attestation_signers, tap_manager, service_impl: options.service_impl, - escrow_accounts, - domain_separator, }); // Rate limits by allowing bursts of 10 requests and requiring 100ms of @@ -362,13 +361,39 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { misc_routes = misc_routes.with_state(state.clone()); + let deployment_to_allocation = deployment_to_allocation(allocations); + let allocation_state = AllocationState { + deployment_to_allocation, + }; + let sender_state = SenderState { + escrow_accounts, + domain_separator, + }; + + let service_builder = ServiceBuilder::new() + // inject deployment id + .layer(from_fn(deployment_middleware)) + // inject receipt + .layer(from_fn(receipt_middleware)) + // inject allocation id + .layer(from_fn_with_state(allocation_state, allocation_middleware)) + // inject sender + .layer(from_fn_with_state(sender_state, sender_middleware)) + // inject metrics labels + .layer(from_fn(labels_middleware)) + // metrics for histogram and failure + .layer(MetricsMiddlewareLayer::new( + HANDLER_HISTOGRAM.clone(), + HANDLER_FAILURE.clone(), + )); + let data_routes = Router::new() .route( PathBuf::from(&options.config.service.url_prefix) .join(format!("{}/id/:id", options.url_namespace)) .to_str() .expect("Failed to set up `/{url_namespace}/id/:id` route"), - post(request_handler), + post(request_handler).route_layer(service_builder), ) .with_state(state.clone()); From e439aefe8b7c481efe2861056fb432992c156b5e Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 20 Nov 2024 16:47:51 -0600 Subject: [PATCH 12/17] refactor: use extensions for query Signed-off-by: Gustavo Inacio --- crates/service/src/middleware.rs | 4 +- crates/service/src/routes/request_handler.rs | 79 +++++++++----------- 2 files changed, 38 insertions(+), 45 deletions(-) diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs index b6f0ffb06..11a349b37 100644 --- a/crates/service/src/middleware.rs +++ b/crates/service/src/middleware.rs @@ -8,9 +8,9 @@ mod inject_receipt; mod inject_sender; mod metrics; -pub use inject_allocation::{allocation_middleware, AllocationState}; +pub use inject_allocation::{allocation_middleware, Allocation, AllocationState}; pub use inject_deployment::deployment_middleware; pub use inject_labels::labels_middleware; pub use inject_receipt::receipt_middleware; -pub use inject_sender::{sender_middleware, SenderState, Sender}; +pub use inject_sender::{sender_middleware, Sender, SenderState}; pub use metrics::MetricsMiddlewareLayer; diff --git a/crates/service/src/routes/request_handler.rs b/crates/service/src/routes/request_handler.rs index eff974a7b..3a84fce5c 100644 --- a/crates/service/src/routes/request_handler.rs +++ b/crates/service/src/routes/request_handler.rs @@ -4,7 +4,10 @@ use std::sync::Arc; use crate::{ - error::IndexerServiceError, metrics::FAILED_RECEIPT, middleware::Sender, tap::AgoraQuery, + error::IndexerServiceError, + metrics::FAILED_RECEIPT, + middleware::{Allocation, Sender}, + tap::AgoraQuery, }; use axum::{ extract::{Path, State}, @@ -24,7 +27,8 @@ use crate::service::{AttestationOutput, IndexerServiceResponse, IndexerServiceSt pub async fn request_handler( Path(manifest_id): Path, TypedHeader(receipt): TypedHeader, - Extension(sender): Extension, + Extension(Sender(sender)): Extension, + Extension(Allocation(allocation_id)): Extension, State(state): State>, headers: HeaderMap, req: String, @@ -34,8 +38,35 @@ pub async fn request_handler( let request: QueryBody = serde_json::from_str(&req).map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?; - let Some(receipt) = receipt.into_signed_receipt() else { - // Serve free query, NO METRICS + if let Some(receipt) = receipt.into_signed_receipt() { + let variables = request + .variables + .as_ref() + .map(ToString::to_string) + .unwrap_or_default(); + let mut ctx = Context::new(); + ctx.insert(AgoraQuery { + deployment_id: manifest_id, + query: request.query.clone(), + variables, + }); + + // Verify the receipt and store it in the database + state + .tap_manager + .verify_and_store_receipt(&ctx, receipt) + .await + .inspect_err(|_| { + FAILED_RECEIPT + .with_label_values(&[ + &manifest_id.to_string(), + &allocation_id.to_string(), + &sender.to_string(), + ]) + .inc() + }) + .map_err(IndexerServiceError::ReceiptError)?; + } else { match headers .get("authorization") .and_then(|v| v.to_str().ok()) @@ -51,45 +82,7 @@ pub async fn request_handler( } trace!(?manifest_id, "New free query"); - - let response = state - .service_impl - .process_request(manifest_id, request) - .await - .map_err(IndexerServiceError::ProcessingError)? - .finalize(AttestationOutput::Attestable); - return Ok((StatusCode::OK, response)); - }; - - let allocation_id = receipt.message.allocation_id; - - let variables = request - .variables - .as_ref() - .map(ToString::to_string) - .unwrap_or_default(); - let mut ctx = Context::new(); - ctx.insert(AgoraQuery { - deployment_id: manifest_id, - query: request.query.clone(), - variables, - }); - - // Verify the receipt and store it in the database - state - .tap_manager - .verify_and_store_receipt(&ctx, receipt) - .await - .inspect_err(|_| { - FAILED_RECEIPT - .with_label_values(&[ - &manifest_id.to_string(), - &allocation_id.to_string(), - &sender.0.to_string(), - ]) - .inc() - }) - .map_err(IndexerServiceError::ReceiptError)?; + } // Check if we have an attestation signer for the allocation the receipt was created for let signer = state From 5613baa55f8bd27b7775bcc7974b4b0917882fef Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 21 Nov 2024 18:05:51 -0600 Subject: [PATCH 13/17] refactor: rename metrics to prometheus metrics Signed-off-by: Gustavo Inacio --- crates/service/src/middleware.rs | 4 +-- .../service/src/middleware/inject_labels.rs | 4 +-- .../{metrics.rs => prometheus_metrics.rs} | 30 +++++++++---------- crates/service/src/service/indexer_service.rs | 4 +-- 4 files changed, 21 insertions(+), 21 deletions(-) rename crates/service/src/middleware/{metrics.rs => prometheus_metrics.rs} (86%) diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs index 11a349b37..0c76f1853 100644 --- a/crates/service/src/middleware.rs +++ b/crates/service/src/middleware.rs @@ -6,11 +6,11 @@ mod inject_deployment; mod inject_labels; mod inject_receipt; mod inject_sender; -mod metrics; +mod prometheus_metrics; pub use inject_allocation::{allocation_middleware, Allocation, AllocationState}; pub use inject_deployment::deployment_middleware; pub use inject_labels::labels_middleware; pub use inject_receipt::receipt_middleware; pub use inject_sender::{sender_middleware, Sender, SenderState}; -pub use metrics::MetricsMiddlewareLayer; +pub use prometheus_metrics::PrometheusMetricsMiddlewareLayer; diff --git a/crates/service/src/middleware/inject_labels.rs b/crates/service/src/middleware/inject_labels.rs index 24439b0d8..626254a7e 100644 --- a/crates/service/src/middleware/inject_labels.rs +++ b/crates/service/src/middleware/inject_labels.rs @@ -13,7 +13,7 @@ use thegraph_core::DeploymentId; use super::{ inject_allocation::Allocation, inject_sender::Sender, - metrics::{MetricLabelProvider, MetricLabels}, + prometheus_metrics::{MetricLabelProvider, MetricLabels}, }; const NO_DEPLOYMENT_ID: &str = "no-deployment"; @@ -78,7 +78,7 @@ pub async fn labels_middleware(mut request: Request, next: Next) -> Response { #[cfg(test)] mod tests { use crate::middleware::{ - inject_allocation::Allocation, inject_sender::Sender, metrics::MetricLabels, + inject_allocation::Allocation, inject_sender::Sender, prometheus_metrics::MetricLabels, }; use super::labels_middleware; diff --git a/crates/service/src/middleware/metrics.rs b/crates/service/src/middleware/prometheus_metrics.rs similarity index 86% rename from crates/service/src/middleware/metrics.rs rename to crates/service/src/middleware/prometheus_metrics.rs index 6941cfbef..b7bff5b7c 100644 --- a/crates/service/src/middleware/metrics.rs +++ b/crates/service/src/middleware/prometheus_metrics.rs @@ -21,29 +21,29 @@ pub trait MetricLabelProvider { } #[derive(Clone)] -pub struct MetricsMiddleware { +pub struct PrometheusMetricsMiddleware { inner: S, histogram: prometheus::HistogramVec, failure: prometheus::CounterVec, } #[derive(Clone)] -pub struct MetricsMiddlewareLayer { +pub struct PrometheusMetricsMiddlewareLayer { histogram: prometheus::HistogramVec, failure: prometheus::CounterVec, } -impl MetricsMiddlewareLayer { +impl PrometheusMetricsMiddlewareLayer { pub fn new(histogram: prometheus::HistogramVec, failure: prometheus::CounterVec) -> Self { Self { histogram, failure } } } -impl Layer for MetricsMiddlewareLayer { - type Service = MetricsMiddleware; +impl Layer for PrometheusMetricsMiddlewareLayer { + type Service = PrometheusMetricsMiddleware; fn layer(&self, inner: S) -> Self::Service { - MetricsMiddleware { + PrometheusMetricsMiddleware { inner, histogram: self.histogram.clone(), failure: self.failure.clone(), @@ -51,22 +51,22 @@ impl Layer for MetricsMiddlewareLayer { } } -impl Service> for MetricsMiddleware +impl Service> for PrometheusMetricsMiddleware where S: Service> + Clone + 'static, ReqBody: 'static, { type Response = S::Response; type Error = S::Error; - type Future = MetricsFuture; + type Future = PrometheusMetricsFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, request: Request) -> MetricsFuture { + fn call(&mut self, request: Request) -> PrometheusMetricsFuture { let labels = request.extensions().get::().cloned(); - MetricsFuture { + PrometheusMetricsFuture { timer: None, histogram: self.histogram.clone(), failure: self.failure.clone(), @@ -77,7 +77,7 @@ where } #[pin_project] -pub struct MetricsFuture { +pub struct PrometheusMetricsFuture { /// Instant at which we started the requst. timer: Option, @@ -90,7 +90,7 @@ pub struct MetricsFuture { fut: F, } -impl Future for MetricsFuture +impl Future for PrometheusMetricsFuture where F: Future>, { @@ -137,7 +137,7 @@ mod tests { use prometheus::core::Collector; use tower::{Service, ServiceBuilder, ServiceExt}; - use crate::middleware::metrics::{MetricLabels, MetricsMiddlewareLayer}; + use crate::middleware::prometheus_metrics::{MetricLabels, PrometheusMetricsMiddlewareLayer}; use super::MetricLabelProvider; @@ -190,7 +190,7 @@ mod tests { ); let metrics_layer = - MetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); + PrometheusMetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); let mut service = ServiceBuilder::new() .layer(metrics_layer) .service_fn(handle); @@ -219,7 +219,7 @@ mod tests { ); let metrics_layer = - MetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); + PrometheusMetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); let mut service = ServiceBuilder::new() .layer(metrics_layer) .service_fn(handle_err); diff --git a/crates/service/src/service/indexer_service.rs b/crates/service/src/service/indexer_service.rs index 285347bcf..c50cf8b3a 100644 --- a/crates/service/src/service/indexer_service.rs +++ b/crates/service/src/service/indexer_service.rs @@ -40,7 +40,7 @@ use crate::{ metrics::{HANDLER_FAILURE, HANDLER_HISTOGRAM}, middleware::{ allocation_middleware, deployment_middleware, labels_middleware, receipt_middleware, - sender_middleware, AllocationState, MetricsMiddlewareLayer, SenderState, + sender_middleware, AllocationState, PrometheusMetricsMiddlewareLayer, SenderState, }, routes::{health, request_handler, static_subgraph_request_handler}, tap::IndexerTapContext, @@ -382,7 +382,7 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { // inject metrics labels .layer(from_fn(labels_middleware)) // metrics for histogram and failure - .layer(MetricsMiddlewareLayer::new( + .layer(PrometheusMetricsMiddlewareLayer::new( HANDLER_HISTOGRAM.clone(), HANDLER_FAILURE.clone(), )); From a209e9ba443b7b320b01102a4018a08ab234a304 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 21 Nov 2024 18:06:47 -0600 Subject: [PATCH 14/17] docs: add documentation to middlewares Signed-off-by: Gustavo Inacio --- crates/monitor/src/deployment_to_allocation.rs | 3 ++- crates/service/src/metrics.rs | 13 ++++++++++++- .../service/src/middleware/inject_allocation.rs | 15 ++++++++------- .../service/src/middleware/inject_deployment.rs | 1 + crates/service/src/middleware/inject_labels.rs | 11 +++++++---- crates/service/src/middleware/inject_receipt.rs | 3 +++ crates/service/src/middleware/inject_sender.rs | 7 +++++++ .../service/src/middleware/prometheus_metrics.rs | 6 ++++++ 8 files changed, 46 insertions(+), 13 deletions(-) diff --git a/crates/monitor/src/deployment_to_allocation.rs b/crates/monitor/src/deployment_to_allocation.rs index f30ea601d..ae55c9185 100644 --- a/crates/monitor/src/deployment_to_allocation.rs +++ b/crates/monitor/src/deployment_to_allocation.rs @@ -8,7 +8,8 @@ use tokio::sync::watch::Receiver; use indexer_allocation::Allocation; use indexer_watcher::map_watcher; -/// An always up-to-date list of attestation signers, one for each of the indexer's allocations. +/// Watcher of indexer allocation +/// returning a map of subgraph deployment to allocation id pub fn deployment_to_allocation( indexer_allocations_rx: Receiver>, ) -> Receiver> { diff --git a/crates/service/src/metrics.rs b/crates/service/src/metrics.rs index 24a4f508a..d9517492c 100644 --- a/crates/service/src/metrics.rs +++ b/crates/service/src/metrics.rs @@ -5,19 +5,30 @@ use lazy_static::lazy_static; use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; lazy_static! { - /// Register indexer error metrics in Prometheus registry + /// Metric registered in global registry for + /// indexer query handler + /// + /// Labels: "deployment", "allocation", "sender" pub static ref HANDLER_HISTOGRAM: HistogramVec = register_histogram_vec!( "indexer_query_handler_seconds", "Histogram for default indexer query handler", &["deployment", "allocation", "sender"] ).unwrap(); + /// Metric registered in global registry for + /// Failed queries to handler + /// + /// Labels: "deployment" pub static ref HANDLER_FAILURE: CounterVec = register_counter_vec!( "indexer_query_handler_failed_total", "Failed queries to handler", &["deployment"] ).unwrap(); + /// Metric registered in global registry for + /// Failed receipt checks + /// + /// Labels: "deployment", "allocation", "sender" pub static ref FAILED_RECEIPT: CounterVec = register_counter_vec!( "indexer_receipt_failed_total", "Failed receipt checks", diff --git a/crates/service/src/middleware/inject_allocation.rs b/crates/service/src/middleware/inject_allocation.rs index 802e56a8a..d9b572b82 100644 --- a/crates/service/src/middleware/inject_allocation.rs +++ b/crates/service/src/middleware/inject_allocation.rs @@ -1,13 +1,6 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -//! injects allocation id in extensions -//! - check if allocation id already exists -//! - else, try to fetch allocation id from deployment_id and allocations watcher -//! - execute query -//! -//! Needs signed receipt Extension to be added OR deployment id - use std::collections::HashMap; use alloy::primitives::Address; @@ -20,6 +13,7 @@ use tap_core::receipt::SignedReceipt; use thegraph_core::DeploymentId; use tokio::sync::watch; +/// The current query Allocation Id address #[derive(Clone)] pub struct Allocation(pub Address); @@ -29,11 +23,18 @@ impl From for String { } } +/// State to be used by allocation middleware #[derive(Clone)] pub struct AllocationState { + /// watcher that maps deployment ids to allocation ids pub deployment_to_allocation: watch::Receiver>, } +/// Injects allocation id in extensions +/// - check if allocation id already exists +/// - else, try to fetch allocation id from deployment_id to allocations map +/// +/// Requires signed receipt Extension to be added OR deployment id pub async fn allocation_middleware( State(my_state): State, mut request: Request, diff --git a/crates/service/src/middleware/inject_deployment.rs b/crates/service/src/middleware/inject_deployment.rs index c9632ea0f..ba415238e 100644 --- a/crates/service/src/middleware/inject_deployment.rs +++ b/crates/service/src/middleware/inject_deployment.rs @@ -9,6 +9,7 @@ use axum::{ }; use thegraph_core::DeploymentId; +/// Injects deployment id in the extensions from the path pub async fn deployment_middleware(mut request: Request, next: Next) -> Response { let deployment_id = request.extract_parts::>().await.ok(); if let Some(Path(deployment_id)) = deployment_id { diff --git a/crates/service/src/middleware/inject_labels.rs b/crates/service/src/middleware/inject_labels.rs index 626254a7e..3e8762fe6 100644 --- a/crates/service/src/middleware/inject_labels.rs +++ b/crates/service/src/middleware/inject_labels.rs @@ -1,10 +1,6 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -//! Injects Metric Labels -//! -//! Require Sender, Allocation and Deployment extensions - use std::sync::Arc; use axum::{extract::Request, middleware::Next, response::Response}; @@ -20,6 +16,10 @@ const NO_DEPLOYMENT_ID: &str = "no-deployment"; const NO_ALLOCATION: &str = "no-allocation"; const NO_SENDER: &str = "no-sender"; +/// Labels used by metrics which implements MetricLabelProvider +/// +/// Might contain sender, allocation and deployment id and fills +/// the gaps with constant values when they are not present #[derive(Clone, Default)] pub struct SenderAllocationDeploymentLabels { sender: Option, @@ -49,6 +49,9 @@ impl MetricLabelProvider for SenderAllocationDeploymentLabels { } } +/// Injects Metric Labels to be used by MetricMiddleware +/// +/// Soft requirement for Sender, Allocation and Deployment extensions pub async fn labels_middleware(mut request: Request, next: Next) -> Response { let sender: Option = request .extensions() diff --git a/crates/service/src/middleware/inject_receipt.rs b/crates/service/src/middleware/inject_receipt.rs index 7396f5d03..76fec4c92 100644 --- a/crates/service/src/middleware/inject_receipt.rs +++ b/crates/service/src/middleware/inject_receipt.rs @@ -6,6 +6,9 @@ use axum_extra::TypedHeader; use crate::service::TapReceipt; +/// Injects tap receipts in the extensions +/// +/// This is useful to not deserialize multiple times the same receipt pub async fn receipt_middleware(mut request: Request, next: Next) -> Response { if let Ok(TypedHeader(receipt)) = request.extract_parts::>().await { if let Some(receipt) = receipt.into_signed_receipt() { diff --git a/crates/service/src/middleware/inject_sender.rs b/crates/service/src/middleware/inject_sender.rs index 195bdc049..40fa26ab5 100644 --- a/crates/service/src/middleware/inject_sender.rs +++ b/crates/service/src/middleware/inject_sender.rs @@ -13,12 +13,16 @@ use tokio::sync::watch; use crate::error::IndexerServiceError; +/// Stated used by sender middleware #[derive(Clone)] pub struct SenderState { + /// Used to recover the signer address pub domain_separator: Eip712Domain, + /// Used to get the sender address given the signer address pub escrow_accounts: watch::Receiver, } +/// The current query Sender address #[derive(Clone)] pub struct Sender(pub Address); @@ -28,6 +32,9 @@ impl From for String { } } +/// Injects the sender found from the signer in the receipt +/// +/// Requires Receipt extension pub async fn sender_middleware( State(state): State, mut request: Request, diff --git a/crates/service/src/middleware/prometheus_metrics.rs b/crates/service/src/middleware/prometheus_metrics.rs index b7bff5b7c..409e01ba2 100644 --- a/crates/service/src/middleware/prometheus_metrics.rs +++ b/crates/service/src/middleware/prometheus_metrics.rs @@ -20,6 +20,7 @@ pub trait MetricLabelProvider { fn get_labels(&self) -> Vec<&str>; } +/// Middleware for metrics #[derive(Clone)] pub struct PrometheusMetricsMiddleware { inner: S, @@ -27,9 +28,14 @@ pub struct PrometheusMetricsMiddleware { failure: prometheus::CounterVec, } +/// MetricsMiddleware used in tower components +/// +/// Register prometheus metrics in case of success or failure #[derive(Clone)] pub struct PrometheusMetricsMiddlewareLayer { + /// Histogram used to register the processing timer histogram: prometheus::HistogramVec, + /// Counter metric in case of failure failure: prometheus::CounterVec, } From 73bfe0d9cdf050cd4a850e24329ff8f9ef725227 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 21 Nov 2024 18:07:01 -0600 Subject: [PATCH 15/17] test: update middleware tests Signed-off-by: Gustavo Inacio --- .../monitor/src/deployment_to_allocation.rs | 22 +++++++++++++++++++ .../src/middleware/inject_deployment.rs | 11 ++++++---- .../service/src/middleware/inject_receipt.rs | 15 ++++++++----- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/crates/monitor/src/deployment_to_allocation.rs b/crates/monitor/src/deployment_to_allocation.rs index ae55c9185..97756cb0c 100644 --- a/crates/monitor/src/deployment_to_allocation.rs +++ b/crates/monitor/src/deployment_to_allocation.rs @@ -20,3 +20,25 @@ pub fn deployment_to_allocation( .collect() }) } + +#[cfg(test)] +mod tests { + use tokio::sync::watch; + + use super::deployment_to_allocation; + + #[tokio::test] + async fn test_deployment_to_allocation() { + let allocations = test_assets::INDEXER_ALLOCATIONS.clone(); + let allocations_watcher = watch::channel(allocations.clone()).1; + let deployment = deployment_to_allocation(allocations_watcher); + + let deployments = deployment.borrow(); + // one of the allocation id point to the same subgraph + assert_eq!(deployments.len(), 3); + // check if all allocations point to the subgraph id + for (key, val) in deployments.iter() { + assert_eq!(allocations.get(val).unwrap().subgraph_deployment.id, *key); + } + } +} diff --git a/crates/service/src/middleware/inject_deployment.rs b/crates/service/src/middleware/inject_deployment.rs index ba415238e..45f2e1110 100644 --- a/crates/service/src/middleware/inject_deployment.rs +++ b/crates/service/src/middleware/inject_deployment.rs @@ -37,12 +37,15 @@ mod tests { async fn test_deployment_middleware() { let middleware = from_fn(deployment_middleware); - async fn handle(extensions: Extensions) -> Body { - extensions + let deployment = *ESCROW_SUBGRAPH_DEPLOYMENT; + + let handle = move |extensions: Extensions| async move { + let received_deployment = extensions .get::() .expect("Should contain a deployment_id"); + assert_eq!(*received_deployment, deployment); Body::empty() - } + }; let app = Router::new() .route("/:deployment_id", get(handle)) @@ -51,7 +54,7 @@ mod tests { let res = app .oneshot( Request::builder() - .uri(format!("/{}", *ESCROW_SUBGRAPH_DEPLOYMENT)) + .uri(format!("/{}", deployment)) .body(Body::empty()) .unwrap(), ) diff --git a/crates/service/src/middleware/inject_receipt.rs b/crates/service/src/middleware/inject_receipt.rs index 76fec4c92..5959e4d2c 100644 --- a/crates/service/src/middleware/inject_receipt.rs +++ b/crates/service/src/middleware/inject_receipt.rs @@ -40,22 +40,25 @@ mod tests { async fn test_receipt_middleware() { let middleware = from_fn(receipt_middleware); - async fn handle(extensions: Extensions) -> Body { - extensions + let receipt = create_signed_receipt(Address::ZERO, 1, 1, 1).await; + let receipt_json = serde_json::to_string(&receipt).unwrap(); + + let handle = move |extensions: Extensions| async move { + let received_receipt = extensions .get::() .expect("Should decode tap receipt"); + assert_eq!(received_receipt.message, receipt.message); + assert_eq!(received_receipt.signature, receipt.signature); Body::empty() - } + }; let app = Router::new().route("/", get(handle)).layer(middleware); - let receipt = create_signed_receipt(Address::ZERO, 1, 1, 1).await; - let res = app .oneshot( Request::builder() .uri("/") - .header(TapReceipt::name(), serde_json::to_string(&receipt).unwrap()) + .header(TapReceipt::name(), receipt_json) .body(Body::empty()) .unwrap(), ) From 37adb82c649cabf1547ad02d572807d35b03ae4c Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 21 Nov 2024 18:30:36 -0600 Subject: [PATCH 16/17] docs: add comment on why we dont fail in absense Signed-off-by: Gustavo Inacio --- crates/service/src/middleware/inject_receipt.rs | 4 ++++ crates/service/src/middleware/inject_sender.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/crates/service/src/middleware/inject_receipt.rs b/crates/service/src/middleware/inject_receipt.rs index 5959e4d2c..db2491c82 100644 --- a/crates/service/src/middleware/inject_receipt.rs +++ b/crates/service/src/middleware/inject_receipt.rs @@ -8,6 +8,10 @@ use crate::service::TapReceipt; /// Injects tap receipts in the extensions /// +/// A request won't always have a receipt because they might be +/// free queries. +/// That's why we don't fail with 400. +/// /// This is useful to not deserialize multiple times the same receipt pub async fn receipt_middleware(mut request: Request, next: Next) -> Response { if let Ok(TypedHeader(receipt)) = request.extract_parts::>().await { diff --git a/crates/service/src/middleware/inject_sender.rs b/crates/service/src/middleware/inject_sender.rs index 40fa26ab5..f5a014c43 100644 --- a/crates/service/src/middleware/inject_sender.rs +++ b/crates/service/src/middleware/inject_sender.rs @@ -34,6 +34,10 @@ impl From for String { /// Injects the sender found from the signer in the receipt /// +/// A request won't always have a receipt because they might be +/// free queries. +/// That's why we don't fail with 400. +/// /// Requires Receipt extension pub async fn sender_middleware( State(state): State, From 55c40a9b813fb519bd31164a34ba9288d9de6967 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 21 Nov 2024 18:34:21 -0600 Subject: [PATCH 17/17] test: add no labels case Signed-off-by: Gustavo Inacio --- .../service/src/middleware/inject_labels.rs | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/crates/service/src/middleware/inject_labels.rs b/crates/service/src/middleware/inject_labels.rs index 3e8762fe6..8b87db4a5 100644 --- a/crates/service/src/middleware/inject_labels.rs +++ b/crates/service/src/middleware/inject_labels.rs @@ -81,7 +81,10 @@ pub async fn labels_middleware(mut request: Request, next: Next) -> Response { #[cfg(test)] mod tests { use crate::middleware::{ - inject_allocation::Allocation, inject_sender::Sender, prometheus_metrics::MetricLabels, + inject_allocation::Allocation, + inject_labels::{NO_ALLOCATION, NO_DEPLOYMENT_ID, NO_SENDER}, + inject_sender::Sender, + prometheus_metrics::MetricLabels, }; use super::labels_middleware; @@ -99,7 +102,7 @@ mod tests { use tower::ServiceExt; #[tokio::test] - async fn test_receipt_middleware() { + async fn test_label_middleware() { let middleware = from_fn(labels_middleware); let deployment = *ESCROW_SUBGRAPH_DEPLOYMENT; @@ -109,7 +112,7 @@ mod tests { let handle = move |extensions: Extensions| async move { let metrics = extensions .get::() - .expect("Should decode tap receipt"); + .expect("Should decode metric label"); assert_eq!( metrics.get_labels(), vec![ @@ -137,4 +140,28 @@ mod tests { .unwrap(); assert_eq!(res.status(), StatusCode::OK); } + + #[tokio::test] + async fn test_empty_label_middleware() { + let middleware = from_fn(labels_middleware); + + let handle = move |extensions: Extensions| async move { + let metrics = extensions + .get::() + .expect("Should decode metric label"); + assert_eq!( + metrics.get_labels(), + vec![NO_DEPLOYMENT_ID, NO_ALLOCATION, NO_SENDER] + ); + Body::empty() + }; + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let res = app + .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } }