diff --git a/Cargo.lock b/Cargo.lock index d828dd8d2..871905834 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,6 +78,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -166,6 +181,20 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compression" +version = "0.4.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93c1f86859c1af3d514fa19e8323147ff10ea98684e6c7b307912509f50e67b2" +dependencies = [ + "compression-codecs", + "compression-core", + "futures-core", + "futures-io", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-graphql" version = "7.0.17" @@ -477,6 +506,27 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc0b364ead1874514c8c2855ab558056ebfeb775653e7ae45ff72f28f8f3166c" +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -498,6 +548,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bzip2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a53fac24f34a81bc9954b5d6cfce0c21e18ec6959f44f56e8e90e4bb7c346c" +dependencies = [ + "libbz2-rs-sys", +] + [[package]] name = "cast" version = "0.3.0" @@ -725,6 +784,30 @@ dependencies = [ "sha1", ] +[[package]] +name = "compression-codecs" +version = "0.4.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "680dc087785c5230f8e8843e2e57ac7c1c90488b6a91b88caa265410568f441b" +dependencies = [ + "brotli", + "bzip2", + "compression-core", + "deflate64", + "flate2", + "liblzma", + "lz4", + "memchr", + "zstd", + "zstd-safe", +] + +[[package]] +name = "compression-core" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a9b614a5787ef0c8802a55766480563cb3a93b435898c422ed2a359cf811582" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1194,6 +1277,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "deflate64" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26bf8fc351c5ed29b5c2f0cbbac1b209b74f60ecd62e675a998df72c49af5204" + [[package]] name = "der" version = "0.7.10" @@ -2046,6 +2135,7 @@ name = "hive-router-plan-executor" version = "6.0.1" dependencies = [ "ahash", + "async-compression", "async-trait", "bumpalo", "bytes", @@ -2668,6 +2758,12 @@ dependencies = [ "spin", ] +[[package]] +name = "libbz2-rs-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" + [[package]] name = "libc" version = "0.2.177" @@ -2684,6 +2780,27 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "liblzma" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73c36d08cad03a3fbe2c4e7bb3a9e84c57e4ee4135ed0b065cade3d98480c648" +dependencies = [ + "liblzma-sys", + "num_cpus", +] + +[[package]] +name = "liblzma-sys" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "libm" version = "0.2.15" @@ -2742,6 +2859,25 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "lz4_flex" version = "0.11.5" diff --git a/bin/router/Cargo.toml b/bin/router/Cargo.toml index 4969b0a46..f1528c5c2 100644 --- a/bin/router/Cargo.toml +++ b/bin/router/Cargo.toml @@ -45,11 +45,11 @@ reqwest-retry = { workspace = true } reqwest-middleware = { workspace = true } vrl = { workspace = true } serde_json = { workspace = true } +tokio-util = { workspace = true } mimalloc = { version = "0.1.48", features = ["v3"] } moka = { version = "0.12.10", features = ["future"] } ulid = "1.2.1" -tokio-util = "0.7.16" cookie = "0.18.1" regex-automata = "0.4.10" arc-swap = "1.7.1" diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index 27f7af1bf..9bfa08047 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -37,6 +37,7 @@ strum = { version = "0.27.2", features = ["derive"] } ntex-http = "0.1.15" ordered-float = "4.2.0" hyper-tls = { version = "0.6.0", features = ["vendored"] } +async-compression = { version = "0.4.3", features = ["all"]} hyper-util = { version = "0.1.16", features = [ "client", "client-legacy", diff --git a/lib/executor/src/executors/compression.rs b/lib/executor/src/executors/compression.rs new file mode 100644 index 000000000..1976ef7de --- /dev/null +++ b/lib/executor/src/executors/compression.rs @@ -0,0 +1,117 @@ +use async_compression::futures::bufread::{ + BrotliDecoder, DeflateDecoder, GzipDecoder, ZstdDecoder, +}; +use bytes::Bytes; +use futures::{future::BoxFuture, AsyncReadExt}; + +use crate::executors::error::SubgraphExecutorError; + +pub enum CompressionType { + Gzip, + Deflate, + Brotli, + Zstd, + Multiple(Vec), + Identity, +} + +impl CompressionType { + pub fn accept_encoding() -> &'static str { + "gzip, deflate, br, zstd" + } + pub fn header_value(&self) -> String { + match self { + CompressionType::Gzip => "gzip".to_string(), + CompressionType::Deflate => "deflate".to_string(), + CompressionType::Brotli => "br".to_string(), + CompressionType::Zstd => "zstd".to_string(), + CompressionType::Identity => "identity".to_string(), + CompressionType::Multiple(types) => types + .iter() + .map(|t| t.header_value().to_string()) + .collect::>() + .join(", "), + } + } + pub fn from_encoding_header(encoding: &str) -> Result { + let encodings: Vec<&str> = encoding.split(',').map(|s| s.trim()).collect(); + if encodings.len() > 1 { + let types = encodings + .iter() + .map(|&e| CompressionType::from_encoding_header(e)) + .collect::, SubgraphExecutorError>>()?; + Ok(CompressionType::Multiple(types)) + } else { + match encodings[0].to_lowercase().as_str() { + "gzip" => Ok(CompressionType::Gzip), + "deflate" => Ok(CompressionType::Deflate), + "br" => Ok(CompressionType::Brotli), + "zstd" => Ok(CompressionType::Zstd), + "identity" => Ok(CompressionType::Identity), + "none" => Ok(CompressionType::Identity), + _ => Err(SubgraphExecutorError::UnknownEncoding(encoding.to_string())), + } + } + } + pub fn decompress<'a>( + &'a self, + body: Bytes, + ) -> BoxFuture<'a, Result> { + Box::pin(async move { + match self { + CompressionType::Gzip => { + let mut decoder = GzipDecoder::new(body.as_ref()); + let mut buf = Vec::new(); + decoder.read_to_end(&mut buf).await.map_err(|e| { + SubgraphExecutorError::DecompressionFailed( + self.header_value(), + e.to_string(), + ) + })?; + Ok(Bytes::from(buf)) + } + CompressionType::Deflate => { + let mut decoder = DeflateDecoder::new(body.as_ref()); + let mut buf = Vec::new(); + decoder.read_to_end(&mut buf).await.map_err(|e| { + SubgraphExecutorError::DecompressionFailed( + self.header_value(), + e.to_string(), + ) + })?; + Ok(Bytes::from(buf)) + } + CompressionType::Brotli => { + let mut decoder = BrotliDecoder::new(body.as_ref()); + let mut buf = Vec::new(); + decoder.read_to_end(&mut buf).await.map_err(|e| { + SubgraphExecutorError::DecompressionFailed( + self.header_value(), + e.to_string(), + ) + })?; + Ok(Bytes::from(buf)) + } + CompressionType::Zstd => { + let mut decoder = ZstdDecoder::new(body.as_ref()); + let mut buf = Vec::new(); + decoder.read_to_end(&mut buf).await.map_err(|e| { + SubgraphExecutorError::DecompressionFailed( + self.header_value(), + e.to_string(), + ) + })?; + Ok(Bytes::from(buf)) + } + CompressionType::Multiple(types) => { + let mut decompressed_body = body; + for ctype in types { + decompressed_body = ctype.decompress(decompressed_body).await?; + } + Ok(decompressed_body) + } + CompressionType::Identity => Ok(body), + } + }) + } +} diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 2234f524c..8425a2a22 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -20,6 +20,10 @@ pub enum SubgraphExecutorError { RequestFailure(String, String), #[error("Failed to serialize variable \"{0}\": {1}")] VariablesSerializationFailure(String, String), + #[error("An unknown encoding \"{0}\" was specified in the 'Content-Encoding' header.")] + UnknownEncoding(String), + #[error("Decompression failed for encoding \"{0}\": {1}")] + DecompressionFailed(String, String), } impl From for GraphQLError { @@ -76,6 +80,8 @@ impl SubgraphExecutorError { SubgraphExecutorError::VariablesSerializationFailure(_, _) => { "SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE" } + SubgraphExecutorError::UnknownEncoding(_) => "SUBGRAPH_UNKNOWN_ENCODING", + SubgraphExecutorError::DecompressionFailed(_, _) => "SUBGRAPH_DECOMPRESSION_FAILED", } } } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 29b392567..2b39c83a3 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use crate::executors::common::HttpExecutionResponse; +use crate::executors::compression::CompressionType; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; use dashmap::DashMap; use hive_router_config::HiveRouterConfig; @@ -62,6 +63,10 @@ impl HTTPSubgraphExecutor { http::header::CONNECTION, HeaderValue::from_static("keep-alive"), ); + header_map.insert( + http::header::ACCEPT_ENCODING, + HeaderValue::from_static(CompressionType::accept_encoding()), + ); Self { subgraph_name, @@ -162,7 +167,7 @@ impl HTTPSubgraphExecutor { ); let (parts, body) = res.into_parts(); - let body = body + let mut body = body .collect() .await .map_err(|e| { @@ -170,6 +175,17 @@ impl HTTPSubgraphExecutor { })? .to_bytes(); + let content_encoding_header = parts + .headers + .get(http::header::CONTENT_ENCODING) + .and_then(|v| v.to_str().ok()); + + if let Some(content_encoding_header) = content_encoding_header { + let decompressor = CompressionType::from_encoding_header(content_encoding_header)?; + + body = decompressor.decompress(body).await?; + } + if body.is_empty() { return Err(SubgraphExecutorError::RequestFailure( self.endpoint.to_string(), diff --git a/lib/executor/src/executors/mod.rs b/lib/executor/src/executors/mod.rs index 520ff5f94..4745dd7e0 100644 --- a/lib/executor/src/executors/mod.rs +++ b/lib/executor/src/executors/mod.rs @@ -1,4 +1,5 @@ pub mod common; +pub mod compression; pub mod dedupe; pub mod error; pub mod http;