Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions crates/service/src/routes/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;
use std::sync::Arc;

use crate::database::cost_model::{self, CostModel};
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
Expand Down Expand Up @@ -127,7 +126,7 @@ impl Query {
ctx: &Context<'_>,
deployment_ids: Vec<DeploymentId>,
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
let pool = &ctx.data_unchecked::<SubgraphServiceState>().database;
let cost_models = cost_model::cost_models(pool, &deployment_ids).await?;
Ok(cost_models.into_iter().map(|m| m.into()).collect())
}
Expand All @@ -137,7 +136,7 @@ impl Query {
ctx: &Context<'_>,
deployment_id: DeploymentId,
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
let pool = &ctx.data_unchecked::<SubgraphServiceState>().database;
cost_model::cost_model(pool, &deployment_id)
.await
.map(|model_opt| model_opt.map(GraphQlCostModel::from))
Expand All @@ -151,7 +150,7 @@ pub async fn build_schema() -> CostSchema {
}

pub async fn cost(
State(state): State<Arc<SubgraphServiceState>>,
State(state): State<SubgraphServiceState>,
req: GraphQLRequest,
) -> GraphQLResponse {
state
Expand Down
21 changes: 7 additions & 14 deletions crates/service/src/routes/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use thegraph_core::DeploymentId;
use tracing::trace;

use crate::service::{
AttestationOutput, IndexerServiceError, IndexerServiceImpl, IndexerServiceResponse,
IndexerServiceState, TapReceipt,
AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState, TapReceipt,
};

lazy_static! {
Expand All @@ -45,16 +44,13 @@ lazy_static! {

}

pub async fn request_handler<I>(
pub async fn request_handler(
Path(manifest_id): Path<DeploymentId>,
typed_header: TypedHeader<TapReceipt>,
state: State<Arc<IndexerServiceState<I>>>,
state: State<Arc<IndexerServiceState>>,
headers: HeaderMap,
body: String,
) -> Result<impl IntoResponse, IndexerServiceError<I::Error>>
where
I: IndexerServiceImpl + Sync + Send + 'static,
{
) -> Result<impl IntoResponse, IndexerServiceError> {
_request_handler(manifest_id, typed_header, state, headers, body)
.await
.inspect_err(|_| {
Expand All @@ -64,16 +60,13 @@ where
})
}

async fn _request_handler<I>(
async fn _request_handler(
manifest_id: DeploymentId,
TypedHeader(receipt): TypedHeader<TapReceipt>,
State(state): State<Arc<IndexerServiceState<I>>>,
State(state): State<Arc<IndexerServiceState>>,
headers: HeaderMap,
req: String,
) -> Result<impl IntoResponse, IndexerServiceError<I::Error>>
where
I: IndexerServiceImpl + Sync + Send + 'static,
{
) -> Result<impl IntoResponse, IndexerServiceError> {
trace!("Handling request for deployment `{manifest_id}`");

let request: QueryBody =
Expand Down
3 changes: 1 addition & 2 deletions crates/service/src/routes/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;
use std::sync::Arc;

use async_graphql_axum::GraphQLRequest;
use axum::{extract::State, response::IntoResponse, Json};
Expand Down Expand Up @@ -58,7 +57,7 @@ impl IntoRequestParameters for WrappedGraphQLRequest {

// Custom middleware function to process the request before reaching the main handler
pub async fn status(
State(state): State<Arc<SubgraphServiceState>>,
State(state): State<SubgraphServiceState>,
request: GraphQLRequest,
) -> Result<impl IntoResponse, SubgraphServiceError> {
let request = request.into_inner();
Expand Down
32 changes: 13 additions & 19 deletions crates/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use anyhow::anyhow;
use async_graphql::{EmptySubscription, Schema};
use async_graphql_axum::GraphQL;
use axum::{
async_trait,
routing::{post, post_service},
Json, Router,
};
Expand All @@ -28,7 +27,7 @@ use crate::{
dips::{AgreementStore, InMemoryAgreementStore},
},
routes::dips::Price,
service::indexer_service::{IndexerService, IndexerServiceOptions, IndexerServiceRelease},
service::indexer_service::{IndexerServiceOptions, IndexerServiceRelease},
};
use clap::Parser;
use tracing::error;
Expand All @@ -37,13 +36,12 @@ mod indexer_service;
mod tap_receipt_header;

pub use indexer_service::{
AttestationOutput, IndexerServiceError, IndexerServiceImpl, IndexerServiceResponse,
IndexerServiceState,
AttestationOutput, IndexerServiceError, IndexerServiceResponse, IndexerServiceState,
};
pub use tap_receipt_header::TapReceipt;

#[derive(Debug)]
struct SubgraphServiceResponse {
pub struct SubgraphServiceResponse {
inner: String,
attestable: bool,
}
Expand Down Expand Up @@ -78,6 +76,7 @@ impl IndexerServiceResponse for SubgraphServiceResponse {
}
}

#[derive(Clone)]
pub struct SubgraphServiceState {
pub config: &'static Config,
pub database: PgPool,
Expand All @@ -87,27 +86,22 @@ pub struct SubgraphServiceState {
pub graph_node_query_base_url: &'static Url,
}

struct SubgraphService {
state: Arc<SubgraphServiceState>,
pub struct SubgraphService {
state: SubgraphServiceState,
}

impl SubgraphService {
fn new(state: Arc<SubgraphServiceState>) -> Self {
fn new(state: SubgraphServiceState) -> Self {
Self { state }
}
}

#[async_trait]
impl IndexerServiceImpl for SubgraphService {
type Error = SubgraphServiceError;
type Response = SubgraphServiceResponse;
type State = SubgraphServiceState;

async fn process_request<Request: DeserializeOwned + Send + std::fmt::Debug + Serialize>(
impl SubgraphService {
pub async fn process_request<Request: DeserializeOwned + Send + std::fmt::Debug + Serialize>(
&self,
deployment: DeploymentId,
request: Request,
) -> Result<Self::Response, Self::Error> {
) -> Result<SubgraphServiceResponse, SubgraphServiceError> {
let deployment_url = self
.state
.graph_node_query_base_url
Expand Down Expand Up @@ -166,7 +160,7 @@ pub async fn run() -> anyhow::Result<()> {
// Some of the subgraph service configuration goes into the so-called
// "state", which will be passed to any request handler, middleware etc.
// that is involved in serving requests
let state = Arc::new(SubgraphServiceState {
let state = SubgraphServiceState {
config,
database: database::connect(config.database.clone().get_formated_postgres_url().as_ref())
.await,
Expand All @@ -178,7 +172,7 @@ pub async fn run() -> anyhow::Result<()> {
.expect("Failed to init HTTP client for Graph Node"),
graph_node_status_url: &config.graph_node.status_url,
graph_node_query_base_url: &config.graph_node.query_url,
});
};

let agreement_store: Arc<dyn AgreementStore> = Arc::new(InMemoryAgreementStore::default());
let prices: Vec<Price> = vec![];
Expand Down Expand Up @@ -215,7 +209,7 @@ pub async fn run() -> anyhow::Result<()> {
router = router.route("/dips", post_service(GraphQL::new(schema)));
}

IndexerService::run(IndexerServiceOptions {
indexer_service::run(IndexerServiceOptions {
release,
config,
url_namespace: "subgraphs",
Expand Down
Loading
Loading