Skip to content

Commit 2465e92

Browse files
committed
chain/ethereum: migrate traces to use alloy
1 parent dbf24ce commit 2465e92

File tree

2 files changed

+158
-101
lines changed

2 files changed

+158
-101
lines changed

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 137 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,27 @@ use graph::futures03::future::try_join_all;
1919
use graph::futures03::{
2020
self, compat::Future01CompatExt, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
2121
};
22-
use graph::prelude::alloy;
23-
use graph::prelude::alloy::primitives::B256;
24-
use graph::prelude::alloy::rpc::types::{TransactionInput, TransactionRequest};
25-
use graph::prelude::alloy::transports::RpcError;
26-
27-
use graph::prelude::alloy_transaction_receipt_to_web3_transaction_receipt;
28-
use graph::prelude::h256_to_b256;
29-
use graph::prelude::tokio::try_join;
30-
use graph::prelude::{alloy_log_to_web3_log, b256_to_h256};
22+
use graph::prelude::{
23+
alloy::{
24+
self,
25+
primitives::B256,
26+
providers::{
27+
ext::TraceApi,
28+
fillers::{
29+
BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
30+
},
31+
Identity, Provider, RootProvider,
32+
},
33+
rpc::types::{
34+
trace::{filter::TraceFilter as AlloyTraceFilter, parity::LocalizedTransactionTrace},
35+
TransactionInput, TransactionRequest,
36+
},
37+
transports::{RpcError, TransportErrorKind},
38+
},
39+
alloy_log_to_web3_log, alloy_transaction_receipt_to_web3_transaction_receipt, b256_to_h256,
40+
h160_to_alloy_address, h256_to_b256,
41+
tokio::try_join,
42+
};
3143
use graph::slog::o;
3244
use graph::tokio::sync::RwLock;
3345
use graph::tokio::time::timeout;
@@ -41,11 +53,7 @@ use graph::{
4153
TimeoutError,
4254
},
4355
};
44-
use graph::{
45-
components::ethereum::*,
46-
prelude::web3::api::Web3,
47-
prelude::web3::types::{Trace, TraceFilter, TraceFilterBuilder, H160},
48-
};
56+
use graph::{components::ethereum::*, prelude::web3::api::Web3, prelude::web3::types::H160};
4957
use itertools::Itertools;
5058
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
5159
use std::convert::TryFrom;
@@ -73,12 +81,20 @@ use crate::{
7381
ENV_VARS,
7482
};
7583

84+
type AlloyProvider = FillProvider<
85+
JoinFill<
86+
Identity,
87+
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
88+
>,
89+
RootProvider,
90+
>;
91+
7692
#[derive(Clone)]
7793
pub struct EthereumAdapter {
7894
logger: Logger,
7995
provider: String,
8096
web3: Arc<Web3<Transport>>,
81-
alloy: Arc<dyn alloy::providers::Provider>,
97+
alloy: Arc<AlloyProvider>,
8298
metrics: Arc<ProviderEthRpcMetrics>,
8399
supports_eip_1898: bool,
84100
call_only: bool,
@@ -164,79 +180,26 @@ impl EthereumAdapter {
164180
subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
165181
from: BlockNumber,
166182
to: BlockNumber,
167-
addresses: Vec<H160>,
168-
) -> Result<Vec<Trace>, Error> {
183+
addresses: Vec<alloy::primitives::Address>,
184+
) -> Result<Vec<LocalizedTransactionTrace>, Error> {
169185
assert!(!self.call_only);
170186

171-
let eth = self.clone();
172187
let retry_log_message =
173188
format!("trace_filter RPC call for block range: [{}..{}]", from, to);
189+
let eth = self.clone();
190+
174191
retry(retry_log_message, &logger)
175192
.redact_log_urls(true)
176193
.limit(ENV_VARS.request_retries)
177194
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
178195
.run(move || {
179-
let trace_filter: TraceFilter = match addresses.len() {
180-
0 => TraceFilterBuilder::default()
181-
.from_block(from.into())
182-
.to_block(to.into())
183-
.build(),
184-
_ => TraceFilterBuilder::default()
185-
.from_block(from.into())
186-
.to_block(to.into())
187-
.to_address(addresses.clone())
188-
.build(),
189-
};
190-
191-
let eth = eth.cheap_clone();
192-
let logger_for_triggers = logger.clone();
193-
let logger_for_error = logger.clone();
194-
let start = Instant::now();
196+
let eth = eth.clone();
197+
let logger = logger.clone();
195198
let subgraph_metrics = subgraph_metrics.clone();
196-
let provider_metrics = eth.metrics.clone();
197-
let provider = self.provider.clone();
198-
199+
let addresses = addresses.clone();
199200
async move {
200-
let result = eth
201-
.web3
202-
.trace()
203-
.filter(trace_filter)
201+
eth.execute_trace_filter_request(logger, subgraph_metrics, from, to, addresses)
204202
.await
205-
.map(move |traces| {
206-
if !traces.is_empty() {
207-
if to == from {
208-
debug!(
209-
logger_for_triggers,
210-
"Received {} traces for block {}",
211-
traces.len(),
212-
to
213-
);
214-
} else {
215-
debug!(
216-
logger_for_triggers,
217-
"Received {} traces for blocks [{}, {}]",
218-
traces.len(),
219-
from,
220-
to
221-
);
222-
}
223-
}
224-
traces
225-
})
226-
.map_err(Error::from);
227-
228-
let elapsed = start.elapsed().as_secs_f64();
229-
provider_metrics.observe_request(elapsed, "trace_filter", &provider);
230-
subgraph_metrics.observe_request(elapsed, "trace_filter", &provider);
231-
if let Err(e) = &result {
232-
provider_metrics.add_error("trace_filter", &provider);
233-
subgraph_metrics.add_error("trace_filter", &provider);
234-
debug!(
235-
logger_for_error,
236-
"Error querying traces error = {:#} from = {} to = {}", e, from, to
237-
);
238-
}
239-
result
240203
}
241204
})
242205
.map_err(move |e| {
@@ -252,6 +215,93 @@ impl EthereumAdapter {
252215
.await
253216
}
254217

218+
async fn execute_trace_filter_request(
219+
&self,
220+
logger: Logger,
221+
subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
222+
from: BlockNumber,
223+
to: BlockNumber,
224+
addresses: Vec<alloy::primitives::Address>,
225+
) -> Result<Vec<LocalizedTransactionTrace>, Error> {
226+
let alloy_trace_filter = Self::build_trace_filter(from, to, &addresses);
227+
let start = Instant::now();
228+
229+
let result = self.alloy.trace_filter(&alloy_trace_filter).await;
230+
231+
if let Ok(traces) = &result {
232+
self.log_trace_results(&logger, from, to, traces.len());
233+
}
234+
235+
self.record_trace_metrics(
236+
&subgraph_metrics,
237+
start.elapsed().as_secs_f64(),
238+
&result,
239+
from,
240+
to,
241+
&logger,
242+
);
243+
244+
result.map_err(Error::from)
245+
}
246+
247+
fn build_trace_filter(
248+
from: BlockNumber,
249+
to: BlockNumber,
250+
addresses: &[alloy::primitives::Address],
251+
) -> AlloyTraceFilter {
252+
let filter = AlloyTraceFilter::default()
253+
.from_block(from as u64)
254+
.to_block(to as u64);
255+
256+
if !addresses.is_empty() {
257+
filter.to_address(addresses.to_vec())
258+
} else {
259+
filter
260+
}
261+
}
262+
263+
fn log_trace_results(
264+
&self,
265+
logger: &Logger,
266+
from: BlockNumber,
267+
to: BlockNumber,
268+
trace_len: usize,
269+
) {
270+
if trace_len > 0 {
271+
if to == from {
272+
debug!(logger, "Received {} traces for block {}", trace_len, to);
273+
} else {
274+
debug!(
275+
logger,
276+
"Received {} traces for blocks [{}, {}]", trace_len, from, to
277+
);
278+
}
279+
}
280+
}
281+
282+
fn record_trace_metrics(
283+
&self,
284+
subgraph_metrics: &Arc<SubgraphEthRpcMetrics>,
285+
elapsed: f64,
286+
result: &Result<Vec<LocalizedTransactionTrace>, RpcError<TransportErrorKind>>,
287+
from: BlockNumber,
288+
to: BlockNumber,
289+
logger: &Logger,
290+
) {
291+
self.metrics
292+
.observe_request(elapsed, "trace_filter", &self.provider);
293+
subgraph_metrics.observe_request(elapsed, "trace_filter", &self.provider);
294+
295+
if let Err(e) = result {
296+
self.metrics.add_error("trace_filter", &self.provider);
297+
subgraph_metrics.add_error("trace_filter", &self.provider);
298+
debug!(
299+
logger,
300+
"Error querying traces error = {:#} from = {} to = {}", e, from, to
301+
);
302+
}
303+
}
304+
255305
// This is a lazy check for block receipt support. It is only called once and then the result is
256306
// cached. The result is not used for anything critical, so it is fine to be lazy.
257307
async fn check_block_receipt_support_and_update_cache(
@@ -368,8 +418,8 @@ impl EthereumAdapter {
368418
subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
369419
from: BlockNumber,
370420
to: BlockNumber,
371-
addresses: Vec<H160>,
372-
) -> impl futures03::Stream<Item = Result<Trace, Error>> + Send {
421+
addresses: Vec<alloy::primitives::Address>,
422+
) -> impl futures03::Stream<Item = Result<LocalizedTransactionTrace, Error>> + Send {
373423
if from > to {
374424
panic!(
375425
"Can not produce a call stream on a backwards block range: from = {}, to = {}",
@@ -900,6 +950,11 @@ impl EthereumAdapter {
900950
addresses = vec![];
901951
}
902952

953+
let addresses = addresses
954+
.iter()
955+
.map(|addr| h160_to_alloy_address(*addr))
956+
.collect();
957+
903958
Box::new(
904959
eth.trace_stream(logger, subgraph_metrics, from, to, addresses)
905960
.try_filter_map(move |trace| {
@@ -987,11 +1042,11 @@ impl EthereumAdapter {
9871042
logger: &Logger,
9881043
subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
9891044
block_number: BlockNumber,
990-
block_hash: H256,
1045+
block_hash: alloy::primitives::B256,
9911046
) -> Result<Vec<EthereumCall>, Error> {
9921047
let eth = self.clone();
9931048
let addresses = Vec::new();
994-
let traces: Vec<Trace> = eth
1049+
let traces: Vec<LocalizedTransactionTrace> = eth
9951050
.trace_stream(
9961051
logger,
9971052
subgraph_metrics.clone(),
@@ -1017,7 +1072,7 @@ impl EthereumAdapter {
10171072
// all the traces for the block, we need to ensure that the
10181073
// block hash for the traces is equal to the desired block hash.
10191074
// Assume all traces are for the same block.
1020-
if traces.iter().nth(0).unwrap().block_hash != block_hash {
1075+
if traces.iter().nth(0).unwrap().block_hash != Some(block_hash) {
10211076
return Err(anyhow!(
10221077
"Trace stream returned traces for an unexpected block: \
10231078
number = `{}`, hash = `{}`",
@@ -1835,7 +1890,7 @@ pub(crate) async fn get_calls(
18351890
subgraph_metrics.clone(),
18361891
BlockNumber::try_from(ethereum_block.block.number.unwrap().as_u64())
18371892
.unwrap(),
1838-
ethereum_block.block.hash.unwrap(),
1893+
h256_to_b256(ethereum_block.block.hash.unwrap()),
18391894
)
18401895
.await?
18411896
};

graph/src/components/ethereum/types.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1+
use alloy::rpc::types::trace::parity::{Action, LocalizedTransactionTrace, TraceOutput};
12
use serde::{Deserialize, Serialize};
23
use std::{convert::TryFrom, sync::Arc};
3-
use web3::types::{
4-
Action, Address, Block, Bytes, Log, Res, Trace, Transaction, TransactionReceipt, H256, U256,
5-
U64,
6-
};
4+
use web3::types::{Address, Block, Bytes, Log, Transaction, TransactionReceipt, H256, U256, U64};
75

86
use crate::{
97
blockchain::{BlockPtr, BlockTime},
10-
prelude::BlockNumber,
8+
prelude::{alloy_address_to_h160, b256_to_h256, BlockNumber},
9+
util::conversions::{alloy_bytes_to_web3_bytes, alloy_u256_to_web3_u256, u64_to_web3_u256},
1110
};
1211

1312
pub type LightEthereumBlock = Block<Transaction>;
@@ -125,21 +124,24 @@ pub struct EthereumCall {
125124
}
126125

127126
impl EthereumCall {
128-
pub fn try_from_trace(trace: &Trace) -> Option<Self> {
127+
pub fn try_from_trace(trace: &LocalizedTransactionTrace) -> Option<Self> {
129128
// The parity-ethereum tracing api returns traces for operations which had execution errors.
130129
// Filter errorful traces out, since call handlers should only run on successful CALLs.
131-
if trace.error.is_some() {
130+
131+
let tx_trace = &trace.trace;
132+
133+
if tx_trace.error.is_some() {
132134
return None;
133135
}
134136
// We are only interested in traces from CALLs
135-
let call = match &trace.action {
137+
let call = match &tx_trace.action {
136138
// Contract to contract value transfers compile to the CALL opcode
137139
// and have no input. Call handlers are for triggering on explicit method calls right now.
138140
Action::Call(call) if call.input.0.len() >= 4 => call,
139141
_ => return None,
140142
};
141-
let (output, gas_used) = match &trace.result {
142-
Some(Res::Call(result)) => (result.output.clone(), result.gas_used),
143+
let (output, gas_used) = match &tx_trace.result {
144+
Some(TraceOutput::Call(result)) => (result.output.clone(), result.gas_used),
143145
_ => return None,
144146
};
145147

@@ -148,15 +150,15 @@ impl EthereumCall {
148150
let transaction_index = trace.transaction_position? as u64;
149151

150152
Some(EthereumCall {
151-
from: call.from,
152-
to: call.to,
153-
value: call.value,
154-
gas_used,
155-
input: call.input.clone(),
156-
output,
157-
block_number: trace.block_number as BlockNumber,
158-
block_hash: trace.block_hash,
159-
transaction_hash: trace.transaction_hash,
153+
from: alloy_address_to_h160(call.from),
154+
to: alloy_address_to_h160(call.to),
155+
value: alloy_u256_to_web3_u256(call.value),
156+
gas_used: u64_to_web3_u256(gas_used),
157+
input: alloy_bytes_to_web3_bytes(call.input.clone()),
158+
output: alloy_bytes_to_web3_bytes(output),
159+
block_number: trace.block_number? as BlockNumber,
160+
block_hash: trace.block_hash.map(|h| b256_to_h256(h))?,
161+
transaction_hash: trace.transaction_hash.map(|h| b256_to_h256(h)),
160162
transaction_index,
161163
})
162164
}

0 commit comments

Comments
 (0)