Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions crates/service/src/routes/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;
use std::sync::Arc;

use crate::database::cost_model::{self, CostModel};
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
Expand Down Expand Up @@ -127,7 +126,7 @@ impl Query {
ctx: &Context<'_>,
deployment_ids: Vec<DeploymentId>,
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
let pool = &ctx.data_unchecked::<SubgraphServiceState>().database;
let cost_models = cost_model::cost_models(pool, &deployment_ids).await?;
Ok(cost_models.into_iter().map(|m| m.into()).collect())
}
Expand All @@ -137,7 +136,7 @@ impl Query {
ctx: &Context<'_>,
deployment_id: DeploymentId,
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
let pool = &ctx.data_unchecked::<SubgraphServiceState>().database;
cost_model::cost_model(pool, &deployment_id)
.await
.map(|model_opt| model_opt.map(GraphQlCostModel::from))
Expand All @@ -151,7 +150,7 @@ pub async fn build_schema() -> CostSchema {
}

pub async fn cost(
State(state): State<Arc<SubgraphServiceState>>,
State(state): State<SubgraphServiceState>,
req: GraphQLRequest,
) -> GraphQLResponse {
state
Expand Down
21 changes: 7 additions & 14 deletions crates/service/src/routes/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use thegraph_core::DeploymentId;
use tracing::trace;

use crate::service::{
AttestationOutput, IndexerServiceError, IndexerServiceImpl, IndexerServiceResponse,
IndexerServiceState, TapReceipt,
AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState, TapReceipt,
};

lazy_static! {
Expand All @@ -45,16 +44,13 @@ lazy_static! {

}

pub async fn request_handler<I>(
pub async fn request_handler(
Path(manifest_id): Path<DeploymentId>,
typed_header: TypedHeader<TapReceipt>,
state: State<Arc<IndexerServiceState<I>>>,
state: State<Arc<IndexerServiceState>>,
headers: HeaderMap,
body: String,
) -> Result<impl IntoResponse, IndexerServiceError<I::Error>>
where
I: IndexerServiceImpl + Sync + Send + 'static,
{
) -> Result<impl IntoResponse, IndexerServiceError> {
_request_handler(manifest_id, typed_header, state, headers, body)
.await
.inspect_err(|_| {
Expand All @@ -64,16 +60,13 @@ where
})
}

async fn _request_handler<I>(
async fn _request_handler(
manifest_id: DeploymentId,
TypedHeader(receipt): TypedHeader<TapReceipt>,
State(state): State<Arc<IndexerServiceState<I>>>,
State(state): State<Arc<IndexerServiceState>>,
headers: HeaderMap,
req: String,
) -> Result<impl IntoResponse, IndexerServiceError<I::Error>>
where
I: IndexerServiceImpl + Sync + Send + 'static,
{
) -> Result<impl IntoResponse, IndexerServiceError> {
trace!("Handling request for deployment `{manifest_id}`");

let request: QueryBody =
Expand Down
3 changes: 1 addition & 2 deletions crates/service/src/routes/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;
use std::sync::Arc;

use async_graphql_axum::GraphQLRequest;
use axum::{extract::State, response::IntoResponse, Json};
Expand Down Expand Up @@ -58,7 +57,7 @@ impl IntoRequestParameters for WrappedGraphQLRequest {

// Custom middleware function to process the request before reaching the main handler
pub async fn status(
State(state): State<Arc<SubgraphServiceState>>,
State(state): State<SubgraphServiceState>,
request: GraphQLRequest,
) -> Result<impl IntoResponse, SubgraphServiceError> {
let request = request.into_inner();
Expand Down
32 changes: 13 additions & 19 deletions crates/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use anyhow::anyhow;
use async_graphql::{EmptySubscription, Schema};
use async_graphql_axum::GraphQL;
use axum::{
async_trait,
routing::{post, post_service},
Json, Router,
};
Expand All @@ -28,7 +27,7 @@ use crate::{
dips::{AgreementStore, InMemoryAgreementStore},
},
routes::dips::Price,
service::indexer_service::{IndexerService, IndexerServiceOptions, IndexerServiceRelease},
service::indexer_service::{IndexerServiceOptions, IndexerServiceRelease},
};
use clap::Parser;
use tracing::error;
Expand All @@ -37,13 +36,12 @@ mod indexer_service;
mod tap_receipt_header;

pub use indexer_service::{
AttestationOutput, IndexerServiceError, IndexerServiceImpl, IndexerServiceResponse,
IndexerServiceState,
AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState,
};
pub use tap_receipt_header::TapReceipt;

#[derive(Debug)]
struct SubgraphServiceResponse {
pub struct SubgraphServiceResponse {
inner: String,
attestable: bool,
}
Expand Down Expand Up @@ -78,6 +76,7 @@ impl IndexerServiceResponse for SubgraphServiceResponse {
}
}

#[derive(Clone)]
pub struct SubgraphServiceState {
pub config: &'static Config,
pub database: PgPool,
Expand All @@ -87,27 +86,22 @@ pub struct SubgraphServiceState {
pub graph_node_query_base_url: &'static Url,
}

struct SubgraphService {
state: Arc<SubgraphServiceState>,
pub struct SubgraphService {
state: SubgraphServiceState,
}

impl SubgraphService {
fn new(state: Arc<SubgraphServiceState>) -> Self {
fn new(state: SubgraphServiceState) -> Self {
Self { state }
}
}

#[async_trait]
impl IndexerServiceImpl for SubgraphService {
type Error = SubgraphServiceError;
type Response = SubgraphServiceResponse;
type State = SubgraphServiceState;

async fn process_request<Request: DeserializeOwned + Send + std::fmt::Debug + Serialize>(
impl SubgraphService {
pub async fn process_request<Request: DeserializeOwned + Send + std::fmt::Debug + Serialize>(
&self,
deployment: DeploymentId,
request: Request,
) -> Result<Self::Response, Self::Error> {
) -> Result<SubgraphServiceResponse, SubgraphServiceError> {
let deployment_url = self
.state
.graph_node_query_base_url
Expand Down Expand Up @@ -166,7 +160,7 @@ pub async fn run() -> anyhow::Result<()> {
// Some of the subgraph service configuration goes into the so-called
// "state", which will be passed to any request handler, middleware etc.
// that is involved in serving requests
let state = Arc::new(SubgraphServiceState {
let state = SubgraphServiceState {
config,
database: database::connect(config.database.clone().get_formated_postgres_url().as_ref())
.await,
Expand All @@ -178,7 +172,7 @@ pub async fn run() -> anyhow::Result<()> {
.expect("Failed to init HTTP client for Graph Node"),
graph_node_status_url: &config.graph_node.status_url,
graph_node_query_base_url: &config.graph_node.query_url,
});
};

let agreement_store: Arc<dyn AgreementStore> = Arc::new(InMemoryAgreementStore::default());
let prices: Vec<Price> = vec![];
Expand Down Expand Up @@ -215,7 +209,7 @@ pub async fn run() -> anyhow::Result<()> {
router = router.route("/dips", post_service(GraphQL::new(schema)));
}

IndexerService::run(IndexerServiceOptions {
indexer_service::run(IndexerServiceOptions {
release,
config,
url_namespace: "subgraphs",
Expand Down
Loading
Loading