Skip to content

Commit 8891353

Browse files
fix(opentelemetry sink): align Protocol enum field types to fix schema conflicts
1 parent a0c8e35 commit 8891353

File tree

1 file changed

+33
-17
lines changed

1 file changed

+33
-17
lines changed

src/sinks/opentelemetry/grpc.rs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ use crate::{
4343
sinks::{
4444
Healthcheck, VectorSink,
4545
util::{
46-
BatchConfig, RealtimeEventBasedDefaultBatchSettings, ServiceBuilderExt, SinkBuilderExt,
47-
StreamSink, TowerRequestConfig, metadata::RequestMetadataBuilder, retries::RetryLogic,
46+
BatchConfig, Compression, RealtimeEventBasedDefaultBatchSettings, ServiceBuilderExt,
47+
SinkBuilderExt, StreamSink, http::RequestConfig, metadata::RequestMetadataBuilder,
48+
retries::RetryLogic,
4849
},
4950
},
50-
tls::{MaybeTlsSettings, TlsEnableableConfig},
51+
tls::{MaybeTlsSettings, TlsConfig},
5152
};
5253

5354
pub(super) fn with_default_scheme(address: &str, tls: bool) -> crate::Result<Uri> {
@@ -96,24 +97,24 @@ pub struct GrpcSinkConfig {
9697
#[configurable(metadata(docs::examples = "http://localhost:4317"))]
9798
pub endpoint: String,
9899

99-
/// Whether to compress outgoing requests with gzip.
100+
/// Compression codec for outgoing gRPC requests.
100101
///
101-
/// Defaults to `false`.
102+
/// Only `none` and `gzip` are supported for gRPC transport.
103+
#[configurable(derived)]
102104
#[serde(default)]
103-
#[configurable(metadata(docs::advanced))]
104-
pub compression: bool,
105+
pub compression: Compression,
105106

106107
#[configurable(derived)]
107108
#[serde(default)]
108109
pub batch: BatchConfig<RealtimeEventBasedDefaultBatchSettings>,
109110

110111
#[configurable(derived)]
111112
#[serde(default)]
112-
pub request: TowerRequestConfig,
113+
pub request: RequestConfig,
113114

114115
#[configurable(derived)]
115116
#[serde(default)]
116-
pub tls: Option<TlsEnableableConfig>,
117+
pub tls: Option<TlsConfig>,
117118

118119
#[configurable(derived)]
119120
#[serde(
@@ -126,15 +127,30 @@ pub struct GrpcSinkConfig {
126127

127128
impl GrpcSinkConfig {
128129
pub async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
129-
let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
130-
let uri = with_default_scheme(&self.endpoint, tls.is_tls())?;
130+
// Determine TLS from the endpoint scheme; fall back to https when tls config is present.
131+
let uri = with_default_scheme(&self.endpoint, self.tls.is_some())?;
132+
let tls = if uri.scheme_str() == Some("https") {
133+
MaybeTlsSettings::tls_client(self.tls.as_ref())?
134+
} else {
135+
MaybeTlsSettings::Raw(())
136+
};
131137

138+
let use_gzip = match self.compression {
139+
Compression::None => false,
140+
Compression::Gzip(_) => true,
141+
other => {
142+
return Err(format!(
143+
"gRPC transport only supports 'none' or 'gzip' compression, got '{other}'"
144+
)
145+
.into())
146+
}
147+
};
132148
let client = new_grpc_client(&tls, cx.proxy())?;
133-
let service = OtlpGrpcService::new(client, uri, self.compression);
149+
let service = OtlpGrpcService::new(client, uri, use_gzip);
134150

135151
let healthcheck = Box::pin(async move { Ok(()) });
136152

137-
let request_settings = self.request.into_settings();
153+
let request_settings = self.request.tower.into_settings();
138154
let batch_settings = self.batch.into_batcher_settings()?;
139155

140156
let service = ServiceBuilder::new()
@@ -631,20 +647,20 @@ mod tests {
631647
)
632648
.unwrap();
633649
assert_eq!(config.endpoint, "http://localhost:4317");
634-
assert!(!config.compression);
650+
assert_eq!(config.compression, Compression::default());
635651
}
636652

637653
#[test]
638-
fn grpc_config_with_tls() {
654+
fn grpc_config_with_gzip() {
639655
let config: GrpcSinkConfig = toml::from_str(
640656
r#"
641657
endpoint = "https://otelcol.example.com:4317"
642-
compression = true
658+
compression = "gzip"
643659
"#,
644660
)
645661
.unwrap();
646662
assert_eq!(config.endpoint, "https://otelcol.example.com:4317");
647-
assert!(config.compression);
663+
assert!(matches!(config.compression, Compression::Gzip(_)));
648664
}
649665

650666
#[test]

0 commit comments

Comments
 (0)