1+ use std:: collections:: BTreeMap ;
12use std:: sync:: Arc ;
23
4+ use crate :: execution:: client_request_details:: ClientRequestDetails ;
35use crate :: executors:: common:: HttpExecutionResponse ;
46use crate :: executors:: dedupe:: { request_fingerprint, ABuildHasher , SharedResponse } ;
57use dashmap:: DashMap ;
6- use hive_router_config:: HiveRouterConfig ;
8+ use hive_router_config:: traffic_shaping :: DurationOrExpression ;
79use tokio:: sync:: OnceCell ;
810
911use async_trait:: async_trait;
@@ -18,6 +20,7 @@ use hyper_tls::HttpsConnector;
1820use hyper_util:: client:: legacy:: { connect:: HttpConnector , Client } ;
1921use tokio:: sync:: Semaphore ;
2022use tracing:: debug;
23+ use vrl:: core:: Value as VrlValue ;
2124
2225use crate :: executors:: common:: HttpExecutionRequest ;
2326use crate :: executors:: error:: SubgraphExecutorError ;
@@ -35,8 +38,9 @@ pub struct HTTPSubgraphExecutor {
3538 pub http_client : Arc < Client < HttpsConnector < HttpConnector > , Full < Bytes > > > ,
3639 pub header_map : HeaderMap ,
3740 pub semaphore : Arc < Semaphore > ,
38- pub config : Arc < HiveRouterConfig > ,
41+ pub dedupe_enabled : bool ,
3942 pub in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
43+ pub timeout : DurationOrExpression ,
4044}
4145
4246const FIRST_VARIABLE_STR : & [ u8 ] = b",\" variables\" :{" ;
@@ -50,8 +54,9 @@ impl HTTPSubgraphExecutor {
5054 endpoint : http:: Uri ,
5155 http_client : Arc < HttpClient > ,
5256 semaphore : Arc < Semaphore > ,
53- config : Arc < HiveRouterConfig > ,
57+ dedupe_enabled : bool ,
5458 in_flight_requests : Arc < DashMap < u64 , Arc < OnceCell < SharedResponse > > , ABuildHasher > > ,
59+ timeout : DurationOrExpression ,
5560 ) -> Self {
5661 let mut header_map = HeaderMap :: new ( ) ;
5762 header_map. insert (
@@ -69,14 +74,15 @@ impl HTTPSubgraphExecutor {
6974 http_client,
7075 header_map,
7176 semaphore,
72- config ,
77+ dedupe_enabled ,
7378 in_flight_requests,
79+ timeout,
7480 }
7581 }
7682
77- fn build_request_body < ' a > (
83+ fn build_request_body (
7884 & self ,
79- execution_request : & HttpExecutionRequest < ' a > ,
85+ execution_request : & HttpExecutionRequest < ' _ , ' _ > ,
8086 ) -> Result < Vec < u8 > , SubgraphExecutorError > {
8187 let mut body = Vec :: with_capacity ( 4096 ) ;
8288 body. put ( FIRST_QUOTE_STR ) ;
@@ -137,6 +143,7 @@ impl HTTPSubgraphExecutor {
137143 & self ,
138144 body : Vec < u8 > ,
139145 headers : HeaderMap ,
146+ client_request : & ClientRequestDetails < ' _ , ' _ > ,
140147 ) -> Result < SharedResponse , SubgraphExecutorError > {
141148 let mut req = hyper:: Request :: builder ( )
142149 . method ( http:: Method :: POST )
@@ -151,9 +158,62 @@ impl HTTPSubgraphExecutor {
151158
152159 debug ! ( "making http request to {}" , self . endpoint. to_string( ) ) ;
153160
154- let res = self . http_client . request ( req) . await . map_err ( |e| {
155- SubgraphExecutorError :: RequestFailure ( self . endpoint . to_string ( ) , e. to_string ( ) )
156- } ) ?;
161+ let timeout = match & self . timeout {
162+ DurationOrExpression :: Duration ( dur) => * dur,
163+ DurationOrExpression :: Expression ( expr) => {
164+ let value =
165+ VrlValue :: Object ( BTreeMap :: from ( [ ( "request" . into ( ) , client_request. into ( ) ) ] ) ) ;
166+ let result = expr. execute ( value) . map_err ( |err| {
167+ SubgraphExecutorError :: TimeoutExpressionResolutionFailure ( err. to_string ( ) )
168+ } ) ?;
169+ match result {
170+ VrlValue :: Integer ( i) if i >= 0 => std:: time:: Duration :: from_millis ( i as u64 ) ,
171+ VrlValue :: Float ( f) => {
172+ let f = f. into_inner ( ) ;
173+ if f >= 0.0 {
174+ std:: time:: Duration :: from_millis ( f as u64 )
175+ } else {
176+ return Err ( SubgraphExecutorError :: TimeoutExpressionResolutionFailure (
177+ "Timeout expression resolved to a negative float" . to_string ( ) ,
178+ ) ) ;
179+ }
180+ }
181+ VrlValue :: Bytes ( b) => {
182+ let str: String = String :: from_utf8 ( b. to_vec ( ) ) . map_err ( |e| {
183+ SubgraphExecutorError :: TimeoutExpressionResolutionFailure ( format ! (
184+ "Failed to parse duration string from bytes: {}" ,
185+ e
186+ ) )
187+ } ) ?;
188+ let parsed = humantime:: parse_duration ( & str) . map_err ( |e| {
189+ SubgraphExecutorError :: TimeoutExpressionResolutionFailure ( format ! (
190+ "Failed to parse duration string '{}': {}" ,
191+ str , e
192+ ) )
193+ } ) ?;
194+ parsed
195+ }
196+ _ => {
197+ return Err ( SubgraphExecutorError :: TimeoutExpressionResolutionFailure (
198+ "Timeout expression did not resolve to a non-negative integer or float"
199+ . to_string ( ) ,
200+ ) ) ;
201+ }
202+ }
203+ }
204+ } ;
205+
206+ let res = tokio:: time:: timeout ( timeout, self . http_client . request ( req) )
207+ . await
208+ . map_err ( |_| {
209+ SubgraphExecutorError :: RequestTimeout (
210+ self . endpoint . to_string ( ) ,
211+ timeout. as_millis ( ) as u64 ,
212+ )
213+ } ) ?
214+ . map_err ( |e| {
215+ SubgraphExecutorError :: RequestFailure ( self . endpoint . to_string ( ) , e. to_string ( ) )
216+ } ) ?;
157217
158218 debug ! (
159219 "http request to {} completed, status: {}" ,
@@ -210,9 +270,9 @@ impl HTTPSubgraphExecutor {
210270#[ async_trait]
211271impl SubgraphExecutor for HTTPSubgraphExecutor {
212272 #[ tracing:: instrument( skip_all, fields( subgraph_name = self . subgraph_name) ) ]
213- async fn execute < ' a > (
273+ async fn execute < ' exec , ' req > (
214274 & self ,
215- execution_request : HttpExecutionRequest < ' a > ,
275+ execution_request : HttpExecutionRequest < ' exec , ' req > ,
216276 ) -> HttpExecutionResponse {
217277 let body = match self . build_request_body ( & execution_request) {
218278 Ok ( body) => body,
@@ -230,11 +290,14 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
230290 headers. insert ( key, value. clone ( ) ) ;
231291 } ) ;
232292
233- if !self . config . traffic_shaping . dedupe_enabled || !execution_request. dedupe {
293+ if !self . dedupe_enabled || !execution_request. dedupe {
234294 // This unwrap is safe because the semaphore is never closed during the application's lifecycle.
235295 // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
236296 let _permit = self . semaphore . acquire ( ) . await . unwrap ( ) ;
237- return match self . _send_request ( body, headers) . await {
297+ return match self
298+ . _send_request ( body, headers, execution_request. client_request )
299+ . await
300+ {
238301 Ok ( shared_response) => HttpExecutionResponse {
239302 body : shared_response. body ,
240303 headers : shared_response. headers ,
@@ -266,7 +329,8 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
266329 // This unwrap is safe because the semaphore is never closed during the application's lifecycle.
267330 // `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
268331 let _permit = self . semaphore . acquire ( ) . await . unwrap ( ) ;
269- self . _send_request ( body, headers) . await
332+ self . _send_request ( body, headers, execution_request. client_request )
333+ . await
270334 } ;
271335 // It's important to remove the entry from the map before returning the result.
272336 // This ensures that once the OnceCell is set, no future requests can join it.
0 commit comments