Skip to content

Commit 3568cb3

Browse files
committed
all: Remove 'wasm block' processing
This was only used for substreams
1 parent 500a8ae commit 3568cb3

File tree

8 files changed

+8
-342
lines changed

8 files changed

+8
-342
lines changed

core/src/subgraph/context/instance/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,4 @@ where
255255
pub fn hosts_len(&self) -> usize {
256256
self.onchain_hosts.len() + self.offchain_hosts.len()
257257
}
258-
259-
pub fn first_host(&self) -> Option<&Arc<T::Host>> {
260-
self.onchain_hosts.hosts().first()
261-
}
262258
}

core/src/subgraph/context/mod.rs

Lines changed: 5 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,8 @@ use crate::polling_monitor::{
66
use anyhow::{self, Error};
77
use bytes::Bytes;
88
use graph::{
9-
blockchain::{BlockTime, Blockchain, TriggerFilterWrapper},
10-
components::{
11-
store::{DeploymentId, SubgraphFork},
12-
subgraph::{HostMetrics, MappingError, RuntimeHost as _, SharedProofOfIndexing},
13-
},
9+
blockchain::{Blockchain, TriggerFilterWrapper},
10+
components::{store::DeploymentId, subgraph::HostMetrics},
1411
data::subgraph::SubgraphManifest,
1512
data_source::{
1613
causality_region::CausalityRegionSeq,
@@ -20,14 +17,13 @@ use graph::{
2017
derive::CheapClone,
2118
ipfs::IpfsContext,
2219
prelude::{
23-
BlockNumber, BlockPtr, BlockState, CancelGuard, CheapClone, DeploymentHash,
24-
MetricsRegistry, RuntimeHostBuilder, SubgraphCountMetric, SubgraphInstanceMetrics,
25-
TriggerProcessor,
20+
BlockNumber, CancelGuard, CheapClone, DeploymentHash, MetricsRegistry, RuntimeHostBuilder,
21+
SubgraphCountMetric, TriggerProcessor,
2622
},
2723
slog::Logger,
2824
};
25+
use std::collections::HashMap;
2926
use std::sync::{Arc, RwLock};
30-
use std::{collections::HashMap, time::Instant};
3127
use tokio::sync::mpsc;
3228

3329
use self::instance::SubgraphInstance;
@@ -109,59 +105,6 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
109105
}
110106
}
111107

112-
pub async fn process_block(
113-
&self,
114-
logger: &Logger,
115-
block_ptr: BlockPtr,
116-
block_time: BlockTime,
117-
block_data: Box<[u8]>,
118-
handler: String,
119-
mut state: BlockState,
120-
proof_of_indexing: &SharedProofOfIndexing,
121-
causality_region: &str,
122-
debug_fork: &Option<Arc<dyn SubgraphFork>>,
123-
subgraph_metrics: &Arc<SubgraphInstanceMetrics>,
124-
instrument: bool,
125-
) -> Result<BlockState, MappingError> {
126-
let error_count = state.deterministic_errors.len();
127-
128-
proof_of_indexing.start_handler(causality_region);
129-
130-
let start = Instant::now();
131-
132-
// This flow is expected to have a single data source(and a corresponding host) which
133-
// gets executed every block.
134-
state = self
135-
.instance
136-
.first_host()
137-
.expect("Expected this flow to have exactly one host")
138-
.process_block(
139-
logger,
140-
block_ptr,
141-
block_time,
142-
block_data,
143-
handler,
144-
state,
145-
proof_of_indexing.cheap_clone(),
146-
debug_fork,
147-
instrument,
148-
)
149-
.await?;
150-
151-
let elapsed = start.elapsed().as_secs_f64();
152-
subgraph_metrics.observe_trigger_processing_duration(elapsed);
153-
154-
if state.deterministic_errors.len() != error_count {
155-
assert!(state.deterministic_errors.len() == error_count + 1);
156-
157-
// If a deterministic error has happened, write a new
158-
// ProofOfIndexingEvent::DeterministicError to the SharedProofOfIndexing.
159-
proof_of_indexing.write_deterministic_error(logger, causality_region);
160-
}
161-
162-
Ok(state)
163-
}
164-
165108
/// Removes data sources hosts with a creation block greater or equal to `reverted_block`, so
166109
/// that they are no longer candidates for `process_trigger`.
167110
///

core/src/subgraph/runner.rs

Lines changed: 0 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ const MINUTE: Duration = Duration::from_secs(60);
4747
const SKIP_PTR_UPDATES_THRESHOLD: Duration = Duration::from_secs(60 * 5);
4848
const HANDLE_REVERT_SECTION_NAME: &str = "handle_revert";
4949
const PROCESS_BLOCK_SECTION_NAME: &str = "process_block";
50-
const PROCESS_WASM_BLOCK_SECTION_NAME: &str = "process_wasm_block";
5150
const PROCESS_TRIGGERS_SECTION_NAME: &str = "process_triggers";
5251
const HANDLE_CREATED_DS_SECTION_NAME: &str = "handle_new_data_sources";
5352

@@ -869,37 +868,6 @@ where
869868
Ok(Arc::new(block))
870869
}
871870

872-
async fn process_wasm_block(
873-
&mut self,
874-
proof_of_indexing: &SharedProofOfIndexing,
875-
block_ptr: BlockPtr,
876-
block_time: BlockTime,
877-
block_data: Box<[u8]>,
878-
handler: String,
879-
causality_region: &str,
880-
) -> Result<BlockState, MappingError> {
881-
let block_state = BlockState::new(
882-
self.inputs.store.clone(),
883-
std::mem::take(&mut self.state.entity_lfu_cache),
884-
);
885-
886-
self.ctx
887-
.process_block(
888-
&self.logger,
889-
block_ptr,
890-
block_time,
891-
block_data,
892-
handler,
893-
block_state,
894-
proof_of_indexing,
895-
causality_region,
896-
&self.inputs.debug_fork,
897-
&self.metrics.subgraph,
898-
self.inputs.instrument,
899-
)
900-
.await
901-
}
902-
903871
fn create_dynamic_data_sources(
904872
&mut self,
905873
created_data_sources: Vec<InstanceDSTemplateInfo>,
@@ -1171,20 +1139,6 @@ where
11711139
) -> Result<Action, Error> {
11721140
let stopwatch = &self.metrics.stream.stopwatch;
11731141
let action = match event {
1174-
Some(Ok(BlockStreamEvent::ProcessWasmBlock(
1175-
block_ptr,
1176-
block_time,
1177-
data,
1178-
handler,
1179-
cursor,
1180-
))) => {
1181-
let _section = stopwatch.start_section(PROCESS_WASM_BLOCK_SECTION_NAME);
1182-
let res = self
1183-
.handle_process_wasm_block(block_ptr.clone(), block_time, data, handler, cursor)
1184-
.await;
1185-
let start = Instant::now();
1186-
self.handle_action(start, block_ptr, res).await?
1187-
}
11881142
Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => {
11891143
let _section = stopwatch.start_section(PROCESS_BLOCK_SECTION_NAME);
11901144
self.handle_process_block(block, cursor).await?
@@ -1335,82 +1289,6 @@ where
13351289
C: Blockchain,
13361290
T: RuntimeHostBuilder<C>,
13371291
{
1338-
async fn handle_process_wasm_block(
1339-
&mut self,
1340-
block_ptr: BlockPtr,
1341-
block_time: BlockTime,
1342-
block_data: Box<[u8]>,
1343-
handler: String,
1344-
cursor: FirehoseCursor,
1345-
) -> Result<Action, ProcessingError> {
1346-
let logger = self.logger.new(o!(
1347-
"block_number" => format!("{:?}", block_ptr.number),
1348-
"block_hash" => format!("{}", block_ptr.hash)
1349-
));
1350-
1351-
debug!(logger, "Start processing wasm block";);
1352-
1353-
self.metrics
1354-
.stream
1355-
.deployment_head
1356-
.set(block_ptr.number as f64);
1357-
1358-
let proof_of_indexing =
1359-
SharedProofOfIndexing::new(block_ptr.number, self.inputs.poi_version);
1360-
1361-
// Causality region for onchain triggers.
1362-
let causality_region = PoICausalityRegion::from_network(&self.inputs.network);
1363-
1364-
let block_state = {
1365-
match self
1366-
.process_wasm_block(
1367-
&proof_of_indexing,
1368-
block_ptr.clone(),
1369-
block_time,
1370-
block_data,
1371-
handler,
1372-
&causality_region,
1373-
)
1374-
.await
1375-
{
1376-
// Triggers processed with no errors or with only deterministic errors.
1377-
Ok(block_state) => block_state,
1378-
1379-
// Some form of unknown or non-deterministic error ocurred.
1380-
Err(MappingError::Unknown(e)) => return Err(ProcessingError::Unknown(e).into()),
1381-
Err(MappingError::PossibleReorg(e)) => {
1382-
info!(logger,
1383-
"Possible reorg detected, retrying";
1384-
"error" => format!("{:#}", e),
1385-
);
1386-
1387-
// In case of a possible reorg, we want this function to do nothing and restart the
1388-
// block stream so it has a chance to detect the reorg.
1389-
//
1390-
// The state is unchanged at this point, except for having cleared the entity cache.
1391-
// Losing the cache is a bit annoying but not an issue for correctness.
1392-
//
1393-
// See also b21fa73b-6453-4340-99fb-1a78ec62efb1.
1394-
return Ok(Action::Restart);
1395-
}
1396-
}
1397-
};
1398-
1399-
self.transact_block_state(
1400-
&logger,
1401-
block_ptr.clone(),
1402-
cursor.clone(),
1403-
block_time,
1404-
block_state,
1405-
proof_of_indexing,
1406-
vec![],
1407-
vec![],
1408-
)
1409-
.await?;
1410-
1411-
Ok(Action::Continue)
1412-
}
1413-
14141292
async fn handle_process_block(
14151293
&mut self,
14161294
block: BlockWithTriggers<C>,

graph/src/blockchain/block_stream.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::sync::Arc;
1010
use thiserror::Error;
1111
use tokio::sync::mpsc::{self, Receiver, Sender};
1212

13-
use super::{Block, BlockPtr, BlockTime, Blockchain, Trigger, TriggerFilterWrapper};
13+
use super::{Block, BlockPtr, Blockchain, Trigger, TriggerFilterWrapper};
1414
use crate::anyhow::Result;
1515
use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
1616
use crate::data::subgraph::UnifiedMappingApiVersion;
@@ -721,7 +721,6 @@ pub enum BlockStreamEvent<C: Blockchain> {
721721
Revert(BlockPtr, FirehoseCursor),
722722

723723
ProcessBlock(BlockWithTriggers<C>, FirehoseCursor),
724-
ProcessWasmBlock(BlockPtr, BlockTime, Box<[u8]>, String, FirehoseCursor),
725724
}
726725

727726
#[derive(Clone)]

graph/src/components/subgraph/host.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use anyhow::Error;
66
use async_trait::async_trait;
77
use futures01::sync::mpsc;
88

9-
use crate::blockchain::BlockTime;
109
use crate::components::metrics::gas::GasMetrics;
1110
use crate::components::store::SubgraphFork;
1211
use crate::data_source::{
@@ -71,19 +70,6 @@ pub trait RuntimeHost<C: Blockchain>: Send + Sync + 'static {
7170
logger: &Logger,
7271
) -> Result<Option<TriggerWithHandler<MappingTrigger<C>>>, Error>;
7372

74-
async fn process_block(
75-
&self,
76-
logger: &Logger,
77-
block_ptr: BlockPtr,
78-
block_time: BlockTime,
79-
block_data: Box<[u8]>,
80-
handler: String,
81-
state: BlockState,
82-
proof_of_indexing: SharedProofOfIndexing,
83-
debug_fork: &Option<Arc<dyn SubgraphFork>>,
84-
instrument: bool,
85-
) -> Result<BlockState, MappingError>;
86-
8773
async fn process_mapping_trigger(
8874
&self,
8975
logger: &Logger,

0 commit comments

Comments
 (0)