Skip to content

Commit 4cbac14

Browse files
committed
fix: use graph_client for response in subgraph health req
1 parent f2cbb6f commit 4cbac14

File tree

2 files changed

+49
-76
lines changed

2 files changed

+49
-76
lines changed

common/src/indexer_service/http/health.rs

Lines changed: 48 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,9 @@ use axum::{
99
use graphql_client::GraphQLQuery;
1010
use indexer_config::GraphNodeConfig;
1111
use reqwest::StatusCode;
12-
use serde::{Deserialize, Serialize};
1312
use serde_json::json;
1413
use thiserror::Error;
1514

16-
#[derive(Deserialize, Debug)]
17-
struct Response {
18-
data: SubgraphData,
19-
}
20-
21-
#[derive(Deserialize, Debug)]
22-
#[allow(non_snake_case)]
23-
struct SubgraphData {
24-
indexingStatuses: Vec<IndexingStatus>,
25-
}
26-
27-
#[derive(Deserialize, Debug)]
28-
#[allow(non_snake_case)]
29-
struct IndexingStatus {
30-
health: Health,
31-
fatalError: Option<Message>,
32-
nonFatalErrors: Vec<Message>,
33-
}
34-
35-
#[derive(Serialize, Deserialize, Debug)]
36-
struct Message {
37-
message: String,
38-
}
39-
4015
#[derive(GraphQLQuery)]
4116
#[graphql(
4217
schema_path = "../graphql/indexing_status.schema.graphql",
@@ -46,39 +21,31 @@ struct Message {
4621
)]
4722
pub struct HealthQuery;
4823

49-
#[derive(Deserialize, Debug)]
50-
#[allow(non_camel_case_types)]
51-
enum Health {
52-
healthy,
53-
unhealthy,
54-
failed,
55-
}
56-
57-
impl Health {
58-
fn as_str(&self) -> &str {
59-
match self {
60-
Health::healthy => "healthy",
61-
Health::unhealthy => "unhealthy",
62-
Health::failed => "failed",
63-
}
64-
}
65-
}
66-
6724
#[derive(Debug, Error)]
6825
pub enum CheckHealthError {
26+
#[error("Failed to send request")]
27+
RequestFailed,
28+
#[error("Failed to decode response")]
29+
BadResponse,
6930
#[error("Deployment not found")]
7031
DeploymentNotFound,
71-
#[error("Failed to process query")]
72-
QueryForwardingError,
32+
#[error("Invalid health status found")]
33+
InvalidHealthStatus,
7334
}
7435

7536
impl IntoResponse for CheckHealthError {
7637
fn into_response(self) -> AxumResponse {
7738
let (status, error_message) = match &self {
7839
CheckHealthError::DeploymentNotFound => (StatusCode::NOT_FOUND, "Deployment not found"),
79-
CheckHealthError::QueryForwardingError => {
80-
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to process query")
81-
}
40+
CheckHealthError::BadResponse => (
41+
StatusCode::INTERNAL_SERVER_ERROR,
42+
"Failed to decode response",
43+
),
44+
CheckHealthError::RequestFailed => (StatusCode::BAD_GATEWAY, "Failed to send request"),
45+
CheckHealthError::InvalidHealthStatus => (
46+
StatusCode::INTERNAL_SERVER_ERROR,
47+
"Invalid health status found",
48+
),
8249
};
8350

8451
let body = serde_json::json!({
@@ -102,32 +69,39 @@ pub async fn health(
10269
.post(graph_node.status_url)
10370
.json(&req_body)
10471
.send()
105-
.await;
106-
let res = response.expect("Failed to get response");
107-
let response_json: Result<Response, reqwest::Error> = res.json().await;
72+
.await
73+
.map_err(|_| CheckHealthError::RequestFailed)?;
10874

109-
match response_json {
110-
Ok(res) => {
111-
if res.data.indexingStatuses.is_empty() {
112-
return Err(CheckHealthError::DeploymentNotFound);
113-
};
114-
let status = &res.data.indexingStatuses[0];
115-
let health_response = match status.health {
116-
Health::healthy => json!({ "health": status.health.as_str() }),
117-
Health::unhealthy => {
118-
let errors: Vec<&String> = status
119-
.nonFatalErrors
120-
.iter()
121-
.map(|msg| &msg.message)
122-
.collect();
123-
json!({ "health": status.health.as_str(), "nonFatalErrors": errors })
124-
}
125-
Health::failed => {
126-
json!({ "health": status.health.as_str(), "fatalError": status.fatalError.as_ref().map_or("null", |msg| &msg.message) })
127-
}
128-
};
129-
Ok(Json(health_response))
130-
}
131-
Err(_) => Err(CheckHealthError::QueryForwardingError),
75+
let graphql_response: graphql_client::Response<health_query::ResponseData> = response
76+
.json()
77+
.await
78+
.map_err(|_| CheckHealthError::BadResponse)?;
79+
80+
let data = match (graphql_response.data, graphql_response.errors) {
81+
(Some(data), None) => data,
82+
(_, Some(_)) => return Err(CheckHealthError::BadResponse),
83+
(_, _) => return Err(CheckHealthError::BadResponse),
84+
};
85+
86+
if data.indexing_statuses.is_empty() {
87+
return Err(CheckHealthError::DeploymentNotFound);
13288
}
89+
90+
let status = &data.indexing_statuses[0];
91+
let health_response = match status.health {
92+
health_query::Health::healthy => json!({ "health": status.health }),
93+
health_query::Health::unhealthy => {
94+
let errors: Vec<&String> = status
95+
.non_fatal_errors
96+
.iter()
97+
.map(|msg| &msg.message)
98+
.collect();
99+
json!({ "health": status.health, "nonFatalErrors": errors })
100+
}
101+
health_query::Health::failed => {
102+
json!({ "health": status.health, "fatalError": status.fatal_error.as_ref().map_or("null", |msg| &msg.message) })
103+
}
104+
health_query::Health::Other(_) => return Err(CheckHealthError::InvalidHealthStatus),
105+
};
106+
Ok(Json(health_response))
133107
}

common/src/subgraph_client/client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use anyhow::anyhow;
66
use axum::body::Bytes;
77
use graphql_client::GraphQLQuery;
88
use reqwest::{header, Url};
9-
use serde::Serialize;
109
use serde_json::{Map, Value};
1110
use thegraph_core::DeploymentId;
1211
use thegraph_graphql_http::{
@@ -16,7 +15,7 @@ use thegraph_graphql_http::{
1615
use tokio::sync::watch::Receiver;
1716
use tracing::warn;
1817

19-
#[derive(Clone, Serialize)]
18+
#[derive(Clone)]
2019
pub struct Query {
2120
pub query: Document,
2221
pub variables: Map<String, Value>,

0 commit comments

Comments
 (0)