6 changes: 6 additions & 0 deletions graph/src/blockchain/types.rs
@@ -435,3 +435,9 @@ impl ToSql<Timestamptz, Pg> for BlockTime {
<Timestamp as ToSql<Timestamptz, Pg>>::to_sql(&self.0, out)
}
}

impl FromSql<Timestamptz, Pg> for BlockTime {
fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
<Timestamp as FromSql<Timestamptz, Pg>>::from_sql(bytes).map(|ts| Self(ts))
}
}
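The `ToSql` half already existed above; the new `FromSql` impl is what lets a raw query result be read back directly into `BlockTime`. A minimal sketch, assuming a hypothetical aggregation table `sgd42.stats_hour` and diesel's raw-query API:

use diesel::sql_types::{Nullable, Timestamptz};
use diesel::{sql_query, PgConnection, QueryResult, QueryableByName, RunQueryDsl};
use graph::blockchain::BlockTime;

#[derive(QueryableByName)]
struct Row {
    #[diesel(sql_type = Nullable<Timestamptz>)]
    last_rollup: Option<BlockTime>,
}

fn newest_bucket(conn: &mut PgConnection) -> QueryResult<Option<BlockTime>> {
    // Deserializing into BlockTime here relies on the FromSql impl added above;
    // the Nullable wrapper covers the case where the table is still empty.
    let row: Row = sql_query("select max(timestamp) as last_rollup from sgd42.stats_hour")
        .get_result(conn)?;
    Ok(row.last_rollup)
}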
5 changes: 3 additions & 2 deletions graph/src/data/store/scalar/timestamp.rs
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
use diesel::deserialize::FromSql;
use diesel::pg::PgValue;
use diesel::serialize::ToSql;
use diesel::sql_types::Timestamptz;
use serde::{self, Deserialize, Serialize};
use stable_hash::StableHash;

@@ -95,12 +96,12 @@ impl Display for Timestamp {
}
}

impl ToSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
impl ToSql<Timestamptz, diesel::pg::Pg> for Timestamp {
fn to_sql<'b>(
&'b self,
out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>,
) -> diesel::serialize::Result {
<_ as ToSql<diesel::sql_types::Timestamptz, _>>::to_sql(&self.0, &mut out.reborrow())
<_ as ToSql<Timestamptz, _>>::to_sql(&self.0, &mut out.reborrow())
}
}

11 changes: 11 additions & 0 deletions graph/src/env/store.rs
@@ -120,6 +120,14 @@ pub struct EnvVarsStore {
pub use_brin_for_all_query_types: bool,
/// Temporary env var to disable certain lookups in the chain store
pub disable_block_cache_for_lookup: bool,
/// Temporary env var to fall back to the old, broken way of determining
/// the time of the last rollup from the POI table instead of the new
/// way that fixes
/// https://github.com/graphprotocol/graph-node/issues/5530. Remove this
/// and all code that becomes dead as a consequence once the fix has been
/// vetted sufficiently, probably after 2024-12-01.
/// Defaults to `false`, i.e. the new, fixed behavior is used
pub last_rollup_from_poi: bool,
}

// This does not print any values to avoid accidentally leaking any sensitive env vars
@@ -168,6 +176,7 @@ impl From<InnerStore> for EnvVarsStore {
create_gin_indexes: x.create_gin_indexes,
use_brin_for_all_query_types: x.use_brin_for_all_query_types,
disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
last_rollup_from_poi: x.last_rollup_from_poi,
}
}
}
@@ -229,6 +238,8 @@ pub struct InnerStore {
use_brin_for_all_query_types: bool,
#[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")]
disable_block_cache_for_lookup: bool,
#[envconfig(from = "GRAPH_STORE_LAST_ROLLUP_FROM_POI", default = "false")]
last_rollup_from_poi: bool,
}

#[derive(Clone, Copy, Debug)]
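A short usage note, with only the flag name and its default taken from this diff: while the fix is being vetted, the old POI-based behavior can be restored by running the node with

    GRAPH_STORE_LAST_ROLLUP_FROM_POI=true

in its environment; leaving the variable unset (or set to false) keeps the new last_rollup path that the changes below wire in.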
19 changes: 16 additions & 3 deletions server/graphman/tests/deployment_query.rs
@@ -1,10 +1,14 @@
pub mod util;

use graph::components::store::{QueryStoreManager, SubgraphStore};
use graph::data::subgraph::DeploymentHash;
use graph::prelude::QueryTarget;

use serde_json::json;
use test_store::store::create_test_subgraph;
use test_store::store::NETWORK_NAME;
use test_store::store::NODE_ID;
use test_store::STORE;
use test_store::SUBGRAPH_STORE;

use self::util::client::send_graphql_request;
use self::util::run_test;
@@ -54,6 +58,15 @@ fn graphql_returns_deployment_info() {
.await;

let namespace = format!("sgd{}", locator.id);
let node = SUBGRAPH_STORE.assigned_node(&locator).unwrap().unwrap();
let qs = STORE
.query_store(
QueryTarget::Deployment(locator.hash.clone(), Default::default()),
false,
)
.await
.expect("could get a query store");
let shard = qs.shard();

let expected_resp = json!({
"data": {
@@ -63,8 +76,8 @@
"hash": "subgraph_1",
"namespace": namespace,
"name": "subgraph_1",
"nodeId": NODE_ID.to_string(),
"shard": "primary",
"nodeId": node.to_string(),
"shard": shard,
"chain": NETWORK_NAME,
"versionStatus": "current",
"isActive": true,
6 changes: 5 additions & 1 deletion store/postgres/src/deployment_store.rs
@@ -910,7 +910,11 @@ impl DeploymentStore {

let mut conn = self.get_conn()?;
let layout = store.layout(&mut conn, site.cheap_clone())?;
layout.block_time(&mut conn, block)
if ENV_VARS.store.last_rollup_from_poi {
layout.block_time(&mut conn, block)
} else {
layout.last_rollup(&mut conn)
}
}

pub(crate) async fn supports_proof_of_indexing<'a>(
14 changes: 14 additions & 0 deletions store/postgres/src/relational.rs
@@ -1039,6 +1039,20 @@ impl Layout {
Ok(block_time)
}

/// Find the time of the last rollup for the subgraph. We do this by
/// looking for the maximum timestamp in any aggregation table and
/// adding a little more than the corresponding interval to it. This
/// method crucially depends on the fact that we always write the rollup
/// for all aggregations, meaning that if some aggregations do not have
/// an entry with the maximum timestamp, there was simply no data for
/// that interval, but we did try to aggregate at that time.
pub(crate) fn last_rollup(
&self,
conn: &mut PgConnection,
) -> Result<Option<BlockTime>, StoreError> {
Rollup::last_rollup(&self.rollups, conn)
}

/// Construct `Rollup` for each of the aggregation mappings
/// `schema.agg_mappings()` and return them in the same order as the
/// aggregation mappings
48 changes: 47 additions & 1 deletion store/postgres/src/relational/rollup.rs
@@ -60,7 +60,7 @@ use std::sync::Arc;

use diesel::{sql_query, PgConnection, RunQueryDsl as _};

use diesel::sql_types::{Integer, Timestamptz};
use diesel::sql_types::{Integer, Nullable, Timestamptz};
use graph::blockchain::BlockTime;
use graph::components::store::{BlockNumber, StoreError};
use graph::constraint_violation;
@@ -70,6 +70,7 @@ use graph::schema::{
};
use graph::sqlparser::ast as p;
use graph::sqlparser::parser::ParserError;
use itertools::Itertools;

use crate::relational::Table;

@@ -229,6 +230,10 @@ pub(crate) struct Rollup {
#[allow(dead_code)]
agg_table: Arc<Table>,
insert_sql: String,
/// A query that determines the last time a rollup was done. The query
/// finds the latest timestamp in the aggregation table and adds the
/// length of the aggregation interval to deduce the last rollup time
last_rollup_sql: String,
}

impl Rollup {
@@ -256,10 +261,12 @@ impl Rollup {
);
let mut insert_sql = String::new();
sql.insert(&mut insert_sql)?;
let last_rollup_sql = sql.last_rollup();
Ok(Self {
interval,
agg_table,
insert_sql,
last_rollup_sql,
})
}

@@ -275,6 +282,32 @@
.bind::<Integer, _>(block);
query.execute(conn)
}

pub(crate) fn last_rollup(
rollups: &[Rollup],
conn: &mut PgConnection,
) -> Result<Option<BlockTime>, StoreError> {
#[derive(QueryableByName)]
#[diesel(check_for_backend(diesel::pg::Pg))]
struct BlockTimeRes {
#[diesel(sql_type = Nullable<Timestamptz>)]
last_rollup: Option<BlockTime>,
}

if rollups.is_empty() {
return Ok(None);
}

let union_all = rollups
.iter()
.map(|rollup| &rollup.last_rollup_sql)
.join(" union all ");
let query = format!("select max(last_rollup) as last_rollup from ({union_all}) as a");
let last_rollup = sql_query(&query)
.get_result::<BlockTimeRes>(conn)
.map(|res| res.last_rollup)?;
Ok(last_rollup)
}
}

struct RollupSql<'a> {
@@ -479,6 +512,19 @@ impl<'a> RollupSql<'a> {
self.insert_bucket(w)
}
}

/// Generate a query that selects the timestamp of the last rollup
fn last_rollup(&self) -> String {
// The timestamp column contains the timestamp of the start of the
// last bucket. The last rollup was therefore at least
// `self.interval` after that. We add 1 second to make sure we are
// well within the next bucket
let secs = self.interval.as_duration().as_secs() + 1;
format!(
"select max(timestamp) + '{} s'::interval as last_rollup from {}",
secs, self.agg_table.qualified_name
)
}
}

/// Write the elements in `list` separated by commas into `w`. The list
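To make the generated SQL concrete, here is a sketch of what these two methods produce for a subgraph with hypothetical hourly and daily aggregation tables; the table names and schema are assumptions, while the query shapes follow `RollupSql::last_rollup` and `Rollup::last_rollup` above:

// Per-aggregation queries from RollupSql::last_rollup: interval length in
// seconds plus one (hour: 3600 + 1, day: 86400 + 1).
let hourly = "select max(timestamp) + '3601 s'::interval as last_rollup from sgd42.stats_hour";
let daily = "select max(timestamp) + '86401 s'::interval as last_rollup from sgd42.stats_day";

// Rollup::last_rollup strings them together and keeps the overall maximum.
let combined = format!(
    "select max(last_rollup) as last_rollup from ({}) as a",
    [hourly, daily].join(" union all ")
);

If no aggregation table has any rows yet, `max(timestamp)` is NULL, which is why the result is read as `Nullable<Timestamptz>` into an `Option<BlockTime>` and why `last_rollup` returns `Ok(None)` in that case.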