Skip to content

Commit b22348e

Browse files
migrate2watch deployment_status
1 parent a25dcfd commit b22348e

File tree

2 files changed

+40
-34
lines changed

2 files changed

+40
-34
lines changed

common/src/subgraph_client/client.rs

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
54
use super::monitor::{monitor_deployment_status, DeploymentStatus};
65
use anyhow::anyhow;
76
use axum::body::Bytes;
@@ -161,19 +160,7 @@ impl DeploymentClient {
161160
&self,
162161
variables: T::Variables,
163162
) -> Result<ResponseResult<T::ResponseData>, anyhow::Error> {
164-
if let Some(ref status_rx) = self.status {
165-
//change
166-
while status_rx.borrow().is_none() {dbg!("looping");};
167-
let some_ref = status_rx.borrow();
168-
let deployment_status = some_ref.as_ref().unwrap();
169-
170-
if !deployment_status.synced || &deployment_status.health != "healthy" {
171-
return Err(anyhow!(
172-
"Deployment `{}` is not ready or healthy to be queried",
173-
self.query_url
174-
));
175-
}
176-
}
163+
self.check_deployment_status()?;
177164

178165
let body = T::build_query(variables);
179166
let reqwest_response = self
@@ -194,20 +181,7 @@ impl DeploymentClient {
194181
}
195182

196183
pub async fn query_raw(&self, body: Bytes) -> Result<reqwest::Response, anyhow::Error> {
197-
if let Some(ref status_rx) = self.status {
198-
//change
199-
while status_rx.borrow().is_none() {dbg!("looping");};
200-
let some_ref = status_rx.borrow();
201-
let deployment_status = some_ref.as_ref().unwrap();
202-
203-
if !deployment_status.synced || &deployment_status.health != "healthy" {
204-
return Err(anyhow!(
205-
"Deployment `{}` is not ready or healthy to be queried",
206-
self.query_url
207-
));
208-
}
209-
}
210-
184+
self.check_deployment_status()?;
211185
Ok(self
212186
.http_client
213187
.post(self.query_url.as_ref())
@@ -217,6 +191,27 @@ impl DeploymentClient {
217191
.send()
218192
.await?)
219193
}
194+
fn check_deployment_status(&self)->Result<(), anyhow::Error>{
195+
if let Some(ref status_rx) = self.status {
196+
let status_ref = status_rx.borrow();
197+
let status = status_ref.as_ref();
198+
match status{
199+
Some(deployment_status)=>{
200+
if !deployment_status.synced || &deployment_status.health != "healthy" {
201+
return Err(anyhow!(
202+
"Deployment `{}` is not ready or healthy to be queried",
203+
self.query_url
204+
));
205+
}
206+
}
207+
None => return Err(anyhow!(
208+
"Deployment `{}` is not available",
209+
self.query_url
210+
))
211+
}
212+
}
213+
Ok(())
214+
}
220215
}
221216

222217
/// Client for a subgraph that can fall back from a local deployment to a remote query URL
@@ -299,6 +294,7 @@ impl SubgraphClient {
299294
#[cfg(test)]
300295
mod test {
301296
use std::str::FromStr;
297+
use std::time::Duration;
302298

303299
use serde_json::json;
304300
use wiremock::matchers::{method, path};
@@ -445,6 +441,9 @@ mod test {
445441
.unwrap(),
446442
);
447443

444+
// Waiting for status to propegate
445+
tokio::time::sleep(Duration::from_millis(1000)).await;
446+
448447
// Query the subgraph
449448
let data = client
450449
.query::<UserQuery, _>(user_query::Variables {})
@@ -525,6 +524,9 @@ mod test {
525524
.unwrap(),
526525
);
527526

527+
// Waiting for status to propegate
528+
tokio::time::sleep(Duration::from_millis(100)).await;
529+
528530
// Query the subgraph
529531
let data = client
530532
.query::<UserQuery, _>(user_query::Variables {})
@@ -605,6 +607,9 @@ mod test {
605607
.unwrap(),
606608
);
607609

610+
// Waiting for status to propegate
611+
tokio::time::sleep(Duration::from_millis(100)).await;
612+
608613
// Query the subgraph
609614
let data = client
610615
.query::<UserQuery, _>(user_query::Variables {})
@@ -614,4 +619,4 @@ mod test {
614619

615620
assert_eq!(data.user.name, "remote".to_string());
616621
}
617-
}
622+
}

common/src/subgraph_client/monitor.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ pub fn monitor_deployment_status(
6868
});
6969
match result{
7070
Ok(deployment_status) => {
71+
dbg!("Sending......");
7172
tx
7273
.send(Some(deployment_status))
7374
.expect("Failed to update deployment_status channel");
@@ -77,7 +78,7 @@ pub fn monitor_deployment_status(
7778
"Error querying deployment status for `{}`: {}",
7879
deployment, err
7980
);
80-
sleep(Duration::from_secs(15)).await
81+
sleep(Duration::from_secs(15)).await;
8182
},
8283
}
8384
}
@@ -122,7 +123,7 @@ mod tests {
122123
.await;
123124

124125
let mut status_rx = monitor_deployment_status(deployment, status_url);
125-
status_rx.changed().await;
126+
status_rx.changed().await.unwrap();
126127
let status_ref = status_rx.borrow();
127128
assert_eq!(
128129
status_ref.as_ref().unwrap().clone(),
@@ -161,7 +162,7 @@ mod tests {
161162
.await;
162163

163164
let mut status_rx = monitor_deployment_status(deployment, status_url);
164-
status_rx.changed().await;
165+
status_rx.changed().await.unwrap();
165166
let status_ref = status_rx.borrow();
166167
assert_eq!(
167168
status_ref.as_ref().unwrap().clone(),
@@ -200,7 +201,7 @@ mod tests {
200201
.await;
201202

202203
let mut status_rx = monitor_deployment_status(deployment, status_url);
203-
status_rx.changed().await;
204+
status_rx.changed().await.unwrap();
204205
let status_ref = status_rx.borrow();
205206
assert_eq!(
206207
status_ref.as_ref().unwrap().clone(),
@@ -239,7 +240,7 @@ mod tests {
239240
.await;
240241

241242
let mut status_rx = monitor_deployment_status(deployment, status_url);
242-
status_rx.changed().await;
243+
status_rx.changed().await.unwrap();
243244
let status_ref = status_rx.borrow();
244245
assert_eq!(
245246
status_ref.as_ref().unwrap().clone(),

0 commit comments

Comments
 (0)