Skip to content

Commit 52a1c65

Browse files
authored
fix(opentelemetry source): fix HTTP not decompressing payloads (#24068)
* fix(opentelemetry source): fix HTTP not decompressing payloads
* update test modules
* changelog
* handle and emit bad request errors
* avoid double counting compression type error
* revert renaming operation
* clippy fix
1 parent f3d2608 commit 52a1c65

File tree

5 files changed

+99
-59
lines changed

5 files changed

+99
-59
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Fixed an `opentelemetry` source bug where HTTP payloads were not decompressed according to the request headers.
2+
This only applied when `use_otlp_decoding` (recently added) was set to `true`.
3+
4+
authors: pront

src/sources/opentelemetry/http.rs

Lines changed: 81 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::{
3434
common::http::ErrorMessage,
3535
event::Event,
3636
http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
37-
internal_events::{EventsReceived, StreamClosedError},
37+
internal_events::{EventsReceived, HttpBadRequest, StreamClosedError},
3838
shutdown::ShutdownSignal,
3939
sources::{
4040
http_server::HttpConfigParamKind,
@@ -144,6 +144,26 @@ fn enrich_events(
144144
);
145145
}
146146

147+
fn emit_decode_error(error: impl std::fmt::Display) -> ErrorMessage {
148+
let message = format!("Could not decode request: {error}");
149+
emit!(HttpBadRequest::new(
150+
StatusCode::BAD_REQUEST.as_u16(),
151+
&message
152+
));
153+
ErrorMessage::new(StatusCode::BAD_REQUEST, message)
154+
}
155+
156+
fn parse_with_deserializer(
157+
deserializer: &OtlpDeserializer,
158+
body: Bytes,
159+
log_namespace: LogNamespace,
160+
) -> Result<Vec<Event>, ErrorMessage> {
161+
deserializer
162+
.parse(body, log_namespace)
163+
.map(|r| r.into_vec())
164+
.map_err(emit_decode_error)
165+
}
166+
147167
fn build_ingest_filter<Resp, F>(
148168
telemetry_type: &'static str,
149169
acknowledgements: bool,
@@ -194,21 +214,29 @@ fn build_warp_log_filter(
194214
deserializer: Option<OtlpDeserializer>,
195215
) -> BoxedFilter<(Response,)> {
196216
let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
197-
if let Some(d) = deserializer.as_ref() {
198-
d.parse(body, log_namespace)
199-
.map(|r| r.into_vec())
200-
.map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string()))
201-
} else {
202-
decode(encoding_header.as_deref(), body)
203-
.and_then(|body| {
204-
bytes_received.emit(ByteSize(body.len()));
205-
decode_log_body(body, log_namespace, &events_received)
206-
})
207-
.map(|mut events| {
208-
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
209-
events
210-
})
211-
}
217+
decode(encoding_header.as_deref(), body)
218+
.inspect_err(|err| {
219+
// Other status codes are already handled by `sources::util::decode` (tech debt).
220+
if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
221+
emit!(HttpBadRequest::new(
222+
err.status_code().as_u16(),
223+
err.message()
224+
));
225+
}
226+
})
227+
.and_then(|decoded_body| {
228+
bytes_received.emit(ByteSize(decoded_body.len()));
229+
if let Some(d) = deserializer.as_ref() {
230+
parse_with_deserializer(d, decoded_body, log_namespace)
231+
} else {
232+
decode_log_body(decoded_body, log_namespace, &events_received).map(
233+
|mut events| {
234+
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
235+
events
236+
},
237+
)
238+
}
239+
})
212240
};
213241

214242
build_ingest_filter::<ExportLogsServiceResponse, _>(
@@ -226,16 +254,24 @@ fn build_warp_metrics_filter(
226254
deserializer: Option<OtlpDeserializer>,
227255
) -> BoxedFilter<(Response,)> {
228256
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
229-
if let Some(d) = deserializer.as_ref() {
230-
d.parse(body, LogNamespace::default())
231-
.map(|r| r.into_vec())
232-
.map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string()))
233-
} else {
234-
decode(encoding_header.as_deref(), body).and_then(|body| {
235-
bytes_received.emit(ByteSize(body.len()));
236-
decode_metrics_body(body, &events_received)
257+
decode(encoding_header.as_deref(), body)
258+
.inspect_err(|err| {
259+
// Other status codes are already handled by `sources::util::decode` (tech debt).
260+
if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
261+
emit!(HttpBadRequest::new(
262+
err.status_code().as_u16(),
263+
err.message()
264+
));
265+
}
266+
})
267+
.and_then(|decoded_body| {
268+
bytes_received.emit(ByteSize(decoded_body.len()));
269+
if let Some(d) = deserializer.as_ref() {
270+
parse_with_deserializer(d, decoded_body, LogNamespace::default())
271+
} else {
272+
decode_metrics_body(decoded_body, &events_received)
273+
}
237274
})
238-
}
239275
};
240276

241277
build_ingest_filter::<ExportMetricsServiceResponse, _>(
@@ -254,16 +290,24 @@ fn build_warp_trace_filter(
254290
deserializer: Option<OtlpDeserializer>,
255291
) -> BoxedFilter<(Response,)> {
256292
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
257-
if let Some(d) = deserializer.as_ref() {
258-
d.parse(body, LogNamespace::default())
259-
.map(|r| r.into_vec())
260-
.map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string()))
261-
} else {
262-
decode(encoding_header.as_deref(), body).and_then(|body| {
263-
bytes_received.emit(ByteSize(body.len()));
264-
decode_trace_body(body, &events_received)
293+
decode(encoding_header.as_deref(), body)
294+
.inspect_err(|err| {
295+
// Other status codes are already handled by `sources::util::decode` (tech debt).
296+
if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
297+
emit!(HttpBadRequest::new(
298+
err.status_code().as_u16(),
299+
err.message()
300+
));
301+
}
302+
})
303+
.and_then(|decoded_body| {
304+
bytes_received.emit(ByteSize(decoded_body.len()));
305+
if let Some(d) = deserializer.as_ref() {
306+
parse_with_deserializer(d, decoded_body, LogNamespace::default())
307+
} else {
308+
decode_trace_body(decoded_body, &events_received)
309+
}
265310
})
266-
}
267311
};
268312

269313
build_ingest_filter::<ExportTraceServiceResponse, _>(
@@ -278,12 +322,7 @@ fn decode_trace_body(
278322
body: Bytes,
279323
events_received: &Registered<EventsReceived>,
280324
) -> Result<Vec<Event>, ErrorMessage> {
281-
let request = ExportTraceServiceRequest::decode(body).map_err(|error| {
282-
ErrorMessage::new(
283-
StatusCode::BAD_REQUEST,
284-
format!("Could not decode request: {error}"),
285-
)
286-
})?;
325+
let request = ExportTraceServiceRequest::decode(body).map_err(emit_decode_error)?;
287326

288327
let events: Vec<Event> = request
289328
.resource_spans
@@ -304,12 +343,7 @@ fn decode_log_body(
304343
log_namespace: LogNamespace,
305344
events_received: &Registered<EventsReceived>,
306345
) -> Result<Vec<Event>, ErrorMessage> {
307-
let request = ExportLogsServiceRequest::decode(body).map_err(|error| {
308-
ErrorMessage::new(
309-
StatusCode::BAD_REQUEST,
310-
format!("Could not decode request: {error}"),
311-
)
312-
})?;
346+
let request = ExportLogsServiceRequest::decode(body).map_err(emit_decode_error)?;
313347

314348
let events: Vec<Event> = request
315349
.resource_logs
@@ -329,12 +363,7 @@ fn decode_metrics_body(
329363
body: Bytes,
330364
events_received: &Registered<EventsReceived>,
331365
) -> Result<Vec<Event>, ErrorMessage> {
332-
let request = ExportMetricsServiceRequest::decode(body).map_err(|error| {
333-
ErrorMessage::new(
334-
StatusCode::BAD_REQUEST,
335-
format!("Could not decode request: {error}"),
336-
)
337-
})?;
366+
let request = ExportMetricsServiceRequest::decode(body).map_err(emit_decode_error)?;
338367

339368
let events: Vec<Event> = request
340369
.resource_metrics

src/sources/util/http/encoding.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ use warp::http::StatusCode;
77

88
use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};
99

10+
/// Decompresses the body based on the Content-Encoding header.
11+
///
12+
/// Supports gzip, deflate, snappy, zstd, and identity (no compression).
1013
pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
1114
if let Some(encodings) = header {
1215
for encoding in encodings.rsplit(',').map(str::trim) {

tests/data/e2e/opentelemetry/logs/collector-source.yaml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,22 @@ processors:
99
batch: { }
1010

1111
exporters:
12-
otlp:
12+
otlp/grpc:
1313
endpoint: vector:4317
1414
tls:
1515
insecure: true
16+
otlphttp/vector:
17+
endpoint: http://vector:4318
18+
tls:
19+
insecure: true
1620
debug: { }
1721

1822
service:
1923
pipelines:
2024
logs:
2125
receivers: [ otlp ]
2226
processors: [ batch ]
23-
exporters: [ debug, otlp ]
27+
exporters: [ debug, otlp/grpc, otlphttp/vector ]
2428

2529
metrics:
2630
receivers: [ otlp ]

tests/e2e/opentelemetry/logs/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use vector_lib::opentelemetry::proto::common::v1::any_value::Value as AnyValueEn
77
use vector_lib::opentelemetry::proto::{DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE};
88
use vrl::value::Value as VrlValue;
99

10-
const EXPECTED_LOG_COUNT: usize = 100;
10+
const EXPECTED_LOG_COUNT: usize = 200; // 100 via gRPC + 100 via HTTP
1111

1212
fn read_file_helper(filename: &str) -> Result<String, io::Error> {
1313
let local_path = Path::new("/output/opentelemetry-logs").join(filename);
@@ -186,15 +186,14 @@ fn vector_sink_otel_sink_logs_match() {
186186
let vector_request = parse_export_logs_request(&vector_content)
187187
.expect("Failed to parse vector logs as ExportLogsServiceRequest");
188188

189-
// Count total log records in collector output
189+
// Count total log records
190190
let collector_log_count = collector_request
191191
.resource_logs
192192
.iter()
193193
.flat_map(|rl| &rl.scope_logs)
194194
.flat_map(|sl| &sl.log_records)
195195
.count();
196196

197-
// Count total log records in vector output
198197
let vector_log_count = vector_request
199198
.resource_logs
200199
.iter()
@@ -220,9 +219,10 @@ fn vector_sink_otel_sink_logs_match() {
220219
assert_log_records_static_fields(&collector_request);
221220
assert_log_records_static_fields(&vector_request);
222221

223-
// Compare the requests - this compares the full protobuf structure
222+
// Both collector and Vector receive 200 logs total (100 via gRPC + 100 via HTTP).
223+
// Compare them directly to verify the entire pipeline works correctly.
224224
assert_eq!(
225225
collector_request, vector_request,
226-
"ExportLogsServiceRequest mismatch between collector and vector"
226+
"Collector and Vector log requests should match"
227227
);
228228
}

0 commit comments

Comments
 (0)