Skip to content

Commit 50550e9

Browse files
committed
refactor: drop eventuals in favor of tokio watch + timers for subgraph monitor
1 parent 99cf66c commit 50550e9

File tree

11 files changed

+250
-195
lines changed

11 files changed

+250
-195
lines changed

common/src/allocations/monitor.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,22 @@ mod test {
134134

135135
use super::*;
136136

137-
fn network_subgraph_client() -> &'static SubgraphClient {
138-
Box::leak(Box::new(SubgraphClient::new(
139-
reqwest::Client::new(),
140-
None,
141-
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
142-
)))
137+
async fn network_subgraph_client() -> &'static SubgraphClient {
138+
Box::leak(Box::new(
139+
SubgraphClient::new(
140+
reqwest::Client::new(),
141+
None,
142+
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
143+
)
144+
.await,
145+
))
143146
}
144147

145148
#[tokio::test]
146149
#[ignore = "depends on the defunct hosted-service"]
147150
async fn test_network_query() {
148151
let result = get_allocations(
149-
network_subgraph_client(),
152+
network_subgraph_client().await,
150153
Address::from_str("0x326c584e0f0eab1f1f83c93cc6ae1acc0feba0bc").unwrap(),
151154
Duration::from_secs(1712448507),
152155
)
@@ -158,7 +161,7 @@ mod test {
158161
#[ignore = "depends on the defunct hosted-service"]
159162
async fn test_network_query_empty_response() {
160163
let result = get_allocations(
161-
network_subgraph_client(),
164+
network_subgraph_client().await,
162165
Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(),
163166
Duration::from_secs(1712448507),
164167
)

common/src/attestations/dispute_manager.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ mod test {
6565
*test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT
6666
))
6767
.unwrap(),
68-
);
68+
)
69+
.await;
6970

7071
// Mock result for current epoch requests
7172
mock_server

common/src/escrow_accounts.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -213,16 +213,19 @@ mod tests {
213213
async fn test_current_accounts() {
214214
// Set up a mock escrow subgraph
215215
let mock_server = MockServer::start().await;
216-
let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
217-
reqwest::Client::new(),
218-
None,
219-
DeploymentDetails::for_query_url(&format!(
220-
"{}/subgraphs/id/{}",
221-
&mock_server.uri(),
222-
*test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT
223-
))
224-
.unwrap(),
225-
)));
216+
let escrow_subgraph = Box::leak(Box::new(
217+
SubgraphClient::new(
218+
reqwest::Client::new(),
219+
None,
220+
DeploymentDetails::for_query_url(&format!(
221+
"{}/subgraphs/id/{}",
222+
&mock_server.uri(),
223+
*test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT
224+
))
225+
.unwrap(),
226+
)
227+
.await,
228+
));
226229

227230
let mock = Mock::given(method("POST"))
228231
.and(path(format!(

common/src/indexer_service/http/indexer_service.rs

Lines changed: 56 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -205,36 +205,39 @@ impl IndexerService {
205205
.build()
206206
.expect("Failed to init HTTP client");
207207

208-
let network_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
209-
http_client.clone(),
210-
options
211-
.config
212-
.subgraphs
213-
.network
214-
.config
215-
.deployment_id
216-
.map(|deployment| {
217-
DeploymentDetails::for_graph_node_url(
218-
options.config.graph_node.status_url.clone(),
219-
options.config.graph_node.query_url.clone(),
220-
deployment,
221-
)
222-
})
223-
.transpose()
224-
.expect(
225-
"Failed to parse graph node query endpoint and network subgraph deployment",
226-
),
227-
DeploymentDetails::for_query_url_with_token(
228-
options.config.subgraphs.network.config.query_url.as_ref(),
208+
let network_subgraph: &'static SubgraphClient = Box::leak(Box::new(
209+
SubgraphClient::new(
210+
http_client.clone(),
229211
options
230212
.config
231213
.subgraphs
232214
.network
233215
.config
234-
.query_auth_token
235-
.clone(),
236-
)?,
237-
)));
216+
.deployment_id
217+
.map(|deployment| {
218+
DeploymentDetails::for_graph_node_url(
219+
options.config.graph_node.status_url.clone(),
220+
options.config.graph_node.query_url.clone(),
221+
deployment,
222+
)
223+
})
224+
.transpose()
225+
.expect(
226+
"Failed to parse graph node query endpoint and network subgraph deployment",
227+
),
228+
DeploymentDetails::for_query_url_with_token(
229+
options.config.subgraphs.network.config.query_url.as_ref(),
230+
options
231+
.config
232+
.subgraphs
233+
.network
234+
.config
235+
.query_auth_token
236+
.clone(),
237+
)?,
238+
)
239+
.await,
240+
));
238241

239242
// Identify the dispute manager for the configured network
240243
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(3600))
@@ -269,34 +272,39 @@ impl IndexerService {
269272
dispute_manager,
270273
);
271274

272-
let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
273-
http_client,
274-
options
275-
.config
276-
.subgraphs
277-
.escrow
278-
.config
279-
.deployment_id
280-
.map(|deployment| {
281-
DeploymentDetails::for_graph_node_url(
282-
options.config.graph_node.status_url.clone(),
283-
options.config.graph_node.query_url.clone(),
284-
deployment,
285-
)
286-
})
287-
.transpose()
288-
.expect("Failed to parse graph node query endpoint and escrow subgraph deployment"),
289-
DeploymentDetails::for_query_url_with_token(
290-
options.config.subgraphs.escrow.config.query_url.as_ref(),
275+
let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(
276+
SubgraphClient::new(
277+
http_client,
291278
options
292279
.config
293280
.subgraphs
294281
.escrow
295282
.config
296-
.query_auth_token
297-
.clone(),
298-
)?,
299-
)));
283+
.deployment_id
284+
.map(|deployment| {
285+
DeploymentDetails::for_graph_node_url(
286+
options.config.graph_node.status_url.clone(),
287+
options.config.graph_node.query_url.clone(),
288+
deployment,
289+
)
290+
})
291+
.transpose()
292+
.expect(
293+
"Failed to parse graph node query endpoint and escrow subgraph deployment",
294+
),
295+
DeploymentDetails::for_query_url_with_token(
296+
options.config.subgraphs.escrow.config.query_url.as_ref(),
297+
options
298+
.config
299+
.subgraphs
300+
.escrow
301+
.config
302+
.query_auth_token
303+
.clone(),
304+
)?,
305+
)
306+
.await,
307+
));
300308

301309
let escrow_accounts = escrow_accounts(
302310
escrow_subgraph,

common/src/subgraph_client/client.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
use super::monitor::{monitor_deployment_status, DeploymentStatus};
55
use anyhow::anyhow;
66
use axum::body::Bytes;
7-
use eventuals::Eventual;
87
use graphql_client::GraphQLQuery;
98
use reqwest::{header, Url};
109
use serde_json::{Map, Value};
@@ -13,6 +12,7 @@ use thegraph_graphql_http::{
1312
graphql::{Document, IntoDocument},
1413
http::request::{IntoRequestParameters, RequestParameters},
1514
};
15+
use tokio::sync::watch::Receiver;
1616
use tracing::warn;
1717

1818
#[derive(Clone)]
@@ -140,18 +140,24 @@ impl DeploymentDetails {
140140

141141
struct DeploymentClient {
142142
pub http_client: reqwest::Client,
143-
pub status: Option<Eventual<DeploymentStatus>>,
143+
pub status: Option<Receiver<DeploymentStatus>>,
144144
pub query_url: Url,
145145
}
146146

147147
impl DeploymentClient {
148-
pub fn new(http_client: reqwest::Client, details: DeploymentDetails) -> Self {
148+
pub async fn new(http_client: reqwest::Client, details: DeploymentDetails) -> Self {
149149
Self {
150150
http_client,
151-
status: details
152-
.deployment
153-
.zip(details.status_url)
154-
.map(|(deployment, url)| monitor_deployment_status(deployment, url)),
151+
status: match details.deployment.zip(details.status_url) {
152+
Some((deployment, url)) => Some(
153+
monitor_deployment_status(deployment, url)
154+
.await
155+
.expect(&format!(
156+
"Failed to initialize monitoring for deployment {deployment}"
157+
)),
158+
),
159+
None => None,
160+
},
155161
query_url: details.query_url,
156162
}
157163
}
@@ -161,7 +167,7 @@ impl DeploymentClient {
161167
variables: T::Variables,
162168
) -> Result<ResponseResult<T::ResponseData>, anyhow::Error> {
163169
if let Some(ref status) = self.status {
164-
let deployment_status = status.value().await.expect("reading deployment status");
170+
let deployment_status = status.borrow();
165171

166172
if !deployment_status.synced || &deployment_status.health != "healthy" {
167173
return Err(anyhow!(
@@ -198,7 +204,7 @@ impl DeploymentClient {
198204

199205
pub async fn query_raw(&self, body: Bytes) -> Result<reqwest::Response, anyhow::Error> {
200206
if let Some(ref status) = self.status {
201-
let deployment_status = status.value().await.expect("reading deployment status");
207+
let deployment_status = status.borrow();
202208

203209
if !deployment_status.synced || &deployment_status.health != "healthy" {
204210
return Err(anyhow!(
@@ -226,14 +232,17 @@ pub struct SubgraphClient {
226232
}
227233

228234
impl SubgraphClient {
229-
pub fn new(
235+
pub async fn new(
230236
http_client: reqwest::Client,
231237
local_deployment: Option<DeploymentDetails>,
232238
remote_deployment: DeploymentDetails,
233239
) -> Self {
234240
Self {
235-
local_client: local_deployment.map(|d| DeploymentClient::new(http_client.clone(), d)),
236-
remote_client: DeploymentClient::new(http_client, remote_deployment),
241+
local_client: match local_deployment {
242+
Some(d) => Some(DeploymentClient::new(http_client.clone(), d).await),
243+
None => None,
244+
},
245+
remote_client: DeploymentClient::new(http_client, remote_deployment).await,
237246
}
238247
}
239248

@@ -335,12 +344,13 @@ mod test {
335344
mock_server
336345
}
337346

338-
fn network_subgraph_client() -> SubgraphClient {
347+
async fn network_subgraph_client() -> SubgraphClient {
339348
SubgraphClient::new(
340349
reqwest::Client::new(),
341350
None,
342351
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
343352
)
353+
.await
344354
}
345355

346356
#[derive(GraphQLQuery)]
@@ -359,6 +369,7 @@ mod test {
359369

360370
// Check that the response is valid JSON
361371
let result = network_subgraph_client()
372+
.await
362373
.query::<CurrentEpoch, _>(current_epoch::Variables {})
363374
.await
364375
.unwrap();
@@ -447,6 +458,7 @@ mod test {
447458

448459
// Query the subgraph
449460
let data = client
461+
.await
450462
.query::<UserQuery, _>(user_query::Variables {})
451463
.await
452464
.expect("Query should succeed")
@@ -527,6 +539,7 @@ mod test {
527539

528540
// Query the subgraph
529541
let data = client
542+
.await
530543
.query::<UserQuery, _>(user_query::Variables {})
531544
.await
532545
.expect("Query should succeed")
@@ -607,6 +620,7 @@ mod test {
607620

608621
// Query the subgraph
609622
let data = client
623+
.await
610624
.query::<UserQuery, _>(user_query::Variables {})
611625
.await
612626
.expect("Query should succeed")

0 commit comments

Comments
 (0)