Skip to content

Commit 50c27f8

Browse files
JannisJannis Pohlmann
authored andcommitted
store/postgres: Test store subscriptions with separate meta data stream
This is because entity operations for meta data entities and subgraph block pointer updates are now emitted via the subgraph of subgraphs rather than the subgraph itself. Not sure why they weren't before yet.
1 parent f5db182 commit 50c27f8

File tree

1 file changed

+57
-32
lines changed

1 file changed

+57
-32
lines changed

store/postgres/tests/store.rs

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1876,7 +1876,9 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
18761876
)
18771877
.unwrap();
18781878

1879-
// Create a store subscription
1879+
// Create store subscriptions
1880+
let meta_subscription =
1881+
subscribe_and_consume(store.clone(), &SUBGRAPHS_ID, "SubgraphDeployment");
18801882
let subscription = subscribe_and_consume(store.clone(), &subgraph_id, "User");
18811883

18821884
// Add two entities to the store
@@ -1948,6 +1950,24 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
19481950
)
19491951
.unwrap();
19501952

1953+
// We're expecting two meta data events to be written to the meta data subscription
1954+
let meta_expected = vec![
1955+
StoreEvent::new(vec![EntityChange {
1956+
subgraph_id: SubgraphDeploymentId::new("subgraphs").unwrap(),
1957+
entity_type: "SubgraphDeployment".to_owned(),
1958+
entity_id: "EntityChangeTestSubgraph".to_owned(),
1959+
operation: EntityChangeOperation::Set,
1960+
}]),
1961+
StoreEvent::new(vec![EntityChange {
1962+
subgraph_id: SubgraphDeploymentId::new("subgraphs").unwrap(),
1963+
entity_type: "SubgraphDeployment".to_owned(),
1964+
entity_id: "EntityChangeTestSubgraph".to_owned(),
1965+
operation: EntityChangeOperation::Set,
1966+
}]),
1967+
];
1968+
1969+
check_events(meta_subscription, meta_expected);
1970+
19511971
// We're expecting two events to be written to the subscription stream
19521972
let expected = vec![
19531973
StoreEvent::new(vec![
@@ -1963,12 +1983,6 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
19631983
entity_id: added_entities[1].clone().0,
19641984
operation: EntityChangeOperation::Set,
19651985
},
1966-
EntityChange {
1967-
subgraph_id: SubgraphDeploymentId::new("subgraphs").unwrap(),
1968-
entity_type: "SubgraphDeployment".to_owned(),
1969-
entity_id: "EntityChangeTestSubgraph".to_owned(),
1970-
operation: EntityChangeOperation::Set,
1971-
},
19721986
]),
19731987
StoreEvent::new(vec![
19741988
EntityChange {
@@ -1983,12 +1997,6 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
19831997
entity_id: added_entities[1].clone().0,
19841998
operation: EntityChangeOperation::Removed,
19851999
},
1986-
EntityChange {
1987-
subgraph_id: SubgraphDeploymentId::new("subgraphs").unwrap(),
1988-
entity_type: "SubgraphDeployment".to_owned(),
1989-
entity_id: "EntityChangeTestSubgraph".to_owned(),
1990-
operation: EntityChangeOperation::Set,
1991-
},
19922000
]),
19932001
];
19942002

@@ -1999,13 +2007,22 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
19992007
#[test]
20002008
fn throttle_subscription_delivers() {
20012009
run_test(|store| {
2002-
let subscription = subscribe_and_consume(store.clone(), &TEST_SUBGRAPH_ID, "user");
2003-
let subscription = subscription.throttle_while_syncing(
2004-
&*LOGGER,
2005-
store.clone(),
2006-
TEST_SUBGRAPH_ID.clone(),
2007-
Duration::from_millis(500),
2008-
);
2010+
let meta_subscription =
2011+
subscribe_and_consume(store.clone(), &SUBGRAPHS_ID, "SubgraphDeployment")
2012+
.throttle_while_syncing(
2013+
&*LOGGER,
2014+
store.clone(),
2015+
SUBGRAPHS_ID.clone(),
2016+
Duration::from_millis(500),
2017+
);
2018+
2019+
let subscription = subscribe_and_consume(store.clone(), &TEST_SUBGRAPH_ID, "user")
2020+
.throttle_while_syncing(
2021+
&*LOGGER,
2022+
store.clone(),
2023+
TEST_SUBGRAPH_ID.clone(),
2024+
Duration::from_millis(500),
2025+
);
20092026

20102027
let user4 = create_test_entity(
20112028
"4",
@@ -2027,10 +2044,18 @@ fn throttle_subscription_delivers() {
20272044
)
20282045
.unwrap();
20292046

2030-
let expected = StoreEvent::new(vec![
2031-
make_entity_change("user", "4", EntityChangeOperation::Set),
2032-
make_deployment_change("testsubgraph", EntityChangeOperation::Set),
2033-
]);
2047+
let meta_expected = StoreEvent::new(vec![make_deployment_change(
2048+
"testsubgraph",
2049+
EntityChangeOperation::Set,
2050+
)]);
2051+
2052+
check_events(meta_subscription, vec![meta_expected]);
2053+
2054+
let expected = StoreEvent::new(vec![make_entity_change(
2055+
"user",
2056+
"4",
2057+
EntityChangeOperation::Set,
2058+
)]);
20342059

20352060
check_events(subscription, vec![expected])
20362061
})
@@ -2040,14 +2065,14 @@ fn throttle_subscription_delivers() {
20402065
fn throttle_subscription_throttles() {
20412066
run_test(
20422067
|store| -> Box<Future<Item = (), Error = graph::tokio_timer::timeout::Error<()>> + Send> {
2043-
let subscription = subscribe_and_consume(store.clone(), &TEST_SUBGRAPH_ID, "user");
2044-
// Throttle subscriptions for a very long time
2045-
let subscription = subscription.throttle_while_syncing(
2046-
&*LOGGER,
2047-
store.clone(),
2048-
TEST_SUBGRAPH_ID.clone(),
2049-
Duration::from_secs(30),
2050-
);
2068+
// Throttle for a very long time (30s)
2069+
let subscription = subscribe_and_consume(store.clone(), &TEST_SUBGRAPH_ID, "user")
2070+
.throttle_while_syncing(
2071+
&*LOGGER,
2072+
store.clone(),
2073+
TEST_SUBGRAPH_ID.clone(),
2074+
Duration::from_secs(30),
2075+
);
20512076

20522077
let user4 = create_test_entity(
20532078
"4",

0 commit comments

Comments
 (0)