Skip to content

Commit e6cf737

Browse files
core: stop scanning reaching max endBlock
1 parent bc85aa8 commit e6cf737

File tree

4 files changed

+83
-38
lines changed

4 files changed

+83
-38
lines changed

core/src/subgraph/inputs.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct IndexingInputs<C: Blockchain> {
1717
pub start_blocks: Vec<BlockNumber>,
1818
pub end_blocks: BTreeSet<BlockNumber>,
1919
pub stop_block: Option<BlockNumber>,
20+
pub max_end_block: Option<BlockNumber>,
2021
pub store: Arc<dyn WritableStore>,
2122
pub debug_fork: Option<Arc<dyn SubgraphFork>>,
2223
pub triggers_adapter: Arc<dyn TriggersAdapter<C>>,
@@ -40,6 +41,7 @@ impl<C: Blockchain> IndexingInputs<C> {
4041
start_blocks,
4142
end_blocks,
4243
stop_block,
44+
max_end_block,
4345
store: _,
4446
debug_fork,
4547
triggers_adapter,
@@ -57,6 +59,7 @@ impl<C: Blockchain> IndexingInputs<C> {
5759
start_blocks: start_blocks.clone(),
5860
end_blocks: end_blocks.clone(),
5961
stop_block: stop_block.clone(),
62+
max_end_block: max_end_block.clone(),
6063
store,
6164
debug_fork: debug_fork.clone(),
6265
triggers_adapter: triggers_adapter.clone(),

core/src/subgraph/instance_manager.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,13 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
331331
})
332332
.collect();
333333

334+
let max_end_block: Option<BlockNumber> = if manifest.data_sources.len() == end_blocks.len()
335+
{
336+
end_blocks.iter().max().cloned()
337+
} else {
338+
None
339+
};
340+
334341
let templates = Arc::new(manifest.templates.clone());
335342

336343
// Obtain the debug fork from the subgraph store
@@ -419,6 +426,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
419426
start_blocks,
420427
end_blocks,
421428
stop_block,
429+
max_end_block,
422430
store,
423431
debug_fork,
424432
triggers_adapter,

core/src/subgraph/runner.rs

Lines changed: 62 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,17 @@ where
197197
.unfail_deterministic_error(&current_ptr, &parent_ptr)
198198
.await?;
199199
}
200+
201+
// Stop subgraph when we reach maximum endblock.
202+
if let Some(max_end_block) = self.inputs.max_end_block {
203+
if max_end_block < current_ptr.block_number() {
204+
info!(self.logger, "Stopping subgraph as we reached maximum endBlock";
205+
"max_end_block" => max_end_block,
206+
"current_block" => current_ptr.block_number());
207+
self.inputs.store.flush().await?;
208+
return Ok(self);
209+
}
210+
}
200211
}
201212

202213
loop {
@@ -987,44 +998,57 @@ where
987998
cancel_handle: &CancelHandle,
988999
) -> Result<Action, Error> {
9891000
let action = match event {
990-
Some(Ok(BlockStreamEvent::ProcessWasmBlock(
991-
block_ptr,
992-
block_time,
993-
data,
994-
handler,
995-
cursor,
996-
))) => {
997-
let _section = self
998-
.metrics
999-
.stream
1000-
.stopwatch
1001-
.start_section(PROCESS_WASM_BLOCK_SECTION_NAME);
1002-
self.handle_process_wasm_block(
1003-
block_ptr,
1004-
block_time,
1005-
data,
1006-
handler,
1007-
cursor,
1008-
cancel_handle,
1009-
)
1010-
.await?
1011-
}
1012-
Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => {
1013-
let _section = self
1014-
.metrics
1015-
.stream
1016-
.stopwatch
1017-
.start_section(PROCESS_BLOCK_SECTION_NAME);
1018-
self.handle_process_block(block, cursor, cancel_handle)
1019-
.await?
1020-
}
1021-
Some(Ok(BlockStreamEvent::Revert(revert_to_ptr, cursor))) => {
1022-
let _section = self
1023-
.metrics
1024-
.stream
1025-
.stopwatch
1026-
.start_section(HANDLE_REVERT_SECTION_NAME);
1027-
self.handle_revert(revert_to_ptr, cursor).await?
1001+
Some(Ok(event)) => {
1002+
if self.inputs.max_end_block.map_or(false, |max_end_block| {
1003+
event.block_ptr().block_number() > max_end_block
1004+
}) {
1005+
info!(self.logger, "Stopping subgraph as we reached maximum endBlock";
1006+
"max_end_block" => self.inputs.max_end_block,
1007+
"current_block" => event.block_ptr().block_number());
1008+
return Ok(Action::Stop);
1009+
}
1010+
1011+
match event {
1012+
BlockStreamEvent::ProcessWasmBlock(
1013+
block_ptr,
1014+
block_time,
1015+
data,
1016+
handler,
1017+
cursor,
1018+
) => {
1019+
let _section = self
1020+
.metrics
1021+
.stream
1022+
.stopwatch
1023+
.start_section(PROCESS_WASM_BLOCK_SECTION_NAME);
1024+
self.handle_process_wasm_block(
1025+
block_ptr,
1026+
block_time,
1027+
data,
1028+
handler,
1029+
cursor,
1030+
cancel_handle,
1031+
)
1032+
.await?
1033+
}
1034+
BlockStreamEvent::ProcessBlock(block, cursor) => {
1035+
let _section = self
1036+
.metrics
1037+
.stream
1038+
.stopwatch
1039+
.start_section(PROCESS_BLOCK_SECTION_NAME);
1040+
self.handle_process_block(block, cursor, cancel_handle)
1041+
.await?
1042+
}
1043+
BlockStreamEvent::Revert(revert_to_ptr, cursor) => {
1044+
let _section = self
1045+
.metrics
1046+
.stream
1047+
.stopwatch
1048+
.start_section(HANDLE_REVERT_SECTION_NAME);
1049+
self.handle_revert(revert_to_ptr, cursor).await?
1050+
}
1051+
}
10281052
}
10291053
// Log and drop the errors from the block_stream
10301054
// The block stream will continue attempting to produce blocks

graph/src/blockchain/block_stream.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,16 @@ pub enum BlockStreamEvent<C: Blockchain> {
541541
ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor),
542542
}
543543

544+
impl<C: Blockchain> BlockStreamEvent<C> {
545+
pub fn block_ptr(&self) -> BlockPtr {
546+
match self {
547+
BlockStreamEvent::Revert(ptr, _) => ptr.clone(),
548+
BlockStreamEvent::ProcessBlock(block, _) => block.ptr(),
549+
BlockStreamEvent::ProcessWasmBlock(ptr, _, _, _, _) => ptr.clone(),
550+
}
551+
}
552+
}
553+
544554
impl<C: Blockchain> Clone for BlockStreamEvent<C>
545555
where
546556
C::TriggerData: Clone,

0 commit comments

Comments
 (0)