Skip to content

Commit 3f49109

Browse files
removed eventuals
1 parent b22348e commit 3f49109

File tree

10 files changed

+85
-119
lines changed

10 files changed

+85
-119
lines changed

Cargo.lock

Lines changed: 0 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ prometheus = "0.13.3"
1818
anyhow = { version = "1.0.72" }
1919
thiserror = "1.0.49"
2020
async-trait = "0.1.72"
21-
eventuals = "0.6.7"
2221
reqwest = { version = "0.12", features = [
2322
"charset",
2423
"h2",

common/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ thiserror.workspace = true
99
async-trait.workspace = true
1010
alloy.workspace = true
1111
anyhow.workspace = true
12-
eventuals.workspace = true
1312
reqwest.workspace = true
1413
sqlx.workspace = true
1514
tap_core.workspace = true

common/src/indexer_service/http/indexer_service.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ where
118118
}
119119

120120
let status = match self {
121-
122121
Unauthorized => StatusCode::UNAUTHORIZED,
123122

124123
NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR,

common/src/subgraph_client/client.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -191,23 +191,20 @@ impl DeploymentClient {
191191
.send()
192192
.await?)
193193
}
194-
fn check_deployment_status(&self)->Result<(), anyhow::Error>{
194+
fn check_deployment_status(&self) -> Result<(), anyhow::Error> {
195195
if let Some(ref status_rx) = self.status {
196196
let status_ref = status_rx.borrow();
197197
let status = status_ref.as_ref();
198-
match status{
199-
Some(deployment_status)=>{
200-
if !deployment_status.synced || &deployment_status.health != "healthy" {
198+
match status {
199+
Some(deployment_status) => {
200+
if !deployment_status.synced || &deployment_status.health != "healthy" {
201201
return Err(anyhow!(
202202
"Deployment `{}` is not ready or healthy to be queried",
203203
self.query_url
204204
));
205205
}
206206
}
207-
None => return Err(anyhow!(
208-
"Deployment `{}` is not available",
209-
self.query_url
210-
))
207+
None => return Err(anyhow!("Deployment `{}` is not available", self.query_url)),
211208
}
212209
}
213210
Ok(())
@@ -443,7 +440,7 @@ mod test {
443440

444441
// Waiting for status to propegate
445442
tokio::time::sleep(Duration::from_millis(1000)).await;
446-
443+
447444
// Query the subgraph
448445
let data = client
449446
.query::<UserQuery, _>(user_query::Variables {})
@@ -619,4 +616,4 @@ mod test {
619616

620617
assert_eq!(data.user.name, "remote".to_string());
621618
}
622-
}
619+
}

common/src/subgraph_client/monitor.rs

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::time::Duration;
54
use reqwest::Url;
65
use serde::Deserialize;
76
use serde_json::json;
7+
use std::time::Duration;
88
use thegraph_core::DeploymentId;
99
use thegraph_graphql_http::{
1010
http::request::IntoRequestParameters,
1111
http_client::{ReqwestExt, ResponseResult},
1212
};
13-
use tokio::{sync::watch::{self, Receiver}, time::{self, sleep}};
13+
use tokio::{
14+
sync::watch::{self, Receiver},
15+
time::{self, sleep},
16+
};
1417
use tracing::warn;
1518

1619
use super::Query;
@@ -57,29 +60,26 @@ pub fn monitor_deployment_status(
5760
[("ids", json!([deployment.to_string()]))],
5861
);
5962
let response = query::<DeploymentStatusResponse>(status_url, body)
60-
.await
61-
.map_err(|e| {
62-
format!("Failed to query status of deployment `{deployment}`: {e}")
63-
}).unwrap();
63+
.await
64+
.map_err(|e| format!("Failed to query status of deployment `{deployment}`: {e}"))
65+
.unwrap();
6466
let result = response.map_err(|e| format!("{e}")).and_then(|data| {
6567
data.indexing_statuses
6668
.and_then(|statuses| statuses.first().cloned())
6769
.ok_or_else(|| format!("Deployment `{deployment}` not found"))
6870
});
69-
match result{
71+
match result {
7072
Ok(deployment_status) => {
71-
dbg!("Sending......");
72-
tx
73-
.send(Some(deployment_status))
74-
.expect("Failed to update deployment_status channel");
75-
},
76-
Err(err) =>{
73+
tx.send(Some(deployment_status))
74+
.expect("Failed to update deployment_status channel");
75+
}
76+
Err(err) => {
7777
warn!(
7878
"Error querying deployment status for `{}`: {}",
7979
deployment, err
8080
);
8181
sleep(Duration::from_secs(15)).await;
82-
},
82+
}
8383
}
8484
}
8585
});
@@ -122,15 +122,15 @@ mod tests {
122122
.mount(&mock_server)
123123
.await;
124124

125-
let mut status_rx = monitor_deployment_status(deployment, status_url);
126-
status_rx.changed().await.unwrap();
127-
let status_ref = status_rx.borrow();
128-
assert_eq!(
129-
status_ref.as_ref().unwrap().clone(),
130-
DeploymentStatus {
131-
synced: true,
132-
health: "healthy".to_string()
133-
}
125+
let mut status_rx = monitor_deployment_status(deployment, status_url);
126+
status_rx.changed().await.unwrap();
127+
let status_ref = status_rx.borrow();
128+
assert_eq!(
129+
status_ref.as_ref().unwrap().clone(),
130+
DeploymentStatus {
131+
synced: true,
132+
health: "healthy".to_string()
133+
}
134134
);
135135
}
136136

@@ -161,15 +161,15 @@ mod tests {
161161
.mount(&mock_server)
162162
.await;
163163

164-
let mut status_rx = monitor_deployment_status(deployment, status_url);
165-
status_rx.changed().await.unwrap();
166-
let status_ref = status_rx.borrow();
167-
assert_eq!(
168-
status_ref.as_ref().unwrap().clone(),
169-
DeploymentStatus {
170-
synced: false,
171-
health: "healthy".to_string()
172-
}
164+
let mut status_rx = monitor_deployment_status(deployment, status_url);
165+
status_rx.changed().await.unwrap();
166+
let status_ref = status_rx.borrow();
167+
assert_eq!(
168+
status_ref.as_ref().unwrap().clone(),
169+
DeploymentStatus {
170+
synced: false,
171+
health: "healthy".to_string()
172+
}
173173
);
174174
}
175175

@@ -200,15 +200,15 @@ mod tests {
200200
.mount(&mock_server)
201201
.await;
202202

203-
let mut status_rx = monitor_deployment_status(deployment, status_url);
204-
status_rx.changed().await.unwrap();
205-
let status_ref = status_rx.borrow();
206-
assert_eq!(
207-
status_ref.as_ref().unwrap().clone(),
208-
DeploymentStatus {
209-
synced: true,
210-
health: "unhealthy".to_string()
211-
}
203+
let mut status_rx = monitor_deployment_status(deployment, status_url);
204+
status_rx.changed().await.unwrap();
205+
let status_ref = status_rx.borrow();
206+
assert_eq!(
207+
status_ref.as_ref().unwrap().clone(),
208+
DeploymentStatus {
209+
synced: true,
210+
health: "unhealthy".to_string()
211+
}
212212
);
213213
}
214214

tap-agent/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ reqwest.workspace = true
1919
serde.workspace = true
2020
serde_json.workspace = true
2121
thiserror.workspace = true
22-
eventuals.workspace = true
2322
tracing.workspace = true
2423
prometheus.workspace = true
2524
axum.workspace = true

tap-agent/src/tap/context/checks/allocation_id.rs

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,21 @@ use std::time::Duration;
55

66
use alloy::primitives::Address;
77
use anyhow::anyhow;
8-
use eventuals::{Eventual, EventualExt};
98
use graphql_client::GraphQLQuery;
109
use indexer_common::subgraph_client::SubgraphClient;
1110
use tap_core::receipt::{
1211
checks::{Check, CheckError, CheckResult},
1312
state::Checking,
1413
ReceiptWithState,
1514
};
16-
use tokio::time::sleep;
15+
use tokio::{
16+
sync::watch::{self, Receiver},
17+
time::{self, sleep},
18+
};
1719
use tracing::error;
1820

1921
pub struct AllocationId {
20-
tap_allocation_redeemed: Eventual<bool>,
22+
tap_allocation_redeemed: Receiver<Option<bool>>,
2123
allocation_id: Address,
2224
}
2325

@@ -29,7 +31,7 @@ impl AllocationId {
2931
allocation_id: Address,
3032
escrow_subgraph: &'static SubgraphClient,
3133
) -> Self {
32-
let tap_allocation_redeemed = tap_allocation_redeemed_eventual(
34+
let tap_allocation_redeemed = tap_allocation_redeemed_receiver(
3335
allocation_id,
3436
sender_id,
3537
indexer_address,
@@ -56,46 +58,56 @@ impl Check for AllocationId {
5658
};
5759

5860
// Check that the allocation ID is not redeemed yet for this consumer
59-
match self.tap_allocation_redeemed.value().await {
60-
Ok(false) => Ok(()),
61-
Ok(true) => Err(CheckError::Failed(anyhow!(
61+
match *self.tap_allocation_redeemed.borrow() {
62+
Some(false) => Ok(()),
63+
Some(true) => Err(CheckError::Failed(anyhow!(
6264
"Allocation {} already redeemed",
6365
allocation_id
6466
))),
65-
Err(e) => Err(CheckError::Retryable(anyhow!(
66-
"Could not get allocation escrow redemption status from eventual: {:?}",
67-
e
67+
None => Err(CheckError::Retryable(anyhow!(
68+
"Could not get allocation escrow redemption status from channel"
6869
))),
6970
}
7071
}
7172
}
7273

73-
fn tap_allocation_redeemed_eventual(
74+
fn tap_allocation_redeemed_receiver(
7475
allocation_id: Address,
7576
sender_address: Address,
7677
indexer_address: Address,
7778
escrow_subgraph: &'static SubgraphClient,
7879
escrow_polling_interval: Duration,
79-
) -> Eventual<bool> {
80-
eventuals::timer(escrow_polling_interval).map_with_retry(
81-
move |_| async move {
82-
query_escrow_check_transactions(
80+
) -> Receiver<Option<bool>> {
81+
let (tx, rx) = watch::channel(None);
82+
tokio::spawn(async move {
83+
let mut time_interval = time::interval(escrow_polling_interval);
84+
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
85+
loop {
86+
time_interval.tick().await;
87+
let result = query_escrow_check_transactions(
8388
allocation_id,
8489
sender_address,
8590
indexer_address,
8691
escrow_subgraph,
8792
)
8893
.await
89-
.map_err(|e| e.to_string())
90-
},
91-
move |error: String| {
92-
error!(
93-
"Failed to check the escrow redeem status for allocation {} and sender {}: {}",
94-
allocation_id, sender_address, error
95-
);
96-
sleep(escrow_polling_interval.div_f32(2.))
97-
},
98-
)
94+
.map_err(|e| e.to_string());
95+
match result {
96+
Ok(tap_allocation_redeemed) => {
97+
tx.send(Some(tap_allocation_redeemed))
98+
.expect("Failed to update tap_allocation_redeemed channel");
99+
}
100+
Err(err) => {
101+
error!(
102+
"Failed to check the escrow redeem status for allocation {} and sender {}: {}",
103+
allocation_id, sender_address, err
104+
);
105+
sleep(escrow_polling_interval.div_f32(2.)).await;
106+
}
107+
}
108+
}
109+
});
110+
rx
99111
}
100112

101113
#[derive(GraphQLQuery)]

0 commit comments

Comments
 (0)