Skip to content

Commit 8444cd5

Browse files
committed
chain/ethereum: Implement support for ws and ipc
1 parent 441acdb commit 8444cd5

File tree

2 files changed

+53
-57
lines changed

2 files changed

+53
-57
lines changed

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered};
22
use graph::abi;
33
use graph::abi::DynSolValueExt;
44
use graph::abi::FunctionExt;
5-
use graph::alloy_todo;
65
use graph::blockchain::client::ChainClient;
76
use graph::blockchain::BlockHash;
87
use graph::blockchain::ChainIdentifier;
98
use graph::blockchain::ExtendedBlockPtr;
9+
use graph::components::ethereum::*;
1010
use graph::components::transaction_receipt::LightTransactionReceipt;
1111
use graph::data::store::ethereum::call;
1212
use graph::data::store::scalar;
@@ -52,7 +52,6 @@ use graph::{
5252
ChainStore, CheapClone, DynTryFuture, Error, EthereumCallCache, Logger, TimeoutError,
5353
},
5454
};
55-
use graph::{components::ethereum::*, prelude::web3::api::Web3};
5655
use itertools::Itertools;
5756
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
5857
use std::convert::TryFrom;
@@ -92,7 +91,6 @@ type AlloyProvider = FillProvider<
9291
pub struct EthereumAdapter {
9392
logger: Logger,
9493
provider: String,
95-
web3: Arc<Web3<Transport>>,
9694
alloy: Arc<AlloyProvider>,
9795
metrics: Arc<ProviderEthRpcMetrics>,
9896
supports_eip_1898: bool,
@@ -105,7 +103,6 @@ impl std::fmt::Debug for EthereumAdapter {
105103
f.debug_struct("EthereumAdapter")
106104
.field("logger", &self.logger)
107105
.field("provider", &self.provider)
108-
.field("web3", &self.web3)
109106
.field("alloy", &"<Provider>")
110107
.field("metrics", &self.metrics)
111108
.field("supports_eip_1898", &self.supports_eip_1898)
@@ -120,7 +117,6 @@ impl CheapClone for EthereumAdapter {
120117
Self {
121118
logger: self.logger.clone(),
122119
provider: self.provider.clone(),
123-
web3: self.web3.cheap_clone(),
124120
alloy: self.alloy.clone(),
125121
metrics: self.metrics.cheap_clone(),
126122
supports_eip_1898: self.supports_eip_1898,
@@ -143,28 +139,28 @@ impl EthereumAdapter {
143139
supports_eip_1898: bool,
144140
call_only: bool,
145141
) -> Self {
146-
let rpc_url = match &transport {
147-
Transport::RPC {
148-
client: _,
149-
metrics: _,
150-
provider: _,
151-
url: rpc_url,
152-
} => rpc_url.clone(),
153-
Transport::IPC(_ipc) => alloy_todo!(),
154-
Transport::WS(_web_socket) => alloy_todo!(),
142+
let alloy = match &transport {
143+
Transport::RPC { client, .. } => Arc::new(
144+
alloy::providers::ProviderBuilder::new()
145+
.connect_client(alloy::rpc::client::RpcClient::new(client.clone(), false)),
146+
),
147+
Transport::IPC(ipc_connect) => Arc::new(
148+
alloy::providers::ProviderBuilder::new()
149+
.connect_ipc(ipc_connect.clone())
150+
.await
151+
.unwrap(),
152+
),
153+
Transport::WS(ws_connect) => Arc::new(
154+
alloy::providers::ProviderBuilder::new()
155+
.connect_ws(ws_connect.clone())
156+
.await
157+
.unwrap(),
158+
),
155159
};
156-
let web3 = Arc::new(Web3::new(transport));
157-
let alloy = Arc::new(
158-
alloy::providers::ProviderBuilder::new()
159-
.connect(&rpc_url)
160-
.await
161-
.unwrap(),
162-
);
163160

164161
EthereumAdapter {
165162
logger,
166163
provider,
167-
web3,
168164
alloy,
169165
metrics: provider_metrics,
170166
supports_eip_1898,
@@ -305,7 +301,7 @@ impl EthereumAdapter {
305301
// cached. The result is not used for anything critical, so it is fine to be lazy.
306302
async fn check_block_receipt_support_and_update_cache(
307303
&self,
308-
alloy: Arc<dyn alloy::providers::Provider>,
304+
alloy: Arc<dyn Provider>,
309305
block_hash: B256,
310306
supports_eip_1898: bool,
311307
call_only: bool,
@@ -2156,7 +2152,7 @@ async fn filter_call_triggers_from_unsuccessful_transactions(
21562152

21572153
/// Deprecated. Wraps the [`fetch_transaction_receipts_in_batch`] in a retry loop.
21582154
async fn fetch_transaction_receipts_in_batch_with_retry(
2159-
alloy: Arc<dyn alloy::providers::Provider>,
2155+
alloy: Arc<dyn Provider>,
21602156
hashes: Vec<B256>,
21612157
block_hash: B256,
21622158
logger: Logger,
@@ -2182,7 +2178,7 @@ async fn fetch_transaction_receipts_in_batch_with_retry(
21822178

21832179
/// Deprecated. Attempts to fetch multiple transaction receipts in a batching context.
21842180
async fn fetch_transaction_receipts_in_batch(
2185-
alloy: Arc<dyn alloy::providers::Provider>,
2181+
alloy: Arc<dyn Provider>,
21862182
hashes: Vec<B256>,
21872183
block_hash: B256,
21882184
logger: Logger,
@@ -2212,7 +2208,7 @@ async fn fetch_transaction_receipts_in_batch(
22122208
}
22132209

22142210
async fn batch_get_transaction_receipts(
2215-
provider: Arc<dyn alloy::providers::Provider>,
2211+
provider: Arc<dyn Provider>,
22162212
tx_hashes: Vec<B256>,
22172213
) -> Result<Vec<Option<alloy::rpc::types::TransactionReceipt>>, Box<dyn std::error::Error>> {
22182214
let mut batch = alloy::rpc::client::BatchRequest::new(provider.client());
@@ -2242,7 +2238,7 @@ async fn batch_get_transaction_receipts(
22422238
}
22432239

22442240
pub(crate) async fn check_block_receipt_support(
2245-
alloy: Arc<dyn alloy::providers::Provider>,
2241+
alloy: Arc<dyn Provider>,
22462242
block_hash: B256,
22472243
supports_eip_1898: bool,
22482244
call_only: bool,
@@ -2271,7 +2267,7 @@ pub(crate) async fn check_block_receipt_support(
22712267
// based on whether block receipts are supported or individual transaction receipts
22722268
// need to be fetched.
22732269
async fn fetch_receipts_with_retry(
2274-
alloy: Arc<dyn alloy::providers::Provider>,
2270+
alloy: Arc<dyn Provider>,
22752271
hashes: Vec<B256>,
22762272
block_hash: B256,
22772273
logger: Logger,
@@ -2285,7 +2281,7 @@ async fn fetch_receipts_with_retry(
22852281

22862282
// Fetches receipts for each transaction in the block individually.
22872283
async fn fetch_individual_receipts_with_retry(
2288-
alloy: Arc<dyn alloy::providers::Provider>,
2284+
alloy: Arc<dyn Provider>,
22892285
hashes: Vec<B256>,
22902286
block_hash: B256,
22912287
logger: Logger,
@@ -2316,7 +2312,7 @@ async fn fetch_individual_receipts_with_retry(
23162312

23172313
/// Fetches transaction receipts of all transactions in a block with `eth_getBlockReceipts` call.
23182314
async fn fetch_block_receipts_with_retry(
2319-
alloy: Arc<dyn alloy::providers::Provider>,
2315+
alloy: Arc<dyn Provider>,
23202316
hashes: Vec<B256>,
23212317
block_hash: B256,
23222318
logger: Logger,
@@ -2361,7 +2357,7 @@ async fn fetch_block_receipts_with_retry(
23612357

23622358
/// Retries fetching a single transaction receipt using alloy, then converts to web3 format.
23632359
async fn fetch_transaction_receipt_with_retry(
2364-
alloy: Arc<dyn alloy::providers::Provider>,
2360+
alloy: Arc<dyn Provider>,
23652361
transaction_hash: B256,
23662362
block_hash: B256,
23672363
logger: Logger,

chain/ethereum/src/transport.rs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,33 @@
11
use graph::components::network_provider::ProviderName;
2-
use graph::endpoint::{EndpointMetrics, RequestLabels};
3-
use jsonrpc_core::types::Call;
4-
use jsonrpc_core::Value;
5-
6-
use web3::transports::{http, ipc, ws};
7-
use web3::RequestId;
2+
use graph::endpoint::EndpointMetrics;
83

94
use graph::prelude::*;
105
use graph::url::Url;
11-
use std::future::Future;
6+
use std::sync::Arc;
7+
8+
// Alloy imports for transport types
9+
use graph::prelude::alloy::transports::{http::Http, ipc::IpcConnect, ws::WsConnect};
1210

13-
/// Abstraction over the different web3 transports.
11+
/// Abstraction over different transport types for Alloy providers.
1412
#[derive(Clone, Debug)]
1513
pub enum Transport {
1614
RPC {
17-
client: http::Http,
15+
client: Http<reqwest::Client>,
1816
metrics: Arc<EndpointMetrics>,
1917
provider: ProviderName,
2018
url: String,
2119
},
22-
IPC(ipc::Ipc),
23-
WS(ws::WebSocket),
20+
IPC(IpcConnect<String>),
21+
WS(WsConnect),
2422
}
2523

2624
impl Transport {
2725
/// Creates an IPC transport.
2826
#[cfg(unix)]
2927
pub async fn new_ipc(ipc: &str) -> Self {
30-
ipc::Ipc::new(ipc)
31-
.await
32-
.map(Transport::IPC)
33-
.expect("Failed to connect to Ethereum IPC")
28+
let transport = IpcConnect::new(ipc.to_string());
29+
30+
Transport::IPC(transport)
3431
}
3532

3633
#[cfg(not(unix))]
@@ -40,10 +37,9 @@ impl Transport {
4037

4138
/// Creates a WebSocket transport.
4239
pub async fn new_ws(ws: &str) -> Self {
43-
ws::WebSocket::new(ws)
44-
.await
45-
.map(Transport::WS)
46-
.expect("Failed to connect to Ethereum WS")
40+
let transport = WsConnect::new(ws.to_string());
41+
42+
Transport::WS(transport)
4743
}
4844

4945
/// Creates a JSON-RPC over HTTP transport.
@@ -65,14 +61,15 @@ impl Transport {
6561
let rpc_url = rpc.to_string();
6662

6763
Transport::RPC {
68-
client: http::Http::with_client(client, rpc),
64+
client: Http::with_client(client, rpc),
6965
metrics,
7066
provider: provider.as_ref().into(),
7167
url: rpc_url,
7268
}
7369
}
7470
}
7571

72+
/*
7673
impl web3::Transport for Transport {
7774
type Out = Pin<Box<dyn Future<Output = Result<Value, web3::error::Error>> + Send + 'static>>;
7875
@@ -84,8 +81,8 @@ impl web3::Transport for Transport {
8481
provider: _,
8582
url: _,
8683
} => client.prepare(method, params),
87-
Transport::IPC(ipc) => ipc.prepare(method, params),
88-
Transport::WS(ws) => ws.prepare(method, params),
84+
Transport::IPC { transport, path: _ } => transport.prepare(method, params),
85+
Transport::WS { transport, url: _ } => transport.prepare(method, params),
8986
}
9087
}
9188
@@ -121,12 +118,14 @@ impl web3::Transport for Transport {
121118
122119
Box::pin(out)
123120
}
124-
Transport::IPC(ipc) => Box::pin(ipc.send(id, request)),
125-
Transport::WS(ws) => Box::pin(ws.send(id, request)),
121+
Transport::IPC { transport, path: _ } => Box::pin(transport.send(id, request)),
122+
Transport::WS { transport, url: _ } => Box::pin(transport.send(id, request)),
126123
}
127124
}
128125
}
126+
*/
129127

128+
/*
130129
impl web3::BatchTransport for Transport {
131130
type Batch = Box<
132131
dyn Future<Output = Result<Vec<Result<Value, web3::error::Error>>, web3::error::Error>>
@@ -145,8 +144,9 @@ impl web3::BatchTransport for Transport {
145144
provider: _,
146145
url: _,
147146
} => Box::new(client.send_batch(requests)),
148-
Transport::IPC(ipc) => Box::new(ipc.send_batch(requests)),
149-
Transport::WS(ws) => Box::new(ws.send_batch(requests)),
147+
Transport::IPC { transport, path: _ } => Box::new(transport.send_batch(requests)),
148+
Transport::WS { transport, url: _ } => Box::new(transport.send_batch(requests)),
150149
}
151150
}
152151
}
152+
*/

0 commit comments

Comments
 (0)