Skip to content

Commit c066e3d

Browse files
graph: Stop subgraphs passing max endBlock
1 parent 9458fc5 commit c066e3d

File tree

4 files changed

+53
-3
lines changed

4 files changed

+53
-3
lines changed

core/src/subgraph/inputs.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct IndexingInputs<C: Blockchain> {
1717
pub start_blocks: Vec<BlockNumber>,
1818
pub end_blocks: BTreeSet<BlockNumber>,
1919
pub stop_block: Option<BlockNumber>,
20+
pub max_end_block: Option<BlockNumber>,
2021
pub store: Arc<dyn WritableStore>,
2122
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
2223
pub triggers_adapter: Arc<dyn TriggersAdapter<C>>,
@@ -40,6 +41,7 @@ impl<C: Blockchain> IndexingInputs<C> {
4041
start_blocks,
4142
end_blocks,
4243
stop_block,
44+
max_end_block,
4345
store: _,
4446
debug_fork,
4547
triggers_adapter,
@@ -57,6 +59,7 @@ impl<C: Blockchain> IndexingInputs<C> {
5759
start_blocks: start_blocks.clone(),
5860
end_blocks: end_blocks.clone(),
5961
stop_block: stop_block.clone(),
62+
max_end_block: max_end_block.clone(),
6063
store,
6164
debug_fork: debug_fork.clone(),
6265
triggers_adapter: triggers_adapter.clone(),

core/src/subgraph/instance_manager.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,19 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
331331
})
332332
.collect();
333333

334+
// We can set `max_end_block` to the maximum of `end_blocks` and stop the subgraph
335+
// at that point only when there are no dynamic data sources present and offchain
336+
// data sources exist. This is because:
337+
// - Dynamic data sources do not have a defined `end_block`, so we can't determine
338+
// when to stop processing them.
339+
// - Offchain data sources might still require processing beyond the end block of
340+
// onchain data sources, necessitating the continuation of the subgraph.
341+
let max_end_block: Option<BlockNumber> = if data_sources.len() == end_blocks.len() {
342+
end_blocks.iter().max().cloned()
343+
} else {
344+
None
345+
};
346+
334347
let templates = Arc::new(manifest.templates.clone());
335348

336349
// Obtain the debug fork from the subgraph store
@@ -419,6 +432,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
419432
start_blocks,
420433
end_blocks,
421434
stop_block,
435+
max_end_block,
422436
store,
423437
debug_fork,
424438
triggers_adapter,

core/src/subgraph/runner.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,17 @@ where
197197
.unfail_deterministic_error(&current_ptr, &parent_ptr)
198198
.await?;
199199
}
200+
201+
// Stop subgraph when we reach maximum endblock.
202+
if let Some(max_end_block) = self.inputs.max_end_block {
203+
if max_end_block <= current_ptr.block_number() {
204+
info!(self.logger, "Stopping subgraph as we reached maximum endBlock";
205+
"max_end_block" => max_end_block,
206+
"current_block" => current_ptr.block_number());
207+
self.inputs.store.flush().await?;
208+
return Ok(self);
209+
}
210+
}
200211
}
201212

202213
loop {
@@ -837,9 +848,21 @@ where
837848
}
838849
}
839850

840-
if let Some(stop_block) = &self.inputs.stop_block {
841-
if block_ptr.number >= *stop_block {
842-
info!(self.logger, "stop block reached for subgraph");
851+
if let Some(stop_block) = self.inputs.stop_block {
852+
if block_ptr.number >= stop_block {
853+
info!(self.logger, "Stop block reached for subgraph");
854+
return Ok(Action::Stop);
855+
}
856+
}
857+
858+
if let Some(max_end_block) = self.inputs.max_end_block {
859+
if block_ptr.number >= max_end_block {
860+
info!(
861+
self.logger,
862+
"Stopping subgraph as maximum endBlock reached";
863+
"max_end_block" => max_end_block,
864+
"current_block" => block_ptr.number
865+
);
843866
return Ok(Action::Stop);
844867
}
845868
}

graph/src/blockchain/block_stream.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,16 @@ pub enum BlockStreamEvent<C: Blockchain> {
541541
ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor),
542542
}
543543

544+
impl<C: Blockchain> BlockStreamEvent<C> {
545+
pub fn block_ptr(&self) -> BlockPtr {
546+
match self {
547+
BlockStreamEvent::Revert(ptr, _) => ptr.clone(),
548+
BlockStreamEvent::ProcessBlock(block, _) => block.ptr(),
549+
BlockStreamEvent::ProcessWasmBlock(ptr, _, _, _, _) => ptr.clone(),
550+
}
551+
}
552+
}
553+
544554
impl<C: Blockchain> Clone for BlockStreamEvent<C>
545555
where
546556
C::TriggerData: Clone,

0 commit comments

Comments
 (0)