diff --git a/Cargo.lock b/Cargo.lock index caf8fd8ffb5..5f3491fc209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,6 +173,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddb939d66e4ae03cee6091612804ba446b12878410cfa17f785f4dd67d4014e8" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-graphql" version = "7.0.15" @@ -4447,6 +4460,7 @@ version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" dependencies = [ + "async-compression", "base64 0.22.1", "bytes", "encoding_rs", diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index ef571efacb8..5c58403ac9d 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -1,5 +1,5 @@ use graph::components::network_provider::ProviderName; -use graph::endpoint::{EndpointMetrics, RequestLabels}; +use graph::endpoint::{Compression, EndpointMetrics, RequestLabels}; use jsonrpc_core::types::Call; use jsonrpc_core::Value; @@ -54,12 +54,25 @@ impl Transport { headers: graph::http::HeaderMap, metrics: Arc, provider: impl AsRef, + compression: Compression, ) -> Self { // Unwrap: This only fails if something is wrong with the system's TLS config. - let client = reqwest::Client::builder() - .default_headers(headers) - .build() - .unwrap(); + let mut client_builder = reqwest::Client::builder().default_headers(headers); + + match compression { + Compression::Gzip => { + // Enable gzip compression/decompression for requests and responses + client_builder = client_builder.gzip(true); + } + Compression::None => { + // No compression + } // Future compression methods can be handled here: + // Compression::Brotli => { + // client_builder = client_builder.brotli(true); + // } + } + + let client = client_builder.build().unwrap(); Transport::RPC { client: http::Http::with_client(client, rpc), diff --git a/graph/Cargo.toml b/graph/Cargo.toml index bb4287b2c31..655adfed97d 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -26,7 +26,7 @@ chrono = "0.4.41" envconfig = "0.11.0" Inflector = "0.11.3" atty = "0.2" -reqwest = { version = "0.12.15", features = ["json", "stream", "multipart"] } +reqwest = { version = "0.12.15", features = ["json", "stream", "multipart", "gzip"] } ethabi = "17.2" hex = "0.4.3" http0 = { version = "0", package = "http" } diff --git a/graph/src/endpoint.rs b/graph/src/endpoint.rs index bdff8dc8135..9602095da28 100644 --- a/graph/src/endpoint.rs +++ b/graph/src/endpoint.rs @@ -7,6 +7,7 @@ use std::{ }; use prometheus::IntCounterVec; +use serde::{Deserialize, Serialize}; use slog::{warn, Logger}; use crate::components::network_provider::ProviderName; @@ -17,6 +18,26 @@ use crate::{components::metrics::MetricsRegistry, data::value::Word}; /// avoid locking since we don't need to modify the entire struture. type ProviderCount = Arc>; +/// Compression methods for RPC transports +#[derive(Copy, Clone, Debug, Deserialize, Serialize, PartialEq)] +pub enum Compression { + #[serde(rename = "none")] + None, + #[serde(rename = "gzip")] + Gzip, + // Future compression methods can be added here: + // #[serde(rename = "brotli")] + // Brotli, + // #[serde(rename = "deflate")] + // Deflate, +} + +impl Default for Compression { + fn default() -> Self { + Compression::None + } +} + /// This struct represents all the current labels except for the result /// which is added separately. If any new labels are necessary they should /// remain in the same order as added in [`EndpointMetrics::new`] diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 1f907539194..9d1a8e5f835 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -48,6 +48,7 @@ shard = "primary" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, + { label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive"], compression = "gzip" }}, { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, { label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }}, ] diff --git a/node/src/chain.rs b/node/src/chain.rs index 343b783908f..f3444c436e8 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -282,7 +282,8 @@ pub async fn create_ethereum_networks_for_chain( logger, "Creating transport"; "url" => &web3.url, - "capabilities" => capabilities + "capabilities" => capabilities, + "compression" => ?web3.compression ); use crate::config::Transport::*; @@ -293,6 +294,7 @@ pub async fn create_ethereum_networks_for_chain( web3.headers.clone(), endpoint_metrics.cheap_clone(), &provider.label, + web3.compression, ), Ipc => Transport::new_ipc(&web3.url).await, Ws => Transport::new_ws(&web3.url).await, diff --git a/node/src/config.rs b/node/src/config.rs index 83ea7bf1cc3..99f7a6f4d44 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -2,6 +2,7 @@ use graph::{ anyhow::Error, blockchain::BlockchainKind, components::network_provider::ChainName, + endpoint::Compression, env::ENV_VARS, firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -502,6 +503,7 @@ impl ChainSection { features, headers: Default::default(), rules: vec![], + compression: Compression::None, }), }; let entry = chains.entry(name.to_string()).or_insert_with(|| Chain { @@ -705,6 +707,10 @@ pub struct Web3Provider { #[serde(default, rename = "match")] rules: Vec, + + /// Compression method for RPC requests and responses + #[serde(default)] + pub compression: Compression, } impl Web3Provider { @@ -901,6 +907,7 @@ impl<'de> Deserialize<'de> for Provider { .ok_or_else(|| serde::de::Error::missing_field("features"))?, headers: headers.unwrap_or_else(HeaderMap::new), rules: nodes, + compression: Compression::None, }), }; @@ -1307,6 +1314,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1333,6 +1341,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1440,6 +1449,7 @@ mod tests { features, headers, rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1465,6 +1475,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1834,6 +1845,7 @@ mod tests { features: BTreeSet::new(), headers: HeaderMap::new(), rules: Vec::new(), + compression: Compression::None, }), }, actual @@ -1846,6 +1858,66 @@ mod tests { assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled); } + #[test] + fn it_parses_web3_provider_with_compression() { + let actual = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "gzip" } + "#, + ) + .unwrap(); + + assert_eq!( + Provider { + label: "compressed".to_owned(), + details: ProviderDetails::Web3(Web3Provider { + transport: Transport::Rpc, + url: "http://localhost:8545".to_owned(), + features: { + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features + }, + headers: HeaderMap::new(), + rules: Vec::new(), + compression: Compression::Gzip, + }), + }, + actual + ); + } + + #[test] + fn it_parses_web3_provider_with_no_compression() { + let actual = toml::from_str( + r#" + label = "uncompressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive"], compression = "none" } + "#, + ) + .unwrap(); + + assert_eq!( + Provider { + label: "uncompressed".to_owned(), + details: ProviderDetails::Web3(Web3Provider { + transport: Transport::Rpc, + url: "http://localhost:8545".to_owned(), + features: { + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features + }, + headers: HeaderMap::new(), + rules: Vec::new(), + compression: Compression::None, + }), + }, + actual + ); + } + #[test] fn duplicated_labels_are_not_allowed_within_chain() { let mut actual = toml::from_str::(