Skip to content

Commit 6179f66

Browse files
authored
refactor(executor, router): define ClientRequestDetails only once and avoid all clones, improve how coerce_variables is done (#532)
1 parent 6909074 commit 6179f66

File tree

12 files changed

+125
-146
lines changed

12 files changed

+125
-146
lines changed

bin/router/src/pipeline/coerce_variables.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub struct CoerceVariablesPayload {
2222
pub fn coerce_request_variables(
2323
req: &HttpRequest,
2424
supergraph: &SupergraphData,
25-
execution_params: ExecutionRequest,
25+
execution_params: &mut ExecutionRequest,
2626
normalized_operation: &Arc<GraphQLNormalizationPayload>,
2727
) -> Result<CoerceVariablesPayload, PipelineError> {
2828
if req.method() == Method::GET {
@@ -37,7 +37,7 @@ pub fn coerce_request_variables(
3737

3838
match collect_variables(
3939
&normalized_operation.operation_for_plan,
40-
execution_params.variables,
40+
&mut execution_params.variables,
4141
&supergraph.metadata,
4242
) {
4343
Ok(values) => {

bin/router/src/pipeline/execution.rs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::borrow::Cow;
21
use std::collections::HashMap;
32
use std::sync::Arc;
43

@@ -11,11 +10,10 @@ use crate::shared_state::RouterSharedState;
1110
use hive_router_plan_executor::execute_query_plan;
1211
use hive_router_plan_executor::execution::jwt_forward::JwtAuthForwardingPlan;
1312
use hive_router_plan_executor::execution::plan::{
14-
ClientRequestDetails, OperationDetails, PlanExecutionOutput, QueryPlanExecutionContext,
13+
ClientRequestDetails, PlanExecutionOutput, QueryPlanExecutionContext,
1514
};
1615
use hive_router_plan_executor::introspection::resolve::IntrospectionContext;
1716
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
18-
use hive_router_query_planner::state::supergraph_state::OperationKind;
1917
use http::HeaderName;
2018
use ntex::web::HttpRequest;
2119

@@ -29,14 +27,14 @@ enum ExposeQueryPlanMode {
2927
}
3028

3129
#[inline]
32-
pub async fn execute_plan<'a>(
33-
req: &mut HttpRequest,
34-
query: Cow<'a, str>,
30+
pub async fn execute_plan(
31+
req: &HttpRequest,
3532
supergraph: &SupergraphData,
3633
app_state: &Arc<RouterSharedState>,
3734
normalized_payload: &Arc<GraphQLNormalizationPayload>,
3835
query_plan_payload: &Arc<QueryPlan>,
3936
variable_payload: &CoerceVariablesPayload,
37+
client_request_details: &ClientRequestDetails<'_, '_>,
4038
) -> Result<PlanExecutionOutput, PipelineError> {
4139
let mut expose_query_plan = ExposeQueryPlanMode::No;
4240

@@ -86,21 +84,7 @@ pub async fn execute_plan<'a>(
8684
headers_plan: &app_state.headers_plan,
8785
variable_values: &variable_payload.variables_map,
8886
extensions,
89-
client_request: ClientRequestDetails {
90-
method: req.method().clone(),
91-
url: req.uri().clone(),
92-
headers: req.headers(),
93-
operation: OperationDetails {
94-
name: normalized_payload.operation_for_plan.name.clone(),
95-
kind: match normalized_payload.operation_for_plan.operation_kind {
96-
Some(OperationKind::Query) => "query",
97-
Some(OperationKind::Mutation) => "mutation",
98-
Some(OperationKind::Subscription) => "subscription",
99-
None => "query",
100-
},
101-
query,
102-
},
103-
},
87+
client_request: client_request_details,
10488
introspection_context: &introspection_context,
10589
operation_type_name: normalized_payload.root_type_name,
10690
jwt_auth_forwarding: &jwt_forward_plan,

bin/router/src/pipeline/execution_request.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use http::Method;
44
use ntex::util::Bytes;
55
use ntex::web::types::Query;
66
use ntex::web::HttpRequest;
7-
use serde::Deserialize;
7+
use serde::{Deserialize, Deserializer};
88
use sonic_rs::Value;
99
use tracing::{trace, warn};
1010

@@ -25,12 +25,22 @@ struct GETQueryParams {
2525
pub struct ExecutionRequest {
2626
pub query: String,
2727
pub operation_name: Option<String>,
28-
pub variables: Option<HashMap<String, Value>>,
28+
#[serde(default, deserialize_with = "deserialize_null_default")]
29+
pub variables: HashMap<String, Value>,
2930
// TODO: We don't use extensions yet, but we definitely will in the future.
3031
#[allow(dead_code)]
3132
pub extensions: Option<HashMap<String, Value>>,
3233
}
3334

35+
fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result<T, D::Error>
36+
where
37+
T: Default + Deserialize<'de>,
38+
D: Deserializer<'de>,
39+
{
40+
let opt = Option::<T>::deserialize(deserializer)?;
41+
Ok(opt.unwrap_or_default())
42+
}
43+
3444
impl TryInto<ExecutionRequest> for GETQueryParams {
3545
type Error = PipelineErrorVariant;
3646

@@ -42,12 +52,12 @@ impl TryInto<ExecutionRequest> for GETQueryParams {
4252

4353
let variables = match self.variables.as_deref() {
4454
Some(v_str) if !v_str.is_empty() => match sonic_rs::from_str(v_str) {
45-
Ok(vars) => Some(vars),
55+
Ok(vars) => vars,
4656
Err(e) => {
4757
return Err(PipelineErrorVariant::FailedToParseVariables(e));
4858
}
4959
},
50-
_ => None,
60+
_ => HashMap::new(),
5161
};
5262

5363
let extensions = match self.extensions.as_deref() {

bin/router/src/pipeline/mod.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{borrow::Cow, sync::Arc};
1+
use std::sync::Arc;
22

33
use hive_router_plan_executor::execution::plan::{
44
ClientRequestDetails, OperationDetails, PlanExecutionOutput,
@@ -110,7 +110,7 @@ pub async fn execute_pipeline(
110110
) -> Result<PlanExecutionOutput, PipelineError> {
111111
perform_csrf_prevention(req, &shared_state.router_config.csrf)?;
112112

113-
let execution_request = get_execution_request(req, body_bytes).await?;
113+
let mut execution_request = get_execution_request(req, body_bytes).await?;
114114
let parser_payload = parse_operation_with_cache(req, shared_state, &execution_request).await?;
115115
validate_operation_with_cache(req, supergraph, schema_state, shared_state, &parser_payload)
116116
.await?;
@@ -123,34 +123,33 @@ pub async fn execute_pipeline(
123123
&parser_payload,
124124
)
125125
.await?;
126-
let query: Cow<'_, str> = Cow::Owned(execution_request.query.clone());
127126
let variable_payload =
128-
coerce_request_variables(req, supergraph, execution_request, &normalize_payload)?;
127+
coerce_request_variables(req, supergraph, &mut execution_request, &normalize_payload)?;
129128

130129
let query_plan_cancellation_token =
131130
CancellationToken::with_timeout(shared_state.router_config.query_planner.timeout);
132131

133-
let progressive_override_ctx =
134-
request_override_context(&shared_state.override_labels_evaluator, || {
135-
ClientRequestDetails {
136-
method: req.method().clone(),
137-
url: req.uri().clone(),
138-
headers: req.headers(),
139-
operation: OperationDetails {
140-
name: normalize_payload.operation_for_plan.name.clone(),
141-
kind: match normalize_payload.operation_for_plan.operation_kind {
142-
Some(OperationKind::Query) => "query",
143-
Some(OperationKind::Mutation) => "mutation",
144-
Some(OperationKind::Subscription) => "subscription",
145-
None => "query",
146-
},
147-
query: query.clone(),
148-
},
149-
}
150-
})
151-
.map_err(|error| {
152-
req.new_pipeline_error(PipelineErrorVariant::LabelEvaluationError(error))
153-
})?;
132+
let client_request_details = ClientRequestDetails {
133+
method: req.method(),
134+
url: req.uri(),
135+
headers: req.headers(),
136+
operation: OperationDetails {
137+
name: normalize_payload.operation_for_plan.name.as_deref(),
138+
kind: match normalize_payload.operation_for_plan.operation_kind {
139+
Some(OperationKind::Query) => "query",
140+
Some(OperationKind::Mutation) => "mutation",
141+
Some(OperationKind::Subscription) => "subscription",
142+
None => "query",
143+
},
144+
query: &execution_request.query,
145+
},
146+
};
147+
148+
let progressive_override_ctx = request_override_context(
149+
&shared_state.override_labels_evaluator,
150+
&client_request_details,
151+
)
152+
.map_err(|error| req.new_pipeline_error(PipelineErrorVariant::LabelEvaluationError(error)))?;
154153

155154
let query_plan_payload = plan_operation_with_cache(
156155
req,
@@ -164,12 +163,12 @@ pub async fn execute_pipeline(
164163

165164
let execution_result = execute_plan(
166165
req,
167-
query,
168166
supergraph,
169167
shared_state,
170168
&normalize_payload,
171169
&query_plan_payload,
172170
&variable_payload,
171+
&client_request_details,
173172
)
174173
.await?;
175174

bin/router/src/pipeline/progressive_override.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,11 @@ pub struct RequestOverrideContext {
5151
}
5252

5353
#[inline]
54-
pub fn request_override_context<'req, F>(
54+
pub fn request_override_context<'exec, 'req>(
5555
override_labels_evaluator: &OverrideLabelsEvaluator,
56-
get_client_request: F,
57-
) -> Result<RequestOverrideContext, LabelEvaluationError>
58-
where
59-
F: FnOnce() -> ClientRequestDetails<'req>,
60-
{
61-
let active_flags = override_labels_evaluator.evaluate(get_client_request)?;
56+
client_request_details: &ClientRequestDetails<'exec, 'req>,
57+
) -> Result<RequestOverrideContext, LabelEvaluationError> {
58+
let active_flags = override_labels_evaluator.evaluate(client_request_details)?;
6259

6360
// Generate the random percentage value for this request.
6461
// Percentage is 0 - 100_000_000_000 (100*PERCENTAGE_SCALE_FACTOR)
@@ -161,25 +158,18 @@ impl OverrideLabelsEvaluator {
161158
})
162159
}
163160

164-
pub(crate) fn evaluate<'req, F>(
161+
pub(crate) fn evaluate<'exec, 'req>(
165162
&self,
166-
get_client_request: F,
167-
) -> Result<HashSet<String>, LabelEvaluationError>
168-
where
169-
F: FnOnce() -> ClientRequestDetails<'req>,
170-
{
163+
client_request: &ClientRequestDetails<'exec, 'req>,
164+
) -> Result<HashSet<String>, LabelEvaluationError> {
171165
let mut active_flags = self.static_enabled_labels.clone();
172166

173167
if self.expressions.is_empty() {
174168
return Ok(active_flags);
175169
}
176170

177-
let client_request = get_client_request();
178171
let mut target = VrlTargetValue {
179-
value: VrlValue::Object(BTreeMap::from([(
180-
"request".into(),
181-
(&client_request).into(),
182-
)])),
172+
value: VrlValue::Object(BTreeMap::from([("request".into(), client_request.into())])),
183173
metadata: VrlValue::Object(BTreeMap::new()),
184174
secrets: VrlSecrets::default(),
185175
};

lib/executor/src/execution/plan.rs

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use std::{
2-
borrow::Cow,
3-
collections::{BTreeSet, HashMap},
4-
};
1+
use std::collections::{BTreeSet, HashMap};
52

63
use bytes::{BufMut, Bytes};
74
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
@@ -51,26 +48,26 @@ use crate::{
5148
},
5249
};
5350

54-
pub struct OperationDetails<'a> {
55-
pub name: Option<String>,
56-
pub query: Cow<'a, str>,
57-
pub kind: &'a str,
51+
pub struct OperationDetails<'exec> {
52+
pub name: Option<&'exec str>,
53+
pub query: &'exec str,
54+
pub kind: &'static str,
5855
}
5956

60-
pub struct ClientRequestDetails<'a> {
61-
pub method: Method,
62-
pub url: http::Uri,
63-
pub headers: &'a NtexHeaderMap,
64-
pub operation: OperationDetails<'a>,
57+
pub struct ClientRequestDetails<'exec, 'req> {
58+
pub method: &'req Method,
59+
pub url: &'req http::Uri,
60+
pub headers: &'req NtexHeaderMap,
61+
pub operation: OperationDetails<'exec>,
6562
}
6663

67-
pub struct QueryPlanExecutionContext<'exec> {
64+
pub struct QueryPlanExecutionContext<'exec, 'req> {
6865
pub query_plan: &'exec QueryPlan,
6966
pub projection_plan: &'exec Vec<FieldProjectionPlan>,
7067
pub headers_plan: &'exec HeaderRulesPlan,
7168
pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
7269
pub extensions: Option<HashMap<String, sonic_rs::Value>>,
73-
pub client_request: ClientRequestDetails<'exec>,
70+
pub client_request: &'exec ClientRequestDetails<'exec, 'req>,
7471
pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
7572
pub operation_type_name: &'exec str,
7673
pub executors: &'exec SubgraphExecutorMap,
@@ -82,8 +79,8 @@ pub struct PlanExecutionOutput {
8279
pub headers: HeaderMap,
8380
}
8481

85-
pub async fn execute_query_plan<'exec>(
86-
ctx: QueryPlanExecutionContext<'exec>,
82+
pub async fn execute_query_plan<'exec, 'req>(
83+
ctx: QueryPlanExecutionContext<'exec, 'req>,
8784
) -> Result<PlanExecutionOutput, PlanExecutionError> {
8885
let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
8986
resolve_introspection(introspection_query, ctx.introspection_context)
@@ -96,7 +93,7 @@ pub async fn execute_query_plan<'exec>(
9693
ctx.variable_values,
9794
ctx.executors,
9895
ctx.introspection_context.metadata,
99-
&ctx.client_request,
96+
ctx.client_request,
10097
ctx.headers_plan,
10198
ctx.jwt_auth_forwarding,
10299
// Deduplicate subgraph requests only if the operation type is a query
@@ -137,11 +134,11 @@ pub async fn execute_query_plan<'exec>(
137134
})
138135
}
139136

140-
pub struct Executor<'exec> {
137+
pub struct Executor<'exec, 'req> {
141138
variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
142139
schema_metadata: &'exec SchemaMetadata,
143140
executors: &'exec SubgraphExecutorMap,
144-
client_request: &'exec ClientRequestDetails<'exec>,
141+
client_request: &'exec ClientRequestDetails<'exec, 'req>,
145142
headers_plan: &'exec HeaderRulesPlan,
146143
jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
147144
dedupe_subgraph_requests: bool,
@@ -231,12 +228,12 @@ struct PreparedFlattenData {
231228
representation_hash_to_index: HashMap<u64, usize>,
232229
}
233230

234-
impl<'exec> Executor<'exec> {
231+
impl<'exec, 'req> Executor<'exec, 'req> {
235232
pub fn new(
236233
variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
237234
executors: &'exec SubgraphExecutorMap,
238235
schema_metadata: &'exec SchemaMetadata,
239-
client_request: &'exec ClientRequestDetails<'exec>,
236+
client_request: &'exec ClientRequestDetails<'exec, 'req>,
240237
headers_plan: &'exec HeaderRulesPlan,
241238
jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
242239
dedupe_subgraph_requests: bool,

lib/executor/src/executors/map.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@ impl SubgraphExecutorMap {
115115
Ok(subgraph_executor_map)
116116
}
117117

118-
pub async fn execute<'a>(
118+
pub async fn execute<'a, 'req>(
119119
&self,
120120
subgraph_name: &str,
121121
execution_request: HttpExecutionRequest<'a>,
122-
client_request: &ClientRequestDetails<'a>,
122+
client_request: &ClientRequestDetails<'a, 'req>,
123123
) -> HttpExecutionResponse {
124124
match self.get_or_create_executor(subgraph_name, client_request) {
125125
Ok(Some(executor)) => executor.execute(execution_request).await,
@@ -164,7 +164,7 @@ impl SubgraphExecutorMap {
164164
fn get_or_create_executor(
165165
&self,
166166
subgraph_name: &str,
167-
client_request: &ClientRequestDetails<'_>,
167+
client_request: &ClientRequestDetails<'_, '_>,
168168
) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
169169
let from_expression =
170170
self.get_or_create_executor_from_expression(subgraph_name, client_request)?;
@@ -183,7 +183,7 @@ impl SubgraphExecutorMap {
183183
fn get_or_create_executor_from_expression(
184184
&self,
185185
subgraph_name: &str,
186-
client_request: &ClientRequestDetails<'_>,
186+
client_request: &ClientRequestDetails<'_, '_>,
187187
) -> Result<Option<SubgraphExecutorBoxedArc>, SubgraphExecutorError> {
188188
if let Some(expression) = self.expressions_by_subgraph.get(subgraph_name) {
189189
let original_url_value = VrlValue::Bytes(Bytes::from(

0 commit comments

Comments
 (0)