Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
tonic-build = { version = "0.12.3", features = ["prost"] }
tower-http = { version = "0.6.6", features = ["cors"] }
wasmparser = "0.118.1"
wasmtime = "33.0.2"
wasmtime = { version = "33.0.2", features = ["async"] }
substreams = "=0.6.0"
substreams-entity-change = "2"
substreams-near-core = "=0.10.2"
Expand Down
411 changes: 207 additions & 204 deletions chain/ethereum/src/runtime/abi.rs

Large diffs are not rendered by default.

109 changes: 66 additions & 43 deletions chain/ethereum/src/runtime/runtime_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use graph::data::store::scalar::BigInt;
use graph::data::subgraph::{API_VERSION_0_0_4, API_VERSION_0_0_9};
use graph::data_source;
use graph::data_source::common::{ContractCall, MappingABI};
use graph::futures03::FutureExt as _;
use graph::prelude::web3::types::H160;
use graph::runtime::gas::Gas;
use graph::runtime::{AscIndexId, IndexForAscTypeId};
Expand Down Expand Up @@ -95,20 +96,27 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
let call_cache = call_cache.clone();
let abis = abis.clone();
move |ctx, wasm_ptr| {
let eth_adapter =
eth_adapters.call_or_cheapest(Some(&NodeCapabilities {
archive,
traces: false,
}))?;
ethereum_call(
&eth_adapter,
call_cache.clone(),
ctx,
wasm_ptr,
&abis,
eth_call_gas,
)
.map(|ptr| ptr.wasm_ptr())
let eth_adapters = eth_adapters.cheap_clone();
let call_cache = call_cache.cheap_clone();
let abis = abis.cheap_clone();
async move {
let eth_adapter =
eth_adapters.call_or_cheapest(Some(&NodeCapabilities {
archive,
traces: false,
}))?;
ethereum_call(
&eth_adapter,
call_cache.clone(),
ctx,
wasm_ptr,
&abis,
eth_call_gas,
)
.await
.map(|ptr| ptr.wasm_ptr())
}
.boxed()
}
}),
},
Expand All @@ -117,26 +125,37 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
func: Arc::new({
let eth_adapters = eth_adapters.clone();
move |ctx, wasm_ptr| {
let eth_adapter =
eth_adapters.unverified_cheapest_with(&NodeCapabilities {
archive,
traces: false,
})?;
eth_get_balance(&eth_adapter, ctx, wasm_ptr).map(|ptr| ptr.wasm_ptr())
let eth_adapters = eth_adapters.cheap_clone();
async move {
let eth_adapter =
eth_adapters.unverified_cheapest_with(&NodeCapabilities {
archive,
traces: false,
})?;
eth_get_balance(&eth_adapter, ctx, wasm_ptr)
.await
.map(|ptr| ptr.wasm_ptr())
}
.boxed()
}
}),
},
HostFn {
name: "ethereum.hasCode",
func: Arc::new({
let eth_adapters = eth_adapters.clone();
move |ctx, wasm_ptr| {
let eth_adapter =
eth_adapters.unverified_cheapest_with(&NodeCapabilities {
archive,
traces: false,
})?;
eth_has_code(&eth_adapter, ctx, wasm_ptr).map(|ptr| ptr.wasm_ptr())
let eth_adapters = eth_adapters.cheap_clone();
async move {
let eth_adapter =
eth_adapters.unverified_cheapest_with(&NodeCapabilities {
archive,
traces: false,
})?;
eth_has_code(&eth_adapter, ctx, wasm_ptr)
.await
.map(|ptr| ptr.wasm_ptr())
}
.boxed()
}
}),
},
Expand Down Expand Up @@ -170,10 +189,10 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
}

/// function ethereum.call(call: SmartContractCall): Array<Token> | null
fn ethereum_call(
async fn ethereum_call(
eth_adapter: &EthereumAdapter,
call_cache: Arc<dyn EthereumCallCache>,
ctx: HostFnCtx,
ctx: HostFnCtx<'_>,
wasm_ptr: u32,
abis: &[Arc<MappingABI>],
eth_call_gas: Option<u32>,
Expand All @@ -199,14 +218,15 @@ fn ethereum_call(
abis,
eth_call_gas,
ctx.metrics.cheap_clone(),
)?;
)
.await?;
match result {
Some(tokens) => Ok(asc_new(ctx.heap, tokens.as_slice(), &ctx.gas)?),
Some(tokens) => Ok(asc_new(ctx.heap, tokens.as_slice(), &ctx.gas).await?),
None => Ok(AscPtr::null()),
}
}

fn eth_get_balance(
async fn eth_get_balance(
eth_adapter: &EthereumAdapter,
ctx: HostFnCtx<'_>,
wasm_ptr: u32,
Expand All @@ -225,12 +245,14 @@ fn eth_get_balance(

let address: H160 = asc_get(ctx.heap, wasm_ptr.into(), &ctx.gas, 0)?;

let result = graph::block_on(eth_adapter.get_balance(logger, address, block_ptr.clone()));
let result = eth_adapter
.get_balance(logger, address, block_ptr.clone())
.await;

match result {
Ok(v) => {
let bigint = BigInt::from_unsigned_u256(&v);
Ok(asc_new(ctx.heap, &bigint, &ctx.gas)?)
Ok(asc_new(ctx.heap, &bigint, &ctx.gas).await?)
}
// Retry on any kind of error
Err(EthereumRpcError::Web3Error(e)) => Err(HostExportError::PossibleReorg(e.into())),
Expand All @@ -240,7 +262,7 @@ fn eth_get_balance(
}
}

fn eth_has_code(
async fn eth_has_code(
eth_adapter: &EthereumAdapter,
ctx: HostFnCtx<'_>,
wasm_ptr: u32,
Expand All @@ -259,11 +281,13 @@ fn eth_has_code(

let address: H160 = asc_get(ctx.heap, wasm_ptr.into(), &ctx.gas, 0)?;

let result = graph::block_on(eth_adapter.get_code(logger, address, block_ptr.clone()))
let result = eth_adapter
.get_code(logger, address, block_ptr.clone())
.await
.map(|v| !v.0.is_empty());

match result {
Ok(v) => Ok(asc_new(ctx.heap, &AscWrapped { inner: v }, &ctx.gas)?),
Ok(v) => Ok(asc_new(ctx.heap, &AscWrapped { inner: v }, &ctx.gas).await?),
// Retry on any kind of error
Err(EthereumRpcError::Web3Error(e)) => Err(HostExportError::PossibleReorg(e.into())),
Err(EthereumRpcError::Timeout) => Err(HostExportError::PossibleReorg(
Expand All @@ -273,7 +297,7 @@ fn eth_has_code(
}

/// Returns `Ok(None)` if the call was reverted.
fn eth_call(
async fn eth_call(
eth_adapter: &EthereumAdapter,
call_cache: Arc<dyn EthereumCallCache>,
logger: &Logger,
Expand Down Expand Up @@ -331,11 +355,10 @@ fn eth_call(
// Run Ethereum call in tokio runtime
let logger1 = logger.clone();
let call_cache = call_cache.clone();
let (result, source) =
match graph::block_on(eth_adapter.contract_call(&logger1, &call, call_cache)) {
Ok((result, source)) => (Ok(result), source),
Err(e) => (Err(e), call::Source::Rpc),
};
let (result, source) = match eth_adapter.contract_call(&logger1, &call, call_cache).await {
Ok((result, source)) => (Ok(result), source),
Err(e) => (Err(e), call::Source::Rpc),
};
let result = match result {
Ok(res) => Ok(res),

Expand Down
33 changes: 23 additions & 10 deletions chain/ethereum/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use graph::data::subgraph::API_VERSION_0_0_2;
use graph::data::subgraph::API_VERSION_0_0_6;
use graph::data::subgraph::API_VERSION_0_0_7;
use graph::data_source::common::DeclaredCall;
use graph::prelude::async_trait;
use graph::prelude::ethabi::ethereum_types::H160;
use graph::prelude::ethabi::ethereum_types::H256;
use graph::prelude::ethabi::ethereum_types::U128;
Expand Down Expand Up @@ -129,8 +130,9 @@ impl std::fmt::Debug for MappingTrigger {
}
}

#[async_trait]
impl ToAscPtr for MappingTrigger {
fn to_asc_ptr<H: AscHeap>(
async fn to_asc_ptr<H: AscHeap>(
self,
heap: &mut H,
gas: &GasCounter,
Expand Down Expand Up @@ -159,28 +161,31 @@ impl ToAscPtr for MappingTrigger {
>,
_,
_,
>(heap, &(ethereum_event_data, receipt.as_deref()), gas)?
>(heap, &(ethereum_event_data, receipt.as_deref()), gas)
.await?
.erase()
} else if api_version >= &API_VERSION_0_0_6 {
asc_new::<
AscEthereumEvent<AscEthereumTransaction_0_0_6, AscEthereumBlock_0_0_6>,
_,
_,
>(heap, &ethereum_event_data, gas)?
>(heap, &ethereum_event_data, gas)
.await?
.erase()
} else if api_version >= &API_VERSION_0_0_2 {
asc_new::<
AscEthereumEvent<AscEthereumTransaction_0_0_2, AscEthereumBlock>,
_,
_,
>(heap, &ethereum_event_data, gas)?
>(heap, &ethereum_event_data, gas)
.await?
.erase()
} else {
asc_new::<
AscEthereumEvent<AscEthereumTransaction_0_0_1, AscEthereumBlock>,
_,
_,
>(heap, &ethereum_event_data, gas)?
>(heap, &ethereum_event_data, gas).await?
.erase()
}
}
Expand All @@ -197,25 +202,33 @@ impl ToAscPtr for MappingTrigger {
AscEthereumCall_0_0_3<AscEthereumTransaction_0_0_6, AscEthereumBlock_0_0_6>,
_,
_,
>(heap, &call, gas)?
>(heap, &call, gas)
.await?
.erase()
} else if heap.api_version() >= &Version::new(0, 0, 3) {
asc_new::<
AscEthereumCall_0_0_3<AscEthereumTransaction_0_0_2, AscEthereumBlock>,
_,
_,
>(heap, &call, gas)?
>(heap, &call, gas)
.await?
.erase()
} else {
asc_new::<AscEthereumCall, _, _>(heap, &call, gas)?.erase()
asc_new::<AscEthereumCall, _, _>(heap, &call, gas)
.await?
.erase()
}
}
MappingTrigger::Block { block } => {
let block = EthereumBlockData::from(block.as_ref());
if heap.api_version() >= &Version::new(0, 0, 6) {
asc_new::<AscEthereumBlock_0_0_6, _, _>(heap, &block, gas)?.erase()
asc_new::<AscEthereumBlock_0_0_6, _, _>(heap, &block, gas)
.await?
.erase()
} else {
asc_new::<AscEthereumBlock, _, _>(heap, &block, gas)?.erase()
asc_new::<AscEthereumBlock, _, _>(heap, &block, gas)
.await?
.erase()
}
}
})
Expand Down
Loading