Skip to content

Commit b1dfbaf

Browse files
author
Matthieu Vachon
authored
Initial addition of FirehoseBlockIngestor for NEAR (and other Firehose chains) (#3041)
* Initial addition of FirehoseBlockIngestor for NEAR (and other Firehose chains) The actual FirehoseBlockIngestor is generic enough to be re-usable on all chains that have Firehose support. This is the initial implementation that just follow chain head and update the head blocks. Rather naive in its approach right now. Current limitations that I would like to discuss: - Chain net_version and genesis_hash already exist with a default value. Now the value is fetched from Firehose correctly so there is a mismatch. How we should handle that, just delete the prior record "manually" before deploying? Automatically override if current value is XYZ? I would opt for the latter. - Still need to discuss how we want to handler backfilling. There is different options available: backfill straight inside graph-node with a different cursor, backfill with graph-man in parallel fashion, backfill on demand when requestion block number to block hash - Currently doing 3 calls to database: upsert_block, attemp_chain_update and set_head_cursor, I would like to refactor that inside 1 efficient transaction instead (if possible, wanted to discuss that first before doing it). Also optimized memory usage a bit by only unpacking via `HeaderOnlyBlock` structure.
1 parent f2ea6b9 commit b1dfbaf

File tree

15 files changed

+646
-114
lines changed

15 files changed

+646
-114
lines changed

chain/ethereum/src/codec.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
#[path = "protobuf/dfuse.ethereum.codec.v1.rs"]
22
mod pbcodec;
33

4-
use graph::prelude::{
5-
web3,
6-
web3::types::TransactionReceipt as w3TransactionReceipt,
7-
web3::types::{Bytes, H160, H2048, H256, H64, U256, U64},
8-
EthereumBlock, EthereumBlockWithCalls, EthereumCall, LightEthereumBlock,
4+
use graph::{
5+
blockchain::{Block as BlockchainBlock, BlockPtr},
6+
prelude::{
7+
web3,
8+
web3::types::TransactionReceipt as w3TransactionReceipt,
9+
web3::types::{Bytes, H160, H2048, H256, H64, U256, U64},
10+
BlockNumber, EthereumBlock, EthereumBlockWithCalls, EthereumCall, LightEthereumBlock,
11+
},
912
};
13+
use std::convert::TryFrom;
1014
use std::sync::Arc;
1115

1216
use crate::chain::BlockFinality;
@@ -295,3 +299,37 @@ impl Into<EthereumBlockWithCalls> for &Block {
295299
}
296300
}
297301
}
302+
303+
impl From<Block> for BlockPtr {
304+
fn from(b: Block) -> BlockPtr {
305+
(&b).into()
306+
}
307+
}
308+
309+
impl<'a> From<&'a Block> for BlockPtr {
310+
fn from(b: &'a Block) -> BlockPtr {
311+
BlockPtr::from((H256::from_slice(b.hash.as_ref()), b.number))
312+
}
313+
}
314+
315+
impl BlockchainBlock for Block {
316+
fn number(&self) -> i32 {
317+
BlockNumber::try_from(self.number).unwrap()
318+
}
319+
320+
fn ptr(&self) -> BlockPtr {
321+
self.into()
322+
}
323+
324+
fn parent_ptr(&self) -> Option<BlockPtr> {
325+
let parent_hash = &self.header.as_ref().unwrap().parent_hash;
326+
327+
match parent_hash.len() {
328+
0 => None,
329+
_ => Some(BlockPtr::from((
330+
H256::from_slice(parent_hash.as_ref()),
331+
self.number - 1,
332+
))),
333+
}
334+
}
335+
}

chain/near/proto/codec.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@ message Block {
1212
repeated StateChangeWithCause state_changes = 5;
1313
}
1414

15+
// HeaderOnlyBlock is a standard [Block] structure where all other fields are
16+
// removed so that hydrating that object from a [Block] bytes payload will
17+
// drastically reduced allocated memory required to hold the full block.
18+
//
19+
// This can be used to unpack a [Block] when only the [BlockHeader] information
20+
// is required and greatly reduced required memory.
21+
message HeaderOnlyBlock {
22+
BlockHeader header = 2;
23+
}
24+
1525
message StateChangeWithCause {
1626
StateChangeValue value = 1;
1727
StateChangeCause cause = 2;

chain/near/src/codec.rs

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
mod pbcodec;
33

44
use graph::{
5-
blockchain::Block as Blockchainblock,
5+
blockchain::Block as BlockchainBlock,
66
blockchain::BlockPtr,
77
prelude::{hex, web3::types::H256, BlockNumber},
88
};
@@ -23,18 +23,28 @@ impl LowerHex for &CryptoHash {
2323
}
2424
}
2525

26+
impl BlockHeader {
27+
pub fn parent_ptr(&self) -> Option<BlockPtr> {
28+
match (self.prev_hash.as_ref(), self.prev_height) {
29+
(Some(hash), number) => Some(BlockPtr::from((hash.into(), number))),
30+
_ => None,
31+
}
32+
}
33+
}
34+
35+
impl<'a> From<&'a BlockHeader> for BlockPtr {
36+
fn from(b: &'a BlockHeader) -> BlockPtr {
37+
BlockPtr::from((b.hash.as_ref().unwrap().into(), b.height))
38+
}
39+
}
40+
2641
impl Block {
2742
pub fn header(&self) -> &BlockHeader {
2843
self.header.as_ref().unwrap()
2944
}
3045

3146
pub fn parent_ptr(&self) -> Option<BlockPtr> {
32-
let header = self.header();
33-
34-
match (header.prev_hash.as_ref(), header.prev_height) {
35-
(Some(hash), number) => Some(BlockPtr::from((hash.into(), number))),
36-
_ => None,
37-
}
47+
self.header().parent_ptr()
3848
}
3949
}
4050

@@ -46,14 +56,11 @@ impl From<Block> for BlockPtr {
4656

4757
impl<'a> From<&'a Block> for BlockPtr {
4858
fn from(b: &'a Block) -> BlockPtr {
49-
let header = b.header();
50-
let hash: H256 = header.hash.as_ref().unwrap().into();
51-
52-
BlockPtr::from((hash, header.height))
59+
BlockPtr::from(b.header())
5360
}
5461
}
5562

56-
impl Blockchainblock for Block {
63+
impl BlockchainBlock for Block {
5764
fn number(&self) -> i32 {
5865
BlockNumber::try_from(self.header().height).unwrap()
5966
}
@@ -67,6 +74,32 @@ impl Blockchainblock for Block {
6774
}
6875
}
6976

77+
impl HeaderOnlyBlock {
78+
pub fn header(&self) -> &BlockHeader {
79+
self.header.as_ref().unwrap()
80+
}
81+
}
82+
83+
impl<'a> From<&'a HeaderOnlyBlock> for BlockPtr {
84+
fn from(b: &'a HeaderOnlyBlock) -> BlockPtr {
85+
BlockPtr::from(b.header())
86+
}
87+
}
88+
89+
impl BlockchainBlock for HeaderOnlyBlock {
90+
fn number(&self) -> i32 {
91+
BlockNumber::try_from(self.header().height).unwrap()
92+
}
93+
94+
fn ptr(&self) -> BlockPtr {
95+
self.into()
96+
}
97+
98+
fn parent_ptr(&self) -> Option<BlockPtr> {
99+
self.header().parent_ptr()
100+
}
101+
}
102+
70103
impl execution_outcome::Status {
71104
pub fn is_success(&self) -> bool {
72105
use execution_outcome::Status::*;

chain/near/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@ mod runtime;
77
mod trigger;
88

99
pub use crate::chain::Chain;
10+
pub use codec::Block;
11+
pub use codec::HeaderOnlyBlock;

chain/near/src/protobuf/sf.near.codec.v1.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@ pub struct Block {
1111
#[prost(message, repeated, tag = "5")]
1212
pub state_changes: ::prost::alloc::vec::Vec<StateChangeWithCause>,
1313
}
14+
/// HeaderOnlyBlock is a standard [Block] structure where all other fields are
15+
/// removed so that hydrating that object from a [Block] bytes payload will
16+
/// drastically reduced allocated memory required to hold the full block.
17+
///
18+
/// This can be used to unpack a [Block] when only the [BlockHeader] information
19+
/// is required and greatly reduced required memory.
20+
#[derive(Clone, PartialEq, ::prost::Message)]
21+
pub struct HeaderOnlyBlock {
22+
#[prost(message, optional, tag = "2")]
23+
pub header: ::core::option::Option<BlockHeader>,
24+
}
1425
#[derive(Clone, PartialEq, ::prost::Message)]
1526
pub struct StateChangeWithCause {
1627
#[prost(message, optional, tag = "1")]
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use std::{marker::PhantomData, sync::Arc, time::Duration};
2+
3+
use crate::{
4+
blockchain::Block as BlockchainBlock,
5+
components::store::ChainStore,
6+
firehose::{bstream, decode_firehose_block, endpoints::FirehoseEndpoint},
7+
prelude::{error, info, Logger},
8+
util::backoff::ExponentialBackoff,
9+
};
10+
use anyhow::{Context, Error};
11+
use futures03::StreamExt;
12+
use slog::trace;
13+
use tonic::Streaming;
14+
15+
pub struct FirehoseBlockIngestor<M>
16+
where
17+
M: prost::Message + BlockchainBlock + Default + 'static,
18+
{
19+
chain_store: Arc<dyn ChainStore>,
20+
endpoint: Arc<FirehoseEndpoint>,
21+
logger: Logger,
22+
23+
phantom: PhantomData<M>,
24+
}
25+
26+
impl<M> FirehoseBlockIngestor<M>
27+
where
28+
M: prost::Message + BlockchainBlock + Default + 'static,
29+
{
30+
pub fn new(
31+
chain_store: Arc<dyn ChainStore>,
32+
endpoint: Arc<FirehoseEndpoint>,
33+
logger: Logger,
34+
) -> FirehoseBlockIngestor<M> {
35+
FirehoseBlockIngestor {
36+
chain_store,
37+
endpoint,
38+
logger,
39+
phantom: PhantomData {},
40+
}
41+
}
42+
43+
pub async fn run(self) {
44+
let mut latest_cursor = self.fetch_head_cursor().await;
45+
let mut backoff =
46+
ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30));
47+
48+
loop {
49+
info!(
50+
self.logger,
51+
"Blockstream disconnected, connecting"; "endpoint uri" => format_args!("{}", self.endpoint), "cursor" => format_args!("{}", latest_cursor),
52+
);
53+
54+
let result = self
55+
.endpoint
56+
.clone()
57+
.stream_blocks(bstream::BlocksRequestV2 {
58+
// Starts at current HEAD block of the chain (viewed from Firehose side)
59+
start_block_num: -1,
60+
start_cursor: latest_cursor.clone(),
61+
fork_steps: vec![
62+
bstream::ForkStep::StepNew as i32,
63+
bstream::ForkStep::StepUndo as i32,
64+
],
65+
..Default::default()
66+
})
67+
.await;
68+
69+
match result {
70+
Ok(stream) => {
71+
// Consume the stream of blocks until an error is hit
72+
latest_cursor = self.process_blocks(latest_cursor, stream).await
73+
}
74+
Err(e) => {
75+
error!(self.logger, "Unable to connect to endpoint: {:?}", e);
76+
}
77+
}
78+
79+
// If we reach this point, we must wait a bit before retrying
80+
backoff.sleep_async().await;
81+
}
82+
}
83+
84+
async fn fetch_head_cursor(&self) -> String {
85+
let mut backoff =
86+
ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30));
87+
loop {
88+
match self.chain_store.clone().chain_head_cursor() {
89+
Ok(cursor) => return cursor.unwrap_or_else(|| "".to_string()),
90+
Err(e) => {
91+
error!(self.logger, "Fetching chain head cursor failed: {:?}", e);
92+
93+
backoff.sleep_async().await;
94+
}
95+
}
96+
}
97+
}
98+
99+
/// Consumes the incoming stream of blocks infinitely until it hits an error. In which case
100+
/// the error is logged right away and the latest available cursor is returned
101+
/// upstream for future consumption.
102+
async fn process_blocks(
103+
&self,
104+
cursor: String,
105+
mut stream: Streaming<bstream::BlockResponseV2>,
106+
) -> String {
107+
use bstream::ForkStep::*;
108+
109+
let mut latest_cursor = cursor;
110+
111+
while let Some(message) = stream.next().await {
112+
match message {
113+
Ok(v) => {
114+
let step = bstream::ForkStep::from_i32(v.step)
115+
.expect("Fork step should always match to known value");
116+
117+
let result = match step {
118+
StepNew => self.process_new_block(&v).await,
119+
StepUndo => {
120+
trace!(self.logger, "Received undo block to ingest, skipping");
121+
Ok(())
122+
}
123+
StepIrreversible | StepUnknown => panic!(
124+
"We explicitly requested StepNew|StepUndo but received something else"
125+
),
126+
};
127+
128+
if let Err(e) = result {
129+
error!(self.logger, "Process block failed: {:?}", e);
130+
break;
131+
}
132+
133+
latest_cursor = v.cursor;
134+
}
135+
Err(e) => {
136+
info!(
137+
self.logger,
138+
"An error occurred while streaming blocks: {}", e
139+
);
140+
break;
141+
}
142+
}
143+
}
144+
145+
error!(
146+
self.logger,
147+
"Stream blocks complete unexpectedly, expecting stream to always stream blocks"
148+
);
149+
latest_cursor
150+
}
151+
152+
async fn process_new_block(&self, response: &bstream::BlockResponseV2) -> Result<(), Error> {
153+
let block = decode_firehose_block::<M>(response)
154+
.context("Mapping firehose block to blockchain::Block")?;
155+
156+
trace!(self.logger, "Received new block to ingest {}", block.ptr());
157+
158+
self.chain_store
159+
.clone()
160+
.set_chain_head(block, response.cursor.clone())
161+
.await
162+
.context("Updating chain head")?;
163+
164+
Ok(())
165+
}
166+
}

graph/src/blockchain/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
pub mod block_ingestor;
66
pub mod block_stream;
7+
pub mod firehose_block_ingestor;
78
pub mod firehose_block_stream;
89
pub mod polling_block_stream;
910
mod types;

graph/src/components/store.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1324,6 +1324,21 @@ pub trait ChainStore: Send + Sync + 'static {
13241324
/// The head block pointer will be None on initial set up.
13251325
fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
13261326

1327+
/// Get the current head block cursor for this chain.
1328+
///
1329+
/// The head block cursor will be None on initial set up.
1330+
fn chain_head_cursor(&self) -> Result<Option<String>, Error>;
1331+
1332+
/// This method does actually three operations:
1333+
/// - Upserts received block into blocks table
1334+
/// - Update chain head block into networks table
1335+
/// - Update chain head cursor into networks table
1336+
async fn set_chain_head(
1337+
self: Arc<Self>,
1338+
block: Arc<dyn Block>,
1339+
cursor: String,
1340+
) -> Result<(), Error>;
1341+
13271342
/// Returns the blocks present in the store.
13281343
fn blocks(&self, hashes: &[H256]) -> Result<Vec<serde_json::Value>, Error>;
13291344

0 commit comments

Comments
 (0)