Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 199 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ thegraph-core = { git = "https://github.com/edgeandnode/toolshed", rev = "85ee00
"subgraph-client",
] }
thegraph-graphql-http = "0.2.0"
graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
1 change: 1 addition & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ axum.workspace = true
lazy_static.workspace = true
thegraph-graphql-http.workspace = true
build-info.workspace = true
graphql_client.workspace = true

serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
Expand Down
125 changes: 82 additions & 43 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,58 @@

use std::{
collections::HashMap,
str::FromStr,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use super::Allocation;
use crate::prelude::SubgraphClient;
use alloy::primitives::{TxHash, B256, U256};
use eventuals::{timer, Eventual, EventualExt};
use thegraph_core::Address;
use graphql_client::GraphQLQuery;
use thegraph_core::{Address, DeploymentId};
use tokio::time::sleep;
use tracing::warn;

type BigInt = U256;
type Bytes = B256;

#[derive(GraphQLQuery)]
#[graphql(
schema_path = "../graphql/network.schema.graphql",
query_path = "../graphql/allocations.query.graphql",
response_derives = "Debug",
variables_derives = "Clone"
)]
pub struct AllocationsQuery;

impl TryFrom<allocations_query::AllocationFragment> for Allocation {
type Error = anyhow::Error;

fn try_from(
value: allocations_query::AllocationsQueryAllocations,
) -> Result<Self, Self::Error> {
Ok(Self {
id: Address::from_str(&value.id)?,
status: super::AllocationStatus::Null,
subgraph_deployment: super::SubgraphDeployment {
id: DeploymentId::from_str(&value.subgraph_deployment.id)?,
denied_at: Some(value.subgraph_deployment.denied_at as u64),
},
indexer: Address::from_str(&value.indexer.id)?,
allocated_tokens: value.allocated_tokens,
created_at_epoch: value.created_at_epoch as u64,
created_at_block_hash: value.created_at_block_hash.to_string(),
closed_at_epoch: value.closed_at_epoch.map(|v| v as u64),
closed_at_epoch_start_block_hash: None,
previous_epoch_start_block_hash: None,
poi: None,
query_fee_rebates: None,
query_fees_collected: None,
})
}
}

/// An always up-to-date list of an indexer's active and recently closed allocations.
pub fn indexer_allocations(
network_subgraph: &'static SubgraphClient,
Expand Down Expand Up @@ -56,49 +98,46 @@ pub async fn get_allocations(
.expect("Time went backwards");
let closed_at_threshold = since_the_epoch - recently_closed_allocation_buffer;

let query = format!(
r#"
allocations(
block: $block
orderBy: id
orderDirection: asc
first: $first
where: {{
and: [
{{ id_gt: $last }}
{{ indexer_: {{ id: "{}" }} }}
{{
or: [
{{ status: Active }}
{{ and: [{{ status: Closed, closedAt_gte: {} }}] }}
]
}}
]
}}
) {{
id
indexer {{
id
}}
allocatedTokens
createdAtBlockHash
createdAtEpoch
closedAtEpoch
subgraphDeployment {{
id
deniedAt
}}
}}
"#,
indexer_address.to_string().to_ascii_lowercase(),
closed_at_threshold.as_secs(),
);
let responses = network_subgraph
.paginated_query::<Allocation>(query, 200)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let mut hash: Option<TxHash> = None;
let mut last: Option<String> = None;
let mut responses = vec![];
let page_size = 200;
loop {
let result = network_subgraph
.query::<AllocationsQuery, _>(allocations_query::Variables {
indexer: indexer_address.to_string().to_ascii_lowercase(),
closed_at_threshold: closed_at_threshold.as_secs() as i64,
first: page_size,
last: last.unwrap_or_default(),
block: hash.map(|hash| allocations_query::Block_height {
hash: Some(hash),
number: None,
number_gte: None,
}),
})
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?;

let mut data = result?;
let page_len = data.allocations.len();

hash = data.meta.and_then(|meta| meta.block.hash);
last = data.allocations.last().map(|entry| entry.id.to_string());

responses.append(&mut data.allocations);
if (page_len as i64) < page_size {
break;
}
}
let responses = responses
.into_iter()
.map(|allocation| allocation.try_into())
.collect::<Result<Vec<Allocation>, _>>()?;

Ok(HashMap::from_iter(responses.into_iter().map(|a| (a.id, a))))
Ok(responses
.into_iter()
.map(|allocation| (allocation.id, allocation))
.collect())
}

#[cfg(test)]
Expand Down
39 changes: 15 additions & 24 deletions common/src/attestations/dispute_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,32 @@
use std::time::Duration;

use eventuals::{timer, Eventual, EventualExt};
use serde::Deserialize;
use graphql_client::GraphQLQuery;
use thegraph_core::Address;
use tokio::time::sleep;
use tracing::warn;

use crate::subgraph_client::{Query, SubgraphClient};
use crate::subgraph_client::SubgraphClient;

type Bytes = Address;

#[derive(GraphQLQuery)]
#[graphql(
schema_path = "../graphql/network.schema.graphql",
query_path = "../graphql/dispute.query.graphql",
response_derives = "Debug",
variables_derives = "Clone"
)]
struct DisputeManager;

pub fn dispute_manager(
network_subgraph: &'static SubgraphClient,
interval: Duration,
) -> Eventual<Address> {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct DisputeManagerResponse {
graph_network: Option<GraphNetwork>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphNetwork {
dispute_manager: Address,
}

timer(interval).map_with_retry(
move |_| async move {
let response = network_subgraph
.query::<DisputeManagerResponse>(Query::new(
r#"
query network {
graphNetwork(id: 1) {
disputeManager
}
}
"#,
))
.query::<DisputeManager, _>(dispute_manager::Variables {})
.await
.map_err(|e| e.to_string())?;

Expand All @@ -49,7 +40,7 @@ pub fn dispute_manager(
})
},
move |err: String| {
warn!("Failed to query dispute manager for network: {}", err,);
warn!("Failed to query dispute manager: {}", err);

// Sleep for a bit before we retry
sleep(interval.div_f32(2.0))
Expand Down
Loading
Loading