From b8939aa1147a69d2d152e3c87cfd7df0ae00c9a3 Mon Sep 17 00:00:00 2001 From: Sebastian Lorenz Date: Tue, 22 Jul 2025 19:52:39 +0200 Subject: [PATCH 1/2] node, chain: Add extensible compression support for RPC requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace boolean compression_enabled with Compression enum (None, Gzip) - Support per-provider compression configuration via "compression" field - Add placeholders for future compression methods (Brotli, Deflate) - Update transport layer to handle compression enum with match statement - Add comprehensive unit tests for compression configuration parsing - Update example configuration and documentation Configuration examples: compression = "gzip" # Enable gzip compression compression = "none" # Disable compression (default) Addresses issue #5671 with future-extensible design. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- Cargo.lock | 14 ++++ chain/ethereum/src/transport.rs | 23 +++-- graph/Cargo.toml | 2 +- graph/src/endpoint.rs | 21 +++++ node/claude/plans/gzip-compression-plan.md | 98 ++++++++++++++++++++++ node/resources/tests/full_config.toml | 1 + node/src/chain.rs | 4 +- node/src/config.rs | 72 ++++++++++++++++ 8 files changed, 228 insertions(+), 7 deletions(-) create mode 100644 node/claude/plans/gzip-compression-plan.md diff --git a/Cargo.lock b/Cargo.lock index caf8fd8ffb5..5f3491fc209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,6 +173,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddb939d66e4ae03cee6091612804ba446b12878410cfa17f785f4dd67d4014e8" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-graphql" version = "7.0.15" @@ -4447,6 +4460,7 @@ version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" dependencies = [ + "async-compression", "base64 0.22.1", "bytes", "encoding_rs", diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index ef571efacb8..5c58403ac9d 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -1,5 +1,5 @@ use graph::components::network_provider::ProviderName; -use graph::endpoint::{EndpointMetrics, RequestLabels}; +use graph::endpoint::{Compression, EndpointMetrics, RequestLabels}; use jsonrpc_core::types::Call; use jsonrpc_core::Value; @@ -54,12 +54,25 @@ impl Transport { headers: graph::http::HeaderMap, metrics: Arc, provider: impl AsRef, + compression: Compression, ) -> Self { // Unwrap: This only fails if something is wrong with the system's TLS config. - let client = reqwest::Client::builder() - .default_headers(headers) - .build() - .unwrap(); + let mut client_builder = reqwest::Client::builder().default_headers(headers); + + match compression { + Compression::Gzip => { + // Enable gzip compression/decompression for requests and responses + client_builder = client_builder.gzip(true); + } + Compression::None => { + // No compression + } // Future compression methods can be handled here: + // Compression::Brotli => { + // client_builder = client_builder.brotli(true); + // } + } + + let client = client_builder.build().unwrap(); Transport::RPC { client: http::Http::with_client(client, rpc), diff --git a/graph/Cargo.toml b/graph/Cargo.toml index bb4287b2c31..655adfed97d 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -26,7 +26,7 @@ chrono = "0.4.41" envconfig = "0.11.0" Inflector = "0.11.3" atty = "0.2" -reqwest = { version = "0.12.15", features = ["json", "stream", "multipart"] } +reqwest = { version = "0.12.15", features = ["json", "stream", "multipart", "gzip"] } ethabi = "17.2" hex = "0.4.3" http0 = { version = "0", package = "http" } diff --git a/graph/src/endpoint.rs b/graph/src/endpoint.rs index bdff8dc8135..9602095da28 100644 --- a/graph/src/endpoint.rs +++ b/graph/src/endpoint.rs @@ -7,6 +7,7 @@ use std::{ }; use prometheus::IntCounterVec; +use serde::{Deserialize, Serialize}; use slog::{warn, Logger}; use crate::components::network_provider::ProviderName; @@ -17,6 +18,26 @@ use crate::{components::metrics::MetricsRegistry, data::value::Word}; /// avoid locking since we don't need to modify the entire struture. type ProviderCount = Arc>; +/// Compression methods for RPC transports +#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq)] +pub enum Compression { + #[serde(rename = "none")] + None, + #[serde(rename = "gzip")] + Gzip, + // Future compression methods can be added here: + // #[serde(rename = "brotli")] + // Brotli, + // #[serde(rename = "deflate")] + // Deflate, +} + +impl Default for Compression { + fn default() -> Self { + Compression::None + } +} + /// This struct represents all the current labels except for the result /// which is added separately. If any new labels are necessary they should /// remain in the same order as added in [`EndpointMetrics::new`] diff --git a/node/claude/plans/gzip-compression-plan.md b/node/claude/plans/gzip-compression-plan.md new file mode 100644 index 00000000000..407105af27c --- /dev/null +++ b/node/claude/plans/gzip-compression-plan.md @@ -0,0 +1,98 @@ +# Plan: Implement Extensible Compression for RPC Requests + +## Overview +Add extensible compression support for Graph Node's outgoing RPC requests to upstream providers, configurable on a per-provider basis with future compression methods in mind. + +## Implementation Steps (COMPLETED) + +### 1. ✅ Create Compression Enum (`node/src/config.rs`) +- Added `Compression` enum with `None` and `Gzip` variants +- Commented placeholders for future compression methods (Brotli, Deflate) +- Default implementation returns `Compression::None` + +### 2. ✅ Update Configuration Structure (`node/src/config.rs`) +- Replaced `compression_enabled: bool` with `compression: Compression` field in `Web3Provider` struct +- Updated all existing code to use new enum +- Added unit tests for both "gzip" and "none" compression options + +### 3. ✅ Modify HTTP Transport (`chain/ethereum/src/transport.rs`) +- Updated `Transport::new_rpc()` to accept `Compression` enum parameter +- Implemented match statement for different compression types +- Added comments showing where future compression methods can be added +- Uses reqwest's `.gzip(true)` for automatic compression/decompression + +### 4. ✅ Update Transport Creation (`node/src/chain.rs`) +- Pass compression enum from config to transport +- Updated logging to show compression method using debug format + +### 5. ✅ Update Dependencies (`graph/Cargo.toml`) +- Added "gzip" feature to reqwest dependency + +### 6. ✅ Update Test Configuration +- Updated `full_config.toml` example to use new enum format +- Added comprehensive unit tests for compression parsing + +## Configuration Examples + +### Gzip Compression +```toml +[chains.mainnet] +provider = [ + { + label = "mainnet-rpc", + details = { + type = "web3", + url = "http://rpc.example.com", + features = ["archive"], + compression = "gzip" + } + } +] +``` + +### No Compression (Default) +```toml +[chains.mainnet] +provider = [ + { + label = "mainnet-rpc", + details = { + type = "web3", + url = "http://rpc.example.com", + features = ["archive"], + compression = "none" # or omit entirely + } + } +] +``` + +### Future Extension Example +```rust +// Future compression methods can be easily added: +#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq)] +pub enum Compression { + #[serde(rename = "none")] + None, + #[serde(rename = "gzip")] + Gzip, + #[serde(rename = "brotli")] + Brotli, + #[serde(rename = "deflate")] + Deflate, +} + +// And handled in transport: +match compression { + Compression::Gzip => client_builder = client_builder.gzip(true), + Compression::Brotli => client_builder = client_builder.brotli(true), + Compression::Deflate => client_builder = client_builder.deflate(true), + Compression::None => {} // No compression +} +``` + +## Benefits of This Implementation +- **Extensible**: Easy to add new compression methods without breaking changes +- **Backward Compatible**: Defaults to no compression, existing configs work unchanged +- **Type Safe**: Enum prevents invalid compression method strings +- **Future Proof**: Clear pattern for adding Brotli, Deflate, etc. +- **Per-Provider**: Each RPC provider can have different compression settings \ No newline at end of file diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 1f907539194..9d1a8e5f835 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -48,6 +48,7 @@ shard = "primary" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, + { label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive"], compression = "gzip" }}, { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, { label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }}, ] diff --git a/node/src/chain.rs b/node/src/chain.rs index 343b783908f..f3444c436e8 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -282,7 +282,8 @@ pub async fn create_ethereum_networks_for_chain( logger, "Creating transport"; "url" => &web3.url, - "capabilities" => capabilities + "capabilities" => capabilities, + "compression" => ?web3.compression ); use crate::config::Transport::*; @@ -293,6 +294,7 @@ pub async fn create_ethereum_networks_for_chain( web3.headers.clone(), endpoint_metrics.cheap_clone(), &provider.label, + web3.compression, ), Ipc => Transport::new_ipc(&web3.url).await, Ws => Transport::new_ws(&web3.url).await, diff --git a/node/src/config.rs b/node/src/config.rs index 83ea7bf1cc3..99f7a6f4d44 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -2,6 +2,7 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, components::network_provider::ChainName, + endpoint::Compression, env::ENV_VARS, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -502,6 +503,7 @@ impl ChainSection { features, headers: Default::default(), rules: vec![], + compression: Compression::None, }), }; let entry = chains.entry(name.to_string()).or_insert_with(|| Chain { @@ -705,6 +707,10 @@ pub struct Web3Provider { #[serde(default, rename = "match")] rules: Vec, + + /// Compression method for RPC requests and responses + #[serde(default)] + pub compression: Compression, } impl Web3Provider { @@ -901,6 +907,7 @@ impl<'de> Deserialize<'de> for Provider { .ok_or_else(|| serde::de::Error::missing_field("features"))?, headers: headers.unwrap_or_else(HeaderMap::new), rules: nodes, + compression: Compression::None, }), }; @@ -1307,6 +1314,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1333,6 +1341,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1440,6 +1449,7 @@ mod tests { features, headers, rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1465,6 +1475,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1834,6 +1845,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1846,6 +1858,66 @@ mod tests { assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled); } + #[test] + fn it_parses_web3_provider_with_compression() { + let actual = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "gzip" } + "#, + ) + .unwrap(); + + assert_eq!( + Provider { + label: "compressed".to_owned(), + details: ProviderDetails::Web3(Web3Provider { + transport: Transport::Rpc, + url: "http://localhost:8545".to_owned(), + features: { + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features + }, + headers: HeaderMap::new(), + rules: Vec::new(), + compression: Compression::Gzip, + }), + }, + actual + ); + } + + #[test] + fn it_parses_web3_provider_with_no_compression() { + let actual = toml::from_str( + r#" + label = "uncompressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "none" } + "#, + ) + .unwrap(); + + assert_eq!( + Provider { + label: "uncompressed".to_owned(), + details: ProviderDetails::Web3(Web3Provider { + transport: Transport::Rpc, + url: "http://localhost:8545".to_owned(), + features: { + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features + }, + headers: HeaderMap::new(), + rules: Vec::new(), + compression: Compression::None, + }), + }, + actual + ); + } + #[test] fn duplicated_labels_are_not_allowed_within_chain() { let mut actual = toml::from_str::( From 8dc6615ffb6ec3a124c042cf3a7133d2683c8892 Mon Sep 17 00:00:00 2001 From: Sebastian Lorenz Date: Tue, 22 Jul 2025 21:48:25 +0200 Subject: [PATCH 2/2] delete node/claude/plans/gzip-compression-plan.md --- node/claude/plans/gzip-compression-plan.md | 98 ---------------------- 1 file changed, 98 deletions(-) delete mode 100644 node/claude/plans/gzip-compression-plan.md diff --git a/node/claude/plans/gzip-compression-plan.md b/node/claude/plans/gzip-compression-plan.md deleted file mode 100644 index 407105af27c..00000000000 --- a/node/claude/plans/gzip-compression-plan.md +++ /dev/null @@ -1,98 +0,0 @@ -# Plan: Implement Extensible Compression for RPC Requests - -## Overview -Add extensible compression support for Graph Node's outgoing RPC requests to upstream providers, configurable on a per-provider basis with future compression methods in mind. - -## Implementation Steps (COMPLETED) - -### 1. ✅ Create Compression Enum (`node/src/config.rs`) -- Added `Compression` enum with `None` and `Gzip` variants -- Commented placeholders for future compression methods (Brotli, Deflate) -- Default implementation returns `Compression::None` - -### 2. ✅ Update Configuration Structure (`node/src/config.rs`) -- Replaced `compression_enabled: bool` with `compression: Compression` field in `Web3Provider` struct -- Updated all existing code to use new enum -- Added unit tests for both "gzip" and "none" compression options - -### 3. ✅ Modify HTTP Transport (`chain/ethereum/src/transport.rs`) -- Updated `Transport::new_rpc()` to accept `Compression` enum parameter -- Implemented match statement for different compression types -- Added comments showing where future compression methods can be added -- Uses reqwest's `.gzip(true)` for automatic compression/decompression - -### 4. ✅ Update Transport Creation (`node/src/chain.rs`) -- Pass compression enum from config to transport -- Updated logging to show compression method using debug format - -### 5. ✅ Update Dependencies (`graph/Cargo.toml`) -- Added "gzip" feature to reqwest dependency - -### 6. ✅ Update Test Configuration -- Updated `full_config.toml` example to use new enum format -- Added comprehensive unit tests for compression parsing - -## Configuration Examples - -### Gzip Compression -```toml -[chains.mainnet] -provider = [ - { - label = "mainnet-rpc", - details = { - type = "web3", - url = "http://rpc.example.com", - features = ["archive"], - compression = "gzip" - } - } -] -``` - -### No Compression (Default) -```toml -[chains.mainnet] -provider = [ - { - label = "mainnet-rpc", - details = { - type = "web3", - url = "http://rpc.example.com", - features = ["archive"], - compression = "none" # or omit entirely - } - } -] -``` - -### Future Extension Example -```rust -// Future compression methods can be easily added: -#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq)] -pub enum Compression { - #[serde(rename = "none")] - None, - #[serde(rename = "gzip")] - Gzip, - #[serde(rename = "brotli")] - Brotli, - #[serde(rename = "deflate")] - Deflate, -} - -// And handled in transport: -match compression { - Compression::Gzip => client_builder = client_builder.gzip(true), - Compression::Brotli => client_builder = client_builder.brotli(true), - Compression::Deflate => client_builder = client_builder.deflate(true), - Compression::None => {} // No compression -} -``` - -## Benefits of This Implementation -- **Extensible**: Easy to add new compression methods without breaking changes -- **Backward Compatible**: Defaults to no compression, existing configs work unchanged -- **Type Safe**: Enum prevents invalid compression method strings -- **Future Proof**: Clear pattern for adding Brotli, Deflate, etc. -- **Per-Provider**: Each RPC provider can have different compression settings \ No newline at end of file