|
| 1 | +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +mod bearer; |
| 5 | +mod or; |
| 6 | +mod tap; |
| 7 | + |
| 8 | +pub use bearer::Bearer; |
| 9 | +pub use or::OrExt; |
| 10 | +pub use tap::tap_receipt_authorize; |
| 11 | + |
| 12 | +#[cfg(test)] |
| 13 | +mod tests { |
| 14 | + use std::time::Duration; |
| 15 | + |
| 16 | + use alloy::primitives::{address, Address}; |
| 17 | + use axum::body::Body; |
| 18 | + use axum::http::{Request, Response}; |
| 19 | + use reqwest::{header, StatusCode}; |
| 20 | + use sqlx::PgPool; |
| 21 | + use tap_core::{manager::Manager, receipt::checks::CheckList}; |
| 22 | + use tokio::time::sleep; |
| 23 | + use tower::{Service, ServiceBuilder, ServiceExt}; |
| 24 | + use tower_http::auth::AsyncRequireAuthorizationLayer; |
| 25 | + |
| 26 | + use crate::middleware::auth::{self, Bearer, OrExt}; |
| 27 | + use crate::tap::IndexerTapContext; |
| 28 | + use test_assets::{create_signed_receipt, TAP_EIP712_DOMAIN}; |
| 29 | + |
| 30 | + const ALLOCATION_ID: Address = address!("deadbeefcafebabedeadbeefcafebabedeadbeef"); |
| 31 | + const BEARER_TOKEN: &str = "test"; |
| 32 | + |
| 33 | + async fn service( |
| 34 | + pgpool: PgPool, |
| 35 | + ) -> impl Service<Request<Body>, Response = Response<Body>, Error = impl std::fmt::Debug> { |
| 36 | + let context = IndexerTapContext::new(pgpool.clone(), TAP_EIP712_DOMAIN.clone()).await; |
| 37 | + let tap_manager = Box::leak(Box::new(Manager::new( |
| 38 | + TAP_EIP712_DOMAIN.clone(), |
| 39 | + context, |
| 40 | + CheckList::empty(), |
| 41 | + ))); |
| 42 | + |
| 43 | + let registry = prometheus::Registry::new(); |
| 44 | + let metric = Box::leak(Box::new( |
| 45 | + prometheus::register_counter_vec_with_registry!( |
| 46 | + "merge_checks_test", |
| 47 | + "Failed queries to handler", |
| 48 | + &["deployment"], |
| 49 | + registry, |
| 50 | + ) |
| 51 | + .unwrap(), |
| 52 | + )); |
| 53 | + let free_query = Bearer::new(BEARER_TOKEN); |
| 54 | + let tap_auth = auth::tap_receipt_authorize(tap_manager, metric); |
| 55 | + let authorize_requests = free_query.or(tap_auth); |
| 56 | + |
| 57 | + let authorization_middleware = AsyncRequireAuthorizationLayer::new(authorize_requests); |
| 58 | + |
| 59 | + let mut service = ServiceBuilder::new() |
| 60 | + .layer(authorization_middleware) |
| 61 | + .service_fn(|_: Request<Body>| async { |
| 62 | + Ok::<_, anyhow::Error>(Response::new(Body::default())) |
| 63 | + }); |
| 64 | + |
| 65 | + service.ready().await.unwrap(); |
| 66 | + service |
| 67 | + } |
| 68 | + |
| 69 | + #[sqlx::test(migrations = "../../migrations")] |
| 70 | + async fn test_composition_header_valid(pgpool: PgPool) { |
| 71 | + let mut service = service(pgpool.clone()).await; |
| 72 | + // should allow queries that contains the free token |
| 73 | + // if the token does not match, return payment required |
| 74 | + let mut req = Request::new(Default::default()); |
| 75 | + req.headers_mut().insert( |
| 76 | + header::AUTHORIZATION, |
| 77 | + format!("Bearer {}", BEARER_TOKEN).parse().unwrap(), |
| 78 | + ); |
| 79 | + let res = service.call(req).await.unwrap(); |
| 80 | + assert_eq!(res.status(), StatusCode::OK); |
| 81 | + } |
| 82 | + |
| 83 | + #[sqlx::test(migrations = "../../migrations")] |
| 84 | + async fn test_composition_header_invalid(pgpool: PgPool) { |
| 85 | + let mut service = service(pgpool.clone()).await; |
| 86 | + |
| 87 | + // if the token exists but is wrong, try the receipt |
| 88 | + let mut req = Request::new(Default::default()); |
| 89 | + req.headers_mut() |
| 90 | + .insert(header::AUTHORIZATION, "Bearer wrongtoken".parse().unwrap()); |
| 91 | + let res = service.call(req).await.unwrap(); |
| 92 | + // we return the error from tap |
| 93 | + assert_eq!(res.status(), StatusCode::PAYMENT_REQUIRED); |
| 94 | + } |
| 95 | + |
| 96 | + #[sqlx::test(migrations = "../../migrations")] |
| 97 | + async fn test_composition_with_receipt(pgpool: PgPool) { |
| 98 | + let mut service = service(pgpool.clone()).await; |
| 99 | + |
| 100 | + let receipt = create_signed_receipt(ALLOCATION_ID, 1, 1, 1).await; |
| 101 | + |
| 102 | + // check with receipt |
| 103 | + let mut req = Request::new(Default::default()); |
| 104 | + req.extensions_mut().insert(receipt); |
| 105 | + let res = service.call(req).await.unwrap(); |
| 106 | + assert_eq!(res.status(), StatusCode::OK); |
| 107 | + |
| 108 | + // verify receipts |
| 109 | + if tokio::time::timeout(Duration::from_secs(1), async { |
| 110 | + loop { |
| 111 | + let result = sqlx::query!("SELECT * FROM scalar_tap_receipts") |
| 112 | + .fetch_all(&pgpool) |
| 113 | + .await |
| 114 | + .unwrap(); |
| 115 | + |
| 116 | + if result.is_empty() { |
| 117 | + sleep(Duration::from_millis(50)).await; |
| 118 | + } else { |
| 119 | + break; |
| 120 | + } |
| 121 | + } |
| 122 | + }) |
| 123 | + .await |
| 124 | + .is_err() |
| 125 | + { |
| 126 | + panic!("Timeout assertion"); |
| 127 | + } |
| 128 | + } |
| 129 | + |
| 130 | + #[sqlx::test(migrations = "../../migrations")] |
| 131 | + async fn test_composition_without_header_or_receipt(pgpool: PgPool) { |
| 132 | + let mut service = service(pgpool.clone()).await; |
| 133 | + // if it has neither, should return payment required |
| 134 | + let req = Request::new(Default::default()); |
| 135 | + let res = service.call(req).await.unwrap(); |
| 136 | + assert_eq!(res.status(), StatusCode::PAYMENT_REQUIRED); |
| 137 | + } |
| 138 | +} |
0 commit comments