Skip to content

Commit 490fe8f

Browse files
committed
refactor: move process_request to request handler
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent f85f8a2 commit 490fe8f

File tree

4 files changed

+50
-114
lines changed

4 files changed

+50
-114
lines changed

crates/service/src/error.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ pub enum IndexerServiceError {
2626

2727
#[error("Issues with provided receipt: {0}")]
2828
ReceiptError(#[from] tap_core::Error),
29-
#[error("Error while processing the request: {0}")]
30-
ProcessingError(SubgraphServiceError),
3129

3230
#[error("There was an error while accessing escrow account: {0}")]
3331
EscrowAccount(#[from] EscrowAccountsError),
@@ -43,7 +41,7 @@ impl IntoResponse for IndexerServiceError {
4341
}
4442

4543
let status = match self {
46-
ReceiptError(_) | EscrowAccount(_) | ProcessingError(_) => StatusCode::BAD_REQUEST,
44+
ReceiptError(_) | EscrowAccount(_) => StatusCode::BAD_REQUEST,
4745
ReceiptNotFound => StatusCode::PAYMENT_REQUIRED,
4846
DeploymentIdNotFound => StatusCode::INTERNAL_SERVER_ERROR,
4947
AxumError(_) => StatusCode::BAD_REQUEST,
Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,58 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::sync::Arc;
5-
6-
use crate::error::IndexerServiceError;
4+
use crate::{
5+
error::SubgraphServiceError, middleware::AttestationInput, service::SubgraphServiceState,
6+
};
77
use axum::{
88
extract::{Path, State},
9+
http::{HeaderValue, Response},
910
response::IntoResponse,
1011
};
12+
use reqwest::header::CONTENT_TYPE;
1113
use thegraph_core::DeploymentId;
1214
use tracing::trace;
1315

14-
use crate::service::IndexerServiceState;
15-
1616
pub async fn request_handler(
17-
Path(manifest_id): Path<DeploymentId>,
18-
State(state): State<Arc<IndexerServiceState>>,
17+
Path(deployment): Path<DeploymentId>,
18+
State(state): State<SubgraphServiceState>,
1919
req: String,
20-
) -> Result<impl IntoResponse, IndexerServiceError> {
21-
trace!("Handling request for deployment `{manifest_id}`");
20+
) -> Result<impl IntoResponse, SubgraphServiceError> {
21+
trace!("Handling request for deployment `{deployment}`");
22+
23+
let deployment_url = state
24+
.graph_node_query_base_url
25+
.join(&format!("subgraphs/id/{deployment}"))
26+
.map_err(|_| SubgraphServiceError::InvalidDeployment(deployment))?;
2227

2328
let response = state
24-
.service_impl
25-
.process_request(manifest_id, req)
29+
.graph_node_client
30+
.post(deployment_url)
31+
.body(req.clone())
32+
.header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
33+
.send()
2634
.await
27-
.map_err(IndexerServiceError::ProcessingError)?;
35+
.map_err(SubgraphServiceError::QueryForwardingError)?;
36+
37+
let attestable = response
38+
.headers()
39+
.get("graph-attestable")
40+
.map_or(false, |value| {
41+
value.to_str().map(|value| value == "true").unwrap_or(false)
42+
});
43+
44+
let body = response
45+
.text()
46+
.await
47+
.map_err(SubgraphServiceError::QueryForwardingError)?;
48+
let attestation_input = if attestable {
49+
AttestationInput::Attestable { req }
50+
} else {
51+
AttestationInput::NotAttestable
52+
};
53+
54+
let mut response = Response::new(body);
55+
response.extensions_mut().insert(attestation_input);
2856

2957
Ok(response)
3058
}

crates/service/src/service.rs

Lines changed: 3 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,25 @@
44
use std::sync::Arc;
55
use std::time::Duration;
66

7-
use super::{error::SubgraphServiceError, routes};
7+
use super::routes;
88
use anyhow::anyhow;
99
use async_graphql::{EmptySubscription, Schema};
1010
use async_graphql_axum::GraphQL;
1111
use axum::{
12-
http::{HeaderValue, Response},
1312
routing::{post, post_service},
1413
Router,
1514
};
1615
use indexer_config::{Config, DipsConfig};
17-
use reqwest::{header::CONTENT_TYPE, Url};
16+
use reqwest::Url;
1817
use sqlx::PgPool;
1918
use thegraph_core::attestation::eip712_domain;
20-
use thegraph_core::DeploymentId;
2119

2220
use crate::{
2321
cli::Cli,
2422
database::{
2523
self,
2624
dips::{AgreementStore, InMemoryAgreementStore},
2725
},
28-
middleware::AttestationInput,
2926
routes::dips::Price,
3027
service::indexer_service::{IndexerServiceOptions, IndexerServiceRelease},
3128
};
@@ -35,7 +32,6 @@ use tracing::error;
3532
mod indexer_service;
3633
mod tap_receipt_header;
3734

38-
pub use indexer_service::{AttestationOutput, IndexerServiceResponse, IndexerServiceState};
3935
pub use tap_receipt_header::TapReceipt;
4036

4137
#[derive(Clone)]
@@ -48,62 +44,6 @@ pub struct SubgraphServiceState {
4844
pub graph_node_query_base_url: &'static Url,
4945
}
5046

51-
pub struct SubgraphService {
52-
state: SubgraphServiceState,
53-
}
54-
55-
impl SubgraphService {
56-
fn new(state: SubgraphServiceState) -> Self {
57-
Self { state }
58-
}
59-
}
60-
61-
impl SubgraphService {
62-
pub async fn process_request(
63-
&self,
64-
deployment: DeploymentId,
65-
req: String,
66-
) -> Result<Response<String>, SubgraphServiceError> {
67-
let deployment_url = self
68-
.state
69-
.graph_node_query_base_url
70-
.join(&format!("subgraphs/id/{deployment}"))
71-
.map_err(|_| SubgraphServiceError::InvalidDeployment(deployment))?;
72-
73-
let response = self
74-
.state
75-
.graph_node_client
76-
.post(deployment_url)
77-
.body(req.clone())
78-
.header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
79-
.send()
80-
.await
81-
.map_err(SubgraphServiceError::QueryForwardingError)?;
82-
83-
let attestable = response
84-
.headers()
85-
.get("graph-attestable")
86-
.map_or(false, |value| {
87-
value.to_str().map(|value| value == "true").unwrap_or(false)
88-
});
89-
90-
let body = response
91-
.text()
92-
.await
93-
.map_err(SubgraphServiceError::QueryForwardingError)?;
94-
let attestation_input = if attestable {
95-
AttestationInput::Attestable { req }
96-
} else {
97-
AttestationInput::NotAttestable
98-
};
99-
100-
let mut response = Response::new(body);
101-
response.extensions_mut().insert(attestation_input);
102-
103-
Ok(response)
104-
}
105-
}
106-
10747
/// Run the subgraph indexer service
10848
pub async fn run() -> anyhow::Result<()> {
10949
// Parse command line and environment arguments
@@ -184,7 +124,7 @@ pub async fn run() -> anyhow::Result<()> {
184124
release,
185125
config,
186126
url_namespace: "subgraphs",
187-
service_impl: SubgraphService::new(state),
127+
subgraph_state: state,
188128
extra_routes: router,
189129
})
190130
.await

crates/service/src/service/indexer_service.rs

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use axum::{
66
extract::{MatchedPath, Request as ExtractRequest},
77
http::{Method, Request},
88
middleware::{from_fn, from_fn_with_state},
9-
response::IntoResponse,
109
routing::{get, post},
1110
serve, Json, Router, ServiceExt,
1211
};
@@ -19,11 +18,8 @@ use prometheus::TextEncoder;
1918
use reqwest::StatusCode;
2019
use serde::Serialize;
2120
use sqlx::postgres::PgPoolOptions;
22-
use std::{
23-
collections::HashMap, error::Error, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration,
24-
};
21+
use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
2522
use tap_core::{manager::Manager, receipt::checks::CheckList, tap_eip712_domain};
26-
use thegraph_core::Attestation;
2723
use tokio::{net::TcpListener, signal};
2824
use tower::ServiceBuilder;
2925
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
@@ -51,21 +47,7 @@ use crate::{
5147
};
5248
use indexer_config::Config;
5349

54-
use super::SubgraphService;
55-
56-
pub trait IndexerServiceResponse {
57-
type Data: IntoResponse;
58-
type Error: Error;
59-
60-
fn is_attestable(&self) -> bool;
61-
fn as_str(&self) -> Result<&str, Self::Error>;
62-
fn finalize(self, attestation: AttestationOutput) -> Self::Data;
63-
}
64-
65-
pub enum AttestationOutput {
66-
Attestation(Option<Attestation>),
67-
Attestable,
68-
}
50+
use super::SubgraphServiceState;
6951

7052
#[derive(Clone, Serialize)]
7153
pub struct IndexerServiceRelease {
@@ -89,16 +71,11 @@ impl From<&BuildInfo> for IndexerServiceRelease {
8971
}
9072

9173
pub struct IndexerServiceOptions {
92-
pub service_impl: SubgraphService,
74+
pub subgraph_state: SubgraphServiceState,
9375
pub config: &'static Config,
9476
pub release: IndexerServiceRelease,
9577
pub url_namespace: &'static str,
96-
pub extra_routes: Router<Arc<IndexerServiceState>>,
97-
}
98-
99-
pub struct IndexerServiceState {
100-
pub config: Config,
101-
pub service_impl: SubgraphService,
78+
pub extra_routes: Router<SubgraphServiceState>,
10279
}
10380

10481
const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(30);
@@ -273,11 +250,6 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> {
273250
CheckList::new(checks),
274251
)));
275252

276-
let state = Arc::new(IndexerServiceState {
277-
config: options.config.clone(),
278-
service_impl: options.service_impl,
279-
});
280-
281253
// Rate limits by allowing bursts of 10 requests and requiring 100ms of
282254
// time between consecutive requests after that, effectively rate
283255
// limiting to 10 req/s.
@@ -358,8 +330,6 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> {
358330
}
359331
}
360332

361-
misc_routes = misc_routes.with_state(state.clone());
362-
363333
let mut request_handler_route = post(request_handler);
364334

365335
// inject auth
@@ -421,7 +391,7 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> {
421391
.expect("Failed to set up `/{url_namespace}/id/:id` route"),
422392
request_handler_route,
423393
)
424-
.with_state(state.clone());
394+
.with_state(options.subgraph_state.clone());
425395

426396
let router = NormalizePath::trim_trailing_slash(
427397
misc_routes
@@ -457,7 +427,7 @@ pub async fn run(options: IndexerServiceOptions) -> Result<(), anyhow::Error> {
457427
_span: &tracing::Span| {},
458428
),
459429
)
460-
.with_state(state),
430+
.with_state(options.subgraph_state),
461431
);
462432

463433
serve_metrics(options.config.metrics.get_socket_addr());

0 commit comments

Comments
 (0)