Skip to content

Commit 911142d

Browse files
committed
Timeout Executor approach
1 parent 3cbb198 commit 911142d

File tree

5 files changed

+181
-166
lines changed

5 files changed

+181
-166
lines changed

lib/executor/src/executors/error.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
use std::time::Duration;
22

3+
use bytes::{BufMut, Bytes, BytesMut};
4+
5+
use crate::response::graphql_error::GraphQLError;
6+
37
#[derive(thiserror::Error, Debug, Clone)]
48
pub enum SubgraphExecutorError {
59
#[error("Failed to parse endpoint \"{0}\" as URI: {1}")]
@@ -10,6 +14,20 @@ pub enum SubgraphExecutorError {
1014
RequestFailure(String, String),
1115
#[error("Failed to serialize variable \"{0}\": {1}")]
1216
VariablesSerializationFailure(String, String),
13-
#[error("Request to subgraph \"{0}\" timed out after {1:?}")]
14-
RequestTimeout(String, Duration),
17+
#[error("Failed to parse timeout duration from expression: {0}")]
18+
TimeoutExpressionParseFailure(String),
19+
#[error("Request timed out after {0:?}")]
20+
RequestTimeout(Duration),
21+
}
22+
pub fn error_to_graphql_bytes(endpoint: &http::Uri, e: SubgraphExecutorError) -> Bytes {
23+
let graphql_error: GraphQLError =
24+
format!("Failed to execute request to subgraph {}: {}", endpoint, e).into();
25+
let errors = vec![graphql_error];
26+
// This unwrap is safe as GraphQLError serialization shouldn't fail.
27+
let errors_bytes = sonic_rs::to_vec(&errors).unwrap();
28+
let mut buffer = BytesMut::new();
29+
buffer.put_slice(b"{\"errors\":");
30+
buffer.put_slice(&errors_bytes);
31+
buffer.put_slice(b"}");
32+
buffer.freeze()
1533
}

lib/executor/src/executors/http.rs

Lines changed: 15 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,25 @@
11
use std::sync::Arc;
2-
use std::time::Duration;
32

43
use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse};
5-
use crate::executors::timeout::HTTPTimeout;
64
use dashmap::DashMap;
75
use futures::TryFutureExt;
86
use hive_router_config::traffic_shaping::TrafficShapingExecutorConfig;
9-
use hyper::body::Incoming;
107
use tokio::sync::OnceCell;
118

129
use async_trait::async_trait;
1310

14-
use bytes::{BufMut, Bytes, BytesMut};
11+
use bytes::{BufMut, Bytes};
12+
use http::HeaderMap;
1513
use http::HeaderValue;
16-
use http::{HeaderMap, Request, Response};
1714
use http_body_util::BodyExt;
1815
use http_body_util::Full;
1916
use hyper::Version;
2017
use hyper_tls::HttpsConnector;
2118
use hyper_util::client::legacy::{connect::HttpConnector, Client};
2219
use tokio::sync::Semaphore;
23-
use tracing::warn;
2420

2521
use crate::executors::common::{HttpExecutionRequest, HttpExecutionResponse};
26-
use crate::executors::error::SubgraphExecutorError;
27-
use crate::response::graphql_error::GraphQLError;
22+
use crate::executors::error::{error_to_graphql_bytes, SubgraphExecutorError};
2823
use crate::utils::consts::CLOSE_BRACE;
2924
use crate::utils::consts::COLON;
3025
use crate::utils::consts::COMMA;
@@ -39,7 +34,6 @@ pub struct HTTPSubgraphExecutor {
3934
pub semaphore: Arc<Semaphore>,
4035
pub config: Arc<TrafficShapingExecutorConfig>,
4136
pub in_flight_requests: Arc<DashMap<u64, Arc<OnceCell<SharedResponse>>, ABuildHasher>>,
42-
pub timeout: Option<HTTPTimeout>,
4337
}
4438

4539
const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{";
@@ -63,29 +57,13 @@ impl HTTPSubgraphExecutor {
6357
HeaderValue::from_static("keep-alive"),
6458
);
6559

66-
let timeout = if let Some(timeout_config) = &config.timeout {
67-
match HTTPTimeout::try_from(timeout_config) {
68-
Ok(timeout) => Some(timeout),
69-
Err(diagnostic) => {
70-
warn!(
71-
"Failed to parse timeout expression for subgraph {}: {:#?}",
72-
endpoint, diagnostic
73-
);
74-
None
75-
}
76-
}
77-
} else {
78-
None
79-
};
80-
8160
Self {
8261
endpoint,
8362
http_client,
8463
header_map,
8564
semaphore,
8665
config,
8766
in_flight_requests,
88-
timeout,
8967
}
9068
}
9169

@@ -137,23 +115,10 @@ impl HTTPSubgraphExecutor {
137115
Ok(body)
138116
}
139117

140-
pub async fn send_request_to_client(
141-
&self,
142-
req: Request<Full<Bytes>>,
143-
) -> Result<Response<Incoming>, SubgraphExecutorError> {
144-
self.http_client
145-
.request(req)
146-
.map_err(|e| {
147-
SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string())
148-
})
149-
.await
150-
}
151-
152118
async fn _send_request(
153119
&self,
154120
body: Vec<u8>,
155121
headers: HeaderMap,
156-
timeout: Option<Duration>,
157122
) -> Result<SharedResponse, SubgraphExecutorError> {
158123
let mut req = hyper::Request::builder()
159124
.method(http::Method::POST)
@@ -166,11 +131,13 @@ impl HTTPSubgraphExecutor {
166131

167132
*req.headers_mut() = headers;
168133

169-
let res = if let Some(timeout) = timeout {
170-
self.send_request_with_timeout(req, timeout).await?
171-
} else {
172-
self.send_request_to_client(req).await?
173-
};
134+
let res = self
135+
.http_client
136+
.request(req)
137+
.map_err(|e| {
138+
SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string())
139+
})
140+
.await?;
174141

175142
let (parts, body) = res.into_parts();
176143

@@ -186,22 +153,6 @@ impl HTTPSubgraphExecutor {
186153
headers: parts.headers,
187154
})
188155
}
189-
190-
fn error_to_graphql_bytes(&self, e: SubgraphExecutorError) -> Bytes {
191-
let graphql_error: GraphQLError = format!(
192-
"Failed to execute request to subgraph {}: {}",
193-
self.endpoint, e
194-
)
195-
.into();
196-
let errors = vec![graphql_error];
197-
// This unwrap is safe as GraphQLError serialization shouldn't fail.
198-
let errors_bytes = sonic_rs::to_vec(&errors).unwrap();
199-
let mut buffer = BytesMut::new();
200-
buffer.put_slice(b"{\"errors\":");
201-
buffer.put_slice(&errors_bytes);
202-
buffer.put_slice(b"}");
203-
buffer.freeze()
204-
}
205156
}
206157

207158
#[async_trait]
@@ -214,7 +165,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
214165
Ok(body) => body,
215166
Err(e) => {
216167
return HttpExecutionResponse {
217-
body: self.error_to_graphql_bytes(e),
168+
body: error_to_graphql_bytes(&self.endpoint, e),
218169
headers: Default::default(),
219170
}
220171
}
@@ -229,14 +180,13 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
229180
// This unwrap is safe because the semaphore is never closed during the application's lifecycle.
230181
// `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
231182
let _permit = self.semaphore.acquire().await.unwrap();
232-
let timeout = self.get_timeout_duration(execution_request.client_request);
233-
return match self._send_request(body, headers, timeout).await {
183+
return match self._send_request(body, headers).await {
234184
Ok(shared_response) => HttpExecutionResponse {
235185
body: shared_response.body,
236186
headers: shared_response.headers,
237187
},
238188
Err(e) => HttpExecutionResponse {
239-
body: self.error_to_graphql_bytes(e),
189+
body: error_to_graphql_bytes(&self.endpoint, e),
240190
headers: Default::default(),
241191
},
242192
};
@@ -255,12 +205,11 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
255205

256206
let response_result = cell
257207
.get_or_try_init(|| async {
258-
let timeout = self.get_timeout_duration(execution_request.client_request);
259208
let res = {
260209
// This unwrap is safe because the semaphore is never closed during the application's lifecycle.
261210
// `acquire()` only fails if the semaphore is closed, so this will always return `Ok`.
262211
let _permit = self.semaphore.acquire().await.unwrap();
263-
self._send_request(body, headers, timeout).await
212+
self._send_request(body, headers).await
264213
};
265214
// It's important to remove the entry from the map before returning the result.
266215
// This ensures that once the OnceCell is set, no future requests can join it.
@@ -276,7 +225,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
276225
headers: shared_response.headers.clone(),
277226
},
278227
Err(e) => HttpExecutionResponse {
279-
body: self.error_to_graphql_bytes(e.clone()),
228+
body: error_to_graphql_bytes(&self.endpoint, e.clone()),
280229
headers: Default::default(),
281230
},
282231
}

lib/executor/src/executors/map.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::{
2020
dedupe::{ABuildHasher, SharedResponse},
2121
error::SubgraphExecutorError,
2222
http::HTTPSubgraphExecutor,
23+
timeout::TimeoutExecutor,
2324
},
2425
response::graphql_error::GraphQLError,
2526
};
@@ -130,15 +131,23 @@ impl SubgraphExecutorMap {
130131
.map(|cfg| Arc::new(cfg.clone()))
131132
.unwrap_or_else(|| global_config_arc.clone());
132133

133-
let executor = HTTPSubgraphExecutor::new(
134-
endpoint_uri,
134+
let timeout_config = subgraph_config.and_then(|cfg| cfg.timeout.as_ref());
135+
136+
let mut executor = HTTPSubgraphExecutor::new(
137+
endpoint_uri.clone(),
135138
http_client,
136139
semaphore,
137-
config_arc,
140+
config_arc.clone(),
138141
inflight_requests,
139-
);
142+
)
143+
.to_boxed_arc();
144+
145+
if let Some(timeout_config) = timeout_config {
146+
executor = TimeoutExecutor::try_new(endpoint_uri, timeout_config, executor)?
147+
.to_boxed_arc();
148+
}
140149

141-
Ok((subgraph_name, executor.to_boxed_arc()))
150+
Ok((subgraph_name, executor))
142151
})
143152
.collect::<Result<HashMap<_, _>, SubgraphExecutorError>>()?;
144153

0 commit comments

Comments
 (0)