Skip to content

Commit 3ec851a

Browse files
committed
feat: Implement gzip compression
1 parent 3c674db commit 3ec851a

File tree

4 files changed

+84
-31
lines changed

4 files changed

+84
-31
lines changed

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,20 @@ impl LogExporter for OtlpHttpClient {
1414
.clone()
1515
.ok_or(OTelSdkError::AlreadyShutdown)?;
1616

17-
let (body, content_type) = self
17+
let (body, content_type, content_encoding) = self
1818
.build_logs_export_body(batch)
1919
.map_err(OTelSdkError::InternalFailure)?;
2020

21-
let mut request = http::Request::builder()
21+
let mut request_builder = http::Request::builder()
2222
.method(Method::POST)
2323
.uri(&self.collector_endpoint)
24-
.header(CONTENT_TYPE, content_type)
24+
.header(CONTENT_TYPE, content_type);
25+
26+
if let Some(encoding) = content_encoding {
27+
request_builder = request_builder.header("Content-Encoding", encoding);
28+
}
29+
30+
let mut request = request_builder
2531
.body(body.into())
2632
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
2733

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,20 @@ impl MetricsClient for OtlpHttpClient {
1919
_ => Err(OTelSdkError::AlreadyShutdown),
2020
})?;
2121

22-
let (body, content_type) = self.build_metrics_export_body(metrics).ok_or_else(|| {
22+
let (body, content_type, content_encoding) = self.build_metrics_export_body(metrics).ok_or_else(|| {
2323
OTelSdkError::InternalFailure("Failed to serialize metrics".to_string())
2424
})?;
25-
let mut request = http::Request::builder()
25+
26+
let mut request_builder = http::Request::builder()
2627
.method(Method::POST)
2728
.uri(&self.collector_endpoint)
28-
.header(CONTENT_TYPE, content_type)
29+
.header(CONTENT_TYPE, content_type);
30+
31+
if let Some(encoding) = content_encoding {
32+
request_builder = request_builder.header("Content-Encoding", encoding);
33+
}
34+
35+
let mut request = request_builder
2936
.body(body.into())
3037
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
3138

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use super::{
44
};
55
use crate::{ExportConfig, Protocol, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS};
66
use http::{HeaderName, HeaderValue, Uri};
7-
#[cfg(feature = "http-json")]
87
use opentelemetry::otel_debug;
98
use opentelemetry_http::HttpClient;
109
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
@@ -278,14 +277,37 @@ pub(crate) struct OtlpHttpClient {
278277
headers: HashMap<HeaderName, HeaderValue>,
279278
protocol: Protocol,
280279
_timeout: Duration,
281-
#[allow(dead_code)] // TODO: Remove when compression implementation is added
282280
compression: Option<crate::Compression>,
283281
#[allow(dead_code)]
284282
// <allow dead> would be removed once we support set_resource for metrics and traces.
285283
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
286284
}
287285

288286
impl OtlpHttpClient {
287+
/// Compress data using gzip if compression is enabled and the feature is available
288+
fn compress_body(&self, body: Vec<u8>) -> Result<(Vec<u8>, Option<&'static str>), String> {
289+
match self.compression {
290+
#[cfg(feature = "gzip-http")]
291+
Some(crate::Compression::Gzip) => {
292+
use std::io::Write;
293+
use flate2::{write::GzEncoder, Compression};
294+
295+
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
296+
encoder.write_all(&body).map_err(|e| e.to_string())?;
297+
let compressed = encoder.finish().map_err(|e| e.to_string())?;
298+
Ok((compressed, Some("gzip")))
299+
}
300+
#[cfg(not(feature = "gzip-http"))]
301+
Some(crate::Compression::Gzip) => {
302+
Err("gzip compression requested but gzip-http feature not enabled".to_string())
303+
}
304+
Some(crate::Compression::Zstd) => {
305+
Err("zstd compression not implemented yet".to_string())
306+
}
307+
None => Ok((body, None)),
308+
}
309+
}
310+
289311
#[allow(clippy::mutable_key_type)] // http headers are not mutated
290312
fn new(
291313
client: Arc<dyn HttpClient>,
@@ -310,59 +332,73 @@ impl OtlpHttpClient {
310332
fn build_trace_export_body(
311333
&self,
312334
spans: Vec<SpanData>,
313-
) -> Result<(Vec<u8>, &'static str), String> {
335+
) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String> {
314336
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
315337
let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource);
316338

317339
let req = ExportTraceServiceRequest { resource_spans };
318-
match self.protocol {
340+
let (body, content_type) = match self.protocol {
319341
#[cfg(feature = "http-json")]
320342
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
321-
Ok(json) => Ok((json.into_bytes(), "application/json")),
322-
Err(e) => Err(e.to_string()),
343+
Ok(json) => (json.into_bytes(), "application/json"),
344+
Err(e) => return Err(e.to_string()),
323345
},
324-
_ => Ok((req.encode_to_vec(), "application/x-protobuf")),
325-
}
346+
_ => (req.encode_to_vec(), "application/x-protobuf"),
347+
};
348+
349+
let (compressed_body, content_encoding) = self.compress_body(body)?;
350+
Ok((compressed_body, content_type, content_encoding))
326351
}
327352

328353
#[cfg(feature = "logs")]
329354
fn build_logs_export_body(
330355
&self,
331356
logs: LogBatch<'_>,
332-
) -> Result<(Vec<u8>, &'static str), String> {
357+
) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String> {
333358
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
334359
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
335360
let req = ExportLogsServiceRequest { resource_logs };
336361

337-
match self.protocol {
362+
let (body, content_type) = match self.protocol {
338363
#[cfg(feature = "http-json")]
339364
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
340-
Ok(json) => Ok((json.into(), "application/json")),
341-
Err(e) => Err(e.to_string()),
365+
Ok(json) => (json.into_bytes(), "application/json"),
366+
Err(e) => return Err(e.to_string()),
342367
},
343-
_ => Ok((req.encode_to_vec(), "application/x-protobuf")),
344-
}
368+
_ => (req.encode_to_vec(), "application/x-protobuf"),
369+
};
370+
371+
let (compressed_body, content_encoding) = self.compress_body(body)?;
372+
Ok((compressed_body, content_type, content_encoding))
345373
}
346374

347375
#[cfg(feature = "metrics")]
348376
fn build_metrics_export_body(
349377
&self,
350378
metrics: &ResourceMetrics,
351-
) -> Option<(Vec<u8>, &'static str)> {
379+
) -> Option<(Vec<u8>, &'static str, Option<&'static str>)> {
352380
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
353381

354382
let req: ExportMetricsServiceRequest = metrics.into();
355383

356-
match self.protocol {
384+
let (body, content_type) = match self.protocol {
357385
#[cfg(feature = "http-json")]
358386
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
359-
Ok(json) => Some((json.into(), "application/json")),
387+
Ok(json) => (json.into_bytes(), "application/json"),
360388
Err(e) => {
361389
otel_debug!(name: "JsonSerializationFaied", error = e.to_string());
362-
None
390+
return None;
363391
}
364392
},
365-
_ => Some((req.encode_to_vec(), "application/x-protobuf")),
393+
_ => (req.encode_to_vec(), "application/x-protobuf"),
394+
};
395+
396+
match self.compress_body(body) {
397+
Ok((compressed_body, content_encoding)) => Some((compressed_body, content_type, content_encoding)),
398+
Err(e) => {
399+
otel_debug!(name: "CompressionFailed", error = e);
400+
None
401+
}
366402
}
367403
}
368404
}

opentelemetry-otlp/src/exporter/http/trace.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,21 @@ impl SpanExporter for OtlpHttpClient {
2222
Err(err) => return Err(err),
2323
};
2424

25-
let (body, content_type) = match self.build_trace_export_body(batch) {
26-
Ok(body) => body,
25+
let (body, content_type, content_encoding) = match self.build_trace_export_body(batch) {
26+
Ok(result) => result,
2727
Err(e) => return Err(OTelSdkError::InternalFailure(e.to_string())),
2828
};
2929

30-
let mut request = match http::Request::builder()
30+
let mut request_builder = http::Request::builder()
3131
.method(Method::POST)
3232
.uri(&self.collector_endpoint)
33-
.header(CONTENT_TYPE, content_type)
34-
.body(body.into())
35-
{
33+
.header(CONTENT_TYPE, content_type);
34+
35+
if let Some(encoding) = content_encoding {
36+
request_builder = request_builder.header("Content-Encoding", encoding);
37+
}
38+
39+
let mut request = match request_builder.body(body.into()) {
3640
Ok(req) => req,
3741
Err(e) => return Err(OTelSdkError::InternalFailure(e.to_string())),
3842
};

0 commit comments

Comments
 (0)