Skip to content

Commit a4004a3

Browse files
committed
chain/ethereum: Implment rpc metrics collection with alloy
1 parent a3b5e3b commit a4004a3

File tree

5 files changed

+84
-87
lines changed

5 files changed

+84
-87
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ repository = "https://github.com/graphprotocol/graph-node"
3939
license = "MIT OR Apache-2.0"
4040

4141
[workspace.dependencies]
42-
alloy = { version = "1.0.33", features = ["dyn-abi", "json-abi", "full", "arbitrary"] }
42+
alloy = { version = "1.0.33", features = ["dyn-abi", "json-abi", "full", "arbitrary", "json-rpc"] }
4343
alloy-rpc-types = "1.0.33"
4444
anyhow = "1.0"
4545
async-graphql = { version = "7.0.17", features = ["chrono"] }
@@ -95,6 +95,7 @@ thiserror = "2.0.16"
9595
tokio = { version = "1.45.1", features = ["full"] }
9696
tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
9797
tonic-build = { version = "0.12.3", features = ["prost"] }
98+
tower = { version = "0.5.1", features = ["full"] }
9899
tower-http = { version = "0.6.6", features = ["cors"] }
99100
wasmparser = "0.118.1"
100101
wasmtime = "33.0.2"

chain/ethereum/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ tiny-keccak = "1.5.0"
1515
hex = "0.4.3"
1616
semver = "1.0.27"
1717
thiserror = { workspace = true }
18+
tower = { workspace = true }
1819

1920
itertools = "0.14.0"
2021

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,9 @@ impl EthereumAdapter {
140140
call_only: bool,
141141
) -> Self {
142142
let alloy = match &transport {
143-
Transport::RPC { client, .. } => Arc::new(
144-
alloy::providers::ProviderBuilder::new()
145-
.connect_client(alloy::rpc::client::RpcClient::new(client.clone(), false)),
146-
),
143+
Transport::RPC { client, .. } => {
144+
Arc::new(alloy::providers::ProviderBuilder::new().connect_client(client.clone()))
145+
}
147146
Transport::IPC(ipc_connect) => Arc::new(
148147
alloy::providers::ProviderBuilder::new()
149148
.connect_ipc(ipc_connect.clone())

chain/ethereum/src/transport.rs

Lines changed: 72 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
use graph::components::network_provider::ProviderName;
2-
use graph::endpoint::EndpointMetrics;
3-
2+
use graph::endpoint::{ConnectionType, EndpointMetrics, RequestLabels};
3+
use graph::prelude::alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
44
use graph::prelude::*;
55
use graph::url::Url;
66
use std::sync::Arc;
7+
use std::task::{Context, Poll};
8+
use tower::Service;
9+
10+
use alloy::transports::{TransportError, TransportFut};
711

8-
// Alloy imports for transport types
912
use graph::prelude::alloy::transports::{http::Http, ipc::IpcConnect, ws::WsConnect};
1013

1114
/// Abstraction over different transport types for Alloy providers.
1215
#[derive(Clone, Debug)]
1316
pub enum Transport {
1417
RPC {
15-
client: Http<reqwest::Client>,
18+
client: alloy::rpc::client::RpcClient,
1619
metrics: Arc<EndpointMetrics>,
1720
provider: ProviderName,
1821
url: String,
@@ -43,9 +46,6 @@ impl Transport {
4346
}
4447

4548
/// Creates a JSON-RPC over HTTP transport.
46-
///
47-
/// Note: JSON-RPC over HTTP doesn't always support subscribing to new
48-
/// blocks (one such example is Infura's HTTP endpoint).
4949
pub fn new_rpc(
5050
rpc: Url,
5151
headers: graph::http::HeaderMap,
@@ -60,93 +60,83 @@ impl Transport {
6060

6161
let rpc_url = rpc.to_string();
6262

63+
// Create HTTP transport with metrics collection
64+
let http_transport = Http::with_client(client, rpc);
65+
let metrics_transport =
66+
MetricsHttp::new(http_transport, metrics.clone(), provider.as_ref().into());
67+
let rpc_client = alloy::rpc::client::RpcClient::new(metrics_transport, false);
68+
6369
Transport::RPC {
64-
client: Http::with_client(client, rpc),
70+
client: rpc_client,
6571
metrics,
6672
provider: provider.as_ref().into(),
6773
url: rpc_url,
6874
}
6975
}
7076
}
7177

72-
/*
73-
impl web3::Transport for Transport {
74-
type Out = Pin<Box<dyn Future<Output = Result<Value, web3::error::Error>> + Send + 'static>>;
75-
76-
fn prepare(&self, method: &str, params: Vec<Value>) -> (RequestId, Call) {
77-
match self {
78-
Transport::RPC {
79-
client,
80-
metrics: _,
81-
provider: _,
82-
url: _,
83-
} => client.prepare(method, params),
84-
Transport::IPC { transport, path: _ } => transport.prepare(method, params),
85-
Transport::WS { transport, url: _ } => transport.prepare(method, params),
78+
/// Custom HTTP transport wrapper that collects metrics
79+
#[derive(Clone)]
80+
pub struct MetricsHttp {
81+
inner: Http<reqwest::Client>,
82+
metrics: Arc<EndpointMetrics>,
83+
provider: ProviderName,
84+
}
85+
86+
impl MetricsHttp {
87+
pub fn new(
88+
inner: Http<reqwest::Client>,
89+
metrics: Arc<EndpointMetrics>,
90+
provider: ProviderName,
91+
) -> Self {
92+
Self {
93+
inner,
94+
metrics,
95+
provider,
8696
}
8797
}
98+
}
99+
100+
// Implement tower::Service trait for MetricsHttp to intercept RPC calls
101+
impl Service<RequestPacket> for MetricsHttp {
102+
type Response = ResponsePacket;
103+
type Error = TransportError;
104+
type Future = TransportFut<'static>;
105+
106+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
107+
self.inner.poll_ready(cx)
108+
}
88109

89-
fn send(&self, id: RequestId, request: Call) -> Self::Out {
90-
match self {
91-
Transport::RPC {
92-
client,
93-
metrics,
110+
fn call(&mut self, request: RequestPacket) -> Self::Future {
111+
let metrics = self.metrics.clone();
112+
let provider = self.provider.clone();
113+
let mut inner = self.inner.clone();
114+
115+
Box::pin(async move {
116+
// Extract method name from request
117+
let method = match &request {
118+
RequestPacket::Single(req) => req.method().to_string(),
119+
RequestPacket::Batch(reqs) => reqs
120+
.first()
121+
.map(|r| r.method().to_string())
122+
.unwrap_or_else(|| "batch".to_string()),
123+
};
124+
125+
let labels = RequestLabels {
94126
provider,
95-
url: _,
96-
} => {
97-
let metrics = metrics.cheap_clone();
98-
let client = client.clone();
99-
let method = match request {
100-
Call::MethodCall(ref m) => m.method.as_str(),
101-
_ => "unknown",
102-
};
103-
104-
let labels = RequestLabels {
105-
provider: provider.clone(),
106-
req_type: method.into(),
107-
conn_type: graph::endpoint::ConnectionType::Rpc,
108-
};
109-
let out = async move {
110-
let out = client.send(id, request).await;
111-
match out {
112-
Ok(_) => metrics.success(&labels),
113-
Err(_) => metrics.failure(&labels),
114-
}
115-
116-
out
117-
};
118-
119-
Box::pin(out)
127+
req_type: method.into(),
128+
conn_type: ConnectionType::Rpc,
129+
};
130+
131+
// Call inner transport and track metrics
132+
let result = inner.call(request).await;
133+
134+
match &result {
135+
Ok(_) => metrics.success(&labels),
136+
Err(_) => metrics.failure(&labels),
120137
}
121-
Transport::IPC { transport, path: _ } => Box::pin(transport.send(id, request)),
122-
Transport::WS { transport, url: _ } => Box::pin(transport.send(id, request)),
123-
}
124-
}
125-
}
126-
*/
127-
128-
/*
129-
impl web3::BatchTransport for Transport {
130-
type Batch = Box<
131-
dyn Future<Output = Result<Vec<Result<Value, web3::error::Error>>, web3::error::Error>>
132-
+ Send
133-
+ Unpin,
134-
>;
135-
136-
fn send_batch<T>(&self, requests: T) -> Self::Batch
137-
where
138-
T: IntoIterator<Item = (RequestId, Call)>,
139-
{
140-
match self {
141-
Transport::RPC {
142-
client,
143-
metrics: _,
144-
provider: _,
145-
url: _,
146-
} => Box::new(client.send_batch(requests)),
147-
Transport::IPC { transport, path: _ } => Box::new(transport.send_batch(requests)),
148-
Transport::WS { transport, url: _ } => Box::new(transport.send_batch(requests)),
149-
}
138+
139+
result
140+
})
150141
}
151142
}
152-
*/

0 commit comments

Comments
 (0)