From 207e31f71a053209c8e449d7d38675050f0258be Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Tue, 15 Oct 2024 14:35:33 -0700
Subject: [PATCH 1/3] server: Make deployment_info test work in sharded setup

---
 server/graphman/tests/deployment_query.rs | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/server/graphman/tests/deployment_query.rs b/server/graphman/tests/deployment_query.rs
index f39a1e0cd9a..87ac04c3ca3 100644
--- a/server/graphman/tests/deployment_query.rs
+++ b/server/graphman/tests/deployment_query.rs
@@ -1,10 +1,14 @@
 pub mod util;
 
+use graph::components::store::{QueryStoreManager, SubgraphStore};
 use graph::data::subgraph::DeploymentHash;
+use graph::prelude::QueryTarget;
+
 use serde_json::json;
 use test_store::store::create_test_subgraph;
 use test_store::store::NETWORK_NAME;
-use test_store::store::NODE_ID;
+use test_store::STORE;
+use test_store::SUBGRAPH_STORE;
 
 use self::util::client::send_graphql_request;
 use self::util::run_test;
@@ -54,6 +58,15 @@ fn graphql_returns_deployment_info() {
             .await;
 
         let namespace = format!("sgd{}", locator.id);
+        let node = SUBGRAPH_STORE.assigned_node(&locator).unwrap().unwrap();
+        let qs = STORE
+            .query_store(
+                QueryTarget::Deployment(locator.hash.clone(), Default::default()),
+                false,
+            )
+            .await
+            .expect("could get a query store");
+        let shard = qs.shard();
 
         let expected_resp = json!({
             "data": {
@@ -63,8 +76,8 @@ fn graphql_returns_deployment_info() {
                     "hash": "subgraph_1",
                     "namespace": namespace,
                     "name": "subgraph_1",
-                    "nodeId": NODE_ID.to_string(),
-                    "shard": "primary",
+                    "nodeId": node.to_string(),
+                    "shard": shard,
                     "chain": NETWORK_NAME,
                     "versionStatus": "current",
                     "isActive": true,

From 22f805d25a3e8c3ca16279843aae63b01719b713 Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Tue, 15 Oct 2024 13:01:05 -0700
Subject: [PATCH 2/3] graph: Implement FromSql for a couple time related types

---
 graph/src/blockchain/types.rs            | 6 ++++++
 graph/src/data/store/scalar/timestamp.rs | 5 +++--
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs
index 931e52e2dd5..89c3e12039d 100644
--- a/graph/src/blockchain/types.rs
+++ b/graph/src/blockchain/types.rs
@@ -435,3 +435,9 @@ impl ToSql<Timestamptz, diesel::pg::Pg> for BlockTime {
         <Timestamp as ToSql<Timestamptz, diesel::pg::Pg>>::to_sql(&self.0, out)
     }
 }
+
+impl FromSql<Timestamptz, diesel::pg::Pg> for BlockTime {
+    fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
+        <Timestamp as FromSql<Timestamptz, diesel::pg::Pg>>::from_sql(bytes).map(|ts| Self(ts))
+    }
+}
diff --git a/graph/src/data/store/scalar/timestamp.rs b/graph/src/data/store/scalar/timestamp.rs
index 0bbf72e36e5..02769d4adf8 100644
--- a/graph/src/data/store/scalar/timestamp.rs
+++ b/graph/src/data/store/scalar/timestamp.rs
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
 use diesel::deserialize::FromSql;
 use diesel::pg::PgValue;
 use diesel::serialize::ToSql;
+use diesel::sql_types::Timestamptz;
 use serde::{self, Deserialize, Serialize};
 use stable_hash::StableHash;
 
@@ -95,12 +96,12 @@ impl Display for Timestamp {
     }
 }
 
-impl ToSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
+impl ToSql<Timestamptz, diesel::pg::Pg> for Timestamp {
     fn to_sql<'b>(
         &'b self,
         out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>,
     ) -> diesel::serialize::Result {
-        <_ as ToSql<diesel::sql_types::Timestamptz, diesel::pg::Pg>>::to_sql(&self.0, &mut out.reborrow())
+        <_ as ToSql<Timestamptz, diesel::pg::Pg>>::to_sql(&self.0, &mut out.reborrow())
     }
 }
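A quick usage sketch of what the new impl enables (not part of the patches; the table name and the `Row` struct are made up for illustration): diesel can now deserialize a timestamptz column directly into a `BlockTime`:

    use diesel::prelude::*;
    use diesel::sql_query;
    use diesel::sql_types::{Nullable, Timestamptz};
    use graph::blockchain::BlockTime;

    #[derive(QueryableByName)]
    struct Row {
        // `Nullable` because `max` returns NULL on an empty table; mapping
        // the inner value to `BlockTime` relies on the
        // `FromSql<Timestamptz, Pg>` impl added above
        #[diesel(sql_type = Nullable<Timestamptz>)]
        block_time: Option<BlockTime>,
    }

    fn latest_block_time(conn: &mut PgConnection) -> QueryResult<Option<BlockTime>> {
        // `sgd1.poi2$` stands in for some deployment's PoI table
        sql_query("select max(block_time) as block_time from sgd1.poi2$")
            .get_result::<Row>(conn)
            .map(|row| row.block_time)
    }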
From 22bca4e8fbeddd6e0a9c2b34c74302c07cb1a76d Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Tue, 15 Oct 2024 13:03:11 -0700
Subject: [PATCH 3/3] graph, store: Change how the time of the last rollup is
 determined

When graph-node is restarted, we need to determine, for subgraphs with
aggregations, when the last rollup was triggered so that aggregations get
filled without gaps or duplication.

The code used the `block_time` column in the PoI table for that, but that
is not correct: the PoI table only records blocks and times for which the
subgraph actually has writes. When a subgraph scans through a large number
of blocks without changes, we only update the head pointer, yet we still
perform rollups as aggregation intervals pass. Because of that, we might
perform a rollup without a corresponding entry in the PoI table.

With this change, we find the maximum timestamp across all aggregation
tables to tell us when the last rollup was triggered, since that data
accurately reflects when rollups happened.
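For example, for a subgraph with an hourly and a daily aggregation (the
schema `sgd42` and the table names are made up for illustration), the
store now derives the last rollup time with a query of roughly this
shape, adding each aggregation's interval plus one second to the newest
bucket timestamp:

    select max(last_rollup)
      from (select max(timestamp) + '3601 s'::interval as last_rollup
              from sgd42.stats_hour
            union all
            select max(timestamp) + '86401 s'::interval as last_rollup
              from sgd42.stats_day) as a;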
For safety, the new behavior can be turned off by setting
`GRAPH_STORE_LAST_ROLLUP_FROM_POI=true`, which returns to the old, buggy
behavior in case the new behavior causes some other unexpected problem.

Fixes https://github.com/graphprotocol/graph-node/issues/5530
---
 graph/src/env/store.rs                  | 11 ++++++
 store/postgres/src/deployment_store.rs  |  6 +++-
 store/postgres/src/relational.rs        | 14 ++++++++
 store/postgres/src/relational/rollup.rs | 48 ++++++++++++++++++++++++-
 4 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs
index 6250d6aa14d..ce574c94253 100644
--- a/graph/src/env/store.rs
+++ b/graph/src/env/store.rs
@@ -120,6 +120,14 @@ pub struct EnvVarsStore {
     pub use_brin_for_all_query_types: bool,
     /// Temporary env var to disable certain lookups in the chain store
     pub disable_block_cache_for_lookup: bool,
+    /// Temporary env var to fall back to the old, broken way of determining
+    /// the time of the last rollup from the PoI table instead of the new
+    /// way that fixes
+    /// https://github.com/graphprotocol/graph-node/issues/5530. Remove this,
+    /// and all code that becomes dead as a consequence, once the new
+    /// behavior has been vetted sufficiently, probably after 2024-12-01.
+    /// Defaults to `false`, i.e. to using the new, fixed behavior.
+    pub last_rollup_from_poi: bool,
 }
 
 // This does not print any values to avoid accidentally leaking any sensitive env vars
@@ -168,6 +176,7 @@ impl From<InnerStore> for EnvVarsStore {
             create_gin_indexes: x.create_gin_indexes,
             use_brin_for_all_query_types: x.use_brin_for_all_query_types,
             disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
+            last_rollup_from_poi: x.last_rollup_from_poi,
         }
     }
 }
@@ -229,6 +238,8 @@ pub struct InnerStore {
     use_brin_for_all_query_types: bool,
     #[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")]
     disable_block_cache_for_lookup: bool,
+    #[envconfig(from = "GRAPH_STORE_LAST_ROLLUP_FROM_POI", default = "false")]
+    last_rollup_from_poi: bool,
 }
 
 #[derive(Clone, Copy, Debug)]
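As a usage note (the invocation is illustrative; how the variable is set
depends on how graph-node is deployed), reverting to the old behavior
amounts to setting the variable in the node's environment:

    export GRAPH_STORE_LAST_ROLLUP_FROM_POI=true
    graph-node --config /etc/graph-node/config.toml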
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index 5d418987e35..def46ce9244 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -910,7 +910,11 @@ impl DeploymentStore {
         let mut conn = self.get_conn()?;
         let layout = store.layout(&mut conn, site.cheap_clone())?;
 
-        layout.block_time(&mut conn, block)
+        if ENV_VARS.store.last_rollup_from_poi {
+            layout.block_time(&mut conn, block)
+        } else {
+            layout.last_rollup(&mut conn)
+        }
     }
 
     pub(crate) async fn supports_proof_of_indexing<'a>(
diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs
index 860449bd42a..be9f889c84a 100644
--- a/store/postgres/src/relational.rs
+++ b/store/postgres/src/relational.rs
@@ -1039,6 +1039,20 @@ impl Layout {
         Ok(block_time)
     }
 
+    /// Find the time of the last rollup for the subgraph. We do this by
+    /// looking for the maximum timestamp in any aggregation table and
+    /// adding a little more than the corresponding interval to it. This
+    /// method crucially depends on the fact that we always write the
+    /// rollups for all aggregations: if some aggregation has no entry at
+    /// the maximum timestamp, there was simply no data for that interval,
+    /// but we still tried to aggregate at that time.
+    pub(crate) fn last_rollup(
+        &self,
+        conn: &mut PgConnection,
+    ) -> Result<Option<BlockTime>, StoreError> {
+        Rollup::last_rollup(&self.rollups, conn)
+    }
+
     /// Construct a `Rollup` for each of the aggregation mappings
     /// `schema.agg_mappings()` and return them in the same order as the
     /// aggregation mappings
diff --git a/store/postgres/src/relational/rollup.rs b/store/postgres/src/relational/rollup.rs
index 7a55bf20a75..b9177a0052b 100644
--- a/store/postgres/src/relational/rollup.rs
+++ b/store/postgres/src/relational/rollup.rs
@@ -60,7 +60,7 @@ use std::sync::Arc;
 
 use diesel::{sql_query, PgConnection, RunQueryDsl as _};
 
-use diesel::sql_types::{Integer, Timestamptz};
+use diesel::sql_types::{Integer, Nullable, Timestamptz};
 use graph::blockchain::BlockTime;
 use graph::components::store::{BlockNumber, StoreError};
 use graph::constraint_violation;
@@ -70,6 +70,7 @@ use graph::schema::{
 };
 use graph::sqlparser::ast as p;
 use graph::sqlparser::parser::ParserError;
+use itertools::Itertools;
 
 use crate::relational::Table;
 
@@ -229,6 +230,10 @@ pub(crate) struct Rollup {
     #[allow(dead_code)]
     agg_table: Arc<Table>,
     insert_sql: String,
+    /// A query that determines the last time a rollup was done. The query
+    /// finds the latest timestamp in the aggregation table and adds the
+    /// length of the aggregation interval to deduce the last rollup time.
+    last_rollup_sql: String,
 }
 
 impl Rollup {
@@ -256,10 +261,12 @@ impl Rollup {
         );
         let mut insert_sql = String::new();
         sql.insert(&mut insert_sql)?;
+        let last_rollup_sql = sql.last_rollup();
         Ok(Self {
             interval,
             agg_table,
             insert_sql,
+            last_rollup_sql,
         })
     }
 
@@ -275,6 +282,32 @@ impl Rollup {
             .bind::<Integer, _>(block);
         query.execute(conn)
     }
+
+    pub(crate) fn last_rollup(
+        rollups: &[Rollup],
+        conn: &mut PgConnection,
+    ) -> Result<Option<BlockTime>, StoreError> {
+        #[derive(QueryableByName)]
+        #[diesel(check_for_backend(diesel::pg::Pg))]
+        struct BlockTimeRes {
+            #[diesel(sql_type = Nullable<Timestamptz>)]
+            last_rollup: Option<BlockTime>,
+        }
+
+        if rollups.is_empty() {
+            return Ok(None);
+        }
+
+        let union_all = rollups
+            .iter()
+            .map(|rollup| &rollup.last_rollup_sql)
+            .join(" union all ");
+        let query = format!("select max(last_rollup) as last_rollup from ({union_all}) as a");
+        let last_rollup = sql_query(&query)
+            .get_result::<BlockTimeRes>(conn)
+            .map(|res| res.last_rollup)?;
+        Ok(last_rollup)
+    }
 }
 
 struct RollupSql<'a> {
@@ -479,6 +512,19 @@ impl<'a> RollupSql<'a> {
             self.insert_bucket(w)
         }
     }
+
+    /// Generate a query that selects the timestamp of the last rollup
+    fn last_rollup(&self) -> String {
+        // The timestamp column contains the timestamp of the start of the
+        // last bucket. The last rollup was therefore at least
+        // `self.interval` after that. We add one second to make sure we
+        // are well within the next bucket
+        let secs = self.interval.as_duration().as_secs() + 1;
+        format!(
+            "select max(timestamp) + '{} s'::interval as last_rollup from {}",
+            secs, self.agg_table.qualified_name
+        )
+    }
 }
 
 /// Write the elements in `list` separated by commas into `w`. The list