Skip to content

Commit c1cee73

Browse files
graph: Stop subgraphs passing max endBlock (#5583)
* graph: Stop subgraphs passing max endBlock * update end_block runner test * Update comment Co-authored-by: Krishnanand V P <[email protected]> --------- Co-authored-by: Krishnanand V P <[email protected]>
1 parent 9458fc5 commit c1cee73

File tree

5 files changed

+72
-3
lines changed

5 files changed

+72
-3
lines changed

core/src/subgraph/inputs.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct IndexingInputs<C: Blockchain> {
1717
pub start_blocks: Vec<BlockNumber>,
1818
pub end_blocks: BTreeSet<BlockNumber>,
1919
pub stop_block: Option<BlockNumber>,
20+
pub max_end_block: Option<BlockNumber>,
2021
pub store: Arc<dyn WritableStore>,
2122
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
2223
pub triggers_adapter: Arc<dyn TriggersAdapter<C>>,
@@ -40,6 +41,7 @@ impl<C: Blockchain> IndexingInputs<C> {
4041
start_blocks,
4142
end_blocks,
4243
stop_block,
44+
max_end_block,
4345
store: _,
4446
debug_fork,
4547
triggers_adapter,
@@ -57,6 +59,7 @@ impl<C: Blockchain> IndexingInputs<C> {
5759
start_blocks: start_blocks.clone(),
5860
end_blocks: end_blocks.clone(),
5961
stop_block: stop_block.clone(),
62+
max_end_block: max_end_block.clone(),
6063
store,
6164
debug_fork: debug_fork.clone(),
6265
triggers_adapter: triggers_adapter.clone(),

core/src/subgraph/instance_manager.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,18 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
331331
})
332332
.collect();
333333

334+
// We can set `max_end_block` to the maximum of `end_blocks` and stop the subgraph
335+
// only when there are no dynamic data sources and no offchain data sources present. This is because:
336+
// - Dynamic data sources do not have a defined `end_block`, so we can't determine
337+
// when to stop processing them.
338+
// - Offchain data sources might require processing beyond the end block of
339+
// onchain data sources, so the subgraph needs to continue.
340+
let max_end_block: Option<BlockNumber> = if data_sources.len() == end_blocks.len() {
341+
end_blocks.iter().max().cloned()
342+
} else {
343+
None
344+
};
345+
334346
let templates = Arc::new(manifest.templates.clone());
335347

336348
// Obtain the debug fork from the subgraph store
@@ -419,6 +431,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
419431
start_blocks,
420432
end_blocks,
421433
stop_block,
434+
max_end_block,
422435
store,
423436
debug_fork,
424437
triggers_adapter,

core/src/subgraph/runner.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,17 @@ where
197197
.unfail_deterministic_error(&current_ptr, &parent_ptr)
198198
.await?;
199199
}
200+
201+
// Stop subgraph when we reach maximum endblock.
202+
if let Some(max_end_block) = self.inputs.max_end_block {
203+
if max_end_block <= current_ptr.block_number() {
204+
info!(self.logger, "Stopping subgraph as we reached maximum endBlock";
205+
"max_end_block" => max_end_block,
206+
"current_block" => current_ptr.block_number());
207+
self.inputs.store.flush().await?;
208+
return Ok(self);
209+
}
210+
}
200211
}
201212

202213
loop {
@@ -837,9 +848,21 @@ where
837848
}
838849
}
839850

840-
if let Some(stop_block) = &self.inputs.stop_block {
841-
if block_ptr.number >= *stop_block {
842-
info!(self.logger, "stop block reached for subgraph");
851+
if let Some(stop_block) = self.inputs.stop_block {
852+
if block_ptr.number >= stop_block {
853+
info!(self.logger, "Stop block reached for subgraph");
854+
return Ok(Action::Stop);
855+
}
856+
}
857+
858+
if let Some(max_end_block) = self.inputs.max_end_block {
859+
if block_ptr.number >= max_end_block {
860+
info!(
861+
self.logger,
862+
"Stopping subgraph as maximum endBlock reached";
863+
"max_end_block" => max_end_block,
864+
"current_block" => block_ptr.number
865+
);
843866
return Ok(Action::Stop);
844867
}
845868
}

graph/src/blockchain/block_stream.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,16 @@ pub enum BlockStreamEvent<C: Blockchain> {
541541
ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor),
542542
}
543543

544+
impl<C: Blockchain> BlockStreamEvent<C> {
545+
pub fn block_ptr(&self) -> BlockPtr {
546+
match self {
547+
BlockStreamEvent::Revert(ptr, _) => ptr.clone(),
548+
BlockStreamEvent::ProcessBlock(block, _) => block.ptr(),
549+
BlockStreamEvent::ProcessWasmBlock(ptr, _, _, _, _) => ptr.clone(),
550+
}
551+
}
552+
}
553+
544554
impl<C: Blockchain> Clone for BlockStreamEvent<C>
545555
where
546556
C::TriggerData: Clone,

tests/runner-tests/end-block/subgraph.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,24 @@ dataSources:
2323
eventHandlers:
2424
- event: TestEvent(string)
2525
handler: handleTestEvent
26+
file: ./src/mapping.ts
27+
# Datasource without endBlock to keep the subgraph running
28+
- kind: ethereum/contract
29+
name: Contract2
30+
network: test
31+
source:
32+
address: "0x0000000000000000000000000000000000000001"
33+
abi: Contract
34+
mapping:
35+
kind: ethereum/events
36+
apiVersion: 0.0.7
37+
language: wasm/assemblyscript
38+
entities:
39+
- Gravatar
40+
abis:
41+
- name: Contract
42+
file: ./abis/Contract.abi
43+
eventHandlers:
44+
- event: TestEvent(string)
45+
handler: handleTestEvent
2646
file: ./src/mapping.ts

0 commit comments

Comments
 (0)