diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs
index 931e52e2dd5..89c3e12039d 100644
--- a/graph/src/blockchain/types.rs
+++ b/graph/src/blockchain/types.rs
@@ -435,3 +435,9 @@ impl ToSql<Timestamptz, Pg> for BlockTime {
         <Timestamp as ToSql<Timestamptz, Pg>>::to_sql(&self.0, out)
     }
 }
+
+impl FromSql<Timestamptz, Pg> for BlockTime {
+    fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
+        <Timestamp as FromSql<Timestamptz, Pg>>::from_sql(bytes).map(|ts| Self(ts))
+    }
+}
diff --git a/graph/src/data/store/scalar/timestamp.rs b/graph/src/data/store/scalar/timestamp.rs
index 0bbf72e36e5..02769d4adf8 100644
--- a/graph/src/data/store/scalar/timestamp.rs
+++ b/graph/src/data/store/scalar/timestamp.rs
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
 use diesel::deserialize::FromSql;
 use diesel::pg::PgValue;
 use diesel::serialize::ToSql;
+use diesel::sql_types::Timestamptz;
 use serde::{self, Deserialize, Serialize};
 use stable_hash::StableHash;
 
@@ -95,12 +96,12 @@ impl Display for Timestamp {
     }
 }
 
-impl ToSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
+impl ToSql<Timestamptz, diesel::pg::Pg> for Timestamp {
     fn to_sql<'b>(
         &'b self,
         out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>,
     ) -> diesel::serialize::Result {
-        <_ as ToSql<diesel::sql_types::Timestamptz, diesel::pg::Pg>>::to_sql(&self.0, &mut out.reborrow())
+        <_ as ToSql<Timestamptz, diesel::pg::Pg>>::to_sql(&self.0, &mut out.reborrow())
     }
 }
 
diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs
index 6250d6aa14d..ce574c94253 100644
--- a/graph/src/env/store.rs
+++ b/graph/src/env/store.rs
@@ -120,6 +120,14 @@ pub struct EnvVarsStore {
     pub use_brin_for_all_query_types: bool,
     /// Temporary env var to disable certain lookups in the chain store
     pub disable_block_cache_for_lookup: bool,
+    /// Temporary env var to fall back to the old broken way of determining
+    /// the time of the last rollup from the POI table instead of the new
+    /// way that fixes
+    /// https://github.com/graphprotocol/graph-node/issues/5530. Remove this
+    /// and all code that is dead as a consequence once this has been vetted
+    /// sufficiently, probably after 2024-12-01.
+    /// Defaults to `false`, i.e. using the new fixed behavior.
+    pub last_rollup_from_poi: bool,
 }
 
 // This does not print any values avoid accidentally leaking any sensitive env vars
@@ -168,6 +176,7 @@ impl From<InnerStore> for EnvVarsStore {
             create_gin_indexes: x.create_gin_indexes,
             use_brin_for_all_query_types: x.use_brin_for_all_query_types,
             disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
+            last_rollup_from_poi: x.last_rollup_from_poi,
         }
     }
 }
@@ -229,6 +238,8 @@ pub struct InnerStore {
     use_brin_for_all_query_types: bool,
     #[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")]
     disable_block_cache_for_lookup: bool,
+    #[envconfig(from = "GRAPH_STORE_LAST_ROLLUP_FROM_POI", default = "false")]
+    last_rollup_from_poi: bool,
 }
 
 #[derive(Clone, Copy, Debug)]
diff --git a/server/graphman/tests/deployment_query.rs b/server/graphman/tests/deployment_query.rs
index f39a1e0cd9a..87ac04c3ca3 100644
--- a/server/graphman/tests/deployment_query.rs
+++ b/server/graphman/tests/deployment_query.rs
@@ -1,10 +1,14 @@
 pub mod util;
 
+use graph::components::store::{QueryStoreManager, SubgraphStore};
 use graph::data::subgraph::DeploymentHash;
+use graph::prelude::QueryTarget;
+
 use serde_json::json;
 use test_store::store::create_test_subgraph;
 use test_store::store::NETWORK_NAME;
-use test_store::store::NODE_ID;
+use test_store::STORE;
+use test_store::SUBGRAPH_STORE;
 
 use self::util::client::send_graphql_request;
 use self::util::run_test;
@@ -54,6 +58,15 @@ fn graphql_returns_deployment_info() {
         .await;
 
         let namespace = format!("sgd{}", locator.id);
+        let node = SUBGRAPH_STORE.assigned_node(&locator).unwrap().unwrap();
+        let qs = STORE
+            .query_store(
+                QueryTarget::Deployment(locator.hash.clone(), Default::default()),
+                false,
+            )
+            .await
+            .expect("could get a query store");
+        let shard = qs.shard();
 
         let expected_resp = json!({
             "data": {
@@ -63,8 +76,8 @@
                         "hash": "subgraph_1",
                         "namespace": namespace,
                         "name": "subgraph_1",
-                        "nodeId": NODE_ID.to_string(),
-                        "shard": "primary",
+                        "nodeId": node.to_string(),
+                        "shard": shard,
                         "chain": NETWORK_NAME,
                         "versionStatus": "current",
                         "isActive": true,
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index 5d418987e35..def46ce9244 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -910,7 +910,11 @@ impl DeploymentStore {
         let mut conn = self.get_conn()?;
         let layout = store.layout(&mut conn, site.cheap_clone())?;
 
-        layout.block_time(&mut conn, block)
+        if ENV_VARS.store.last_rollup_from_poi {
+            layout.block_time(&mut conn, block)
+        } else {
+            layout.last_rollup(&mut conn)
+        }
     }
 
     pub(crate) async fn supports_proof_of_indexing<'a>(
diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs
index 860449bd42a..be9f889c84a 100644
--- a/store/postgres/src/relational.rs
+++ b/store/postgres/src/relational.rs
@@ -1039,6 +1039,20 @@ impl Layout {
         Ok(block_time)
     }
 
+    /// Find the time of the last rollup for the subgraph. We do this by
+    /// looking for the maximum timestamp in any aggregation table and
+    /// adding a little more than the corresponding interval to it. This
+    /// method crucially depends on the fact that we always write a rollup
+    /// for all aggregations: if some aggregation has no entry at the
+    /// maximum timestamp, there was simply no data for that interval, but
+    /// we did try to aggregate at that time.
+    pub(crate) fn last_rollup(
+        &self,
+        conn: &mut PgConnection,
+    ) -> Result<Option<BlockTime>, StoreError> {
+        Rollup::last_rollup(&self.rollups, conn)
+    }
+
     /// Construct `Rolllup` for each of the aggregation mappings
     /// `schema.agg_mappings()` and return them in the same order as the
     /// aggregation mappings
diff --git a/store/postgres/src/relational/rollup.rs b/store/postgres/src/relational/rollup.rs
index 7a55bf20a75..b9177a0052b 100644
--- a/store/postgres/src/relational/rollup.rs
+++ b/store/postgres/src/relational/rollup.rs
@@ -60,7 +60,7 @@
 use std::sync::Arc;
 
 use diesel::{sql_query, PgConnection, RunQueryDsl as _};
-use diesel::sql_types::{Integer, Timestamptz};
+use diesel::sql_types::{Integer, Nullable, Timestamptz};
 use graph::blockchain::BlockTime;
 use graph::components::store::{BlockNumber, StoreError};
 use graph::constraint_violation;
@@ -70,6 +70,7 @@ use graph::schema::{
 };
 use graph::sqlparser::ast as p;
 use graph::sqlparser::parser::ParserError;
+use itertools::Itertools;
 
 use crate::relational::Table;
 
@@ -229,6 +230,10 @@ pub(crate) struct Rollup {
     #[allow(dead_code)]
     agg_table: Arc<Table>,
     insert_sql: String,
+    /// A query that determines the last time a rollup was done. The query
+    /// finds the latest timestamp in the aggregation table and adds the
+    /// length of the aggregation interval to deduce the last rollup time
+    last_rollup_sql: String,
 }
 
 impl Rollup {
@@ -256,10 +261,12 @@
         );
         let mut insert_sql = String::new();
         sql.insert(&mut insert_sql)?;
+        let last_rollup_sql = sql.last_rollup();
         Ok(Self {
             interval,
             agg_table,
             insert_sql,
+            last_rollup_sql,
         })
     }
 
@@ -275,6 +282,32 @@
             .bind::<Integer, _>(block);
         query.execute(conn)
     }
+
+    pub(crate) fn last_rollup(
+        rollups: &[Rollup],
+        conn: &mut PgConnection,
+    ) -> Result<Option<BlockTime>, StoreError> {
+        #[derive(QueryableByName)]
+        #[diesel(check_for_backend(diesel::pg::Pg))]
+        struct BlockTimeRes {
+            #[diesel(sql_type = Nullable<Timestamptz>)]
+            last_rollup: Option<BlockTime>,
+        }
+
+        if rollups.is_empty() {
+            return Ok(None);
+        }
+
+        let union_all = rollups
+            .iter()
+            .map(|rollup| &rollup.last_rollup_sql)
+            .join(" union all ");
+        let query = format!("select max(last_rollup) as last_rollup from ({union_all}) as a");
+        let last_rollup = sql_query(&query)
+            .get_result::<BlockTimeRes>(conn)
+            .map(|res| res.last_rollup)?;
+        Ok(last_rollup)
+    }
 }
 
 struct RollupSql<'a> {
@@ -479,6 +512,19 @@ impl<'a> RollupSql<'a> {
             self.insert_bucket(w)
         }
     }
+
+    /// Generate a query that selects the timestamp of the last rollup
+    fn last_rollup(&self) -> String {
+        // The timestamp column contains the timestamp of the start of the
+        // last bucket. The last rollup was therefore at least
+        // `self.interval` after that. We add 1 second to make sure we are
+        // well within the next bucket
+        let secs = self.interval.as_duration().as_secs() + 1;
+        format!(
+            "select max(timestamp) + '{} s'::interval as last_rollup from {}",
+            secs, self.agg_table.qualified_name
+        )
+    }
 }
 
 /// Write the elements in `list` separated by commas into `w`. The list
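
For illustration, the query that `Rollup::last_rollup` assembles from the per-aggregation `last_rollup_sql` strings would look roughly like this for a subgraph with one hourly and one daily aggregation. This is a sketch: the schema name `sgd42` and the table names `stats_hour` and `stats_day` are hypothetical; each branch of the union is one query produced by `RollupSql::last_rollup`:

```sql
-- One branch per aggregation: the newest bucket timestamp plus the
-- interval length plus one second (3601 s hourly, 86401 s daily).
select max(last_rollup) as last_rollup
from (
    select max(timestamp) + '3601 s'::interval as last_rollup from sgd42.stats_hour
    union all
    select max(timestamp) + '86401 s'::interval as last_rollup from sgd42.stats_day
) as a;
```

Since `max` ignores null inputs, empty aggregation tables simply drop out of the result, and a subgraph where no rollup has happened yet yields `NULL` overall; that is why the result is read back as `Nullable<Timestamptz>` and surfaces as `Option<BlockTime>`, relying on the `FromSql` impl added in `graph/src/blockchain/types.rs`.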