Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ thegraph-core = { git = "https://github.com/edgeandnode/toolshed", rev = "166353
thegraph-graphql-http = "0.2.0"
graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
bip39 = "2.0.0"
rstest = "0.23.0"
2 changes: 2 additions & 0 deletions crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ pin-project = "1.1.7"
[dev-dependencies]
hex-literal = "0.4.1"
test-assets = { path = "../test-assets" }
rstest.workspace = true
tower-test = "0.4.0"
tower-service = "0.3.3"
tokio-test = "0.4.4"

[build-dependencies]
Expand Down
28 changes: 15 additions & 13 deletions crates/service/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum IndexerServiceError {
#[error("No Tap receipt was found in the request")]
ReceiptNotFound,
#[error("Could not find deployment id")]
DeploymentIdNotFound,
#[error(transparent)]
AxumError(#[from] axum::Error),

#[error(transparent)]
SerializationError(#[from] serde_json::Error),

#[error("Issues with provided receipt: {0}")]
ReceiptError(#[from] tap_core::Error),
#[error("No attestation signer found for allocation `{0}`")]
NoSignerForAllocation(Address),
#[error("Invalid request body: {0}")]
InvalidRequest(anyhow::Error),
#[error("Error while processing the request: {0}")]
ProcessingError(SubgraphServiceError),
#[error("No valid receipt or free query auth token provided")]
Unauthorized,
#[error("Invalid free query auth token")]
InvalidFreeQueryAuthToken,
#[error("Failed to sign attestation")]
FailedToSignAttestation,

Expand All @@ -44,15 +48,13 @@ impl IntoResponse for IndexerServiceError {
}

let status = match self {
Unauthorized => StatusCode::UNAUTHORIZED,

NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR,

ReceiptError(_)
| InvalidRequest(_)
| InvalidFreeQueryAuthToken
| EscrowAccount(_)
| ProcessingError(_) => StatusCode::BAD_REQUEST,
ReceiptError(_) | EscrowAccount(_) | ProcessingError(_) => StatusCode::BAD_REQUEST,
ReceiptNotFound => StatusCode::PAYMENT_REQUIRED,
DeploymentIdNotFound => StatusCode::INTERNAL_SERVER_ERROR,
AxumError(_) => StatusCode::BAD_REQUEST,
SerializationError(_) => StatusCode::BAD_REQUEST,
};
tracing::error!(%self, "An IndexerServiceError occoured.");
(
Expand Down
5 changes: 4 additions & 1 deletion crates/service/src/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

pub mod auth;
mod inject_allocation;
mod inject_context;
mod inject_deployment;
mod inject_labels;
mod inject_receipt;
mod inject_sender;
mod prometheus_metrics;

pub use inject_allocation::{allocation_middleware, Allocation, AllocationState};
pub use inject_context::context_middleware;
pub use inject_deployment::deployment_middleware;
pub use inject_labels::labels_middleware;
pub use inject_receipt::receipt_middleware;
pub use inject_sender::{sender_middleware, Sender, SenderState};
pub use inject_sender::{sender_middleware, SenderState};
pub use prometheus_metrics::PrometheusMetricsMiddlewareLayer;
138 changes: 138 additions & 0 deletions crates/service/src/middleware/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

mod bearer;
mod or;
mod tap;

pub use bearer::Bearer;
pub use or::OrExt;
pub use tap::tap_receipt_authorize;

#[cfg(test)]
mod tests {
use std::time::Duration;

use alloy::primitives::{address, Address};
use axum::body::Body;
use axum::http::{Request, Response};
use reqwest::{header, StatusCode};
use sqlx::PgPool;
use tap_core::{manager::Manager, receipt::checks::CheckList};
use tokio::time::sleep;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_http::auth::AsyncRequireAuthorizationLayer;

use crate::middleware::auth::{self, Bearer, OrExt};
use crate::tap::IndexerTapContext;
use test_assets::{create_signed_receipt, TAP_EIP712_DOMAIN};

const ALLOCATION_ID: Address = address!("deadbeefcafebabedeadbeefcafebabedeadbeef");
const BEARER_TOKEN: &str = "test";

async fn service(
pgpool: PgPool,
) -> impl Service<Request<Body>, Response = Response<Body>, Error = impl std::fmt::Debug> {
let context = IndexerTapContext::new(pgpool.clone(), TAP_EIP712_DOMAIN.clone()).await;
let tap_manager = Box::leak(Box::new(Manager::new(
TAP_EIP712_DOMAIN.clone(),
context,
CheckList::empty(),
)));

let registry = prometheus::Registry::new();
let metric = Box::leak(Box::new(
prometheus::register_counter_vec_with_registry!(
"merge_checks_test",
"Failed queries to handler",
&["deployment"],
registry,
)
.unwrap(),
));
let free_query = Bearer::new(BEARER_TOKEN);
let tap_auth = auth::tap_receipt_authorize(tap_manager, metric);
let authorize_requests = free_query.or(tap_auth);

let authorization_middleware = AsyncRequireAuthorizationLayer::new(authorize_requests);

let mut service = ServiceBuilder::new()
.layer(authorization_middleware)
.service_fn(|_: Request<Body>| async {
Ok::<_, anyhow::Error>(Response::new(Body::default()))
});

service.ready().await.unwrap();
service
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_composition_header_valid(pgpool: PgPool) {
let mut service = service(pgpool.clone()).await;
// should allow queries that contains the free token
// if the token does not match, return payment required
let mut req = Request::new(Default::default());
req.headers_mut().insert(
header::AUTHORIZATION,
format!("Bearer {}", BEARER_TOKEN).parse().unwrap(),
);
let res = service.call(req).await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_composition_header_invalid(pgpool: PgPool) {
let mut service = service(pgpool.clone()).await;

// if the token exists but is wrong, try the receipt
let mut req = Request::new(Default::default());
req.headers_mut()
.insert(header::AUTHORIZATION, "Bearer wrongtoken".parse().unwrap());
let res = service.call(req).await.unwrap();
// we return the error from tap
assert_eq!(res.status(), StatusCode::PAYMENT_REQUIRED);
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_composition_with_receipt(pgpool: PgPool) {
let mut service = service(pgpool.clone()).await;

let receipt = create_signed_receipt(ALLOCATION_ID, 1, 1, 1).await;

// check with receipt
let mut req = Request::new(Default::default());
req.extensions_mut().insert(receipt);
let res = service.call(req).await.unwrap();
assert_eq!(res.status(), StatusCode::OK);

// verify receipts
if tokio::time::timeout(Duration::from_secs(1), async {
loop {
let result = sqlx::query!("SELECT * FROM scalar_tap_receipts")
.fetch_all(&pgpool)
.await
.unwrap();

if result.is_empty() {
sleep(Duration::from_millis(50)).await;
} else {
break;
}
}
})
.await
.is_err()
{
panic!("Timeout assertion");
}
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_composition_without_header_or_receipt(pgpool: PgPool) {
let mut service = service(pgpool.clone()).await;
// if it has neither, should return payment required
let req = Request::new(Default::default());
let res = service.call(req).await.unwrap();
assert_eq!(res.status(), StatusCode::PAYMENT_REQUIRED);
}
}
Loading
Loading