Skip to content

Commit 45a29c2

Browse files
authored
chain/ethereum, docs: Update block ingestor to fetch receipts in parallel. (#3030)
Replaces `web3::Batch` with buffered, concurrent RPC calls when fetching transaction receipts for a block. `load_full_block` was updated to return a `std::future::Future`, and the actual fetching of transaction receipts was moved to a dedicated function that uses a retry strategy for each request. This behaviour is considered experimental, so it's fenced by an environment variable named `GRAPH_EXPERIMENTAL_FETCH_TXN_RECEIPTS_CONCURRENTLY`, which must be set to any value for it to take effect. Fixes #3018
1 parent 68eaac1 commit 45a29c2

File tree

7 files changed

+188
-103
lines changed

7 files changed

+188
-103
lines changed

chain/ethereum/src/adapter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ pub trait EthereumAdapter: Send + Sync + 'static {
627627
&self,
628628
logger: &Logger,
629629
block: LightEthereumBlock,
630-
) -> Box<dyn Future<Item = EthereumBlock, Error = bc::IngestorError> + Send>;
630+
) -> Pin<Box<dyn std::future::Future<Output = Result<EthereumBlock, bc::IngestorError>> + Send>>;
631631

632632
/// Load block pointer for the specified `block number`.
633633
fn block_pointer_from_number(

chain/ethereum/src/chain.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,6 @@ impl IngestorAdapterTrait<Chain> for IngestorAdapter {
704704
let ethereum_block = self
705705
.eth_adapter
706706
.load_full_block(&self.logger, block)
707-
.compat()
708707
.await?;
709708

710709
// We need something that implements `Block` to store the block; the

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 173 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ use graph::{
2929
use graph::{
3030
components::ethereum::*,
3131
prelude::web3::api::Web3,
32-
prelude::web3::transports::batch::Batch,
32+
prelude::web3::transports::Batch,
3333
prelude::web3::types::{Trace, TraceFilter, TraceFilterBuilder, H160},
3434
};
3535
use itertools::Itertools;
3636
use lazy_static::lazy_static;
3737
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
3838
use std::convert::TryFrom;
3939
use std::iter::FromIterator;
40+
use std::pin::Pin;
4041
use std::sync::Arc;
4142
use std::time::Instant;
4243

@@ -106,6 +107,18 @@ lazy_static! {
106107
.map(|s| s.split(';').filter(|s| s.len() > 0).map(ToOwned::to_owned).collect())
107108
.unwrap_or(Vec::new())
108109
};
110+
111+
static ref MAX_CONCURRENT_JSON_RPC_CALLS: usize = std::env::var(
112+
"GRAPH_ETHEREUM_BLOCK_INGESTOR_MAX_CONCURRENT_JSON_RPC_CALLS_FOR_TXN_RECEIPTS"
113+
)
114+
.unwrap_or("1000".into())
115+
.parse::<usize>()
116+
.expect("invalid GRAPH_ETHEREUM_BLOCK_INGESTOR_MAX_CONCURRENT_JSON_RPC_CALLS_FOR_TXN_RECEIPTS env var");
117+
118+
static ref FETCH_RECEIPTS_CONCURRENTLY: bool = std::env::var("GRAPH_EXPERIMENTAL_FETCH_TXN_RECEIPTS_CONCURRENTLY")
119+
.is_ok();
120+
121+
109122
}
110123

111124
/// Gas limit for `eth_call`. The value of 50_000_000 is a protocol-wide parameter so this
@@ -719,7 +732,7 @@ impl EthereumAdapter {
719732
)
720733
}))
721734
// Real limits on the number of parallel requests are imposed within the adapter.
722-
.buffered(1000)
735+
.buffered(*MAX_CONCURRENT_JSON_RPC_CALLS)
723736
.try_concat()
724737
.boxed()
725738
}
@@ -1052,115 +1065,56 @@ impl EthereumAdapterTrait for EthereumAdapter {
10521065
&self,
10531066
logger: &Logger,
10541067
block: LightEthereumBlock,
1055-
) -> Box<dyn Future<Item = EthereumBlock, Error = IngestorError> + Send> {
1068+
) -> Pin<Box<dyn std::future::Future<Output = Result<EthereumBlock, IngestorError>> + Send>>
1069+
{
1070+
let web3 = Arc::clone(&self.web3);
10561071
let logger = logger.clone();
10571072
let block_hash = block.hash.expect("block is missing block hash");
10581073

10591074
// The early return is necessary for correctness, otherwise we'll
10601075
// request an empty batch which is not valid in JSON-RPC.
10611076
if block.transactions.is_empty() {
10621077
trace!(logger, "Block {} contains no transactions", block_hash);
1063-
return Box::new(future::ok(EthereumBlock {
1078+
return Box::pin(std::future::ready(Ok(EthereumBlock {
10641079
block: Arc::new(block),
10651080
transaction_receipts: Vec::new(),
1066-
}));
1081+
})));
10671082
}
1068-
let web3 = self.web3.clone();
10691083

1070-
// Retry, but eventually give up.
1071-
// A receipt might be missing because the block was uncled, and the
1072-
// transaction never made it back into the main chain.
1073-
Box::new(
1074-
retry("batch eth_getTransactionReceipt RPC call", &logger)
1075-
.limit(*REQUEST_RETRIES)
1076-
.no_logging()
1077-
.timeout_secs(*JSON_RPC_TIMEOUT)
1078-
.run(move || {
1079-
let block = block.clone();
1080-
let batching_web3 = Web3::new(Batch::new(web3.transport().clone()));
1084+
let hashes: Vec<_> = block
1085+
.transactions
1086+
.iter()
1087+
.map(|txn| txn.hash.clone())
1088+
.collect();
10811089

1082-
let receipt_futures = block
1083-
.transactions
1084-
.iter()
1085-
.map(|tx| {
1086-
let logger = logger.clone();
1087-
let tx_hash = tx.hash;
1088-
1089-
Box::pin(batching_web3.eth().transaction_receipt(tx_hash))
1090-
.compat()
1091-
.from_err()
1092-
.map_err(IngestorError::Unknown)
1093-
.and_then(move |receipt_opt| {
1094-
receipt_opt.ok_or_else(move || {
1095-
// No receipt was returned.
1096-
//
1097-
// This can be because the Ethereum node no longer
1098-
// considers this block to be part of the main chain,
1099-
// and so the transaction is no longer in the main
1100-
// chain. Nothing we can do from here except give up
1101-
// trying to ingest this block.
1102-
//
1103-
// This could also be because the receipt is simply not
1104-
// available yet. For that case, we should retry until
1105-
// it becomes available.
1106-
IngestorError::ReceiptUnavailable(block_hash, tx_hash)
1107-
})
1108-
})
1109-
.and_then(move |receipt| {
1110-
// Check if the receipt has a block hash and is for the right
1111-
// block. Parity nodes seem to return receipts with no block
1112-
// hash when a transaction is no longer in the main chain, so
1113-
// treat that case the same as a receipt being absent entirely.
1114-
if receipt.block_hash != Some(block_hash) {
1115-
info!(
1116-
logger, "receipt block mismatch";
1117-
"receipt_block_hash" =>
1118-
receipt.block_hash.unwrap_or_default().to_string(),
1119-
"block_hash" =>
1120-
block_hash.to_string(),
1121-
"tx_hash" => tx_hash.to_string(),
1122-
);
1123-
1124-
// If the receipt came from a different block, then the
1125-
// Ethereum node no longer considers this block to be
1126-
// in the main chain. Nothing we can do from here
1127-
// except give up trying to ingest this block.
1128-
// There is no way to get the transaction receipt from
1129-
// this block.
1130-
Err(IngestorError::BlockUnavailable(block_hash))
1131-
} else {
1132-
Ok(receipt)
1133-
}
1134-
})
1135-
})
1136-
.collect::<Vec<_>>();
1090+
let receipts_future = if *FETCH_RECEIPTS_CONCURRENTLY {
1091+
let hash_stream = graph::tokio_stream::iter(hashes);
1092+
let receipt_stream = graph::tokio_stream::StreamExt::map(hash_stream, move |tx_hash| {
1093+
fetch_transaction_receipt_with_retry(
1094+
web3.cheap_clone(),
1095+
tx_hash,
1096+
block_hash,
1097+
logger.cheap_clone(),
1098+
)
1099+
})
1100+
.buffered(*MAX_CONCURRENT_JSON_RPC_CALLS);
1101+
graph::tokio_stream::StreamExt::collect::<Result<Vec<TransactionReceipt>, IngestorError>>(
1102+
receipt_stream,
1103+
).boxed()
1104+
} else {
1105+
// Deprecated batching retrieval of transaction receipts.
1106+
fetch_transaction_receipts_in_batch_with_retry(web3, hashes, block_hash, logger).boxed()
1107+
};
11371108

1138-
Box::pin(batching_web3.transport().submit_batch())
1139-
.compat()
1140-
.from_err()
1141-
.map_err(IngestorError::Unknown)
1142-
.and_then(move |_| {
1143-
stream::futures_ordered(receipt_futures).collect().map(
1144-
move |transaction_receipts| EthereumBlock {
1145-
block: Arc::new(block),
1146-
transaction_receipts,
1147-
},
1148-
)
1149-
})
1150-
.compat()
1151-
})
1152-
.map_err(move |e| {
1153-
e.into_inner().unwrap_or_else(move || {
1154-
anyhow!(
1155-
"Ethereum node took too long to return receipts for block {}",
1156-
block_hash
1157-
)
1158-
.into()
1159-
})
1160-
})
1161-
.boxed()
1162-
.compat(),
1163-
)
1109+
let block_future =
1110+
futures03::TryFutureExt::map_ok(receipts_future, move |transaction_receipts| {
1111+
EthereumBlock {
1112+
block: Arc::new(block),
1113+
transaction_receipts,
1114+
}
1115+
});
1116+
1117+
Box::pin(block_future)
11641118
}
11651119

11661120
fn block_pointer_from_number(
@@ -1846,3 +1800,124 @@ async fn filter_call_triggers_from_unsuccessful_transactions(
18461800
}
18471801
Ok(block)
18481802
}
1803+
1804+
/// Deprecated. Wraps the [`fetch_transaction_receipts_in_batch`] in a retry loop.
1805+
async fn fetch_transaction_receipts_in_batch_with_retry(
1806+
web3: Arc<Web3<Transport>>,
1807+
hashes: Vec<H256>,
1808+
block_hash: H256,
1809+
logger: Logger,
1810+
) -> Result<Vec<TransactionReceipt>, IngestorError> {
1811+
retry("batch eth_getTransactionReceipt RPC call", &logger)
1812+
.limit(*REQUEST_RETRIES)
1813+
.no_logging()
1814+
.timeout_secs(*JSON_RPC_TIMEOUT)
1815+
.run(move || {
1816+
let web3 = web3.cheap_clone();
1817+
let hashes = hashes.clone();
1818+
let logger = logger.cheap_clone();
1819+
fetch_transaction_receipts_in_batch(web3, hashes, block_hash, logger).boxed()
1820+
})
1821+
.await
1822+
.map_err(|_timeout| anyhow!(block_hash).into())
1823+
}
1824+
1825+
/// Deprecated. Attempts to fetch multiple transaction receipts in a batching context.
1826+
async fn fetch_transaction_receipts_in_batch(
1827+
web3: Arc<Web3<Transport>>,
1828+
hashes: Vec<H256>,
1829+
block_hash: H256,
1830+
logger: Logger,
1831+
) -> Result<Vec<TransactionReceipt>, IngestorError> {
1832+
let batching_web3 = Web3::new(Batch::new(web3.transport().clone()));
1833+
let receipt_futures = hashes
1834+
.into_iter()
1835+
.map(|hash| {
1836+
let logger = logger.cheap_clone();
1837+
batching_web3
1838+
.eth()
1839+
.transaction_receipt(hash.clone())
1840+
.map_err(|web3_error| IngestorError::from(web3_error))
1841+
.and_then(move |some_receipt| async move {
1842+
resolve_transaction_receipt(some_receipt, hash, block_hash, logger)
1843+
})
1844+
})
1845+
.collect::<Vec<_>>();
1846+
1847+
batching_web3.transport().submit_batch().await?;
1848+
1849+
let mut collected = vec![];
1850+
for receipt in receipt_futures.into_iter() {
1851+
collected.push(receipt.await?)
1852+
}
1853+
Ok(collected)
1854+
}
1855+
1856+
/// Retries fetching a single transaction receipt.
1857+
async fn fetch_transaction_receipt_with_retry(
1858+
web3: Arc<Web3<Transport>>,
1859+
transaction_hash: H256,
1860+
block_hash: H256,
1861+
logger: Logger,
1862+
) -> Result<TransactionReceipt, IngestorError> {
1863+
let logger = logger.cheap_clone();
1864+
retry("batch eth_getTransactionReceipt RPC call", &logger)
1865+
.limit(*REQUEST_RETRIES)
1866+
.no_logging()
1867+
.timeout_secs(*JSON_RPC_TIMEOUT)
1868+
.run(move || web3.eth().transaction_receipt(transaction_hash).boxed())
1869+
.await
1870+
.map_err(|_timeout| anyhow!(block_hash).into())
1871+
.and_then(move |some_receipt| {
1872+
resolve_transaction_receipt(some_receipt, transaction_hash, block_hash, logger)
1873+
})
1874+
}
1875+
1876+
fn resolve_transaction_receipt(
1877+
transaction_receipt: Option<TransactionReceipt>,
1878+
transaction_hash: H256,
1879+
block_hash: H256,
1880+
logger: Logger,
1881+
) -> Result<TransactionReceipt, IngestorError> {
1882+
match transaction_receipt {
1883+
// A receipt might be missing because the block was uncled, and the transaction never
1884+
// made it back into the main chain.
1885+
Some(receipt) => {
1886+
// Check if the receipt has a block hash and is for the right block. Parity nodes seem
1887+
// to return receipts with no block hash when a transaction is no longer in the main
1888+
// chain, so treat that case the same as a receipt being absent entirely.
1889+
if receipt.block_hash != Some(block_hash) {
1890+
info!(
1891+
logger, "receipt block mismatch";
1892+
"receipt_block_hash" =>
1893+
receipt.block_hash.unwrap_or_default().to_string(),
1894+
"block_hash" =>
1895+
block_hash.to_string(),
1896+
"tx_hash" => transaction_hash.to_string(),
1897+
);
1898+
1899+
// If the receipt came from a different block, then the Ethereum node no longer
1900+
// considers this block to be in the main chain. Nothing we can do from here except
1901+
// give up trying to ingest this block. There is no way to get the transaction
1902+
// receipt from this block.
1903+
Err(IngestorError::BlockUnavailable(block_hash.clone()))
1904+
} else {
1905+
Ok(receipt)
1906+
}
1907+
}
1908+
None => {
1909+
// No receipt was returned.
1910+
//
1911+
// This can be because the Ethereum node no longer considers this block to be part of
1912+
// the main chain, and so the transaction is no longer in the main chain. Nothing we can
1913+
// do from here except give up trying to ingest this block.
1914+
//
1915+
// This could also be because the receipt is simply not available yet. For that case, we
1916+
// should retry until it becomes available.
1917+
Err(IngestorError::ReceiptUnavailable(
1918+
block_hash,
1919+
transaction_hash,
1920+
))
1921+
}
1922+
}
1923+
}

chain/ethereum/src/network_indexer/network_indexer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ fn fetch_block_and_ommers_by_number(
204204
adapter
205205
.clone()
206206
.block_by_number(&logger, block_number)
207-
.from_err()
207+
.map_err(Into::into)
208208
)
209209
.and_then(move |block| match block {
210210
None => {
@@ -224,8 +224,9 @@ fn fetch_block_and_ommers_by_number(
224224
fetch_full_block_problems,
225225
adapter_for_full_block
226226
.load_full_block(&logger_for_full_block, block)
227-
.from_err()
227+
.map_err(Into::into)
228228
)
229+
.compat()
229230
.and_then(move |block| {
230231
fetch_ommers(
231232
logger_for_ommers.clone(),

chain/ethereum/tests/network_indexer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ fn create_mock_ethereum_adapter(
246246
.expect_load_full_block()
247247
.returning(move |_, block: LightEthereumBlock| {
248248
let chains = chains_for_load_full_block.lock().unwrap();
249-
Box::new(future::result(
249+
Box::pin(std::future::ready(
250250
chains
251251
.current_chain()
252252
.ok_or_else(|| anyhow!("unknown chain {:?}", chains.index()))

docs/environment-variables.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ those.
3434
subgraph if the limit is reached, but will simply restart the syncing step,
3535
so it can be low. This limit guards against scenarios such as requesting a
3636
block hash that has been reorged. Defaults to 10.
37+
- `GRAPH_ETHEREUM_BLOCK_INGESTOR_MAX_CONCURRENT_JSON_RPC_CALLS_FOR_TXN_RECEIPTS`:
38+
The maximum number of concurrent requests made against Ethereum for
39+
requesting transaction receipts during block ingestion.
40+
Defaults to 1,000.
3741
- `GRAPH_ETHEREUM_CLEANUP_BLOCKS` : Set to `true` to clean up unneeded
3842
blocks from the cache in the database. When this is `false` or unset (the
3943
default), blocks will never be removed from the block cache. This setting

graph/src/blockchain/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,12 @@ impl From<Error> for IngestorError {
154154
}
155155
}
156156

157+
impl From<web3::Error> for IngestorError {
158+
fn from(e: web3::Error) -> Self {
159+
IngestorError::Unknown(anyhow::anyhow!(e))
160+
}
161+
}
162+
157163
#[async_trait]
158164
pub trait IngestorAdapter<C: Blockchain> {
159165
fn logger(&self) -> &Logger;

0 commit comments

Comments
 (0)