Skip to content

Commit 80d079b

Browse files
Matthieu Vachonmaoueh
andauthored
Replace specialized firehose_triggers_in_block by direct usage of TriggerAdapter::triggers_in_blocks (#3082)
With the revamped `FirehoseBlockStream`, it's now possible to directly re-use the `TriggerAdapter::triggers_in_blocks` code instead of relying on our specialized `firehose_triggers_in_block`. This make the code more re-usable. Eliminated at the same time a useless close in `NearChair::TriggersAdapter`. Also make some `NearChain` now panics instead of silently returning something that is not good. Linked to #3080 Co-authored-by: Matthieu Vachon <[email protected]>
1 parent bd7c3d7 commit 80d079b

File tree

4 files changed

+63
-111
lines changed

4 files changed

+63
-111
lines changed

chain/ethereum/src/chain.rs

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,6 @@ impl Chain {
181181
start_blocks: Vec<BlockNumber>,
182182
adapter: Arc<TriggersAdapter>,
183183
filter: Arc<TriggerFilter>,
184-
_metrics: Arc<BlockStreamMetrics>,
185-
_unified_api_version: UnifiedMappingApiVersion,
186184
) -> Result<Box<dyn BlockStream<Self>>, Error> {
187185
let firehose_endpoint = match self.firehose_endpoints.random() {
188186
Some(e) => e.clone(),
@@ -303,15 +301,8 @@ impl Blockchain for Chain {
303301
));
304302

305303
if self.firehose_endpoints.len() > 0 {
306-
self.new_firehose_block_stream(
307-
deployment,
308-
start_blocks,
309-
adapter,
310-
filter,
311-
metrics,
312-
unified_api_version,
313-
)
314-
.await
304+
self.new_firehose_block_stream(deployment, start_blocks, adapter, filter)
305+
.await
315306
} else {
316307
self.new_polling_block_stream(
317308
deployment,
@@ -579,12 +570,13 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
579570

580571
pub struct FirehoseMapper {}
581572

573+
#[async_trait]
582574
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
583-
fn to_block_stream_event(
575+
async fn to_block_stream_event(
584576
&self,
585-
_logger: &Logger,
577+
logger: &Logger,
586578
response: &bstream::BlockResponseV2,
587-
_adapter: &TriggersAdapter,
579+
adapter: &TriggersAdapter,
588580
filter: &TriggerFilter,
589581
) -> Result<BlockStreamEvent<Chain>, FirehoseError> {
590582
let step = bstream::ForkStep::from_i32(response.step).unwrap_or_else(|| {
@@ -610,8 +602,9 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
610602
match step {
611603
bstream::ForkStep::StepNew => {
612604
let ethereum_block: EthereumBlockWithCalls = (&block).into();
613-
let block_with_triggers =
614-
self.firehose_triggers_in_block(ethereum_block, filter)?;
605+
let block_with_triggers = adapter
606+
.triggers_in_block(logger, BlockFinality::NonFinal(ethereum_block), filter)
607+
.await?;
615608

616609
Ok(BlockStreamEvent::ProcessBlock(
617610
block_with_triggers,
@@ -642,29 +635,6 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
642635
}
643636
}
644637

645-
impl FirehoseMapper {
646-
// FIXME: This should be replaced by using the `TriggersAdapter` struct directly. However, the TriggersAdapter trait
647-
// is async. It's actual async usage is done inside a manual `poll` implementation in `firehose_block_stream#poll_next`
648-
// value. An upcoming improvement will be to remove this `poll_next`. Once the refactor occurs, this should be
649-
// removed and TriggersAdapter::triggers_in_block should be use straight.
650-
fn firehose_triggers_in_block(
651-
&self,
652-
block: EthereumBlockWithCalls,
653-
filter: &TriggerFilter,
654-
) -> Result<BlockWithTriggers<Chain>, FirehoseError> {
655-
let mut triggers = Vec::new();
656-
657-
triggers.append(&mut parse_log_triggers(&filter.log, &block.ethereum_block));
658-
triggers.append(&mut parse_call_triggers(&filter.call, &block)?);
659-
triggers.append(&mut parse_block_triggers(filter.block.clone(), &block));
660-
661-
Ok(BlockWithTriggers::new(
662-
BlockFinality::NonFinal(block),
663-
triggers,
664-
))
665-
}
666-
}
667-
668638
pub struct IngestorAdapter {
669639
logger: Logger,
670640
ancestor_count: i32,

chain/near/src/chain.rs

Lines changed: 51 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -199,33 +199,69 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
199199
_to: BlockNumber,
200200
_filter: &TriggerFilter,
201201
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
202-
// FIXME (NEAR): Scanning triggers makes little sense in Firehose approach, let's see
203-
Ok(vec![])
202+
panic!("Should never be called since not used by FirehoseBlockStream")
204203
}
205204

206205
async fn triggers_in_block(
207206
&self,
208207
_logger: &Logger,
209-
_block: codec::Block,
208+
block: codec::Block,
210209
_filter: &TriggerFilter,
211210
) -> Result<BlockWithTriggers<Chain>, Error> {
212-
// FIXME (NEAR): Share implementation with FirehoseMapper::firehose_triggers_in_block version.
213-
// This is currently unreachable since Near does not yet support dynamic data sources.
214-
todo!()
211+
// TODO: Find the best place to introduce an `Arc` and avoid this clone.
212+
let shared_block = Arc::new(block.clone());
213+
214+
// Filter non-successful or non-action receipts.
215+
let receipts = block.shards.iter().flat_map(|shard| {
216+
shard
217+
.receipt_execution_outcomes
218+
.iter()
219+
.filter_map(|outcome| {
220+
if !outcome
221+
.execution_outcome
222+
.as_ref()?
223+
.outcome
224+
.as_ref()?
225+
.status
226+
.as_ref()?
227+
.is_success()
228+
{
229+
return None;
230+
}
231+
if !matches!(
232+
outcome.receipt.as_ref()?.receipt,
233+
Some(codec::receipt::Receipt::Action(_))
234+
) {
235+
return None;
236+
}
237+
238+
Some(trigger::ReceiptWithOutcome {
239+
outcome: outcome.execution_outcome.as_ref()?.clone(),
240+
receipt: outcome.receipt.as_ref()?.clone(),
241+
block: shared_block.cheap_clone(),
242+
})
243+
})
244+
});
245+
246+
let mut trigger_data: Vec<_> = receipts
247+
.map(|r| NearTrigger::Receipt(Arc::new(r)))
248+
.collect();
249+
250+
trigger_data.push(NearTrigger::Block(shared_block.cheap_clone()));
251+
252+
Ok(BlockWithTriggers::new(block, trigger_data))
215253
}
216254

217255
async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
218-
// FIXME (NEAR): Might not be necessary for NEAR support for now
219-
Ok(true)
256+
panic!("Should never be called since not used by FirehoseBlockStream")
220257
}
221258

222259
fn ancestor_block(
223260
&self,
224261
_ptr: BlockPtr,
225262
_offset: BlockNumber,
226263
) -> Result<Option<codec::Block>, Error> {
227-
// FIXME (NEAR): Might not be necessary for NEAR support for now
228-
Ok(None)
264+
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
229265
}
230266

231267
/// Panics if `block` is genesis.
@@ -241,12 +277,13 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
241277

242278
pub struct FirehoseMapper {}
243279

280+
#[async_trait]
244281
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
245-
fn to_block_stream_event(
282+
async fn to_block_stream_event(
246283
&self,
247-
_logger: &Logger,
284+
logger: &Logger,
248285
response: &bstream::BlockResponseV2,
249-
_adapter: &TriggersAdapter,
286+
adapter: &TriggersAdapter,
250287
filter: &TriggerFilter,
251288
) -> Result<BlockStreamEvent<Chain>, FirehoseError> {
252289
let step = bstream::ForkStep::from_i32(response.step).unwrap_or_else(|| {
@@ -271,7 +308,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
271308

272309
match step {
273310
bstream::ForkStep::StepNew => Ok(BlockStreamEvent::ProcessBlock(
274-
self.firehose_triggers_in_block(&block, filter)?,
311+
adapter.triggers_in_block(logger, block, filter).await?,
275312
Some(response.cursor.clone()),
276313
)),
277314

@@ -299,62 +336,6 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
299336
}
300337
}
301338

302-
impl FirehoseMapper {
303-
// FIXME: This should be replaced by using the `TriggersAdapter` struct directly. However, the TriggersAdapter trait
304-
// is async. It's actual async usage is done inside a manual `poll` implementation in `firehose_block_stream#poll_next`
305-
// value. An upcoming improvement will be to remove this `poll_next`. Once the refactor occurs, this should be
306-
// removed and TriggersAdapter::triggers_in_block should be use straight.
307-
fn firehose_triggers_in_block(
308-
&self,
309-
block: &codec::Block,
310-
_filter: &TriggerFilter,
311-
) -> Result<BlockWithTriggers<Chain>, FirehoseError> {
312-
// TODO: Find the best place to introduce an `Arc` and avoid this clone.
313-
let block = Arc::new(block.clone());
314-
315-
// Filter non-successful or non-action receipts.
316-
let receipts = block.shards.iter().flat_map(|shard| {
317-
shard
318-
.receipt_execution_outcomes
319-
.iter()
320-
.filter_map(|outcome| {
321-
if !outcome
322-
.execution_outcome
323-
.as_ref()?
324-
.outcome
325-
.as_ref()?
326-
.status
327-
.as_ref()?
328-
.is_success()
329-
{
330-
return None;
331-
}
332-
if !matches!(
333-
outcome.receipt.as_ref()?.receipt,
334-
Some(codec::receipt::Receipt::Action(_))
335-
) {
336-
return None;
337-
}
338-
339-
Some(trigger::ReceiptWithOutcome {
340-
outcome: outcome.execution_outcome.as_ref()?.clone(),
341-
receipt: outcome.receipt.as_ref()?.clone(),
342-
block: block.cheap_clone(),
343-
})
344-
})
345-
});
346-
347-
let mut trigger_data: Vec<_> = receipts
348-
.map(|r| NearTrigger::Receipt(Arc::new(r)))
349-
.collect();
350-
351-
trigger_data.push(NearTrigger::Block(block.cheap_clone()));
352-
353-
// TODO: `block` should probably be an `Arc` in `BlockWithTriggers` to avoid this clone.
354-
Ok(BlockWithTriggers::new(block.as_ref().clone(), trigger_data))
355-
}
356-
}
357-
358339
pub struct IngestorAdapter {
359340
logger: Logger,
360341
}

graph/src/blockchain/block_stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ pub trait TriggersAdapter<C: Blockchain>: Send + Sync {
7777
async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error>;
7878
}
7979

80+
#[async_trait]
8081
pub trait FirehoseMapper<C: Blockchain>: Send + Sync {
81-
fn to_block_stream_event(
82+
async fn to_block_stream_event(
8283
&self,
8384
logger: &Logger,
8485
response: &bstream::BlockResponseV2,

graph/src/blockchain/firehose_block_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
9494
for await response in stream {
9595
match response {
9696
Ok(v) => {
97-
match mapper.to_block_stream_event(&logger, &v, &adapter, &filter) {
97+
match mapper.to_block_stream_event(&logger, &v, &adapter, &filter).await {
9898
Ok(event) => {
9999
yield event;
100100

0 commit comments

Comments
 (0)