diff --git a/Cargo.lock b/Cargo.lock index 63a473e12..66b2c2d02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -342,7 +342,7 @@ dependencies = [ "futures-utils-wasm", "lru", "parking_lot", - "pin-project 1.1.6", + "pin-project 1.1.7", "reqwest 0.12.9", "schnellru", "serde", @@ -409,7 +409,7 @@ dependencies = [ "alloy-transport-ipc", "alloy-transport-ws", "futures", - "pin-project 1.1.6", + "pin-project 1.1.7", "reqwest 0.12.9", "serde", "serde_json", @@ -690,7 +690,7 @@ dependencies = [ "bytes", "futures", "interprocess", - "pin-project 1.1.6", + "pin-project 1.1.7", "serde_json", "tokio", "tokio-util", @@ -3532,7 +3532,7 @@ dependencies = [ [[package]] name = "indexer-monitor" -version = "1.4.0" +version = "0.1.0" dependencies = [ "alloy", "anyhow", @@ -3596,6 +3596,7 @@ dependencies = [ "indexer-monitor", "indexer-query", "lazy_static", + "pin-project 1.1.7", "prometheus", "reqwest 0.12.9", "serde", @@ -3607,8 +3608,11 @@ dependencies = [ "thegraph-graphql-http", "thiserror", "tokio", + "tokio-test", "tokio-util", + "tower 0.5.1", "tower-http", + "tower-test", "tower_governor", "tracing", "tracing-subscriber", @@ -3900,7 +3904,7 @@ dependencies = [ "hyper-util", "jsonrpsee-core", "jsonrpsee-types", - "pin-project 1.1.6", + "pin-project 1.1.7", "route-recognizer", "serde", "serde_json", @@ -4743,11 +4747,11 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf123a161dde1e524adf36f90bc5d8d3462824a9c43553ad07a8183161189ec" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" dependencies = [ - "pin-project-internal 1.1.6", + "pin-project-internal 1.1.7", ] [[package]] @@ -4763,9 +4767,9 @@ dependencies = [ [[package]] name = "pin-project-internal" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", @@ -6940,6 +6944,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.24.0" @@ -7023,7 +7040,7 @@ dependencies = [ "hyper-timeout", "hyper-util", "percent-encoding", - "pin-project 1.1.6", + "pin-project 1.1.7", "prost", "rustls-native-certs 0.8.0", "rustls-pemfile 2.2.0", @@ -7046,7 +7063,7 @@ dependencies = [ "futures-core", "futures-util", "indexmap 1.9.3", - "pin-project 1.1.6", + "pin-project 1.1.7", "pin-project-lite", "rand 0.8.5", "slab", @@ -7075,16 +7092,15 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.5.2" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bitflags 2.6.0", "bytes", "http 1.1.0", "http-body 1.0.1", - "http-body-util", "mime", "pin-project-lite", "tower-layer", @@ -7104,6 +7120,20 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" +[[package]] +name = "tower-test" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4546773ffeab9e4ea02b8872faa49bb616a80a7da66afc2f32688943f97efa7" +dependencies = [ + "futures-util", + "pin-project 1.1.7", + "tokio", + "tokio-test", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-util" version = "0.3.1" @@ -7126,7 +7156,7 @@ dependencies = [ "forwarded-header-value", "governor", "http 1.1.0", - "pin-project 1.1.6", + "pin-project 1.1.7", "thiserror", "tower 0.5.1", "tracing", diff --git a/crates/monitor/src/deployment_to_allocation.rs b/crates/monitor/src/deployment_to_allocation.rs new file mode 100644 index 000000000..97756cb0c --- /dev/null +++ b/crates/monitor/src/deployment_to_allocation.rs @@ -0,0 +1,44 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashMap; +use thegraph_core::{Address, DeploymentId}; +use tokio::sync::watch::Receiver; + +use indexer_allocation::Allocation; +use indexer_watcher::map_watcher; + +/// Watcher of indexer allocation +/// returning a map of subgraph deployment to allocation id +pub fn deployment_to_allocation( + indexer_allocations_rx: Receiver>, +) -> Receiver> { + map_watcher(indexer_allocations_rx, move |allocation| { + allocation + .iter() + .map(|(address, allocation)| (allocation.subgraph_deployment.id, *address)) + .collect() + }) +} + +#[cfg(test)] +mod tests { + use tokio::sync::watch; + + use super::deployment_to_allocation; + + #[tokio::test] + async fn test_deployment_to_allocation() { + let allocations = test_assets::INDEXER_ALLOCATIONS.clone(); + let allocations_watcher = watch::channel(allocations.clone()).1; + let deployment = deployment_to_allocation(allocations_watcher); + + let deployments = deployment.borrow(); + // one of the allocation id point to the same subgraph + assert_eq!(deployments.len(), 3); + // check if all allocations point to the subgraph id + for (key, val) in deployments.iter() { + assert_eq!(allocations.get(val).unwrap().subgraph_deployment.id, *key); + } + } +} diff --git a/crates/monitor/src/lib.rs b/crates/monitor/src/lib.rs index 765ee6299..f90ffd6d9 100644 --- a/crates/monitor/src/lib.rs +++ b/crates/monitor/src/lib.rs @@ -4,6 +4,7 @@ mod allocations; mod attestation; mod client; +mod deployment_to_allocation; mod dispute_manager; mod escrow_accounts; @@ -11,6 +12,7 @@ pub use crate::{ allocations::indexer_allocations, attestation::attestation_signers, client::{DeploymentDetails, SubgraphClient}, + deployment_to_allocation::deployment_to_allocation, dispute_manager::dispute_manager, escrow_accounts::{escrow_accounts, EscrowAccounts, EscrowAccountsError}, }; diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index 4a9c6f2f3..1bfa00a99 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -40,7 +40,7 @@ tap_core.workspace = true uuid.workspace = true alloy.workspace = true tower_governor = "0.4.0" -tower-http = { version = "0.5.2", features = [ +tower-http = { version = "0.6.2", features = [ "auth", "cors", "normalize-path", @@ -53,10 +53,14 @@ axum-extra = { version = "0.9.3", features = [ tokio-util = "0.7.10" cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3ed34ca" } bip39.workspace = true +tower = "0.5.1" +pin-project = "1.1.7" [dev-dependencies] hex-literal = "0.4.1" test-assets = { path = "../test-assets" } +tower-test = "0.4.0" +tokio-test = "0.4.4" [build-dependencies] build-info-build = { version = "0.0.39", default-features = false } diff --git a/crates/service/src/error.rs b/crates/service/src/error.rs index 586a7449b..98fcef42f 100644 --- a/crates/service/src/error.rs +++ b/crates/service/src/error.rs @@ -1,12 +1,70 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use alloy::primitives::Address; use anyhow::Error; -use axum::response::{IntoResponse, Response}; +use axum::{ + response::{IntoResponse, Response}, + Json, +}; +use indexer_monitor::EscrowAccountsError; use reqwest::StatusCode; +use serde::Serialize; use thegraph_core::DeploymentId; use thiserror::Error; +#[derive(Debug, Error)] +pub enum IndexerServiceError { + #[error("Issues with provided receipt: {0}")] + ReceiptError(#[from] tap_core::Error), + #[error("No attestation signer found for allocation `{0}`")] + NoSignerForAllocation(Address), + #[error("Invalid request body: {0}")] + InvalidRequest(anyhow::Error), + #[error("Error while processing the request: {0}")] + ProcessingError(SubgraphServiceError), + #[error("No valid receipt or free query auth token provided")] + Unauthorized, + #[error("Invalid free query auth token")] + InvalidFreeQueryAuthToken, + #[error("Failed to sign attestation")] + FailedToSignAttestation, + + #[error("There was an error while accessing escrow account: {0}")] + EscrowAccount(#[from] EscrowAccountsError), +} + +impl IntoResponse for IndexerServiceError { + fn into_response(self) -> Response { + use IndexerServiceError::*; + + #[derive(Serialize)] + struct ErrorResponse { + message: String, + } + + let status = match self { + Unauthorized => StatusCode::UNAUTHORIZED, + + NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, + + ReceiptError(_) + | InvalidRequest(_) + | InvalidFreeQueryAuthToken + | EscrowAccount(_) + | ProcessingError(_) => StatusCode::BAD_REQUEST, + }; + tracing::error!(%self, "An IndexerServiceError occoured."); + ( + status, + Json(ErrorResponse { + message: self.to_string(), + }), + ) + .into_response() + } +} + #[derive(Debug, Error)] pub enum SubgraphServiceError { #[error("Invalid status query: {0}")] diff --git a/crates/service/src/lib.rs b/crates/service/src/lib.rs index ac967df4e..8ed5ec937 100644 --- a/crates/service/src/lib.rs +++ b/crates/service/src/lib.rs @@ -4,6 +4,8 @@ mod cli; mod database; mod error; +mod metrics; +mod middleware; mod routes; pub mod service; mod tap; diff --git a/crates/service/src/metrics.rs b/crates/service/src/metrics.rs new file mode 100644 index 000000000..d9517492c --- /dev/null +++ b/crates/service/src/metrics.rs @@ -0,0 +1,39 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use lazy_static::lazy_static; +use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; + +lazy_static! { + /// Metric registered in global registry for + /// indexer query handler + /// + /// Labels: "deployment", "allocation", "sender" + pub static ref HANDLER_HISTOGRAM: HistogramVec = register_histogram_vec!( + "indexer_query_handler_seconds", + "Histogram for default indexer query handler", + &["deployment", "allocation", "sender"] + ).unwrap(); + + /// Metric registered in global registry for + /// Failed queries to handler + /// + /// Labels: "deployment" + pub static ref HANDLER_FAILURE: CounterVec = register_counter_vec!( + "indexer_query_handler_failed_total", + "Failed queries to handler", + &["deployment"] + ).unwrap(); + + /// Metric registered in global registry for + /// Failed receipt checks + /// + /// Labels: "deployment", "allocation", "sender" + pub static ref FAILED_RECEIPT: CounterVec = register_counter_vec!( + "indexer_receipt_failed_total", + "Failed receipt checks", + &["deployment", "allocation", "sender"] + ) + .unwrap(); + +} diff --git a/crates/service/src/middleware.rs b/crates/service/src/middleware.rs new file mode 100644 index 000000000..0c76f1853 --- /dev/null +++ b/crates/service/src/middleware.rs @@ -0,0 +1,16 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +mod inject_allocation; +mod inject_deployment; +mod inject_labels; +mod inject_receipt; +mod inject_sender; +mod prometheus_metrics; + +pub use inject_allocation::{allocation_middleware, Allocation, AllocationState}; +pub use inject_deployment::deployment_middleware; +pub use inject_labels::labels_middleware; +pub use inject_receipt::receipt_middleware; +pub use inject_sender::{sender_middleware, Sender, SenderState}; +pub use prometheus_metrics::PrometheusMetricsMiddlewareLayer; diff --git a/crates/service/src/middleware/inject_allocation.rs b/crates/service/src/middleware/inject_allocation.rs new file mode 100644 index 000000000..d9b572b82 --- /dev/null +++ b/crates/service/src/middleware/inject_allocation.rs @@ -0,0 +1,128 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashMap; + +use alloy::primitives::Address; +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; +use tap_core::receipt::SignedReceipt; +use thegraph_core::DeploymentId; +use tokio::sync::watch; + +/// The current query Allocation Id address +#[derive(Clone)] +pub struct Allocation(pub Address); + +impl From for String { + fn from(value: Allocation) -> Self { + value.0.to_string() + } +} + +/// State to be used by allocation middleware +#[derive(Clone)] +pub struct AllocationState { + /// watcher that maps deployment ids to allocation ids + pub deployment_to_allocation: watch::Receiver>, +} + +/// Injects allocation id in extensions +/// - check if allocation id already exists +/// - else, try to fetch allocation id from deployment_id to allocations map +/// +/// Requires signed receipt Extension to be added OR deployment id +pub async fn allocation_middleware( + State(my_state): State, + mut request: Request, + next: Next, +) -> Response { + if let Some(receipt) = request.extensions().get::() { + let allocation = receipt.message.allocation_id; + request.extensions_mut().insert(Allocation(allocation)); + } else if let Some(deployment_id) = request.extensions().get::() { + if let Some(allocation) = my_state + .deployment_to_allocation + .borrow() + .get(deployment_id) + { + request.extensions_mut().insert(Allocation(*allocation)); + } + } + + next.run(request).await +} + +#[cfg(test)] +mod tests { + use crate::middleware::inject_allocation::Allocation; + + use super::{allocation_middleware, AllocationState}; + + use alloy::primitives::Address; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn_with_state, + routing::get, + Router, + }; + use reqwest::StatusCode; + use test_assets::{create_signed_receipt, ESCROW_SUBGRAPH_DEPLOYMENT}; + use tokio::sync::watch; + use tower::ServiceExt; + + #[tokio::test] + async fn test_allocation_middleware() { + let deployment = *ESCROW_SUBGRAPH_DEPLOYMENT; + let deployment_to_allocation = + watch::channel(vec![(deployment, Address::ZERO)].into_iter().collect()).1; + let state = AllocationState { + deployment_to_allocation, + }; + + let middleware = from_fn_with_state(state, allocation_middleware); + + async fn handle(extensions: Extensions) -> Body { + let allocation = extensions + .get::() + .expect("Should contain allocation"); + assert_eq!(allocation.0, Address::ZERO); + Body::empty() + } + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let receipt = create_signed_receipt(Address::ZERO, 1, 1, 1).await; + + // with receipt + let res = app + .clone() + .oneshot( + Request::builder() + .uri("/") + .extension(receipt) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + + // with deployment + let res = app + .oneshot( + Request::builder() + .uri("/") + .extension(deployment) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} diff --git a/crates/service/src/middleware/inject_deployment.rs b/crates/service/src/middleware/inject_deployment.rs new file mode 100644 index 000000000..45f2e1110 --- /dev/null +++ b/crates/service/src/middleware/inject_deployment.rs @@ -0,0 +1,65 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use axum::{ + extract::{Path, Request}, + middleware::Next, + response::Response, + RequestExt, +}; +use thegraph_core::DeploymentId; + +/// Injects deployment id in the extensions from the path +pub async fn deployment_middleware(mut request: Request, next: Next) -> Response { + let deployment_id = request.extract_parts::>().await.ok(); + if let Some(Path(deployment_id)) = deployment_id { + request.extensions_mut().insert(deployment_id); + } + next.run(request).await +} + +#[cfg(test)] +mod tests { + use super::deployment_middleware; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn, + routing::get, + Router, + }; + use reqwest::StatusCode; + use test_assets::ESCROW_SUBGRAPH_DEPLOYMENT; + use thegraph_core::DeploymentId; + use tower::ServiceExt; + + #[tokio::test] + async fn test_deployment_middleware() { + let middleware = from_fn(deployment_middleware); + + let deployment = *ESCROW_SUBGRAPH_DEPLOYMENT; + + let handle = move |extensions: Extensions| async move { + let received_deployment = extensions + .get::() + .expect("Should contain a deployment_id"); + assert_eq!(*received_deployment, deployment); + Body::empty() + }; + + let app = Router::new() + .route("/:deployment_id", get(handle)) + .layer(middleware); + + let res = app + .oneshot( + Request::builder() + .uri(format!("/{}", deployment)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} diff --git a/crates/service/src/middleware/inject_labels.rs b/crates/service/src/middleware/inject_labels.rs new file mode 100644 index 000000000..8b87db4a5 --- /dev/null +++ b/crates/service/src/middleware/inject_labels.rs @@ -0,0 +1,167 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use axum::{extract::Request, middleware::Next, response::Response}; +use thegraph_core::DeploymentId; + +use super::{ + inject_allocation::Allocation, + inject_sender::Sender, + prometheus_metrics::{MetricLabelProvider, MetricLabels}, +}; + +const NO_DEPLOYMENT_ID: &str = "no-deployment"; +const NO_ALLOCATION: &str = "no-allocation"; +const NO_SENDER: &str = "no-sender"; + +/// Labels used by metrics which implements MetricLabelProvider +/// +/// Might contain sender, allocation and deployment id and fills +/// the gaps with constant values when they are not present +#[derive(Clone, Default)] +pub struct SenderAllocationDeploymentLabels { + sender: Option, + allocation: Option, + deployment_id: Option, +} + +impl MetricLabelProvider for SenderAllocationDeploymentLabels { + fn get_labels(&self) -> Vec<&str> { + let mut list = vec![]; + if let Some(deployment_id) = &self.deployment_id { + list.push(deployment_id.as_str()); + } else { + list.push(NO_DEPLOYMENT_ID); + } + if let Some(allocation) = &self.allocation { + list.push(allocation.as_str()); + } else { + list.push(NO_ALLOCATION); + } + if let Some(sender) = &self.sender { + list.push(sender.as_str()); + } else { + list.push(NO_SENDER); + } + list + } +} + +/// Injects Metric Labels to be used by MetricMiddleware +/// +/// Soft requirement for Sender, Allocation and Deployment extensions +pub async fn labels_middleware(mut request: Request, next: Next) -> Response { + let sender: Option = request + .extensions() + .get::() + .map(|s| s.clone().into()); + + let allocation: Option = request + .extensions() + .get::() + .map(|s| s.clone().into()); + + let deployment_id: Option = request + .extensions() + .get::() + .map(|s| s.clone().to_string()); + + let labels: MetricLabels = Arc::new(SenderAllocationDeploymentLabels { + sender, + allocation, + deployment_id, + }); + request.extensions_mut().insert(labels); + + next.run(request).await +} + +#[cfg(test)] +mod tests { + use crate::middleware::{ + inject_allocation::Allocation, + inject_labels::{NO_ALLOCATION, NO_DEPLOYMENT_ID, NO_SENDER}, + inject_sender::Sender, + prometheus_metrics::MetricLabels, + }; + + use super::labels_middleware; + + use alloy::primitives::Address; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn, + routing::get, + Router, + }; + use reqwest::StatusCode; + use test_assets::ESCROW_SUBGRAPH_DEPLOYMENT; + use tower::ServiceExt; + + #[tokio::test] + async fn test_label_middleware() { + let middleware = from_fn(labels_middleware); + + let deployment = *ESCROW_SUBGRAPH_DEPLOYMENT; + let sender = Address::ZERO; + let allocation = Address::ZERO; + + let handle = move |extensions: Extensions| async move { + let metrics = extensions + .get::() + .expect("Should decode metric label"); + assert_eq!( + metrics.get_labels(), + vec![ + &deployment.to_string(), + &allocation.to_string(), + &sender.to_string(), + ] + ); + Body::empty() + }; + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let res = app + .oneshot( + Request::builder() + .uri("/") + .extension(Sender(sender)) + .extension(deployment) + .extension(Allocation(allocation)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_empty_label_middleware() { + let middleware = from_fn(labels_middleware); + + let handle = move |extensions: Extensions| async move { + let metrics = extensions + .get::() + .expect("Should decode metric label"); + assert_eq!( + metrics.get_labels(), + vec![NO_DEPLOYMENT_ID, NO_ALLOCATION, NO_SENDER] + ); + Body::empty() + }; + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let res = app + .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} diff --git a/crates/service/src/middleware/inject_receipt.rs b/crates/service/src/middleware/inject_receipt.rs new file mode 100644 index 000000000..db2491c82 --- /dev/null +++ b/crates/service/src/middleware/inject_receipt.rs @@ -0,0 +1,73 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use axum::{extract::Request, middleware::Next, response::Response, RequestExt}; +use axum_extra::TypedHeader; + +use crate::service::TapReceipt; + +/// Injects tap receipts in the extensions +/// +/// A request won't always have a receipt because they might be +/// free queries. +/// That's why we don't fail with 400. +/// +/// This is useful to not deserialize multiple times the same receipt +pub async fn receipt_middleware(mut request: Request, next: Next) -> Response { + if let Ok(TypedHeader(receipt)) = request.extract_parts::>().await { + if let Some(receipt) = receipt.into_signed_receipt() { + request.extensions_mut().insert(receipt); + } + } + next.run(request).await +} + +#[cfg(test)] +mod tests { + use crate::{middleware::inject_receipt::receipt_middleware, service::TapReceipt}; + + use alloy::primitives::Address; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn, + routing::get, + Router, + }; + use axum_extra::headers::Header; + use reqwest::StatusCode; + use tap_core::receipt::SignedReceipt; + use test_assets::create_signed_receipt; + use tower::ServiceExt; + + #[tokio::test] + async fn test_receipt_middleware() { + let middleware = from_fn(receipt_middleware); + + let receipt = create_signed_receipt(Address::ZERO, 1, 1, 1).await; + let receipt_json = serde_json::to_string(&receipt).unwrap(); + + let handle = move |extensions: Extensions| async move { + let received_receipt = extensions + .get::() + .expect("Should decode tap receipt"); + assert_eq!(received_receipt.message, receipt.message); + assert_eq!(received_receipt.signature, receipt.signature); + Body::empty() + }; + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let res = app + .oneshot( + Request::builder() + .uri("/") + .header(TapReceipt::name(), receipt_json) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} diff --git a/crates/service/src/middleware/inject_sender.rs b/crates/service/src/middleware/inject_sender.rs new file mode 100644 index 000000000..f5a014c43 --- /dev/null +++ b/crates/service/src/middleware/inject_sender.rs @@ -0,0 +1,116 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use alloy::{dyn_abi::Eip712Domain, primitives::Address}; +use axum::{ + extract::{Request, State}, + middleware::Next, + response::Response, +}; +use indexer_monitor::EscrowAccounts; +use tap_core::receipt::SignedReceipt; +use tokio::sync::watch; + +use crate::error::IndexerServiceError; + +/// Stated used by sender middleware +#[derive(Clone)] +pub struct SenderState { + /// Used to recover the signer address + pub domain_separator: Eip712Domain, + /// Used to get the sender address given the signer address + pub escrow_accounts: watch::Receiver, +} + +/// The current query Sender address +#[derive(Clone)] +pub struct Sender(pub Address); + +impl From for String { + fn from(value: Sender) -> Self { + value.0.to_string() + } +} + +/// Injects the sender found from the signer in the receipt +/// +/// A request won't always have a receipt because they might be +/// free queries. +/// That's why we don't fail with 400. +/// +/// Requires Receipt extension +pub async fn sender_middleware( + State(state): State, + mut request: Request, + next: Next, +) -> Result { + if let Some(receipt) = request.extensions().get::() { + let signer = receipt.recover_signer(&state.domain_separator)?; + let sender = state + .escrow_accounts + .borrow() + .get_sender_for_signer(&signer)?; + request.extensions_mut().insert(Sender(sender)); + } + + Ok(next.run(request).await) +} + +#[cfg(test)] +mod tests { + use crate::middleware::inject_sender::SenderState; + + use super::{sender_middleware, Sender}; + use alloy::primitives::Address; + use axum::{ + body::Body, + http::{Extensions, Request}, + middleware::from_fn_with_state, + routing::get, + Router, + }; + use indexer_monitor::EscrowAccounts; + use reqwest::StatusCode; + use test_assets::{ + create_signed_receipt, ESCROW_ACCOUNTS_BALANCES, ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS, + }; + use tokio::sync::watch; + use tower::ServiceExt; + + #[tokio::test] + async fn test_sender_middleware() { + let escrow_accounts = watch::channel(EscrowAccounts::new( + ESCROW_ACCOUNTS_BALANCES.to_owned(), + ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + )) + .1; + let state = SenderState { + domain_separator: test_assets::TAP_EIP712_DOMAIN.clone(), + escrow_accounts, + }; + + let middleware = from_fn_with_state(state, sender_middleware); + + async fn handle(extensions: Extensions) -> Body { + let sender = extensions.get::().expect("Should contain sender"); + assert_eq!(sender.0, test_assets::TAP_SENDER.1); + Body::empty() + } + + let app = Router::new().route("/", get(handle)).layer(middleware); + + let receipt = create_signed_receipt(Address::ZERO, 1, 1, 1).await; + + let res = app + .oneshot( + Request::builder() + .uri("/") + .extension(receipt) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } +} diff --git a/crates/service/src/middleware/prometheus_metrics.rs b/crates/service/src/middleware/prometheus_metrics.rs new file mode 100644 index 000000000..409e01ba2 --- /dev/null +++ b/crates/service/src/middleware/prometheus_metrics.rs @@ -0,0 +1,255 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +//! update metrics in case it succeeds or fails + +use axum::http::Request; +use pin_project::pin_project; +use prometheus::HistogramTimer; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tower::{Layer, Service}; + +pub type MetricLabels = Arc; + +pub trait MetricLabelProvider { + fn get_labels(&self) -> Vec<&str>; +} + +/// Middleware for metrics +#[derive(Clone)] +pub struct PrometheusMetricsMiddleware { + inner: S, + histogram: prometheus::HistogramVec, + failure: prometheus::CounterVec, +} + +/// MetricsMiddleware used in tower components +/// +/// Register prometheus metrics in case of success or failure +#[derive(Clone)] +pub struct PrometheusMetricsMiddlewareLayer { + /// Histogram used to register the processing timer + histogram: prometheus::HistogramVec, + /// Counter metric in case of failure + failure: prometheus::CounterVec, +} + +impl PrometheusMetricsMiddlewareLayer { + pub fn new(histogram: prometheus::HistogramVec, failure: prometheus::CounterVec) -> Self { + Self { histogram, failure } + } +} + +impl Layer for PrometheusMetricsMiddlewareLayer { + type Service = PrometheusMetricsMiddleware; + + fn layer(&self, inner: S) -> Self::Service { + PrometheusMetricsMiddleware { + inner, + histogram: self.histogram.clone(), + failure: self.failure.clone(), + } + } +} + +impl Service> for PrometheusMetricsMiddleware +where + S: Service> + Clone + 'static, + ReqBody: 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = PrometheusMetricsFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> PrometheusMetricsFuture { + let labels = request.extensions().get::().cloned(); + PrometheusMetricsFuture { + timer: None, + histogram: self.histogram.clone(), + failure: self.failure.clone(), + labels, + fut: self.inner.call(request), + } + } +} + +#[pin_project] +pub struct PrometheusMetricsFuture { + /// Instant at which we started the requst. + timer: Option, + + histogram: prometheus::HistogramVec, + failure: prometheus::CounterVec, + + labels: Option, + + #[pin] + fut: F, +} + +impl Future for PrometheusMetricsFuture +where + F: Future>, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let Some(labels) = &this.labels else { + return this.fut.poll(cx); + }; + + if this.timer.is_none() { + // Start timer so we can track duration of request. + let duration_metric = this.histogram.with_label_values(&labels.get_labels()); + *this.timer = Some(duration_metric.start_timer()); + } + + match this.fut.poll(cx) { + Poll::Ready(result) => { + if result.is_err() { + let _ = this + .failure + .get_metric_with_label_values(&labels.get_labels()); + } + // Record the duration of this request. + if let Some(timer) = this.timer.take() { + timer.observe_duration(); + } + Poll::Ready(result) + } + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use axum::{ + body::Body, + http::{Request, Response}, + }; + use prometheus::core::Collector; + use tower::{Service, ServiceBuilder, ServiceExt}; + + use crate::middleware::prometheus_metrics::{MetricLabels, PrometheusMetricsMiddlewareLayer}; + + use super::MetricLabelProvider; + + struct TestLabel; + impl MetricLabelProvider for TestLabel { + fn get_labels(&self) -> Vec<&str> { + vec!["label1,", "label2", "label3"] + } + } + async fn handle(_: Request) -> anyhow::Result> { + Ok(Response::new(Body::default())) + } + + async fn handle_err(_: Request) -> anyhow::Result> { + Err(anyhow::anyhow!("Error")) + } + + #[tokio::test] + async fn test_metrics_middleware() { + let registry = prometheus::Registry::new(); + let histogram_metric = prometheus::register_histogram_vec_with_registry!( + "histogram_metric", + "Test", + &["deployment", "sender", "allocation"], + registry, + ) + .unwrap(); + + let failure_metric = prometheus::register_counter_vec_with_registry!( + "failure_metric", + "Test", + &["deployment", "sender", "allocation"], + registry, + ) + .unwrap(); + + // check if everything is clean + assert_eq!( + histogram_metric + .collect() + .first() + .unwrap() + .get_metric() + .len(), + 0 + ); + assert_eq!( + failure_metric.collect().first().unwrap().get_metric().len(), + 0 + ); + + let metrics_layer = + PrometheusMetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); + let mut service = ServiceBuilder::new() + .layer(metrics_layer) + .service_fn(handle); + let handle = service.ready().await.unwrap(); + + // default labels, all empty + let labels: MetricLabels = Arc::new(TestLabel); + + let mut req = Request::new(Default::default()); + req.extensions_mut().insert(labels.clone()); + let _ = handle.call(req).await; + + assert_eq!( + histogram_metric + .collect() + .first() + .unwrap() + .get_metric() + .len(), + 1 + ); + + assert_eq!( + failure_metric.collect().first().unwrap().get_metric().len(), + 0 + ); + + let metrics_layer = + PrometheusMetricsMiddlewareLayer::new(histogram_metric.clone(), failure_metric.clone()); + let mut service = ServiceBuilder::new() + .layer(metrics_layer) + .service_fn(handle_err); + let handle = service.ready().await.unwrap(); + + let mut req = Request::new(Default::default()); + req.extensions_mut().insert(labels); + let _ = handle.call(req).await; + + // it's using the same labels, should have only one metric + assert_eq!( + histogram_metric + .collect() + .first() + .unwrap() + .get_metric() + .len(), + 1 + ); + + // new failture + assert_eq!( + failure_metric.collect().first().unwrap().get_metric().len(), + 1 + ); + } +} diff --git a/crates/service/src/routes/request_handler.rs b/crates/service/src/routes/request_handler.rs index 924830521..3a84fce5c 100644 --- a/crates/service/src/routes/request_handler.rs +++ b/crates/service/src/routes/request_handler.rs @@ -3,66 +3,32 @@ use std::sync::Arc; -use crate::tap::AgoraQuery; +use crate::{ + error::IndexerServiceError, + metrics::FAILED_RECEIPT, + middleware::{Allocation, Sender}, + tap::AgoraQuery, +}; use axum::{ extract::{Path, State}, http::HeaderMap, response::IntoResponse, + Extension, }; use axum_extra::TypedHeader; -use lazy_static::lazy_static; -use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; use reqwest::StatusCode; use serde_json::value::RawValue; use tap_core::receipt::Context; use thegraph_core::DeploymentId; use tracing::trace; -use crate::service::{ - AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState, TapReceipt, -}; - -lazy_static! { - /// Register indexer error metrics in Prometheus registry - pub static ref HANDLER_HISTOGRAM: HistogramVec = register_histogram_vec!( - "indexer_query_handler_seconds", - "Histogram for default indexer query handler", - &["deployment", "allocation", "sender"] - ).unwrap(); - - pub static ref HANDLER_FAILURE: CounterVec = register_counter_vec!( - "indexer_query_handler_failed_total", - "Failed queries to handler", - &["deployment"] - ).unwrap(); - - pub static ref FAILED_RECEIPT: CounterVec = register_counter_vec!( - "indexer_receipt_failed_total", - "Failed receipt checks", - &["deployment", "allocation", "sender"] - ).unwrap(); - -} +use crate::service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState, TapReceipt}; pub async fn request_handler( Path(manifest_id): Path, - typed_header: TypedHeader, - state: State>, - headers: HeaderMap, - body: String, -) -> Result { - _request_handler(manifest_id, typed_header, state, headers, body) - .await - .inspect_err(|_| { - HANDLER_FAILURE - .with_label_values(&[&manifest_id.to_string()]) - .inc() - }) -} - -async fn _request_handler( - manifest_id: DeploymentId, TypedHeader(receipt): TypedHeader, + Extension(Sender(sender)): Extension, + Extension(Allocation(allocation_id)): Extension, State(state): State>, headers: HeaderMap, req: String, @@ -72,8 +38,35 @@ async fn _request_handler( let request: QueryBody = serde_json::from_str(&req).map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?; - let Some(receipt) = receipt.into_signed_receipt() else { - // Serve free query, NO METRICS + if let Some(receipt) = receipt.into_signed_receipt() { + let variables = request + .variables + .as_ref() + .map(ToString::to_string) + .unwrap_or_default(); + let mut ctx = Context::new(); + ctx.insert(AgoraQuery { + deployment_id: manifest_id, + query: request.query.clone(), + variables, + }); + + // Verify the receipt and store it in the database + state + .tap_manager + .verify_and_store_receipt(&ctx, receipt) + .await + .inspect_err(|_| { + FAILED_RECEIPT + .with_label_values(&[ + &manifest_id.to_string(), + &allocation_id.to_string(), + &sender.to_string(), + ]) + .inc() + }) + .map_err(IndexerServiceError::ReceiptError)?; + } else { match headers .get("authorization") .and_then(|v| v.to_str().ok()) @@ -89,69 +82,7 @@ async fn _request_handler( } trace!(?manifest_id, "New free query"); - - let response = state - .service_impl - .process_request(manifest_id, request) - .await - .map_err(IndexerServiceError::ProcessingError)? - .finalize(AttestationOutput::Attestable); - return Ok((StatusCode::OK, response)); - }; - - let allocation_id = receipt.message.allocation_id; - - let variables = request - .variables - .as_ref() - .map(ToString::to_string) - .unwrap_or_default(); - let mut ctx = Context::new(); - ctx.insert(AgoraQuery { - deployment_id: manifest_id, - query: request.query.clone(), - variables, - }); - - // recover the signer address - // get escrow accounts from channel - // return sender from signer - // - // TODO: We are currently doing this process twice. - // One here and other on common/src/tap/checks/sender_balance_check.rs - // We'll get back to normal once we have attachable context to `verify_and_store_receipt` - let signer = receipt - .recover_signer(&state.domain_separator) - .map_err(IndexerServiceError::CouldNotDecodeSigner)?; - let sender = state - .escrow_accounts - .borrow() - .get_sender_for_signer(&signer) - .map_err(IndexerServiceError::EscrowAccount)?; - - let _metric = HANDLER_HISTOGRAM - .with_label_values(&[ - &manifest_id.to_string(), - &allocation_id.to_string(), - &sender.to_string(), - ]) - .start_timer(); - - // Verify the receipt and store it in the database - state - .tap_manager - .verify_and_store_receipt(&ctx, receipt) - .await - .inspect_err(|_| { - FAILED_RECEIPT - .with_label_values(&[ - &manifest_id.to_string(), - &allocation_id.to_string(), - &sender.to_string(), - ]) - .inc() - }) - .map_err(IndexerServiceError::ReceiptError)?; + } // Check if we have an attestation signer for the allocation the receipt was created for let signer = state diff --git a/crates/service/src/service.rs b/crates/service/src/service.rs index 0e13d64e2..0a59cc5aa 100644 --- a/crates/service/src/service.rs +++ b/crates/service/src/service.rs @@ -35,9 +35,7 @@ use tracing::error; mod indexer_service; mod tap_receipt_header; -pub use indexer_service::{ - AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState, -}; +pub use indexer_service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState}; pub use tap_receipt_header::TapReceipt; #[derive(Debug)] diff --git a/crates/service/src/service/indexer_service.rs b/crates/service/src/service/indexer_service.rs index ec05ebeb7..c50cf8b3a 100644 --- a/crates/service/src/service/indexer_service.rs +++ b/crates/service/src/service/indexer_service.rs @@ -1,49 +1,51 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use alloy::dyn_abi::Eip712Domain; use anyhow; use axum::extract::MatchedPath; use axum::extract::Request as ExtractRequest; -use axum::http::{Method, Request}; use axum::{ - response::{IntoResponse, Response}, + http::{Method, Request}, + middleware::{from_fn, from_fn_with_state}, + response::IntoResponse, routing::{get, post}, - Json, Router, + serve, Json, Router, ServiceExt, }; -use axum::{serve, ServiceExt}; use build_info::BuildInfo; use indexer_attestation::AttestationSigner; use indexer_monitor::{ - attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, DeploymentDetails, - EscrowAccounts, EscrowAccountsError, SubgraphClient, + attestation_signers, deployment_to_allocation, dispute_manager, escrow_accounts, + indexer_allocations, DeploymentDetails, SubgraphClient, }; use prometheus::TextEncoder; use reqwest::StatusCode; use serde::Serialize; use sqlx::postgres::PgPoolOptions; use std::{ - collections::HashMap, error::Error, fmt::Debug, net::SocketAddr, path::PathBuf, sync::Arc, - time::Duration, + collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, }; use tap_core::{manager::Manager, receipt::checks::CheckList, tap_eip712_domain}; use thegraph_core::{Address, Attestation}; -use thiserror::Error; use tokio::net::TcpListener; use tokio::signal; use tokio::sync::watch::Receiver; +use tower::ServiceBuilder; use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; use tower_http::validate_request::ValidateRequestHeaderLayer; use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer}; use tracing::warn; use tracing::{error, info, info_span}; -use crate::error::SubgraphServiceError; -use crate::routes::health; -use crate::routes::request_handler; -use crate::routes::static_subgraph_request_handler; -use crate::tap::IndexerTapContext; -use crate::wallet::public_key; +use crate::{ + metrics::{HANDLER_FAILURE, HANDLER_HISTOGRAM}, + middleware::{ + allocation_middleware, deployment_middleware, labels_middleware, receipt_middleware, + sender_middleware, AllocationState, PrometheusMetricsMiddlewareLayer, SenderState, + }, + routes::{health, request_handler, static_subgraph_request_handler}, + tap::IndexerTapContext, + wallet::public_key, +}; use indexer_config::Config; use super::SubgraphService; @@ -62,62 +64,6 @@ pub enum AttestationOutput { Attestable, } -#[derive(Debug, Error)] -pub enum IndexerServiceError { - #[error("Issues with provided receipt: {0}")] - ReceiptError(tap_core::Error), - #[error("No attestation signer found for allocation `{0}`")] - NoSignerForAllocation(Address), - #[error("Invalid request body: {0}")] - InvalidRequest(anyhow::Error), - #[error("Error while processing the request: {0}")] - ProcessingError(SubgraphServiceError), - #[error("No valid receipt or free query auth token provided")] - Unauthorized, - #[error("Invalid free query auth token")] - InvalidFreeQueryAuthToken, - #[error("Failed to sign attestation")] - FailedToSignAttestation, - - #[error("Could not decode signer: {0}")] - CouldNotDecodeSigner(tap_core::Error), - - #[error("There was an error while accessing escrow account: {0}")] - EscrowAccount(EscrowAccountsError), -} - -impl IntoResponse for IndexerServiceError { - fn into_response(self) -> Response { - use IndexerServiceError::*; - - #[derive(Serialize)] - struct ErrorResponse { - message: String, - } - - let status = match self { - Unauthorized => StatusCode::UNAUTHORIZED, - - NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, - - ReceiptError(_) - | InvalidRequest(_) - | InvalidFreeQueryAuthToken - | CouldNotDecodeSigner(_) - | EscrowAccount(_) - | ProcessingError(_) => StatusCode::BAD_REQUEST, - }; - tracing::error!(%self, "An IndexerServiceError occoured."); - ( - status, - Json(ErrorResponse { - message: self.to_string(), - }), - ) - .into_response() - } -} - #[derive(Clone, Serialize)] pub struct IndexerServiceRelease { version: String, @@ -152,10 +98,6 @@ pub struct IndexerServiceState { pub attestation_signers: Receiver>, pub tap_manager: Manager, pub service_impl: SubgraphService, - - // tap - pub escrow_accounts: Receiver, - pub domain_separator: Eip712Domain, } const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(30); @@ -316,7 +258,7 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { let checks = IndexerTapContext::get_checks( database, - allocations, + allocations.clone(), escrow_accounts.clone(), domain_separator.clone(), timestamp_error_tolerance, @@ -335,8 +277,6 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { attestation_signers, tap_manager, service_impl: options.service_impl, - escrow_accounts, - domain_separator, }); // Rate limits by allowing bursts of 10 requests and requiring 100ms of @@ -421,13 +361,39 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> { misc_routes = misc_routes.with_state(state.clone()); + let deployment_to_allocation = deployment_to_allocation(allocations); + let allocation_state = AllocationState { + deployment_to_allocation, + }; + let sender_state = SenderState { + escrow_accounts, + domain_separator, + }; + + let service_builder = ServiceBuilder::new() + // inject deployment id + .layer(from_fn(deployment_middleware)) + // inject receipt + .layer(from_fn(receipt_middleware)) + // inject allocation id + .layer(from_fn_with_state(allocation_state, allocation_middleware)) + // inject sender + .layer(from_fn_with_state(sender_state, sender_middleware)) + // inject metrics labels + .layer(from_fn(labels_middleware)) + // metrics for histogram and failure + .layer(PrometheusMetricsMiddlewareLayer::new( + HANDLER_HISTOGRAM.clone(), + HANDLER_FAILURE.clone(), + )); + let data_routes = Router::new() .route( PathBuf::from(&options.config.service.url_prefix) .join(format!("{}/id/:id", options.url_namespace)) .to_str() .expect("Failed to set up `/{url_namespace}/id/:id` route"), - post(request_handler), + post(request_handler).route_layer(service_builder), ) .with_state(state.clone()); diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index b011b2796..931c62fb2 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -113,3 +113,34 @@ where } }) } + +// Maps all outputs of Receiver into a new watcher +pub fn map_watcher( + mut receiver: watch::Receiver, + map_function: F, +) -> watch::Receiver +where + T1: Clone + Send + Sync + 'static, + T2: Send + Sync + 'static, + F: Fn(T1) -> T2 + Send + 'static, +{ + let initial_value = map_function(receiver.borrow().clone()); + let (tx, rx) = watch::channel(initial_value); + + tokio::spawn(async move { + loop { + select! { + Ok(())= receiver.changed() =>{}, + else=>{ + // Something is wrong. + panic!("receiver was dropped"); + } + } + + let current_val = receiver.borrow().clone(); + let mapped_value = map_function(current_val); + tx.send(mapped_value).expect("Failed to update channel"); + } + }); + rx +}