11use std:: sync:: Arc ;
2- use std:: time:: Duration ;
32
43use crate :: executors:: dedupe:: { request_fingerprint, ABuildHasher , SharedResponse } ;
5- use crate :: executors:: timeout:: HTTPTimeout ;
64use dashmap:: DashMap ;
75use futures:: TryFutureExt ;
86use hive_router_config:: traffic_shaping:: TrafficShapingExecutorConfig ;
9- use hyper:: body:: Incoming ;
107use tokio:: sync:: OnceCell ;
118
129use async_trait:: async_trait;
1310
14- use bytes:: { BufMut , Bytes , BytesMut } ;
11+ use bytes:: { BufMut , Bytes } ;
12+ use http:: HeaderMap ;
1513use http:: HeaderValue ;
16- use http:: { HeaderMap , Request , Response } ;
1714use http_body_util:: BodyExt ;
1815use http_body_util:: Full ;
1916use hyper:: Version ;
2017use hyper_tls:: HttpsConnector ;
2118use hyper_util:: client:: legacy:: { connect:: HttpConnector , Client } ;
2219use tokio:: sync:: Semaphore ;
23- use tracing:: warn;
2420
2521use crate :: executors:: common:: { HttpExecutionRequest , HttpExecutionResponse } ;
26- use crate :: executors:: error:: SubgraphExecutorError ;
27- use crate :: response:: graphql_error:: GraphQLError ;
22+ use crate :: executors:: error:: { error_to_graphql_bytes, SubgraphExecutorError } ;
2823use crate :: utils:: consts:: CLOSE_BRACE ;
2924use crate :: utils:: consts:: COLON ;
3025use crate :: utils:: consts:: COMMA ;
@@ -39,7 +34,6 @@ pub struct HTTPSubgraphExecutor {
3934 pub semaphore : Arc < Semaphore > ,
4035 pub config : Arc < TrafficShapingExecutorConfig > ,
4136 pub in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
42- pub timeout : Option < HTTPTimeout > ,
4337}
4438
4539const FIRST_VARIABLE_STR : & [ u8 ] = b",\" variables\" :{" ;
@@ -63,29 +57,13 @@ impl HTTPSubgraphExecutor {
6357 HeaderValue :: from_static ( "keep-alive" ) ,
6458 ) ;
6559
66- let timeout = if let Some ( timeout_config) = & config. timeout {
67- match HTTPTimeout :: try_from ( timeout_config) {
68- Ok ( timeout) => Some ( timeout) ,
69- Err ( diagnostic) => {
70- warn ! (
71- "Failed to parse timeout expression for subgraph {}: {:#?}" ,
72- endpoint, diagnostic
73- ) ;
74- None
75- }
76- }
77- } else {
78- None
79- } ;
80-
8160 Self {
8261 endpoint,
8362 http_client,
8463 header_map,
8564 semaphore,
8665 config,
8766 in_flight_requests,
88- timeout,
8967 }
9068 }
9169
@@ -137,23 +115,10 @@ impl HTTPSubgraphExecutor {
137115 Ok ( body)
138116 }
139117
140- pub async fn send_request_to_client (
141- & self ,
142- req : Request < Full < Bytes > > ,
143- ) -> Result < Response < Incoming > , SubgraphExecutorError > {
144- self . http_client
145- . request ( req)
146- . map_err ( |e| {
147- SubgraphExecutorError :: RequestFailure ( self . endpoint . to_string ( ) , e. to_string ( ) )
148- } )
149- . await
150- }
151-
152118 async fn _send_request (
153119 & self ,
154120 body : Vec < u8 > ,
155121 headers : HeaderMap ,
156- timeout : Option < Duration > ,
157122 ) -> Result < SharedResponse , SubgraphExecutorError > {
158123 let mut req = hyper:: Request :: builder ( )
159124 . method ( http:: Method :: POST )
@@ -166,11 +131,13 @@ impl HTTPSubgraphExecutor {
166131
167132 * req. headers_mut ( ) = headers;
168133
169- let res = if let Some ( timeout) = timeout {
170- self . send_request_with_timeout ( req, timeout) . await ?
171- } else {
172- self . send_request_to_client ( req) . await ?
173- } ;
134+ let res = self
135+ . http_client
136+ . request ( req)
137+ . map_err ( |e| {
138+ SubgraphExecutorError :: RequestFailure ( self . endpoint . to_string ( ) , e. to_string ( ) )
139+ } )
140+ . await ?;
174141
175142 let ( parts, body) = res. into_parts ( ) ;
176143
@@ -186,22 +153,6 @@ impl HTTPSubgraphExecutor {
186153 headers : parts. headers ,
187154 } )
188155 }
189-
190- fn error_to_graphql_bytes ( & self , e : SubgraphExecutorError ) -> Bytes {
191- let graphql_error: GraphQLError = format ! (
192- "Failed to execute request to subgraph {}: {}" ,
193- self . endpoint, e
194- )
195- . into ( ) ;
196- let errors = vec ! [ graphql_error] ;
197- // This unwrap is safe as GraphQLError serialization shouldn't fail.
198- let errors_bytes = sonic_rs:: to_vec ( & errors) . unwrap ( ) ;
199- let mut buffer = BytesMut :: new ( ) ;
200- buffer. put_slice ( b"{\" errors\" :" ) ;
201- buffer. put_slice ( & errors_bytes) ;
202- buffer. put_slice ( b"}" ) ;
203- buffer. freeze ( )
204- }
205156}
206157
207158#[ async_trait]
@@ -214,7 +165,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
214165 Ok ( body) => body,
215166 Err ( e) => {
216167 return HttpExecutionResponse {
217- body : self . error_to_graphql_bytes ( e) ,
168+ body : error_to_graphql_bytes ( & self . endpoint , e) ,
218169 headers : Default :: default ( ) ,
219170 }
220171 }
@@ -229,14 +180,13 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
229180 // This unwrap is safe because the semaphore is never closed during the application's lifecycle.
230181 // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
231182 let _permit = self . semaphore . acquire ( ) . await . unwrap ( ) ;
232- let timeout = self . get_timeout_duration ( execution_request. client_request ) ;
233- return match self . _send_request ( body, headers, timeout) . await {
183+ return match self . _send_request ( body, headers) . await {
234184 Ok ( shared_response) => HttpExecutionResponse {
235185 body : shared_response. body ,
236186 headers : shared_response. headers ,
237187 } ,
238188 Err ( e) => HttpExecutionResponse {
239- body : self . error_to_graphql_bytes ( e) ,
189+ body : error_to_graphql_bytes ( & self . endpoint , e) ,
240190 headers : Default :: default ( ) ,
241191 } ,
242192 } ;
@@ -255,12 +205,11 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
255205
256206 let response_result = cell
257207 . get_or_try_init ( || async {
258- let timeout = self . get_timeout_duration ( execution_request. client_request ) ;
259208 let res = {
260209 // This unwrap is safe because the semaphore is never closed during the application's lifecycle.
261210 // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
262211 let _permit = self . semaphore . acquire ( ) . await . unwrap ( ) ;
263- self . _send_request ( body, headers, timeout ) . await
212+ self . _send_request ( body, headers) . await
264213 } ;
265214 // It's important to remove the entry from the map before returning the result.
266215 // This ensures that once the OnceCell is set, no future requests can join it.
@@ -276,7 +225,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
276225 headers : shared_response. headers . clone ( ) ,
277226 } ,
278227 Err ( e) => HttpExecutionResponse {
279- body : self . error_to_graphql_bytes ( e. clone ( ) ) ,
228+ body : error_to_graphql_bytes ( & self . endpoint , e. clone ( ) ) ,
280229 headers : Default :: default ( ) ,
281230 } ,
282231 }
0 commit comments