diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index 1e66fea49c..e1581a3f23 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -2,6 +2,8 @@ ## vNext +- Add HTTP compression support with `gzip-http` and `zstd-http` feature flags + ## 0.30.0 Released 2025-May-23 diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml index 9987d233a8..f0dcb2d6ad 100644 --- a/opentelemetry-otlp/Cargo.toml +++ b/opentelemetry-otlp/Cargo.toml @@ -43,6 +43,10 @@ serde = { workspace = true, features = ["derive"], optional = true } thiserror = { workspace = true } serde_json = { workspace = true, optional = true } +# compression dependencies +flate2 = { version = "1.1.2", optional = true } +zstd = { version = "0.13", optional = true } + [dev-dependencies] tokio-stream = { workspace = true, features = ["net"] } opentelemetry_sdk = { features = ["trace", "testing"], path = "../opentelemetry-sdk" } @@ -50,6 +54,7 @@ tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } futures-util = { workspace = true } temp-env = { workspace = true } tonic = { workspace = true, features = ["router", "server"] } +async-trait = { workspace = true } [features] # telemetry pillars and functions @@ -67,6 +72,10 @@ default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"] gzip-tonic = ["tonic/gzip"] zstd-tonic = ["tonic/zstd"] + +# http compression +gzip-http = ["flate2"] +zstd-http = ["zstd"] tls = ["tonic/tls-ring"] tls-roots = ["tls", "tonic/tls-native-roots"] tls-webpki-roots = ["tls", "tonic/tls-webpki-roots"] diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index a0c9718fe8..3a66266652 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -1,4 +1,5 @@ use super::OtlpHttpClient; +use http::header::CONTENT_ENCODING; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; @@ -14,14 +15,20 @@ impl LogExporter for OtlpHttpClient { .clone() .ok_or(OTelSdkError::AlreadyShutdown)?; - let (body, content_type) = self + let (body, content_type, content_encoding) = self .build_logs_export_body(batch) .map_err(OTelSdkError::InternalFailure)?; - let mut request = http::Request::builder() + let mut request_builder = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) + .header(CONTENT_TYPE, content_type); + + if let Some(encoding) = content_encoding { + request_builder = request_builder.header(CONTENT_ENCODING, encoding); + } + + let mut request = request_builder .body(body.into()) .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?; diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index a8db53f9ab..11a3041585 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -19,13 +19,21 @@ impl MetricsClient for OtlpHttpClient { _ => Err(OTelSdkError::AlreadyShutdown), })?; - let (body, content_type) = self.build_metrics_export_body(metrics).ok_or_else(|| { - OTelSdkError::InternalFailure("Failed to serialize metrics".to_string()) - })?; - let mut request = http::Request::builder() + let (body, content_type, content_encoding) = + self.build_metrics_export_body(metrics).ok_or_else(|| { + OTelSdkError::InternalFailure("Failed to serialize metrics".to_string()) + })?; + + let mut request_builder = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) + .header(CONTENT_TYPE, content_type); + + if let Some(encoding) = content_encoding { + request_builder = request_builder.header("Content-Encoding", encoding); + } + + let mut request = request_builder .body(body.into()) .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 5d31660ab6..e7a284c33b 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -4,7 +4,6 @@ use super::{ }; use crate::{ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS}; use http::{HeaderName, HeaderValue, Uri}; -#[cfg(feature = "http-json")] use opentelemetry::otel_debug; use opentelemetry_http::HttpClient; use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; @@ -48,8 +47,11 @@ pub struct HttpConfig { /// Select the HTTP client client: Option>, - /// Additional headers to send to the collector. + /// Additional headers to send to the OTLP endpoint. headers: Option>, + + /// The compression algorithm to use when communicating with the OTLP endpoint. + compression: Option, } /// Configuration for the OTLP HTTP exporter. @@ -107,6 +109,7 @@ impl HttpExporterBuilder { signal_endpoint_path: &str, signal_timeout_var: &str, signal_http_headers_var: &str, + signal_compression_var: &str, ) -> Result { let endpoint = resolve_http_endpoint( signal_endpoint_var, @@ -114,6 +117,32 @@ impl HttpExporterBuilder { self.exporter_config.endpoint.as_deref(), )?; + let compression = self.resolve_compression(signal_compression_var)?; + + // Validate compression is supported at build time + if let Some(compression_alg) = &compression { + match compression_alg { + crate::Compression::Gzip => { + #[cfg(not(feature = "gzip-http"))] + { + return Err(ExporterBuildError::UnsupportedCompressionAlgorithm( + "gzip compression requested but gzip-http feature not enabled" + .to_string(), + )); + } + } + crate::Compression::Zstd => { + #[cfg(not(feature = "zstd-http"))] + { + return Err(ExporterBuildError::UnsupportedCompressionAlgorithm( + "zstd compression requested but zstd-http feature not enabled" + .to_string(), + )); + } + } + } + } + let timeout = resolve_timeout(signal_timeout_var, self.exporter_config.timeout.as_ref()); #[allow(unused_mut)] // TODO - clippy thinks mut is not needed, but it is @@ -193,15 +222,23 @@ impl HttpExporterBuilder { headers, self.exporter_config.protocol, timeout, + compression, )) } + fn resolve_compression( + &self, + env_override: &str, + ) -> Result, super::ExporterBuildError> { + super::resolve_compression_from_env(self.http_config.compression, env_override) + } + /// Create a log exporter with the current configuration #[cfg(feature = "trace")] pub fn build_span_exporter(mut self) -> Result { use crate::{ - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_HEADERS, - OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, + OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, + OTEL_EXPORTER_OTLP_TRACES_HEADERS, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, }; let client = self.build_client( @@ -209,6 +246,7 @@ impl HttpExporterBuilder { "/v1/traces", OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, OTEL_EXPORTER_OTLP_TRACES_HEADERS, + OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, )?; Ok(crate::SpanExporter::from_http(client)) @@ -218,8 +256,8 @@ impl HttpExporterBuilder { #[cfg(feature = "logs")] pub fn build_log_exporter(mut self) -> Result { use crate::{ - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, OTEL_EXPORTER_OTLP_LOGS_HEADERS, - OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, + OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + OTEL_EXPORTER_OTLP_LOGS_HEADERS, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, }; let client = self.build_client( @@ -227,6 +265,7 @@ impl HttpExporterBuilder { "/v1/logs", OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, OTEL_EXPORTER_OTLP_LOGS_HEADERS, + OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, )?; Ok(crate::LogExporter::from_http(client)) @@ -239,8 +278,8 @@ impl HttpExporterBuilder { temporality: opentelemetry_sdk::metrics::Temporality, ) -> Result { use crate::{ - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS, - OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, + OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, + OTEL_EXPORTER_OTLP_METRICS_HEADERS, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, }; let client = self.build_client( @@ -248,6 +287,7 @@ impl HttpExporterBuilder { "/v1/metrics", OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, OTEL_EXPORTER_OTLP_METRICS_HEADERS, + OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, )?; Ok(crate::MetricExporter::from_http(client, temporality)) @@ -261,12 +301,45 @@ pub(crate) struct OtlpHttpClient { headers: HashMap, protocol: Protocol, _timeout: Duration, + compression: Option, #[allow(dead_code)] // would be removed once we support set_resource for metrics and traces. resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } impl OtlpHttpClient { + /// Compress data using gzip or zstd if the user has requested it and the relevant feature + /// has been enabled. If the user has requested it but the feature has not been enabled, + /// we should catch this at exporter build time and never get here. + fn process_body(&self, body: Vec) -> Result<(Vec, Option<&'static str>), String> { + match self.compression { + #[cfg(feature = "gzip-http")] + Some(crate::Compression::Gzip) => { + use flate2::{write::GzEncoder, Compression}; + use std::io::Write; + + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&body).map_err(|e| e.to_string())?; + let compressed = encoder.finish().map_err(|e| e.to_string())?; + Ok((compressed, Some("gzip"))) + } + #[cfg(not(feature = "gzip-http"))] + Some(crate::Compression::Gzip) => { + Err("gzip compression requested but gzip-http feature not enabled".to_string()) + } + #[cfg(feature = "zstd-http")] + Some(crate::Compression::Zstd) => { + let compressed = zstd::bulk::compress(&body, 0).map_err(|e| e.to_string())?; + Ok((compressed, Some("zstd"))) + } + #[cfg(not(feature = "zstd-http"))] + Some(crate::Compression::Zstd) => { + Err("zstd compression requested but zstd-http feature not enabled".to_string()) + } + None => Ok((body, None)), + } + } + #[allow(clippy::mutable_key_type)] // http headers are not mutated fn new( client: Arc, @@ -274,6 +347,7 @@ impl OtlpHttpClient { headers: HashMap, protocol: Protocol, timeout: Duration, + compression: Option, ) -> Self { OtlpHttpClient { client: Mutex::new(Some(client)), @@ -281,6 +355,7 @@ impl OtlpHttpClient { headers, protocol, _timeout: timeout, + compression, resource: ResourceAttributesWithSchema::default(), } } @@ -289,59 +364,75 @@ impl OtlpHttpClient { fn build_trace_export_body( &self, spans: Vec, - ) -> Result<(Vec, &'static str), String> { + ) -> Result<(Vec, &'static str, Option<&'static str>), String> { use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource); let req = ExportTraceServiceRequest { resource_spans }; - match self.protocol { + let (body, content_type) = match self.protocol { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { - Ok(json) => Ok((json.into_bytes(), "application/json")), - Err(e) => Err(e.to_string()), + Ok(json) => (json.into_bytes(), "application/json"), + Err(e) => return Err(e.to_string()), }, - _ => Ok((req.encode_to_vec(), "application/x-protobuf")), - } + _ => (req.encode_to_vec(), "application/x-protobuf"), + }; + + let (processed_body, content_encoding) = self.process_body(body)?; + Ok((processed_body, content_type, content_encoding)) } #[cfg(feature = "logs")] fn build_logs_export_body( &self, logs: LogBatch<'_>, - ) -> Result<(Vec, &'static str), String> { + ) -> Result<(Vec, &'static str, Option<&'static str>), String> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); let req = ExportLogsServiceRequest { resource_logs }; - match self.protocol { + let (body, content_type) = match self.protocol { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { - Ok(json) => Ok((json.into(), "application/json")), - Err(e) => Err(e.to_string()), + Ok(json) => (json.into_bytes(), "application/json"), + Err(e) => return Err(e.to_string()), }, - _ => Ok((req.encode_to_vec(), "application/x-protobuf")), - } + _ => (req.encode_to_vec(), "application/x-protobuf"), + }; + + let (processed_body, content_encoding) = self.process_body(body)?; + Ok((processed_body, content_type, content_encoding)) } #[cfg(feature = "metrics")] fn build_metrics_export_body( &self, metrics: &ResourceMetrics, - ) -> Option<(Vec, &'static str)> { + ) -> Option<(Vec, &'static str, Option<&'static str>)> { use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; let req: ExportMetricsServiceRequest = metrics.into(); - match self.protocol { + let (body, content_type) = match self.protocol { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { - Ok(json) => Some((json.into(), "application/json")), + Ok(json) => (json.into_bytes(), "application/json"), Err(e) => { otel_debug!(name: "JsonSerializationFaied", error = e.to_string()); - None + return None; } }, - _ => Some((req.encode_to_vec(), "application/x-protobuf")), + _ => (req.encode_to_vec(), "application/x-protobuf"), + }; + + match self.process_body(body) { + Ok((processed_body, content_encoding)) => { + Some((processed_body, content_type, content_encoding)) + } + Err(e) => { + otel_debug!(name: "CompressionFailed", error = e); + None + } } } } @@ -432,6 +523,9 @@ pub trait WithHttpConfig { /// Set additional headers to send to the collector. fn with_headers(self, headers: HashMap) -> Self; + + /// Set the compression algorithm to use when communicating with the collector. + fn with_compression(self, compression: crate::Compression) -> Self; } impl WithHttpConfig for B { @@ -451,6 +545,11 @@ impl WithHttpConfig for B { }); self } + + fn with_compression(mut self, compression: crate::Compression) -> Self { + self.http_client_config().compression = Some(compression); + self + } } #[cfg(test)] @@ -695,6 +794,7 @@ mod tests { http_config: HttpConfig { client: None, headers: Some(initial_headers), + compression: None, }, exporter_config: crate::ExportConfig::default(), }; @@ -744,4 +844,418 @@ mod tests { assert_eq!(url, "http://localhost:4318/v1/tracesbutnotreally"); }); } + + #[cfg(feature = "gzip-http")] + mod compression_tests { + use super::super::OtlpHttpClient; + use flate2::read::GzDecoder; + use opentelemetry_http::{Bytes, HttpClient}; + use std::io::Read; + + #[test] + fn test_gzip_compression_and_decompression() { + let client = OtlpHttpClient::new( + std::sync::Arc::new(MockHttpClient), + "http://localhost:4318".parse().unwrap(), + std::collections::HashMap::new(), + crate::Protocol::HttpBinary, + std::time::Duration::from_secs(10), + Some(crate::Compression::Gzip), + ); + + // Test with some sample data + let test_data = b"Hello, world! This is test data for compression."; + let result = client.process_body(test_data.to_vec()).unwrap(); + let (compressed_body, content_encoding) = result; + + // Verify encoding header is set + assert_eq!(content_encoding, Some("gzip")); + + // Verify we can decompress the body + let mut decoder = GzDecoder::new(&compressed_body[..]); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed).unwrap(); + + // Verify decompressed data matches original + assert_eq!(decompressed, test_data); + // Verify compression actually happened (compressed should be different) + assert_ne!(compressed_body, test_data.to_vec()); + } + + #[cfg(feature = "zstd-http")] + #[test] + fn test_zstd_compression_and_decompression() { + let client = OtlpHttpClient::new( + std::sync::Arc::new(MockHttpClient), + "http://localhost:4318".parse().unwrap(), + std::collections::HashMap::new(), + crate::Protocol::HttpBinary, + std::time::Duration::from_secs(10), + Some(crate::Compression::Zstd), + ); + + // Test with some sample data + let test_data = b"Hello, world! This is test data for zstd compression."; + let result = client.process_body(test_data.to_vec()).unwrap(); + let (compressed_body, content_encoding) = result; + + // Verify encoding header is set + assert_eq!(content_encoding, Some("zstd")); + + // Verify we can decompress the body + let decompressed = zstd::bulk::decompress(&compressed_body, test_data.len()).unwrap(); + + // Verify decompressed data matches original + assert_eq!(decompressed, test_data); + // Verify compression actually happened (compressed should be different) + assert_ne!(compressed_body, test_data.to_vec()); + } + + #[test] + fn test_no_compression_when_disabled() { + let client = OtlpHttpClient::new( + std::sync::Arc::new(MockHttpClient), + "http://localhost:4318".parse().unwrap(), + std::collections::HashMap::new(), + crate::Protocol::HttpBinary, + std::time::Duration::from_secs(10), + None, // No compression + ); + + let body = vec![1, 2, 3, 4]; + let result = client.process_body(body.clone()).unwrap(); + let (result_body, content_encoding) = result; + + // Body should be unchanged and no encoding header + assert_eq!(result_body, body); + assert_eq!(content_encoding, None); + } + + #[cfg(not(feature = "gzip-http"))] + #[test] + fn test_gzip_error_when_feature_disabled() { + let client = OtlpHttpClient::new( + std::sync::Arc::new(MockHttpClient), + "http://localhost:4318".parse().unwrap(), + std::collections::HashMap::new(), + crate::Protocol::HttpBinary, + std::time::Duration::from_secs(10), + Some(crate::Compression::Gzip), + ); + + let body = vec![1, 2, 3, 4]; + let result = client.process_body(body); + + // Should return error when gzip requested but feature not enabled + assert!(result.is_err()); + assert!(result + .unwrap_err() + .contains("gzip-http feature not enabled")); + } + + #[cfg(not(feature = "zstd-http"))] + #[test] + fn test_zstd_error_when_feature_disabled() { + let client = OtlpHttpClient::new( + std::sync::Arc::new(MockHttpClient), + "http://localhost:4318".parse().unwrap(), + std::collections::HashMap::new(), + crate::Protocol::HttpBinary, + std::time::Duration::from_secs(10), + Some(crate::Compression::Zstd), + ); + + let body = vec![1, 2, 3, 4]; + let result = client.process_body(body); + + // Should return error when zstd requested but feature not enabled + assert!(result.is_err()); + assert!(result + .unwrap_err() + .contains("zstd-http feature not enabled")); + } + + // Mock HTTP client for testing + #[derive(Debug)] + struct MockHttpClient; + + #[async_trait::async_trait] + impl HttpClient for MockHttpClient { + async fn send_bytes( + &self, + _request: http::Request, + ) -> Result, opentelemetry_http::HttpError> { + Ok(http::Response::builder() + .status(200) + .body(Bytes::new()) + .unwrap()) + } + } + } + + mod export_body_tests { + use super::super::OtlpHttpClient; + use opentelemetry_http::{Bytes, HttpClient}; + use std::collections::HashMap; + + #[derive(Debug)] + struct MockHttpClient; + + #[async_trait::async_trait] + impl HttpClient for MockHttpClient { + async fn send_bytes( + &self, + _request: http::Request, + ) -> Result, opentelemetry_http::HttpError> { + Ok(http::Response::builder() + .status(200) + .body(Bytes::new()) + .unwrap()) + } + } + + fn create_test_client( + protocol: crate::Protocol, + compression: Option, + ) -> OtlpHttpClient { + OtlpHttpClient::new( + std::sync::Arc::new(MockHttpClient), + "http://localhost:4318".parse().unwrap(), + HashMap::new(), + protocol, + std::time::Duration::from_secs(10), + compression, + ) + } + + fn create_test_span_data() -> opentelemetry_sdk::trace::SpanData { + use opentelemetry::trace::Status; + use opentelemetry::trace::{ + SpanContext, SpanId, SpanKind, TraceFlags, TraceId, TraceState, + }; + use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks}; + use std::borrow::Cow; + use std::time::{Duration, SystemTime}; + + let span_context = SpanContext::new( + TraceId::from_u128(123), + SpanId::from_u64(456), + TraceFlags::default(), + false, + TraceState::default(), + ); + SpanData { + span_context, + parent_span_id: SpanId::from_u64(0), + span_kind: SpanKind::Internal, + name: Cow::Borrowed("test_span"), + start_time: SystemTime::UNIX_EPOCH, + end_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1), + attributes: vec![], + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: opentelemetry::InstrumentationScope::default(), + } + } + + #[cfg(feature = "trace")] + #[test] + fn test_build_trace_export_body_binary_protocol() { + let client = create_test_client(crate::Protocol::HttpBinary, None); + let span_data = create_test_span_data(); + + let result = client.build_trace_export_body(vec![span_data]).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/x-protobuf"); + assert_eq!(content_encoding, None); + } + + #[cfg(all(feature = "trace", feature = "http-json"))] + #[test] + fn test_build_trace_export_body_json_protocol() { + let client = create_test_client(crate::Protocol::HttpJson, None); + let span_data = create_test_span_data(); + + let result = client.build_trace_export_body(vec![span_data]).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/json"); + assert_eq!(content_encoding, None); + } + + #[cfg(all(feature = "trace", feature = "gzip-http"))] + #[test] + fn test_build_trace_export_body_with_compression() { + let client = + create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip)); + let span_data = create_test_span_data(); + + let result = client.build_trace_export_body(vec![span_data]).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/x-protobuf"); + assert_eq!(content_encoding, Some("gzip")); + } + + #[cfg(feature = "logs")] + fn create_test_log_batch() -> opentelemetry_sdk::logs::LogBatch<'static> { + use opentelemetry_sdk::logs::LogBatch; + + // Use empty batch for simplicity - the method should still handle protocol/compression correctly + LogBatch::new(&[]) + } + + #[cfg(feature = "logs")] + #[test] + fn test_build_logs_export_body_binary_protocol() { + let client = create_test_client(crate::Protocol::HttpBinary, None); + let batch = create_test_log_batch(); + + let result = client.build_logs_export_body(batch).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/x-protobuf"); + assert_eq!(content_encoding, None); + } + + #[cfg(all(feature = "logs", feature = "http-json"))] + #[test] + fn test_build_logs_export_body_json_protocol() { + let client = create_test_client(crate::Protocol::HttpJson, None); + let batch = create_test_log_batch(); + + let result = client.build_logs_export_body(batch).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/json"); + assert_eq!(content_encoding, None); + } + + #[cfg(all(feature = "logs", feature = "gzip-http"))] + #[test] + fn test_build_logs_export_body_with_compression() { + let client = + create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip)); + let batch = create_test_log_batch(); + + let result = client.build_logs_export_body(batch).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/x-protobuf"); + assert_eq!(content_encoding, Some("gzip")); + } + + #[cfg(feature = "metrics")] + #[test] + fn test_build_metrics_export_body_binary_protocol() { + use opentelemetry_sdk::metrics::data::ResourceMetrics; + + let client = create_test_client(crate::Protocol::HttpBinary, None); + let metrics = ResourceMetrics::default(); + + let result = client.build_metrics_export_body(&metrics).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/x-protobuf"); + assert_eq!(content_encoding, None); + } + + #[cfg(all(feature = "metrics", feature = "http-json"))] + #[test] + fn test_build_metrics_export_body_json_protocol() { + use opentelemetry_sdk::metrics::data::ResourceMetrics; + + let client = create_test_client(crate::Protocol::HttpJson, None); + let metrics = ResourceMetrics::default(); + + let result = client.build_metrics_export_body(&metrics).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/json"); + assert_eq!(content_encoding, None); + } + + #[cfg(all(feature = "metrics", feature = "gzip-http"))] + #[test] + fn test_build_metrics_export_body_with_compression() { + use opentelemetry_sdk::metrics::data::ResourceMetrics; + + let client = + create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip)); + let metrics = ResourceMetrics::default(); + + let result = client.build_metrics_export_body(&metrics).unwrap(); + let (_body, content_type, content_encoding) = result; + + assert_eq!(content_type, "application/x-protobuf"); + assert_eq!(content_encoding, Some("gzip")); + } + + #[cfg(all(feature = "metrics", not(feature = "gzip-http")))] + #[test] + fn test_build_metrics_export_body_compression_error_returns_none() { + use opentelemetry_sdk::metrics::data::ResourceMetrics; + + let client = + create_test_client(crate::Protocol::HttpBinary, Some(crate::Compression::Gzip)); + let metrics = ResourceMetrics::default(); + + // Should return None when compression fails (feature not enabled) + let result = client.build_metrics_export_body(&metrics); + assert!(result.is_none()); + } + + #[test] + fn test_resolve_compression_uses_generic_env_fallback() { + use super::super::HttpExporterBuilder; + use crate::exporter::tests::run_env_test; + + // Test that generic OTEL_EXPORTER_OTLP_COMPRESSION is used when signal-specific env var is not set + run_env_test( + vec![(crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip")], + || { + let builder = HttpExporterBuilder::default(); + let result = builder + .resolve_compression("NONEXISTENT_SIGNAL_COMPRESSION") + .unwrap(); + assert_eq!(result, Some(crate::Compression::Gzip)); + }, + ); + } + + #[cfg(all(feature = "trace", not(feature = "gzip-http")))] + #[test] + fn test_build_span_exporter_with_gzip_without_feature() { + use super::super::HttpExporterBuilder; + use crate::{ExporterBuildError, WithHttpConfig}; + + let builder = HttpExporterBuilder::default().with_compression(crate::Compression::Gzip); + + let result = builder.build_span_exporter(); + // This test will fail until the issue is fixed: compression validation should happen at build time + assert!(matches!( + result, + Err(ExporterBuildError::UnsupportedCompressionAlgorithm(_)) + )); + } + + #[cfg(all(feature = "trace", not(feature = "zstd-http")))] + #[test] + fn test_build_span_exporter_with_zstd_without_feature() { + use super::super::HttpExporterBuilder; + use crate::{ExporterBuildError, WithHttpConfig}; + + let builder = HttpExporterBuilder::default().with_compression(crate::Compression::Zstd); + + let result = builder.build_span_exporter(); + // This test will fail until the issue is fixed: compression validation should happen at build time + assert!(matches!( + result, + Err(ExporterBuildError::UnsupportedCompressionAlgorithm(_)) + )); + } + } } diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index 93573d31a4..28bc7fc5fb 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -22,17 +22,21 @@ impl SpanExporter for OtlpHttpClient { Err(err) => return Err(err), }; - let (body, content_type) = match self.build_trace_export_body(batch) { - Ok(body) => body, + let (body, content_type, content_encoding) = match self.build_trace_export_body(batch) { + Ok(result) => result, Err(e) => return Err(OTelSdkError::InternalFailure(e.to_string())), }; - let mut request = match http::Request::builder() + let mut request_builder = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) - .header(CONTENT_TYPE, content_type) - .body(body.into()) - { + .header(CONTENT_TYPE, content_type); + + if let Some(encoding) = content_encoding { + request_builder = request_builder.header("Content-Encoding", encoding); + } + + let mut request = match request_builder.body(body.into()) { Ok(req) => req, Err(e) => return Err(OTelSdkError::InternalFailure(e.to_string())), }; diff --git a/opentelemetry-otlp/src/exporter/mod.rs b/opentelemetry-otlp/src/exporter/mod.rs index 51af6f37a5..97b4973e07 100644 --- a/opentelemetry-otlp/src/exporter/mod.rs +++ b/opentelemetry-otlp/src/exporter/mod.rs @@ -169,6 +169,27 @@ impl FromStr for Compression { } } +/// Resolve compression from environment variables with priority: +/// 1. Provided config value +/// 2. Signal-specific environment variable +/// 3. Generic OTEL_EXPORTER_OTLP_COMPRESSION +/// 4. None (default) +#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] +fn resolve_compression_from_env( + config_compression: Option, + signal_env_var: &str, +) -> Result, ExporterBuildError> { + if let Some(compression) = config_compression { + Ok(Some(compression)) + } else if let Ok(compression) = std::env::var(signal_env_var) { + Ok(Some(compression.parse::()?)) + } else if let Ok(compression) = std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION) { + Ok(Some(compression.parse::()?)) + } else { + Ok(None) + } +} + /// default protocol based on enabled features fn default_protocol() -> Protocol { match OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT { diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index d53a7516be..1fdbe41b51 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -14,10 +14,7 @@ use tonic::transport::ClientTlsConfig; use super::{default_headers, parse_header_string, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT}; use super::{resolve_timeout, ExporterBuildError}; use crate::exporter::Compression; -use crate::{ - ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT, - OTEL_EXPORTER_OTLP_HEADERS, -}; +use crate::{ExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS}; #[cfg(feature = "logs")] pub(crate) mod logs; @@ -240,15 +237,9 @@ impl TonicExporterBuilder { &self, env_override: &str, ) -> Result, ExporterBuildError> { - if let Some(compression) = self.tonic_config.compression { - Ok(Some(compression.try_into()?)) - } else if let Ok(compression) = env::var(env_override) { - Ok(Some(compression.parse::()?.try_into()?)) - } else if let Ok(compression) = env::var(OTEL_EXPORTER_OTLP_COMPRESSION) { - Ok(Some(compression.parse::()?.try_into()?)) - } else { - Ok(None) - } + super::resolve_compression_from_env(self.tonic_config.compression, env_override)? + .map(|c| c.try_into()) + .transpose() } /// Build a new tonic log exporter @@ -522,7 +513,7 @@ mod tests { run_env_test( vec![ (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "zstd"), - (super::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"), + (crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"), ], || { let builder = TonicExporterBuilder::default(); @@ -541,7 +532,7 @@ mod tests { run_env_test( vec![ (crate::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, "gzip"), - (super::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"), + (crate::OTEL_EXPORTER_OTLP_COMPRESSION, "gzip"), ], || { let builder = TonicExporterBuilder::default().with_compression(Compression::Zstd); diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 5f79aa99f4..5887926f29 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -261,6 +261,8 @@ //! The following feature flags offer additional configurations on http: //! //! * `http-proto`: Use http as transport layer, protobuf as body format. This feature is enabled by default. +//! * `gzip-http`: Use gzip compression for HTTP transport. +//! * `zstd-http`: Use zstd compression for HTTP transport. //! * `reqwest-blocking-client`: Use reqwest blocking http client. This feature is enabled by default. //! * `reqwest-client`: Use reqwest http client. //! * `reqwest-rustls`: Use reqwest with TLS with system trust roots via `rustls-native-certs` crate. @@ -281,7 +283,11 @@ //! use opentelemetry_sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource}; //! # #[cfg(feature = "metrics")] //! use opentelemetry_sdk::metrics::Temporality; -//! use opentelemetry_otlp::{Protocol, WithExportConfig, WithTonicConfig}; +//! use opentelemetry_otlp::{Protocol, WithExportConfig, Compression}; +//! # #[cfg(feature = "grpc-tonic")] +//! use opentelemetry_otlp::WithTonicConfig; +//! # #[cfg(any(feature = "http-proto", feature = "http-json"))] +//! use opentelemetry_otlp::WithHttpConfig; //! use std::time::Duration; //! # #[cfg(feature = "grpc-tonic")] //! use tonic::metadata::*; @@ -314,6 +320,19 @@ //! # tracer //! # }; //! +//! // HTTP exporter example with compression +//! # #[cfg(all(feature = "trace", feature = "http-proto"))] +//! # let _http_tracer = { +//! let exporter = opentelemetry_otlp::SpanExporter::builder() +//! .with_http() +//! .with_endpoint("http://localhost:4318/v1/traces") +//! .with_timeout(Duration::from_secs(3)) +//! .with_protocol(Protocol::HttpBinary) +//! .with_compression(Compression::Gzip) // Requires gzip-http feature +//! .build()?; +//! # exporter +//! # }; +//! //! # #[cfg(all(feature = "metrics", feature = "grpc-tonic"))] //! # { //! let exporter = opentelemetry_otlp::MetricExporter::builder() @@ -330,6 +349,19 @@ //! .build(); //! # } //! +//! // HTTP metrics exporter example with compression +//! # #[cfg(all(feature = "metrics", feature = "http-proto"))] +//! # { +//! let exporter = opentelemetry_otlp::MetricExporter::builder() +//! .with_http() +//! .with_endpoint("http://localhost:4318/v1/metrics") +//! .with_protocol(Protocol::HttpBinary) +//! .with_timeout(Duration::from_secs(3)) +//! .with_compression(Compression::Zstd) // Requires zstd-http feature +//! .build() +//! .unwrap(); +//! # } +//! //! # #[cfg(all(feature = "trace", feature = "grpc-tonic"))] //! # { //! tracer.in_span("doing_work", |cx| {