Skip to content

Commit 906fb3f

Browse files
committed
refactor: use auth middleware
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 570b105 commit 906fb3f

File tree

5 files changed

+47
-104
lines changed

5 files changed

+47
-104
lines changed

crates/service/src/error.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,8 @@ pub enum IndexerServiceError {
2222
ReceiptError(#[from] tap_core::Error),
2323
#[error("No attestation signer found for allocation `{0}`")]
2424
NoSignerForAllocation(Address),
25-
#[error("Invalid request body: {0}")]
26-
InvalidRequest(anyhow::Error),
2725
#[error("Error while processing the request: {0}")]
2826
ProcessingError(SubgraphServiceError),
29-
#[error("No valid receipt or free query auth token provided")]
30-
Unauthorized,
31-
#[error("Invalid free query auth token")]
32-
InvalidFreeQueryAuthToken,
3327
#[error("Failed to sign attestation")]
3428
FailedToSignAttestation,
3529

@@ -47,15 +41,9 @@ impl IntoResponse for IndexerServiceError {
4741
}
4842

4943
let status = match self {
50-
Unauthorized => StatusCode::UNAUTHORIZED,
51-
5244
NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR,
5345

54-
ReceiptError(_)
55-
| InvalidRequest(_)
56-
| InvalidFreeQueryAuthToken
57-
| EscrowAccount(_)
58-
| ProcessingError(_) => StatusCode::BAD_REQUEST,
46+
ReceiptError(_) | EscrowAccount(_) | ProcessingError(_) => StatusCode::BAD_REQUEST,
5947
ReceiptNotFound => StatusCode::PAYMENT_REQUIRED,
6048
};
6149
tracing::error!(%self, "An IndexerServiceError occoured.");

crates/service/src/middleware.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
mod auth;
4+
pub mod auth;
55
mod inject_allocation;
66
mod inject_deployment;
77
mod inject_labels;
@@ -10,8 +10,9 @@ mod inject_sender;
1010
mod prometheus_metrics;
1111

1212
pub use inject_allocation::{allocation_middleware, Allocation, AllocationState};
13+
pub use inject_context::context_middleware;
1314
pub use inject_deployment::deployment_middleware;
1415
pub use inject_labels::labels_middleware;
1516
pub use inject_receipt::receipt_middleware;
16-
pub use inject_sender::{sender_middleware, Sender, SenderState};
17+
pub use inject_sender::{sender_middleware, SenderState};
1718
pub use prometheus_metrics::PrometheusMetricsMiddlewareLayer;

crates/service/src/routes/request_handler.rs

Lines changed: 3 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,87 +3,26 @@
33

44
use std::sync::Arc;
55

6-
use crate::{
7-
error::IndexerServiceError,
8-
metrics::FAILED_RECEIPT,
9-
middleware::{Allocation, Sender},
10-
tap::AgoraQuery,
11-
};
6+
use crate::{error::IndexerServiceError, middleware::Allocation};
127
use axum::{
138
extract::{Path, State},
14-
http::HeaderMap,
159
response::IntoResponse,
1610
Extension,
1711
};
18-
use axum_extra::TypedHeader;
1912
use reqwest::StatusCode;
20-
use serde_json::value::RawValue;
21-
use tap_core::receipt::Context;
2213
use thegraph_core::DeploymentId;
2314
use tracing::trace;
2415

25-
use crate::service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState, TapReceipt};
16+
use crate::service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState};
2617

2718
pub async fn request_handler(
2819
Path(manifest_id): Path<DeploymentId>,
29-
TypedHeader(receipt): TypedHeader<TapReceipt>,
30-
Extension(Sender(sender)): Extension<Sender>,
3120
Extension(Allocation(allocation_id)): Extension<Allocation>,
3221
State(state): State<Arc<IndexerServiceState>>,
33-
headers: HeaderMap,
3422
req: String,
3523
) -> Result<impl IntoResponse, IndexerServiceError> {
3624
trace!("Handling request for deployment `{manifest_id}`");
3725

38-
let request: QueryBody =
39-
serde_json::from_str(&req).map_err(|e| IndexerServiceError::InvalidRequest(e.into()))?;
40-
41-
if let Some(receipt) = receipt.into_signed_receipt() {
42-
let variables = request
43-
.variables
44-
.as_ref()
45-
.map(ToString::to_string)
46-
.unwrap_or_default();
47-
let mut ctx = Context::new();
48-
ctx.insert(AgoraQuery {
49-
deployment_id: manifest_id,
50-
query: request.query.clone(),
51-
variables,
52-
});
53-
54-
// Verify the receipt and store it in the database
55-
state
56-
.tap_manager
57-
.verify_and_store_receipt(&ctx, receipt)
58-
.await
59-
.inspect_err(|_| {
60-
FAILED_RECEIPT
61-
.with_label_values(&[
62-
&manifest_id.to_string(),
63-
&allocation_id.to_string(),
64-
&sender.to_string(),
65-
])
66-
.inc()
67-
})
68-
.map_err(IndexerServiceError::ReceiptError)?;
69-
} else {
70-
match headers
71-
.get("authorization")
72-
.and_then(|v| v.to_str().ok())
73-
.and_then(|s| s.strip_prefix("Bearer "))
74-
.map(|s| s.to_string())
75-
{
76-
None => return Err(IndexerServiceError::Unauthorized),
77-
Some(ref token) => {
78-
if Some(token) != state.config.service.free_query_auth_token.as_ref() {
79-
return Err(IndexerServiceError::InvalidFreeQueryAuthToken);
80-
}
81-
}
82-
}
83-
84-
trace!(?manifest_id, "New free query");
85-
}
86-
8726
// Check if we have an attestation signer for the allocation the receipt was created for
8827
let signer = state
8928
.attestation_signers
@@ -94,7 +33,7 @@ pub async fn request_handler(
9433

9534
let response = state
9635
.service_impl
97-
.process_request(manifest_id, request)
36+
.process_request(manifest_id, &req)
9837
.await
9938
.map_err(IndexerServiceError::ProcessingError)?;
10039

@@ -112,9 +51,3 @@ pub async fn request_handler(
11251

11352
Ok((StatusCode::OK, response))
11453
}
115-
116-
#[derive(Debug, serde::Deserialize, serde::Serialize)]
117-
pub struct QueryBody {
118-
pub query: String,
119-
pub variables: Option<Box<RawValue>>,
120-
}

crates/service/src/service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl SubgraphService {
9898
pub async fn process_request<Request: DeserializeOwned + Send + std::fmt::Debug + Serialize>(
9999
&self,
100100
deployment: DeploymentId,
101-
request: Request,
101+
request: &Request,
102102
) -> Result<SubgraphServiceResponse, SubgraphServiceError> {
103103
let deployment_url = self
104104
.state
@@ -110,7 +110,7 @@ impl SubgraphService {
110110
.state
111111
.graph_node_client
112112
.post(deployment_url)
113-
.json(&request)
113+
.json(request)
114114
.send()
115115
.await
116116
.map_err(SubgraphServiceError::QueryForwardingError)?;

crates/service/src/service/indexer_service.rs

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use anyhow;
5-
use axum::extract::MatchedPath;
6-
use axum::extract::Request as ExtractRequest;
75
use axum::{
6+
extract::{MatchedPath, Request as ExtractRequest},
87
http::{Method, Request},
98
middleware::{from_fn, from_fn_with_state},
109
response::IntoResponse,
@@ -26,20 +25,24 @@ use std::{
2625
};
2726
use tap_core::{manager::Manager, receipt::checks::CheckList, tap_eip712_domain};
2827
use thegraph_core::{Address, Attestation};
29-
use tokio::net::TcpListener;
30-
use tokio::signal;
31-
use tokio::sync::watch::Receiver;
28+
use tokio::{net::TcpListener, signal, sync::watch::Receiver};
3229
use tower::ServiceBuilder;
3330
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
34-
use tower_http::validate_request::ValidateRequestHeaderLayer;
35-
use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer};
36-
use tracing::warn;
37-
use tracing::{error, info, info_span};
31+
use tower_http::{
32+
auth::AsyncRequireAuthorizationLayer,
33+
cors::{self, CorsLayer},
34+
normalize_path::NormalizePath,
35+
trace::TraceLayer,
36+
validate_request::ValidateRequestHeaderLayer,
37+
};
38+
use tracing::{error, info, info_span, warn};
3839

3940
use crate::{
40-
metrics::{HANDLER_FAILURE, HANDLER_HISTOGRAM},
41+
metrics::{FAILED_RECEIPT, HANDLER_FAILURE, HANDLER_HISTOGRAM},
4142
middleware::{
42-
allocation_middleware, deployment_middleware, labels_middleware, receipt_middleware,
43+
allocation_middleware,
44+
auth::{self, Bearer, OrExt},
45+
context_middleware, deployment_middleware, labels_middleware, receipt_middleware,
4346
sender_middleware, AllocationState, PrometheusMetricsMiddlewareLayer, SenderState,
4447
},
4548
routes::{health, request_handler, static_subgraph_request_handler},
@@ -96,7 +99,6 @@ pub struct IndexerServiceOptions {
9699
pub struct IndexerServiceState {
97100
pub config: Config,
98101
pub attestation_signers: Receiver<HashMap<Address, AttestationSigner>>,
99-
pub tap_manager: Manager<IndexerTapContext>,
100102
pub service_impl: SubgraphService,
101103
}
102104

@@ -266,16 +268,15 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> {
266268
)
267269
.await;
268270

269-
let tap_manager = Manager::new(
271+
let tap_manager = Box::leak(Box::new(Manager::new(
270272
domain_separator.clone(),
271273
indexer_context,
272274
CheckList::new(checks),
273-
);
275+
)));
274276

275277
let state = Arc::new(IndexerServiceState {
276278
config: options.config.clone(),
277279
attestation_signers,
278-
tap_manager,
279280
service_impl: options.service_impl,
280281
});
281282

@@ -361,6 +362,22 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> {
361362

362363
misc_routes = misc_routes.with_state(state.clone());
363364

365+
let mut request_handler_route = post(request_handler);
366+
367+
// inject auth
368+
let failed_receipt_metric = Box::leak(Box::new(FAILED_RECEIPT.clone()));
369+
let tap_auth = auth::tap_receipt_authorize(tap_manager, failed_receipt_metric);
370+
371+
if let Some(free_auth_token) = &options.config.service.serve_auth_token {
372+
let free_query = Bearer::new(free_auth_token);
373+
let result = free_query.or(tap_auth);
374+
let auth_layer = AsyncRequireAuthorizationLayer::new(result);
375+
request_handler_route = request_handler_route.layer(auth_layer);
376+
} else {
377+
let auth_layer = AsyncRequireAuthorizationLayer::new(tap_auth);
378+
request_handler_route = request_handler_route.layer(auth_layer);
379+
}
380+
364381
let deployment_to_allocation = deployment_to_allocation(allocations);
365382
let allocation_state = AllocationState {
366383
deployment_to_allocation,
@@ -385,15 +402,19 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> {
385402
.layer(PrometheusMetricsMiddlewareLayer::new(
386403
HANDLER_HISTOGRAM.clone(),
387404
HANDLER_FAILURE.clone(),
388-
));
405+
))
406+
// tap context
407+
.layer(from_fn(context_middleware));
408+
409+
request_handler_route = request_handler_route.layer(service_builder);
389410

390411
let data_routes = Router::new()
391412
.route(
392413
PathBuf::from(&options.config.service.url_prefix)
393414
.join(format!("{}/id/:id", options.url_namespace))
394415
.to_str()
395416
.expect("Failed to set up `/{url_namespace}/id/:id` route"),
396-
post(request_handler).route_layer(service_builder),
417+
request_handler_route,
397418
)
398419
.with_state(state.clone());
399420

0 commit comments

Comments
 (0)