Skip to content

Commit 3749b70

Browse files
fix(sources): collect headers for logs in opentelemetry source with use_otlp_decoding set to true (#24307)
* fix(opetelemetry source): collect headers for logs, metrics, traces with/without use_otlp_decoding * chore: add changelog * chore: renamed changelog * chore: removed adding headers to metrics and traces * chore: Test coverage * cargo fmt * Add newline to changelog * chore: Updated changelog to say that it is for logs only * chore: fixed clippy warning --------- Co-authored-by: Thomas <[email protected]>
1 parent 36a935f commit 3749b70

File tree

3 files changed

+100
-36
lines changed

3 files changed

+100
-36
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed the opentelemetry source to collect HTTP headers for logs with or without `use_otlp_decoding` configuration option.
2+
3+
authors: ozanichkovsky

src/sources/opentelemetry/http.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,13 +229,12 @@ fn build_warp_log_filter(
229229
if let Some(d) = deserializer.as_ref() {
230230
parse_with_deserializer(d, decoded_body, log_namespace)
231231
} else {
232-
decode_log_body(decoded_body, log_namespace, &events_received).map(
233-
|mut events| {
234-
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
235-
events
236-
},
237-
)
232+
decode_log_body(decoded_body, log_namespace, &events_received)
238233
}
234+
.map(|mut events| {
235+
enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
236+
events
237+
})
239238
})
240239
};
241240

src/sources/opentelemetry/tests.rs

Lines changed: 92 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
net,
23
sync::Arc,
34
time::{SystemTime, UNIX_EPOCH},
45
};
@@ -19,9 +20,7 @@ use vector_lib::{
1920
ExportMetricsServiceRequest, metrics_service_client::MetricsServiceClient,
2021
},
2122
},
22-
common::v1::{
23-
AnyValue, InstrumentationScope, KeyValue, any_value, any_value::Value::StringValue,
24-
},
23+
common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value::Value::StringValue},
2524
logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
2625
metrics::v1::{
2726
AggregationTemporality, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
@@ -33,7 +32,6 @@ use vector_lib::{
3332
},
3433
};
3534
use vrl::value;
36-
use warp::http::HeaderMap;
3735

3836
use crate::{
3937
SourceSender,
@@ -1067,36 +1065,39 @@ async fn receive_summary_metric() {
10671065
.await;
10681066
}
10691067

1068+
fn get_source_config_with_headers(
1069+
grpc_addr: net::SocketAddr,
1070+
http_addr: net::SocketAddr,
1071+
use_otlp_decoding: bool,
1072+
) -> OpentelemetryConfig {
1073+
OpentelemetryConfig {
1074+
grpc: GrpcConfig {
1075+
address: grpc_addr,
1076+
tls: Default::default(),
1077+
},
1078+
http: HttpConfig {
1079+
address: http_addr,
1080+
tls: Default::default(),
1081+
keepalive: Default::default(),
1082+
headers: vec![
1083+
"User-Agent".to_string(),
1084+
"X-*".to_string(),
1085+
"AbsentHeader".to_string(),
1086+
],
1087+
},
1088+
acknowledgements: Default::default(),
1089+
log_namespace: Default::default(),
1090+
use_otlp_decoding,
1091+
}
1092+
}
1093+
10701094
#[tokio::test]
1071-
async fn http_headers() {
1095+
async fn http_headers_logs_use_otlp_decoding_false() {
10721096
assert_source_compliance(&SOURCE_TAGS, async {
10731097
let (_guard_0, grpc_addr) = next_addr();
10741098
let (_guard_1, http_addr) = next_addr();
10751099

1076-
let mut headers = HeaderMap::new();
1077-
headers.insert("User-Agent", "test_client".parse().unwrap());
1078-
headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
1079-
headers.insert("X-Test-Header", "true".parse().unwrap());
1080-
1081-
let source = OpentelemetryConfig {
1082-
grpc: GrpcConfig {
1083-
address: grpc_addr,
1084-
tls: Default::default(),
1085-
},
1086-
http: HttpConfig {
1087-
address: http_addr,
1088-
tls: Default::default(),
1089-
keepalive: Default::default(),
1090-
headers: vec![
1091-
"User-Agent".to_string(),
1092-
"X-*".to_string(),
1093-
"AbsentHeader".to_string(),
1094-
],
1095-
},
1096-
acknowledgements: Default::default(),
1097-
log_namespace: Default::default(),
1098-
use_otlp_decoding: false,
1099-
};
1100+
let source = get_source_config_with_headers(grpc_addr, http_addr, false);
11001101
let schema_definitions = source
11011102
.outputs(LogNamespace::Legacy)
11021103
.remove(0)
@@ -1122,7 +1123,7 @@ async fn http_headers() {
11221123
severity_number: 9,
11231124
severity_text: "info".into(),
11241125
body: Some(AnyValue {
1125-
value: Some(any_value::Value::StringValue("log body".into())),
1126+
value: Some(StringValue("log body".into())),
11261127
}),
11271128
attributes: vec![],
11281129
dropped_attributes_count: 0,
@@ -1175,6 +1176,67 @@ async fn http_headers() {
11751176
.await;
11761177
}
11771178

1179+
#[tokio::test]
1180+
async fn http_headers_logs_use_otlp_decoding_true() {
1181+
assert_source_compliance(&SOURCE_TAGS, async {
1182+
let (_guard_0, grpc_addr) = next_addr();
1183+
let (_guard_1, http_addr) = next_addr();
1184+
1185+
let source = get_source_config_with_headers(grpc_addr, http_addr, true);
1186+
1187+
let (sender, logs_output, _) = new_source(EventStatus::Delivered, LOGS.to_string());
1188+
let server = source
1189+
.build(SourceContext::new_test(sender, None))
1190+
.await
1191+
.unwrap();
1192+
tokio::spawn(server);
1193+
test_util::wait_for_tcp(http_addr).await;
1194+
1195+
let client = reqwest::Client::new();
1196+
let req = ExportLogsServiceRequest {
1197+
resource_logs: vec![ResourceLogs {
1198+
resource: None,
1199+
scope_logs: vec![ScopeLogs {
1200+
scope: None,
1201+
log_records: vec![LogRecord {
1202+
time_unix_nano: 1,
1203+
observed_time_unix_nano: 2,
1204+
severity_number: 9,
1205+
severity_text: "info".into(),
1206+
body: Some(AnyValue {
1207+
value: Some(StringValue("log body".into())),
1208+
}),
1209+
attributes: vec![],
1210+
dropped_attributes_count: 0,
1211+
flags: 4,
1212+
// opentelemetry sdk will hex::decode the given trace_id and span_id
1213+
trace_id: str_into_hex_bytes("4ac52aadf321c2e531db005df08792f5"),
1214+
span_id: str_into_hex_bytes("0b9e4bda2a55530d"),
1215+
}],
1216+
schema_url: "v1".into(),
1217+
}],
1218+
schema_url: "v1".into(),
1219+
}],
1220+
};
1221+
let _res = client
1222+
.post(format!("http://{http_addr}/v1/logs"))
1223+
.header("Content-Type", "application/x-protobuf")
1224+
.header("User-Agent", "Test")
1225+
.body(req.encode_to_vec())
1226+
.send()
1227+
.await
1228+
.expect("Failed to send log to Opentelemetry Collector.");
1229+
1230+
let mut output = test_util::collect_ready(logs_output).await;
1231+
assert_eq!(output.len(), 1);
1232+
let actual_event = output.pop().unwrap();
1233+
let log = actual_event.as_log();
1234+
assert_eq!(log["AbsentHeader"], Value::Null);
1235+
assert_eq!(log["User-Agent"], "Test".into());
1236+
})
1237+
.await;
1238+
}
1239+
11781240
pub struct OTelTestEnv {
11791241
pub grpc_addr: String,
11801242
pub config: OpentelemetryConfig,

0 commit comments

Comments
 (0)