Skip to content

Commit 86a8367

Browse files
authored
PollingBlockStream: Move sync status logic to instance manager (#3108)
* polling_block_stream: Move sync status logic to instance manager * instance_manager: Add test for is_deployment_synced * polling_block_stream: Remove last subgraph store dependency This moves the time to sync metrics from the PollingBlockStream to the instance_manager. * polling_block_stream: Remove subgraph_store field/parameter * instance_manager: Make sync status compare sub.block >= (chain.block - 1) * store: Add chain head cache to ChainStore * blockchain: Remove unused writable store dependency
1 parent 657f059 commit 86a8367

File tree

8 files changed

+69
-34
lines changed

8 files changed

+69
-34
lines changed

chain/ethereum/src/chain.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,6 @@ impl Blockchain for Chain {
232232
async fn new_polling_block_stream(
233233
&self,
234234
deployment: DeploymentLocator,
235-
writable: Arc<dyn WritableStore>,
236235
start_blocks: Vec<BlockNumber>,
237236
subgraph_start_block: Option<BlockPtr>,
238237
filter: Arc<Self::TriggerFilter>,
@@ -272,7 +271,6 @@ impl Blockchain for Chain {
272271
};
273272

274273
Ok(Box::new(PollingBlockStream::new(
275-
writable,
276274
chain_store,
277275
chain_head_update_stream,
278276
adapter,

chain/near/src/chain.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ impl Blockchain for Chain {
143143
async fn new_polling_block_stream(
144144
&self,
145145
_deployment: DeploymentLocator,
146-
_writable: Arc<dyn WritableStore>,
147146
_start_blocks: Vec<BlockNumber>,
148147
_subgraph_start_block: Option<BlockPtr>,
149148
_filter: Arc<Self::TriggerFilter>,

core/src/subgraph/instance_manager.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use std::time::{Duration, Instant};
3030
use tokio::task;
3131

3232
const MINUTE: Duration = Duration::from_secs(60);
33+
3334
const BUFFERED_BLOCK_STREAM_SIZE: usize = 100;
3435
const BUFFERED_FIREHOSE_STREAM_SIZE: usize = 1;
3536

@@ -481,7 +482,6 @@ async fn new_block_stream<C: Blockchain>(
481482

482483
chain.new_polling_block_stream(
483484
inputs.deployment.clone(),
484-
inputs.store.clone(),
485485
inputs.start_blocks.clone(),
486486
start_block,
487487
Arc::new(filter.clone()),
@@ -514,6 +514,7 @@ where
514514
let id_for_err = inputs.deployment.hash.clone();
515515
let mut should_try_unfail_deterministic = true;
516516
let mut should_try_unfail_non_deterministic = true;
517+
let mut synced = false;
517518

518519
// Exponential backoff that starts with two minutes and keeps
519520
// increasing its timeout exponentially until it reaches the ceiling.
@@ -532,6 +533,8 @@ where
532533
.await?
533534
.map_err(CancelableError::Error)
534535
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
536+
let chain = inputs.chain.clone();
537+
let chain_store = chain.chain_store();
535538

536539
// Keep the stream's cancel guard around to be able to shut it down
537540
// when the subgraph deployment is unassigned
@@ -706,6 +709,20 @@ where
706709

707710
match res {
708711
Ok(needs_restart) => {
712+
// Once synced, no need to try to update the status again.
713+
if !synced && is_deployment_synced(&block_ptr, chain_store.cached_head_ptr()?) {
714+
// Updating the sync status is an one way operation.
715+
// This state change exists: not synced -> synced
716+
// This state change does NOT: synced -> not synced
717+
inputs.store.deployment_synced()?;
718+
719+
// Stop trying to update the sync status.
720+
synced = true;
721+
722+
// Stop recording time-to-sync metrics.
723+
ctx.block_stream_metrics.stopwatch.disable();
724+
}
725+
709726
// Keep trying to unfail subgraph for everytime it advances block(s) until it's
710727
// health is not Failed anymore.
711728
if should_try_unfail_non_deterministic {
@@ -1293,3 +1310,35 @@ fn persist_dynamic_data_sources<T: RuntimeHostBuilder<C>, C: Blockchain>(
12931310
// Merge filters from data sources into the block stream builder
12941311
ctx.state.filter.extend(data_sources.iter());
12951312
}
1313+
1314+
/// Checks if the Deployment BlockPtr is at least one block behind to the chain head.
1315+
fn is_deployment_synced(deployment_head_ptr: &BlockPtr, chain_head_ptr: Option<BlockPtr>) -> bool {
1316+
matches!((deployment_head_ptr, &chain_head_ptr), (b1, Some(b2)) if b1.number >= (b2.number - 1))
1317+
}
1318+
1319+
#[test]
1320+
fn test_is_deployment_synced() {
1321+
let block_0 = BlockPtr::try_from((
1322+
"bd34884280958002c51d3f7b5f853e6febeba33de0f40d15b0363006533c924f",
1323+
0,
1324+
))
1325+
.unwrap();
1326+
let block_1 = BlockPtr::try_from((
1327+
"8511fa04b64657581e3f00e14543c1d522d5d7e771b54aa3060b662ade47da13",
1328+
1,
1329+
))
1330+
.unwrap();
1331+
let block_2 = BlockPtr::try_from((
1332+
"b98fb783b49de5652097a989414c767824dff7e7fd765a63b493772511db81c1",
1333+
2,
1334+
))
1335+
.unwrap();
1336+
1337+
assert!(!is_deployment_synced(&block_0, None));
1338+
assert!(!is_deployment_synced(&block_2, None));
1339+
1340+
assert!(!is_deployment_synced(&block_0, Some(block_2.clone())));
1341+
1342+
assert!(is_deployment_synced(&block_1, Some(block_2.clone())));
1343+
assert!(is_deployment_synced(&block_2, Some(block_2.clone())));
1344+
}

graph/src/blockchain/mock.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ impl Blockchain for MockBlockchain {
308308
async fn new_polling_block_stream(
309309
&self,
310310
_deployment: crate::components::store::DeploymentLocator,
311-
_writable: Arc<dyn crate::components::store::WritableStore>,
312311
_start_blocks: Vec<crate::components::store::BlockNumber>,
313312
_subgraph_start_block: Option<BlockPtr>,
314313
_filter: std::sync::Arc<Self::TriggerFilter>,

graph/src/blockchain/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
121121
async fn new_polling_block_stream(
122122
&self,
123123
deployment: DeploymentLocator,
124-
writable: Arc<dyn WritableStore>,
125124
start_blocks: Vec<BlockNumber>,
126125
subgraph_start_block: Option<BlockPtr>,
127126
filter: Arc<Self::TriggerFilter>,

graph/src/blockchain/polling_block_stream.rs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use super::block_stream::{
1414
use super::{Block, BlockPtr, Blockchain};
1515

1616
use crate::components::store::BlockNumber;
17-
use crate::components::store::WritableStore;
1817
use crate::data::subgraph::UnifiedMappingApiVersion;
1918
use crate::prelude::*;
2019
#[cfg(debug_assertions)]
@@ -82,7 +81,6 @@ struct PollingBlockStreamContext<C>
8281
where
8382
C: Blockchain,
8483
{
85-
subgraph_store: Arc<dyn WritableStore>,
8684
chain_store: Arc<dyn ChainStore>,
8785
adapter: Arc<C::TriggersAdapter>,
8886
node_id: NodeId,
@@ -107,7 +105,6 @@ where
107105
impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
108106
fn clone(&self) -> Self {
109107
Self {
110-
subgraph_store: self.subgraph_store.cheap_clone(),
111108
chain_store: self.chain_store.cheap_clone(),
112109
adapter: self.adapter.clone(),
113110
node_id: self.node_id.clone(),
@@ -153,7 +150,6 @@ where
153150
C: Blockchain,
154151
{
155152
pub fn new(
156-
subgraph_store: Arc<dyn WritableStore>,
157153
chain_store: Arc<dyn ChainStore>,
158154
chain_head_update_stream: ChainHeadUpdateStream,
159155
adapter: Arc<C::TriggersAdapter>,
@@ -175,7 +171,6 @@ where
175171
chain_head_update_stream,
176172
ctx: PollingBlockStreamContext {
177173
current_block: start_block,
178-
subgraph_store,
179174
chain_store,
180175
adapter,
181176
node_id,
@@ -215,9 +210,6 @@ where
215210
continue;
216211
}
217212
ReconciliationStep::Done => {
218-
// Reconciliation is complete, so try to mark subgraph as Synced
219-
ctx.update_subgraph_synced_status()?;
220-
221213
return Ok(NextBlocks::Done);
222214
}
223215
ReconciliationStep::Revert(from, to) => return Ok(NextBlocks::Revert(from, to)),
@@ -484,25 +476,6 @@ where
484476

485477
Ok(ptr)
486478
}
487-
488-
/// Set subgraph deployment entity synced flag if and only if the subgraph block pointer is
489-
/// caught up to the head block pointer.
490-
fn update_subgraph_synced_status(&self) -> Result<(), StoreError> {
491-
let head_ptr_opt = self.chain_store.chain_head_ptr()?;
492-
let subgraph_ptr = self.current_block.clone();
493-
494-
if head_ptr_opt != subgraph_ptr || head_ptr_opt.is_none() || subgraph_ptr.is_none() {
495-
// Not synced yet
496-
Ok(())
497-
} else {
498-
// Synced
499-
500-
// Stop recording time-to-sync metrics.
501-
self.metrics.stopwatch.disable();
502-
503-
self.subgraph_store.deployment_synced()
504-
}
505-
}
506479
}
507480

508481
impl<C: Blockchain> BlockStream<C> for PollingBlockStream<C> {}

graph/src/components/store.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,6 +1170,9 @@ pub trait ChainStore: Send + Sync + 'static {
11701170
/// The head block pointer will be None on initial set up.
11711171
fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
11721172

1173+
/// In-memory time cached version of `chain_head_ptr`.
1174+
fn cached_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
1175+
11731176
/// Get the current head block cursor for this chain.
11741177
///
11751178
/// The head block cursor will be None on initial set up.

store/postgres/src/chain_store.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use diesel::sql_types::Text;
55
use diesel::{insert_into, update};
66
use graph::blockchain::{Block, ChainIdentifier};
77
use graph::prelude::web3::types::H256;
8+
use graph::util::timed_cache::TimedCache;
89
use graph::{
910
constraint_violation,
1011
prelude::{
@@ -19,6 +20,7 @@ use std::{
1920
convert::{TryFrom, TryInto},
2021
iter::FromIterator,
2122
sync::Arc,
23+
time::Duration,
2224
};
2325

2426
use graph::prelude::{
@@ -1176,6 +1178,7 @@ pub struct ChainStore {
11761178
genesis_block_ptr: BlockPtr,
11771179
status: ChainStatus,
11781180
chain_head_update_sender: ChainHeadUpdateSender,
1181+
block_cache: TimedCache<&'static str, BlockPtr>,
11791182
}
11801183

11811184
impl ChainStore {
@@ -1194,6 +1197,7 @@ impl ChainStore {
11941197
genesis_block_ptr: BlockPtr::new(net_identifier.genesis_block_hash.clone(), 0),
11951198
status,
11961199
chain_head_update_sender,
1200+
block_cache: TimedCache::new(Duration::from_secs(5)),
11971201
};
11981202

11991203
store
@@ -1408,11 +1412,22 @@ impl ChainStoreTrait for ChainStore {
14081412
(None, None) => None,
14091413
_ => unreachable!(),
14101414
})
1411-
.and_then(|opt| opt)
1415+
.and_then(|opt: Option<BlockPtr>| opt)
1416+
.map(|head| {
1417+
self.block_cache.set("head", Arc::new(head.clone()));
1418+
head
1419+
})
14121420
})
14131421
.map_err(Error::from)
14141422
}
14151423

1424+
fn cached_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
1425+
match self.block_cache.get("head") {
1426+
Some(head) => Ok(Some(head.as_ref().clone())),
1427+
None => self.chain_head_ptr(),
1428+
}
1429+
}
1430+
14161431
fn chain_head_cursor(&self) -> Result<Option<String>, Error> {
14171432
use public::ethereum_networks::dsl::*;
14181433

0 commit comments

Comments
 (0)