
Commit 62bba93

Reapply "Do not repeat a rollup after restart in some corner cases (#5675)"
This reverts commit 1f2732c, and adds a missing `FromSql` implementation.
Parent: aed1a2a

7 files changed: +111 −7

graph/src/blockchain/types.rs (+6 −0)

```diff
@@ -435,3 +435,9 @@ impl ToSql<Timestamptz, Pg> for BlockTime {
         <Timestamp as ToSql<Timestamptz, Pg>>::to_sql(&self.0, out)
     }
 }
+
+impl FromSql<Timestamptz, Pg> for BlockTime {
+    fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
+        <Timestamp as FromSql<Timestamptz, Pg>>::from_sql(bytes).map(|ts| Self(ts))
+    }
+}
```
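With this impl in place, diesel can map a `timestamptz` column straight to `BlockTime`. A minimal sketch of the resulting usage pattern (the table and column names are invented for illustration; the row struct mirrors the `BlockTimeRes` type added later in this commit):

```rust
use diesel::sql_types::{Nullable, Timestamptz};
use diesel::{sql_query, PgConnection, QueryableByName, RunQueryDsl};
use graph::blockchain::BlockTime;

// Hypothetical row type: `Nullable<Timestamptz>` now maps to
// `Option<BlockTime>` thanks to the new `FromSql` impl.
#[derive(QueryableByName)]
struct Row {
    #[diesel(sql_type = Nullable<Timestamptz>)]
    last_rollup: Option<BlockTime>,
}

// Read the newest bucket timestamp from a hypothetical aggregation table.
fn newest_bucket(conn: &mut PgConnection) -> diesel::QueryResult<Option<BlockTime>> {
    sql_query("select max(timestamp) as last_rollup from sgd42.stats_hour")
        .get_result::<Row>(conn)
        .map(|row| row.last_rollup)
}
```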

graph/src/data/store/scalar/timestamp.rs (+12 −2)

```diff
@@ -1,5 +1,8 @@
 use chrono::{DateTime, Utc};
+use diesel::deserialize::FromSql;
+use diesel::pg::PgValue;
 use diesel::serialize::ToSql;
+use diesel::sql_types::Timestamptz;
 use serde::{self, Deserialize, Serialize};
 use stable_hash::StableHash;
 
@@ -93,12 +96,12 @@ impl Display for Timestamp {
     }
 }
 
-impl ToSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
+impl ToSql<Timestamptz, diesel::pg::Pg> for Timestamp {
     fn to_sql<'b>(
         &'b self,
         out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>,
     ) -> diesel::serialize::Result {
-        <_ as ToSql<diesel::sql_types::Timestamptz, _>>::to_sql(&self.0, &mut out.reborrow())
+        <_ as ToSql<Timestamptz, _>>::to_sql(&self.0, &mut out.reborrow())
     }
 }
 
@@ -107,3 +110,10 @@ impl GasSizeOf for Timestamp {
         Some(Gas::new(std::mem::size_of::<Timestamp>().saturating_into()))
     }
 }
+
+impl FromSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
+    fn from_sql(value: PgValue) -> diesel::deserialize::Result<Self> {
+        <DateTime<Utc> as FromSql<diesel::sql_types::Timestamptz, _>>::from_sql(value)
+            .map(Timestamp)
+    }
+}
```

graph/src/env/store.rs (+11 −0)

```diff
@@ -120,6 +120,14 @@ pub struct EnvVarsStore {
     pub use_brin_for_all_query_types: bool,
     /// Temporary env var to disable certain lookups in the chain store
     pub disable_block_cache_for_lookup: bool,
+    /// Temporary env var to fall back to the old broken way of determining
+    /// the time of the last rollup from the POI table instead of the new
+    /// way that fixes
+    /// https://github.com/graphprotocol/graph-node/issues/5530. Remove this
+    /// and all code that is dead as a consequence once this has been vetted
+    /// sufficiently, probably after 2024-12-01.
+    /// Defaults to `false`, i.e., using the new, fixed behavior.
+    pub last_rollup_from_poi: bool,
 }
 
 // This does not print any values avoid accidentally leaking any sensitive env vars
@@ -168,6 +176,7 @@ impl From<InnerStore> for EnvVarsStore {
             create_gin_indexes: x.create_gin_indexes,
             use_brin_for_all_query_types: x.use_brin_for_all_query_types,
             disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
+            last_rollup_from_poi: x.last_rollup_from_poi,
         }
     }
 }
@@ -229,6 +238,8 @@ pub struct InnerStore {
     use_brin_for_all_query_types: bool,
     #[envconfig(from = "GRAPH_STORE_DISABLE_BLOCK_CACHE_FOR_LOOKUP", default = "false")]
     disable_block_cache_for_lookup: bool,
+    #[envconfig(from = "GRAPH_STORE_LAST_ROLLUP_FROM_POI", default = "false")]
+    last_rollup_from_poi: bool,
 }
 
 #[derive(Clone, Copy, Debug)]
```
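For reference, a minimal standalone sketch of how a flag like this gets parsed, assuming the `envconfig` crate's derive and its `init_from_env` constructor (as the `#[envconfig(...)]` attributes above suggest); this is illustrative, not the actual graph-node wiring:

```rust
use envconfig::Envconfig;

#[derive(Envconfig)]
struct StoreFlags {
    // Unset or "false" keeps the new, fixed behavior; setting
    // GRAPH_STORE_LAST_ROLLUP_FROM_POI=true falls back to the old
    // POI-based lookup.
    #[envconfig(from = "GRAPH_STORE_LAST_ROLLUP_FROM_POI", default = "false")]
    last_rollup_from_poi: bool,
}

fn main() {
    let flags = StoreFlags::init_from_env().expect("parsing env vars");
    println!("last_rollup_from_poi = {}", flags.last_rollup_from_poi);
}
```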

server/graphman/tests/deployment_query.rs (+16 −3)

```diff
@@ -1,10 +1,14 @@
 pub mod util;
 
+use graph::components::store::{QueryStoreManager, SubgraphStore};
 use graph::data::subgraph::DeploymentHash;
+use graph::prelude::QueryTarget;
+
 use serde_json::json;
 use test_store::store::create_test_subgraph;
 use test_store::store::NETWORK_NAME;
-use test_store::store::NODE_ID;
+use test_store::STORE;
+use test_store::SUBGRAPH_STORE;
 
 use self::util::client::send_graphql_request;
 use self::util::run_test;
@@ -54,6 +58,15 @@ fn graphql_returns_deployment_info() {
         .await;
 
         let namespace = format!("sgd{}", locator.id);
+        let node = SUBGRAPH_STORE.assigned_node(&locator).unwrap().unwrap();
+        let qs = STORE
+            .query_store(
+                QueryTarget::Deployment(locator.hash.clone(), Default::default()),
+                false,
+            )
+            .await
+            .expect("could get a query store");
+        let shard = qs.shard();
 
         let expected_resp = json!({
             "data": {
@@ -63,8 +76,8 @@ fn graphql_returns_deployment_info() {
                 "hash": "subgraph_1",
                 "namespace": namespace,
                 "name": "subgraph_1",
-                "nodeId": NODE_ID.to_string(),
-                "shard": "primary",
+                "nodeId": node.to_string(),
+                "shard": shard,
                 "chain": NETWORK_NAME,
                 "versionStatus": "current",
                 "isActive": true,
```

store/postgres/src/deployment_store.rs (+5 −1)

```diff
@@ -910,7 +910,11 @@ impl DeploymentStore {
 
         let mut conn = self.get_conn()?;
         let layout = store.layout(&mut conn, site.cheap_clone())?;
-        layout.block_time(&mut conn, block)
+        if ENV_VARS.store.last_rollup_from_poi {
+            layout.block_time(&mut conn, block)
+        } else {
+            layout.last_rollup(&mut conn)
+        }
     }
 
     pub(crate) async fn supports_proof_of_indexing<'a>(
```

store/postgres/src/relational.rs (+14 −0)

```diff
@@ -1012,6 +1012,20 @@ impl Layout {
         Ok(block_time)
     }
 
+    /// Find the time of the last rollup for the subgraph. We do this by
+    /// looking for the maximum timestamp in any aggregation table and
+    /// adding a little bit more than the corresponding interval to it.
+    /// This method crucially depends on the fact that we always write the
+    /// rollup for all aggregations, meaning that if some aggregation does
+    /// not have an entry with the maximum timestamp, there was just no
+    /// data for that interval, but we did try to aggregate at that time.
+    pub(crate) fn last_rollup(
+        &self,
+        conn: &mut PgConnection,
+    ) -> Result<Option<BlockTime>, StoreError> {
+        Rollup::last_rollup(&self.rollups, conn)
+    }
+
     /// Construct `Rollup` for each of the aggregation mappings
     /// `schema.agg_mappings()` and return them in the same order as the
     /// aggregation mappings
```
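Concretely: for an hourly aggregation whose newest bucket row carries timestamp 10:00, the rollup that wrote it must have run at or after 11:00, so the query generated in rollup.rs below reports 11:00:01; taking the maximum over all aggregation tables gives the time of the last rollup overall.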

store/postgres/src/relational/rollup.rs (+47 −1)

```diff
@@ -60,7 +60,7 @@ use std::sync::Arc;
 
 use diesel::{sql_query, PgConnection, RunQueryDsl as _};
 
-use diesel::sql_types::{Integer, Timestamptz};
+use diesel::sql_types::{Integer, Nullable, Timestamptz};
 use graph::blockchain::BlockTime;
 use graph::components::store::{BlockNumber, StoreError};
 use graph::constraint_violation;
@@ -70,6 +70,7 @@ use graph::schema::{
 };
 use graph::sqlparser::ast as p;
 use graph::sqlparser::parser::ParserError;
+use itertools::Itertools;
 
 use crate::relational::Table;
 
@@ -229,6 +230,10 @@ pub(crate) struct Rollup {
     #[allow(dead_code)]
     agg_table: Arc<Table>,
     insert_sql: String,
+    /// A query that determines the last time a rollup was done. The query
+    /// finds the latest timestamp in the aggregation table and adds the
+    /// length of the aggregation interval to deduce the last rollup time
+    last_rollup_sql: String,
 }
 
 impl Rollup {
@@ -256,10 +261,12 @@
         );
         let mut insert_sql = String::new();
         sql.insert(&mut insert_sql)?;
+        let last_rollup_sql = sql.last_rollup();
         Ok(Self {
             interval,
             agg_table,
             insert_sql,
+            last_rollup_sql,
         })
     }
 
@@ -275,6 +282,32 @@
             .bind::<Integer, _>(block);
         query.execute(conn)
     }
+
+    pub(crate) fn last_rollup(
+        rollups: &[Rollup],
+        conn: &mut PgConnection,
+    ) -> Result<Option<BlockTime>, StoreError> {
+        #[derive(QueryableByName)]
+        #[diesel(check_for_backend(diesel::pg::Pg))]
+        struct BlockTimeRes {
+            #[diesel(sql_type = Nullable<Timestamptz>)]
+            last_rollup: Option<BlockTime>,
+        }
+
+        if rollups.is_empty() {
+            return Ok(None);
+        }
+
+        let union_all = rollups
+            .iter()
+            .map(|rollup| &rollup.last_rollup_sql)
+            .join(" union all ");
+        let query = format!("select max(last_rollup) as last_rollup from ({union_all}) as a");
+        let last_rollup = sql_query(&query)
+            .get_result::<BlockTimeRes>(conn)
+            .map(|res| res.last_rollup)?;
+        Ok(last_rollup)
+    }
 }
 
 struct RollupSql<'a> {
@@ -479,6 +512,19 @@ impl<'a> RollupSql<'a> {
             self.insert_bucket(w)
         }
     }
+
+    /// Generate a query that selects the timestamp of the last rollup
+    fn last_rollup(&self) -> String {
+        // The timestamp column contains the timestamp of the start of the
+        // last bucket. The last rollup was therefore at least
+        // `self.interval` after that. We add 1 second to make sure we are
+        // well within the next bucket
+        let secs = self.interval.as_duration().as_secs() + 1;
+        format!(
+            "select max(timestamp) + '{} s'::interval as last_rollup from {}",
+            secs, self.agg_table.qualified_name
+        )
+    }
 }
 
 /// Write the elements in `list` separated by commas into `w`. The list
```
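To make the generated SQL concrete, here is a small self-contained sketch of the string building that `RollupSql::last_rollup` and `Rollup::last_rollup` perform; the table names and intervals are hypothetical, and the real code pulls them from the `Layout`:

```rust
// Per-table probe: the newest bucket start plus the interval length,
// padded by one second (mirrors `RollupSql::last_rollup`).
fn last_rollup_sql(qualified_name: &str, interval_secs: u64) -> String {
    format!(
        "select max(timestamp) + '{} s'::interval as last_rollup from {}",
        interval_secs + 1,
        qualified_name
    )
}

fn main() {
    // Union the per-table probes and take the overall maximum, as
    // `Rollup::last_rollup` does; an empty rollup list returns `None`
    // there before any SQL is run.
    let parts = [
        last_rollup_sql("sgd42.stats_hour", 3600),
        last_rollup_sql("sgd42.stats_day", 86400),
    ];
    let union_all = parts.join(" union all ");
    println!("select max(last_rollup) as last_rollup from ({union_all}) as a");
}
```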
