Skip to content

Commit be90031

Browse files
committed
Subgraph Composition: Fix Entity Ordering with different Idtyp
1 parent 863f002 commit be90031

File tree

3 files changed

+106
-33
lines changed

3 files changed

+106
-33
lines changed

store/postgres/src/relational.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,13 @@ impl Layout {
603603
Ok((ewt, block))
604604
};
605605

606+
fn compare_entity_data_ext(a: &EntityDataExt, b: &EntityDataExt) -> std::cmp::Ordering {
607+
a.block_number
608+
.cmp(&b.block_number)
609+
.then_with(|| a.entity.cmp(&b.entity))
610+
.then_with(|| a.id.cmp(&b.id))
611+
}
612+
606613
// The algorithm is a similar to merge sort algorithm and it relays on the fact that both vectors
607614
// are ordered by (block_number, entity_type, entity_id). It advances simultaneously entities from
608615
// both lower_vec and upper_vec and tries to match entities that have entries in both vectors for
@@ -616,7 +623,7 @@ impl Layout {
616623
while lower_now.is_some() || upper_now.is_some() {
617624
let (ewt, block) = match (lower_now, upper_now) {
618625
(Some(lower), Some(upper)) => {
619-
match lower.cmp(&upper) {
626+
match compare_entity_data_ext(lower, upper) {
620627
std::cmp::Ordering::Greater => {
621628
// we have upper bound at this block, but no lower bounds at the same block so it's deletion
622629
let (ewt, block) = transform(upper, EntityOperationKind::Delete)?;

store/postgres/src/relational_queries.rs

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use graph::schema::{EntityType, FulltextAlgorithm, FulltextConfig, InputSchema};
2727
use graph::{components::store::AttributeNames, data::store::scalar};
2828
use inflector::Inflector;
2929
use itertools::Itertools;
30-
use std::cmp::Ordering;
3130
use std::collections::{BTreeMap, BTreeSet, HashSet};
3231
use std::convert::TryFrom;
3332
use std::fmt::{self, Display};
@@ -541,48 +540,20 @@ impl EntityData {
541540
}
542541
}
543542

544-
#[derive(QueryableByName, Clone, Debug, Default, Eq)]
543+
#[derive(QueryableByName, Clone, Debug, Default)]
545544
pub struct EntityDataExt {
546545
#[diesel(sql_type = Text)]
547546
pub entity: String,
548547
#[diesel(sql_type = Jsonb)]
549548
pub data: serde_json::Value,
550549
#[diesel(sql_type = Integer)]
551550
pub block_number: i32,
552-
#[diesel(sql_type = Text)]
553-
pub id: String,
551+
#[diesel(sql_type = Binary)]
552+
pub id: Vec<u8>,
554553
#[diesel(sql_type = BigInt)]
555554
pub vid: i64,
556555
}
557556

558-
impl Ord for EntityDataExt {
559-
fn cmp(&self, other: &Self) -> Ordering {
560-
let ord = self.block_number.cmp(&other.block_number);
561-
if ord != Ordering::Equal {
562-
ord
563-
} else {
564-
let ord = self.entity.cmp(&other.entity);
565-
if ord != Ordering::Equal {
566-
ord
567-
} else {
568-
self.id.cmp(&other.id)
569-
}
570-
}
571-
}
572-
}
573-
574-
impl PartialOrd for EntityDataExt {
575-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
576-
Some(self.cmp(other))
577-
}
578-
}
579-
580-
impl PartialEq for EntityDataExt {
581-
fn eq(&self, other: &Self) -> bool {
582-
self.cmp(other) == Ordering::Equal
583-
}
584-
}
585-
586557
/// The equivalent of `graph::data::store::Value` but in a form that does
587558
/// not require further transformation during `walk_ast`. This form takes
588559
/// the idiosyncrasies of how we serialize values into account (e.g., that

store/test-store/tests/postgres/writable.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,32 @@ const SCHEMA_GQL: &str = "
2828
id: ID!,
2929
count: Int!,
3030
}
31+
type BytesId @entity {
32+
id: Bytes!,
33+
value: String!
34+
}
35+
type Int8Id @entity {
36+
id: Int8!,
37+
value: String!
38+
}
39+
type StringId @entity {
40+
id: String!,
41+
value: String!
42+
}
43+
type PoolCreated @entity(immutable: true) {
44+
id: Bytes!,
45+
token0: Bytes!,
46+
token1: Bytes!,
47+
fee: Int!,
48+
tickSpacing: Int!,
49+
pool: Bytes!,
50+
blockNumber: BigInt!,
51+
blockTimestamp: BigInt!,
52+
transactionHash: Bytes!,
53+
transactionFrom: Bytes!,
54+
transactionGasPrice: BigInt!,
55+
logIndex: BigInt!
56+
}
3157
";
3258

3359
const COUNTER: &str = "Counter";
@@ -407,3 +433,72 @@ fn read_immutable_only_range_test() {
407433
assert_eq!(e.len(), 4);
408434
})
409435
}
436+
437+
#[test]
438+
fn read_range_pool_created_test() {
439+
run_test(|store, writable, sourceable, deployment| async move {
440+
let result_entities = vec![
441+
format!("(1, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369621), blockTimestamp: BigInt(1620243254), fee: Int(500), id: Bytes(0xff80818283848586), logIndex: BigInt(0), pool: Bytes(0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8), tickSpacing: Int(10), token0: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000000), vid: Int8(1) }}, vid: 1 }}])"),
442+
format!("(2, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369622), blockTimestamp: BigInt(1620243255), fee: Int(3000), id: Bytes(0xff90919293949596), logIndex: BigInt(1), pool: Bytes(0x4585fe77225b41b697c938b018e2ac67ac5a20c0), tickSpacing: Int(60), token0: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000001), vid: Int8(2) }}, vid: 2 }}])"),
443+
];
444+
445+
// Rest of the test remains the same
446+
let subgraph_store = store.subgraph_store();
447+
writable.deployment_synced(block_pointer(0)).unwrap();
448+
449+
let pool_created_type = TEST_SUBGRAPH_SCHEMA.entity_type("PoolCreated").unwrap();
450+
let entity_types = vec![pool_created_type.clone()];
451+
452+
for count in (1..=2).map(|x| x as i64) {
453+
let id = if count == 1 {
454+
"0xff80818283848586"
455+
} else {
456+
"0xff90919293949596"
457+
};
458+
459+
let data = entity! { TEST_SUBGRAPH_SCHEMA =>
460+
id: id,
461+
token0: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
462+
token1: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
463+
fee: if count == 1 { 500 } else { 3000 },
464+
tickSpacing: if count == 1 { 10 } else { 60 },
465+
pool: if count == 1 { "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8" } else { "0x4585fe77225b41b697c938b018e2ac67ac5a20c0" },
466+
blockNumber: 12369621 + count - 1,
467+
blockTimestamp: 1620243254 + count - 1,
468+
transactionHash: format!("0x1234{:0>76}", if count == 1 { "0" } else { "1" }),
469+
transactionFrom: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
470+
transactionGasPrice: 100000000000i64,
471+
logIndex: count - 1,
472+
vid: count
473+
};
474+
475+
let key = pool_created_type.parse_key(id).unwrap();
476+
let op = EntityOperation::Set {
477+
key: key.clone(),
478+
data,
479+
};
480+
481+
transact_entity_operations(
482+
&subgraph_store,
483+
&deployment,
484+
block_pointer(count as u8),
485+
vec![op],
486+
)
487+
.await
488+
.unwrap();
489+
}
490+
writable.flush().await.unwrap();
491+
writable.deployment_synced(block_pointer(0)).unwrap();
492+
493+
let br: Range<BlockNumber> = 0..18;
494+
let e: BTreeMap<i32, Vec<EntitySourceOperation>> = sourceable
495+
.get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone())
496+
.unwrap();
497+
assert_eq!(e.len(), 2);
498+
for en in &e {
499+
let index = *en.0 - 1;
500+
let a = result_entities[index as usize].clone();
501+
assert_eq!(a, format!("{:?}", en));
502+
}
503+
})
504+
}

0 commit comments

Comments
 (0)