Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/subgraph/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct IndexingInputs<C: Blockchain> {
pub start_blocks: Vec<BlockNumber>,
pub end_blocks: BTreeSet<BlockNumber>,
pub stop_block: Option<BlockNumber>,
pub max_end_block: Option<BlockNumber>,
pub store: Arc<dyn WritableStore>,
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
pub triggers_adapter: Arc<dyn TriggersAdapter<C>>,
Expand All @@ -40,6 +41,7 @@ impl<C: Blockchain> IndexingInputs<C> {
start_blocks,
end_blocks,
stop_block,
max_end_block,
store: _,
debug_fork,
triggers_adapter,
Expand All @@ -57,6 +59,7 @@ impl<C: Blockchain> IndexingInputs<C> {
start_blocks: start_blocks.clone(),
end_blocks: end_blocks.clone(),
stop_block: stop_block.clone(),
max_end_block: max_end_block.clone(),
store,
debug_fork: debug_fork.clone(),
triggers_adapter: triggers_adapter.clone(),
Expand Down
14 changes: 14 additions & 0 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,19 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
})
.collect();

// We can set `max_end_block` to the maximum of `end_blocks` and stop the subgraph
// at that point only when there are no dynamic data sources present and offchain
// data sources exist. This is because:
// - Dynamic data sources do not have a defined `end_block`, so we can't determine
// when to stop processing them.
// - Offchain data sources might still require processing beyond the end block of
// onchain data sources, necessitating the continuation of the subgraph.
let max_end_block: Option<BlockNumber> = if data_sources.len() == end_blocks.len() {
end_blocks.iter().max().cloned()
} else {
None
};

let templates = Arc::new(manifest.templates.clone());

// Obtain the debug fork from the subgraph store
Expand Down Expand Up @@ -419,6 +432,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
start_blocks,
end_blocks,
stop_block,
max_end_block,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a stop_block, why not set it instead of a separate max_end_block?

store,
debug_fork,
triggers_adapter,
Expand Down
29 changes: 26 additions & 3 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ where
.unfail_deterministic_error(&current_ptr, &parent_ptr)
.await?;
}

// Stop subgraph when we reach maximum endblock.
if let Some(max_end_block) = self.inputs.max_end_block {
if max_end_block <= current_ptr.block_number() {
info!(self.logger, "Stopping subgraph as we reached maximum endBlock";
"max_end_block" => max_end_block,
"current_block" => current_ptr.block_number());
self.inputs.store.flush().await?;
return Ok(self);
}
}
}

loop {
Expand Down Expand Up @@ -837,9 +848,21 @@ where
}
}

if let Some(stop_block) = &self.inputs.stop_block {
if block_ptr.number >= *stop_block {
info!(self.logger, "stop block reached for subgraph");
if let Some(stop_block) = self.inputs.stop_block {
if block_ptr.number >= stop_block {
info!(self.logger, "Stop block reached for subgraph");
return Ok(Action::Stop);
}
}

if let Some(max_end_block) = self.inputs.max_end_block {
if block_ptr.number >= max_end_block {
info!(
self.logger,
"Stopping subgraph as maximum endBlock reached";
"max_end_block" => max_end_block,
"current_block" => block_ptr.number
);
return Ok(Action::Stop);
}
}
Expand Down
10 changes: 10 additions & 0 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,16 @@ pub enum BlockStreamEvent<C: Blockchain> {
ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor),
}

impl<C: Blockchain> BlockStreamEvent<C> {
pub fn block_ptr(&self) -> BlockPtr {
match self {
BlockStreamEvent::Revert(ptr, _) => ptr.clone(),
BlockStreamEvent::ProcessBlock(block, _) => block.ptr(),
BlockStreamEvent::ProcessWasmBlock(ptr, _, _, _, _) => ptr.clone(),
}
}
}

impl<C: Blockchain> Clone for BlockStreamEvent<C>
where
C::TriggerData: Clone,
Expand Down
20 changes: 20 additions & 0 deletions tests/runner-tests/end-block/subgraph.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,24 @@ dataSources:
eventHandlers:
- event: TestEvent(string)
handler: handleTestEvent
file: ./src/mapping.ts
# Datasource without endBlock to keep the subgraph running
- kind: ethereum/contract
name: Contract2
network: test
source:
address: "0x0000000000000000000000000000000000000001"
abi: Contract
mapping:
kind: ethereum/events
apiVersion: 0.0.7
language: wasm/assemblyscript
entities:
- Gravatar
abis:
- name: Contract
file: ./abis/Contract.abi
eventHandlers:
- event: TestEvent(string)
handler: handleTestEvent
file: ./src/mapping.ts
Loading