Skip to content

Commit a932ff4

Browse files
committed
feat: add new route to check subgraph health
1 parent 058d9d1 commit a932ff4

File tree

4 files changed

+123
-1
lines changed

4 files changed

+123
-1
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
use crate::subgraph_client::Query;
2+
use axum::{
3+
extract::Path,
4+
response::{IntoResponse, Response as AxumResponse},
5+
Extension, Json,
6+
};
7+
use reqwest::StatusCode;
8+
use serde::Deserialize;
9+
use serde_json::json;
10+
use thiserror::Error;
11+
12+
use super::GraphNodeConfig;
13+
14+
#[derive(Deserialize, Debug)]
15+
struct Response {
16+
data: SubgraphData,
17+
}
18+
19+
#[derive(Deserialize, Debug)]
20+
#[allow(non_snake_case)]
21+
struct SubgraphData {
22+
indexingStatuses: Vec<IndexingStatus>,
23+
}
24+
25+
#[derive(Deserialize, Debug)]
26+
struct IndexingStatus {
27+
health: Health,
28+
}
29+
30+
#[derive(Deserialize, Debug)]
31+
#[allow(non_camel_case_types)]
32+
enum Health {
33+
healthy,
34+
unhealthy,
35+
failed,
36+
}
37+
38+
impl Health {
39+
fn as_str(&self) -> &str {
40+
match self {
41+
Health::healthy => "healthy",
42+
Health::unhealthy => "unhealthy",
43+
Health::failed => "failed",
44+
}
45+
}
46+
}
47+
48+
#[derive(Debug, Error)]
49+
pub enum CheckHealthError {
50+
#[error("Graph node config not found")]
51+
GraphNodeConfigNotFound,
52+
#[error("Deployment not found")]
53+
DeploymentNotFound,
54+
#[error("Failed to process query")]
55+
QueryForwardingError,
56+
}
57+
58+
impl IntoResponse for CheckHealthError {
59+
fn into_response(self) -> AxumResponse {
60+
let (status, error_message) = match &self {
61+
CheckHealthError::GraphNodeConfigNotFound => {
62+
(StatusCode::NOT_FOUND, "Graph node config not found")
63+
}
64+
CheckHealthError::DeploymentNotFound => (StatusCode::NOT_FOUND, "Deployment not found"),
65+
CheckHealthError::QueryForwardingError => {
66+
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to process query")
67+
}
68+
};
69+
70+
let body = serde_json::json!({
71+
"error": error_message,
72+
});
73+
74+
(status, Json(body)).into_response()
75+
}
76+
}
77+
78+
pub async fn health(
79+
Path(deployment_id): Path<String>,
80+
Extension(graph_node): Extension<Option<GraphNodeConfig>>,
81+
) -> Result<impl IntoResponse, CheckHealthError> {
82+
let url = if let Some(graph_node) = graph_node {
83+
graph_node.status_url
84+
} else {
85+
return Err(CheckHealthError::GraphNodeConfigNotFound);
86+
};
87+
88+
let body = Query::new_with_variables(
89+
r#"
90+
query indexingStatuses($ids: [String!]!) {
91+
indexingStatuses(subgraphs: $ids) {
92+
health
93+
}
94+
}
95+
"#,
96+
[("ids", json!([deployment_id]))],
97+
);
98+
99+
let client = reqwest::Client::new();
100+
let response = client.post(url).json(&body).send().await;
101+
let response = response.expect("Failed to get response");
102+
let response_json: Result<Response, reqwest::Error> = response.json().await;
103+
104+
match response_json {
105+
Ok(res) => {
106+
if res.data.indexingStatuses.len() == 0 {
107+
return Err(CheckHealthError::DeploymentNotFound);
108+
};
109+
let health_status = res.data.indexingStatuses[0].health.as_str();
110+
Ok(Json(json!({ "health": health_status })))
111+
}
112+
Err(_) => return Err(CheckHealthError::QueryForwardingError),
113+
}
114+
}

common/src/indexer_service/http/indexer_service.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use tracing::{info, info_span};
3636

3737
use crate::escrow_accounts::EscrowAccounts;
3838
use crate::escrow_accounts::EscrowAccountsError;
39+
use crate::indexer_service::http::health::health;
3940
use crate::{
4041
address::public_key,
4142
indexer_service::http::static_subgraph::static_subgraph_request_handler,
@@ -352,6 +353,11 @@ impl IndexerService {
352353
.route("/info", get(operator_address))
353354
.layer(misc_rate_limiter);
354355

356+
// Check subgraph Health
357+
misc_routes = misc_routes
358+
.route("/subgraph/health/:deployment_id", get(health))
359+
.route_layer(Extension(options.config.graph_node));
360+
355361
// Rate limits by allowing bursts of 50 requests and requiring 20ms of
356362
// time between consecutive requests after that, effectively rate
357363
// limiting to 50 req/s.

common/src/indexer_service/http/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
mod config;
5+
mod health;
56
mod indexer_service;
67
mod request_handler;
78
mod static_subgraph;

common/src/subgraph_client/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use axum::body::Bytes;
77
use eventuals::Eventual;
88
use graphql_client::GraphQLQuery;
99
use reqwest::{header, Url};
10+
use serde::Serialize;
1011
use serde_json::{Map, Value};
1112
use thegraph_core::DeploymentId;
1213
use thegraph_graphql_http::{
@@ -15,7 +16,7 @@ use thegraph_graphql_http::{
1516
};
1617
use tracing::warn;
1718

18-
#[derive(Clone)]
19+
#[derive(Clone, Serialize)]
1920
pub struct Query {
2021
pub query: Document,
2122
pub variables: Map<String, Value>,

0 commit comments

Comments
 (0)