1
1
use graph:: components:: network_provider:: ProviderName ;
2
- use graph:: endpoint:: EndpointMetrics ;
3
-
2
+ use graph:: endpoint:: { ConnectionType , EndpointMetrics , RequestLabels } ;
3
+ use graph :: prelude :: alloy :: rpc :: json_rpc :: { RequestPacket , ResponsePacket } ;
4
4
use graph:: prelude:: * ;
5
5
use graph:: url:: Url ;
6
6
use std:: sync:: Arc ;
7
+ use std:: task:: { Context , Poll } ;
8
+ use tower:: Service ;
9
+
10
+ use alloy:: transports:: { TransportError , TransportFut } ;
7
11
8
- // Alloy imports for transport types
9
12
use graph:: prelude:: alloy:: transports:: { http:: Http , ipc:: IpcConnect , ws:: WsConnect } ;
10
13
11
14
/// Abstraction over different transport types for Alloy providers.
12
15
#[ derive( Clone , Debug ) ]
13
16
pub enum Transport {
14
17
RPC {
15
- client : Http < reqwest :: Client > ,
18
+ client : alloy :: rpc :: client :: RpcClient ,
16
19
metrics : Arc < EndpointMetrics > ,
17
20
provider : ProviderName ,
18
21
url : String ,
@@ -43,9 +46,6 @@ impl Transport {
43
46
}
44
47
45
48
/// Creates a JSON-RPC over HTTP transport.
46
- ///
47
- /// Note: JSON-RPC over HTTP doesn't always support subscribing to new
48
- /// blocks (one such example is Infura's HTTP endpoint).
49
49
pub fn new_rpc (
50
50
rpc : Url ,
51
51
headers : graph:: http:: HeaderMap ,
@@ -60,93 +60,83 @@ impl Transport {
60
60
61
61
let rpc_url = rpc. to_string ( ) ;
62
62
63
+ // Create HTTP transport with metrics collection
64
+ let http_transport = Http :: with_client ( client, rpc) ;
65
+ let metrics_transport =
66
+ MetricsHttp :: new ( http_transport, metrics. clone ( ) , provider. as_ref ( ) . into ( ) ) ;
67
+ let rpc_client = alloy:: rpc:: client:: RpcClient :: new ( metrics_transport, false ) ;
68
+
63
69
Transport :: RPC {
64
- client : Http :: with_client ( client , rpc ) ,
70
+ client : rpc_client ,
65
71
metrics,
66
72
provider : provider. as_ref ( ) . into ( ) ,
67
73
url : rpc_url,
68
74
}
69
75
}
70
76
}
71
77
72
- /*
73
- impl web3::Transport for Transport {
74
- type Out = Pin<Box<dyn Future<Output = Result<Value, web3::error::Error>> + Send + 'static>>;
75
-
76
- fn prepare(&self, method: &str, params: Vec<Value>) -> (RequestId, Call) {
77
- match self {
78
- Transport::RPC {
79
- client,
80
- metrics: _,
81
- provider: _,
82
- url: _,
83
- } => client.prepare(method, params),
84
- Transport::IPC { transport, path: _ } => transport.prepare(method, params),
85
- Transport::WS { transport, url: _ } => transport.prepare(method, params),
78
+ /// Custom HTTP transport wrapper that collects metrics
79
+ #[ derive( Clone ) ]
80
+ pub struct MetricsHttp {
81
+ inner : Http < reqwest:: Client > ,
82
+ metrics : Arc < EndpointMetrics > ,
83
+ provider : ProviderName ,
84
+ }
85
+
86
+ impl MetricsHttp {
87
+ pub fn new (
88
+ inner : Http < reqwest:: Client > ,
89
+ metrics : Arc < EndpointMetrics > ,
90
+ provider : ProviderName ,
91
+ ) -> Self {
92
+ Self {
93
+ inner,
94
+ metrics,
95
+ provider,
86
96
}
87
97
}
98
+ }
99
+
100
+ // Implement tower::Service trait for MetricsHttp to intercept RPC calls
101
+ impl Service < RequestPacket > for MetricsHttp {
102
+ type Response = ResponsePacket ;
103
+ type Error = TransportError ;
104
+ type Future = TransportFut < ' static > ;
105
+
106
+ fn poll_ready ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
107
+ self . inner . poll_ready ( cx)
108
+ }
88
109
89
- fn send(&self, id: RequestId, request: Call) -> Self::Out {
90
- match self {
91
- Transport::RPC {
92
- client,
93
- metrics,
110
+ fn call ( & mut self , request : RequestPacket ) -> Self :: Future {
111
+ let metrics = self . metrics . clone ( ) ;
112
+ let provider = self . provider . clone ( ) ;
113
+ let mut inner = self . inner . clone ( ) ;
114
+
115
+ Box :: pin ( async move {
116
+ // Extract method name from request
117
+ let method = match & request {
118
+ RequestPacket :: Single ( req) => req. method ( ) . to_string ( ) ,
119
+ RequestPacket :: Batch ( reqs) => reqs
120
+ . first ( )
121
+ . map ( |r| r. method ( ) . to_string ( ) )
122
+ . unwrap_or_else ( || "batch" . to_string ( ) ) ,
123
+ } ;
124
+
125
+ let labels = RequestLabels {
94
126
provider,
95
- url: _,
96
- } => {
97
- let metrics = metrics.cheap_clone();
98
- let client = client.clone();
99
- let method = match request {
100
- Call::MethodCall(ref m) => m.method.as_str(),
101
- _ => "unknown",
102
- };
103
-
104
- let labels = RequestLabels {
105
- provider: provider.clone(),
106
- req_type: method.into(),
107
- conn_type: graph::endpoint::ConnectionType::Rpc,
108
- };
109
- let out = async move {
110
- let out = client.send(id, request).await;
111
- match out {
112
- Ok(_) => metrics.success(&labels),
113
- Err(_) => metrics.failure(&labels),
114
- }
115
-
116
- out
117
- };
118
-
119
- Box::pin(out)
127
+ req_type : method. into ( ) ,
128
+ conn_type : ConnectionType :: Rpc ,
129
+ } ;
130
+
131
+ // Call inner transport and track metrics
132
+ let result = inner. call ( request) . await ;
133
+
134
+ match & result {
135
+ Ok ( _) => metrics. success ( & labels) ,
136
+ Err ( _) => metrics. failure ( & labels) ,
120
137
}
121
- Transport::IPC { transport, path: _ } => Box::pin(transport.send(id, request)),
122
- Transport::WS { transport, url: _ } => Box::pin(transport.send(id, request)),
123
- }
124
- }
125
- }
126
- */
127
-
128
- /*
129
- impl web3::BatchTransport for Transport {
130
- type Batch = Box<
131
- dyn Future<Output = Result<Vec<Result<Value, web3::error::Error>>, web3::error::Error>>
132
- + Send
133
- + Unpin,
134
- >;
135
-
136
- fn send_batch<T>(&self, requests: T) -> Self::Batch
137
- where
138
- T: IntoIterator<Item = (RequestId, Call)>,
139
- {
140
- match self {
141
- Transport::RPC {
142
- client,
143
- metrics: _,
144
- provider: _,
145
- url: _,
146
- } => Box::new(client.send_batch(requests)),
147
- Transport::IPC { transport, path: _ } => Box::new(transport.send_batch(requests)),
148
- Transport::WS { transport, url: _ } => Box::new(transport.send_batch(requests)),
149
- }
138
+
139
+ result
140
+ } )
150
141
}
151
142
}
152
- */
0 commit comments