Skip to content

Commit 1b7e7b0

Browse files
authored
refactor stream (#3085)
1 parent 80d079b commit 1b7e7b0

File tree

6 files changed

+158
-135
lines changed

6 files changed

+158
-135
lines changed

chain/ethereum/src/chain.rs

Lines changed: 92 additions & 105 deletions
Original file line number | Diff line number | Diff line change
@@ -119,97 +119,6 @@ impl Chain {
119119
is_ingestible,
120120
}
121121
}
122-
123-
async fn new_polling_block_stream(
124-
&self,
125-
deployment: DeploymentLocator,
126-
start_blocks: Vec<BlockNumber>,
127-
adapter: Arc<TriggersAdapter>,
128-
filter: Arc<TriggerFilter>,
129-
metrics: Arc<BlockStreamMetrics>,
130-
unified_api_version: UnifiedMappingApiVersion,
131-
) -> Result<Box<dyn BlockStream<Self>>, Error> {
132-
let logger = self
133-
.logger_factory
134-
.subgraph_logger(&deployment)
135-
.new(o!("component" => "BlockStream"));
136-
let chain_store = self.chain_store().clone();
137-
let writable = self
138-
.subgraph_store
139-
.cheap_clone()
140-
.writable(logger.clone(), deployment.id)
141-
.await
142-
.with_context(|| format!("no store for deployment `{}`", deployment.hash))?;
143-
let chain_head_update_stream = self
144-
.chain_head_update_listener
145-
.subscribe(self.name.clone(), logger.clone());
146-
147-
// Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
148-
// This is ok because Celo blocks are always final. And we _need_ to do this because
149-
// some events appear only in eth_getLogs but not in transaction receipts.
150-
// See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
151-
let chain_id = self.eth_adapters.cheapest().unwrap().chain_id().await?;
152-
let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) {
153-
false => self.reorg_threshold,
154-
true => 0,
155-
};
156-
157-
let start_block = writable.block_ptr()?;
158-
159-
Ok(Box::new(PollingBlockStream::new(
160-
writable,
161-
chain_store,
162-
chain_head_update_stream,
163-
adapter,
164-
self.node_id.clone(),
165-
deployment.hash,
166-
filter,
167-
start_blocks,
168-
reorg_threshold,
169-
logger,
170-
metrics,
171-
*MAX_BLOCK_RANGE_SIZE,
172-
*TARGET_TRIGGERS_PER_BLOCK_RANGE,
173-
unified_api_version,
174-
start_block,
175-
)))
176-
}
177-
178-
async fn new_firehose_block_stream(
179-
&self,
180-
deployment: DeploymentLocator,
181-
start_blocks: Vec<BlockNumber>,
182-
adapter: Arc<TriggersAdapter>,
183-
filter: Arc<TriggerFilter>,
184-
) -> Result<Box<dyn BlockStream<Self>>, Error> {
185-
let firehose_endpoint = match self.firehose_endpoints.random() {
186-
Some(e) => e.clone(),
187-
None => return Err(anyhow::format_err!("no firehose endpoint available",)),
188-
};
189-
190-
let logger = self
191-
.logger_factory
192-
.subgraph_logger(&deployment)
193-
.new(o!("component" => "FirehoseBlockStream"));
194-
195-
let firehose_mapper = Arc::new(FirehoseMapper {});
196-
let firehose_cursor = self
197-
.subgraph_store
198-
.cheap_clone()
199-
.writable(logger.clone(), deployment.id)
200-
.await?
201-
.block_cursor()?;
202-
203-
Ok(Box::new(FirehoseBlockStream::new(
204-
firehose_endpoint,
205-
firehose_cursor,
206-
firehose_mapper,
207-
adapter,
208-
filter,
209-
start_blocks,
210-
logger,
211-
)))
212-
}
213122
}
214123

215124
#[async_trait]
@@ -279,11 +188,12 @@ impl Blockchain for Chain {
279188
Ok(Arc::new(adapter))
280189
}
281190

282-
async fn new_block_stream(
191+
async fn new_firehose_block_stream(
283192
&self,
284193
deployment: DeploymentLocator,
285194
start_blocks: Vec<BlockNumber>,
286-
filter: Arc<TriggerFilter>,
195+
firehose_cursor: Option<String>,
196+
filter: Arc<Self::TriggerFilter>,
287197
metrics: Arc<BlockStreamMetrics>,
288198
unified_api_version: UnifiedMappingApiVersion,
289199
) -> Result<Box<dyn BlockStream<Self>>, Error> {
@@ -300,20 +210,93 @@ impl Blockchain for Chain {
300210
self.name, requirements
301211
));
302212

303-
if self.firehose_endpoints.len() > 0 {
304-
self.new_firehose_block_stream(deployment, start_blocks, adapter, filter)
305-
.await
306-
} else {
307-
self.new_polling_block_stream(
308-
deployment,
309-
start_blocks,
310-
adapter,
311-
filter,
312-
metrics,
313-
unified_api_version,
213+
let firehose_endpoint = match self.firehose_endpoints.random() {
214+
Some(e) => e.clone(),
215+
None => return Err(anyhow::format_err!("no firehose endpoint available",)),
216+
};
217+
218+
let logger = self
219+
.logger_factory
220+
.subgraph_logger(&deployment)
221+
.new(o!("component" => "FirehoseBlockStream"));
222+
223+
let firehose_mapper = Arc::new(FirehoseMapper {});
224+
225+
Ok(Box::new(FirehoseBlockStream::new(
226+
firehose_endpoint,
227+
firehose_cursor,
228+
firehose_mapper,
229+
adapter,
230+
filter,
231+
start_blocks,
232+
logger,
233+
)))
234+
}
235+
236+
async fn new_polling_block_stream(
237+
&self,
238+
deployment: DeploymentLocator,
239+
start_blocks: Vec<BlockNumber>,
240+
subgraph_start_block: Option<BlockPtr>,
241+
filter: Arc<Self::TriggerFilter>,
242+
metrics: Arc<BlockStreamMetrics>,
243+
unified_api_version: UnifiedMappingApiVersion,
244+
) -> Result<Box<dyn BlockStream<Self>>, Error> {
245+
let requirements = filter.node_capabilities();
246+
let adapter = self
247+
.triggers_adapter(
248+
&deployment,
249+
&requirements,
250+
unified_api_version.clone(),
251+
metrics.stopwatch.clone(),
314252
)
253+
.expect(&format!(
254+
"no adapter for network {} with capabilities {}",
255+
self.name, requirements
256+
));
257+
258+
let logger = self
259+
.logger_factory
260+
.subgraph_logger(&deployment)
261+
.new(o!("component" => "BlockStream"));
262+
let chain_store = self.chain_store().clone();
263+
let writable = self
264+
.subgraph_store
265+
.cheap_clone()
266+
.writable(logger.clone(), deployment.id)
315267
.await
316-
}
268+
.with_context(|| format!("no store for deployment `{}`", deployment.hash))?;
269+
let chain_head_update_stream = self
270+
.chain_head_update_listener
271+
.subscribe(self.name.clone(), logger.clone());
272+
273+
// Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
274+
// This is ok because Celo blocks are always final. And we _need_ to do this because
275+
// some events appear only in eth_getLogs but not in transaction receipts.
276+
// See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
277+
let chain_id = self.eth_adapters.cheapest().unwrap().chain_id().await?;
278+
let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) {
279+
false => self.reorg_threshold,
280+
true => 0,
281+
};
282+
283+
Ok(Box::new(PollingBlockStream::new(
284+
writable,
285+
chain_store,
286+
chain_head_update_stream,
287+
adapter,
288+
self.node_id.clone(),
289+
deployment.hash,
290+
filter,
291+
start_blocks,
292+
reorg_threshold,
293+
logger,
294+
metrics,
295+
*MAX_BLOCK_RANGE_SIZE,
296+
*TARGET_TRIGGERS_PER_BLOCK_RANGE,
297+
unified_api_version,
298+
subgraph_start_block,
299+
)))
317300
}
318301

319302
fn ingestor_adapter(&self) -> Arc<Self::IngestorAdapter> {
@@ -365,6 +348,10 @@ impl Blockchain for Chain {
365348
call_cache: self.call_cache.cheap_clone(),
366349
})
367350
}
351+
352+
fn is_firehose_supported(&self) -> bool {
353+
self.firehose_endpoints.len() > 0
354+
}
368355
}
369356

370357
/// This is used in `EthereumAdapter::triggers_in_block`, called when re-processing a block for

chain/near/src/chain.rs

Lines changed: 20 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -16,9 +16,7 @@ use graph::{
1616
components::store::DeploymentLocator,
1717
firehose::bstream,
1818
log::factory::{ComponentLoggerConfig, ElasticComponentLoggerConfig},
19-
prelude::{
20-
async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory, SubgraphStore,
21-
},
19+
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
2220
};
2321
use prost::Message;
2422
use std::sync::Arc;
@@ -39,7 +37,6 @@ pub struct Chain {
3937
name: String,
4038
firehose_endpoints: Arc<FirehoseNetworkEndpoints>,
4139
chain_store: Arc<dyn ChainStore>,
42-
subgraph_store: Arc<dyn SubgraphStore>,
4340
}
4441

4542
impl std::fmt::Debug for Chain {
@@ -53,15 +50,13 @@ impl Chain {
5350
logger_factory: LoggerFactory,
5451
name: String,
5552
chain_store: Arc<dyn ChainStore>,
56-
subgraph_store: Arc<dyn SubgraphStore>,
5753
firehose_endpoints: FirehoseNetworkEndpoints,
5854
) -> Self {
5955
Chain {
6056
logger_factory,
6157
name,
6258
firehose_endpoints: Arc::new(firehose_endpoints),
6359
chain_store,
64-
subgraph_store,
6560
}
6661
}
6762
}
@@ -105,11 +100,12 @@ impl Blockchain for Chain {
105100
Ok(Arc::new(adapter))
106101
}
107102

108-
async fn new_block_stream(
103+
async fn new_firehose_block_stream(
109104
&self,
110105
deployment: DeploymentLocator,
111106
start_blocks: Vec<BlockNumber>,
112-
filter: Arc<TriggerFilter>,
107+
firehose_cursor: Option<String>,
108+
filter: Arc<Self::TriggerFilter>,
113109
metrics: Arc<BlockStreamMetrics>,
114110
unified_api_version: UnifiedMappingApiVersion,
115111
) -> Result<Box<dyn BlockStream<Self>>, Error> {
@@ -133,12 +129,6 @@ impl Blockchain for Chain {
133129
.new(o!("component" => "FirehoseBlockStream"));
134130

135131
let firehose_mapper = Arc::new(FirehoseMapper {});
136-
let firehose_cursor = self
137-
.subgraph_store
138-
.cheap_clone()
139-
.writable(logger.clone(), deployment.id)
140-
.await?
141-
.block_cursor()?;
142132

143133
Ok(Box::new(FirehoseBlockStream::new(
144134
firehose_endpoint,
@@ -151,6 +141,18 @@ impl Blockchain for Chain {
151141
)))
152142
}
153143

144+
async fn new_polling_block_stream(
145+
&self,
146+
_deployment: DeploymentLocator,
147+
_start_blocks: Vec<BlockNumber>,
148+
_subgraph_start_block: Option<BlockPtr>,
149+
_filter: Arc<Self::TriggerFilter>,
150+
_metrics: Arc<BlockStreamMetrics>,
151+
_unified_api_version: UnifiedMappingApiVersion,
152+
) -> Result<Box<dyn BlockStream<Self>>, Error> {
153+
panic!("NEAR does not support polling block stream")
154+
}
155+
154156
fn ingestor_adapter(&self) -> Arc<Self::IngestorAdapter> {
155157
let logger = self
156158
.logger_factory
@@ -187,6 +189,10 @@ impl Blockchain for Chain {
187189
fn runtime_adapter(&self) -> Arc<Self::RuntimeAdapter> {
188190
Arc::new(RuntimeAdapter {})
189191
}
192+
193+
fn is_firehose_supported(&self) -> bool {
194+
true
195+
}
190196
}
191197

192198
pub struct TriggersAdapter {}

core/src/subgraph/instance_manager.rs

Lines changed: 31 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -480,19 +480,37 @@ where
480480

481481
let block_stream_canceler = CancelGuard::new();
482482
let block_stream_cancel_handle = block_stream_canceler.handle();
483-
let mut block_stream = ctx
484-
.inputs
485-
.chain
486-
.new_block_stream(
487-
ctx.inputs.deployment.clone(),
488-
ctx.inputs.start_blocks.clone(),
489-
Arc::new(ctx.state.filter.clone()),
490-
ctx.block_stream_metrics.clone(),
491-
ctx.inputs.unified_api_version.clone(),
492-
)
493-
.await?
494-
.map_err(CancelableError::Error)
495-
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
483+
let chain = ctx.inputs.chain.clone();
484+
485+
let mut block_stream = match chain.is_firehose_supported() {
486+
true => {
487+
let firehose_cursor = ctx.inputs.store.block_cursor()?;
488+
489+
chain.new_firehose_block_stream(
490+
ctx.inputs.deployment.clone(),
491+
ctx.inputs.start_blocks.clone(),
492+
firehose_cursor,
493+
Arc::new(ctx.state.filter.clone()),
494+
ctx.block_stream_metrics.clone(),
495+
ctx.inputs.unified_api_version.clone(),
496+
)
497+
}
498+
false => {
499+
let start_block = ctx.inputs.store.block_ptr()?;
500+
501+
chain.new_polling_block_stream(
502+
ctx.inputs.deployment.clone(),
503+
ctx.inputs.start_blocks.clone(),
504+
start_block,
505+
Arc::new(ctx.state.filter.clone()),
506+
ctx.block_stream_metrics.clone(),
507+
ctx.inputs.unified_api_version.clone(),
508+
)
509+
}
510+
}
511+
.await?
512+
.map_err(CancelableError::Error)
513+
.cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
496514

497515
// Keep the stream's cancel guard around to be able to shut it down
498516
// when the subgraph deployment is unassigned

0 commit comments

Comments (0)