Skip to content

Commit 8c8dbb4

Browse files
committed
graph, store: Avoid using to_jsonb when looking up a single entity
Our queries all ultimately get their data by doing something like `select to_jsonb(c.*) from ( ... complicated query ... ) c`, because when these queries were written it was far from obvious how to generate queries with Diesel that select columns whose number and types aren't known at compile time. The call to `to_jsonb` forces Postgres to encode all data as JSON, which graph-node then has to deserialize; that is pretty wasteful in terms of both memory and CPU. This commit lays the groundwork for getting rid of these JSON conversions and querying data in a more compact, native form with fewer conversions. For now, the new mechanism is used only in the fairly simple case of `Layout.find`; future changes will expand that use.
1 parent 415005f commit 8c8dbb4

File tree

11 files changed

+1155
-79
lines changed

11 files changed

+1155
-79
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ axum = "0.7.5"
3333
chrono = "0.4.38"
3434
clap = { version = "4.5.4", features = ["derive", "env"] }
3535
derivative = "2.2.0"
36-
diesel = { version = "2.2.4", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid"] }
36+
diesel = { version = "2.2.4", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid", "i-implement-a-third-party-backend-and-opt-into-breaking-changes"] }
3737
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
38-
diesel-dynamic-schema = "0.2.1"
38+
diesel-dynamic-schema = { version = "0.2.1", features = ["postgres"] }
3939
diesel_derives = "2.1.4"
4040
diesel_migrations = "2.1.0"
4141
graph = { path = "./graph" }

graph/src/data/store/scalar/bigdecimal.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use diesel::deserialize::FromSqlRow;
22
use diesel::expression::AsExpression;
3-
use num_bigint;
3+
use num_bigint::{self, ToBigInt};
44
use num_traits::FromPrimitive;
55
use serde::{self, Deserialize, Serialize};
66
use stable_hash::{FieldAddress, StableHash};
@@ -10,8 +10,8 @@ use std::fmt::{self, Display, Formatter};
1010
use std::ops::{Add, Div, Mul, Sub};
1111
use std::str::FromStr;
1212

13+
use crate::anyhow::anyhow;
1314
use crate::runtime::gas::{Gas, GasSizeOf};
14-
1515
use old_bigdecimal::BigDecimal as OldBigDecimal;
1616
pub use old_bigdecimal::ToPrimitive;
1717

@@ -60,6 +60,26 @@ impl BigDecimal {
6060
self.0.as_bigint_and_exponent()
6161
}
6262

63+
pub fn is_integer(&self) -> bool {
64+
self.0.is_integer()
65+
}
66+
67+
/// Convert this `BigDecimal` to a `BigInt` if it is an integer, and
68+
/// return an error if it is not. Also return an error if the integer
69+
/// would use too many digits as defined by `BigInt::new`
70+
pub fn to_bigint(&self) -> Result<BigInt, anyhow::Error> {
71+
if !self.is_integer() {
72+
return Err(anyhow!(
73+
"Cannot convert non-integer `BigDecimal` to `BigInt`: {:?}",
74+
self
75+
));
76+
}
77+
let bi = self.0.to_bigint().ok_or_else(|| {
78+
anyhow!("The implementation of `to_bigint` for `OldBigDecimal` always returns `Some`")
79+
})?;
80+
BigInt::new(bi)
81+
}
82+
6383
pub fn digits(&self) -> u64 {
6484
self.0.digits()
6585
}

graph/src/data/store/scalar/bytes.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use diesel::deserialize::FromSql;
2+
use diesel::pg::PgValue;
13
use diesel::serialize::ToSql;
24
use hex;
35
use serde::{self, Deserialize, Serialize};
@@ -115,3 +117,9 @@ impl ToSql<diesel::sql_types::Binary, diesel::pg::Pg> for Bytes {
115117
<_ as ToSql<diesel::sql_types::Binary, _>>::to_sql(self.as_slice(), &mut out.reborrow())
116118
}
117119
}
120+
121+
impl FromSql<diesel::sql_types::Binary, diesel::pg::Pg> for Bytes {
122+
fn from_sql(value: PgValue) -> diesel::deserialize::Result<Self> {
123+
<Vec<u8> as FromSql<diesel::sql_types::Binary, _>>::from_sql(value).map(Bytes::from)
124+
}
125+
}

graph/src/data/store/scalar/timestamp.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use chrono::{DateTime, Utc};
2+
use diesel::deserialize::FromSql;
3+
use diesel::pg::PgValue;
24
use diesel::serialize::ToSql;
35
use serde::{self, Deserialize, Serialize};
46
use stable_hash::StableHash;
@@ -107,3 +109,10 @@ impl GasSizeOf for Timestamp {
107109
Some(Gas::new(std::mem::size_of::<Timestamp>().saturating_into()))
108110
}
109111
}
112+
113+
impl FromSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
114+
fn from_sql(value: PgValue) -> diesel::deserialize::Result<Self> {
115+
<DateTime<Utc> as FromSql<diesel::sql_types::Timestamptz, _>>::from_sql(value)
116+
.map(Timestamp)
117+
}
118+
}

graph/src/data/value.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,24 @@ impl PartialEq<&str> for Word {
115115
}
116116
}
117117

118+
impl PartialEq<str> for Word {
119+
fn eq(&self, other: &str) -> bool {
120+
self.as_str() == other
121+
}
122+
}
123+
124+
impl PartialEq<String> for Word {
125+
fn eq(&self, other: &String) -> bool {
126+
self.as_str() == other
127+
}
128+
}
129+
130+
impl PartialEq<Word> for String {
131+
fn eq(&self, other: &Word) -> bool {
132+
self.as_str() == other.as_str()
133+
}
134+
}
135+
118136
impl PartialEq<Word> for &str {
119137
fn eq(&self, other: &Word) -> bool {
120138
self == &other.as_str()

graph/src/data_source/causality_region.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use diesel::{
44
serialize::{Output, ToSql},
55
sql_types::Integer,
66
};
7+
use diesel_derives::AsExpression;
78
use std::fmt;
89

910
use crate::components::subgraph::Entity;
@@ -20,7 +21,10 @@ use crate::derive::CacheWeight;
2021
/// This is necessary for determinism because offchain data sources don't have a deterministic order of
2122
/// execution, for example an IPFS file may become available at any point in time. The isolation
2223
/// rules make the indexing result reproducible, given a set of available files.
23-
#[derive(Debug, CacheWeight, Copy, Clone, PartialEq, Eq, FromSqlRow, Hash, PartialOrd, Ord)]
24+
#[derive(
25+
Debug, CacheWeight, Copy, Clone, PartialEq, Eq, FromSqlRow, Hash, PartialOrd, Ord, AsExpression,
26+
)]
27+
#[diesel(sql_type = Integer)]
2428
pub struct CausalityRegion(i32);
2529

2630
impl fmt::Display for CausalityRegion {

store/postgres/src/primary.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,12 @@ impl Borrow<str> for Namespace {
296296
}
297297
}
298298

299+
impl Borrow<str> for &Namespace {
300+
fn borrow(&self) -> &str {
301+
&self.0
302+
}
303+
}
304+
299305
/// A marker that an `i32` references a deployment. Values of this type hold
300306
/// the primary key from the `deployment_schemas` table
301307
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, AsExpression, FromSqlRow)]

store/postgres/src/relational.rs

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,20 @@ mod ddl_tests;
1414
#[cfg(test)]
1515
mod query_tests;
1616

17+
pub(crate) mod dsl;
1718
pub(crate) mod index;
1819
mod prune;
1920
mod rollup;
21+
pub(crate) mod value;
2022

2123
use diesel::deserialize::FromSql;
2224
use diesel::pg::Pg;
2325
use diesel::serialize::{Output, ToSql};
2426
use diesel::sql_types::Text;
2527
use diesel::{connection::SimpleConnection, Connection};
26-
use diesel::{debug_query, sql_query, OptionalExtension, PgConnection, QueryResult, RunQueryDsl};
28+
use diesel::{
29+
debug_query, sql_query, OptionalExtension, PgConnection, QueryDsl, QueryResult, RunQueryDsl,
30+
};
2731
use graph::blockchain::BlockTime;
2832
use graph::cheap_clone::CheapClone;
2933
use graph::components::store::write::{RowGroup, WriteChunk};
@@ -50,6 +54,7 @@ use std::str::FromStr;
5054
use std::sync::{Arc, Mutex};
5155
use std::time::{Duration, Instant};
5256

57+
use crate::relational::value::{FromOidRow, OidRow};
5358
use crate::relational_queries::{
5459
ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery,
5560
FindPossibleDeletionsQuery, ReturnedEntityData,
@@ -58,10 +63,10 @@ use crate::{
5863
primary::{Namespace, Site},
5964
relational_queries::{
6065
ClampRangeQuery, EntityData, EntityDeletion, FilterCollection, FilterQuery, FindManyQuery,
61-
FindQuery, InsertQuery, RevertClampQuery, RevertRemoveQuery,
66+
InsertQuery, RevertClampQuery, RevertRemoveQuery,
6267
},
6368
};
64-
use graph::components::store::DerivedEntityQuery;
69+
use graph::components::store::{AttributeNames, DerivedEntityQuery};
6570
use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR};
6671
use graph::data::subgraph::schema::POI_TABLE;
6772
use graph::prelude::{
@@ -172,6 +177,12 @@ impl From<String> for SqlName {
172177
}
173178
}
174179

180+
impl From<SqlName> for Word {
181+
fn from(name: SqlName) -> Self {
182+
Word::from(name.0)
183+
}
184+
}
185+
175186
impl fmt::Display for SqlName {
176187
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177188
self.0.fmt(f)
@@ -184,6 +195,12 @@ impl Borrow<str> for &SqlName {
184195
}
185196
}
186197

198+
impl PartialEq<str> for SqlName {
199+
fn eq(&self, other: &str) -> bool {
200+
self.0 == other
201+
}
202+
}
203+
187204
impl FromSql<Text, Pg> for SqlName {
188205
fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
189206
<String as FromSql<Text, Pg>>::from_sql(bytes).map(|s| SqlName::verbatim(s))
@@ -361,9 +378,11 @@ impl Layout {
361378
}
362379

363380
let table_name = SqlName::verbatim(POI_TABLE.to_owned());
381+
let nsp = catalog.site.namespace.clone();
364382
Table {
365383
object: poi_type.to_owned(),
366384
qualified_name: SqlName::qualified_name(&catalog.site.namespace, &table_name),
385+
nsp,
367386
name: table_name,
368387
columns,
369388
// The position of this table in all the tables for this layout; this
@@ -469,11 +488,19 @@ impl Layout {
469488
key: &EntityKey,
470489
block: BlockNumber,
471490
) -> Result<Option<Entity>, StoreError> {
472-
let table = self.table_for_entity(&key.entity_type)?;
473-
FindQuery::new(table.as_ref(), key, block)
474-
.get_result::<EntityData>(conn)
491+
let table = self.table_for_entity(&key.entity_type)?.dsl_table();
492+
let columns = table.selected_columns::<Entity>(&AttributeNames::All, None)?;
493+
494+
let query = table
495+
.select_cols(&columns)
496+
.filter(table.id_eq(&key.entity_id))
497+
.filter(table.at_block(block))
498+
.filter(table.belongs_to_causality_region(key.causality_region));
499+
500+
query
501+
.get_result::<OidRow>(conn)
475502
.optional()?
476-
.map(|entity_data| entity_data.deserialize_with_layout(self, None))
503+
.map(|row| Entity::from_oid_row(row, &self.input_schema, &columns))
477504
.transpose()
478505
}
479506

@@ -1348,6 +1375,21 @@ impl Column {
13481375
})
13491376
}
13501377

1378+
pub fn pseudo_column(name: &str, column_type: ColumnType) -> Column {
1379+
let field_type = q::Type::NamedType(column_type.to_string());
1380+
let name = SqlName::verbatim(name.to_string());
1381+
let field = Word::from(name.as_str());
1382+
Column {
1383+
name,
1384+
field,
1385+
field_type,
1386+
column_type,
1387+
fulltext_fields: None,
1388+
is_reference: false,
1389+
use_prefix_comparison: false,
1390+
}
1391+
}
1392+
13511393
fn new_fulltext(def: &FulltextDefinition) -> Result<Column, StoreError> {
13521394
SqlName::check_valid_identifier(&def.name, "attribute")?;
13531395
let sql_name = SqlName::from(def.name.as_str());
@@ -1440,6 +1482,9 @@ pub struct Table {
14401482
/// `Stats_hour`, not the overall aggregation type `Stats`.
14411483
pub object: EntityType,
14421484

1485+
/// The namespace in which the table lives
1486+
nsp: Namespace,
1487+
14431488
/// The name of the database table for this type ('thing'), snakecased
14441489
/// version of `object`
14451490
pub name: SqlName,
@@ -1494,10 +1539,11 @@ impl Table {
14941539
.collect::<Result<Vec<Column>, StoreError>>()?;
14951540
let qualified_name = SqlName::qualified_name(&catalog.site.namespace, &table_name);
14961541
let immutable = defn.is_immutable();
1497-
1542+
let nsp = catalog.site.namespace.clone();
14981543
let table = Table {
14991544
object: defn.cheap_clone(),
15001545
name: table_name,
1546+
nsp,
15011547
qualified_name,
15021548
// Default `is_account_like` to `false`; the caller should call
15031549
// `refresh` after constructing the layout, but that requires a
@@ -1516,6 +1562,7 @@ impl Table {
15161562
pub fn new_like(&self, namespace: &Namespace, name: &SqlName) -> Arc<Table> {
15171563
let other = Table {
15181564
object: self.object.clone(),
1565+
nsp: self.nsp.clone(),
15191566
name: name.clone(),
15201567
qualified_name: SqlName::qualified_name(namespace, name),
15211568
columns: self.columns.clone(),
@@ -1590,6 +1637,10 @@ impl Table {
15901637
&crate::block_range::BLOCK_RANGE_COLUMN_SQL
15911638
}
15921639
}
1640+
1641+
pub fn dsl_table(&self) -> dsl::Table<'_> {
1642+
dsl::Table::new(self)
1643+
}
15931644
}
15941645

15951646
#[derive(Clone)]

0 commit comments

Comments
 (0)