3
3
4
4
use std:: time:: Duration ;
5
5
6
+ use deployment_status_query:: Health ;
7
+ use graphql_client:: GraphQLQuery ;
6
8
use reqwest:: Url ;
7
9
use serde:: Deserialize ;
8
- use serde_json:: json;
9
10
use thegraph_core:: DeploymentId ;
10
- use thegraph_graphql_http:: {
11
- http:: request:: IntoRequestParameters ,
12
- http_client:: { ReqwestExt , ResponseResult } ,
13
- } ;
14
11
use tokio:: sync:: watch:: Receiver ;
15
12
16
13
use crate :: watcher:: new_watcher;
17
14
18
- use super :: Query ;
19
-
20
- #[ derive( Deserialize ) ]
21
- #[ serde( rename_all = "camelCase" ) ]
22
- struct DeploymentStatusResponse {
23
- indexing_statuses : Vec < DeploymentStatus > ,
24
- }
15
+ #[ derive( GraphQLQuery ) ]
16
+ #[ graphql(
17
+ schema_path = "../graphql/indexing_status.schema.graphql" ,
18
+ query_path = "../graphql/subgraph_deployment_status.graphql" ,
19
+ response_derives = "Debug" ,
20
+ variables_derives = "Clone"
21
+ ) ]
22
+ pub struct DeploymentStatusQuery ;
25
23
26
24
#[ derive( Clone , Debug , Deserialize , Eq , PartialEq ) ]
27
25
pub struct DeploymentStatus {
28
26
pub synced : bool ,
29
27
pub health : String ,
30
28
}
31
29
32
- async fn query < T : for < ' de > Deserialize < ' de > > (
33
- url : Url ,
34
- query : impl IntoRequestParameters + Send ,
35
- ) -> Result < ResponseResult < T > , anyhow:: Error > {
36
- Ok ( reqwest:: Client :: new ( ) . post ( url) . send_graphql ( query) . await ?)
37
- }
38
-
39
30
pub async fn monitor_deployment_status (
40
31
deployment : DeploymentId ,
41
32
status_url : Url ,
@@ -50,28 +41,28 @@ pub async fn check_deployment_status(
50
41
deployment : DeploymentId ,
51
42
status_url : Url ,
52
43
) -> Result < DeploymentStatus , anyhow:: Error > {
53
- let body = Query :: new_with_variables (
54
- r#"
55
- query indexingStatuses($ids: [String!]!) {
56
- indexingStatuses(subgraphs: $ids) {
57
- synced
58
- health
59
- }
60
- }
61
- "# ,
62
- [ ( "ids" , json ! ( [ deployment. to_string( ) ] ) ) ] ,
63
- ) ;
64
-
65
- let response = query :: < DeploymentStatusResponse > ( status_url, body) . await ?;
66
-
67
- match response {
68
- Ok ( deployment_status) => deployment_status
44
+ let req_body = DeploymentStatusQuery :: build_query ( deployment_status_query:: Variables {
45
+ ids : vec ! [ deployment. to_string( ) ] ,
46
+ } ) ;
47
+ let client = reqwest:: Client :: new ( ) ;
48
+ let response = client. post ( status_url) . json ( & req_body) . send ( ) . await ?;
49
+ let graphql_response: graphql_client:: Response < deployment_status_query:: ResponseData > =
50
+ response. json ( ) . await ?;
51
+ match graphql_response. data {
52
+ Some ( data) => data
69
53
. indexing_statuses
70
54
. first ( )
71
- . cloned ( )
55
+ . map ( |status| DeploymentStatus {
56
+ synced : status. synced ,
57
+ health : match status. health {
58
+ Health :: healthy => "healthy" . to_owned ( ) ,
59
+ Health :: unhealthy => "unhealthy" . to_owned ( ) ,
60
+ _ => "failed" . to_owned ( ) ,
61
+ } ,
62
+ } )
72
63
. ok_or_else ( || anyhow:: anyhow!( "Deployment `{deployment}` not found" ) ) ,
73
- Err ( e ) => Err ( anyhow:: anyhow!(
74
- "Failed to query status of deployment `{deployment}`: {e} "
64
+ None => Err ( anyhow:: anyhow!(
65
+ "Failed to query status of deployment `{deployment}`"
75
66
) ) ,
76
67
}
77
68
}
@@ -80,6 +71,7 @@ pub async fn check_deployment_status(
80
71
mod tests {
81
72
use std:: str:: FromStr ;
82
73
74
+ use serde_json:: json;
83
75
use wiremock:: matchers:: { method, path} ;
84
76
use wiremock:: { Mock , MockServer , ResponseTemplate } ;
85
77
0 commit comments