Skip to content

Commit fefb057

Browse files
committed
refactor(all): use graphql_client crate
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent f931744 commit fefb057

18 files changed

+13943
-348
lines changed

Cargo.lock

Lines changed: 211 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ tower-http = { version = "0.5.2", features = [
3636
"trace",
3737
] }
3838
tokio-util = "0.7.10"
39+
serde-inline-default = "0.2.0"
40+
graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
3941

4042
[dev-dependencies]
4143
env_logger = { version = "0.11.0", default-features = false }

common/src/allocations/monitor.rs

Lines changed: 56 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,58 @@
33

44
use std::{
55
collections::HashMap,
6+
str::FromStr,
67
time::{Duration, SystemTime, UNIX_EPOCH},
78
};
89

910
use super::Allocation;
1011
use crate::prelude::SubgraphClient;
12+
use alloy::primitives::U256;
1113
use eventuals::{timer, Eventual, EventualExt};
12-
use thegraph_core::Address;
14+
use graphql_client::GraphQLQuery;
15+
use thegraph_core::{Address, DeploymentId};
1316
use tokio::time::sleep;
1417
use tracing::warn;
1518

19+
type BigInt = String;
20+
type Bytes = String;
21+
22+
#[derive(GraphQLQuery)]
23+
#[graphql(
24+
schema_path = "../graphql/network.schema.graphql",
25+
query_path = "../graphql/allocations.query.graphql",
26+
response_derives = "Debug",
27+
variables_derives = "Clone"
28+
)]
29+
pub struct AllocationsQuery;
30+
31+
impl TryFrom<allocations_query::AllocationFragment> for Allocation {
32+
type Error = anyhow::Error;
33+
34+
fn try_from(
35+
value: allocations_query::AllocationsQueryAllocations,
36+
) -> Result<Self, Self::Error> {
37+
Ok(Self {
38+
id: Address::from_str(&value.id)?,
39+
status: super::AllocationStatus::Null,
40+
subgraph_deployment: super::SubgraphDeployment {
41+
id: DeploymentId::from_str(&value.id)?,
42+
denied_at: Some(value.subgraph_deployment.denied_at as u64),
43+
},
44+
indexer: Address::from_str(&value.indexer.id)?,
45+
allocated_tokens: U256::from_str(&value.allocated_tokens)?,
46+
created_at_epoch: value.created_at_epoch as u64,
47+
created_at_block_hash: value.created_at_block_hash,
48+
closed_at_epoch: value.closed_at_epoch.map(|v| v as u64),
49+
closed_at_epoch_start_block_hash: None,
50+
previous_epoch_start_block_hash: None,
51+
poi: None,
52+
query_fee_rebates: None,
53+
query_fees_collected: None,
54+
})
55+
}
56+
}
57+
1658
/// An always up-to-date list of an indexer's active and recently closed allocations.
1759
pub fn indexer_allocations(
1860
network_subgraph: &'static SubgraphClient,
@@ -56,49 +98,23 @@ pub async fn get_allocations(
5698
.expect("Time went backwards");
5799
let closed_at_threshold = since_the_epoch - recently_closed_allocation_buffer;
58100

59-
let query = format!(
60-
r#"
61-
allocations(
62-
block: $block
63-
orderBy: id
64-
orderDirection: asc
65-
first: $first
66-
where: {{
67-
and: [
68-
{{ id_gt: $last }}
69-
{{ indexer_: {{ id: "{}" }} }}
70-
{{
71-
or: [
72-
{{ status: Active }}
73-
{{ and: [{{ status: Closed, closedAt_gte: {} }}] }}
74-
]
75-
}}
76-
]
77-
}}
78-
) {{
79-
id
80-
indexer {{
81-
id
82-
}}
83-
allocatedTokens
84-
createdAtBlockHash
85-
createdAtEpoch
86-
closedAtEpoch
87-
subgraphDeployment {{
88-
id
89-
deniedAt
90-
}}
91-
}}
92-
"#,
93-
indexer_address.to_string().to_ascii_lowercase(),
94-
closed_at_threshold.as_secs(),
95-
);
96101
let responses = network_subgraph
97-
.paginated_query::<Allocation>(query, 200)
102+
.paginated_query::<AllocationsQuery, _>(
103+
allocations_query::Variables {
104+
indexer: indexer_address.to_string().to_ascii_lowercase(),
105+
closed_at_threshold: closed_at_threshold.as_secs() as i64,
106+
},
107+
200,
108+
)
98109
.await
99110
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
100111

101-
Ok(HashMap::from_iter(responses.into_iter().map(|a| (a.id, a))))
112+
Ok(HashMap::from_iter(
113+
responses
114+
.into_iter()
115+
.flat_map(|a| a.allocations)
116+
.map(|a| (Address::from_str(&a.id).unwrap(), a.try_into().unwrap())),
117+
))
102118
}
103119

104120
#[cfg(test)]

common/src/attestations/dispute_manager.rs

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,41 +4,32 @@
44
use std::time::Duration;
55

66
use eventuals::{timer, Eventual, EventualExt};
7-
use serde::Deserialize;
7+
use graphql_client::GraphQLQuery;
88
use thegraph_core::Address;
99
use tokio::time::sleep;
1010
use tracing::warn;
1111

12-
use crate::subgraph_client::{Query, SubgraphClient};
12+
use crate::subgraph_client::SubgraphClient;
13+
14+
type Bytes = Address;
15+
16+
#[derive(GraphQLQuery)]
17+
#[graphql(
18+
schema_path = "../graphql/network.schema.graphql",
19+
query_path = "../graphql/dispute.query.graphql",
20+
response_derives = "Debug",
21+
variables_derives = "Clone"
22+
)]
23+
struct DisputeManager;
1324

1425
pub fn dispute_manager(
1526
network_subgraph: &'static SubgraphClient,
1627
interval: Duration,
1728
) -> Eventual<Address> {
18-
#[derive(Deserialize)]
19-
#[serde(rename_all = "camelCase")]
20-
struct DisputeManagerResponse {
21-
graph_network: Option<GraphNetwork>,
22-
}
23-
24-
#[derive(Deserialize)]
25-
#[serde(rename_all = "camelCase")]
26-
struct GraphNetwork {
27-
dispute_manager: Address,
28-
}
29-
3029
timer(interval).map_with_retry(
3130
move |_| async move {
3231
let response = network_subgraph
33-
.query::<DisputeManagerResponse>(Query::new(
34-
r#"
35-
query network {
36-
graphNetwork(id: 1) {
37-
disputeManager
38-
}
39-
}
40-
"#,
41-
))
32+
.query::<DisputeManager, _>(dispute_manager::Variables {})
4233
.await
4334
.map_err(|e| e.to_string())?;
4435

@@ -49,7 +40,7 @@ pub fn dispute_manager(
4940
})
5041
},
5142
move |err: String| {
52-
warn!("Failed to query dispute manager for network: {}", err,);
43+
warn!("Failed to query dispute manager: {}", err);
5344

5445
// Sleep for a bit before we retry
5546
sleep(interval.div_f32(2.0))

common/src/escrow_accounts.rs

Lines changed: 41 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ use std::{
1010
use alloy::primitives::U256;
1111
use anyhow::Result;
1212
use eventuals::{timer, Eventual, EventualExt};
13-
use serde::Deserialize;
13+
use graphql_client::GraphQLQuery;
1414
use thegraph_core::Address;
1515
use thiserror::Error;
1616
use tokio::time::sleep;
1717
use tracing::{error, warn};
1818

19-
use crate::prelude::{Query, SubgraphClient};
19+
use crate::prelude::SubgraphClient;
2020

2121
#[derive(Error, Debug)]
2222
pub enum EscrowAccountsError {
@@ -89,110 +89,59 @@ impl EscrowAccounts {
8989
}
9090
}
9191

92+
type BigInt = U256;
93+
94+
#[derive(GraphQLQuery)]
95+
#[graphql(
96+
schema_path = "../graphql/tap.schema.graphql",
97+
query_path = "../graphql/escrow_account.query.graphql",
98+
response_derives = "Debug",
99+
variables_derives = "Clone"
100+
)]
101+
pub struct EscrowAccountQuery;
102+
92103
pub fn escrow_accounts(
93104
escrow_subgraph: &'static SubgraphClient,
94105
indexer_address: Address,
95106
interval: Duration,
96107
reject_thawing_signers: bool,
97108
) -> Eventual<EscrowAccounts> {
98-
// Types for deserializing the network subgraph response
99-
#[derive(Deserialize)]
100-
#[serde(rename_all = "camelCase")]
101-
struct EscrowAccountsResponse {
102-
escrow_accounts: Vec<EscrowAccount>,
103-
}
104-
// Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal
105-
// representation. This is why we deserialize them as strings below.
106-
#[derive(Deserialize)]
107-
#[serde(rename_all = "camelCase")]
108-
struct EscrowAccount {
109-
balance: String,
110-
total_amount_thawing: String,
111-
sender: Sender,
112-
}
113-
#[derive(Deserialize)]
114-
#[serde(rename_all = "camelCase")]
115-
struct Sender {
116-
id: Address,
117-
signers: Vec<Signer>,
118-
}
119-
#[derive(Deserialize)]
120-
#[serde(rename_all = "camelCase")]
121-
struct Signer {
122-
id: Address,
123-
}
124-
125-
// thawEndTimestamp == 0 means that the signer is not thawing. This also means
126-
// that we don't wait for the thawing period to end before stopping serving
127-
// queries for this signer.
128-
// isAuthorized == true means that the signer is still authorized to sign
129-
// payments in the name of the sender.
130-
let query = if reject_thawing_signers {
131-
r#"
132-
query ($indexer: ID!) {
133-
escrowAccounts(where: {receiver_: {id: $indexer}}) {
134-
balance
135-
totalAmountThawing
136-
sender {
137-
id
138-
signers(
139-
where: {thawEndTimestamp: "0", isAuthorized: true}
140-
) {
141-
id
142-
}
143-
}
144-
}
145-
}
146-
"#
147-
} else {
148-
r#"
149-
query ($indexer: ID!) {
150-
escrowAccounts(where: {receiver_: {id: $indexer}}) {
151-
balance
152-
totalAmountThawing
153-
sender {
154-
id
155-
signers(
156-
where: {isAuthorized: true}
157-
) {
158-
id
159-
}
160-
}
161-
}
162-
}
163-
"#
164-
};
165-
166109
timer(interval).map_with_retry(
167110
move |_| async move {
111+
// thawEndTimestamp == 0 means that the signer is not thawing. This also means
112+
// that we don't wait for the thawing period to end before stopping serving
113+
// queries for this signer.
114+
// isAuthorized == true means that the signer is still authorized to sign
115+
// payments in the name of the sender.
168116
let response = escrow_subgraph
169-
.query::<EscrowAccountsResponse>(Query::new_with_variables(
170-
query,
171-
[("indexer", format!("{:x?}", indexer_address).into())],
172-
))
117+
.query::<EscrowAccountQuery, _>(escrow_account_query::Variables {
118+
indexer: format!("{:x?}", indexer_address),
119+
thaw_end_timestamp: if reject_thawing_signers {
120+
Some(U256::ZERO)
121+
} else {
122+
None
123+
},
124+
})
173125
.await
174126
.map_err(|e| e.to_string())?;
175127

176-
let response = response.map_err(|e| e.to_string())?;
128+
let response = response.unwrap();
177129

178-
let senders_balances = response
130+
let senders_balances: HashMap<Address, U256> = response
179131
.escrow_accounts
180132
.iter()
181133
.map(|account| {
182-
let balance = U256::checked_sub(
183-
U256::from_str(&account.balance)?,
184-
U256::from_str(&account.total_amount_thawing)?,
185-
)
186-
.unwrap_or_else(|| {
187-
warn!(
188-
"Balance minus total amount thawing underflowed for account {}. \
134+
let balance = U256::checked_sub(account.balance, account.total_amount_thawing)
135+
.unwrap_or_else(|| {
136+
warn!(
137+
"Balance minus total amount thawing underflowed for account {}. \
189138
Setting balance to 0, no queries will be served for this sender.",
190-
account.sender.id
191-
);
192-
U256::from(0)
193-
});
139+
account.sender.id
140+
);
141+
U256::from(0)
142+
});
194143

195-
Ok((account.sender.id, balance))
144+
Ok((Address::from_str(&account.sender.id).unwrap(), balance))
196145
})
197146
.collect::<Result<HashMap<_, _>, anyhow::Error>>()
198147
.map_err(|e| format!("{}", e))?;
@@ -201,12 +150,14 @@ pub fn escrow_accounts(
201150
.escrow_accounts
202151
.iter()
203152
.map(|account| {
204-
let sender = account.sender.id;
153+
let sender = Address::from_str(&account.sender.id).unwrap();
205154
let signers = account
206155
.sender
207156
.signers
157+
.as_ref()
158+
.unwrap()
208159
.iter()
209-
.map(|signer| signer.id)
160+
.map(|signer| Address::from_str(&signer.id).unwrap())
210161
.collect();
211162
(sender, signers)
212163
})

0 commit comments

Comments
 (0)