From c61f6b6c21247e28dabff3f8f82984bd62b8a24f Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 14 Nov 2025 16:48:02 +0300 Subject: [PATCH 1/3] feat(executor): handle auto decompression of subgraph responses --- Cargo.lock | 136 ++++++++++++++++++++++ bin/router/Cargo.toml | 2 +- lib/executor/Cargo.toml | 1 + lib/executor/src/executors/compression.rs | 107 +++++++++++++++++ lib/executor/src/executors/error.rs | 8 +- lib/executor/src/executors/http.rs | 25 +++- lib/executor/src/executors/mod.rs | 1 + 7 files changed, 276 insertions(+), 4 deletions(-) create mode 100644 lib/executor/src/executors/compression.rs diff --git a/Cargo.lock b/Cargo.lock index d828dd8d2..871905834 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,6 +78,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -166,6 +181,20 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compression" +version = "0.4.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93c1f86859c1af3d514fa19e8323147ff10ea98684e6c7b307912509f50e67b2" +dependencies = [ + "compression-codecs", + "compression-core", + "futures-core", + "futures-io", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-graphql" version = "7.0.17" @@ -477,6 +506,27 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc0b364ead1874514c8c2855ab558056ebfeb775653e7ae45ff72f28f8f3166c" +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -498,6 +548,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bzip2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a53fac24f34a81bc9954b5d6cfce0c21e18ec6959f44f56e8e90e4bb7c346c" +dependencies = [ + "libbz2-rs-sys", +] + [[package]] name = "cast" version = "0.3.0" @@ -725,6 +784,30 @@ dependencies = [ "sha1", ] +[[package]] +name = "compression-codecs" +version = "0.4.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "680dc087785c5230f8e8843e2e57ac7c1c90488b6a91b88caa265410568f441b" +dependencies = [ + "brotli", + "bzip2", + "compression-core", + "deflate64", + "flate2", + "liblzma", + "lz4", + "memchr", + "zstd", + "zstd-safe", +] + +[[package]] +name = "compression-core" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a9b614a5787ef0c8802a55766480563cb3a93b435898c422ed2a359cf811582" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1194,6 +1277,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "deflate64" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26bf8fc351c5ed29b5c2f0cbbac1b209b74f60ecd62e675a998df72c49af5204" + [[package]] name = "der" version = "0.7.10" @@ -2046,6 +2135,7 @@ name = "hive-router-plan-executor" version = "6.0.1" dependencies = [ "ahash", + "async-compression", "async-trait", "bumpalo", "bytes", @@ -2668,6 +2758,12 @@ dependencies = [ "spin", ] +[[package]] +name = "libbz2-rs-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" + [[package]] name = "libc" version = "0.2.177" @@ -2684,6 +2780,27 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "liblzma" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73c36d08cad03a3fbe2c4e7bb3a9e84c57e4ee4135ed0b065cade3d98480c648" +dependencies = [ + "liblzma-sys", + "num_cpus", +] + +[[package]] +name = "liblzma-sys" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "libm" version = "0.2.15" @@ -2742,6 +2859,25 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "lz4_flex" version = "0.11.5" diff --git a/bin/router/Cargo.toml b/bin/router/Cargo.toml index 4969b0a46..f1528c5c2 100644 --- a/bin/router/Cargo.toml +++ b/bin/router/Cargo.toml @@ -45,11 +45,11 @@ reqwest-retry = { workspace = true } reqwest-middleware = { workspace = true } vrl = { workspace = true } serde_json = { workspace = true } +tokio-util = { workspace = true } mimalloc = { version = "0.1.48", features = ["v3"] } moka = { version = "0.12.10", features = ["future"] } ulid = "1.2.1" -tokio-util = "0.7.16" cookie = "0.18.1" regex-automata = "0.4.10" arc-swap = "1.7.1" diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index 27f7af1bf..9bfa08047 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -37,6 +37,7 @@ strum = { version = "0.27.2", features = ["derive"] } ntex-http = "0.1.15" ordered-float = "4.2.0" hyper-tls = { version = "0.6.0", features = ["vendored"] } +async-compression = { version = "0.4.3", features = ["all"]} hyper-util = { version = "0.1.16", features = [ "client", "client-legacy", diff --git a/lib/executor/src/executors/compression.rs b/lib/executor/src/executors/compression.rs new file mode 100644 index 000000000..cf29363b4 --- /dev/null +++ b/lib/executor/src/executors/compression.rs @@ -0,0 +1,107 @@ +use async_compression::futures::bufread::{ + BrotliDecoder, DeflateDecoder, GzipDecoder, ZstdDecoder, +}; +use bytes::Bytes; +use futures::{future::BoxFuture, AsyncReadExt}; + +use crate::executors::error::SubgraphExecutorError; + +pub enum CompressionType { + Gzip, + Deflate, + Brotli, + Zstd, + Multiple(Vec), + Identity, +} + + +impl CompressionType { + pub fn header_value(&self) -> String { + match self { + CompressionType::Gzip => "gzip".to_string(), + CompressionType::Deflate => "deflate".to_string(), + CompressionType::Brotli => "br".to_string(), + CompressionType::Zstd => "zstd".to_string(), + CompressionType::Identity => "identity".to_string(), + CompressionType::Multiple(types) => types + .iter() + .map(|t| t.header_value().to_string()) + .collect::>() + .join(", ") + .to_string(), + } + } + pub fn from_encoding_header( + encoding: &str, + ) -> Result { + let encodings: Vec<&str> = encoding.split(',').map(|s| s.trim()).collect(); + if encodings.len() > 1 { + let types = encodings + .iter() + .map(|&e| CompressionType::from_encoding_header(e)) + .collect::, SubgraphExecutorError>>()?; + Ok(CompressionType::Multiple(types)) + } else { + match encodings[0].to_lowercase().as_str() { + "gzip" => Ok(CompressionType::Gzip), + "deflate" => Ok(CompressionType::Deflate), + "br" => Ok(CompressionType::Brotli), + "zstd" => Ok(CompressionType::Zstd), + "identity" => Ok(CompressionType::Identity), + "none" => Ok(CompressionType::Identity), + _ => Err(SubgraphExecutorError::UnknownEncoding(encoding.to_string())), + } + } + } + pub fn decompress<'a>(&'a self, body: Bytes) -> BoxFuture<'a, Result> { + Box::pin(async move { + match self { + CompressionType::Gzip => { + let mut decoder = GzipDecoder::new(body.as_ref()); + let mut buf = Vec::new(); + let _ = decoder + .read_to_end(&mut buf) + .await + .map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string())); + Ok(Bytes::from(buf)) + } + CompressionType::Deflate => { + let mut decoder = DeflateDecoder::new(body.as_ref()); + let mut buf = Vec::new(); + let _ = decoder + .read_to_end(&mut buf) + .await + .map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string())); + Ok(Bytes::from(buf)) + } + CompressionType::Brotli => { + let mut decoder = BrotliDecoder::new(body.as_ref()); + let mut buf = Vec::new(); + let _ = decoder + .read_to_end(&mut buf) + .await + .map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string())); + Ok(Bytes::from(buf)) + } + CompressionType::Zstd => { + let mut decoder = ZstdDecoder::new(body.as_ref()); + let mut buf = Vec::new(); + let _ = decoder + .read_to_end(&mut buf) + .await + .map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string())); + Ok(Bytes::from(buf)) + } + CompressionType::Multiple(types) => { + let mut decompressed_body = body; + for ctype in types { + decompressed_body = ctype.decompress(decompressed_body).await?; + } + Ok(decompressed_body) + } + CompressionType::Identity => Ok(body), + } + }) + } +} diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 2234f524c..1740579e6 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -20,6 +20,10 @@ pub enum SubgraphExecutorError { RequestFailure(String, String), #[error("Failed to serialize variable \"{0}\": {1}")] VariablesSerializationFailure(String, String), + #[error("An unknown encoding \"{0}\" was specified in the 'Content-Encoding' header.")] + UnknownEncoding(String), + #[error("Decompression failed for encoding \"{0}\": {1}")] + DecompressionFailed(String, String), } impl From for GraphQLError { @@ -75,7 +79,9 @@ impl SubgraphExecutorError { SubgraphExecutorError::RequestFailure(_, _) => "SUBGRAPH_REQUEST_FAILURE", SubgraphExecutorError::VariablesSerializationFailure(_, _) => { "SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE" - } + }, + SubgraphExecutorError::UnknownEncoding(_) => "SUBGRAPH_UNKNOWN_ENCODING", + SubgraphExecutorError::DecompressionFailed(_, _) => "SUBGRAPH_DECOMPRESSION_FAILED", } } } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 29b392567..cd5cbe606 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use crate::executors::common::HttpExecutionResponse; +use crate::executors::compression::CompressionType; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; use dashmap::DashMap; use hive_router_config::HiveRouterConfig; @@ -11,8 +12,8 @@ use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use http::HeaderMap; use http::HeaderValue; -use http_body_util::BodyExt; use http_body_util::Full; +use http_body_util::{BodyExt}; use hyper::Version; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client}; @@ -62,6 +63,10 @@ impl HTTPSubgraphExecutor { http::header::CONNECTION, HeaderValue::from_static("keep-alive"), ); + header_map.insert( + http::header::ACCEPT_ENCODING, + HeaderValue::from_static("gzip, deflate, br, zstd"), + ); Self { subgraph_name, @@ -162,7 +167,7 @@ impl HTTPSubgraphExecutor { ); let (parts, body) = res.into_parts(); - let body = body + let mut body = body .collect() .await .map_err(|e| { @@ -170,6 +175,22 @@ impl HTTPSubgraphExecutor { })? .to_bytes(); + let content_encoding_header = parts + .headers + .get(http::header::CONTENT_ENCODING) + .and_then(|v| v.to_str().ok()); + + if let Some(content_encoding_header) = content_encoding_header { + let decompressor = CompressionType::from_encoding_header(content_encoding_header) + .map_err(|e| { + SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) + })?; + + body = decompressor.decompress(body).await.map_err(|e| { + SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) + })?; + } + if body.is_empty() { return Err(SubgraphExecutorError::RequestFailure( self.endpoint.to_string(), diff --git a/lib/executor/src/executors/mod.rs b/lib/executor/src/executors/mod.rs index 520ff5f94..b3bc0efb2 100644 --- a/lib/executor/src/executors/mod.rs +++ b/lib/executor/src/executors/mod.rs @@ -3,3 +3,4 @@ pub mod dedupe; pub mod error; pub mod http; pub mod map; +pub mod compression; \ No newline at end of file From a2d58b45e50b1d8f9a9987bdf072309f4120fcc5 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 14 Nov 2025 16:51:56 +0300 Subject: [PATCH 2/3] accept encoding --- lib/executor/src/executors/compression.rs | 53 ++++++++++++++--------- lib/executor/src/executors/error.rs | 2 +- lib/executor/src/executors/http.rs | 4 +- lib/executor/src/executors/mod.rs | 2 +- 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/lib/executor/src/executors/compression.rs b/lib/executor/src/executors/compression.rs index cf29363b4..e248c20d1 100644 --- a/lib/executor/src/executors/compression.rs +++ b/lib/executor/src/executors/compression.rs @@ -15,8 +15,10 @@ pub enum CompressionType { Identity, } - impl CompressionType { + pub fn accept_encoding() -> &'static str { + "gzip, deflate, br, zstd" + } pub fn header_value(&self) -> String { match self { CompressionType::Gzip => "gzip".to_string(), @@ -32,9 +34,7 @@ impl CompressionType { .to_string(), } } - pub fn from_encoding_header( - encoding: &str, - ) -> Result { + pub fn from_encoding_header(encoding: &str) -> Result { let encodings: Vec<&str> = encoding.split(',').map(|s| s.trim()).collect(); if encodings.len() > 1 { let types = encodings @@ -54,43 +54,54 @@ impl CompressionType { } } } - pub fn decompress<'a>(&'a self, body: Bytes) -> BoxFuture<'a, Result> { + pub fn decompress<'a>( + &'a self, + body: Bytes, + ) -> BoxFuture<'a, Result> { Box::pin(async move { match self { CompressionType::Gzip => { let mut decoder = GzipDecoder::new(body.as_ref()); let mut buf = Vec::new(); - let _ = decoder - .read_to_end(&mut buf) - .await - .map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string())); + let _ = decoder.read_to_end(&mut buf).await.map_err(|e| { + SubgraphExecutorError::DecompressionFailed( + self.header_value(), + e.to_string(), + ) + }); Ok(Bytes::from(buf)) } CompressionType::Deflate => { let mut decoder = DeflateDecoder::new(body.as_ref()); let mut buf = Vec::new(); - let _ = decoder - .read_to_end(&mut buf) - .await - .map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string())); + let _ = decoder.read_to_end(&mut buf).await.map_err(|e| { + SubgraphExecutorError::DecompressionFailed( + self.header_value(), + e.to_string(), + ) + }); Ok(Bytes::from(buf)) } CompressionType::Brotli => { let mut decoder = BrotliDecoder::new(body.as_ref()); let mut buf = Vec::new(); - let _ = decoder - .read_to_end(&mut buf) - .await - .map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string())); + let _ = decoder.read_to_end(&mut buf).await.map_err(|e| { + SubgraphExecutorError::DecompressionFailed( + self.header_value(), + e.to_string(), + ) + }); Ok(Bytes::from(buf)) } CompressionType::Zstd => { let mut decoder = ZstdDecoder::new(body.as_ref()); let mut buf = Vec::new(); - let _ = decoder - .read_to_end(&mut buf) - .await - .map_err(|e| SubgraphExecutorError::DecompressionFailed(self.header_value(), e.to_string())); + let _ = decoder.read_to_end(&mut buf).await.map_err(|e| { + SubgraphExecutorError::DecompressionFailed( + self.header_value(), + e.to_string(), + ) + }); Ok(Bytes::from(buf)) } CompressionType::Multiple(types) => { diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 1740579e6..8425a2a22 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -79,7 +79,7 @@ impl SubgraphExecutorError { SubgraphExecutorError::RequestFailure(_, _) => "SUBGRAPH_REQUEST_FAILURE", SubgraphExecutorError::VariablesSerializationFailure(_, _) => { "SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE" - }, + } SubgraphExecutorError::UnknownEncoding(_) => "SUBGRAPH_UNKNOWN_ENCODING", SubgraphExecutorError::DecompressionFailed(_, _) => "SUBGRAPH_DECOMPRESSION_FAILED", } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index cd5cbe606..2f196a080 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -12,8 +12,8 @@ use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use http::HeaderMap; use http::HeaderValue; +use http_body_util::BodyExt; use http_body_util::Full; -use http_body_util::{BodyExt}; use hyper::Version; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client}; @@ -65,7 +65,7 @@ impl HTTPSubgraphExecutor { ); header_map.insert( http::header::ACCEPT_ENCODING, - HeaderValue::from_static("gzip, deflate, br, zstd"), + HeaderValue::from_static(CompressionType::accept_encoding()), ); Self { diff --git a/lib/executor/src/executors/mod.rs b/lib/executor/src/executors/mod.rs index b3bc0efb2..4745dd7e0 100644 --- a/lib/executor/src/executors/mod.rs +++ b/lib/executor/src/executors/mod.rs @@ -1,6 +1,6 @@ pub mod common; +pub mod compression; pub mod dedupe; pub mod error; pub mod http; pub mod map; -pub mod compression; \ No newline at end of file From 6f1599338978d5ae3a3e3972b5b07d4e8f057af2 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 14 Nov 2025 17:06:18 +0300 Subject: [PATCH 3/3] .. --- lib/executor/src/executors/compression.rs | 19 +++++++++---------- lib/executor/src/executors/http.rs | 9 ++------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/lib/executor/src/executors/compression.rs b/lib/executor/src/executors/compression.rs index e248c20d1..1976ef7de 100644 --- a/lib/executor/src/executors/compression.rs +++ b/lib/executor/src/executors/compression.rs @@ -30,8 +30,7 @@ impl CompressionType { .iter() .map(|t| t.header_value().to_string()) .collect::>() - .join(", ") - .to_string(), + .join(", "), } } pub fn from_encoding_header(encoding: &str) -> Result { @@ -63,45 +62,45 @@ impl CompressionType { CompressionType::Gzip => { let mut decoder = GzipDecoder::new(body.as_ref()); let mut buf = Vec::new(); - let _ = decoder.read_to_end(&mut buf).await.map_err(|e| { + decoder.read_to_end(&mut buf).await.map_err(|e| { SubgraphExecutorError::DecompressionFailed( self.header_value(), e.to_string(), ) - }); + })?; Ok(Bytes::from(buf)) } CompressionType::Deflate => { let mut decoder = DeflateDecoder::new(body.as_ref()); let mut buf = Vec::new(); - let _ = decoder.read_to_end(&mut buf).await.map_err(|e| { + decoder.read_to_end(&mut buf).await.map_err(|e| { SubgraphExecutorError::DecompressionFailed( self.header_value(), e.to_string(), ) - }); + })?; Ok(Bytes::from(buf)) } CompressionType::Brotli => { let mut decoder = BrotliDecoder::new(body.as_ref()); let mut buf = Vec::new(); - let _ = decoder.read_to_end(&mut buf).await.map_err(|e| { + decoder.read_to_end(&mut buf).await.map_err(|e| { SubgraphExecutorError::DecompressionFailed( self.header_value(), e.to_string(), ) - }); + })?; Ok(Bytes::from(buf)) } CompressionType::Zstd => { let mut decoder = ZstdDecoder::new(body.as_ref()); let mut buf = Vec::new(); - let _ = decoder.read_to_end(&mut buf).await.map_err(|e| { + decoder.read_to_end(&mut buf).await.map_err(|e| { SubgraphExecutorError::DecompressionFailed( self.header_value(), e.to_string(), ) - }); + })?; Ok(Bytes::from(buf)) } CompressionType::Multiple(types) => { diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 2f196a080..2b39c83a3 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -181,14 +181,9 @@ impl HTTPSubgraphExecutor { .and_then(|v| v.to_str().ok()); if let Some(content_encoding_header) = content_encoding_header { - let decompressor = CompressionType::from_encoding_header(content_encoding_header) - .map_err(|e| { - SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - })?; + let decompressor = CompressionType::from_encoding_header(content_encoding_header)?; - body = decompressor.decompress(body).await.map_err(|e| { - SubgraphExecutorError::RequestFailure(self.endpoint.to_string(), e.to_string()) - })?; + body = decompressor.decompress(body).await?; } if body.is_empty() {