Skip to content

Commit f8bc78e

Browse files
test(opentelemetry sink): add gRPC integration and e2e tests
1 parent 7f8d9db commit f8bc78e

File tree

9 files changed

+129
-2
lines changed

9 files changed

+129
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1030,7 +1030,7 @@ mongodb_metrics-integration-tests = ["sources-mongodb_metrics"]
10301030
mqtt-integration-tests = ["sinks-mqtt", "sources-mqtt"]
10311031
nats-integration-tests = ["sinks-nats", "sources-nats"]
10321032
nginx-integration-tests = ["sources-nginx_metrics"]
1033-
opentelemetry-integration-tests = ["sources-opentelemetry", "dep:prost"]
1033+
opentelemetry-integration-tests = ["sources-opentelemetry", "sinks-opentelemetry", "dep:prost"]
10341034
postgresql_metrics-integration-tests = ["sources-postgresql_metrics"]
10351035
postgres_sink-integration-tests = ["sinks-postgres"]
10361036
prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus", "sinks-influxdb"]
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use futures::stream;
2+
use prost::Message as _;
3+
use vector_lib::{
4+
codecs::decoding::format::{Deserializer as _, OtlpDeserializer},
5+
config::LogNamespace,
6+
opentelemetry::proto::{
7+
collector::logs::v1::ExportLogsServiceRequest,
8+
logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
9+
resource::v1::Resource,
10+
},
11+
};
12+
13+
use super::grpc::GrpcSinkConfig;
14+
use crate::{
15+
config::SinkContext,
16+
test_util::{
17+
components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
18+
wait_for_tcp,
19+
},
20+
};
21+
22+
fn sink_grpc_address() -> String {
23+
std::env::var("OTEL_GRPC_SINK_ADDRESS")
24+
.unwrap_or_else(|_| "opentelemetry-collector:4317".to_owned())
25+
}
26+
27+
fn otlp_log_event() -> vector_lib::event::Event {
28+
let req = ExportLogsServiceRequest {
29+
resource_logs: vec![ResourceLogs {
30+
resource: Some(Resource {
31+
attributes: vec![],
32+
dropped_attributes_count: 0,
33+
}),
34+
scope_logs: vec![ScopeLogs {
35+
scope: None,
36+
log_records: vec![LogRecord {
37+
severity_text: "INFO".to_string(),
38+
body: Some(vector_lib::opentelemetry::proto::common::v1::AnyValue {
39+
value: Some(
40+
vector_lib::opentelemetry::proto::common::v1::any_value::Value::StringValue(
41+
"integration test log message".to_string(),
42+
),
43+
),
44+
}),
45+
..Default::default()
46+
}],
47+
schema_url: String::new(),
48+
}],
49+
schema_url: String::new(),
50+
}],
51+
};
52+
53+
let bytes = bytes::Bytes::from(req.encode_to_vec());
54+
let mut events = OtlpDeserializer::default()
55+
.parse(bytes, LogNamespace::Legacy)
56+
.expect("failed to deserialize OTLP log event");
57+
events.remove(0)
58+
}
59+
60+
#[tokio::test]
61+
async fn delivers_logs_via_grpc() {
62+
let address = sink_grpc_address();
63+
wait_for_tcp(&address).await;
64+
65+
let config: GrpcSinkConfig = toml::from_str(&format!(
66+
r#"
67+
endpoint = "http://{address}"
68+
"#
69+
))
70+
.unwrap();
71+
72+
let (sink, _healthcheck) = config.build(SinkContext::default()).await.unwrap();
73+
74+
let events = vec![otlp_log_event()];
75+
run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
76+
}

src/sinks/opentelemetry/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
mod grpc;
22

3+
#[cfg(all(test, feature = "opentelemetry-integration-tests"))]
4+
mod integration_tests;
5+
36
use indoc::indoc;
47
use vector_config::component::GenerateConfig;
58
use vector_lib::{

tests/e2e/opentelemetry-logs/config/compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ services:
6060
target: /output
6161
ports:
6262
- "${OTEL_COLLECTOR_SINK_HTTP_PORT:-5318}:5318"
63+
- "${OTEL_COLLECTOR_SINK_GRPC_PORT:-14317}:4317"
6364

6465
vector:
6566
container_name: vector-otel-logs-e2e

tests/e2e/opentelemetry-logs/config/test.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ runner:
1111
OTEL_COLLECTOR_SOURCE_GRPC_PORT: '4317'
1212
OTEL_COLLECTOR_SOURCE_HTTP_PORT: '4318'
1313
OTEL_COLLECTOR_SINK_HTTP_PORT: '5318'
14+
OTEL_COLLECTOR_SINK_GRPC_PORT: '14317'
1415

1516
matrix:
1617
# Determines which `otel/opentelemetry-collector-contrib` version to use
1718
collector_version: [ 'latest' ]
18-
vector_config: [ 'vector_default.yaml', 'vector_otlp.yaml' ]
19+
vector_config: [ 'vector_default.yaml', 'vector_otlp.yaml', 'vector_grpc.yaml' ]
1920

2021
# Only trigger this integration test if relevant OTEL source/sink files change
2122
paths:

tests/e2e/opentelemetry-logs/data/collector-sink.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ receivers:
33
protocols:
44
http:
55
endpoint: "0.0.0.0:5318"
6+
grpc:
7+
endpoint: "0.0.0.0:4317"
68

79
processors:
810
batch: { }
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
sources:
2+
source0:
3+
type: opentelemetry
4+
grpc:
5+
address: 0.0.0.0:4317
6+
http:
7+
address: 0.0.0.0:4318
8+
keepalive:
9+
max_connection_age_jitter_factor: 0.1
10+
max_connection_age_secs: 300
11+
use_otlp_decoding: true
12+
13+
internal_metrics:
14+
type: internal_metrics
15+
scrape_interval_secs: 60
16+
17+
sinks:
18+
otel_sink:
19+
inputs:
20+
- source0.logs
21+
type: opentelemetry
22+
protocol:
23+
type: grpc
24+
endpoint: http://otel-collector-sink:4317
25+
26+
otel_file_sink:
27+
type: file
28+
path: "/output/opentelemetry-logs/vector-file-sink.log"
29+
inputs:
30+
- source0.logs
31+
encoding:
32+
codec: json
33+
34+
metrics_file_sink:
35+
type: file
36+
path: "/output/opentelemetry-logs/vector-internal-metrics-sink.log"
37+
inputs:
38+
- internal_metrics
39+
encoding:
40+
codec: json

tests/integration/opentelemetry/config/test.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ runner:
77
env:
88
OTEL_HEALTH_URL: http://opentelemetry-collector:13133
99
OTEL_OTLPHTTP_URL: http://opentelemetry-collector:9876
10+
OTEL_GRPC_SINK_ADDRESS: opentelemetry-collector:4317
1011

1112
matrix:
1213
version: [0.56.0]
@@ -15,5 +16,6 @@ matrix:
1516
# expressions are evaluated using https://github.com/micromatch/picomatch
1617
paths:
1718
- "src/sources/opentelemetry/**"
19+
- "src/sinks/opentelemetry/**"
1820
- "src/sources/util/**"
1921
- "tests/integration/opentelemetry/**"

tests/integration/opentelemetry/data/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ receivers:
33
protocols:
44
http:
55
endpoint: 0.0.0.0:9876
6+
grpc:
7+
endpoint: 0.0.0.0:4317
68

79
exporters:
810
otlp:

0 commit comments

Comments
 (0)