Skip to content

Commit ee5f31e

Browse files
committed
feat(executor): handle auto decompression of subgraph responses
1 parent 61aea8b commit ee5f31e

File tree

7 files changed

+276
-4
lines changed

7 files changed

+276
-4
lines changed

Cargo.lock

Lines changed: 136 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/router/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ reqwest-retry = { workspace = true }
4545
reqwest-middleware = { workspace = true }
4646
vrl = { workspace = true }
4747
serde_json = { workspace = true }
48+
tokio-util = { workspace = true }
4849

4950
mimalloc = { version = "0.1.48", features = ["v3"] }
5051
moka = { version = "0.12.10", features = ["future"] }
5152
ulid = "1.2.1"
52-
tokio-util = "0.7.16"
5353
cookie = "0.18.1"
5454
regex-automata = "0.4.10"
5555
arc-swap = "1.7.1"

lib/executor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ strum = { version = "0.27.2", features = ["derive"] }
3737
ntex-http = "0.1.15"
3838
ordered-float = "4.2.0"
3939
hyper-tls = { version = "0.6.0", features = ["vendored"] }
40+
async-compression = { version = "0.4.3", features = ["all"]}
4041
hyper-util = { version = "0.1.16", features = [
4142
"client",
4243
"client-legacy",
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use async_compression::futures::bufread::{
2+
BrotliDecoder, DeflateDecoder, GzipDecoder, ZstdDecoder,
3+
};
4+
use bytes::Bytes;
5+
use futures::{future::BoxFuture, AsyncReadExt};
6+
7+
use crate::executors::error::SubgraphExecutorError;
8+
9+
pub enum CompressionType {
10+
Gzip,
11+
Deflate,
12+
Brotli,
13+
Zstd,
14+
Multiple(Vec<CompressionType>),
15+
Identity,
16+
}
17+
18+
19+
impl CompressionType {
20+
pub fn header_value(&self) -> String {
21+
match self {
22+
CompressionType::Gzip => "gzip".to_string(),
23+
CompressionType::Deflate => "deflate".to_string(),
24+
CompressionType::Brotli => "br".to_string(),
25+
CompressionType::Zstd => "zstd".to_string(),
26+
CompressionType::Identity => "identity".to_string(),
27+
CompressionType::Multiple(types) => types
28+
.iter()
29+
.map(|t| t.header_value().to_string())
30+
.collect::<Vec<String>>()
31+
.join(", ")
32+
.to_string(),
33+
}
34+
}
35+
pub fn from_encoding_header(
36+
encoding: &str,
37+
) -> Result<CompressionType, SubgraphExecutorError> {
38+
let encodings: Vec<&str> = encoding.split(',').map(|s| s.trim()).collect();
39+
if encodings.len() > 1 {
40+
let types = encodings
41+
.iter()
42+
.map(|&e| CompressionType::from_encoding_header(e))
43+
.collect::<Result<Vec<CompressionType>, SubgraphExecutorError>>()?;
44+
Ok(CompressionType::Multiple(types))
45+
} else {
46+
match encodings[0].to_lowercase().as_str() {
47+
"gzip" => Ok(CompressionType::Gzip),
48+
"deflate" => Ok(CompressionType::Deflate),
49+
"br" => Ok(CompressionType::Brotli),
50+
"zstd" => Ok(CompressionType::Zstd),
51+
"identity" => Ok(CompressionType::Identity),
52+
"none" => Ok(CompressionType::Identity),
53+
_ => Err(SubgraphExecutorError::UnknownEncoding(encoding.to_string())),
54+
}
55+
}
56+
}
57+
pub fn decompress<'a>(&'a self, body: Bytes) -> BoxFuture<'a, Result<Bytes, SubgraphExecutorError>> {
58+
Box::pin(async move {
59+
match self {
60+
CompressionType::Gzip => {
61+
let mut decoder = GzipDecoder::new(body.as_ref());
62+
let mut buf = Vec::new();
63+
let _ = decoder
64+
.read_to_end(&mut buf)
65+
.await
66+
.map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string()));
67+
Ok(Bytes::from(buf))
68+
}
69+
CompressionType::Deflate => {
70+
let mut decoder = DeflateDecoder::new(body.as_ref());
71+
let mut buf = Vec::new();
72+
let _ = decoder
73+
.read_to_end(&mut buf)
74+
.await
75+
.map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string()));
76+
Ok(Bytes::from(buf))
77+
}
78+
CompressionType::Brotli => {
79+
let mut decoder = BrotliDecoder::new(body.as_ref());
80+
let mut buf = Vec::new();
81+
let _ = decoder
82+
.read_to_end(&mut buf)
83+
.await
84+
.map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string()));
85+
Ok(Bytes::from(buf))
86+
}
87+
CompressionType::Zstd => {
88+
let mut decoder = ZstdDecoder::new(body.as_ref());
89+
let mut buf = Vec::new();
90+
let _ = decoder
91+
.read_to_end(&mut buf)
92+
.await
93+
.map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string()));
94+
Ok(Bytes::from(buf))
95+
}
96+
CompressionType::Multiple(types) => {
97+
let mut decompressed_body = body;
98+
for ctype in types {
99+
decompressed_body = ctype.decompress(decompressed_body).await?;
100+
}
101+
Ok(decompressed_body)
102+
}
103+
CompressionType::Identity => Ok(body),
104+
}
105+
})
106+
}
107+
}

lib/executor/src/executors/error.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ pub enum SubgraphExecutorError {
2020
RequestFailure(String, String),
2121
#[error("Failed to serialize variable \"{0}\": {1}")]
2222
VariablesSerializationFailure(String, String),
23+
#[error("An unknown encoding \"{0}\" was specified in the 'Content-Encoding' header.")]
24+
UnknownEncoding(String),
25+
#[error("Decompression failed for encoding \"{0}\": {1}")]
26+
DecompressionFailed(String, String),
2327
}
2428

2529
impl From<SubgraphExecutorError> for GraphQLError {
@@ -75,7 +79,9 @@ impl SubgraphExecutorError {
7579
SubgraphExecutorError::RequestFailure(_, _) => "SUBGRAPH_REQUEST_FAILURE",
7680
SubgraphExecutorError::VariablesSerializationFailure(_, _) => {
7781
"SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE"
78-
}
82+
},
83+
SubgraphExecutorError::UnknownEncoding(_) => "SUBGRAPH_UNKNOWN_ENCODING",
84+
SubgraphExecutorError::DecompressionFailed(_, _) => "SUBGRAPH_DECOMPRESSION_FAILED",
7985
}
8086
}
8187
}

0 commit comments

Comments
 (0)