Skip to content

Commit 22bca4e

Browse files
committed
graph, store: Change how the time of the last rollup is determined
When graph-node is restarted, we need to determine, for subgraphs with aggregations, when the last rollup was triggered so that aggregations get filled without gaps or duplication. The code used the `block_time` column in the PoI table for that, but that is not correct: the PoI table only records blocks and times for which the subgraph actually has writes. When the subgraph scans through a largish number of blocks without changes, we only update the head pointer, but we still do rollups as aggregation intervals pass. Because of that, we might perform a rollup without a corresponding entry in the PoI table. With this change, we instead find the maximum timestamp across all aggregation tables to tell us when the last rollup was triggered, since that data accurately reflects when rollups happened. For safety, this new behavior can be turned off by setting `GRAPH_STORE_LAST_ROLLUP_FROM_POI=true`, which returns to the old, buggy behavior in case the new behavior causes some other unexpected problem. Fixes #5530
1 parent 22f805d commit 22bca4e

File tree

4 files changed

+77
-2
lines changed

4 files changed

+77
-2
lines changed

graph/src/env/store.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,14 @@ pub struct EnvVarsStore {
120120
pub use_brin_for_all_query_types: bool,
121121
/// Temporary env var to disable certain lookups in the chain store
122122
pub disable_block_cache_for_lookup: bool,
123+
/// Temporary env var to fall back to the old broken way of determining
124+
/// the time of the last rollup from the POI table instead of the new
125+
/// way that fixes
126+
/// https://github.com/graphprotocol/graph-node/issues/5530 Remove this
127+
/// and all code that is dead as a consequence once this has been vetted
128+
/// sufficiently, probably after 2024-12-01
129+
/// Defaults to `false`, i.e. using the new fixed behavior
130+
pub last_rollup_from_poi: bool,
123131
}
124132

125133
// This does not print any values avoid accidentally leaking any sensitive env vars
@@ -168,6 +176,7 @@ impl From<InnerStore> for EnvVarsStore {
168176
create_gin_indexes: x.create_gin_indexes,
169177
use_brin_for_all_query_types: x.use_brin_for_all_query_types,
170178
disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
179+
last_rollup_from_poi: x.last_rollup_from_poi,
171180
}
172181
}
173182
}
@@ -229,6 +238,8 @@ pub struct InnerStore {
229238
use_brin_for_all_query_types: bool,
230239
#[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")]
231240
disable_block_cache_for_lookup: bool,
241+
#[envconfig(from = "GRAPH_STORE_LAST_ROLLUP_FROM_POI", default = "false")]
242+
last_rollup_from_poi: bool,
232243
}
233244

234245
#[derive(Clone, Copy, Debug)]

store/postgres/src/deployment_store.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,11 @@ impl DeploymentStore {
910910

911911
let mut conn = self.get_conn()?;
912912
let layout = store.layout(&mut conn, site.cheap_clone())?;
913-
layout.block_time(&mut conn, block)
913+
if ENV_VARS.store.last_rollup_from_poi {
914+
layout.block_time(&mut conn, block)
915+
} else {
916+
layout.last_rollup(&mut conn)
917+
}
914918
}
915919

916920
pub(crate) async fn supports_proof_of_indexing<'a>(

store/postgres/src/relational.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,20 @@ impl Layout {
10391039
Ok(block_time)
10401040
}
10411041

1042+
/// Find the time of the last rollup for the subgraph. We do this by
/// looking for the maximum timestamp in any aggregation table and
/// adding a little bit more than the corresponding interval to it. This
/// method crucially depends on the fact that we always write the rollup
/// for all aggregations, meaning that if some aggregations do not have
/// an entry with the maximum timestamp that there was just no data for
/// that interval, but we did try to aggregate at that time.
///
/// Returns `Ok(None)` when the subgraph has no aggregations, or when no
/// aggregation data has been written yet.
pub(crate) fn last_rollup(
    &self,
    conn: &mut PgConnection,
) -> Result<Option<BlockTime>, StoreError> {
    // Delegate to `Rollup`, which unions the per-aggregation queries
    // built at layout-construction time
    Rollup::last_rollup(&self.rollups, conn)
}
1055+
10421056
/// Construct `Rolllup` for each of the aggregation mappings
10431057
/// `schema.agg_mappings()` and return them in the same order as the
10441058
/// aggregation mappings

store/postgres/src/relational/rollup.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ use std::sync::Arc;
6060

6161
use diesel::{sql_query, PgConnection, RunQueryDsl as _};
6262

63-
use diesel::sql_types::{Integer, Timestamptz};
63+
use diesel::sql_types::{Integer, Nullable, Timestamptz};
6464
use graph::blockchain::BlockTime;
6565
use graph::components::store::{BlockNumber, StoreError};
6666
use graph::constraint_violation;
@@ -70,6 +70,7 @@ use graph::schema::{
7070
};
7171
use graph::sqlparser::ast as p;
7272
use graph::sqlparser::parser::ParserError;
73+
use itertools::Itertools;
7374

7475
use crate::relational::Table;
7576

@@ -229,6 +230,10 @@ pub(crate) struct Rollup {
229230
#[allow(dead_code)]
230231
agg_table: Arc<Table>,
231232
insert_sql: String,
233+
/// A query that determines the last time a rollup was done. The query
234+
/// finds the latest timestamp in the aggregation table and adds the
235+
/// length of the aggregation interval to deduce the last rollup time
236+
last_rollup_sql: String,
232237
}
233238

234239
impl Rollup {
@@ -256,10 +261,12 @@ impl Rollup {
256261
);
257262
let mut insert_sql = String::new();
258263
sql.insert(&mut insert_sql)?;
264+
let last_rollup_sql = sql.last_rollup();
259265
Ok(Self {
260266
interval,
261267
agg_table,
262268
insert_sql,
269+
last_rollup_sql,
263270
})
264271
}
265272

@@ -275,6 +282,32 @@ impl Rollup {
275282
.bind::<Integer, _>(block);
276283
query.execute(conn)
277284
}
285+
286+
pub(crate) fn last_rollup(
287+
rollups: &[Rollup],
288+
conn: &mut PgConnection,
289+
) -> Result<Option<BlockTime>, StoreError> {
290+
#[derive(QueryableByName)]
291+
#[diesel(check_for_backend(diesel::pg::Pg))]
292+
struct BlockTimeRes {
293+
#[diesel(sql_type = Nullable<Timestamptz>)]
294+
last_rollup: Option<BlockTime>,
295+
}
296+
297+
if rollups.is_empty() {
298+
return Ok(None);
299+
}
300+
301+
let union_all = rollups
302+
.iter()
303+
.map(|rollup| &rollup.last_rollup_sql)
304+
.join(" union all ");
305+
let query = format!("select max(last_rollup) as last_rollup from ({union_all}) as a");
306+
let last_rollup = sql_query(&query)
307+
.get_result::<BlockTimeRes>(conn)
308+
.map(|res| res.last_rollup)?;
309+
Ok(last_rollup)
310+
}
278311
}
279312

280313
struct RollupSql<'a> {
@@ -479,6 +512,19 @@ impl<'a> RollupSql<'a> {
479512
self.insert_bucket(w)
480513
}
481514
}
515+
516+
/// Generate a query that selects the timestamp of the last rollup
517+
fn last_rollup(&self) -> String {
518+
// The timestamp column contains the timestamp of the start of the
519+
// last bucket. The last rollup was therefore at least
520+
// `self.interval` after that. We add 1 second to make sure we are
521+
// well within the next bucket
522+
let secs = self.interval.as_duration().as_secs() + 1;
523+
format!(
524+
"select max(timestamp) + '{} s'::interval as last_rollup from {}",
525+
secs, self.agg_table.qualified_name
526+
)
527+
}
482528
}
483529

484530
/// Write the elements in `list` separated by commas into `w`. The list

0 commit comments

Comments
 (0)