Skip to content

Commit c3e8a2b

Browse files
Performance improvements (#202)
- No preserve_order, we manually serialize the response on our own Closes #201 - We also manually serialize the representations sent to the subgraphs - Avoid extra allocations in the executor - Parsing and normalization caching in the gateway #245 - Better parallelism in ParallelNode execution - Use FuturesUnordered instead of a vector #226 - Replace serde_json deserializer with sonic_rs deserializer #257 Closes #215 - Replace reqwest with lower level hyper_util combining with above #254 - Use mimalloc as an allocator #256 - Avoid extra allocations and cloning in the gateway layers #261 - Remove HttpRequestParams layer to prevent extra copy/clone/allocations #287 - Projection planning to handle conflicting fields on the selection set PR #285 & Bug why it is needed? #249 - Use io::write instead of concatenating strings #288 Closes #284 - Other kind of improvements can be followed with the commits --------- Co-authored-by: Kamil Kisiela <[email protected]>
1 parent e4b3417 commit c3e8a2b

34 files changed

+2256
-4334
lines changed

Cargo.lock

Lines changed: 170 additions & 606 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench/k6.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import { githubComment } from "https://raw.githubusercontent.com/dotansimha/k6-g
55

66
const endpoint = __ENV.GATEWAY_ENDPOINT || "http://0.0.0.0:4000/graphql";
77
const vus = __ENV.BENCH_VUS ? parseInt(__ENV.BENCH_VUS) : 50;
8-
const time = __ENV.BENCH_OVER_TIME || "30s";
8+
const duration = __ENV.BENCH_OVER_TIME || "30s";
99

1010
export const options = {
11-
vus: vus,
12-
duration: time,
11+
vus,
12+
duration,
1313
};
1414

1515
export function setup() {
@@ -57,7 +57,7 @@ export function handleSummary(data) {
5757
},
5858
});
5959
}
60-
return handleBenchmarkSummary(data, { vus, time });
60+
return handleBenchmarkSummary(data, { vus, duration });
6161
}
6262

6363
let printIdentifiersMap = {};

bin/dev-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ query-planner = { path = "../../lib/query-planner" }
1212
graphql-parser = "0.4.1"
1313
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
1414
tracing-tree = "0.4.0"
15-
serde_json = { version = "1.0.120", features = ["preserve_order"] }
15+
serde_json = "1.0.140"

bin/gateway/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ path = "src/main.rs"
1111
query-planner = { path = "../../lib/query-planner" }
1212
query-plan-executor = { path = "../../lib/query-plan-executor" }
1313

14+
mimalloc = { version = "0.1.47", features = ["override"] }
15+
1416
# Tokio runtime
1517
tokio = { version = "1.38.0", features = ["full"] }
1618

@@ -20,7 +22,8 @@ graphql-tools = "0.4.0" # Using version from original file
2022

2123
# Serialization
2224
serde = { version = "1.0.203", features = ["derive"] }
23-
serde_json = { version = "1.0.120", features = ["preserve_order"] }
25+
serde_json = "1.0.140"
26+
sonic-rs = "0.3"
2427

2528
# HTTP client and caching
2629
moka = { version = "0.12.8", features = ["future"] }
@@ -48,6 +51,7 @@ tower-http = { version = "0.6.6", features = [
4851
"cors",
4952
"request-id",
5053
] }
54+
http-body-util = "0.1.3"
5155

5256
# Utils
5357
thiserror = "2.0.12"

bin/gateway/src/main.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use axum::{
1818
Router,
1919
};
2020
use http::Request;
21+
use mimalloc::MiMalloc;
2122
use tokio::signal;
2223

2324
use axum::Extension;
@@ -28,7 +29,6 @@ use crate::pipeline::{
2829
coerce_variables_service::CoerceVariablesService, execution_service::ExecutionService,
2930
graphiql_service::GraphiQLResponderService,
3031
graphql_request_params::GraphQLRequestParamsExtractor,
31-
http_request_params::HttpRequestParamsExtractor,
3232
normalize_service::GraphQLOperationNormalizationService, parser_service::GraphQLParserService,
3333
progressive_override_service::ProgressiveOverrideExtractor,
3434
query_plan_service::QueryPlanService, validation_service::GraphQLValidationService,
@@ -43,6 +43,9 @@ use tower_http::{
4343
};
4444
use tracing::info;
4545

46+
#[global_allocator]
47+
static GLOBAL: MiMalloc = MiMalloc;
48+
4649
#[tokio::main]
4750
async fn main() -> Result<(), Box<dyn std::error::Error>> {
4851
let perfetto_file = env::var("PERFETTO_OUT").ok().is_some_and(|v| v == "1");
@@ -95,14 +98,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9598
)
9699
}),
97100
)
98-
.layer(HttpRequestParamsExtractor::new_layer())
99101
.layer(GraphiQLResponderService::new_layer())
100102
.layer(GraphQLRequestParamsExtractor::new_layer())
101103
.layer(GraphQLParserService::new_layer())
104+
.layer(GraphQLValidationService::new_layer())
102105
.layer(ProgressiveOverrideExtractor::new_layer())
103106
.layer(GraphQLOperationNormalizationService::new_layer())
104107
.layer(CoerceVariablesService::new_layer())
105-
.layer(GraphQLValidationService::new_layer())
106108
.layer(QueryPlanService::new_layer())
107109
.layer(PropagateRequestIdLayer::new(REQUEST_ID_HEADER_NAME.clone()))
108110
.service(ExecutionService::new(expose_query_plan));

bin/gateway/src/pipeline/coerce_variables_service.rs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@ use std::collections::HashMap;
22
use std::sync::Arc;
33

44
use axum::body::Body;
5-
use http::Request;
5+
use http::{Method, Request};
66
use query_plan_executor::variables::collect_variables;
7-
use query_plan_executor::ExecutionRequest;
7+
use query_planner::state::supergraph_state::OperationKind;
88
use serde_json::Value;
9-
use tracing::{trace, warn};
9+
use tracing::{error, trace, warn};
1010

11-
use crate::pipeline::error::{PipelineError, PipelineErrorVariant};
11+
use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
1212
use crate::pipeline::gateway_layer::{
1313
GatewayPipelineLayer, GatewayPipelineStepDecision, ProcessorLayer,
1414
};
15-
use crate::pipeline::http_request_params::HttpRequestParams;
15+
use crate::pipeline::graphql_request_params::ExecutionRequest;
1616
use crate::pipeline::normalize_service::GraphQLNormalizationPayload;
1717
use crate::shared_state::GatewaySharedState;
1818

@@ -35,30 +35,44 @@ impl GatewayPipelineLayer for CoerceVariablesService {
3535
#[tracing::instrument(level = "trace", name = "CoerceVariablesService", skip_all)]
3636
async fn process(
3737
&self,
38-
mut req: Request<Body>,
39-
) -> Result<(Request<Body>, GatewayPipelineStepDecision), PipelineError> {
38+
req: &mut Request<Body>,
39+
) -> Result<GatewayPipelineStepDecision, PipelineError> {
4040
let normalized_operation = req
4141
.extensions()
42-
.get::<GraphQLNormalizationPayload>()
42+
.get::<Arc<GraphQLNormalizationPayload>>()
4343
.ok_or_else(|| {
44-
PipelineErrorVariant::InternalServiceError("GraphQLNormalizationPayload is missing")
44+
req.new_pipeline_error(PipelineErrorVariant::InternalServiceError(
45+
"GraphQLNormalizationPayload is missing",
46+
))
4547
})?;
4648

47-
let http_payload = req.extensions().get::<HttpRequestParams>().ok_or_else(|| {
48-
PipelineErrorVariant::InternalServiceError("HttpRequestParams is missing")
49-
})?;
50-
5149
let execution_params = req.extensions().get::<ExecutionRequest>().ok_or_else(|| {
52-
PipelineErrorVariant::InternalServiceError("ExecutionRequest is missing")
50+
req.new_pipeline_error(PipelineErrorVariant::InternalServiceError(
51+
"ExecutionRequest is missing",
52+
))
5353
})?;
5454

5555
let app_state = req
5656
.extensions()
5757
.get::<Arc<GatewaySharedState>>()
5858
.ok_or_else(|| {
59-
PipelineErrorVariant::InternalServiceError("GatewaySharedState is missing")
59+
req.new_pipeline_error(PipelineErrorVariant::InternalServiceError(
60+
"GatewaySharedState is missing",
61+
))
6062
})?;
6163

64+
if req.method() == Method::GET {
65+
if let Some(OperationKind::Mutation) =
66+
normalized_operation.operation_for_plan.operation_kind
67+
{
68+
error!("Mutation is not allowed over GET, stopping");
69+
70+
return Err(
71+
req.new_pipeline_error(PipelineErrorVariant::MutationNotAllowedOverHttpGet)
72+
);
73+
}
74+
}
75+
6276
match collect_variables(
6377
&normalized_operation.operation_for_plan,
6478
&execution_params.variables,
@@ -74,18 +88,16 @@ impl GatewayPipelineLayer for CoerceVariablesService {
7488
variables_map: values,
7589
});
7690

77-
Ok((req, GatewayPipelineStepDecision::Continue))
91+
Ok(GatewayPipelineStepDecision::Continue)
7892
}
7993
Err(err_msg) => {
8094
warn!(
8195
"failed to collect variables from incoming request: {}",
8296
err_msg
8397
);
84-
85-
return Err(PipelineError::new_with_accept_header(
86-
PipelineErrorVariant::VariablesCoercionError(err_msg),
87-
http_payload.accept_header.clone(),
88-
));
98+
return Err(
99+
req.new_pipeline_error(PipelineErrorVariant::VariablesCoercionError(err_msg))
100+
);
89101
}
90102
}
91103
}

bin/gateway/src/pipeline/error.rs

Lines changed: 36 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,27 @@ use std::{collections::HashMap, sync::Arc};
22

33
use axum::{body::Body, extract::rejection::QueryRejection, response::IntoResponse};
44
use graphql_tools::validation::utils::ValidationError;
5-
use http::{Response, StatusCode};
5+
use http::{HeaderName, Method, Request, Response, StatusCode};
66
use query_plan_executor::{ExecutionResult, GraphQLError};
77
use query_planner::{ast::normalization::error::NormalizationError, planner::PlannerError};
88
use serde_json::Value;
99

10-
use crate::pipeline::http_request_params::APPLICATION_JSON;
10+
use crate::pipeline::header::{RequestAccepts, APPLICATION_GRAPHQL_RESPONSE_JSON_STR};
1111

1212
#[derive(Debug)]
1313
pub struct PipelineError {
14-
pub accept_header: Option<String>,
14+
pub accept_ok: bool,
1515
pub error: PipelineErrorVariant,
1616
}
1717

18-
impl PipelineError {
19-
pub fn new_with_accept_header(
20-
error: PipelineErrorVariant,
21-
accept_header_value: String,
22-
) -> Self {
23-
Self {
24-
accept_header: Some(accept_header_value),
25-
error,
26-
}
18+
pub trait PipelineErrorFromAcceptHeader {
19+
fn new_pipeline_error(&self, error: PipelineErrorVariant) -> PipelineError;
20+
}
21+
22+
impl PipelineErrorFromAcceptHeader for Request<Body> {
23+
fn new_pipeline_error(&self, error: PipelineErrorVariant) -> PipelineError {
24+
let accept_ok = !self.accepts_content_type(&APPLICATION_GRAPHQL_RESPONSE_JSON_STR);
25+
PipelineError { accept_ok, error }
2726
}
2827
}
2928

@@ -35,9 +34,9 @@ pub enum PipelineErrorVariant {
3534

3635
// HTTP-related errors
3736
#[error("Unsupported HTTP method: {0}")]
38-
UnsupportedHttpMethod(String),
37+
UnsupportedHttpMethod(Method),
3938
#[error("Header '{0}' has invalid value")]
40-
InvalidHeaderValue(String),
39+
InvalidHeaderValue(HeaderName),
4140
#[error("Failed to read body: {0}")]
4241
FailedToReadBodyBytes(axum::Error),
4342
#[error("Content-Type header is missing")]
@@ -55,11 +54,11 @@ pub enum PipelineErrorVariant {
5554

5655
// GraphQL-specific errors
5756
#[error("Failed to parse GraphQL request payload")]
58-
FailedToParseBody(serde_json::Error),
57+
FailedToParseBody(sonic_rs::Error),
5958
#[error("Failed to parse GraphQL variables JSON")]
60-
FailedToParseVariables(serde_json::Error),
59+
FailedToParseVariables(sonic_rs::Error),
6160
#[error("Failed to parse GraphQL extensions JSON")]
62-
FailedToParseExtensions(serde_json::Error),
61+
FailedToParseExtensions(sonic_rs::Error),
6362
#[error("Failed to parse GraphQL operation")]
6463
FailedToParseOperation(graphql_parser::query::ParseError),
6564
#[error("Failed to normalize GraphQL operation")]
@@ -78,19 +77,27 @@ impl PipelineErrorVariant {
7877
Self::UnsupportedHttpMethod(_) => "METHOD_NOT_ALLOWED",
7978
Self::PlannerError(_) => "QUERY_PLAN_BUILD_FAILED",
8079
Self::InternalServiceError(_) => "INTERNAL_SERVER_ERROR",
80+
Self::FailedToParseOperation(_) => "GRAPHQL_PARSE_FAILED",
81+
Self::ValidationErrors(_) => "GRAPHQL_VALIDATION_FAILED",
82+
Self::VariablesCoercionError(_) => "BAD_USER_INPUT",
83+
Self::NormalizationError(NormalizationError::OperationNotFound) => {
84+
"OPERATION_RESOLUTION_FAILURE"
85+
}
86+
Self::NormalizationError(NormalizationError::SpecifiedOperationNotFound {
87+
operation_name: _,
88+
}) => "OPERATION_RESOLUTION_FAILURE",
89+
Self::NormalizationError(NormalizationError::MultipleMatchingOperationsFound) => {
90+
"OPERATION_RESOLUTION_FAILURE"
91+
}
8192
_ => "BAD_REQUEST",
8293
}
8394
}
8495

8596
pub fn graphql_error_message(&self) -> String {
8697
match self {
87-
Self::PlannerError(_) | Self::InternalServiceError(_) => {
88-
return "Unexpected error".to_string()
89-
}
90-
_ => {}
98+
Self::PlannerError(_) | Self::InternalServiceError(_) => "Unexpected error".to_string(),
99+
_ => self.to_string(),
91100
}
92-
93-
self.to_string()
94101
}
95102

96103
pub fn default_status_code(&self, prefer_ok: bool) -> StatusCode {
@@ -111,34 +118,17 @@ impl PipelineErrorVariant {
111118
(Self::VariablesCoercionError(_), false) => StatusCode::BAD_REQUEST,
112119
(Self::VariablesCoercionError(_), true) => StatusCode::OK,
113120
(Self::MutationNotAllowedOverHttpGet, _) => StatusCode::METHOD_NOT_ALLOWED,
114-
(Self::ValidationErrors(_), _) => StatusCode::BAD_REQUEST,
115-
(Self::MissingContentTypeHeader, _) => StatusCode::BAD_REQUEST,
116-
(Self::UnsupportedContentType, _) => StatusCode::BAD_REQUEST,
117-
}
118-
}
119-
}
120-
121-
impl From<PipelineError> for PipelineErrorVariant {
122-
fn from(error: PipelineError) -> Self {
123-
error.error
124-
}
125-
}
126-
127-
impl From<PipelineErrorVariant> for PipelineError {
128-
fn from(error: PipelineErrorVariant) -> Self {
129-
Self {
130-
error,
131-
accept_header: None,
121+
(Self::ValidationErrors(_), true) => StatusCode::OK,
122+
(Self::ValidationErrors(_), false) => StatusCode::BAD_REQUEST,
123+
(Self::MissingContentTypeHeader, _) => StatusCode::NOT_ACCEPTABLE,
124+
(Self::UnsupportedContentType, _) => StatusCode::UNSUPPORTED_MEDIA_TYPE,
132125
}
133126
}
134127
}
135128

136129
impl IntoResponse for PipelineError {
137130
fn into_response(self) -> Response<Body> {
138-
let accept_ok = self
139-
.accept_header
140-
.is_some_and(|v| v.contains(APPLICATION_JSON.to_str().unwrap()));
141-
let status = self.error.default_status_code(accept_ok);
131+
let status = self.error.default_status_code(self.accept_ok);
142132

143133
if let PipelineErrorVariant::ValidationErrors(validation_errors) = self.error {
144134
let validation_error_result = ExecutionResult {
@@ -148,7 +138,7 @@ impl IntoResponse for PipelineError {
148138
};
149139

150140
return (
151-
StatusCode::OK,
141+
status,
152142
serde_json::to_string(&validation_error_result).unwrap(),
153143
)
154144
.into_response();

0 commit comments

Comments
 (0)